Skip to content

Commit

Permalink
[CELEBORN-1763] Fix DataPusher be blocked for a long time
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
fix DataPusher be blocked for a long time

### Why are the changes needed?
The worker has been at a performance bottleneck for a long time, the slow start strategy adjusts its maxInFlight to 1, which may cause RequestInFlight to exceed maxInFlight. If the task’s main thread has been blocked in the waitIdleQueueFullWithLock call, then the main thread will not be able to detect the sending failure since this failure changes the exception in the push state, and the waitIdleQueueFullWithLock function does not check for it

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
GA

Closes apache#2978 from zhaostu4/fix_pusher_block.

Authored-by: zhangzhao.08 <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
  • Loading branch information
zhaostu4 authored and turboFei committed Dec 23, 2024
1 parent eaa0726 commit 9e04ff4
Showing 1 changed file with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.util.ThreadUtils;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.write.PushState;

public class DataPusher {
private static final Logger logger = LoggerFactory.getLogger(DataPusher.class);
Expand All @@ -43,6 +45,7 @@ public class DataPusher {

private LinkedBlockingQueue<PushTask> idleQueue;
// partition -> PushTask Queue
private final PushState pushState;
private final DataPushQueue dataPushQueue;
private final ReentrantLock idleLock = new ReentrantLock();
private final Condition idleFull = idleLock.newCondition();
Expand Down Expand Up @@ -98,6 +101,8 @@ public DataPusher(
this.client = client;
this.afterPush = afterPush;
this.mapStatusLengths = mapStatusLengths;
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
this.pushState = client.getPushState(mapKey);

pushThread =
ThreadUtils.newDaemonThread(
Expand Down Expand Up @@ -194,6 +199,9 @@ public void checkException() throws IOException {
if (exceptionRef.get() != null) {
throw exceptionRef.get();
}
if (pushState.exception.get() != null) {
throw pushState.exception.get();
}
}

protected void pushData(PushTask task) throws IOException {
Expand All @@ -217,6 +225,7 @@ private void waitIdleQueueFullWithLock() throws InterruptedException {
while (idleQueue != null
&& idleQueue.remainingCapacity() > 0
&& exceptionRef.get() == null
&& pushState.exception.get() == null
&& (pushThread != null && pushThread.isAlive())) {
idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
}
Expand All @@ -229,7 +238,9 @@ private void waitIdleQueueFullWithLock() throws InterruptedException {
}

protected boolean stillRunning() {
return !terminated && !Objects.nonNull(exceptionRef.get());
return !terminated
&& !Objects.nonNull(exceptionRef.get())
&& !Objects.nonNull(pushState.exception.get());
}

public DataPushQueue getDataPushQueue() {
Expand Down

0 comments on commit 9e04ff4

Please sign in to comment.