[FLINK-12883][runtime] Introduce PartitionReleaseStrategy
- Introduce interface PartitionReleaseStrategy.
- Introduce RegionPartitionReleaseStrategy and
  NotReleasingPartitionReleaseStrategy implementations, which can be configured
  via a new config option.
- Add unit tests for new classes.
- Increase visibility of methods in TestingSchedulingTopology for unit tests
  outside of its package.
GJL authored and zentol committed Jul 2, 2019
1 parent 2f5fc23 commit c9aa9a1
Showing 17 changed files with 1,154 additions and 38 deletions.
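
The PartitionReleaseStrategy interface itself is among the changed files that are not expanded on this page. Reconstructed from its call sites in the hunks below (ExecutionGraph#maybeReleasePartitions, ExecutionGraph#attachJobGraph, and the NotReleasingPartitionReleaseStrategy implementation), a sketch of its shape, with Javadoc wording assumed, is:

package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;

import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;

import java.util.List;

/** Sketch, reconstructed from call sites; the actual file is not rendered here. */
public interface PartitionReleaseStrategy {

	/** Called when a vertex finished; returns the partitions that are now safe to release. */
	List<IntermediateResultPartitionID> vertexFinished(ExecutionVertexID finishedVertex);

	/** Called when a previously finished vertex is reset, e.g., during failover. */
	void vertexUnfinished(ExecutionVertexID executionVertexId);

	/** Factory for {@link PartitionReleaseStrategy}. */
	interface Factory {
		PartitionReleaseStrategy createInstance(SchedulingTopology schedulingTopology, FailoverTopology failoverTopology);
	}
}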
File: JobManagerOptions.java
@@ -173,6 +173,15 @@ public class JobManagerOptions {
text("'legacy': legacy scheduler"),
text("'ng': new generation scheduler"))
.build());
/**
* Config parameter controlling whether partitions should already be released during the job execution.
*/
@Documentation.ExcludeFromDocumentation("User normally should not be expected to deactivate this feature. " +
"We aim at removing this flag eventually.")
public static final ConfigOption<Boolean> PARTITION_RELEASE_DURING_JOB_EXECUTION =
key("jobmanager.partition.release-during-job-execution")
.defaultValue(true)
.withDescription("Controls whether partitions should already be released during the job execution.");

@Documentation.ExcludeFromDocumentation("dev use only; likely temporary")
public static final ConfigOption<Boolean> FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
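For illustration, a setup that wants to keep the pre-existing end-of-job release behavior could set jobmanager.partition.release-during-job-execution: false in its configuration, or do the equivalent programmatically. A minimal sketch (disabling the flag presumably selects NotReleasingPartitionReleaseStrategy, so partitions are released only when the job finishes):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

public class DisableEarlyPartitionRelease {
	public static void main(String[] args) {
		// Fall back to releasing partitions only at the end of the job.
		final Configuration jobManagerConfig = new Configuration();
		jobManagerConfig.setBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION, false);
	}
}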
File: ExecutionGraph.java
@@ -48,13 +48,17 @@
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
import org.apache.flink.runtime.executiongraph.failover.adapter.DefaultFailoverTopology;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.NotReleasingPartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.io.network.partition.PartitionTracker;
import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -66,6 +70,11 @@
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.scheduler.adapter.ExecutionGraphToSchedulingTopologyAdapter;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -250,6 +259,12 @@ public class ExecutionGraph implements AccessExecutionGraph {
/** The total number of vertices currently in the execution graph. */
private int numVerticesTotal;

private final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory;

private PartitionReleaseStrategy partitionReleaseStrategy;

private SchedulingTopology schedulingTopology;

// ------ Configuration of the Execution -------

/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
@@ -413,6 +428,7 @@ public ExecutionGraph(
userClassLoader,
blobWriter,
allocationTimeout,
new NotReleasingPartitionReleaseStrategy.Factory(),
NettyShuffleMaster.INSTANCE,
true,
new PartitionTrackerImpl(
@@ -433,6 +449,7 @@ public ExecutionGraph(
ClassLoader userClassLoader,
BlobWriter blobWriter,
Time allocationTimeout,
PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory,
ShuffleMaster<?> shuffleMaster,
boolean forcePartitionReleaseOnConsumption,
PartitionTracker partitionTracker) throws IOException {
@@ -464,6 +481,8 @@ public ExecutionGraph(
this.rpcTimeout = checkNotNull(rpcTimeout);
this.allocationTimeout = checkNotNull(allocationTimeout);

this.partitionReleaseStrategyFactory = checkNotNull(partitionReleaseStrategyFactory);

this.restartStrategy = restartStrategy;
this.kvStateLocationRegistry = new KvStateLocationRegistry(jobInformation.getJobId(), getAllVertices());

@@ -913,6 +932,11 @@ public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
}

failoverStrategy.notifyNewVertices(newExecJobVertices);

schedulingTopology = new ExecutionGraphToSchedulingTopologyAdapter(this);
partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(
schedulingTopology,
new DefaultFailoverTopology(this));
}

public void scheduleForExecution() throws JobException {
@@ -1605,36 +1629,9 @@ public boolean updateState(TaskExecutionState state) {

if (attempt != null) {
try {
Map<String, Accumulator<?, ?>> accumulators;

switch (state.getExecutionState()) {
case RUNNING:
return attempt.switchToRunning();

case FINISHED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.markFinished(accumulators, state.getIOMetrics());
return true;

case CANCELED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.completeCancelling(accumulators, state.getIOMetrics());
return true;

case FAILED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
return true;

default:
// we mark as failed and return false, which triggers the TaskManager
// to remove the task
attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
return false;
}
final boolean stateUpdated = updateStateInternal(state, attempt);
maybeReleasePartitions(attempt);
return stateUpdated;
}
catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
@@ -1649,6 +1646,77 @@ public boolean updateState(TaskExecutionState state) {
}
}

private boolean updateStateInternal(final TaskExecutionState state, final Execution attempt) {
Map<String, Accumulator<?, ?>> accumulators;

switch (state.getExecutionState()) {
case RUNNING:
return attempt.switchToRunning();

case FINISHED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.markFinished(accumulators, state.getIOMetrics());
return true;

case CANCELED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.completeCancelling(accumulators, state.getIOMetrics());
return true;

case FAILED:
// this deserialization is exception-free
accumulators = deserializeAccumulators(state);
attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics());
return true;

default:
// we mark as failed and return false, which triggers the TaskManager
// to remove the task
attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
return false;
}
}

private void maybeReleasePartitions(final Execution attempt) {
final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID();

if (attempt.getState() == ExecutionState.FINISHED) {
final List<IntermediateResultPartitionID> releasablePartitions = partitionReleaseStrategy.vertexFinished(finishedExecutionVertex);
releasePartitions(releasablePartitions);
} else {
partitionReleaseStrategy.vertexUnfinished(finishedExecutionVertex);
}
}

private void releasePartitions(final List<IntermediateResultPartitionID> releasablePartitions) {
if (releasablePartitions.size() > 0) {
final List<ResultPartitionID> partitionIds = releasablePartitions.stream()
.map(this::createResultPartitionId)
.collect(Collectors.toList());

partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
}
}

private ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) {
final SchedulingResultPartition schedulingResultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
final SchedulingExecutionVertex producer = schedulingResultPartition.getProducer();
final ExecutionVertexID producerId = producer.getId();
final JobVertexID jobVertexId = producerId.getJobVertexId();
final ExecutionJobVertex jobVertex = getJobVertex(jobVertexId);
checkNotNull(jobVertex, "Unknown job vertex %s", jobVertexId);

final ExecutionVertex[] taskVertices = jobVertex.getTaskVertices();
final int subtaskIndex = producerId.getSubtaskIndex();
checkState(subtaskIndex < taskVertices.length, "Invalid subtask index %d for job vertex %s", subtaskIndex, jobVertexId);

final ExecutionVertex taskVertex = taskVertices[subtaskIndex];
final Execution execution = taskVertex.getCurrentExecutionAttempt();
return new ResultPartitionID(resultPartitionId, execution.getAttemptId());
}

/**
* Deserializes accumulators from a task state update.
*
@@ -1835,4 +1903,8 @@ ShuffleMaster<?> getShuffleMaster() {
public PartitionTracker getPartitionTracker() {
return partitionTracker;
}

PartitionReleaseStrategy getPartitionReleaseStrategy() {
return partitionReleaseStrategy;
}
}
File: ExecutionGraphBuilder.java
@@ -38,6 +38,8 @@
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionReleaseStrategyFactoryLoader;
import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
@@ -117,6 +119,9 @@ public static ExecutionGraph buildGraph(
final int maxPriorAttemptsHistoryLength =
jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);

final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(jobManagerConfig);

final boolean forcePartitionReleaseOnConsumption =
jobManagerConfig.getBoolean(JobManagerOptions.FORCE_PARTITION_RELEASE_ON_CONSUMPTION);

@@ -136,6 +141,7 @@
classLoader,
blobWriter,
allocationTimeout,
partitionReleaseStrategyFactory,
shuffleMaster,
forcePartitionReleaseOnConsumption,
partitionTracker);
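PartitionReleaseStrategyFactoryLoader is also not expanded on this page. Given the config option introduced above and the two strategy implementations named in the commit message, a plausible sketch of the loader (exact structure assumed) is:

package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

public final class PartitionReleaseStrategyFactoryLoader {

	public static PartitionReleaseStrategy.Factory loadPartitionReleaseStrategyFactory(final Configuration configuration) {
		final boolean releaseDuringJobExecution =
			configuration.getBoolean(JobManagerOptions.PARTITION_RELEASE_DURING_JOB_EXECUTION);
		// Default (true): release eagerly, region by region; otherwise keep
		// all partitions until the job is done.
		return releaseDuringJobExecution
			? new RegionPartitionReleaseStrategy.Factory()
			: new NotReleasingPartitionReleaseStrategy.Factory();
	}

	private PartitionReleaseStrategyFactoryLoader() {
	}
}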
File: ExecutionVertex.java
@@ -601,6 +601,7 @@ public Execution resetForNewExecution(final long timestamp, final long originatingGlobalModVersion) {
if (oldState.isTerminal()) {
if (oldState == FINISHED) {
oldExecution.stopTrackingAndReleasePartitions();
getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
}

priorExecutions.add(oldExecution.archive());
File: PipelinedRegionComputeUtil.java
@@ -19,6 +19,9 @@

package org.apache.flink.runtime.executiongraph.failover.flip1;

import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -27,14 +30,29 @@
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Utility for computing pipeliend regions.
* Utility for computing pipelined regions.
*/
public final class PipelinedRegionComputeUtil {

private static final Logger LOG = LoggerFactory.getLogger(PipelinedRegionComputeUtil.class);

public static Set<PipelinedRegion> toPipelinedRegionsSet(final Set<Set<FailoverVertex>> distinctRegions) {
return distinctRegions.stream()
.map(toExecutionVertexIdSet())
.map(PipelinedRegion::from)
.collect(Collectors.toSet());
}

private static Function<Set<FailoverVertex>, Set<ExecutionVertexID>> toExecutionVertexIdSet() {
return failoverVertices -> failoverVertices.stream()
.map(FailoverVertex::getExecutionVertexID)
.collect(Collectors.toSet());
}

public static Set<Set<FailoverVertex>> computePipelinedRegions(final FailoverTopology topology) {
// currently we let a job with co-location constraints fail as one region
// putting co-located vertices in the same region with each other can be a future improvement
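PipelinedRegion is referenced above through PipelinedRegion::from, but its source is not expanded either. A minimal sketch consistent with that usage (member names assumed):

package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;

import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/** A set of execution vertices connected by pipelined data exchanges. */
public class PipelinedRegion implements Iterable<ExecutionVertexID> {

	private final Set<ExecutionVertexID> executionVertexIds;

	private PipelinedRegion(final Set<ExecutionVertexID> executionVertexIds) {
		this.executionVertexIds = new HashSet<>(executionVertexIds);
	}

	public static PipelinedRegion from(final Set<ExecutionVertexID> executionVertexIds) {
		return new PipelinedRegion(executionVertexIds);
	}

	public boolean contains(final ExecutionVertexID executionVertexId) {
		return executionVertexIds.contains(executionVertexId);
	}

	@Override
	public Iterator<ExecutionVertexID> iterator() {
		return executionVertexIds.iterator();
	}
}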
File: NotReleasingPartitionReleaseStrategy.java (new file)
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;

import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;

import java.util.Collections;
import java.util.List;

/**
* Does not release intermediate result partitions during job execution. Relies on partitions being
* released at the end of the job.
*/
public class NotReleasingPartitionReleaseStrategy implements PartitionReleaseStrategy {

@Override
public List<IntermediateResultPartitionID> vertexFinished(final ExecutionVertexID finishedVertex) {
return Collections.emptyList();
}

@Override
public void vertexUnfinished(final ExecutionVertexID executionVertexID) {
}

/**
* Factory for {@link NotReleasingPartitionReleaseStrategy}.
*/
public static class Factory implements PartitionReleaseStrategy.Factory {

@Override
public PartitionReleaseStrategy createInstance(final SchedulingTopology schedulingStrategy, final FailoverTopology failoverTopology) {
return new NotReleasingPartitionReleaseStrategy();
}
}

}
(The remaining changed files are not rendered on this page.)
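Among those unrendered files is the second implementation named in the commit message, RegionPartitionReleaseStrategy. The core idea, as a hedged and simplified sketch (all member names assumed; construction of the lookup maps from the SchedulingTopology is omitted): a blocking result partition becomes releasable once every pipelined region that consumes it has finished all of its vertices.

package org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease;

import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified sketch of the region-based strategy; not the actual implementation. */
public class RegionPartitionReleaseStrategySketch implements PartitionReleaseStrategy {

	// Region of every execution vertex (populated from the SchedulingTopology;
	// construction omitted in this sketch).
	private final Map<ExecutionVertexID, PipelinedRegion> regionByVertex = new HashMap<>();

	// Vertices of each region that have not yet reached FINISHED.
	private final Map<PipelinedRegion, Set<ExecutionVertexID>> unfinishedVerticesByRegion = new HashMap<>();

	// Regions consuming each blocking result partition.
	private final Map<IntermediateResultPartitionID, Set<PipelinedRegion>> consumerRegionsByPartition = new HashMap<>();

	@Override
	public List<IntermediateResultPartitionID> vertexFinished(final ExecutionVertexID finishedVertex) {
		final PipelinedRegion region = regionByVertex.get(finishedVertex);
		final Set<ExecutionVertexID> unfinished = unfinishedVerticesByRegion.get(region);
		unfinished.remove(finishedVertex);
		if (!unfinished.isEmpty()) {
			return Collections.emptyList();
		}

		// The region just finished; any partition it consumes is releasable
		// once all of the partition's other consumer regions are finished too.
		final List<IntermediateResultPartitionID> releasable = new ArrayList<>();
		for (Map.Entry<IntermediateResultPartitionID, Set<PipelinedRegion>> entry : consumerRegionsByPartition.entrySet()) {
			final Set<PipelinedRegion> consumers = entry.getValue();
			if (consumers.contains(region)
					&& consumers.stream().allMatch(r -> unfinishedVerticesByRegion.get(r).isEmpty())) {
				releasable.add(entry.getKey());
			}
		}
		return releasable;
	}

	@Override
	public void vertexUnfinished(final ExecutionVertexID executionVertexId) {
		// Reverses the bookkeeping of vertexFinished.
		unfinishedVerticesByRegion.get(regionByVertex.get(executionVertexId)).add(executionVertexId);
	}
}

The vertexUnfinished path matches the call site added in ExecutionVertex#resetForNewExecution above: a FINISHED vertex that is reset during failover makes its region unfinished again, so the partitions it consumes are no longer considered releasable.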
