
fix: Limit together with pushdown_filters #13788

Merged: 4 commits into apache:main on Dec 16, 2024

Conversation

zhuqi-lucas
Contributor

@zhuqi-lucas zhuqi-lucas commented Dec 15, 2024

Which issue does this PR close?

Closes #13724

Rationale for this change

We always limit the number of partitioned files to scan, even when a filter must be applied before the limit. The file listing is therefore truncated too early, and the scan can miss the files that actually contain the matching rows.

The fix: do not limit the partitioned files when a filter is present together with a limit.
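To make the failure mode concrete, here is a minimal self-contained sketch of the bug. The `FileStats` struct and `files_to_scan` helper are hypothetical illustrations, not DataFusion's actual API; they only model how truncating the file listing by the limit, before the filter runs, can drop the one file that matches:

```rust
/// Hypothetical per-file statistics (illustration only, not DataFusion's API).
struct FileStats {
    rows: usize,
    contains_match: bool,
}

/// Models the buggy behavior: stop listing files once the cumulative row
/// count reaches `limit`, even though a filter still has to run afterwards.
fn files_to_scan(files: &[FileStats], limit: Option<usize>) -> usize {
    match limit {
        Some(limit) => {
            let mut rows = 0;
            let mut n = 0;
            for f in files {
                n += 1;
                rows += f.rows;
                if rows >= limit {
                    break;
                }
            }
            n
        }
        None => files.len(),
    }
}

fn main() {
    // 120 files of 200 rows each; only the last file contains the matching row.
    let mut files: Vec<FileStats> = (0..119)
        .map(|_| FileStats { rows: 200, contains_match: false })
        .collect();
    files.push(FileStats { rows: 200, contains_match: true });

    // Buggy: `LIMIT 1` truncates the listing to the first file, so the
    // filter never sees the matching row and the query returns empty.
    let scanned = files_to_scan(&files, Some(1));
    let found = files[..scanned].iter().any(|f| f.contains_match);
    println!("scanned {scanned} file(s), match found: {found}");
    // scanned 1 file(s), match found: false

    // Fixed: ignore the limit for file listing when filters are present.
    let filters_present = true;
    let statistic_file_limit = if filters_present { None } else { Some(1) };
    let scanned = files_to_scan(&files, statistic_file_limit);
    let found = files[..scanned].iter().any(|f| f.contains_match);
    println!("scanned {scanned} file(s), match found: {found}");
    // scanned 120 file(s), match found: true
}
```

With the guard in place (passing `None` as the file-listing limit whenever filters exist), every file stays in the listing and the limit is applied only after filtering.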

What changes are included in this PR?

Are these changes tested?

Tested manually with the following code, which generates the test files:

// Imports assume the `datafusion` crate's re-exports; adjust paths to your version.
use std::fs::File;
use std::sync::Arc;

use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::SessionStateBuilder;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::parquet::file::reader::{FileReader, SerializedFileReader};
use datafusion::prelude::*;

#[tokio::main]
pub async fn main() -> Result<()> {
    let output_dir = "/tmp/test_parquet_data";
    let num_files = 120; // Number of Parquet files
    let rows_per_file = 200; // Number of rows per Parquet file

    // Generate the dataset
    generate_test_data(output_dir, num_files, rows_per_file);
    println!("Generated {} Parquet files with {} rows each", num_files, rows_per_file);

    let file_path = "/tmp/test_parquet_data/part-119.parquet"; // the last file, which holds the target row
    let file = File::open(file_path).unwrap();
    let parquet_reader = SerializedFileReader::new(file).unwrap();
    let mut iter = parquet_reader.get_row_iter(None).unwrap();

    while let Some(record) = iter.next() {
        println!("{:?}", record);
    }


    let mut parquet_options = ParquetReadOptions::new();
    parquet_options = parquet_options.parquet_pruning(true);

    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;

    let state = SessionStateBuilder::new().with_config(config).build();
    let ctx = SessionContext::from(state);

    let mut df = ctx
        .read_parquet(output_dir, parquet_options.clone())
        .await
        .unwrap();

    df = df.filter(col("a").eq(lit("23asdas23"))).unwrap();

    df = df.limit(0, Some(1)).unwrap();

    let batch = df.collect().await.unwrap();

    println!("{:?}", batch);

    Ok(())
}


fn generate_test_data(output_dir: &str, num_files: usize, rows_per_file: usize) {
    // Ensure the output directory exists
    std::fs::create_dir_all(output_dir).unwrap();

    // Define the schema
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Int32, false),
    ]));

    for file_index in 0..num_files {
        // Generate data for this file
        let mut a_values = Vec::new();
        let mut b_values = Vec::new();

        for row_index in 0..rows_per_file {
            // Fill in rows
            if file_index == num_files - 1 && row_index == rows_per_file / 2 {
                // Add the target row deep in the dataset
                a_values.push("23asdas23".to_string());
                b_values.push(999);
            } else {
                a_values.push(format!("random_{}_{}", file_index, row_index));
                b_values.push((file_index * rows_per_file + row_index) as i32);
            }
        }

        // Create Arrow arrays
        let a_array = Arc::new(StringArray::from(a_values)) as Arc<dyn arrow::array::Array>;
        let b_array = Arc::new(Int32Array::from(b_values)) as Arc<dyn arrow::array::Array>;

        // Create a record batch
        let batch = RecordBatch::try_new(schema.clone(), vec![a_array, b_array]).unwrap();

        // Write to a Parquet file
        let file_path = format!("{}/part-{}.parquet", output_dir, file_index);
        let file = File::create(file_path).unwrap();
        let props = WriterProperties::builder().build();
        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
    }
}

After this PR, the final `println!("{:?}", batch);` prints the matching row instead of an empty result.

Are there any user-facing changes?

No

@github-actions github-actions bot added the core Core DataFusion crate label Dec 15, 2024
@korowa
Contributor

korowa commented Dec 15, 2024

Thank you @zhuqi-lucas.

Could you please add a test which could serve as a reproducer for this issue? Ideally it would be a sqllogictest simpler than the example from the description, something like:

statement ok
set datafusion.execution.parquet.pushdown_filters = true;

# this one is also required to make DF skip second file due to "sufficient" amount of rows
statement ok
set datafusion.execution.collect_statistics = true;

statement ok
CREATE EXTERNAL TABLE test_filter_with_limit
(
  part_key INT,
  value INT
)
STORED AS PARQUET
LOCATION 'test_files/scratch/parquet/test_filter_with_limit/'
PARTITIONED BY (part_key);

# There will be 2 records filtered from the table to check that `limit 1` actually applied
statement ok
insert into test_filter_with_limit (part_key, value) values (1, 0), (1, 1), (1, 100), (2, 0), (2, 2), (2, 2), (2, 100);

query II
select * from test_filter_with_limit where value = 2 limit 1;
----
2 2

Comment on lines 849 to 852
let mut statistic_file_limit = limit;
if !filters.is_empty() {
    statistic_file_limit = None;
}
Contributor

Suggested change:
- let mut statistic_file_limit = limit;
- if !filters.is_empty() {
-     statistic_file_limit = None;
- }
+ let statistic_file_limit = if filters.is_empty() { limit } else { None };

Contributor Author

Good suggestion @korowa !

@zhuqi-lucas
Contributor Author

Good suggestion! Thanks @korowa, I will address it soon.

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Dec 15, 2024
Contributor

@korowa korowa left a comment


LGTM

I'm going to merge it tomorrow, in case anyone else would like to review it.

Contributor

@alamb alamb left a comment


Thank you @zhuqi-lucas and @korowa

I tried to verify the test covers the code by reverting the code change:

andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion2$ git diff
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index b12f37ed7..ffe49dd2b 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -843,13 +843,8 @@ impl TableProvider for ListingTable {
             });
         // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here?
         let session_state = state.as_any().downcast_ref::<SessionState>().unwrap();
-
-        // We should not limit the number of partitioned files to scan if there are filters and limit
-        // at the same time. This is because the limit should be applied after the filters are applied.
-        let statistic_file_limit = if filters.is_empty() { limit } else { None };
-
         let (mut partitioned_file_lists, statistics) = self
-            .list_files_for_scan(session_state, &partition_filters, statistic_file_limit)
+            .list_files_for_scan(session_state, &partition_filters, limit)
             .await?;

         // if no files need to be read, return an `EmptyExec`

And running the test:

andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion2$ cargo test --test sqllogictests -- pushdown
    Finished `test` profile [unoptimized + debuginfo] target(s) in 0.15s
     Running bin/sqllogictests.rs (target/debug/deps/sqllogictests-b3c91103e28d27e7)
Executed "parquet_filter_pushdown.slt". Took 24.915792ms
andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion2$

However, the test seems to pass without the code changes 🤔

@zhuqi-lucas
Contributor Author

Thank you @alamb for the review. I will adjust the test; it may not be hitting the changed code path.

@zhuqi-lucas
Contributor Author

@alamb @korowa
Updated the test now; it hits the changed code path, thanks!

Before the PR, the testing result:

query II
select * from test_filter_with_limit where value = 2 limit 1;
----

After the PR, the testing result:

query II
select * from test_filter_with_limit where value = 2 limit 1;
----
2 2

@zhuqi-lucas zhuqi-lucas requested a review from alamb December 16, 2024 02:35
Contributor

@alamb alamb left a comment


I verified that the test does indeed fail now without the change. Thank you very much @zhuqi-lucas and @korowa

[SQL] select * from test_filter_with_limit where value = 2 limit 1;
[Diff] (-expected|+actual)
-   2 2
at test_files/push_down_filter.slt:186

Error: Execution("1 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`

@alamb alamb merged commit fe53eaf into apache:main Dec 16, 2024
25 checks passed
@korowa
Contributor

korowa commented Dec 16, 2024

Thank you!

However, the test seems to pass without the code changes 🤔

It looks like there is another .slt in the logs:

Executed "parquet_filter_pushdown.slt". Took 24.915792ms

while the new test was added into push_down_filter.slt.

Labels
core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)
Successfully merging this pull request may close these issues.

Limit together with pushdown_filters
3 participants