Skip to content

Commit

Permalink
[FLINK-4410] [runtime] Rework checkpoint stats tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
uce committed Jan 10, 2017
1 parent 6ea77ed commit 0d1f4bc
Show file tree
Hide file tree
Showing 58 changed files with 4,679 additions and 405 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,12 @@ public final class ConfigConstants {
/** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */
public static final String JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY = "jobmanager.web.submit.enable";

/** Flag to disable checkpoint stats. */
/**
* Flag to disable checkpoint stats.
*
* @deprecated Not possible to disable any longer. Use history size of 0.
*/
@Deprecated
public static final String JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = "jobmanager.web.checkpoints.disable";

/** Config parameter defining the number of checkpoints to remember for recent history. */
Expand Down Expand Up @@ -1226,7 +1231,8 @@ public final class ConfigConstants {
/** By default, submitting jobs from the web-frontend is allowed. */
public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED = true;

/** Default flag to disable checkpoint stats. */
/** Config key has been deprecated. Therefore, no default value required. */
@Deprecated
public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = false;

/** Default number of checkpoints to remember for recent history. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testJobManagerJMXMetricAccess() throws Exception {
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
500, 500, 50, 5, ExternalizedCheckpointSettings.none()));
500, 500, 50, 5, ExternalizedCheckpointSettings.none(), true));

flink.waitForActorsToBeAlive();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.jobgraph.JobVertexID;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Common base for the statistics kept about a single checkpoint.
 *
 * <p>Holds the values shared by all checkpoint stats (ID, trigger timestamp,
 * properties, number of subtasks and the per-task stats) and derives the
 * latest ack timestamp and the end-to-end duration from the latest
 * acknowledged subtask reported by the concrete subclass.
 */
public abstract class AbstractCheckpointStats {

    /** ID of this checkpoint. */
    final long checkpointId;

    /** Timestamp when the checkpoint was triggered at the coordinator. */
    final long triggerTimestamp;

    /** Properties of the checkpoint. */
    final CheckpointProperties props;

    /** Total number of subtasks over all tasks. */
    final int numberOfSubtasks;

    /** {@link TaskStateStats} accessible by their ID. */
    final Map<JobVertexID, TaskStateStats> taskStats;

    /**
     * Creates the shared part of a checkpoint stats instance.
     *
     * <p>Arguments are validated in this order: task stats non-null, task
     * stats non-empty, number of subtasks positive, properties non-null.
     *
     * @param checkpointId ID of the checkpoint.
     * @param triggerTimestamp Timestamp when the checkpoint was triggered.
     * @param props Properties of the checkpoint; must not be <code>null</code>.
     * @param numberOfSubtasks Total number of subtasks; must be positive.
     * @param taskStats Task stats by job vertex ID; must be non-null and non-empty.
     */
    AbstractCheckpointStats(
            long checkpointId,
            long triggerTimestamp,
            CheckpointProperties props,
            int numberOfSubtasks,
            Map<JobVertexID, TaskStateStats> taskStats) {

        // Validate everything first, keeping the order in which failures surface.
        checkNotNull(taskStats);
        checkArgument(taskStats.size() > 0, "Empty task stats");
        checkArgument(numberOfSubtasks > 0, "Non-positive number of subtasks");
        checkNotNull(props);

        this.checkpointId = checkpointId;
        this.triggerTimestamp = triggerTimestamp;
        this.props = props;
        this.numberOfSubtasks = numberOfSubtasks;
        this.taskStats = taskStats;
    }

    // ------------------------------------------------------------------------
    // Values provided by the concrete stats implementation
    // ------------------------------------------------------------------------

    /**
     * Returns the status of this checkpoint.
     *
     * @return Status of this checkpoint
     */
    public abstract CheckpointStatsStatus getStatus();

    /**
     * Returns how many subtasks have been acknowledged.
     *
     * @return The number of acknowledged subtasks.
     */
    public abstract int getNumberOfAcknowledgedSubtasks();

    /**
     * Returns the checkpoint state size summed over all subtasks.
     *
     * @return Total checkpoint state size over all subtasks.
     */
    public abstract long getStateSize();

    /**
     * Returns the bytes buffered during alignment summed over all subtasks.
     *
     * <p>Can return <code>-1</code> if the runtime did not report this.
     *
     * @return Total buffered bytes during alignment over all subtasks.
     */
    public abstract long getAlignmentBuffered();

    /**
     * Returns the stats of the most recently acknowledged subtask, or
     * <code>null</code> if no subtask was acknowledged yet.
     *
     * @return Latest acknowledged subtask stats or <code>null</code>
     */
    @Nullable
    public abstract SubtaskStateStats getLatestAcknowledgedSubtaskStats();

    // ------------------------------------------------------------------------
    // Plain accessors
    // ------------------------------------------------------------------------

    /**
     * Returns the ID of this checkpoint.
     *
     * @return ID of this checkpoint.
     */
    public long getCheckpointId() {
        return checkpointId;
    }

    /**
     * Returns the timestamp when the checkpoint was triggered.
     *
     * @return Timestamp when the checkpoint was triggered.
     */
    public long getTriggerTimestamp() {
        return triggerTimestamp;
    }

    /**
     * Returns the properties of this checkpoint.
     *
     * @return Properties of this checkpoint.
     */
    public CheckpointProperties getProperties() {
        return props;
    }

    /**
     * Returns the total number of subtasks involved in this checkpoint.
     *
     * @return Total number of subtasks involved in this checkpoint.
     */
    public int getNumberOfSubtasks() {
        return numberOfSubtasks;
    }

    /**
     * Looks up the task state stats for a job vertex ID.
     *
     * @param jobVertexId Job vertex ID of the task stats to look up.
     * @return The task state stats instance for the given ID or
     * <code>null</code> if no task with such an ID is available.
     */
    public TaskStateStats getTaskStateStats(JobVertexID jobVertexId) {
        return taskStats.get(jobVertexId);
    }

    /**
     * Returns all task state stats instances.
     *
     * @return All task state stats instances.
     */
    public Collection<TaskStateStats> getAllTaskStateStats() {
        return taskStats.values();
    }

    // ------------------------------------------------------------------------
    // Values derived from the latest acknowledged subtask
    // ------------------------------------------------------------------------

    /**
     * Returns the ack timestamp of the latest acknowledged subtask.
     *
     * @return Ack timestamp of the latest acknowledged subtask or
     * <code>-1</code> if none was acknowledged yet.
     */
    public long getLatestAckTimestamp() {
        final SubtaskStateStats latest = getLatestAcknowledgedSubtaskStats();
        return latest == null ? -1 : latest.getAckTimestamp();
    }

    /**
     * Returns the duration of this checkpoint, measured from the trigger
     * timestamp until the ack of the latest acknowledged subtask.
     *
     * @return Duration of this checkpoint or <code>-1</code> if no subtask was acknowledged yet.
     */
    public long getEndToEndDuration() {
        final SubtaskStateStats latest = getLatestAcknowledgedSubtaskStats();
        if (latest == null) {
            return -1;
        }

        // Never report a negative duration if the ack timestamp precedes the trigger timestamp.
        return Math.max(0, latest.getAckTimestamp() - triggerTimestamp);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
Expand All @@ -40,6 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -147,8 +147,9 @@ public class CheckpointCoordinator {
/** Flag marking the coordinator as shut down (not accepting any messages any more) */
private volatile boolean shutdown;

/** Helper for tracking checkpoint statistics */
private final CheckpointStatsTracker statsTracker;
/** Optional tracker for checkpoint statistics. */
@Nullable
private CheckpointStatsTracker statsTracker;

/** Default checkpoint properties **/
private final CheckpointProperties checkpointProperties;
Expand All @@ -170,7 +171,6 @@ public CheckpointCoordinator(
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
String checkpointDirectory,
CheckpointStatsTracker statsTracker,
Executor executor) {

// sanity checks
Expand Down Expand Up @@ -209,7 +209,6 @@ public CheckpointCoordinator(
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.checkpointDirectory = checkpointDirectory;
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.statsTracker = checkNotNull(statsTracker);

this.timer = new Timer("Checkpoint Timer", true);

Expand All @@ -231,6 +230,15 @@ public CheckpointCoordinator(
this.executor = checkNotNull(executor);
}

/**
* Sets the checkpoint stats tracker of this coordinator.
*
* <p>The given value is stored as-is, without a null check, and replaces
* any previously set tracker.
*
* @param statsTracker The checkpoint stats tracker; may be <code>null</code>.
*/
public void setCheckpointStatsTracker(@Nullable CheckpointStatsTracker statsTracker) {
this.statsTracker = statsTracker;
}

// --------------------------------------------------------------------------------------------
// Clean shutdown
// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -428,11 +436,19 @@ CheckpointTriggerResult triggerCheckpoint(
checkpointID,
timestamp,
ackTasks,
isPeriodic,
props,
targetDirectory,
executor);

if (statsTracker != null) {
PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
checkpointID,
timestamp,
props);

checkpoint.setStatsCallback(callback);
}

// schedule the timer that will clean up the expired checkpoints
TimerTask canceller = new TimerTask() {
@Override
Expand Down Expand Up @@ -632,7 +648,7 @@ public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws C

if (checkpoint != null && !checkpoint.isDiscarded()) {

switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState())) {
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetaData())) {
case SUCCESS:
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
Expand Down Expand Up @@ -769,8 +785,6 @@ public void run() {
ee.notifyCheckpointComplete(checkpointId, timestamp);
}
}

statsTracker.onCompletedCheckpoint(completedCheckpoint);
}

private void rememberRecentCheckpointId(long id) {
Expand Down Expand Up @@ -876,6 +890,17 @@ public boolean restoreLatestCheckpointedState(

stateAssignmentOperation.assignStates();

if (statsTracker != null) {
long restoreTimestamp = System.currentTimeMillis();
RestoredCheckpointStats restored = new RestoredCheckpointStats(
latest.getCheckpointID(),
latest.getProperties(),
restoreTimestamp,
latest.getExternalPath());

statsTracker.reportRestoredCheckpoint(restored);
}

return true;
}
}
Expand Down
Loading

0 comments on commit 0d1f4bc

Please sign in to comment.