Skip to content

Commit

Permalink
feat(window): initial impl window function (databendlabs#10700)
Browse files Browse the repository at this point in the history
Co-authored-by: sundy-li <[email protected]>
Co-authored-by: RinChanNOWWW <[email protected]>
  • Loading branch information
3 people authored Mar 28, 2023
1 parent c97ccca commit 0df493f
Show file tree
Hide file tree
Showing 34 changed files with 1,462 additions and 861 deletions.
499 changes: 278 additions & 221 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ sqllogic-test: build

stateless-cluster-test: build
rm -rf ./_meta*/
bash ./scripts/ci/ci-run-stateless-tests-cluster.sh
ulimit -n 10000;ulimit -s 16384; bash ./scripts/ci/ci-run-stateless-tests-cluster.sh

stateless-cluster-test-tls: build
rm -rf ./_meta*/
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/ci-run-stateless-tests-cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ cd "$SCRIPT_PATH/../../tests" || exit

echo "Starting databend-test"
# 13_0004_q4: https://github.com/datafuselabs/databend/issues/8107
./databend-test --mode 'cluster' --run-dir 0_stateless --skip '13_0004_q4'
./databend-test --mode 'cluster' --run-dir 0_stateless --skip '13_0004_q4', '13_0008_q8'
100 changes: 93 additions & 7 deletions src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use common_sql::executor::RuntimeFilterSource;
use common_sql::executor::Sort;
use common_sql::executor::TableScan;
use common_sql::executor::UnionAll;
use common_sql::executor::Window;
use common_sql::plans::JoinType;
use common_sql::ColumnBinding;
use common_sql::IndexType;
Expand All @@ -85,6 +86,7 @@ use crate::pipelines::processors::transforms::TransformPartialAggregate;
use crate::pipelines::processors::transforms::TransformPartialGroupBy;
use crate::pipelines::processors::transforms::TransformRightJoin;
use crate::pipelines::processors::transforms::TransformRightSemiAntiJoin;
use crate::pipelines::processors::transforms::TransformWindow;
use crate::pipelines::processors::AggregatorParams;
use crate::pipelines::processors::JoinHashTable;
use crate::pipelines::processors::LeftJoinCompactor;
Expand Down Expand Up @@ -165,6 +167,7 @@ impl PipelineBuilder {
PhysicalPlan::AggregateExpand(aggregate) => self.build_aggregate_expand(aggregate),
PhysicalPlan::AggregatePartial(aggregate) => self.build_aggregate_partial(aggregate),
PhysicalPlan::AggregateFinal(aggregate) => self.build_aggregate_final(aggregate),
PhysicalPlan::Window(window) => self.build_window(window),
PhysicalPlan::Sort(sort) => self.build_sort(sort),
PhysicalPlan::Limit(limit) => self.build_limit(limit),
PhysicalPlan::HashJoin(join) => self.build_join(join),
Expand Down Expand Up @@ -708,6 +711,80 @@ impl PipelineBuilder {
Ok(params)
}

/// Build the pipeline for a window-function plan node.
///
/// When the window has `PARTITION BY` or `ORDER BY` clauses, the input is
/// first sorted by (partition keys, then order keys) so that the downstream
/// `TransformWindow` sees each partition as a contiguous run of rows.
/// Finally a `TransformWindow` is appended to every pipe in the pipeline.
///
/// # Errors
/// Returns an error if a referenced column is missing from the input schema,
/// if the aggregate function cannot be resolved, or if pipeline construction
/// fails.
fn build_window(&mut self, window: &Window) -> Result<()> {
    self.build_pipeline(&window.input)?;

    let input_schema = window.input.output_schema()?;

    if !window.partition_by.is_empty() || !window.order_by.is_empty() {
        let old_output_len = self.main_pipeline.output_len();

        // Partition columns sort first (ascending, nulls first so equal
        // partition keys group together), followed by the explicit ORDER BY
        // columns with their requested direction and null placement.
        let mut sort_desc =
            Vec::with_capacity(window.partition_by.len() + window.order_by.len());

        for part in &window.partition_by {
            sort_desc.push(SortColumnDescription {
                offset: input_schema.index_of(&part.to_string())?,
                asc: true,
                nulls_first: true,
            })
        }

        for order_desc in &window.order_by {
            sort_desc.push(SortColumnDescription {
                offset: input_schema.index_of(&order_desc.order_by.to_string())?,
                asc: order_desc.asc,
                nulls_first: order_desc.nulls_first,
            })
        }

        // A window sort has no LIMIT, hence `None`.
        self.build_sort_pipeline(input_schema.clone(), sort_desc, window.plan_id, None)?;

        // The sort pipeline may resize the pipeline (e.g. to max_threads);
        // restore the original degree of parallelism for the window stage.
        self.main_pipeline.resize(old_output_len)?;
    }

    let agg_func = AggregateFunctionFactory::instance().get(
        window.agg_func.sig.name.as_str(),
        window.agg_func.sig.params.clone(),
        window.agg_func.sig.args.clone(),
    )?;

    // Resolve aggregate-function arguments and partition keys to column
    // offsets in the input schema; `collect` short-circuits on the first
    // missing column.
    let arguments = window
        .agg_func
        .args
        .iter()
        .map(|p| input_schema.index_of(&p.to_string()))
        .collect::<Result<Vec<_>>>()?;

    let partition_by = window
        .partition_by
        .iter()
        .map(|p| input_schema.index_of(&p.to_string()))
        .collect::<Result<Vec<_>>>()?;

    // Append the window transform to every pipe in the pipeline.
    self.main_pipeline.add_transform(|input, output| {
        let transform = TransformWindow::try_create(
            input,
            output,
            agg_func.clone(),
            arguments.clone(),
            partition_by.clone(),
            window.window_frame.clone(),
        )?;
        Ok(ProcessorPtr::create(transform))
    })
}

fn build_sort(&mut self, sort: &Sort) -> Result<()> {
self.build_pipeline(&sort.input)?;

Expand All @@ -726,23 +803,32 @@ impl PipelineBuilder {
})
.collect::<Result<Vec<_>>>()?;

let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
self.build_sort_pipeline(input_schema, sort_desc, sort.plan_id, sort.limit)
}

fn build_sort_pipeline(
&mut self,
input_schema: DataSchemaRef,
sort_desc: Vec<SortColumnDescription>,
plan_id: u32,
limit: Option<usize>,
) -> Result<()> {
let block_size = self.ctx.get_settings().get_max_block_size()? as usize;
let max_threads = self.ctx.get_settings().get_max_threads()? as usize;

// TODO(Winter): the query will hang in MultiSortMergeProcessor when max_threads == 1 and output_len != 1
if self.main_pipeline.output_len() == 1 || max_threads == 1 {
self.main_pipeline.resize(max_threads)?;
}

// Sort
self.main_pipeline.add_transform(|input, output| {
let transform =
TransformSortPartial::try_create(input, output, sort.limit, sort_desc.clone())?;
TransformSortPartial::try_create(input, output, limit, sort_desc.clone())?;

if self.enable_profiling {
Ok(ProcessorPtr::create(ProfileWrapper::create(
transform,
sort.plan_id,
plan_id,
self.prof_span_set.clone(),
)))
} else {
Expand All @@ -757,14 +843,14 @@ impl PipelineBuilder {
output,
input_schema.clone(),
block_size,
sort.limit,
limit,
sort_desc.clone(),
)?;

if self.enable_profiling {
Ok(ProcessorPtr::create(ProfileWrapper::create(
transform,
sort.plan_id,
plan_id,
self.prof_span_set.clone(),
)))
} else {
Expand All @@ -777,7 +863,7 @@ impl PipelineBuilder {
&mut self.main_pipeline,
input_schema,
block_size,
sort.limit,
limit,
sort_desc,
)
}
Expand Down
2 changes: 0 additions & 2 deletions src/query/service/src/pipelines/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,3 @@ pub use transforms::TransformResortAddOn;
pub use transforms::TransformRuntimeFilter;
pub use transforms::TransformSortPartial;
pub use transforms::TransformWindow;
pub use transforms::WindowFrame;
pub use transforms::WindowFrameBound;
2 changes: 0 additions & 2 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,3 @@ pub use transform_runtime_filter::TransformRuntimeFilter;
pub use transform_sort_merge::SortMergeCompactor;
pub use transform_sort_partial::TransformSortPartial;
pub use window::TransformWindow;
pub use window::WindowFrame;
pub use window::WindowFrameBound;

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod frame;
mod transform_window;

pub use frame::WindowFrame;
pub use frame::WindowFrameBound;
pub use transform_window::TransformWindow;
Loading

0 comments on commit 0df493f

Please sign in to comment.