Skip to content

Commit

Permalink
[BugFix] Fix the bug of LocalPartitionTopnSinkOperator set finishing …
Browse files Browse the repository at this point in the history
…without check is_cancelled (StarRocks#48037)

Signed-off-by: trueeyu <[email protected]>
  • Loading branch information
trueeyu authored Jul 9, 2024
1 parent 69fd4d7 commit 3774b6e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
6 changes: 3 additions & 3 deletions be/src/exec/pipeline/sort/local_partition_topn_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Status LocalPartitionTopnContext::prepare(RuntimeState* state) {
}

Status LocalPartitionTopnContext::push_one_chunk_to_partitioner(RuntimeState* state, const ChunkPtr& chunk) {
auto st = _chunks_partitioner->offer<true>(
RETURN_IF_ERROR(_chunks_partitioner->offer<true>(
chunk,
[this, state](size_t partition_idx) {
_chunks_sorters.emplace_back(std::make_shared<ChunksSorterTopn>(
Expand All @@ -69,11 +69,11 @@ Status LocalPartitionTopnContext::push_one_chunk_to_partitioner(RuntimeState* st
},
[this, state](size_t partition_idx, const ChunkPtr& chunk) {
(void)_chunks_sorters[partition_idx]->update(state, chunk);
});
}));
if (_chunks_partitioner->is_passthrough()) {
RETURN_IF_ERROR(transfer_all_chunks_from_partitioner_to_sorters(state));
}
return st;
return Status::OK();
}

void LocalPartitionTopnContext::sink_complete() {
Expand Down
18 changes: 11 additions & 7 deletions be/src/exec/pipeline/sort/local_partition_topn_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,17 @@ Status LocalPartitionTopnSinkOperator::push_chunk(RuntimeState* state, const Chu

Status LocalPartitionTopnSinkOperator::set_finishing(RuntimeState* state) {
ONCE_DETECT(_set_finishing_once);
RETURN_IF_ERROR(_partition_topn_ctx->transfer_all_chunks_from_partitioner_to_sorters(state));
_partition_topn_ctx->sink_complete();
_unique_metrics->add_info_string("IsPassThrough", _partition_topn_ctx->is_passthrough() ? "Yes" : "No");
auto* partition_num_counter = ADD_COUNTER(_unique_metrics, "PartitionNum", TUnit::UNIT);
COUNTER_SET(partition_num_counter, static_cast<int64_t>(_partition_topn_ctx->num_partitions()));
_is_finished = true;
return Status::OK();
DeferOp defer([&]() {
_partition_topn_ctx->sink_complete();
_unique_metrics->add_info_string("IsPassThrough", _partition_topn_ctx->is_passthrough() ? "Yes" : "No");
auto* partition_num_counter = ADD_COUNTER(_unique_metrics, "PartitionNum", TUnit::UNIT);
COUNTER_SET(partition_num_counter, static_cast<int64_t>(_partition_topn_ctx->num_partitions()));
_is_finished = true;
});
if (state->is_cancelled()) {
return Status::OK();
}
return _partition_topn_ctx->transfer_all_chunks_from_partitioner_to_sorters(state);
}

OperatorPtr LocalPartitionTopnSinkOperatorFactory::create(int32_t degree_of_parallelism, int32_t driver_sequence) {
Expand Down

0 comments on commit 3774b6e

Please sign in to comment.