[CELEBORN-678][FOLLOWUP] MapperAttempts for a shuffle should reply MAP_ENDED when mapper has already been ended from speculative task

### What changes were proposed in this pull request?

The MapperAttempts check for a shuffle now replies `MAP_ENDED`, not `STAGE_ENDED`, when push data or push merged data arrives from a speculative task whose mapper has already ended.

Follow up #1591.
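
The reply described above is a single status byte wrapped in a `ByteBuffer`, which the client inspects only when the buffer is non-empty. The following is a minimal, self-contained sketch of that round trip. The class name, the helper methods, and the byte values are assumptions made for illustration; the real values come from Celeborn's `StatusCode` enum.

```java
import java.nio.ByteBuffer;

public class StatusReplySketch {
    // Hypothetical status bytes, for illustration only.
    // The real values are defined by Celeborn's StatusCode enum.
    static final byte MAP_ENDED = 1;
    static final byte STAGE_ENDED = 2;

    // Worker side (hypothetical helper): wrap one status byte,
    // mirroring ByteBuffer.wrap(Array[Byte](status)) in the handler.
    static ByteBuffer reply(byte status) {
        return ByteBuffer.wrap(new byte[] {status});
    }

    // Client side (hypothetical helper): an empty buffer carries no status;
    // otherwise the first byte is compared against MAP_ENDED.
    static boolean isMapEnded(ByteBuffer response) {
        return response.remaining() > 0 && response.get() == MAP_ENDED;
    }

    public static void main(String[] args) {
        System.out.println(isMapEnded(reply(MAP_ENDED)));       // true
        System.out.println(isMapEnded(reply(STAGE_ENDED)));     // false
        System.out.println(isMapEnded(ByteBuffer.allocate(0))); // false
    }
}
```

Note that `response.get()` consumes the byte, so the check can be made only once per response buffer.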

### Why are the changes needed?

When push data or push merged data arrives from a speculative task whose mapper has already ended, `PushDataHandler` on the worker should signal MapEnd instead of StageEnd. Correspondingly, `ShuffleClientImpl` should record only that mapper as ended instead of marking the whole stage as ended; otherwise the other tasks of the stage can no longer send shuffle data, which causes data loss.
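
To make the difference concrete, here is a minimal sketch of the two kinds of client-side bookkeeping. The field names `stageEndShuffleSet` and `mapperEndMap` follow the diff; the class itself, the `onMapEnded`/`onStageEnded` methods, and the `canPush` check are hypothetical simplifications, not the actual `ShuffleClientImpl` logic.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class MapperEndSketch {
    // Shuffles whose whole stage is considered ended.
    private final Set<Integer> stageEndShuffleSet = ConcurrentHashMap.newKeySet();
    // Ended mapper ids, grouped by shuffle id.
    private final Map<Integer, Set<Integer>> mapperEndMap = new ConcurrentHashMap<>();

    // Record that a single mapper of the shuffle has ended (same pattern as the diff).
    void onMapEnded(int shuffleId, int mapId) {
        mapperEndMap
            .computeIfAbsent(shuffleId, id -> ConcurrentHashMap.newKeySet())
            .add(mapId);
    }

    // Record that the whole stage of the shuffle has ended.
    void onStageEnded(int shuffleId) {
        stageEndShuffleSet.add(shuffleId);
    }

    // Hypothetical check: a task may push unless its stage or its own mapper has ended.
    boolean canPush(int shuffleId, int mapId) {
        if (stageEndShuffleSet.contains(shuffleId)) {
            return false;
        }
        Set<Integer> ended = mapperEndMap.get(shuffleId);
        return ended == null || !ended.contains(mapId);
    }

    public static void main(String[] args) {
        MapperEndSketch perMapper = new MapperEndSketch();
        perMapper.onMapEnded(0, 3);
        System.out.println(perMapper.canPush(0, 3)); // false: mapper 3 has ended
        System.out.println(perMapper.canPush(0, 4)); // true: mapper 4 is unaffected

        MapperEndSketch perStage = new MapperEndSketch();
        perStage.onStageEnded(0);
        System.out.println(perStage.canPush(0, 4)); // false: every mapper is blocked
    }
}
```

Under these assumptions, a stage-wide mark blocks every mapper of the shuffle, while a per-mapper mark affects only the mapper that the speculative task duplicated.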

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Internal test.

Closes #2190 from SteNicholas/CELEBORN-678.

Authored-by: SteNicholas
Signed-off-by: zky.zhoukeyong
SteNicholas authored and waitinfuture committed Dec 27, 2023
1 parent f8eb160 commit 3097ffe
Showing 2 changed files with 10 additions and 6 deletions.
@@ -954,8 +954,10 @@ public int pushOrMergeData(
       new RpcResponseCallback() {
         @Override
         public void onSuccess(ByteBuffer response) {
-          if (response.remaining() > 0 && response.get() == StatusCode.STAGE_ENDED.getValue()) {
-            stageEndShuffleSet.add(shuffleId);
+          if (response.remaining() > 0 && response.get() == StatusCode.MAP_ENDED.getValue()) {
+            mapperEndMap
+                .computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet())
+                .add(mapId);
           }
           logger.debug(
               "Push data to {} success for shuffle {} map {} attempt {} partition {} batch {}.",
@@ -1350,8 +1352,10 @@ public void onSuccess(ByteBuffer response) {
               groupedBatchId,
               Arrays.toString(batchIds));
           pushState.removeBatch(groupedBatchId, hostPort);
-          if (response.remaining() > 0 && response.get() == StatusCode.STAGE_ENDED.getValue()) {
-            stageEndShuffleSet.add(shuffleId);
+          if (response.remaining() > 0 && response.get() == StatusCode.MAP_ENDED.getValue()) {
+            mapperEndMap
+                .computeIfAbsent(shuffleId, (id) -> ConcurrentHashMap.newKeySet())
+                .add(mapId);
           }
         }

@@ -205,7 +205,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
         logInfo(
           s"[Case1] Receive push data from speculative task(shuffle $shuffleKey, map $mapId, " +
             s" attempt $attemptId), but this mapper has already been ended.")
-        callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+        callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.MAP_ENDED.getValue)))
       } else {
         logInfo(
           s"Receive push data for committed hard split partition of (shuffle $shuffleKey, " +
@@ -470,7 +470,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
             s"task(shuffle $shuffleKey, map $mapId, attempt $attemptId), " +
             s"but this mapper has already been ended.")
         callbackWithTimer.onSuccess(
-          ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+          ByteBuffer.wrap(Array[Byte](StatusCode.MAP_ENDED.getValue)))
       } else {
         logInfo(s"[Case1] Receive push merged data for committed hard split partition of " +
           s"(shuffle $shuffleKey, map $mapId attempt $attemptId)")
