Skip to content

Commit

Permalink
[FLINK-12603][network] Remove getOwningTaskName method from InputGate
Browse files Browse the repository at this point in the history
In order to make abstract InputGate simple for extending new implementations in shuffle service architecture, we could remove unnecessary methods from it.
InputGate#getOwningTaskName is only used for debugging log in BarrierBuffer and StreamInputProcessor. This task name could also be generated in StreamTask
via Environment#getTaskInfo and Environment#getExecutionId. Then it could be passed into the constructors of BarrierBuffer/StreamInputProcessor for use.

This closes apache#8529.
  • Loading branch information
zhijiangW authored and tillrohrmann committed Jun 5, 2019
1 parent 41d0ac0 commit 663b40d
Show file tree
Hide file tree
Showing 14 changed files with 67 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ public abstract class InputGate implements AutoCloseable {

public abstract int getNumberOfInputChannels();

public abstract String getOwningTaskName();

public abstract boolean isFinished();

public abstract void requestPartitions() throws IOException, InterruptedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,6 @@ public int getNumberOfQueuedBuffers() {
return 0;
}

@Override
public String getOwningTaskName() {
return owningTaskName;
}

public CompletableFuture<Void> getCloseFuture() {
return closeFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,6 @@ public int getNumberOfInputChannels() {
return totalNumberOfInputChannels;
}

@Override
public String getOwningTaskName() {
// all input gates have the same owning task
return inputGates[0].getOwningTaskName();
}

@Override
public boolean isFinished() {
for (InputGate inputGate : inputGates) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ public int getNumberOfInputChannels() {
return inputGate.getNumberOfInputChannels();
}

@Override
public String getOwningTaskName() {
return inputGate.getOwningTaskName();
}

@Override
public boolean isFinished() {
return inputGate.isFinished();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
Expand Down Expand Up @@ -79,6 +80,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
*/
private final long maxBufferedBytes;

private final String taskName;

/**
* The sequence of buffers/events that has been unblocked and must now be consumed before
* requesting further data from the input gate.
Expand Down Expand Up @@ -119,11 +122,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
*
* @param inputGate The input gate to draw the buffers and events from.
* @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier.
*
* @throws IOException Thrown, when the spilling to temp files cannot be initialized.
*/
public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) throws IOException {
this (inputGate, bufferBlocker, -1);
@VisibleForTesting
BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) {
this (inputGate, bufferBlocker, -1, "Testing: No task associated");
}

/**
Expand All @@ -136,11 +138,9 @@ public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) throws IO
* @param inputGate The input gate to draw the buffers and events from.
* @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier.
* @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
*
* @throws IOException Thrown, when the spilling to temp files cannot be initialized.
* @param taskName The task name for logging.
*/
public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes)
throws IOException {
BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes, String taskName) {
checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);

this.inputGate = inputGate;
Expand All @@ -150,6 +150,8 @@ public BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxB

this.bufferBlocker = checkNotNull(bufferBlocker);
this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();

this.taskName = taskName;
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -213,7 +215,7 @@ else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
}

private void completeBufferedSequence() throws IOException {
LOG.debug("{}: Finished feeding back buffered data.", inputGate.getOwningTaskName());
LOG.debug("{}: Finished feeding back buffered data.", taskName);

currentBuffered.cleanup();
currentBuffered = queuedBuffered.pollFirst();
Expand Down Expand Up @@ -249,7 +251,7 @@ else if (barrierId > currentCheckpointId) {
// we did not complete the current checkpoint, another started before
LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
inputGate.getOwningTaskName(),
taskName,
barrierId,
currentCheckpointId);

Expand Down Expand Up @@ -283,7 +285,7 @@ else if (barrierId > currentCheckpointId) {
// actually trigger checkpoint
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
inputGate.getOwningTaskName(),
taskName,
receivedBarrier.getId(),
receivedBarrier.getTimestamp());
}
Expand Down Expand Up @@ -314,9 +316,7 @@ private void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) th
if (barrierId == currentCheckpointId) {
// cancel this alignment
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Checkpoint {} canceled, aborting alignment.",
inputGate.getOwningTaskName(),
barrierId);
LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, barrierId);
}

