From 12d60522394607691ed066e2fd065db43a45a8e2 Mon Sep 17 00:00:00 2001 From: "zky.zhoukeyong" Date: Wed, 15 Nov 2023 22:12:38 +0800 Subject: [PATCH] [CELEBORN-1130] LifecycleManager#requestWorkerReserveSlots should check null for endpoint ### What changes were proposed in this pull request? When I kill -9 a Worker process, Master will not exclude the worker until heartbeat timeout. During this time, Master will still allocate slots on this Worker, causing NPE when register shuffle ``` Caused by: java.lang.NullPointerException at org.apache.celeborn.client.LifecycleManager.requestWorkerReserveSlots(LifecycleManager.scala:1246) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?] at org.apache.celeborn.client.LifecycleManager.$anonfun$reserveSlots$2(LifecycleManager.scala:864) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?] at org.apache.celeborn.common.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:301) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?] at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) ~[scala-library-2.12.15.jar:?] at scala.util.Success.$anonfun$map$1(Try.scala:255) ~[scala-library-2.12.15.jar:?] at scala.util.Success.map(Try.scala:213) ~[scala-library-2.12.15.jar:?] at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?] at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[scala-library-2.12.15.jar:?] at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[?:1.8.0_372] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_372] at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_372] at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_372] at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_372] ``` ### Why are the changes needed? ditto ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual test and passes GA Closes #2104 from waitinfuture/1130. Authored-by: zky.zhoukeyong Signed-off-by: zky.zhoukeyong --- .../celeborn/client/LifecycleManager.scala | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 9f0ca9da47e..cf54dfcd804 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -21,6 +21,8 @@ import java.nio.ByteBuffer import java.util import java.util.{function, List => JList} import java.util.concurrent.{Callable, ConcurrentHashMap, ScheduledFuture, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger +import java.util.function.Consumer import scala.collection.JavaConverters._ import scala.collection.mutable @@ -33,6 +35,7 @@ import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, Shu import org.apache.celeborn.client.listener.WorkerStatusListener import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.client.MasterClient +import org.apache.celeborn.common.exception.CelebornIOException import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier} import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, WorkerInfo} @@ -720,22 +723,27 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends Math.min(Math.max(1, workerPartitionLocations.size), conf.clientRpcMaxParallelism) ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) { case (workerInfo, (primaryLocations, replicaLocations)) => - val res = requestWorkerReserveSlots( - workerInfo.endpoint, - ReserveSlots( - appUniqueId, - shuffleId, - primaryLocations, - replicaLocations, - partitionSplitThreshold, - partitionSplitMode, - getPartitionType(shuffleId), - rangeReadFilter, - userIdentifier, - conf.pushDataTimeoutMs, - if (getPartitionType(shuffleId) == PartitionType.MAP) - conf.clientShuffleMapPartitionSplitEnabled - else true)) + val res = + if (workerInfo.endpoint == null) { + ReserveSlotsResponse(StatusCode.REQUEST_FAILED, s"$workerInfo endpoint is NULL!") + } else { + requestWorkerReserveSlots( + workerInfo.endpoint, + ReserveSlots( + appUniqueId, + shuffleId, + primaryLocations, + replicaLocations, + partitionSplitThreshold, + partitionSplitMode, + getPartitionType(shuffleId), + rangeReadFilter, + userIdentifier, + conf.pushDataTimeoutMs, + if (getPartitionType(shuffleId) == PartitionType.MAP) + conf.clientShuffleMapPartitionSplitEnabled + else true)) + } if (res.status.equals(StatusCode.SUCCESS)) { logDebug(s"Successfully allocated " + s"partitions buffer for shuffleId $shuffleId" +