From 69e14fd3419f2d8b61f1a9f09f7a9ce0c452d861 Mon Sep 17 00:00:00 2001
From: liangyongyuan
Date: Wed, 15 Nov 2023 19:48:39 +0800
Subject: [PATCH 1/3] [CELEBORN-1128] Fix incorrect method reference in
 ConcurrentHashMap.contains

### What changes were proposed in this pull request?

ConcurrentHashMap.contains is equivalent to containsValue, not containsKey. The current codebase misuses contains in places where a key check is intended.

### Why are the changes needed?

Fixes the misuse of ConcurrentHashMap.contains.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Covered by the updated assertions in WorkerStatusTrackerSuite.

Closes #2102 from lyy-pineapple/hashMap.

Authored-by: liangyongyuan
Signed-off-by: zky.zhoukeyong
---
 .../org/apache/celeborn/client/WorkerStatusTrackerSuite.scala | 4 ++--
 .../apache/celeborn/service/deploy/worker/Controller.scala    | 3 ++-
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
index 5d3355a33bc..1f606dd3280 100644
--- a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
+++ b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
@@ -61,13 +61,13 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
     // test new added workers
     Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host0")))
     Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3")))
-    Assert.assertTrue(!statusTracker.excludedWorkers.contains(mock("host4")))
+    Assert.assertTrue(!statusTracker.excludedWorkers.containsKey(mock("host4")))
     Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))
 
     // test re heartbeat with shutdown workers
     val response3 = buildResponse(Array.empty, Array.empty, Array("host4"))
     statusTracker.handleHeartbeatResponse(response3)
-    Assert.assertTrue(!statusTracker.excludedWorkers.contains(mock("host4")))
+    Assert.assertTrue(!statusTracker.excludedWorkers.containsKey(mock("host4")))
     Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))
 
     // test remove
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 8418d69d638..d55b7dadb8c 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -353,7 +353,8 @@ private[deploy] class Controller(
       epoch: Long): Unit = {
 
     def alreadyCommitted(shuffleKey: String, epoch: Long): Boolean = {
-      shuffleCommitInfos.contains(shuffleKey) && shuffleCommitInfos.get(shuffleKey).contains(epoch)
+      shuffleCommitInfos.containsKey(shuffleKey) && shuffleCommitInfos.get(shuffleKey).containsKey(
+        epoch)
    }
 
     // Reply SHUFFLE_NOT_REGISTERED if shuffleKey does not exist AND the shuffle is not committed.
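For reference, a minimal, self-contained Scala sketch of the pitfall (the map contents are illustrative, not Celeborn state): on a ConcurrentHashMap, contains(x) is a legacy Hashtable-era alias for containsValue(x), so using it as a key test silently checks values instead of keys.

```scala
import java.util.concurrent.ConcurrentHashMap

object ContainsPitfall {
  def main(args: Array[String]): Unit = {
    val workers = new ConcurrentHashMap[String, String]()
    workers.put("host0", "NORMAL")

    // contains(Object) is inherited from the Hashtable API and
    // delegates to containsValue, NOT containsKey.
    println(workers.contains("host0"))    // false: "host0" is a key, not a value
    println(workers.contains("NORMAL"))   // true: it matched a value
    println(workers.containsKey("host0")) // true: the intended key check
  }
}
```

This is why the assertions above only behave as intended once they call containsKey explicitly.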
From 12d60522394607691ed066e2fd065db43a45a8e2 Mon Sep 17 00:00:00 2001
From: "zky.zhoukeyong"
Date: Wed, 15 Nov 2023 22:12:38 +0800
Subject: [PATCH 2/3] [CELEBORN-1130] LifecycleManager#requestWorkerReserveSlots
 should check null for endpoint

### What changes were proposed in this pull request?

When I kill -9 a Worker process, the Master will not exclude the worker until the heartbeat times out. During this time, the Master will still allocate slots on this Worker, causing an NPE when registering a shuffle:

```
Caused by: java.lang.NullPointerException
    at org.apache.celeborn.client.LifecycleManager.requestWorkerReserveSlots(LifecycleManager.scala:1246) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
    at org.apache.celeborn.client.LifecycleManager.$anonfun$reserveSlots$2(LifecycleManager.scala:864) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
    at org.apache.celeborn.common.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:301) ~[celeborn-client-spark-3-shaded_2.12-0.4.0-SNAPSHOT.jar:?]
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659) ~[scala-library-2.12.15.jar:?]
    at scala.util.Success.$anonfun$map$1(Try.scala:255) ~[scala-library-2.12.15.jar:?]
    at scala.util.Success.map(Try.scala:213) ~[scala-library-2.12.15.jar:?]
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292) ~[scala-library-2.12.15.jar:?]
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) ~[scala-library-2.12.15.jar:?]
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) ~[scala-library-2.12.15.jar:?]
    at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[?:1.8.0_372]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_372]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_372]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_372]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) ~[?:1.8.0_372]
```

### Why are the changes needed?

Ditto.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manual test; passes GA.

Closes #2104 from waitinfuture/1130.
Authored-by: zky.zhoukeyong
Signed-off-by: zky.zhoukeyong
---
 .../celeborn/client/LifecycleManager.scala | 40 +++++++++++--------
 1 file changed, 24 insertions(+), 16 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 9f0ca9da47e..cf54dfcd804 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer
 import java.util
 import java.util.{function, List => JList}
 import java.util.concurrent.{Callable, ConcurrentHashMap, ScheduledFuture, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.function.Consumer
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -33,6 +35,7 @@ import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, Shu
 import org.apache.celeborn.client.listener.WorkerStatusListener
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.client.MasterClient
+import org.apache.celeborn.common.exception.CelebornIOException
 import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
 import org.apache.celeborn.common.internal.Logging
 import org.apache.celeborn.common.meta.{ShufflePartitionLocationInfo, WorkerInfo}
@@ -720,22 +723,27 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
       Math.min(Math.max(1, workerPartitionLocations.size), conf.clientRpcMaxParallelism)
     ThreadUtils.parmap(workerPartitionLocations, "ReserveSlot", parallelism) {
       case (workerInfo, (primaryLocations, replicaLocations)) =>
-        val res = requestWorkerReserveSlots(
-          workerInfo.endpoint,
-          ReserveSlots(
-            appUniqueId,
-            shuffleId,
-            primaryLocations,
-            replicaLocations,
-            partitionSplitThreshold,
-            partitionSplitMode,
-            getPartitionType(shuffleId),
-            rangeReadFilter,
-            userIdentifier,
-            conf.pushDataTimeoutMs,
-            if (getPartitionType(shuffleId) == PartitionType.MAP)
-              conf.clientShuffleMapPartitionSplitEnabled
-            else true))
+        val res =
+          if (workerInfo.endpoint == null) {
+            ReserveSlotsResponse(StatusCode.REQUEST_FAILED, s"$workerInfo endpoint is NULL!")
+          } else {
+            requestWorkerReserveSlots(
+              workerInfo.endpoint,
+              ReserveSlots(
+                appUniqueId,
+                shuffleId,
+                primaryLocations,
+                replicaLocations,
+                partitionSplitThreshold,
+                partitionSplitMode,
+                getPartitionType(shuffleId),
+                rangeReadFilter,
+                userIdentifier,
+                conf.pushDataTimeoutMs,
+                if (getPartitionType(shuffleId) == PartitionType.MAP)
+                  conf.clientShuffleMapPartitionSplitEnabled
+                else true))
+          }
         if (res.status.equals(StatusCode.SUCCESS)) {
           logDebug(s"Successfully allocated " +
             s"partitions buffer for shuffleId $shuffleId" +
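The shape of the fix, as a standalone Scala sketch (WorkerInfo, ReserveSlotsResponse, and StatusCode here are simplified stand-ins, not the real Celeborn definitions): a possibly-null endpoint is turned into an ordinary failed response up front, instead of letting the RPC layer throw an NPE deep inside a parallel map.

```scala
// Stand-in types for illustration only.
object StatusCode extends Enumeration {
  val SUCCESS, REQUEST_FAILED = Value
}
final case class ReserveSlotsResponse(status: StatusCode.Value, reason: String = "")
final case class WorkerInfo(host: String, endpoint: AnyRef) // endpoint may be null for a dead worker

// Fail fast on a null endpoint: report REQUEST_FAILED and let the caller's
// normal retry/reallocation path handle it, rather than crashing with an NPE.
def reserveOn(worker: WorkerInfo)(sendRpc: AnyRef => ReserveSlotsResponse): ReserveSlotsResponse =
  if (worker.endpoint == null)
    ReserveSlotsResponse(StatusCode.REQUEST_FAILED, s"$worker endpoint is NULL!")
  else
    sendRpc(worker.endpoint)
```

The design choice mirrored from the patch: a dead worker produces a non-SUCCESS response like any other reservation failure, so the surrounding parmap keeps a single, uniform error-handling path.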
From 758018f5125187f467a4889f8aa7aa48e2977aa2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=90=B4=E7=A5=A5=E5=B9=B3?=
Date: Wed, 15 Nov 2023 22:30:39 +0800
Subject: [PATCH 3/3] [CELEBORN-1129] Make createReaderWithRetry errors easier
 to diagnose
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### What changes were proposed in this pull request?

Attach the last exception encountered to the CelebornIOException thrown when createReaderWithRetry fails.

### Why are the changes needed?

Previously we had to locate the specific executor to find the detailed error message; attaching the root cause to the thrown exception makes the failure directly diagnosable.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Closes #2103 from wxplovecc/easy-to-dedicate-error.

Authored-by: 吴祥平
Signed-off-by: zky.zhoukeyong
---
 .../org/apache/celeborn/client/read/CelebornInputStream.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 22cc54a5ff7..29f63538071 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -312,6 +312,7 @@ private boolean isCriticalCause(Exception e) {
   }
 
   private PartitionReader createReaderWithRetry(PartitionLocation location) throws IOException {
+    Exception lastException = null;
     while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
       try {
         if (isExcluded(location)) {
@@ -319,6 +320,7 @@ private PartitionReader createReaderWithRetry(PartitionLocation location) throws
         }
         return createReader(location, fetchChunkRetryCnt, fetchChunkMaxRetry);
       } catch (Exception e) {
+        lastException = e;
         excludeFailedLocation(location, e);
         fetchChunkRetryCnt++;
         if (location.hasPeer()) {
@@ -345,7 +347,7 @@ private PartitionReader createReaderWithRetry(PartitionLocation location) throws
         }
       }
     }
-    throw new CelebornIOException("createPartitionReader failed! " + location);
+    throw new CelebornIOException("createPartitionReader failed! " + location, lastException);
   }
 
   private ByteBuf getNextChunk() throws IOException {
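The same "remember the last failure" retry pattern, as a standalone Scala sketch (readWithRetry, maxRetries, and attempt are illustrative placeholders, not Celeborn APIs): keep a reference to the most recent exception inside the loop, then chain it into the exception thrown after the final attempt, so callers see the root cause instead of only the failed location.

```scala
import java.io.IOException

// Illustrative sketch: retry an action, remembering the last exception so
// the final failure carries the underlying cause.
def readWithRetry[T](maxRetries: Int)(attempt: () => T): T = {
  var lastException: Exception = null
  var retryCnt = 0
  while (retryCnt < maxRetries) {
    try {
      return attempt()
    } catch {
      case e: Exception =>
        lastException = e // keep the root cause for the final throw
        retryCnt += 1
    }
  }
  // Chaining lastException means the final stack trace shows *why* the
  // attempts failed, not just that they did.
  throw new IOException(s"attempt failed after $maxRetries retries", lastException)
}
```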