Prevent query from deadlocking when task output buffer is full
When a query has the shape:

SourceStage
           \
        PartitionedStage(broadcast output buffer)
        /     \
  ProbeStage  BuildStage

Then it might happen that the task output buffer for
PartitionedStage fills up, blocking the upstream stages
(ProbeStage and BuildStage).

Previously, the phased scheduler did not schedule SourceStage
in such a situation because PartitionedStage was already considered
completed.

This commit makes the PhasedExecutionSchedule#unblockStagesWithFullOutputBuffer
check run for all stages (including stages that are already scheduled but still running).
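
For illustration, a minimal standalone sketch of the idea follows (hypothetical names and types, not the actual PhasedExecutionSchedule or StageExecution API): the check for blocked output buffers has to walk every stage, because a stage that is already fully scheduled but still running may be the one whose full buffer stalls its producers, and its consumers must then be forced onto the schedule.

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for a stage execution; not Trino's StageExecution.
record Stage(String fragmentId, boolean fullyScheduled, boolean anyTaskBlockedOnFullOutputBuffer) {}

class UnblockSketch
{
    // consumers.get(producerId) lists the fragments that read the producer's output
    static Set<String> fragmentsToForceSchedule(Map<String, Stage> stagesByFragmentId, Map<String, List<String>> consumers)
    {
        Set<String> unblocked = new HashSet<>();
        // The bug: only stages that were not yet fully scheduled were inspected, so a
        // scheduled-but-still-running stage with a full broadcast output buffer was never noticed.
        // The fix: inspect every stage, whether or not it is already fully scheduled.
        for (Stage stage : stagesByFragmentId.values()) {
            if (stage.anyTaskBlockedOnFullOutputBuffer()) {
                // Force the consumers of the blocked stage to run so they can drain the
                // full output buffer and let the blocked producers make progress again.
                unblocked.addAll(consumers.getOrDefault(stage.fragmentId(), List.of()));
            }
        }
        return unblocked;
    }

    public static void main(String[] args)
    {
        // Shape from the commit message: "partitioned" is scheduled but blocked on a full buffer;
        // "source" consumes its output and therefore must be forced to start.
        Map<String, Stage> stages = Map.of(
                "partitioned", new Stage("partitioned", true, true),
                "source", new Stage("source", false, false));
        Map<String, List<String>> consumers = Map.of("partitioned", List.of("source"));
        System.out.println(fragmentsToForceSchedule(stages, consumers)); // prints [source]
    }
}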
sopel39 committed Nov 8, 2022
1 parent 5d124c4 commit c1f1942
Showing 2 changed files with 46 additions and 3 deletions.
@@ -215,15 +215,15 @@ private Set<PlanFragmentId> removeCompletedStage(StageExecution stage)
.filter(dependentFragmentId -> fragmentDependency.inDegreeOf(dependentFragmentId) == 1)
.collect(toImmutableSet());
fragmentDependency.removeVertex(fragmentId);
fragmentTopology.removeVertex(fragmentId);
activeStages.remove(stage);
return fragmentsToExecute;
}

private Set<PlanFragmentId> unblockStagesWithFullOutputBuffer()
{
// find stages that are blocked on full task output buffer
Set<PlanFragmentId> blockedFragments = activeStages.stream()
// iterate over all stages, not only scheduled ones; stages which are scheduled but still running
// (e.g. partitioned stage with broadcast output buffer) might have blocked task output buffer
Set<PlanFragmentId> blockedFragments = stagesByFragmentId.values().stream()
.filter(StageExecution::isAnyTaskBlocked)
.map(stage -> stage.getFragment().getId())
.collect(toImmutableSet());
@@ -45,6 +45,7 @@
import static io.trino.execution.scheduler.StageExecution.State.ABORTED;
import static io.trino.execution.scheduler.StageExecution.State.FINISHED;
import static io.trino.execution.scheduler.StageExecution.State.FLUSHING;
import static io.trino.execution.scheduler.StageExecution.State.SCHEDULED;
import static io.trino.execution.scheduler.policy.PlanUtils.createAggregationFragment;
import static io.trino.execution.scheduler.policy.PlanUtils.createBroadcastAndPartitionedJoinPlanFragment;
import static io.trino.execution.scheduler.policy.PlanUtils.createBroadcastJoinPlanFragment;
@@ -222,6 +223,48 @@ public void testStageWithBroadcastAndPartitionedJoin()
assertThat(getActiveFragments(schedule)).containsExactly(joinFragment.getId(), probeFragment.getId());
}

@Test
public void testSourceStageBroadcastJoinWithPartitionedJoinBuildSide()
{
PlanFragment nestedJoinBuildFragment = createTableScanPlanFragment("nested_join_build");
PlanFragment nestedJoinProbeFragment = createTableScanPlanFragment("nested_join_probe");
PlanFragment nestedJoinFragment = createJoinPlanFragment(INNER, "nested_join", nestedJoinBuildFragment, nestedJoinProbeFragment);
PlanFragment joinSourceFragment = createBroadcastJoinPlanFragment("probe", nestedJoinFragment);

TestingStageExecution nestedJoinBuildStage = new TestingStageExecution(nestedJoinBuildFragment);
TestingStageExecution nestedJoinProbeStage = new TestingStageExecution(nestedJoinProbeFragment);
TestingStageExecution nestedJoinStage = new TestingStageExecution(nestedJoinFragment);
TestingStageExecution joinSourceStage = new TestingStageExecution(joinSourceFragment);

PhasedExecutionSchedule schedule = PhasedExecutionSchedule.forStages(ImmutableSet.of(
nestedJoinBuildStage, nestedJoinProbeStage, nestedJoinStage, joinSourceStage), dynamicFilterService);

// nestedJoinBuildStage and nestedJoinStage should start immediately
DirectedGraph<PlanFragmentId, FragmentsEdge> dependencies = schedule.getFragmentDependency();
assertThat(dependencies.edgeSet()).containsExactlyInAnyOrder(
new FragmentsEdge(nestedJoinBuildFragment.getId(), joinSourceFragment.getId()),
new FragmentsEdge(nestedJoinFragment.getId(), joinSourceFragment.getId()),
new FragmentsEdge(nestedJoinBuildFragment.getId(), nestedJoinProbeFragment.getId()),
new FragmentsEdge(nestedJoinProbeFragment.getId(), joinSourceFragment.getId()));
assertThat(getActiveFragments(schedule)).containsExactly(nestedJoinBuildFragment.getId(), nestedJoinFragment.getId());

// Mark nestedJoinFragment as scheduled and nestedJoinBuildFragment as finished.
// joinSourceFragment still has a dependency on nestedJoinProbeFragment
nestedJoinStage.setState(SCHEDULED);
nestedJoinBuildStage.setState(FINISHED);
schedule.schedule();
assertThat(getActiveFragments(schedule)).containsExactly(nestedJoinProbeFragment.getId());

// mark nestedJoinFragment buffer as full, now joinSourceFragment is forced to be scheduled
nestedJoinStage.setAnyTaskBlocked(true);
schedule.schedule();
assertThat(getActiveFragments(schedule)).containsExactly(nestedJoinProbeFragment.getId(), joinSourceFragment.getId());

nestedJoinProbeStage.setState(FINISHED);
schedule.schedule();
assertThat(getActiveFragments(schedule)).containsExactly(joinSourceFragment.getId());
}

private Set<PlanFragmentId> getActiveFragments(PhasedExecutionSchedule schedule)
{
return schedule.getActiveStages().stream()
