From e2f640ce3ba52691707d18450257ebdfbe4f93a6 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Fri, 1 Nov 2024 15:58:53 +0800 Subject: [PATCH] [CELEBORN-1660] Using map for workers to find worker fast ### What changes were proposed in this pull request? Use a map for workers so that a worker can be found by its uniqueId quickly. ### Why are the changes needed? For a large Celeborn cluster, scanning the whole worker set to locate a single worker can be slow, for example in: - updateWorkerHeartbeatMeta https://github.com/apache/celeborn/blob/1e77f01cd317b1dc885965d6053b391db1d42bc7/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java#L222 - handleWorkerLost https://github.com/apache/celeborn/blob/1e77f01cd317b1dc885965d6053b391db1d42bc7/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala#L762-L765 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UT. Closes #2870 from turboFei/worksMap. Lead-authored-by: Wang, Fei Co-authored-by: Fei Wang Signed-off-by: mingji --- .../clustermeta/AbstractMetaManager.java | 58 +++++++++---------- .../service/deploy/master/Master.scala | 58 +++++++++---------- .../master/http/api/v1/WorkerResource.scala | 5 +- .../clustermeta/DefaultMetaSystemSuiteJ.java | 12 ++-- .../ha/MasterStateMachineSuiteJ.java | 15 +++-- .../ha/RatisMasterStatusSystemSuiteJ.java | 50 ++++++++-------- 6 files changed, 99 insertions(+), 99 deletions(-) diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 4080edae465..f1d8a37e815 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -64,7 +64,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler { public final Map> registeredAppAndShuffles = JavaUtils.newConcurrentHashMap(); public final Set hostnameSet = ConcurrentHashMap.newKeySet(); - public final Set workers = ConcurrentHashMap.newKeySet(); + public final Map workersMap = JavaUtils.newConcurrentHashMap(); public final ConcurrentHashMap lostWorkers = JavaUtils.newConcurrentHashMap(); public final ConcurrentHashMap workerEventInfos = @@ -170,8 +170,8 @@ public void updateWorkerLostMeta( WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort); workerLostEvents.add(worker); // remove worker from workers - synchronized (workers) { - workers.remove(worker); + synchronized (workersMap) { + workersMap.remove(worker.toUniqueId()); lostWorkers.put(worker, System.currentTimeMillis()); } excludedWorkers.remove(worker); @@ -182,15 +182,15 @@ public void updateWorkerRemoveMeta( String host, int rpcPort, int pushPort, int fetchPort, int replicatePort) { WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort); // remove worker from workers - synchronized (workers) { - workers.remove(worker); + synchronized (workersMap) { + workersMap.remove(worker.toUniqueId()); lostWorkers.put(worker, System.currentTimeMillis()); } excludedWorkers.remove(worker); } public void removeWorkersUnavailableInfoMeta(List unavailableWorkers) { - synchronized (workers) { + synchronized (workersMap) { for (WorkerInfo workerInfo : unavailableWorkers) { if (lostWorkers.containsKey(workerInfo)) { lostWorkers.remove(workerInfo); @@ -219,8 +219,8 @@ public void 
updateWorkerHeartbeatMeta( host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, userResourceConsumption); AtomicLong availableSlots = new AtomicLong(); LOG.debug("update worker {}:{} heartbeat {}", host, rpcPort, disks); - synchronized (workers) { - Optional workerInfo = workers.stream().filter(w -> w.equals(worker)).findFirst(); + synchronized (workersMap) { + Optional workerInfo = Optional.ofNullable(workersMap.get(worker.toUniqueId())); workerInfo.ifPresent( info -> { info.updateThenGetDiskInfos(disks, Option.apply(estimatedPartitionSize)); @@ -287,10 +287,8 @@ public void updateRegisterWorkerMeta( workerInfo.networkLocation_$eq(rackResolver.resolve(host).getNetworkLocation()); } workerInfo.updateDiskMaxSlots(estimatedPartitionSize); - synchronized (workers) { - if (!workers.contains(workerInfo)) { - workers.add(workerInfo); - } + synchronized (workersMap) { + workersMap.putIfAbsent(workerInfo.toUniqueId(), workerInfo); shutdownWorkers.remove(workerInfo); lostWorkers.remove(workerInfo); excludedWorkers.remove(workerInfo); @@ -315,7 +313,7 @@ public void writeMetaInfoToFile(File file) throws IOException, RuntimeException manuallyExcludedWorkers, workerLostEvents, appHeartbeatTime, - workers, + new HashSet(workersMap.values()), partitionTotalWritten.sum(), partitionTotalFileCount.sum(), appDiskUsageMetric.snapShots(), @@ -381,7 +379,7 @@ public void restoreMetaFromFile(File file) throws IOException { .collect(Collectors.toList()); scala.collection.immutable.Map resolveMap = rackResolver.resolveToMap(workerHostList); - workers.addAll( + workersMap.putAll( workerInfoSet.stream() .peek( workerInfo -> { @@ -391,7 +389,7 @@ public void restoreMetaFromFile(File file) throws IOException { resolveMap.get(workerInfo.host()).get().getNetworkLocation()); } }) - .collect(Collectors.toSet())); + .collect(Collectors.toMap(WorkerInfo::toUniqueId, w -> w))); snapshotMetaInfo .getLostWorkersMap() @@ -437,11 +435,11 @@ public void restoreMetaFromFile(File file) throws IOException { LOG.info("Successfully restore meta info from snapshot {}", file.getAbsolutePath()); LOG.info( "Worker size: {}, Registered shuffle size: {}. Worker excluded list size: {}. 
Manually Excluded list size: {}", - workers.size(), + workersMap.size(), registeredAppAndShuffles.size(), excludedWorkers.size(), manuallyExcludedWorkers.size()); - workers.forEach(workerInfo -> LOG.info(workerInfo.toString())); + workersMap.values().forEach(workerInfo -> LOG.info(workerInfo.toString())); registeredAppAndShuffles.forEach( (appId, shuffleId) -> LOG.info("RegisteredShuffle {}-{}", appId, shuffleId)); } @@ -449,7 +447,7 @@ public void restoreMetaFromFile(File file) throws IOException { private void cleanUpState() { registeredAppAndShuffles.clear(); hostnameSet.clear(); - workers.clear(); + workersMap.clear(); lostWorkers.clear(); appHeartbeatTime.clear(); excludedWorkers.clear(); @@ -464,7 +462,7 @@ private void cleanUpState() { } public void updateMetaByReportWorkerUnavailable(List failedWorkers) { - synchronized (this.workers) { + synchronized (this.workersMap) { shutdownWorkers.addAll(failedWorkers); } } @@ -473,7 +471,7 @@ public void updateWorkerEventMeta(int workerEventTypeValue, List wor long eventTime = System.currentTimeMillis(); ResourceProtos.WorkerEventType eventType = ResourceProtos.WorkerEventType.forNumber(workerEventTypeValue); - synchronized (this.workers) { + synchronized (this.workersMap) { for (WorkerInfo workerInfo : workerInfoList) { WorkerEventInfo workerEventInfo = workerEventInfos.get(workerInfo); LOG.info("Received worker event: {} for worker: {}", eventType, workerInfo.toUniqueId()); @@ -489,7 +487,7 @@ public void updateWorkerEventMeta(int workerEventTypeValue, List wor } public void updateMetaByReportWorkerDecommission(List workers) { - synchronized (this.workers) { + synchronized (this.workersMap) { decommissionWorkers.addAll(workers); } } @@ -520,19 +518,19 @@ public void updatePartitionSize() { "Celeborn cluster estimated partition size changed from {} to {}", Utils.bytesToString(oldEstimatedPartitionSize), Utils.bytesToString(estimatedPartitionSize)); - workers.stream() - .filter( - worker -> - !excludedWorkers.contains(worker) && !manuallyExcludedWorkers.contains(worker)) - .forEach(workerInfo -> workerInfo.updateDiskMaxSlots(estimatedPartitionSize)); + + HashSet workers = new HashSet(workersMap.values()); + excludedWorkers.forEach(workers::remove); + manuallyExcludedWorkers.forEach(workers::remove); + workers.forEach(workerInfo -> workerInfo.updateDiskMaxSlots(estimatedPartitionSize)); } public boolean isWorkerAvailable(WorkerInfo workerInfo) { - return !excludedWorkers.contains(workerInfo) + return (workerInfo.getWorkerStatus().getState() == PbWorkerStatus.State.Normal + && !workerEventInfos.containsKey(workerInfo)) + && !excludedWorkers.contains(workerInfo) && !shutdownWorkers.contains(workerInfo) - && !manuallyExcludedWorkers.contains(workerInfo) - && (!workerEventInfos.containsKey(workerInfo) - && workerInfo.getWorkerStatus().getState() == PbWorkerStatus.State.Normal); + && !manuallyExcludedWorkers.contains(workerInfo); } public void updateApplicationMeta(ApplicationMeta applicationMeta) { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index bef74b1c53d..1f065b3d85e 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -224,13 +224,13 @@ private[celeborn] class Master( masterSource.addGauge(MasterSource.REGISTERED_SHUFFLE_COUNT) { () => statusSystem.registeredShuffleCount } - 
masterSource.addGauge(MasterSource.WORKER_COUNT) { () => statusSystem.workers.size } + masterSource.addGauge(MasterSource.WORKER_COUNT) { () => statusSystem.workersMap.size } masterSource.addGauge(MasterSource.LOST_WORKER_COUNT) { () => statusSystem.lostWorkers.size } masterSource.addGauge(MasterSource.EXCLUDED_WORKER_COUNT) { () => statusSystem.excludedWorkers.size + statusSystem.manuallyExcludedWorkers.size } masterSource.addGauge(MasterSource.AVAILABLE_WORKER_COUNT) { () => - statusSystem.workers.asScala.count { w => + statusSystem.workersMap.values().asScala.count { w => statusSystem.isWorkerAvailable(w) } } @@ -242,7 +242,7 @@ private[celeborn] class Master( } masterSource.addGauge(MasterSource.PARTITION_SIZE) { () => statusSystem.estimatedPartitionSize } masterSource.addGauge(MasterSource.ACTIVE_SHUFFLE_SIZE) { () => - statusSystem.workers.parallelStream() + statusSystem.workersMap.values().parallelStream() .mapToLong(new ToLongFunction[WorkerInfo]() { override def applyAsLong(value: WorkerInfo): Long = value.userResourceConsumption.values().parallelStream() @@ -252,7 +252,7 @@ private[celeborn] class Master( }).sum() } masterSource.addGauge(MasterSource.ACTIVE_SHUFFLE_FILE_COUNT) { () => - statusSystem.workers.parallelStream() + statusSystem.workersMap.values().parallelStream() .mapToLong(new ToLongFunction[WorkerInfo]() { override def applyAsLong(value: WorkerInfo): Long = value.userResourceConsumption.values().parallelStream() @@ -263,11 +263,11 @@ private[celeborn] class Master( } masterSource.addGauge(MasterSource.DEVICE_CELEBORN_TOTAL_CAPACITY) { () => - statusSystem.workers.asScala.toList.map(_.totalSpace()).sum + statusSystem.workersMap.values().asScala.toList.map(_.totalSpace()).sum } masterSource.addGauge(MasterSource.DEVICE_CELEBORN_FREE_CAPACITY) { () => - statusSystem.workers.asScala.toList.map(_.totalActualUsableSpace()).sum + statusSystem.workersMap.values().asScala.toList.map(_.totalActualUsableSpace()).sum } masterSource.addGauge(MasterSource.IS_ACTIVE_MASTER) { () => isMasterActive } @@ -596,7 +596,7 @@ private[celeborn] class Master( return } - statusSystem.workers.asScala.foreach { worker => + statusSystem.workersMap.values().asScala.foreach { worker => if (worker.lastHeartbeat < currentTime - workerHeartbeatTimeoutMs && !statusSystem.workerLostEvents.contains(worker)) { logWarning(s"Worker ${worker.readableAddress()} timeout! 
Trigger WorkerLost event.") @@ -635,18 +635,18 @@ private[celeborn] class Master( if (HAHelper.getAppTimeoutDeadline(statusSystem) > currentTime) { return } - statusSystem.appHeartbeatTime.keySet().asScala.foreach { key => - if (statusSystem.appHeartbeatTime.get(key) < currentTime - appHeartbeatTimeoutMs) { - logWarning(s"Application $key timeout, trigger applicationLost event.") + statusSystem.appHeartbeatTime.asScala.foreach { case (appId, heartbeatTime) => + if (heartbeatTime < currentTime - appHeartbeatTimeoutMs) { + logWarning(s"Application $appId timeout, trigger applicationLost event.") val requestId = MasterClient.genRequestId() - var res = self.askSync[ApplicationLostResponse](ApplicationLost(key, requestId)) + var res = self.askSync[ApplicationLostResponse](ApplicationLost(appId, requestId)) var retry = 1 while (res.status != StatusCode.SUCCESS && retry <= 3) { - res = self.askSync[ApplicationLostResponse](ApplicationLost(key, requestId)) + res = self.askSync[ApplicationLostResponse](ApplicationLost(appId, requestId)) retry += 1 } if (retry > 3) { - logWarning(s"Handle ApplicationLost event for $key failed more than 3 times!") + logWarning(s"Handle ApplicationLost event for $appId failed more than 3 times!") } } } @@ -667,7 +667,7 @@ private[celeborn] class Master( workerStatus: WorkerStatus, requestId: String): Unit = { val targetWorker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort) - val registered = statusSystem.workers.asScala.contains(targetWorker) + val registered = statusSystem.workersMap.containsKey(targetWorker.toUniqueId()) if (!registered) { logWarning(s"Received heartbeat from unknown worker " + s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.") @@ -758,10 +758,7 @@ private[celeborn] class Master( -1, new util.HashMap[String, DiskInfo](), JavaUtils.newConcurrentHashMap[UserIdentifier, ResourceConsumption]()) - val worker: WorkerInfo = statusSystem.workers - .asScala - .find(_ == targetWorker) - .orNull + val worker: WorkerInfo = statusSystem.workersMap.get(targetWorker.toUniqueId()) if (worker == null) { logWarning(s"Unknown worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort" + s" for WorkerLost handler!") @@ -806,7 +803,7 @@ private[celeborn] class Master( return } - if (statusSystem.workers.contains(workerToRegister)) { + if (statusSystem.workersMap.containsKey(workerToRegister.toUniqueId())) { logWarning(s"Receive RegisterWorker while worker" + s" ${workerToRegister.toString()} already exists, re-register.") statusSystem.handleRegisterWorker( @@ -908,7 +905,7 @@ private[celeborn] class Master( // offer slots val slots = masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") { - statusSystem.workers.synchronized { + statusSystem.workersMap.synchronized { if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE) { SlotsAllocator.offerSlotsLoadAware( selectedWorkers, @@ -1121,24 +1118,24 @@ private[celeborn] class Master( fileCount, System.currentTimeMillis(), requestId) - // unknown workers will retain in needCheckedWorkerList - needCheckedWorkerList.removeAll(statusSystem.workers) + val unknownWorkers = needCheckedWorkerList.asScala.filterNot(w => + statusSystem.workersMap.containsKey(w.toUniqueId())).asJava if (shouldResponse) { // UserResourceConsumption and DiskInfo are eliminated from WorkerInfo // during serialization of HeartbeatFromApplicationResponse var availableWorksSentToClient = new util.ArrayList[WorkerInfo]() if (needAvailableWorkers) { availableWorksSentToClient = new 
util.ArrayList[WorkerInfo]( - statusSystem.workers.asScala.filter(worker => - statusSystem.isWorkerAvailable(worker)).asJava) + statusSystem.workersMap.values().asScala.filter(worker => + statusSystem.isWorkerAvailable(worker)).toList.asJava) } - var appRelatedShuffles = + val appRelatedShuffles = statusSystem.registeredAppAndShuffles.getOrDefault(appId, Collections.emptySet()) context.reply(HeartbeatFromApplicationResponse( StatusCode.SUCCESS, new util.ArrayList( (statusSystem.excludedWorkers.asScala ++ statusSystem.manuallyExcludedWorkers.asScala).asJava), - needCheckedWorkerList, + unknownWorkers, new util.ArrayList[WorkerInfo]( (statusSystem.shutdownWorkers.asScala ++ statusSystem.decommissionWorkers.asScala).asJava), availableWorksSentToClient, @@ -1215,7 +1212,7 @@ private[celeborn] class Master( // TODO: Support calculate topN app resource consumption. private def computeUserResourceConsumption( userIdentifier: UserIdentifier): ResourceConsumption = { - val resourceConsumption = statusSystem.workers.asScala.flatMap { + val resourceConsumption = statusSystem.workersMap.values().asScala.flatMap { workerInfo => workerInfo.userResourceConsumption.asScala.get(userIdentifier) }.foldRight(ResourceConsumption(0, 0, 0, 0))(_ add _) resourceConsumption @@ -1249,7 +1246,7 @@ private[celeborn] class Master( private def workersAvailable( tmpExcludedWorkerList: Set[WorkerInfo] = Set.empty): util.List[WorkerInfo] = { - statusSystem.workers.asScala.filter { w => + statusSystem.workersMap.values().asScala.filter { w => statusSystem.isWorkerAvailable(w) && !tmpExcludedWorkerList.contains(w) }.toList.asJava } @@ -1282,7 +1279,7 @@ private[celeborn] class Master( } private def getWorkers: String = { - statusSystem.workers.asScala.mkString("\n") + statusSystem.workersMap.values().asScala.mkString("\n") } override def handleWorkerEvent( @@ -1411,7 +1408,8 @@ private[celeborn] class Master( ",")} and remove ${removeWorkers.map(_.readableAddress).mkString(",")}.\n") } val unknownExcludedWorkers = - (addWorkers ++ removeWorkers).filter(!statusSystem.workers.contains(_)) + (addWorkers ++ removeWorkers).filterNot(w => + statusSystem.workersMap.containsKey(w.toUniqueId())) if (unknownExcludedWorkers.nonEmpty) { sb.append( s"Unknown workers ${unknownExcludedWorkers.map(_.readableAddress).mkString(",")}." 
+ diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala index 8d1b20bc3a0..34e6c734e9c 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/http/api/v1/WorkerResource.scala @@ -51,7 +51,7 @@ class WorkerResource extends ApiRequestContext { @GET def workers: WorkersResponse = { new WorkersResponse() - .workers(statusSystem.workers.asScala.map(ApiUtils.workerData).toSeq.asJava) + .workers(statusSystem.workersMap.values().asScala.map(ApiUtils.workerData).toSeq.asJava) .lostWorkers(statusSystem.lostWorkers.asScala.toSeq.sortBy(_._2) .map(kv => new WorkerTimestampData().worker(ApiUtils.workerData(kv._1)).timestamp(kv._2)).asJava) @@ -134,7 +134,8 @@ class WorkerResource extends ApiRequestContext { s"eventType(${request.getEventType}) and workers(${request.getWorkers}) are required") } val workers = request.getWorkers.asScala.map(ApiUtils.toWorkerInfo).toSeq - val (filteredWorkers, unknownWorkers) = workers.partition(statusSystem.workers.contains) + val (filteredWorkers, unknownWorkers) = + workers.partition(w => statusSystem.workersMap.containsKey(w.toUniqueId())) if (filteredWorkers.isEmpty) { throw new BadRequestException( s"None of the workers are known: ${unknownWorkers.map(_.readableAddress).mkString(", ")}") diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index 65660158ed6..cde62c4cf0c 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -157,7 +157,7 @@ public void testHandleRegisterWorker() { userResourceConsumption3, getNewReqeustId()); - assertEquals(3, statusSystem.workers.size()); + assertEquals(3, statusSystem.workersMap.size()); } @Test @@ -253,7 +253,7 @@ public void testHandleWorkerLost() { statusSystem.handleWorkerLost( HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1, getNewReqeustId()); - assertEquals(2, statusSystem.workers.size()); + assertEquals(2, statusSystem.workersMap.size()); } private static final String APPID1 = "appId1"; @@ -376,20 +376,20 @@ public void testHandleReleaseSlots() { userResourceConsumption3, getNewReqeustId()); - assertEquals(3, statusSystem.workers.size()); + assertEquals(3, statusSystem.workersMap.size()); Map> workersToAllocate = new HashMap<>(); Map allocation = new HashMap<>(); allocation.put("disk1", 5); workersToAllocate.put( - statusSystem.workers.stream() + statusSystem.workersMap.values().stream() .filter(w -> w.host().equals(HOSTNAME1)) .findFirst() .get() .toUniqueId(), allocation); workersToAllocate.put( - statusSystem.workers.stream() + statusSystem.workersMap.values().stream() .filter(w -> w.host().equals(HOSTNAME2)) .findFirst() .get() @@ -399,7 +399,7 @@ public void testHandleReleaseSlots() { statusSystem.handleRequestSlots(SHUFFLEKEY1, HOSTNAME1, workersToAllocate, getNewReqeustId()); assertEquals( 0, - statusSystem.workers.stream() + statusSystem.workersMap.values().stream() .filter(w -> w.host().equals(HOSTNAME1)) .findFirst() .get() diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java index 9696dba2de3..d2539ba5d37 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java @@ -232,20 +232,23 @@ public void testObjSerde() throws IOException, InterruptedException { AppDiskUsageSnapShot originCurrentSnapshot = masterStatusSystem.appDiskUsageMetric.currentSnapShot().get(); - masterStatusSystem.workers.add(new WorkerInfo(host1, 9095, 9094, 9093, 9092, 9091)); - masterStatusSystem.workers.add(new WorkerInfo(host2, 9095, 9094, 9093, 9092, 9091)); - masterStatusSystem.workers.add(new WorkerInfo(host3, 9095, 9094, 9093, 9092, 9091)); + WorkerInfo workerInfo1 = new WorkerInfo(host1, 9095, 9094, 9093, 9092, 9091); + WorkerInfo workerInfo2 = new WorkerInfo(host2, 9095, 9094, 9093, 9092, 9091); + WorkerInfo workerInfo3 = new WorkerInfo(host3, 9095, 9094, 9093, 9092, 9091); + masterStatusSystem.workersMap.put(workerInfo1.toUniqueId(), workerInfo1); + masterStatusSystem.workersMap.put(workerInfo2.toUniqueId(), workerInfo2); + masterStatusSystem.workersMap.put(workerInfo3.toUniqueId(), workerInfo3); masterStatusSystem.writeMetaInfoToFile(tmpFile); masterStatusSystem.hostnameSet.clear(); masterStatusSystem.excludedWorkers.clear(); masterStatusSystem.manuallyExcludedWorkers.clear(); - masterStatusSystem.workers.clear(); + masterStatusSystem.workersMap.clear(); masterStatusSystem.restoreMetaFromFile(tmpFile); - Assert.assertEquals(3, masterStatusSystem.workers.size()); + Assert.assertEquals(3, masterStatusSystem.workersMap.size()); Assert.assertEquals(3, masterStatusSystem.excludedWorkers.size()); Assert.assertEquals(2, masterStatusSystem.manuallyExcludedWorkers.size()); Assert.assertEquals(3, masterStatusSystem.hostnameSet.size()); @@ -260,7 +263,7 @@ public void testObjSerde() throws IOException, InterruptedException { Assert.assertArrayEquals(originSnapshots, masterStatusSystem.appDiskUsageMetric.snapShots()); masterStatusSystem.restoreMetaFromFile(tmpFile); - Assert.assertEquals(3, masterStatusSystem.workers.size()); + Assert.assertEquals(3, masterStatusSystem.workersMap.size()); } private String getNewReqeustId() { diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index 4f2fae78abe..e4170a02cf3 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -347,16 +347,16 @@ public void testHandleRegisterWorker() throws InterruptedException { getNewReqeustId()); Thread.sleep(3000L); - Assert.assertEquals(3, STATUSSYSTEM1.workers.size()); - Assert.assertEquals(3, STATUSSYSTEM2.workers.size()); - Assert.assertEquals(3, STATUSSYSTEM3.workers.size()); + Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size()); + Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size()); + Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size()); - assertWorkers(STATUSSYSTEM1.workers); - assertWorkers(STATUSSYSTEM2.workers); - 
assertWorkers(STATUSSYSTEM3.workers); + assertWorkers(STATUSSYSTEM1.workersMap.values()); + assertWorkers(STATUSSYSTEM2.workersMap.values()); + assertWorkers(STATUSSYSTEM3.workersMap.values()); } - private void assertWorkers(Set workerInfos) { + private void assertWorkers(Collection workerInfos) { for (WorkerInfo workerInfo : workerInfos) { assertWorker(workerInfo); } @@ -479,9 +479,9 @@ public void testHandleWorkerLost() throws InterruptedException { HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1, getNewReqeustId()); Thread.sleep(3000L); - Assert.assertEquals(2, STATUSSYSTEM1.workers.size()); - Assert.assertEquals(2, STATUSSYSTEM2.workers.size()); - Assert.assertEquals(2, STATUSSYSTEM3.workers.size()); + Assert.assertEquals(2, STATUSSYSTEM1.workersMap.size()); + Assert.assertEquals(2, STATUSSYSTEM2.workersMap.size()); + Assert.assertEquals(2, STATUSSYSTEM3.workersMap.size()); } @Test @@ -571,21 +571,21 @@ public void testHandleRequestSlots() { Assert.assertEquals( 0, - statusSystem.workers.stream() + statusSystem.workersMap.values().stream() .filter(w -> w.host().equals(HOSTNAME1)) .findFirst() .get() .usedSlots()); Assert.assertEquals( 0, - statusSystem.workers.stream() + statusSystem.workersMap.values().stream() .filter(w -> w.host().equals(HOSTNAME2)) .findFirst() .get() .usedSlots()); Assert.assertEquals( 0, - statusSystem.workers.stream() + statusSystem.workersMap.values().stream() .filter(w -> w.host().equals(HOSTNAME3)) .findFirst() .get() @@ -632,22 +632,22 @@ public void testHandleReleaseSlots() throws InterruptedException { getNewReqeustId()); Thread.sleep(3000L); - Assert.assertEquals(3, STATUSSYSTEM1.workers.size()); - Assert.assertEquals(3, STATUSSYSTEM2.workers.size()); - Assert.assertEquals(3, STATUSSYSTEM3.workers.size()); + Assert.assertEquals(3, STATUSSYSTEM1.workersMap.size()); + Assert.assertEquals(3, STATUSSYSTEM2.workersMap.size()); + Assert.assertEquals(3, STATUSSYSTEM3.workersMap.size()); Map> workersToAllocate = new HashMap<>(); Map allocations = new HashMap<>(); allocations.put("disk1", 5); workersToAllocate.put( - statusSystem.workers.stream() + statusSystem.workersMap.values().stream() .filter(w -> w.host().equals(HOSTNAME1)) .findFirst() .get() .toUniqueId(), allocations); workersToAllocate.put( - statusSystem.workers.stream() + statusSystem.workersMap.values().stream() .filter(w -> w.host().equals(HOSTNAME2)) .findFirst() .get() @@ -661,21 +661,21 @@ public void testHandleReleaseSlots() throws InterruptedException { Assert.assertEquals( 0, - STATUSSYSTEM1.workers.stream() + STATUSSYSTEM1.workersMap.values().stream() .filter(w -> w.host().equals(HOSTNAME1)) .findFirst() .get() .usedSlots()); Assert.assertEquals( 0, - STATUSSYSTEM2.workers.stream() + STATUSSYSTEM2.workersMap.values().stream() .filter(w -> w.host().equals(HOSTNAME1)) .findFirst() .get() .usedSlots()); Assert.assertEquals( 0, - STATUSSYSTEM3.workers.stream() + STATUSSYSTEM3.workersMap.values().stream() .filter(w -> w.host().equals(HOSTNAME1)) .findFirst() .get() @@ -1087,21 +1087,21 @@ public void testHandleWorkerHeartbeat() throws InterruptedException { public void resetStatus() { STATUSSYSTEM1.registeredAppAndShuffles.clear(); STATUSSYSTEM1.hostnameSet.clear(); - STATUSSYSTEM1.workers.clear(); + STATUSSYSTEM1.workersMap.clear(); STATUSSYSTEM1.appHeartbeatTime.clear(); STATUSSYSTEM1.excludedWorkers.clear(); STATUSSYSTEM1.workerLostEvents.clear(); STATUSSYSTEM2.registeredAppAndShuffles.clear(); STATUSSYSTEM2.hostnameSet.clear(); - STATUSSYSTEM2.workers.clear(); + 
STATUSSYSTEM2.workersMap.clear(); STATUSSYSTEM2.appHeartbeatTime.clear(); STATUSSYSTEM2.excludedWorkers.clear(); STATUSSYSTEM2.workerLostEvents.clear(); STATUSSYSTEM3.registeredAppAndShuffles.clear(); STATUSSYSTEM3.hostnameSet.clear(); - STATUSSYSTEM3.workers.clear(); + STATUSSYSTEM3.workersMap.clear(); STATUSSYSTEM3.appHeartbeatTime.clear(); STATUSSYSTEM3.excludedWorkers.clear(); STATUSSYSTEM3.workerLostEvents.clear(); @@ -1280,7 +1280,7 @@ public void testHandleRemoveWorkersUnavailableInfo() throws InterruptedException statusSystem.handleReportWorkerUnavailable(unavailableWorkers, getNewReqeustId()); Thread.sleep(3000L); - Assert.assertEquals(2, STATUSSYSTEM1.workers.size()); + Assert.assertEquals(2, STATUSSYSTEM1.workersMap.size()); Assert.assertEquals(1, STATUSSYSTEM1.shutdownWorkers.size()); Assert.assertEquals(1, STATUSSYSTEM2.shutdownWorkers.size());
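
The core idea of the patch, in isolation: replace a linear scan over a `Set` of workers with a `ConcurrentHashMap` keyed by the worker's `toUniqueId()`, so lookups in paths like `updateWorkerHeartbeatMeta` and `handleWorkerLost` no longer touch every registered worker. Below is a minimal Java sketch of the before/after lookup pattern; `SimpleWorker` and its `host:rpcPort` key format are simplified stand-ins for illustration, not Celeborn's actual `WorkerInfo`.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative stand-in for a worker record (hypothetical, simplified). */
final class SimpleWorker {
  final String host;
  final int rpcPort;

  SimpleWorker(String host, int rpcPort) {
    this.host = host;
    this.rpcPort = rpcPort;
  }

  /** A stable string key for the worker, analogous to WorkerInfo.toUniqueId(). */
  String toUniqueId() {
    return host + ":" + rpcPort;
  }
}

public class WorkerLookupSketch {
  // Before: a concurrent set of workers; finding one requires scanning every entry.
  static final Set<SimpleWorker> workers = ConcurrentHashMap.newKeySet();

  // After: a concurrent map keyed by uniqueId; lookup is a single hash probe.
  static final Map<String, SimpleWorker> workersMap = new ConcurrentHashMap<>();

  // O(n) in the number of registered workers.
  static Optional<SimpleWorker> findByScan(String uniqueId) {
    return workers.stream().filter(w -> w.toUniqueId().equals(uniqueId)).findFirst();
  }

  // O(1) expected time, independent of cluster size.
  static Optional<SimpleWorker> findByMap(String uniqueId) {
    return Optional.ofNullable(workersMap.get(uniqueId));
  }

  public static void main(String[] args) {
    SimpleWorker w = new SimpleWorker("host-1", 9097);
    workers.add(w);
    workersMap.putIfAbsent(w.toUniqueId(), w);

    System.out.println(findByScan("host-1:9097").isPresent()); // true
    System.out.println(findByMap("host-1:9097").isPresent()); // true
  }
}
```

Call sites that still need the whole collection (metrics gauges, snapshot serialization, availability filtering) simply iterate `workersMap.values()` instead of the old set, which is what the patch does throughout `Master.scala` and the test suites.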