TEZ-3206. Have unordered partitioned KV output send partition stats via VertexManagerEvent. Contributed by Ming Ma.
sidseth committed May 23, 2016
1 parent cc68f7b commit 2ecef25
Showing 5 changed files with 211 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES

ALL CHANGES:

TEZ-3206. Have unordered partitioned KV output send partition stats via VertexManagerEvent.
TEZ-3237. Corrupted shuffle transfers to disk are not detected during transfer
TEZ-3240. Improvements to tez.lib.uris to allow for multiple tarballs and mixing tarballs and jars.
TEZ-3246. Improve diagnostics when DAG killed by user
@@ -585,7 +585,7 @@ public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent

private void handleVertexManagerEvent(VertexManagerEvent vmEvent) {
// currently events from multiple attempts of the same task can be ignored because
// their output will be the same. However, with pipelined events that may not hold.
// their output will be the same.
TaskIdentifier producerTask = vmEvent.getProducerAttemptIdentifier().getTaskIdentifier();
if (!taskWithVmEvents.add(producerTask)) {
LOG.info("Ignoring vertex manager event from: " + producerTask);
@@ -45,6 +45,7 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezCommonUtils;
@@ -55,6 +56,7 @@
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
@@ -63,7 +65,9 @@
import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -111,6 +115,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
private final ListeningExecutorService spillExecutor;

private final int[] numRecordsPerPartition;
// uncompressed size for each partition
private final long[] sizePerPartition;
private volatile long spilledSize = 0;

/**
@@ -197,10 +203,12 @@ public UnorderedPartitionedKVWriter(OutputContext outputContext, Configuration c
.setDaemon(true)
.setNameFormat(
"UnorderedOutSpiller {"
+ TezUtilsInternal.cleanVertexName(outputContext.getDestinationVertexName()) + "}")
+ TezUtilsInternal.cleanVertexName(
outputContext.getDestinationVertexName()) + "}")
.build());
spillExecutor = MoreExecutors.listeningDecorator(executor);
numRecordsPerPartition = new int[numPartitions];
sizePerPartition = new long[numPartitions];

outputLargeRecordsCounter = outputContext.getCounters().findCounter(
TaskCounter.OUTPUT_LARGE_RECORDS);
@@ -250,7 +258,11 @@ public void write(Object key, Object value) throws IOException {
}
if (skipBuffers) {
//special case, where we have only one partition and pipelining is disabled.
writer.append(key, value); // ???? Why is outputrecordscounter not updated here?
// The reason outputRecordsCounter isn't updated here:
// For skipBuffers case, IFile writer has the reference to
// outputRecordsCounter and during its close method call,
// it will update the outputRecordsCounter.
writer.append(key, value);
outputContext.notifyProgress();
} else {
int partition = partitioner.getPartition(key, value, numPartitions);
@@ -272,7 +284,7 @@ private void write(Object key, Object value, int partition) throws IOException {
int metaStart = currentBuffer.nextPosition;
currentBuffer.availableSize -= (META_SIZE + metaSkip);
currentBuffer.nextPosition += META_SIZE;

keySerializer.serialize(key);

if (currentBuffer.full) {
Expand All @@ -293,7 +305,7 @@ private void write(Object key, Object value, int partition) throws IOException {

int valStart = currentBuffer.nextPosition;
valSerializer.serialize(value);

if (currentBuffer.full) {
// Value too large for current buffer, or K-V too large for entire buffer.
if (metaStart == 0) {
@@ -324,6 +336,8 @@ private void write(Object key, Object value, int partition) throws IOException {
outputContext.notifyProgress();
currentBuffer.partitionPositions[partition] = metaStart;
currentBuffer.recordsPerPartition[partition]++;
currentBuffer.sizePerPartition[partition] +=
currentBuffer.nextPosition - (metaStart + META_SIZE);
currentBuffer.numRecords++;

}
@@ -353,6 +367,7 @@ private void setupNextBuffer() throws IOException {
private void updateGlobalStats(WrappedBuffer buffer) {
for (int i = 0; i < numPartitions; i++) {
numRecordsPerPartition[i] += buffer.recordsPerPartition[i];
sizePerPartition[i] += buffer.sizePerPartition[i];
}
}

@@ -472,6 +487,7 @@ public static long getInitialMemoryRequirement(Configuration conf, long maxAvail

@Override
public List<Event> close() throws IOException, InterruptedException {
List<Event> eventList = Lists.newLinkedList();
isShutdown.set(true);
spillLock.lock();
try {
@@ -513,12 +529,15 @@ public List<Event> close() throws IOException, InterruptedException {
if (outputRecordsCounter.getValue() == 0) {
emptyPartitions.set(0);
}
sizePerPartition[0] = rawLen;
cleanupCurrentBuffer();

outputBytesWithOverheadCounter.increment(rawLen);
fileOutputBytesCounter.increment(compLen + indexFileSizeEstimate);
return Collections.singletonList(generateDMEvent(false, -1, false, outputContext
eventList.add(generateVMEvent());
eventList.add(generateDMEvent(false, -1, false, outputContext
.getUniqueIdentifier(), emptyPartitions));
return eventList;
}

//Regular code path.
@@ -528,12 +547,17 @@ public List<Event> close() throws IOException, InterruptedException {
finalSpill();
}
cleanupCurrentBuffer();
return Collections.singletonList(generateDMEvent());
eventList.add(generateVMEvent());
eventList.add(generateDMEvent());
return eventList;
}

//For pipelined case, send out an event in case finalspill generated a spill file.
if (finalSpill()) {
sendPipelinedEventForSpill(currentBuffer.recordsPerPartition, numSpills.get() - 1, true);
// VertexManagerEvent is only sent at the end and thus sizePerPartition is used
// for the sum of all spills.
sendPipelinedEventForSpill(currentBuffer.recordsPerPartition,
sizePerPartition, numSpills.get() - 1, true);
}
cleanupCurrentBuffer();
return events;
@@ -551,6 +575,39 @@ private BitSet getEmptyPartitions(int[] recordsPerPartition) {
return emptyPartitions;
}

private Event generateVMEvent() throws IOException {
return generateVMEvent(this.sizePerPartition);
}

private Event generateVMEvent(long[] sizePerPartition) throws IOException {
ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder =
ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder();

long outputSize = outputContext.getCounters().
findCounter(TaskCounter.OUTPUT_BYTES).getValue();

// Set this information only when required. In pipelined shuffle,
// multiple events would end up adding up to final output size.
// This is needed for auto-reduce parallelism to work properly.
vmBuilder.setOutputSize(outputSize);

//set partition stats
if (sizePerPartition != null && sizePerPartition.length > 0) {
RoaringBitmap stats = ShuffleUtils.getPartitionStatsForPhysicalOutput(
sizePerPartition);
DataOutputBuffer dout = new DataOutputBuffer();
stats.serialize(dout);
ByteString partitionStatsBytes =
TezCommonUtils.compressByteArrayToByteString(dout.getData());
vmBuilder.setPartitionStats(partitionStatsBytes);
}

VertexManagerEvent vmEvent = VertexManagerEvent.create(
outputContext.getDestinationVertexName(),
vmBuilder.build().toByteString().asReadOnlyByteBuffer());
return vmEvent;
}

private Event generateDMEvent() throws IOException {
BitSet emptyPartitions = getEmptyPartitions(numRecordsPerPartition);
return generateDMEvent(false, -1, false, outputContext.getUniqueIdentifier(), emptyPartitions);
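For reference, here is a self-contained sketch of the round trip the partition stats take. The encode direction mirrors generateVMEvent() above (ShuffleUtils.getPartitionStatsForPhysicalOutput, RoaringBitmap serialization, TezCommonUtils.compressByteArrayToByteString are the calls used in this commit); the decode direction is illustrative consumer code and assumes TezCommonUtils.decompressByteStringToByteArray as the matching inverse helper rather than the exact consumer code in Tez:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

import com.google.protobuf.ByteString;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
import org.roaringbitmap.RoaringBitmap;

public class PartitionStatsRoundTrip {

  // Producer side: mirror generateVMEvent() above - bucket the per-partition
  // uncompressed sizes into a RoaringBitmap and compress it into the payload.
  static ByteString encode(long[] sizePerPartition, long outputSize) throws IOException {
    VertexManagerEventPayloadProto.Builder vmBuilder =
        VertexManagerEventPayloadProto.newBuilder();
    vmBuilder.setOutputSize(outputSize);
    RoaringBitmap stats =
        ShuffleUtils.getPartitionStatsForPhysicalOutput(sizePerPartition);
    DataOutputBuffer dout = new DataOutputBuffer();
    stats.serialize(dout);
    vmBuilder.setPartitionStats(
        TezCommonUtils.compressByteArrayToByteString(dout.getData()));
    return vmBuilder.build().toByteString();
  }

  // Consumer side (illustrative): recover the bitmap a vertex manager would
  // inspect to estimate how output is spread across partitions.
  static RoaringBitmap decode(ByteString payload) throws IOException {
    VertexManagerEventPayloadProto proto =
        VertexManagerEventPayloadProto.parseFrom(payload);
    RoaringBitmap stats = new RoaringBitmap();
    if (proto.hasPartitionStats()) {
      byte[] raw =
          TezCommonUtils.decompressByteStringToByteArray(proto.getPartitionStats());
      stats.deserialize(new DataInputStream(new ByteArrayInputStream(raw)));
    }
    return stats;
  }

  public static void main(String[] args) throws IOException {
    long[] sizes = {0L, 10L << 20, 512L << 10}; // uncompressed bytes per partition
    long total = 0;
    for (long s : sizes) {
      total += s;
    }
    ByteString payload = encode(sizes, total);
    System.out.println("Decoded partition-stats bitmap: " + decode(payload));
  }
}
```

The RoaringBitmap is a compact encoding of the per-partition sizes rather than a raw long array; the exact encoding scheme is owned by ShuffleUtils.getPartitionStatsForPhysicalOutput, which is why both directions go through it here.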
@@ -609,12 +666,14 @@ private void cleanup() {
private boolean finalSpill() throws IOException {
if (currentBuffer.nextPosition == 0) {
if (pipelinedShuffle) {
List<Event> eventList = Lists.newLinkedList();
eventList.add(generateVMEvent(new long[numPartitions]));
//Send final event with all empty partitions and null path component.
BitSet emptyPartitions = new BitSet(numPartitions);
emptyPartitions.flip(0, numPartitions);

outputContext.sendEvents(
Collections.singletonList(generateDMEvent(true, numSpills.get(), true, null, emptyPartitions)));
eventList.add(generateDMEvent(true, numSpills.get(), true,
null, emptyPartitions));
outputContext.sendEvents(eventList);
}
return false;
} else {
@@ -785,6 +844,7 @@ private void writeLargeRecord(final Object key, final Object value, final int pa
writer.append(key, value);
outputLargeRecordsCounter.increment(1);
numRecordsPerPartition[i]++;
sizePerPartition[i] += writer.getRawLength();
writer.close();
additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(),
@@ -805,7 +865,8 @@ private void writeLargeRecord(final Object key, final Object value, final int pa
}
handleSpillIndex(spillPathDetails, spillRecord);

sendPipelinedEventForSpill(emptyPartitions, spillIndex, false);
sendPipelinedEventForSpill(emptyPartitions, sizePerPartition,
spillIndex, false);

LOG.info(destNameTrimmed + ": " + "Finished writing large record of size " + outSize + " to spill file " + spillIndex);
if (LOG.isDebugEnabled()) {
@@ -862,6 +923,8 @@ private static class WrappedBuffer {

private final int[] partitionPositions;
private final int[] recordsPerPartition;
// uncompressed size for each partition
private final long[] sizePerPartition;
private final int numPartitions;
private final int size;

@@ -878,10 +941,12 @@ private static class WrappedBuffer {
WrappedBuffer(int numPartitions, int size) {
this.partitionPositions = new int[numPartitions];
this.recordsPerPartition = new int[numPartitions];
this.sizePerPartition = new long[numPartitions];
this.numPartitions = numPartitions;
for (int i = 0; i < numPartitions; i++) {
this.partitionPositions[i] = PARTITION_ABSENT_POSITION;
this.recordsPerPartition[i] = 0;
this.sizePerPartition[i] = 0;
}
size = size - (size % INT_SIZE);
this.size = size;
@@ -894,6 +959,7 @@ void reset() {
for (int i = 0; i < numPartitions; i++) {
this.partitionPositions[i] = PARTITION_ABSENT_POSITION;
this.recordsPerPartition[i] = 0;
this.sizePerPartition[i] = 0;
}
numRecords = 0;
nextPosition = 0;
@@ -908,27 +974,36 @@ void cleanup() {
}
}

private void sendPipelinedEventForSpill(BitSet emptyPartitions, int spillNumber, boolean isFinalUpdate) {
private void sendPipelinedEventForSpill(
BitSet emptyPartitions, long[] sizePerPartition, int spillNumber,
boolean isFinalUpdate) {
List<Event> eventList = Lists.newLinkedList();
if (!pipelinedShuffle) {
return;
}
//Send out an event for consuming.
try {
String pathComponent = (outputContext.getUniqueIdentifier() + "_" + spillNumber);
if (isFinalUpdate) {
eventList.add(generateVMEvent(sizePerPartition));
}
Event compEvent = generateDMEvent(true, spillNumber, isFinalUpdate,
pathComponent, emptyPartitions);
eventList.add(compEvent);

LOG.info(destNameTrimmed + ": " + "Adding spill event for spill (final update=" + isFinalUpdate + "), spillId=" + spillNumber);
outputContext.sendEvents(Collections.singletonList(compEvent));
outputContext.sendEvents(eventList);
} catch (IOException e) {
LOG.error(destNameTrimmed + ": " + "Error in sending pipelined events", e);
outputContext.reportFailure(TaskFailureType.NON_FATAL, e, "Error in sending pipelined events");
}
}

private void sendPipelinedEventForSpill(int[] recordsPerPartition, int spillNumber, boolean isFinalUpdate) {
private void sendPipelinedEventForSpill(int[] recordsPerPartition,
long[] sizePerPartition, int spillNumber, boolean isFinalUpdate) {
BitSet emptyPartitions = getEmptyPartitions(recordsPerPartition);
sendPipelinedEventForSpill(emptyPartitions, spillNumber, isFinalUpdate);
sendPipelinedEventForSpill(emptyPartitions, sizePerPartition, spillNumber,
isFinalUpdate);
}

private class SpillCallback implements FutureCallback<SpillResult> {
Expand All @@ -943,7 +1018,8 @@ private class SpillCallback implements FutureCallback<SpillResult> {
public void onSuccess(SpillResult result) {
spilledSize += result.spillSize;

sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition, spillNumber, false);
sendPipelinedEventForSpill(result.wrappedBuffer.recordsPerPartition,
result.wrappedBuffer.sizePerPartition, spillNumber, false);

try {
result.wrappedBuffer.reset();