TEZ-4334: Fix deadlock in ShuffleScheduler between ShuffleScheduler.close() and the ShufflePenaltyReferee thread (apache#273) (Laszlo Bodor, Sungwoo Park, reviewed by Rajesh Balamohan)
abstractdog authored Mar 6, 2023
1 parent 6bd6f9c commit 25a9536
Showing 1 changed file with 30 additions and 29 deletions.
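
The change pattern in the diff below can be summarized as: keep shared-state updates inside a narrow synchronized (this) block, make the health check throw IOException instead of reporting, and call exceptionReporter.reportException() only after the scheduler lock has been released. The following is a minimal, self-contained sketch of that locking pattern; SchedulerSketch and ExceptionReporter are illustrative stand-ins, not the actual Tez classes.

import java.io.IOException;

class SchedulerSketch {
  private final ExceptionReporter exceptionReporter = new ExceptionReporter();

  // Before this commit the whole method was synchronized, so reportException()
  // ran while the scheduler lock was held and could deadlock against another
  // thread that acquires the same locks in the opposite order.
  public void copyFailed() {
    synchronized (this) {
      // update shared failure counters while holding the lock ...
    }
    try {
      checkShuffleHealthy();     // synchronized; throws instead of reporting
    } catch (IOException e) {
      // reportException is now called with no scheduler lock held (TEZ-4334)
      exceptionReporter.reportException(e);
    }
  }

  private synchronized void checkShuffleHealthy() throws IOException {
    boolean healthy = true;      // the real health check is elided here
    if (!healthy) {
      throw new IOException("Shuffle is not healthy");
    }
  }

  static class ExceptionReporter {
    void reportException(Throwable t) {
      // may block or take other locks while shutting the task down
    }
  }
}
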
@@ -759,20 +759,22 @@ private void logProgress() {
}
}

public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
public void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
boolean readError, boolean connectError) {
failedShuffleCounter.increment(1);
inputContext.notifyProgress();
int failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
int failures;

if (!fetchFailure.isLocalFetch()) {
/**
* Track the number of failures that has happened since last completion.
* This gets reset on a successful copy.
*/
failedShufflesSinceLastCompletion++;
synchronized (this) {
failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
if (!fetchFailure.isLocalFetch()) {
/**
* Track the number of failures that has happened since last completion.
* This gets reset on a successful copy.
*/
failedShufflesSinceLastCompletion++;
}
}

/**
* Inform AM:
* - In case of read/connect error
@@ -794,14 +796,18 @@ public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHo
}

//Restart consumer in case shuffle is not healthy
if (!isShuffleHealthy(fetchFailure)) {
try {
checkShuffleHealthy(fetchFailure);
} catch (IOException e) {
// reportException should be called outside synchronized(this) due to TEZ-4334
exceptionReporter.reportException(e);
return;
}

penalizeHost(host, failures);
}

private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
private void isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) throws IOException {
int attemptFailures = getFailureCount(srcAttempt);
if (attemptFailures >= abortFailureLimit) {
// This task has seen too many fetch failures - report it as failed. The
@@ -816,15 +822,11 @@ private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
inputContext.getSourceVertexName(),
srcAttempt.getInputIdentifier(),
srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit;
IOException ioe = new IOException(errorMsg);
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
exceptionReporter.reportException(ioe);
return true;
throw new IOException(errorMsg);
}
return false;
}

private void penalizeHost(MapHost host, int failures) {
private synchronized void penalizeHost(MapHost host, int failures) {
host.penalize();

HostPort hostPort = new HostPort(host.getHost(), host.getPort());
@@ -1008,14 +1010,15 @@ private boolean isFetcherHealthy(String logContext) {
return fetcherHealthy;
}

boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
/**
* This method checks if the current shuffle is healthy and throws IOException if it's not;
* the caller is expected to handle the IOException.
*/
private synchronized void checkShuffleHealthy(InputAttemptFetchFailure fetchFailure)
throws IOException {
InputAttemptIdentifier srcAttempt = fetchFailure.getInputAttemptIdentifier();
if (isAbortLimitExceeedFor(srcAttempt)) {
return false;
}

final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction;
final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction;
// supposed to throw IOException if exceeded
isAbortLimitExceeedFor(srcAttempt);

int doneMaps = numInputs - remainingMaps.get();

@@ -1025,7 +1028,7 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
// check if the reducer has progressed enough
boolean reducerProgressedEnough =
(((float)doneMaps / numInputs)
>= MIN_REQUIRED_PROGRESS_PERCENT);
>= minReqProgressFraction);

// check if the reducer is stalled for a long time
// duration for which the reducer is stalled
@@ -1038,7 +1041,7 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {

boolean reducerStalled = (shuffleProgressDuration > 0) &&
(((float)stallDuration / shuffleProgressDuration)
>= MAX_ALLOWED_STALL_TIME_PERCENT);
>= maxStallTimeFraction);

// kill if not healthy and has insufficient progress
if ((failureCounts.size() >= maxFailedUniqueFetches ||
@@ -1059,10 +1062,8 @@ boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
LOG.debug("Host failures=" + hostFailures.keySet());
}
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
exceptionReporter.reportException(new IOException(errorMsg, fetchFailure.getCause()));
return false;
throw new IOException(errorMsg, fetchFailure.getCause());
}
return true;
}

public synchronized void addKnownMapOutput(String inputHostName,
