Skip to content

Commit

Permalink
[FLINK-19938][network] Implement shuffle data read scheduling for sor…
Browse files Browse the repository at this point in the history
…t-merge blocking shuffle

This patch implements SortMergeResultPartitionReadScheduler which can read data for all downstream tasks consuming the corresponding SortMergeResultPartition. The scheduler always tries to read shuffle data in order of file offset, which maximums the sequential read so can improve the blocking shuffle performance.

This closes apache#13924
  • Loading branch information
kevin.cyj authored and zhuzhurk committed Mar 29, 2021
1 parent 71122fd commit f1e69bb
Show file tree
Hide file tree
Showing 15 changed files with 1,318 additions and 352 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeoutException;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
Expand All @@ -56,14 +54,13 @@ public class BatchShuffleReadBufferPool {
*/
private static final int NUM_BYTES_PER_REQUEST = 8 * 1024 * 1024;

/** Total direct memory size in bytes can can be allocated and used by this buffer pool. */
private final long totalBytes;

/**
* Maximum time to wait when requesting read buffers from this buffer pool before throwing an
* exception.
* Wait for at most 2 seconds before return if there is no enough available buffers currently.
*/
private final Duration requestTimeout;
private static final Duration WAITING_TIME = Duration.ofSeconds(2);

/** Total direct memory size in bytes can can be allocated and used by this buffer pool. */
private final long totalBytes;

/** The number of total buffers in this buffer pool. */
private final int numTotalBuffers;
Expand All @@ -86,7 +83,7 @@ public class BatchShuffleReadBufferPool {
@GuardedBy("buffers")
private boolean initialized;

public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, Duration requestTimeout) {
public BatchShuffleReadBufferPool(long totalBytes, int bufferSize) {
checkArgument(totalBytes > 0, "Total memory size must be positive.");
checkArgument(bufferSize > 0, "Size of buffer must be positive.");
checkArgument(
Expand All @@ -101,7 +98,6 @@ public BatchShuffleReadBufferPool(long totalBytes, int bufferSize, Duration requ

this.totalBytes = totalBytes;
this.bufferSize = bufferSize;
this.requestTimeout = checkNotNull(requestTimeout);

this.numTotalBuffers = (int) Math.min(totalBytes / bufferSize, Integer.MAX_VALUE);
this.numBuffersPerRequest =
Expand All @@ -114,28 +110,31 @@ long getTotalBytes() {
}

@VisibleForTesting
int getNumBuffersPerRequest() {
return numBuffersPerRequest;
}

@VisibleForTesting
int getNumTotalBuffers() {
public int getNumTotalBuffers() {
return numTotalBuffers;
}

@VisibleForTesting
int getAvailableBuffers() {
public int getAvailableBuffers() {
synchronized (buffers) {
return buffers.size();
}
}

public int getNumBuffersPerRequest() {
return numBuffersPerRequest;
}

public int getMaxConcurrentRequests() {
return numBuffersPerRequest > 0 ? numTotalBuffers / numBuffersPerRequest : 0;
}

public int getBufferSize() {
return bufferSize;
}

/** Initializes this buffer pool which allocates all the buffers. */
private void initialize() {
public void initialize() {
LOG.info(
"Initializing batch shuffle IO buffer pool: numBuffers={}, bufferSize={}.",
numTotalBuffers,
Expand Down Expand Up @@ -180,8 +179,7 @@ private void initialize() {

/**
* Requests a collection of buffers (determined by {@link #numBuffersPerRequest}) from this
* buffer pool. Exception will be thrown if no enough buffers can be allocated in the given
* timeout.
* buffer pool.
*/
public List<MemorySegment> requestBuffers() throws Exception {
List<MemorySegment> allocated = new ArrayList<>(numBuffersPerRequest);
Expand All @@ -192,18 +190,13 @@ public List<MemorySegment> requestBuffers() throws Exception {
initialize();
}

Deadline deadline = Deadline.fromNow(requestTimeout);
Deadline deadline = Deadline.fromNow(WAITING_TIME);
while (buffers.size() < numBuffersPerRequest) {
checkState(!destroyed, "Buffer pool is already destroyed.");

buffers.wait(requestTimeout.toMillis());
buffers.wait(WAITING_TIME.toMillis());
if (!deadline.hasTimeLeft()) {
throw new TimeoutException(
String.format(
"Can't allocate enough buffers in the given timeout, which means"
+ " there is a fierce contention for read buffers, please"
+ " increase '%s'.",
TaskManagerOptions.NETWORK_BATCH_SHUFFLE_READ_MEMORY.key()));
return allocated; // return the empty list
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -130,9 +129,7 @@ static NettyShuffleEnvironment createNettyShuffleEnvironment(
// dead lock or "insufficient network buffer" error
BatchShuffleReadBufferPool batchShuffleReadBufferPool =
new BatchShuffleReadBufferPool(
config.batchShuffleReadMemoryBytes(),
config.networkBufferSize(),
Duration.ofMinutes(5)); // 5 min buffer request timeout by default
config.batchShuffleReadMemoryBytes(), config.networkBufferSize());

// we create a separated IO executor pool here for batch shuffle instead of reusing the
// TaskManager IO executor pool directly to avoid the potential side effects of execution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,22 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import static org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/** Reader which can read all data of the target subpartition from a {@link PartitionedFile}. */
public class PartitionedFileReader implements AutoCloseable {
class PartitionedFileReader {

/** Used to read buffers from file channel. */
private final ByteBuffer headerBuf = BufferReaderWriterUtil.allocatedHeaderBuffer();
Expand All @@ -61,104 +56,71 @@ public class PartitionedFileReader implements AutoCloseable {
/** Next data region to be read. */
private int nextRegionToRead;

/** Next file offset to be read. */
private long nextOffsetToRead;

/** Number of remaining buffers in the current data region read. */
private int currentRegionRemainingBuffers;

/** Whether this partitioned file reader is closed. */
private boolean isClosed;
PartitionedFileReader(
PartitionedFile partitionedFile,
int targetSubpartition,
FileChannel dataFileChannel,
FileChannel indexFileChannel) {
checkArgument(checkNotNull(dataFileChannel).isOpen(), "Data file channel must be opened.");
checkArgument(
checkNotNull(indexFileChannel).isOpen(), "Index file channel must be opened.");

public PartitionedFileReader(PartitionedFile partitionedFile, int targetSubpartition)
throws IOException {
this.partitionedFile = checkNotNull(partitionedFile);
this.targetSubpartition = targetSubpartition;
this.dataFileChannel = dataFileChannel;
this.indexFileChannel = indexFileChannel;

this.indexEntryBuf = ByteBuffer.allocateDirect(PartitionedFile.INDEX_ENTRY_SIZE);
BufferReaderWriterUtil.configureByteBuffer(indexEntryBuf);

this.dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
try {
this.indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
} catch (Throwable throwable) {
IOUtils.closeQuietly(dataFileChannel);
throw throwable;
}
}

private FileChannel openFileChannel(Path path) throws IOException {
return FileChannel.open(path, StandardOpenOption.READ);
}

private boolean moveToNextReadableRegion() throws IOException {
if (currentRegionRemainingBuffers > 0) {
return true;
}

while (nextRegionToRead < partitionedFile.getNumRegions()) {
private void moveToNextReadableRegion() throws IOException {
while (currentRegionRemainingBuffers <= 0
&& nextRegionToRead < partitionedFile.getNumRegions()) {
partitionedFile.getIndexEntry(
indexFileChannel, indexEntryBuf, nextRegionToRead, targetSubpartition);
long dataOffset = indexEntryBuf.getLong();
nextOffsetToRead = indexEntryBuf.getLong();
currentRegionRemainingBuffers = indexEntryBuf.getInt();
++nextRegionToRead;

if (currentRegionRemainingBuffers > 0) {
dataFileChannel.position(dataOffset);
return true;
}
}

return false;
}

/**
* Reads a buffer from the {@link PartitionedFile} and moves the read position forward.
* Reads a buffer from the current region of the target {@link PartitionedFile} and moves the
* read position forward.
*
* <p>Note: The caller is responsible for recycling the target buffer if any exception occurs.
*
* @param target The target {@link MemorySegment} to read data to.
* @param recycler The {@link BufferRecycler} which is responsible to recycle the target buffer.
* @return A {@link Buffer} containing the data read.
*/
@Nullable
public Buffer readBuffer(MemorySegment target, BufferRecycler recycler) throws IOException {
checkState(!isClosed, "File reader is already closed.");

if (moveToNextReadableRegion()) {
--currentRegionRemainingBuffers;
return readFromByteChannel(dataFileChannel, headerBuf, target, recycler);
Buffer readCurrentRegion(MemorySegment target, BufferRecycler recycler) throws IOException {
if (currentRegionRemainingBuffers == 0) {
return null;
}

return null;
dataFileChannel.position(nextOffsetToRead);
Buffer buffer = readFromByteChannel(dataFileChannel, headerBuf, target, recycler);
nextOffsetToRead = dataFileChannel.position();
--currentRegionRemainingBuffers;
return buffer;
}

@VisibleForTesting
public boolean hasRemaining() throws IOException {
checkState(!isClosed, "File reader is already closed.");

return moveToNextReadableRegion();
boolean hasRemaining() throws IOException {
moveToNextReadableRegion();
return currentRegionRemainingBuffers > 0;
}

@Override
public void close() throws IOException {
if (isClosed) {
return;
}
isClosed = true;

IOException exception = null;
try {
if (dataFileChannel != null) {
dataFileChannel.close();
}
} catch (IOException ioException) {
exception = ioException;
}

try {
if (indexFileChannel != null) {
indexFileChannel.close();
}
} catch (IOException ioException) {
exception = ExceptionUtils.firstOrSuppressed(ioException, exception);
}

if (exception != null) {
throw exception;
}
/** Gets read priority of this file reader. Smaller value indicates higher priority. */
long getPriority() {
return nextOffsetToRead;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public ResultPartition create(
type,
subpartitions.length,
maxParallelism,
networkBufferSize,
batchShuffleReadBufferPool,
batchShuffleReadIOExecutor,
partitionManager,
channelManager.createChannel().getPath(),
bufferCompressor,
Expand Down
Loading

0 comments on commit f1e69bb

Please sign in to comment.