Skip to content

Commit

Permalink
[CELEBORN-1771] Bring forward PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1.Move the peerWorker available judgement out of ThreadPool.
2.Move `retain` after the available worker judgment Which means we don't have to release if peerWorker is unavailable.
2. Add `fileWriter.decrementPendingWrites()` if peerWorker is unavailable since it will return and won't decrementPendingWreites in `writeLocalData`.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UT & cluster testing.

Closes apache#2989 from zaynt4606/clb1771.

Authored-by: zhengtao <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
zaynt4606 authored and RexXiong committed Dec 24, 2024
1 parent 680b072 commit 2eb4c23
Showing 1 changed file with 53 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,26 +273,27 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
val writePromise = Promise[Array[StatusCode]]()
// for primary, send data to replica
if (doReplicate) {
val peer = location.getPeer
val peerWorker = new WorkerInfo(
peer.getHost,
peer.getRpcPort,
peer.getPushPort,
peer.getFetchPort,
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
fileWriter.decrementPendingWrites()
handlePushDataConnectionFail(callbackWithTimer, location)
return
}

pushData.body().retain()
replicateThreadPool.submit(new Runnable {
override def run(): Unit = {
val peer = location.getPeer
val peerWorker = new WorkerInfo(
peer.getHost,
peer.getRpcPort,
peer.getPushPort,
peer.getFetchPort,
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushData.body().release()
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushData replication failed caused by unavailable peer for partitionLocation: $location")
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
handlePushDataConnectionFail(callbackWithTimer, location)
return
}

// Handle the response from replica
val wrappedCallback = new RpcResponseCallback() {
override def onSuccess(response: ByteBuffer): Unit = {
Expand Down Expand Up @@ -424,6 +425,26 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
}
}

def handlePushDataConnectionFail(
callbackWithTimer: RpcResponseCallback,
location: PartitionLocation): Unit = {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushData replication failed caused by unavailable peer for partitionLocation: $location")
callbackWithTimer.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
}

def handlePushMergedDataConnectionFail(
pushMergedDataCallback: PushMergedDataCallback,
location: PartitionLocation): Unit = {
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushMergedData replication failed caused by unavailable peer for partitionLocation: $location")
pushMergedDataCallback.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
}

def handlePushMergedData(
pushMergedData: PushMergedData,
callback: RpcResponseCallback): Unit = {
Expand Down Expand Up @@ -582,27 +603,32 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
val writePromise = Promise[Array[StatusCode]]()
// for primary, send data to replica
if (doReplicate) {
val location = partitionIdToLocations.head._2
val peer = location.getPeer
val peerWorker = new WorkerInfo(
peer.getHost,
peer.getRpcPort,
peer.getPushPort,
peer.getFetchPort,
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
for (fileWriterIndex <- 0 until totalFileWriters) {
val fileWriter = fileWriters(fileWriterIndex)
if (fileWriter != null && !pushMergedDataCallback.isHardSplitPartition(fileWriterIndex)) {
fileWriter.decrementPendingWrites()
}
}
handlePushMergedDataConnectionFail(pushMergedDataCallback, location)
return
}
pushMergedData.body().retain()
replicateThreadPool.submit(new Runnable {
override def run(): Unit = {
val location = partitionIdToLocations.head._2
val peer = location.getPeer
val peerWorker = new WorkerInfo(
peer.getHost,
peer.getRpcPort,
peer.getPushPort,
peer.getFetchPort,
peer.getReplicatePort)
if (unavailablePeers.containsKey(peerWorker)) {
pushMergedData.body().release()
workerSource.incCounter(WorkerSource.REPLICATE_DATA_CREATE_CONNECTION_FAIL_COUNT)
logError(
s"PushMergedData replication failed caused by unavailable peer for partitionLocation: $location")
pushMergedDataCallback.onFailure(
new CelebornIOException(StatusCode.PUSH_DATA_CREATE_CONNECTION_FAIL_REPLICA))
handlePushMergedDataConnectionFail(pushMergedDataCallback, location)
return
}

// Handle the response from replica
val wrappedCallback = new RpcResponseCallback() {
override def onSuccess(response: ByteBuffer): Unit = {
Expand Down

0 comments on commit 2eb4c23

Please sign in to comment.