Skip to content

Commit

Permalink
Merge branch 'apache:main' into CELEBORN-1123
Browse files Browse the repository at this point in the history
  • Loading branch information
gaochao0509 authored Nov 17, 2023
2 parents 66f71fb + 758018f commit 7097326
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,15 @@ private boolean isCriticalCause(Exception e) {
}

private PartitionReader createReaderWithRetry(PartitionLocation location) throws IOException {
Exception lastException = null;
while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
try {
if (isExcluded(location)) {
throw new CelebornIOException("Fetch data from excluded worker! " + location);
}
return createReader(location, fetchChunkRetryCnt, fetchChunkMaxRetry);
} catch (Exception e) {
lastException = e;
excludeFailedLocation(location, e);
fetchChunkRetryCnt++;
if (location.hasPeer()) {
Expand All @@ -345,7 +347,7 @@ private PartitionReader createReaderWithRetry(PartitionLocation location) throws
}
}
}
throw new CelebornIOException("createPartitionReader failed! " + location);
throw new CelebornIOException("createPartitionReader failed! " + location, lastException);
}

private ByteBuf getNextChunk() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.nio.ByteBuffer
import java.util
import java.util.{function, List => JList}
import java.util.concurrent.{Callable, ConcurrentHashMap, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Consumer

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand All @@ -33,6 +35,7 @@ import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, Shu
import org.apache.celeborn.client.listener.WorkerStatusListener
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.exception.CelebornIOException
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, WorkerInfo}
Expand Down Expand Up @@ -720,22 +723,27 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
Math.min(Math.max(1, workerPartitionLocations.size), conf.clientRpcMaxParallelism)
ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) {
case (workerInfo, (primaryLocations, replicaLocations)) =>
val res = requestWorkerReserveSlots(
workerInfo.endpoint,
ReserveSlots(
appUniqueId,
shuffleId,
primaryLocations,
replicaLocations,
partitionSplitThreshold,
partitionSplitMode,
getPartitionType(shuffleId),
rangeReadFilter,
userIdentifier,
conf.pushDataTimeoutMs,
if (getPartitionType(shuffleId) == PartitionType.MAP)
conf.clientShuffleMapPartitionSplitEnabled
else true))
val res =
if (workerInfo.endpoint == null) {
ReserveSlotsResponse(StatusCode.REQUEST_FAILED, s"$workerInfo endpoint is NULL!")
} else {
requestWorkerReserveSlots(
workerInfo.endpoint,
ReserveSlots(
appUniqueId,
shuffleId,
primaryLocations,
replicaLocations,
partitionSplitThreshold,
partitionSplitMode,
getPartitionType(shuffleId),
rangeReadFilter,
userIdentifier,
conf.pushDataTimeoutMs,
if (getPartitionType(shuffleId) == PartitionType.MAP)
conf.clientShuffleMapPartitionSplitEnabled
else true))
}
if (res.status.equals(StatusCode.SUCCESS)) {
logDebug(s"Successfully allocated " +
s"partitions buffer for shuffleId $shuffleId" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
// test new added workers
Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host0")))
Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3")))
Assert.assertTrue(!statusTracker.excludedWorkers.contains(mock("host4")))
Assert.assertTrue(!statusTracker.excludedWorkers.containsKey(mock("host4")))
Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))

// test re heartbeat with shutdown workers
val response3 = buildResponse(Array.empty, Array.empty, Array("host4"))
statusTracker.handleHeartbeatResponse(response3)
Assert.assertTrue(!statusTracker.excludedWorkers.contains(mock("host4")))
Assert.assertTrue(!statusTracker.excludedWorkers.containsKey(mock("host4")))
Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))

// test remove
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ private[deploy] class Controller(
epoch: Long): Unit = {

/**
 * Checks whether `shuffleCommitInfos` holds an entry for `shuffleKey` whose own entries
 * include `epoch`.
 *
 * @param shuffleKey key looked up in `shuffleCommitInfos`
 * @param epoch value looked up inside the entry stored under `shuffleKey`
 * @return true only when both lookups succeed; `&&` short-circuits, so the inner entry is
 *         not fetched when `shuffleKey` is absent
 */
def alreadyCommitted(shuffleKey: String, epoch: Long): Boolean = {
// NOTE(review): this listing is a rendered diff without +/- markers. The next line (using
// `contains`) looks like the removed version and the `containsKey` expression after it the
// replacement -- confirm against the actual file before reusing this snippet.
shuffleCommitInfos.contains(shuffleKey) && shuffleCommitInfos.get(shuffleKey).contains(epoch)
// Key-based lookup at both levels.
// NOTE(review): if these maps are java.util.concurrent.ConcurrentHashMap instances, `contains`
// tests values rather than keys, which would explain the switch -- TODO confirm declared type.
shuffleCommitInfos.containsKey(shuffleKey) && shuffleCommitInfos.get(shuffleKey).containsKey(
epoch)
}

// Reply SHUFFLE_NOT_REGISTERED if shuffleKey does not exist AND the shuffle is not committed.
Expand Down

0 comments on commit 7097326

Please sign in to comment.