releaseBlocksAndResetBarriers();
Expand All @@ -326,7 +326,7 @@ else if (barrierId > currentCheckpointId) {
// we canceled the next which also cancels the current
LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
inputGate.getOwningTaskName(),
taskName,
barrierId,
currentCheckpointId);

Expand Down Expand Up @@ -357,9 +357,7 @@ else if (barrierId > currentCheckpointId) {
latestAlignmentDurationNanos = 0L;

if (LOG.isDebugEnabled()) {
LOG.debug("{}: Checkpoint {} canceled, skipping alignment.",
inputGate.getOwningTaskName(),
barrierId);
LOG.debug("{}: Checkpoint {} canceled, skipping alignment.", taskName, barrierId);
}

notifyAbortOnCancellationBarrier(barrierId);
Expand Down Expand Up @@ -414,7 +412,7 @@ private void checkSizeLimit() throws Exception {
if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
// exceeded our limit - abort this checkpoint
LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
inputGate.getOwningTaskName(),
taskName,
currentCheckpointId,
maxBufferedBytes);

Expand Down Expand Up @@ -458,9 +456,7 @@ private void beginNewAlignment(long checkpointId, int channelIndex) throws IOExc
startOfAlignmentTimestamp = System.nanoTime();

if (LOG.isDebugEnabled()) {
LOG.debug("{}: Starting stream alignment for checkpoint {}.",
inputGate.getOwningTaskName(),
checkpointId);
LOG.debug("{}: Starting stream alignment for checkpoint {}.", taskName, checkpointId);
}
}

