Skip to content

Commit

Permalink
[CELEBORN-1130] LifecycleManager#requestWorkerReserveSlots should check null for endpoint
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?
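When a Worker process is killed with `kill -9`, the Master does not exclude that Worker until its heartbeat times out.
During this window the Master still allocates slots on the dead Worker, which causes an NPE when registering a shuffle (see the stack trace below).
This PR checks whether `workerInfo.endpoint` is null before calling `requestWorkerReserveSlots`; if it is null, a `ReserveSlotsResponse` with `StatusCode.REQUEST_FAILED` is returned instead of issuing the request.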
```
Caused by: java.lang.NullPointerException
	at org.apache.celeborn.client.LifecycleManager.requestWorkerReserveSlots(LifecycleManager.scala:1246) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
	at org.apache.celeborn.client.LifecycleManager.$anonfun$reserveSlots$2(LifecycleManager.scala:864) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
	at org.apache.celeborn.common.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:301) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) ~[scala-library-2.12.15.jar:?]
	at scala.util.Success.$anonfun$map$1(Try.scala:255) ~[scala-library-2.12.15.jar:?]
	at scala.util.Success.map(Try.scala:213) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[scala-library-2.12.15.jar:?]
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[?:1.8.0_372]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_372]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_372]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_372]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_372]
```

### Why are the changes needed?
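Same as above: to avoid the NullPointerException thrown when slots are reserved on a Worker whose endpoint is null.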

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested manually; the GA (GitHub Actions) checks pass.

Closes #2104 from waitinfuture/1130.

Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
waitinfuture committed Nov 15, 2023
1 parent 69e14fd commit 12d6052
Showing 1 changed file with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import java.nio.ByteBuffer
import java.util
import java.util.{function, List => JList}
import java.util.concurrent.{Callable, ConcurrentHashMap, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Consumer

import scala.collection.JavaConverters._
import scala.collection.mutable
Expand All @@ -33,6 +35,7 @@ import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, Shu
import org.apache.celeborn.client.listener.WorkerStatusListener
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.exception.CelebornIOException
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, WorkerInfo}
Expand Down Expand Up @@ -720,22 +723,27 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
Math.min(Math.max(1, workerPartitionLocations.size), conf.clientRpcMaxParallelism)
ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) {
case (workerInfo, (primaryLocations, replicaLocations)) =>
val res = requestWorkerReserveSlots(
workerInfo.endpoint,
ReserveSlots(
appUniqueId,
shuffleId,
primaryLocations,
replicaLocations,
partitionSplitThreshold,
partitionSplitMode,
getPartitionType(shuffleId),
rangeReadFilter,
userIdentifier,
conf.pushDataTimeoutMs,
if (getPartitionType(shuffleId) == PartitionType.MAP)
conf.clientShuffleMapPartitionSplitEnabled
else true))
val res =
if (workerInfo.endpoint == null) {
ReserveSlotsResponse(StatusCode.REQUEST_FAILED, s"$workerInfo endpoint is NULL!")
} else {
requestWorkerReserveSlots(
workerInfo.endpoint,
ReserveSlots(
appUniqueId,
shuffleId,
primaryLocations,
replicaLocations,
partitionSplitThreshold,
partitionSplitMode,
getPartitionType(shuffleId),
rangeReadFilter,
userIdentifier,
conf.pushDataTimeoutMs,
if (getPartitionType(shuffleId) == PartitionType.MAP)
conf.clientShuffleMapPartitionSplitEnabled
else true))
}
if (res.status.equals(StatusCode.SUCCESS)) {
logDebug(s"Successfully allocated " +
s"partitions buffer for shuffleId $shuffleId" +
Expand Down

0 comments on commit 12d6052

Please sign in to comment.