Skip to content

Commit

Permalink
TEZ-4087 : Shuffle: Fix shuffle cleanup to prevent thread leaks (Raje…
Browse files Browse the repository at this point in the history
…sh Balamohan via Prasanth J, Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <[email protected]>
  • Loading branch information
rbalamohan authored and ashutoshc committed May 27, 2020
1 parent f047d4a commit 7659726
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,16 @@ public MergeManager(Configuration conf,
this.onDiskMerger = new OnDiskMerger(this);
}

void setupParentThread(Thread shuffleSchedulerThread) {
LOG.info("Setting merger's parent thread to "
+ shuffleSchedulerThread.getName());
if (this.memToMemMerger != null) {
memToMemMerger.setParentThread(shuffleSchedulerThread);
}
this.inMemoryMerger.setParentThread(shuffleSchedulerThread);;
this.onDiskMerger.setParentThread(shuffleSchedulerThread);
}

@Private
void configureAndStart() {
if (this.memToMemMerger != null) {
Expand Down Expand Up @@ -714,7 +724,8 @@ public IntermediateMemoryToMemoryMerger(MergeManager manager,
int mergeFactor) {
super(manager, mergeFactor, exceptionReporter);
setName("MemToMemMerger [" + TezUtilsInternal
.cleanVertexName(inputContext.getSourceVertexName()) + "]");
.cleanVertexName(inputContext.getSourceVertexName())
+ "_" + inputContext.getUniqueIdentifier() + "]");
setDaemon(true);
}

Expand Down Expand Up @@ -831,8 +842,9 @@ private class InMemoryMerger extends MergeThread<MapOutput> {

public InMemoryMerger(MergeManager manager) {
super(manager, Integer.MAX_VALUE, exceptionReporter);
setName("MemtoDiskMerger [" + TezUtilsInternal
.cleanVertexName(inputContext.getSourceVertexName()) + "]");
setName("MemtoDiskMerger [" + TezUtilsInternal
.cleanVertexName(inputContext.getSourceVertexName())
+ "_" + inputContext.getUniqueIdentifier() + "]");
setDaemon(true);
}

Expand Down Expand Up @@ -952,8 +964,9 @@ class OnDiskMerger extends MergeThread<FileChunk> {

public OnDiskMerger(MergeManager manager) {
super(manager, ioSortFactor, exceptionReporter);
setName("DiskToDiskMerger [" + TezUtilsInternal
.cleanVertexName(inputContext.getSourceVertexName()) + "]");
setName("DiskToDiskMerger [" + TezUtilsInternal
.cleanVertexName(inputContext.getSourceVertexName())
+ "_" + inputContext.getUniqueIdentifier() + "]");
setDaemon(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ abstract class MergeThread<T> extends Thread {
private final ExceptionReporter reporter;
private boolean closed = false;
private final int mergeFactor;

private Thread shuffleSchedulerThread;

public MergeThread(MergeManager manager, int mergeFactor,
ExceptionReporter reporter) {
Expand All @@ -60,6 +62,10 @@ public synchronized void close() throws InterruptedException {
}
}

public void setParentThread(Thread shuffleSchedulerThread) {
this.shuffleSchedulerThread = shuffleSchedulerThread;
}

public synchronized boolean isInProgress() {
return inProgress;
}
Expand All @@ -81,7 +87,11 @@ public synchronized void startMerge(Set<T> inputs) {

public synchronized void waitForMerge() throws InterruptedException {
while (inProgress) {
wait();
if (shuffleSchedulerThread != null
&& !shuffleSchedulerThread.isAlive()) {
return;
}
wait(5000);
}
}

Expand All @@ -91,7 +101,11 @@ public void run() {
// Wait for notification to start the merge...
synchronized (this) {
while (!inProgress) {
wait();
if (shuffleSchedulerThread != null
&& !shuffleSchedulerThread.isAlive()) {
return;
}
wait(5000);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ public ShuffleScheduler(InputContext inputContext,

public void start() throws Exception {
shuffleSchedulerThread = Thread.currentThread();
mergeManager.setupParentThread(shuffleSchedulerThread);
ShuffleSchedulerCallable schedulerCallable = new ShuffleSchedulerCallable();
schedulerCallable.call();
}
Expand Down

0 comments on commit 7659726

Please sign in to comment.