diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 89515fdbad49b..0f9e18d67aea4 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -173,6 +173,15 @@ public class JobManagerOptions { text("'legacy': legacy scheduler"), text("'ng': new generation scheduler")) .build()); + /** + * Config parameter controlling whether partitions should already be released during the job execution. + */ + @Documentation.ExcludeFromDocumentation("User normally should not be expected to deactivate this feature. " + + "We aim at removing this flag eventually.") + public static final ConfigOption PARTITION_RELEASE_DURING_JOB_EXECUTION = + key("jobmanager.partition.release-during-job-execution") + .defaultValue(true) + .withDescription("Controls whether partitions should already be released during the job execution."); @Documentation.ExcludeFromDocumentation("dev use only; likely temporary") public static final ConfigOption FORCE_PARTITION_RELEASE_ON_CONSUMPTION = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 514121ef35e54..ce65b68b98946 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -48,6 +48,9 @@ import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; +import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.NotReleasingPartitionReleaseStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy; import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; @@ -55,6 +58,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -66,6 +70,11 @@ import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.SharedStateRegistry; @@ -250,6 +259,12 @@ public class ExecutionGraph implements AccessExecutionGraph { /** The total number of vertices currently in the execution graph. */ private int numVerticesTotal; + private final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory; + + private PartitionReleaseStrategy partitionReleaseStrategy; + + private SchedulingTopology schedulingTopology; + // ------ Configuration of the Execution ------- /** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able @@ -413,6 +428,7 @@ public ExecutionGraph( userClassLoader, blobWriter, allocationTimeout, + new NotReleasingPartitionReleaseStrategy.Factory(), NettyShuffleMaster.INSTANCE, true, new PartitionTrackerImpl( @@ -433,6 +449,7 @@ public ExecutionGraph( ClassLoader userClassLoader, BlobWriter blobWriter, Time allocationTimeout, + PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory, ShuffleMaster shuffleMaster, boolean forcePartitionReleaseOnConsumption, PartitionTracker partitionTracker) throws IOException { @@ -464,6 +481,8 @@ public ExecutionGraph( this.rpcTimeout = checkNotNull(rpcTimeout); this.allocationTimeout = checkNotNull(allocationTimeout); + this.partitionReleaseStrategyFactory = checkNotNull(partitionReleaseStrategyFactory); + this.restartStrategy = restartStrategy; this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices()); @@ -913,6 +932,11 @@ public void attachJobGraph(List topologiallySorted) throws JobExcepti } failoverStrategy.notifyNewVertices(newExecJobVertices); + + schedulingTopology = new ExecutionGraphToSchedulingTopologyAdapter(this); + partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance( + schedulingTopology, + new DefaultFailoverTopology(this)); } public void scheduleForExecution() throws JobException { @@ -1605,36 +1629,9 @@ public boolean updateState(TaskExecutionState state) { if (attempt != null) { try { - Map> accumulators; - - switch (state.getExecutionState()) { - case RUNNING: - return attempt.switchToRunning(); - - case FINISHED: - // this deserialization is exception-free - accumulators = deserializeAccumulators(state); - attempt.markFinished(accumulators, state.getIOMetrics()); - return true; - - case CANCELED: - // this deserialization is exception-free - accumulators = deserializeAccumulators(state); - attempt.completeCancelling(accumulators, state.getIOMetrics()); - return true; - - case FAILED: - // this deserialization is exception-free - accumulators = deserializeAccumulators(state); - attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics()); - return true; - - default: - // we mark as failed and return false, which triggers the TaskManager - // to remove the task - attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState())); - return false; - } + final boolean stateUpdated = updateStateInternal(state, attempt); + maybeReleasePartitions(attempt); + return stateUpdated; } catch (Throwable t) { ExceptionUtils.rethrowIfFatalErrorOrOOM(t); @@ -1649,6 +1646,77 @@ public boolean updateState(TaskExecutionState state) { } } + private boolean updateStateInternal(final TaskExecutionState state, final Execution attempt) { + Map> accumulators; + + switch (state.getExecutionState()) { + case RUNNING: + return attempt.switchToRunning(); + + case FINISHED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.markFinished(accumulators, state.getIOMetrics()); + return true; + + case CANCELED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.completeCancelling(accumulators, state.getIOMetrics()); + return true; + + case FAILED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics()); + return true; + + default: + // we mark as failed and return false, which triggers the TaskManager + // to remove the task + attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState())); + return false; + } + } + + private void maybeReleasePartitions(final Execution attempt) { + final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID(); + + if (attempt.getState() == ExecutionState.FINISHED) { + final List releasablePartitions = partitionReleaseStrategy.vertexFinished(finishedExecutionVertex); + releasePartitions(releasablePartitions); + } else { + partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex); + } + } + + private void releasePartitions(final List releasablePartitions) { + if (releasablePartitions.size() > 0) { + final List partitionIds = releasablePartitions.stream() + .map(this::createResultPartitionId) + .collect(Collectors.toList()); + + partitionTracker.stopTrackingAndReleasePartitions(partitionIds); + } + } + + private ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) { + final SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId); + final SchedulingExecutionVertex producer = schedulingResultPartition.getProducer(); + final ExecutionVertexID producerId = producer.getId(); + final JobVertexID jobVertexId = producerId.getJobVertexId(); + final ExecutionJobVertex jobVertex = getJobVertex(jobVertexId); + checkNotNull(jobVertex, "Unknown job vertex %s", jobVertexId); + + final ExecutionVertex[] taskVertices = jobVertex.getTaskVertices(); + final int subtaskIndex = producerId.getSubtaskIndex(); + checkState(subtaskIndex < taskVertices.length, "Invalid subtask index %d for job vertex %s", subtaskIndex, jobVertexId); + + final ExecutionVertex taskVertex = taskVertices[subtaskIndex]; + final Execution execution = taskVertex.getCurrentExecutionAttempt(); + return new ResultPartitionID(resultPartitionId, execution.getAttemptId()); + } + /** * Deserializes accumulators from a task state update. * @@ -1835,4 +1903,8 @@ ShuffleMaster getShuffleMaster() { public PartitionTracker getPartitionTracker() { return partitionTracker; } + + PartitionReleaseStrategy getPartitionReleaseStrategy() { + return partitionReleaseStrategy; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index f21d703fdadae..1b9d7a11eb7b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -38,6 +38,8 @@ import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader; import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge; import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge; import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge; @@ -117,6 +119,9 @@ public static ExecutionGraph buildGraph( final int maxPriorAttemptsHistoryLength = jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE); + final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory = + PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig); + final boolean forcePartitionReleaseOnConsumption = jobManagerConfig.getBoolean(JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION); @@ -136,6 +141,7 @@ public static ExecutionGraph buildGraph( classLoader, blobWriter, allocationTimeout, + partitionReleaseStrategyFactory, shuffleMaster, forcePartitionReleaseOnConsumption, partitionTracker); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 5fc0b7215b9f9..349aaff502578 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -601,6 +601,7 @@ public Execution resetForNewExecution(final long timestamp, final long originati if (oldState.isTerminal()) { if (oldState == FINISHED) { oldExecution.stopTrackingAndReleasePartitions(); + getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId); } priorExecutions.add(oldExecution.archive()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java index 14d28b740daf6..8f19ed95686c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java @@ -19,6 +19,9 @@ package org.apache.flink.runtime.executiongraph.failover.flip1; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,14 +30,29 @@ import java.util.IdentityHashMap; import java.util.Map; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; /** - * Utility for computing pipeliend regions. + * Utility for computing pipelined regions. */ public final class PipelinedRegionComputeUtil { private static final Logger LOG = LoggerFactory.getLogger(PipelinedRegionComputeUtil.class); + public static Set toPipelinedRegionsSet(final Set> distinctRegions) { + return distinctRegions.stream() + .map(toExecutionVertexIdSet()) + .map(PipelinedRegion::from) + .collect(Collectors.toSet()); + } + + private static Function, Set> toExecutionVertexIdSet() { + return failoverVertices -> failoverVertices.stream() + .map(FailoverVertex::getExecutionVertexID) + .collect(Collectors.toSet()); + } + public static Set> computePipelinedRegions(final FailoverTopology topology) { // currently we let a job with co-location constraints fail as one region // putting co-located vertices in the same region with each other can be a future improvement diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java new file mode 100644 index 0000000000000..e386870486695 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/NotReleasingPartitionReleaseStrategy.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease; + +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import java.util.Collections; +import java.util.List; + +/** + * Does not release intermediate result partitions during job execution. Relies on partitions being + * released at the end of the job. + */ +public class NotReleasingPartitionReleaseStrategy implements PartitionReleaseStrategy { + + @Override + public List vertexFinished(final ExecutionVertexID finishedVertex) { + return Collections.emptyList(); + } + + @Override + public void vertexUnfinished(final ExecutionVertexID executionVertexID) { + } + + /** + * Factory for {@link NotReleasingPartitionReleaseStrategy}. + */ + public static class Factory implements PartitionReleaseStrategy.Factory { + + @Override + public PartitionReleaseStrategy createInstance(final SchedulingTopology schedulingStrategy, final FailoverTopology failoverTopology) { + return new NotReleasingPartitionReleaseStrategy(); + } + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java new file mode 100644 index 0000000000000..d7b317e10e746 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategy.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease; + +import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import java.util.List; + +/** + * Interface for strategies that decide when to release + * {@link IntermediateResultPartition IntermediateResultPartitions}. + */ +public interface PartitionReleaseStrategy { + + /** + * Calling this method informs the strategy that a vertex finished. + * + * @param finishedVertex Id of the vertex that finished the execution + * @return A list of result partitions that can be released + */ + List vertexFinished(ExecutionVertexID finishedVertex); + + /** + * Calling this method informs the strategy that a vertex is no longer in finished state, e.g., + * when a vertex is re-executed. + * + * @param executionVertexID Id of the vertex that is no longer in finished state. + */ + void vertexUnfinished(ExecutionVertexID executionVertexID); + + /** + * Factory for {@link PartitionReleaseStrategy}. + */ + interface Factory { + PartitionReleaseStrategy createInstance(SchedulingTopology schedulingStrategy, FailoverTopology failoverTopology); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java new file mode 100644 index 0000000000000..744a2707a65c5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoader.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; + +/** + * Instantiates a {@link RegionPartitionReleaseStrategy}. + */ +public final class PartitionReleaseStrategyFactoryLoader { + + public static PartitionReleaseStrategy.Factory loadPartitionReleaseStrategyFactory(final Configuration configuration) { + final boolean partitionReleaseDuringJobExecution = configuration.getBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION); + if (partitionReleaseDuringJobExecution) { + return new RegionPartitionReleaseStrategy.Factory(); + } else { + return new NotReleasingPartitionReleaseStrategy.Factory(); + } + } + + private PartitionReleaseStrategyFactoryLoader() { + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java new file mode 100644 index 0000000000000..36c042e42c9d0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegion.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Set of execution vertices that are connected through pipelined intermediate result partitions. + */ +public class PipelinedRegion implements Iterable { + + private final Set executionVertexIds; + + private PipelinedRegion(final Set executionVertexIds) { + this.executionVertexIds = new HashSet<>(checkNotNull(executionVertexIds)); + } + + public static PipelinedRegion from(final Set executionVertexIds) { + return new PipelinedRegion(executionVertexIds); + } + + public static PipelinedRegion from(final ExecutionVertexID... executionVertexIds) { + return new PipelinedRegion(new HashSet<>(Arrays.asList(executionVertexIds))); + } + + public Set getExecutionVertexIds() { + return executionVertexIds; + } + + public boolean contains(final ExecutionVertexID executionVertexId) { + return executionVertexIds.contains(executionVertexId); + } + + @Override + public Iterator iterator() { + return executionVertexIds.iterator(); + } + + @Override + public String toString() { + return "PipelinedRegion{" + + "executionVertexIds=" + executionVertexIds + + '}'; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java new file mode 100644 index 0000000000000..5cbce45f01145 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionConsumedBlockingPartitions.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease; + +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; + +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A set of intermediate result partitions that are incident to one {@link PipelinedRegion}. + */ +class PipelinedRegionConsumedBlockingPartitions { + + private final PipelinedRegion pipelinedRegion; + + private final Set consumedBlockingPartitions; + + PipelinedRegionConsumedBlockingPartitions( + final PipelinedRegion pipelinedRegion, + final Set consumedBlockingPartitions) { + this.pipelinedRegion = checkNotNull(pipelinedRegion); + this.consumedBlockingPartitions = checkNotNull(consumedBlockingPartitions); + } + + public Set getConsumedBlockingPartitions() { + return consumedBlockingPartitions; + } + + public PipelinedRegion getPipelinedRegion() { + return pipelinedRegion; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java new file mode 100644 index 0000000000000..c92fa8b1b767d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionView.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides a virtual execution state of a {@link PipelinedRegion}. + * + *

A pipelined region can be either finished or unfinished. It is finished iff. all its + * executions have reached the finished state. + */ +class PipelinedRegionExecutionView { + + private final PipelinedRegion pipelinedRegion; + + private final Set unfinishedVertices; + + PipelinedRegionExecutionView(final PipelinedRegion pipelinedRegion) { + this.pipelinedRegion = checkNotNull(pipelinedRegion); + this.unfinishedVertices = new HashSet<>(pipelinedRegion.getExecutionVertexIds()); + } + + public boolean isFinished() { + return unfinishedVertices.isEmpty(); + } + + public void vertexFinished(final ExecutionVertexID executionVertexId) { + checkArgument(pipelinedRegion.contains(executionVertexId)); + unfinishedVertices.remove(executionVertexId); + } + + public void vertexUnfinished(final ExecutionVertexID executionVertexId) { + checkArgument(pipelinedRegion.contains(executionVertexId)); + unfinishedVertices.add(executionVertexId); + } + + public PipelinedRegion getPipelinedRegion() { + return pipelinedRegion; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java new file mode 100644 index 0000000000000..b930e100e19ab --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease; + +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverVertex; +import org.apache.flink.runtime.executiongraph.failover.flip1.PipelinedRegionComputeUtil; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Releases blocking intermediate result partitions that are incident to a {@link PipelinedRegion}, + * as soon as the region's execution vertices are finished. + */ +public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy { + + private final SchedulingTopology schedulingTopology; + + private final Map consumedBlockingPartitionsByRegion = new IdentityHashMap<>(); + + private final Map regionExecutionViewByVertex = new HashMap<>(); + + public RegionPartitionReleaseStrategy( + final SchedulingTopology schedulingTopology, + final Set pipelinedRegions) { + + this.schedulingTopology = checkNotNull(schedulingTopology); + + checkNotNull(pipelinedRegions); + initConsumedBlockingPartitionsByRegion(pipelinedRegions); + initRegionExecutionViewByVertex(pipelinedRegions); + } + + private void initConsumedBlockingPartitionsByRegion(final Set pipelinedRegions) { + for (PipelinedRegion pipelinedRegion : pipelinedRegions) { + final PipelinedRegionConsumedBlockingPartitions consumedPartitions = computeConsumedPartitionsOfVertexRegion(pipelinedRegion); + consumedBlockingPartitionsByRegion.put(pipelinedRegion, consumedPartitions); + } + } + + private void initRegionExecutionViewByVertex(final Set pipelinedRegions) { + for (PipelinedRegion pipelinedRegion : pipelinedRegions) { + final PipelinedRegionExecutionView regionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion); + for (ExecutionVertexID executionVertexId : pipelinedRegion) { + regionExecutionViewByVertex.put(executionVertexId, regionExecutionView); + } + } + } + + private PipelinedRegionConsumedBlockingPartitions computeConsumedPartitionsOfVertexRegion(final PipelinedRegion pipelinedRegion) { + final Set resultPartitionsOutsideOfRegion = findResultPartitionsOutsideOfRegion(pipelinedRegion); + return new PipelinedRegionConsumedBlockingPartitions(pipelinedRegion, resultPartitionsOutsideOfRegion); + } + + private Set findResultPartitionsOutsideOfRegion(final PipelinedRegion pipelinedRegion) { + final Set allConsumedPartitionsInRegion = pipelinedRegion + .getExecutionVertexIds() + .stream() + .map(schedulingTopology::getVertexOrThrow) + .flatMap(schedulingExecutionVertex -> schedulingExecutionVertex.getConsumedResultPartitions().stream()) + .collect(Collectors.toSet()); + + return filterResultPartitionsOutsideOfRegion(allConsumedPartitionsInRegion, pipelinedRegion); + } + + private static Set filterResultPartitionsOutsideOfRegion( + final Collection resultPartitions, + final PipelinedRegion pipelinedRegion) { + + final Set result = new HashSet<>(); + for (final SchedulingResultPartition maybeOutsidePartition : resultPartitions) { + final SchedulingExecutionVertex producer = maybeOutsidePartition.getProducer(); + if (!pipelinedRegion.contains(producer.getId())) { + result.add(maybeOutsidePartition.getId()); + } + } + return result; + } + + @Override + public List vertexFinished(final ExecutionVertexID finishedVertex) { + final PipelinedRegionExecutionView regionExecutionView = getPipelinedRegionExecutionViewForVertex(finishedVertex); + regionExecutionView.vertexFinished(finishedVertex); + + if (regionExecutionView.isFinished()) { + final PipelinedRegion pipelinedRegion = getPipelinedRegionForVertex(finishedVertex); + final PipelinedRegionConsumedBlockingPartitions consumedPartitionsOfVertexRegion = getConsumedBlockingPartitionsForRegion(pipelinedRegion); + return filterReleasablePartitions(consumedPartitionsOfVertexRegion); + } + return Collections.emptyList(); + } + + @Override + public void vertexUnfinished(final ExecutionVertexID executionVertexId) { + final PipelinedRegionExecutionView regionExecutionView = getPipelinedRegionExecutionViewForVertex(executionVertexId); + regionExecutionView.vertexUnfinished(executionVertexId); + } + + private PipelinedRegionExecutionView getPipelinedRegionExecutionViewForVertex(final ExecutionVertexID executionVertexId) { + final PipelinedRegionExecutionView pipelinedRegionExecutionView = regionExecutionViewByVertex.get(executionVertexId); + checkState(pipelinedRegionExecutionView != null, + "PipelinedRegionExecutionView not found for execution vertex %s", executionVertexId); + return pipelinedRegionExecutionView; + } + + private PipelinedRegion getPipelinedRegionForVertex(final ExecutionVertexID executionVertexId) { + final PipelinedRegionExecutionView pipelinedRegionExecutionView = getPipelinedRegionExecutionViewForVertex(executionVertexId); + return pipelinedRegionExecutionView.getPipelinedRegion(); + } + + private PipelinedRegionConsumedBlockingPartitions getConsumedBlockingPartitionsForRegion(final PipelinedRegion pipelinedRegion) { + final PipelinedRegionConsumedBlockingPartitions pipelinedRegionConsumedBlockingPartitions = consumedBlockingPartitionsByRegion.get(pipelinedRegion); + checkState(pipelinedRegionConsumedBlockingPartitions != null, + "Consumed partitions not found for pipelined region %s", pipelinedRegion); + checkState(pipelinedRegionConsumedBlockingPartitions.getPipelinedRegion() == pipelinedRegion); + return pipelinedRegionConsumedBlockingPartitions; + } + + private List filterReleasablePartitions(final PipelinedRegionConsumedBlockingPartitions consumedPartitionsOfVertexRegion) { + return consumedPartitionsOfVertexRegion + .getConsumedBlockingPartitions() + .stream() + .filter(this::areConsumerRegionsFinished) + .collect(Collectors.toList()); + } + + private boolean areConsumerRegionsFinished(final IntermediateResultPartitionID resultPartitionId) { + final SchedulingResultPartition resultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId); + final Collection consumers = resultPartition.getConsumers(); + return consumers + .stream() + .map(SchedulingExecutionVertex::getId) + .allMatch(this::isRegionOfVertexFinished); + } + + private boolean isRegionOfVertexFinished(final ExecutionVertexID executionVertexId) { + final PipelinedRegionExecutionView regionExecutionView = getPipelinedRegionExecutionViewForVertex(executionVertexId); + return regionExecutionView.isFinished(); + } + + /** + * Factory for {@link PartitionReleaseStrategy}. + */ + public static class Factory implements PartitionReleaseStrategy.Factory { + + @Override + public PartitionReleaseStrategy createInstance( + final SchedulingTopology schedulingStrategy, + final FailoverTopology failoverTopology) { + + final Set> distinctRegions = PipelinedRegionComputeUtil.computePipelinedRegions(failoverTopology); + return new RegionPartitionReleaseStrategy( + schedulingStrategy, + PipelinedRegionComputeUtil.toPipelinedRegionsSet(distinctRegions)); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java new file mode 100644 index 0000000000000..01106429a6b5b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.core.IsEqual.equalTo; + +/** + * Tests for the interactions of the {@link ExecutionGraph} and {@link PartitionReleaseStrategy}. + */ +public class ExecutionGraphPartitionReleaseTest extends TestLogger { + + private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private static final TestingComponentMainThreadExecutor mainThreadExecutor = + new TestingComponentMainThreadExecutor( + TestingComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService)); + + @Test + public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws Exception { + // setup a simple pipeline of 3 operators with blocking partitions + final JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex(1); + final JobVertex operatorVertex = ExecutionGraphTestUtils.createNoOpVertex(1); + final JobVertex sinkVertex = ExecutionGraphTestUtils.createNoOpVertex(1); + + operatorVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + sinkVertex.connectNewDataSetAsInput(operatorVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + + // setup partition tracker to intercept partition release calls + final TestingPartitionTracker partitionTracker = new TestingPartitionTracker(); + final Queue releasedPartitions = new ArrayDeque<>(); + partitionTracker.setStopTrackingAndReleasePartitionsConsumer( + partitionIds -> releasedPartitions.add(partitionIds.iterator().next())); + + final ExecutionGraph executionGraph = createExecutionGraph(partitionTracker, sourceVertex, operatorVertex, sinkVertex); + + // finish vertices one after another, and verify that the appropriate partitions are released + mainThreadExecutor.execute(() -> { + final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph); + executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(), ExecutionState.FINISHED)); + assertThat(releasedPartitions, empty()); + }); + + mainThreadExecutor.execute(() -> { + final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph); + final Execution operatorExecution = getCurrentExecution(operatorVertex, executionGraph); + executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operatorExecution.getAttemptId(), ExecutionState.FINISHED)); + assertThat(releasedPartitions, hasSize(1)); + assertThat(releasedPartitions.remove(), equalTo(new ResultPartitionID( + sourceExecution.getVertex().getProducedPartitions().keySet().iterator().next(), + sourceExecution.getAttemptId()))); + }); + + mainThreadExecutor.execute(() -> { + final Execution operatorExecution = getCurrentExecution(operatorVertex, executionGraph); + final Execution sinkExecution = getCurrentExecution(sinkVertex, executionGraph); + executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sinkExecution.getAttemptId(), ExecutionState.FINISHED)); + + assertThat(releasedPartitions, hasSize(1)); + assertThat(releasedPartitions.remove(), equalTo(new ResultPartitionID( + operatorExecution.getVertex().getProducedPartitions().keySet().iterator().next(), + operatorExecution.getAttemptId()))); + }); + } + + @Test + public void testStrategyNotifiedOfUnFinishedVertices() throws Exception { + // setup a pipeline of 2 failover regions (f1 -> f2), where + // f1 is just a source + // f2 consists of 3 operators (o1,o2,o3), where o1 consumes f1, and o2/o3 consume o1 + final JobVertex sourceVertex = ExecutionGraphTestUtils.createNoOpVertex("source", 1); + final JobVertex operator1Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator1", 1); + final JobVertex operator2Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator2", 1); + final JobVertex operator3Vertex = ExecutionGraphTestUtils.createNoOpVertex("operator3", 1); + + operator1Vertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + operator2Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + operator3Vertex.connectNewDataSetAsInput(operator1Vertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + // setup partition tracker to intercept partition release calls + final TestingPartitionTracker partitionTracker = new TestingPartitionTracker(); + final Queue releasedPartitions = new ArrayDeque<>(); + partitionTracker.setStopTrackingAndReleasePartitionsConsumer( + partitionIds -> releasedPartitions.add(partitionIds.iterator().next())); + + final ExecutionGraph executionGraph = createExecutionGraph( + partitionTracker, sourceVertex, operator1Vertex, operator2Vertex, operator3Vertex); + + mainThreadExecutor.execute(() -> { + final Execution sourceExecution = getCurrentExecution(sourceVertex, executionGraph); + // finish the source; this should not result in any release calls since the consumer o1 was not finished + executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), sourceExecution.getAttemptId(), ExecutionState.FINISHED)); + assertThat(releasedPartitions, empty()); + }); + + mainThreadExecutor.execute(() -> { + final Execution operator1Execution = getCurrentExecution(operator1Vertex, executionGraph); + // finish o1 and schedule the consumers (o2,o3); this should not result in any release calls since not all operators of the pipelined region are finished + for (final IntermediateResultPartitionID partitionId : operator1Execution.getVertex().getProducedPartitions().keySet()) { + executionGraph.scheduleOrUpdateConsumers(new ResultPartitionID(partitionId, operator1Execution.getAttemptId())); + } + executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator1Execution.getAttemptId(), ExecutionState.FINISHED)); + assertThat(releasedPartitions, empty()); + }); + + mainThreadExecutor.execute(() -> { + final Execution operator2Execution = getCurrentExecution(operator2Vertex, executionGraph); + // finish o2; this should not result in any release calls since o3 was not finished + executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator2Execution.getAttemptId(), ExecutionState.FINISHED)); + assertThat(releasedPartitions, empty()); + }); + + mainThreadExecutor.execute(() -> { + final Execution operator2Execution = getCurrentExecution(operator2Vertex, executionGraph); + // reset o2 + operator2Execution.getVertex().resetForNewExecution(0L, 1L); + assertThat(releasedPartitions, empty()); + }); + + mainThreadExecutor.execute(() -> { + final Execution operator3Execution = getCurrentExecution(operator3Vertex, executionGraph); + // finish o3; this should not result in any release calls since o2 was reset + executionGraph.updateState(new TaskExecutionState(executionGraph.getJobID(), operator3Execution.getAttemptId(), ExecutionState.FINISHED)); + assertThat(releasedPartitions, empty()); + }); + } + + private static Execution getCurrentExecution(final JobVertex jobVertex, final ExecutionGraph executionGraph) { + return executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt(); + } + + private ExecutionGraph createExecutionGraph(final PartitionTracker partitionTracker, final JobVertex... vertices) throws Exception { + final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph( + null, + new JobGraph(new JobID(), "test job", vertices), + new Configuration(), + scheduledExecutorService, + mainThreadExecutor.getMainThreadExecutor(), + new TestingSlotProvider(ignored -> CompletableFuture.completedFuture(new TestingLogicalSlot())), + ExecutionGraphPartitionReleaseTest.class.getClassLoader(), + new StandaloneCheckpointRecoveryFactory(), + AkkaUtils.getDefaultTimeout(), + new NoRestartStrategy(), + new UnregisteredMetricsGroup(), + VoidBlobWriter.getInstance(), + AkkaUtils.getDefaultTimeout(), + log, + NettyShuffleMaster.INSTANCE, + partitionTracker); + + executionGraph.start(mainThreadExecutor.getMainThreadExecutor()); + mainThreadExecutor.execute(executionGraph::scheduleForExecution); + + return executionGraph; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java new file mode 100644 index 0000000000000..1b9ca7a42eb7e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RegionPartitionReleaseStrategyTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion; +import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.RegionPartitionReleaseStrategy; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingResultPartition; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link RegionPartitionReleaseStrategy}. + */ +public class RegionPartitionReleaseStrategyTest extends TestLogger { + + private TestingSchedulingTopology testingSchedulingTopology; + + @Before + public void setUp() throws Exception { + testingSchedulingTopology = new TestingSchedulingTopology(); + } + + @Test + public void releasePartitionsIfDownstreamRegionIsFinished() { + final List producers = testingSchedulingTopology.addExecutionVertices().finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices().finish(); + final List resultPartitions = testingSchedulingTopology.connectPointwise(producers, consumers).finish(); + + final ExecutionVertexID onlyProducerVertexId = producers.get(0).getId(); + final ExecutionVertexID onlyConsumerVertexId = consumers.get(0).getId(); + final IntermediateResultPartitionID onlyResultPartitionId = resultPartitions.get(0).getId(); + + final Set pipelinedRegions = pipelinedRegionsSet( + PipelinedRegion.from(onlyProducerVertexId), + PipelinedRegion.from(onlyConsumerVertexId)); + + final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions); + + final List partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(onlyConsumerVertexId); + assertThat(partitionsToRelease, contains(onlyResultPartitionId)); + } + + @Test + public void releasePartitionsIfDownstreamRegionWithMultipleOperatorsIsFinished() { + final List sourceVertices = testingSchedulingTopology.addExecutionVertices().finish(); + final List intermediateVertices = testingSchedulingTopology.addExecutionVertices().finish(); + final List sinkVertices = testingSchedulingTopology.addExecutionVertices().finish(); + final List sourceResultPartitions = testingSchedulingTopology.connectAllToAll(sourceVertices, intermediateVertices).finish(); + testingSchedulingTopology.connectAllToAll(intermediateVertices, sinkVertices).withResultPartitionType(ResultPartitionType.PIPELINED).finish(); + + final ExecutionVertexID onlySourceVertexId = sourceVertices.get(0).getId(); + final ExecutionVertexID onlyIntermediateVertexId = intermediateVertices.get(0).getId(); + final ExecutionVertexID onlySinkVertexId = sinkVertices.get(0).getId(); + final IntermediateResultPartitionID onlySourceResultPartitionId = sourceResultPartitions.get(0).getId(); + + final Set pipelinedRegions = pipelinedRegionsSet( + PipelinedRegion.from(onlySourceVertexId), + PipelinedRegion.from(onlyIntermediateVertexId, onlySinkVertexId)); + + final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions); + + regionPartitionReleaseStrategy.vertexFinished(onlyIntermediateVertexId); + final List partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(onlySinkVertexId); + assertThat(partitionsToRelease, contains(onlySourceResultPartitionId)); + } + + @Test + public void notReleasePartitionsIfDownstreamRegionIsNotFinished() { + final List producers = testingSchedulingTopology.addExecutionVertices().finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish(); + testingSchedulingTopology.connectAllToAll(producers, consumers).finish(); + + final ExecutionVertexID onlyProducerVertexId = producers.get(0).getId(); + final ExecutionVertexID consumerVertex1 = consumers.get(0).getId(); + final ExecutionVertexID consumerVertex2 = consumers.get(1).getId(); + + final Set pipelinedRegions = pipelinedRegionsSet( + PipelinedRegion.from(onlyProducerVertexId), + PipelinedRegion.from(consumerVertex1, consumerVertex2)); + + final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions); + + final List partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(consumerVertex1); + assertThat(partitionsToRelease, is(empty())); + } + + @Test + public void toggleVertexFinishedUnfinished() { + final List producers = testingSchedulingTopology.addExecutionVertices().finish(); + final List consumers = testingSchedulingTopology.addExecutionVertices().withParallelism(2).finish(); + testingSchedulingTopology.connectAllToAll(producers, consumers).finish(); + + final ExecutionVertexID onlyProducerVertexId = producers.get(0).getId(); + final ExecutionVertexID consumerVertex1 = consumers.get(0).getId(); + final ExecutionVertexID consumerVertex2 = consumers.get(1).getId(); + + final Set pipelinedRegions = pipelinedRegionsSet( + PipelinedRegion.from(onlyProducerVertexId), + PipelinedRegion.from(consumerVertex1, consumerVertex2)); + + final RegionPartitionReleaseStrategy regionPartitionReleaseStrategy = new RegionPartitionReleaseStrategy(testingSchedulingTopology, pipelinedRegions); + + regionPartitionReleaseStrategy.vertexFinished(consumerVertex1); + regionPartitionReleaseStrategy.vertexFinished(consumerVertex2); + regionPartitionReleaseStrategy.vertexUnfinished(consumerVertex2); + + final List partitionsToRelease = regionPartitionReleaseStrategy.vertexFinished(consumerVertex1); + assertThat(partitionsToRelease, is(empty())); + } + + private static Set pipelinedRegionsSet(final PipelinedRegion... pipelinedRegions) { + return new HashSet<>(Arrays.asList(pipelinedRegions)); + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java new file mode 100644 index 0000000000000..c10893818af4b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PartitionReleaseStrategyFactoryLoaderTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; + +import org.junit.Test; + +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link PartitionReleaseStrategyFactoryLoader}. + */ +public class PartitionReleaseStrategyFactoryLoaderTest { + + @Test + public void featureEnabledByDefault() { + final Configuration emptyConfiguration = new Configuration(); + final PartitionReleaseStrategy.Factory factory = + PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(emptyConfiguration); + + assertThat(factory, is(instanceOf(RegionPartitionReleaseStrategy.Factory.class))); + } + + @Test + public void featureCanBeDisabled() { + final Configuration emptyConfiguration = new Configuration(); + emptyConfiguration.setBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION, false); + + final PartitionReleaseStrategy.Factory factory = + PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(emptyConfiguration); + + assertThat(factory, is(instanceOf(NotReleasingPartitionReleaseStrategy.Factory.class))); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java new file mode 100644 index 0000000000000..a6960ef51e4e9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/PipelinedRegionExecutionViewTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test for {@link PipelinedRegionExecutionView}. + */ +public class PipelinedRegionExecutionViewTest extends TestLogger { + + private static final ExecutionVertexID TEST_EXECUTION_VERTEX_ID = new ExecutionVertexID(new JobVertexID(), 0); + + @Test + public void regionIsUnfinishedIfNotAllVerticesAreFinished() { + final PipelinedRegion pipelinedRegion = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID); + final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion); + + assertFalse(pipelinedRegionExecutionView.isFinished()); + } + + @Test + public void regionIsFinishedIfAllVerticesAreFinished() { + final PipelinedRegion pipelinedRegion = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID); + final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion); + + pipelinedRegionExecutionView.vertexFinished(TEST_EXECUTION_VERTEX_ID); + + assertTrue(pipelinedRegionExecutionView.isFinished()); + } + + @Test + public void vertexCanBeUnfinished() { + final PipelinedRegion pipelinedRegion = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID); + final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(pipelinedRegion); + + pipelinedRegionExecutionView.vertexFinished(TEST_EXECUTION_VERTEX_ID); + pipelinedRegionExecutionView.vertexUnfinished(TEST_EXECUTION_VERTEX_ID); + + assertFalse(pipelinedRegionExecutionView.isFinished()); + } + + @Test(expected = IllegalArgumentException.class) + public void finishingUnknownVertexThrowsException() { + final PipelinedRegion from = PipelinedRegion.from(TEST_EXECUTION_VERTEX_ID); + final PipelinedRegionExecutionView pipelinedRegionExecutionView = new PipelinedRegionExecutionView(from); + + final ExecutionVertexID unknownVertexId = new ExecutionVertexID(new JobVertexID(), 0); + pipelinedRegionExecutionView.vertexFinished(unknownVertexId); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java index 1aaac58c986f7..e0ea6c4e94973 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java @@ -79,18 +79,18 @@ private void addSchedulingExecutionVertices(List producers, final List consumers) { return new ProducerConsumerPointwiseConnectionBuilder(producers, consumers); } - ProducerConsumerConnectionBuilder connectAllToAll( + public ProducerConsumerConnectionBuilder connectAllToAll( final List producers, final List consumers) { @@ -117,12 +117,12 @@ protected ProducerConsumerConnectionBuilder( this.consumers = consumers; } - ProducerConsumerConnectionBuilder withResultPartitionType(final ResultPartitionType resultPartitionType) { + public ProducerConsumerConnectionBuilder withResultPartitionType(final ResultPartitionType resultPartitionType) { this.resultPartitionType = resultPartitionType; return this; } - ProducerConsumerConnectionBuilder withResultPartitionState(final SchedulingResultPartition.ResultPartitionState state) { + public ProducerConsumerConnectionBuilder withResultPartitionState(final SchedulingResultPartition.ResultPartitionState state) { this.resultPartitionState = state; return this; } @@ -229,12 +229,12 @@ public class SchedulingExecutionVerticesBuilder { private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY; - SchedulingExecutionVerticesBuilder withParallelism(final int parallelism) { + public SchedulingExecutionVerticesBuilder withParallelism(final int parallelism) { this.parallelism = parallelism; return this; } - SchedulingExecutionVerticesBuilder withInputDependencyConstraint(final InputDependencyConstraint inputDependencyConstraint) { + public SchedulingExecutionVerticesBuilder withInputDependencyConstraint(final InputDependencyConstraint inputDependencyConstraint) { this.inputDependencyConstraint = inputDependencyConstraint; return this; }