Skip to content

Commit

Permalink
[FLINK-20309][network] Add NetworkActionsLogger for easier debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
pnowojski authored and AHeise committed Jan 15, 2021
1 parent 78b4e60 commit 5e4c1fa
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -84,6 +85,7 @@ static ChannelStateWriteRequest buildWriteRequest(
writer -> {
while (iterator.hasNext()) {
Buffer buffer = iterator.next();
NetworkActionsLogger.log(ChannelStateWriteRequest.class, name, buffer);
try {
checkArgument(buffer.isBuffer());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
Expand Down Expand Up @@ -66,6 +67,7 @@ public BufferWithContext<Buffer> getBuffer(InputChannelInfo channelInfo)
@Override
public void recover(InputChannelInfo channelInfo, Buffer buffer) {
if (buffer.readableBytes() > 0) {
NetworkActionsLogger.log(getClass(), "recover", buffer);
getChannel(channelInfo).onRecoveredStateBuffer(buffer);
} else {
buffer.recycleBuffer();
Expand Down Expand Up @@ -116,6 +118,7 @@ public void recover(
throws IOException {
bufferBuilderAndConsumer.f0.finish();
if (bufferBuilderAndConsumer.f1.isDataAvailable()) {
NetworkActionsLogger.log(getClass(), "recover", bufferBuilderAndConsumer.f1);
boolean added =
getSubpartition(subpartitionInfo)
.add(bufferBuilderAndConsumer.f1, Integer.MIN_VALUE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.logger;

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

import static org.apache.flink.util.Preconditions.checkState;

/**
 * Utility class for logging actions that happened in the network stack for debugging purposes.
 *
 * <p>Everything is emitted at TRACE level on this class's own logger. The level is checked on
 * every call, so tracing can be switched on or off by reconfiguring the logger at runtime; while
 * TRACE is disabled the {@code log} methods return without inspecting the buffer.
 */
public final class NetworkActionsLogger {
    private static final Logger LOG = LoggerFactory.getLogger(NetworkActionsLogger.class);

    /** Whether a hash of the buffer content is appended to each logged line. */
    private static final boolean INCLUDE_HASH = true;

    /** Static utility holder, not meant to be instantiated. */
    private NetworkActionsLogger() {}

    /**
     * Logs a single action performed on a {@link Buffer}.
     *
     * @param clazz class on whose behalf the action is logged; only its simple name is printed
     * @param action short label of the action, printed after the class name
     * @param buffer buffer that is described in the log line
     */
    public static void log(Class<?> clazz, String action, Buffer buffer) {
        // Ask the logger each time instead of caching the answer at class-load time, otherwise a
        // later change of the log level would never be picked up.
        if (LOG.isTraceEnabled()) {
            LOG.trace("{}#{} buffer = [{}]", clazz.getSimpleName(), action, toPrettyString(buffer));
        }
    }

    /**
     * Logs a single action performed on a {@link BufferConsumer}.
     *
     * <p>The consumer is copied, a buffer is built from the copy and logged, and afterwards the
     * copy is expected to be finished. The buffer built for logging is recycled and the copy is
     * closed before this method returns.
     *
     * @param clazz class on whose behalf the action is logged; only its simple name is printed
     * @param action short label of the action, printed after the class name
     * @param bufferConsumer consumer whose content is described in the log line
     * @throws IllegalStateException if the copied consumer is not finished after building
     */
    public static void log(Class<?> clazz, String action, BufferConsumer bufferConsumer) {
        if (!LOG.isTraceEnabled()) {
            return;
        }
        Buffer buffer = null;
        try (BufferConsumer copiedBufferConsumer = bufferConsumer.copy()) {
            buffer = copiedBufferConsumer.build();
            log(clazz, action, buffer);
            checkState(copiedBufferConsumer.isFinished());
        } finally {
            // Release the buffer that was built solely for logging, even if logging or the
            // state check above failed.
            if (buffer != null) {
                buffer.recycleBuffer();
            }
        }
    }

    /**
     * Renders a short description of the buffer: its size and, if {@link #INCLUDE_HASH} is set, a
     * hash of the bytes exposed by its read-only slice.
     */
    private static String toPrettyString(Buffer buffer) {
        StringBuilder prettyString = new StringBuilder("size=").append(buffer.getSize());
        if (INCLUDE_HASH) {
            Buffer slice = buffer.readOnlySlice();
            // Size the copy from the slice itself rather than from buffer.getSize(), so the read
            // below cannot overrun should the slice expose fewer bytes than getSize() reports.
            byte[] bytes = new byte[slice.readableBytes()];
            slice.asByteBuf().readBytes(bytes);
            prettyString.append(", hash=").append(Arrays.hashCode(bytes));
        }
        return prettyString.toString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PrioritizedDeque;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
Expand Down Expand Up @@ -444,6 +445,7 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
final boolean wasEmpty;
boolean firstPriorityEvent = false;
synchronized (receivedBuffers) {
NetworkActionsLogger.log(getClass(), "onBuffer", buffer);
// Similar to notifyBufferAvailable(), make sure that we never add a buffer
// after releaseAllResources() released all buffers from receivedBuffers
// (see above for details).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.PrioritizedDeque;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
Expand Down Expand Up @@ -652,6 +653,8 @@ private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetN
checkUnavailability();
continue;
}
NetworkActionsLogger.log(
getClass(), "waitAndGetNextData", bufferAndAvailabilityOpt.get().buffer());

final BufferAndAvailability bufferAndAvailability = bufferAndAvailabilityOpt.get();
if (bufferAndAvailability.moreAvailable()) {
Expand Down

0 comments on commit 5e4c1fa

Please sign in to comment.