Optimize SortPreservingMergeStream for single-column merge #13642

Open
Tracked by #10313
Dandandan opened this issue Dec 4, 2024 · 5 comments
Labels
enhancement (New feature or request), performance (Make DataFusion faster)

Comments

@Dandandan
Contributor

Dandandan commented Dec 4, 2024

Is your feature request related to a problem or challenge?

Currently, SortPreservingMergeStream converts columns to row format even when sorting on a single column.

We can optimize SortPreservingMergeStream to not do this when the sort expression is a single column, which avoids the conversion and makes comparisons faster.

Describe the solution you'd like

Optimize the SortPreservingMergeStream for single column sorts, only converting to row-format when dealing with multi-column sorts.
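
To illustrate the idea, here is a minimal, standard-library-only sketch. It is not DataFusion or arrow code: `encode_row`, `compare_via_rows`, and `compare_native` are hypothetical names, and the encoding is a simplified stand-in for a row format (real row formats also handle nulls, sort options, and multiple columns). It only shows that, for a single primitive column, comparing native values gives the same ordering as comparing encoded rows while skipping the conversion step.

```rust
use std::cmp::Ordering;

/// Toy order-preserving encoding of a single i32 "row": flip the sign bit and
/// store the result big-endian, so bytewise comparison matches numeric order.
/// Hypothetical helper, not the arrow row format.
fn encode_row(v: i32) -> [u8; 4] {
    ((v as u32) ^ 0x8000_0000).to_be_bytes()
}

/// Row-style path: encode both values first, then compare the bytes.
fn compare_via_rows(a: i32, b: i32) -> Ordering {
    encode_row(a).cmp(&encode_row(b))
}

/// Single-column path: compare the native values directly, no conversion.
fn compare_native(a: i32, b: i32) -> Ordering {
    a.cmp(&b)
}

fn main() {
    let pairs = [(-5, 3), (7, 7), (i32::MAX, i32::MIN), (0, -1)];
    for (a, b) in pairs {
        // Both paths must agree; the native path just does less work.
        assert_eq!(compare_via_rows(a, b), compare_native(a, b));
    }
    println!("both paths agree on ordering");
}
```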

Describe alternatives you've considered

No response

Additional context

This should be beneficial for many queries involving SortPreservingMerge, such as #13586

@Dandandan added the enhancement and performance labels on Dec 4, 2024
@alan910127
Contributor

take

@comphead
Contributor

comphead commented Dec 6, 2024

Hi @Dandandan, does SortPreservingMergeStream work with the row format? I took a quick look at https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/sorts/merge.rs#L44 and I don't see RowConverter or SortFields in there. Am I missing something?

@alan910127
Contributor

As shown in my flamegraph profile, there are code paths that invoke arrow::Row::cmp.
From what I understand, RowValues (which implements CursorValues, and whose cmp method is called in is_gt) might come from RowCursorStream::convert_batch.
However, I'm not entirely sure whether this is accurate, or whether it explains why RowConverter and SortFields do not appear in merge.rs.
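
A rough sketch of how I picture this, using only the standard library. All names here (`ToyCursorValues`, `ToyRowValues`, `ToyPrimitiveValues`) are hypothetical stand-ins and not the actual DataFusion types or signatures. The point is that generic merge code only sees a comparison trait, so the row conversion can live entirely in whatever builds the cursor, which would be consistent with the merge code itself never naming a converter.

```rust
use std::cmp::Ordering;

/// Hypothetical stand-in for a cursor-values abstraction: compare the value
/// at `l_idx` in `l` with the value at `r_idx` in `r`.
trait ToyCursorValues {
    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering;
}

/// Row-encoded values: each row is an opaque, order-preserving byte string
/// produced elsewhere (for example by a batch-conversion step).
struct ToyRowValues {
    rows: Vec<Vec<u8>>,
}

impl ToyCursorValues for ToyRowValues {
    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
        l.rows[l_idx].cmp(&r.rows[r_idx])
    }
}

/// Primitive values: native integers compared directly.
struct ToyPrimitiveValues {
    values: Vec<i64>,
}

impl ToyCursorValues for ToyPrimitiveValues {
    fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering {
        l.values[l_idx].cmp(&r.values[r_idx])
    }
}

/// Generic check used by a merge loop; it never mentions rows or converters.
fn is_gt<C: ToyCursorValues>(l: &C, l_idx: usize, r: &C, r_idx: usize) -> bool {
    C::compare(l, l_idx, r, r_idx) == Ordering::Greater
}

fn main() {
    let a = ToyPrimitiveValues { values: vec![1, 4, 9] };
    let b = ToyPrimitiveValues { values: vec![2, 3] };
    assert!(is_gt(&a, 1, &b, 1)); // 4 > 3

    let ra = ToyRowValues { rows: vec![b"apple".to_vec()] };
    let rb = ToyRowValues { rows: vec![b"banana".to_vec()] };
    assert!(!is_gt(&ra, 0, &rb, 0)); // "apple" sorts before "banana"
}
```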

@comphead
Contributor

comphead commented Dec 7, 2024

Thanks for the flamegraph. Indeed, it looks like this happens when working with PartitionedStream, namely in

let cursor = self.convert_batch(&batch)?;

and in type CursorStream<C> = Box<dyn PartitionedStream<Output = Result<(C, RecordBatch)>>>; so that explains it.

@jayzhan211
Contributor

It seems the single-column case is already optimized here:

// Special case single column comparisons with optimized cursor implementations
if expressions.len() == 1 {
    let sort = expressions[0].clone();
    let data_type = sort.expr.data_type(schema.as_ref())?;
    downcast_primitive! {
        data_type => (primitive_merge_helper, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker),
        DataType::Utf8 => merge_helper!(StringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
        DataType::LargeUtf8 => merge_helper!(LargeStringArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
        DataType::Binary => merge_helper!(BinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
        DataType::LargeBinary => merge_helper!(LargeBinaryArray, sort, streams, schema, metrics, batch_size, fetch, reservation, enable_round_robin_tie_breaker)
        _ => {}
    }
}
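
The shape of that dispatch can be sketched in plain Rust as follows. This is a simplified model and not the DataFusion implementation: `ToyDataType`, `MergeStrategy`, and `choose_strategy` are hypothetical names, and the fallback to a row-based cursor for everything else is my assumption based on the discussion above, since the quoted snippet does not show what follows the `_ => {}` arm.

```rust
/// Hypothetical, simplified data types (not arrow's `DataType`).
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyDataType {
    Int64,
    Utf8,
    Struct,
}

/// Hypothetical labels for the cursor implementation a merge would use.
#[derive(Debug, PartialEq)]
enum MergeStrategy {
    PrimitiveCursor,
    ByteArrayCursor,
    RowCursor,
}

/// Pick a strategy from the types of the sort columns.
fn choose_strategy(sort_column_types: &[ToyDataType]) -> MergeStrategy {
    // Special-case exactly one sort column with a supported type.
    if let [single] = sort_column_types {
        match single {
            ToyDataType::Int64 => return MergeStrategy::PrimitiveCursor,
            ToyDataType::Utf8 => return MergeStrategy::ByteArrayCursor,
            // Unsupported single-column types fall through, like `_ => {}`.
            ToyDataType::Struct => {}
        }
    }
    // Assumed fallback: multi-column or unsupported types use row encoding.
    MergeStrategy::RowCursor
}

fn main() {
    println!("{:?}", choose_strategy(&[ToyDataType::Int64]));
    println!("{:?}", choose_strategy(&[ToyDataType::Int64, ToyDataType::Utf8]));
}
```

Under this model, a single nested-type sort column would still go through the row path, which may be what shows up in the flamegraph.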


4 participants