
Commit

[FLINK-27902][network] Using mustBePipelinedConsumed method to compute pipelined regions.
reswqa authored and xintongsong committed Jun 8, 2022
1 parent 588ef59 commit e0b971c
Showing 5 changed files with 26 additions and 23 deletions.
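In short: the region-computation utilities stop asking the old isReconnectable question and instead ask whether a consumed result must be pipelined, a property carried by the consumingConstraint field of ResultPartitionType. A minimal self-contained sketch of that predicate (enum values other than MUST_BE_PIPELINED are assumptions for illustration; only MUST_BE_PIPELINED appears in this diff):

public class ResultTypeSketch {
    // Illustrative constraint values; the diff only confirms MUST_BE_PIPELINED.
    enum ConsumingConstraint { MUST_BE_PIPELINED, CAN_BE_PIPELINED, BLOCKING }

    private final ConsumingConstraint consumingConstraint;

    ResultTypeSketch(ConsumingConstraint consumingConstraint) {
        this.consumingConstraint = consumingConstraint;
    }

    // Mirrors the predicate this commit adds to ResultPartitionType: the
    // producer and consumer must be scheduled at the same time, i.e. they
    // belong to one pipelined region.
    boolean mustBePipelinedConsumed() {
        return consumingConstraint == ConsumingConstraint.MUST_BE_PIPELINED;
    }

    public static void main(String[] args) {
        System.out.println(new ResultTypeSketch(ConsumingConstraint.MUST_BE_PIPELINED)
                .mustBePipelinedConsumed()); // true
        System.out.println(new ResultTypeSketch(ConsumingConstraint.BLOCKING)
                .mustBePipelinedConsumed()); // false
    }
}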
@@ -39,22 +39,21 @@ public static Set<Set<LogicalVertex>> computePipelinedRegions(
         final Map<LogicalVertex, Set<LogicalVertex>> vertexToRegion =
                 buildRawRegions(
                         topologicallySortedVertices,
-                        LogicalPipelinedRegionComputeUtil::getNonReconnectableConsumedResults);
+                        LogicalPipelinedRegionComputeUtil::getMustBePipelinedConsumedResults);
 
         // Since LogicalTopology is a DAG, there is no need to do cycle detection nor to merge
         // regions on cycles.
         return uniqueVertexGroups(vertexToRegion);
     }
 
-    private static Iterable<LogicalResult> getNonReconnectableConsumedResults(
-            LogicalVertex vertex) {
-        List<LogicalResult> nonReconnectableConsumedResults = new ArrayList<>();
+    private static Iterable<LogicalResult> getMustBePipelinedConsumedResults(LogicalVertex vertex) {
+        List<LogicalResult> mustBePipelinedConsumedResults = new ArrayList<>();
         for (LogicalResult consumedResult : vertex.getConsumedResults()) {
-            if (!consumedResult.getResultType().isReconnectable()) {
-                nonReconnectableConsumedResults.add(consumedResult);
+            if (consumedResult.getResultType().mustBePipelinedConsumed()) {
+                mustBePipelinedConsumedResults.add(consumedResult);
             }
         }
-        return nonReconnectableConsumedResults;
+        return mustBePipelinedConsumedResults;
     }
 
     private LogicalPipelinedRegionComputeUtil() {}
@@ -35,7 +35,7 @@ public final class PipelinedRegionComputeUtil {
     static <V extends Vertex<?, ?, V, R>, R extends Result<?, ?, V, R>>
             Map<V, Set<V>> buildRawRegions(
                     final Iterable<? extends V> topologicallySortedVertices,
-                    final Function<V, Iterable<R>> getNonReconnectableConsumedResults) {
+                    final Function<V, Iterable<R>> getMustBePipelinedConsumedResults) {
 
         final Map<V, Set<V>> vertexToRegion = new IdentityHashMap<>();
 
@@ -45,11 +45,10 @@ Map<V, Set<V>> buildRawRegions(
             currentRegion.add(vertex);
             vertexToRegion.put(vertex, currentRegion);
 
-            // Similar to the BLOCKING ResultPartitionType, each vertex connected through
-            // PIPELINED_APPROXIMATE is also considered as a single region. This attribute is
-            // called "reconnectable". Reconnectable will be removed after FLINK-19895, see also
-            // {@link ResultPartitionType#isReconnectable}
-            for (R consumedResult : getNonReconnectableConsumedResults.apply(vertex)) {
+            // A vertex connected to its producers only through results whose consumingConstraint
+            // is not mustBePipelined forms a region of its own; mustBePipelined results merge the
+            // producer's region with the consumer's.
+            for (R consumedResult : getMustBePipelinedConsumedResults.apply(vertex)) {
                 final V producerVertex = consumedResult.getProducer();
                 final Set<V> producerRegion = vertexToRegion.get(producerVertex);
 
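For intuition about what buildRawRegions computes with that predicate, here is a self-contained toy version of the loop above on string vertices (plain Java, not Flink's API; the merge step is assumed from the loop's intent, since the shown hunk ends before it). A must-be-pipelined edge A -> B merges A and B into one region, while C, consuming B only through a result that need not be pipelined, stays alone.

import java.util.*;

public class RegionSketch {
    public static void main(String[] args) {
        // Topologically sorted vertices and, per vertex, the producers of its
        // must-be-pipelined consumed results: A -> B must be pipelined, B -> C not.
        List<String> vertices = List.of("A", "B", "C");
        Map<String, List<String>> mustBePipelinedProducers =
                Map.of("A", List.of(), "B", List.of("A"), "C", List.of());

        Map<String, Set<String>> vertexToRegion = new HashMap<>();
        for (String vertex : vertices) {
            Set<String> currentRegion = new HashSet<>();
            currentRegion.add(vertex);
            vertexToRegion.put(vertex, currentRegion);

            for (String producer : mustBePipelinedProducers.get(vertex)) {
                Set<String> producerRegion = vertexToRegion.get(producer);
                if (producerRegion != null && producerRegion != currentRegion) {
                    // Merge: a pipelined producer and its consumer share one region.
                    currentRegion.addAll(producerRegion);
                    for (String moved : producerRegion) {
                        vertexToRegion.put(moved, currentRegion);
                    }
                }
            }
        }
        // Prints the two distinct regions, e.g. [[A, B], [C]].
        System.out.println(new HashSet<>(vertexToRegion.values()));
    }
}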
@@ -55,7 +55,7 @@ public static Set<Set<SchedulingExecutionVertex>> computePipelinedRegions(
                 buildRawRegions(
                         topologicallySortedVertices,
                         vertex ->
-                                getNonReconnectableConsumedResults(
+                                getMustBePipelinedConsumedResults(
                                         vertex, resultPartitionRetriever));
 
         return mergeRegionsOnCycles(vertexToRegion, executionVertexRetriever);
@@ -113,7 +113,7 @@ private static List<List<Integer>> buildOutEdgesDesc(
             final List<Integer> currentRegionOutEdges = new ArrayList<>();
             for (SchedulingExecutionVertex vertex : currentRegion) {
                 for (SchedulingResultPartition producedResult : vertex.getProducedResults()) {
-                    if (!producedResult.getResultType().isReconnectable()) {
+                    if (producedResult.getResultType().mustBePipelinedConsumed()) {
                         continue;
                     }
                     final Optional<ConsumerVertexGroup> consumerVertexGroup =
@@ -143,23 +143,23 @@ private static List<List<Integer>> buildOutEdgesDesc(
         return outEdges;
     }
 
-    private static Iterable<SchedulingResultPartition> getNonReconnectableConsumedResults(
+    private static Iterable<SchedulingResultPartition> getMustBePipelinedConsumedResults(
             SchedulingExecutionVertex vertex,
             Function<IntermediateResultPartitionID, ? extends SchedulingResultPartition>
                     resultPartitionRetriever) {
-        List<SchedulingResultPartition> nonReconnectableConsumedResults = new ArrayList<>();
+        List<SchedulingResultPartition> mustBePipelinedConsumedResults = new ArrayList<>();
         for (ConsumedPartitionGroup consumedPartitionGroup : vertex.getConsumedPartitionGroups()) {
             for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
                 SchedulingResultPartition consumedResult =
                         resultPartitionRetriever.apply(partitionId);
-                if (consumedResult.getResultType().isReconnectable()) {
+                if (!consumedResult.getResultType().mustBePipelinedConsumed()) {
                     // The result types of partitions in one ConsumedPartitionGroup are all the same
                     break;
                 }
-                nonReconnectableConsumedResults.add(consumedResult);
+                mustBePipelinedConsumedResults.add(consumedResult);
             }
         }
-        return nonReconnectableConsumedResults;
+        return mustBePipelinedConsumedResults;
     }
 
     private SchedulingPipelinedRegionComputeUtil() {}
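The early break above relies on the invariant stated in the inline comment: every partition in one ConsumedPartitionGroup has the same result type, so inspecting the first partition decides the whole group. A toy sketch of that scan pattern (plain Java with illustrative names, not Flink's API):

import java.util.ArrayList;
import java.util.List;

public class GroupScanSketch {
    public static void main(String[] args) {
        // Stand-in for ConsumedPartitionGroups: each inner list is homogeneous,
        // so the first element's kind decides the whole group.
        List<List<String>> consumedGroups =
                List.of(
                        List.of("MUST_BE_PIPELINED", "MUST_BE_PIPELINED"),
                        List.of("BLOCKING", "BLOCKING"));
        List<String> mustBePipelined = new ArrayList<>();
        for (List<String> group : consumedGroups) {
            for (String partition : group) {
                if (!partition.equals("MUST_BE_PIPELINED")) {
                    break; // homogeneous group: no need to scan the rest
                }
                mustBePipelined.add(partition);
            }
        }
        System.out.println(mustBePipelined); // [MUST_BE_PIPELINED, MUST_BE_PIPELINED]
    }
}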
@@ -157,6 +157,11 @@ public boolean isPipelined() {
         return isPipelined;
     }
 
+    /** Returns whether this partition's upstream and downstream must be scheduled at the same time. */
+    public boolean mustBePipelinedConsumed() {
+        return consumingConstraint == ConsumingConstraint.MUST_BE_PIPELINED;
+    }
+
     public boolean isReconnectable() {
         return isReconnectable;
     }
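A hedged usage sketch of the new predicate, assuming the PIPELINED type carries MUST_BE_PIPELINED while BLOCKING does not (the diff does not show which ConsumingConstraint each constant holds):

// Assumption: PIPELINED is MUST_BE_PIPELINED-consumed, BLOCKING is not.
ResultPartitionType pipelined = ResultPartitionType.PIPELINED;
ResultPartitionType blocking = ResultPartitionType.BLOCKING;

assert pipelined.mustBePipelinedConsumed();  // producer and consumer run at the same time
assert !blocking.mustBePipelinedConsumed();  // consumer may start after the producer finishes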
@@ -197,13 +197,13 @@ public void notifyExecutionGraphUpdated(
                         .map(ExecutionJobVertex::getJobVertexId)
                         .collect(Collectors.toSet());
 
-        // any PIPELINED input should be from within this new set so that existing pipelined regions
-        // will not change
+        // Any mustBePipelinedConsumed input must come from within this new set so that
+        // existing pipelined regions will not change.
         newlyInitializedJobVertices.stream()
                 .map(ExecutionJobVertex::getJobVertex)
                 .flatMap(v -> v.getInputs().stream())
                 .map(JobEdge::getSource)
-                .filter(r -> r.getResultType().isPipelined())
+                .filter(r -> r.getResultType().mustBePipelinedConsumed())
                 .map(IntermediateDataSet::getProducer)
                 .map(JobVertex::getID)
                 .forEach(id -> checkState(newJobVertexIds.contains(id)));
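Why this check holds: if a must-be-pipelined input reached back to an already-initialized producer, that producer's existing pipelined region would have to absorb the new consumer. A toy version of the guard (illustrative names, not Flink's API):

import java.util.Set;

public class RegionGuardSketch {
    public static void main(String[] args) {
        // Newly initialized vertices and the producers of their
        // must-be-pipelined inputs (illustrative IDs).
        Set<String> newJobVertexIds = Set.of("map-1", "sink-1");
        Set<String> mustBePipelinedProducers = Set.of("map-1");

        // Existing regions stay intact only if every such producer is new too.
        for (String producer : mustBePipelinedProducers) {
            if (!newJobVertexIds.contains(producer)) {
                throw new IllegalStateException(
                        "must-be-pipelined input from outside the new set: " + producer);
            }
        }
    }
}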
