Skip to content

Commit

Permalink
Parallelize aggregate operator if there is a group by clause (StarRocks#…
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyehcf authored Nov 11, 2021
1 parent a1864c8 commit 1aeba29
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 13 deletions.
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/exchange/local_exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class PartitionExchanger final : public LocalExchanger {
LocalExchangeSourceOperatorFactory* _source;
const bool _is_shuffle;
// Compute per-row partition values.
const std::vector<ExprContext*>& _partition_expr_ctxs;
const std::vector<ExprContext*> _partition_expr_ctxs;

vectorized::Columns _partitions_columns;
std::vector<uint32_t> _hash_values;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,4 @@ Pipelines PipelineBuilder::build(const FragmentContext& fragment, ExecNode* exec
return _context.get_pipelines();
}

} // namespace starrocks::pipeline
} // namespace starrocks::pipeline
22 changes: 13 additions & 9 deletions be/src/exec/vectorized/aggregate/aggregate_blocking_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,17 +163,21 @@ std::vector<std::shared_ptr<pipeline::OperatorFactory> > AggregateBlockingNode::
using namespace pipeline;
OpFactories operators_with_sink = _children[0]->decompose_to_pipeline(context);
auto& agg_node = _tnode.agg_node;
size_t degree_of_parallelism;
if (agg_node.need_finalize) {
// Parallelism of finalize must be 1
degree_of_parallelism = 1;
operators_with_sink = context->maybe_interpolate_local_passthrough_exchange(operators_with_sink);
} else {
// We cannot take the degree of parallelism from PipelineBuilderContext, since that is only a suggested value
// and the source operator may be given a different parallelism in many special cases
degree_of_parallelism =
down_cast<SourceOperatorFactory*>(operators_with_sink[0].get())->degree_of_parallelism();
// If the finalize aggregate has a group by clause, then it can be parallelized
if (agg_node.__isset.grouping_exprs && !_tnode.agg_node.grouping_exprs.empty()) {
std::vector<ExprContext*> group_by_expr_ctxs;
Expr::create_expr_trees(_pool, _tnode.agg_node.grouping_exprs, &group_by_expr_ctxs);
operators_with_sink =
context->maybe_interpolate_local_shuffle_exchange(operators_with_sink, group_by_expr_ctxs);
} else {
operators_with_sink = context->maybe_interpolate_local_passthrough_exchange(operators_with_sink);
}
}
// We cannot take the degree of parallelism from PipelineBuilderContext, since that is only a suggested value
// and the source operator may be given a different parallelism in many special cases
size_t degree_of_parallelism =
down_cast<SourceOperatorFactory*>(operators_with_sink[0].get())->degree_of_parallelism();

// shared by sink operator and source operator
AggregatorFactoryPtr aggregator_factory = std::make_shared<AggregatorFactory>(_tnode);
Expand Down
2 changes: 0 additions & 2 deletions be/src/exec/vectorized/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ using AggregatorPtr = std::shared_ptr<Aggregator>;

// Component used to process aggregation, including blocking aggregate and streaming aggregate.
// It contains the common data structures and algorithms of aggregation.
// TODO(hcf) this component is shared by multiple sink/source operators in pipeline engine
// TODO(hcf) all the data should be protected by lightweight lock
class Aggregator {
public:
Aggregator(const TPlanNode& tnode);
Expand Down

0 comments on commit 1aeba29

Please sign in to comment.