Expand All @@ -486,9 +482,7 @@ private void onBarrier(int channelIndex) throws IOException {
numBarriersReceived++;

if (LOG.isDebugEnabled()) {
LOG.debug("{}: Received barrier from channel {}.",
inputGate.getOwningTaskName(),
channelIndex);
LOG.debug("{}: Received barrier from channel {}.", taskName, channelIndex);
}
}
else {
Expand All @@ -501,8 +495,7 @@ private void onBarrier(int channelIndex) throws IOException {
* Makes sure the just written data is the next to be consumed.
*/
private void releaseBlocksAndResetBarriers() throws IOException {
LOG.debug("{}: End of stream alignment, feeding buffered data back.",
inputGate.getOwningTaskName());
LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName);

for (int i = 0; i < blockedChannels.length; i++) {
blockedChannels[i] = false;
Expand All @@ -519,8 +512,7 @@ private void releaseBlocksAndResetBarriers() throws IOException {
// uncommon case: buffered data pending
// push back the pending data, if we have any
LOG.debug("{}: Checkpoint skipped via buffered data:" +
"Pushing back current alignment buffers and feeding back new alignment data first.",
inputGate.getOwningTaskName());
"Pushing back current alignment buffers and feeding back new alignment data first.", taskName);

// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
Expand All @@ -534,7 +526,7 @@ private void releaseBlocksAndResetBarriers() throws IOException {

if (LOG.isDebugEnabled()) {
LOG.debug("{}: Size of buffered data: {} bytes",
inputGate.getOwningTaskName(),
taskName,
currentBuffered == null ? 0L : currentBuffered.size());
}

Expand Down Expand Up @@ -577,7 +569,7 @@ public long getAlignmentDurationNanos() {
@Override
public String toString() {
return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d",
inputGate.getOwningTaskName(),
taskName,
currentCheckpointId,
numBarriersReceived,
numClosedChannels);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public static CheckpointBarrierHandler createCheckpointBarrierHandler(
CheckpointingMode checkpointMode,
IOManager ioManager,
InputGate inputGate,
Configuration taskManagerConfig) throws IOException {
Configuration taskManagerConfig,
String taskName) throws IOException {

CheckpointBarrierHandler barrierHandler;
if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
Expand All @@ -53,9 +54,17 @@ public static CheckpointBarrierHandler createCheckpointBarrierHandler(
}

if (taskManagerConfig.getBoolean(NetworkEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
barrierHandler = new BarrierBuffer(
inputGate,
new CachedBufferBlocker(inputGate.getPageSize()),
maxAlign,
taskName);
} else {
barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
barrierHandler = new BarrierBuffer(
inputGate,
new BufferSpiller(ioManager, inputGate.getPageSize()),
maxAlign,
taskName);
}
} else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
barrierHandler = new BarrierTracker(inputGate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,18 @@ public StreamInputProcessor(
StreamStatusMaintainer streamStatusMaintainer,
OneInputStreamOperator<IN, ?> streamOperator,
TaskIOMetricGroup metrics,
WatermarkGauge watermarkGauge) throws IOException {
WatermarkGauge watermarkGauge,
String taskName) throws IOException {

InputGate inputGate = InputGateUtil.createInputGate(inputGates);

this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
checkpointedTask, checkpointMode, ioManager, inputGate, taskManagerConfig);
checkpointedTask,
checkpointMode,
ioManager,
inputGate,
taskManagerConfig,
taskName);

this.lock = checkNotNull(lock);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,18 @@ public StreamTwoInputProcessor(
TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
TaskIOMetricGroup metrics,
WatermarkGauge input1WatermarkGauge,
WatermarkGauge input2WatermarkGauge) throws IOException {
WatermarkGauge input2WatermarkGauge,
String taskName) throws IOException {

final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);

this.barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler(
checkpointedTask, checkpointMode, ioManager, inputGate, taskManagerConfig);
checkpointedTask,
checkpointMode,
ioManager,
inputGate,
taskManagerConfig,
taskName);

this.lock = checkNotNull(lock);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public void init() throws Exception {
getStreamStatusMaintainer(),
this.headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
inputWatermarkGauge);
inputWatermarkGauge,
getTaskNameWithSubtaskAndId());
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
// wrap watermark gauge since registered metrics must be unique
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,16 @@ public String getName() {
return getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
}

/**
* Gets the name of the task, appended with the subtask indicator and execution id.
*
* @return The name of the task, with subtask indicator and execution id.
*/
String getTaskNameWithSubtaskAndId() {
return getEnvironment().getTaskInfo().getTaskNameWithSubtasks() +
" (" + getEnvironment().getExecutionId() + ')';
}

/**
* Gets the lock object on which all operations that involve data and state mutation have to lock.
* @return The checkpoint lock object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public void init() throws Exception {
this.headOperator,
getEnvironment().getMetricGroup().getIOMetricGroup(),
input1WatermarkGauge,
input2WatermarkGauge);
input2WatermarkGauge,
getTaskNameWithSubtaskAndId());

headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void testBreakCheckpointAtAlignmentLimit() throws Exception {

// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 1000);
BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 1000, "Testing");

AbstractInvokable toNotify = mock(AbstractInvokable.class);
buffer.registerCheckpointEventHandler(toNotify);
Expand Down Expand Up @@ -209,7 +209,7 @@ public void testAlignmentLimitWithQueuedAlignments() throws Exception {

// the barrier buffer has a limit that only 1000 bytes may be spilled in alignment
MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 500);
BarrierBuffer buffer = new BarrierBuffer(gate, new BufferSpiller(ioManager, gate.getPageSize()), 500, "Testing");

AbstractInvokable toNotify = mock(AbstractInvokable.class);
buffer.registerCheckpointEventHandler(toNotify);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,11 @@ private static class RandomGeneratingInputGate extends InputGate {
private int currentChannel = 0;
private long c = 0;

private final String owningTaskName;

public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
this(bufferPools, barrierGens, "TestTask");
}

public RandomGeneratingInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens, String owningTaskName) {
this.numberOfChannels = bufferPools.length;
this.currentBarriers = new int[numberOfChannels];
this.bufferPools = bufferPools;
this.barrierGens = barrierGens;
this.owningTaskName = owningTaskName;
this.isAvailable = AVAILABLE;
}

Expand All @@ -158,11 +151,6 @@ public int getNumberOfInputChannels() {
return numberOfChannels;
}

@Override
public String getOwningTaskName() {
return owningTaskName;
}

@Override
public boolean isFinished() {
return false;
Expand Down
Loading

0 comments on commit 663b40d

Please sign in to comment.