From 931880a82d1024158f0e888d4377f73f8a07effb Mon Sep 17 00:00:00 2001 From: Shuang Date: Wed, 8 Nov 2023 20:46:51 +0800 Subject: [PATCH] [CELEBORN-1112] Inform celeborn application is shutdown, then celeborn cluster can release resource immediately ### What changes were proposed in this pull request? Unregister the application from the Celeborn master after the application has stopped, so that the master expires the related shuffle resources immediately, resulting in resource savings. ### Why are the changes needed? Currently the Celeborn master expires an application's shuffle resources only when the application is found to have timed out. This greatly delays the release of resources, which is not conducive to saving resources. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? PASS GA Closes #2075 from RexXiong/CELEBORN-1112. Authored-by: Shuang Signed-off-by: zky.zhoukeyong --- .../client/ApplicationHeartbeater.scala | 32 +++++++++++++++++-- .../celeborn/client/LifecycleManager.scala | 8 +++++ .../apache/celeborn/common/CelebornConf.scala | 11 +++++++ .../celeborn/common/rpc/RpcEndpoint.scala | 2 +- docs/configuration/client.md | 1 + .../service/deploy/master/Master.scala | 3 +- 6 files changed, 52 insertions(+), 5 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala index e117ec3e1a2..67692da7469 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala @@ -25,7 +25,7 @@ import scala.concurrent.duration.DurationInt import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.internal.Logging -import org.apache.celeborn.common.protocol.message.ControlMessages.{HeartbeatFromApplication, HeartbeatFromApplicationResponse, ZERO_UUID} 
+import org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, ApplicationLostResponse, HeartbeatFromApplication, HeartbeatFromApplicationResponse, ZERO_UUID} import org.apache.celeborn.common.protocol.message.StatusCode import org.apache.celeborn.common.util.{ThreadUtils, Utils} @@ -36,8 +36,11 @@ class ApplicationHeartbeater( shuffleMetrics: () => (Long, Long), workerStatusTracker: WorkerStatusTracker) extends Logging { + private var stopped = false + // Use independent app heartbeat threads to avoid being blocked by other operations. private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs + private val applicationUnregisterEnabled = conf.applicationUnregisterEnabled private val appHeartbeatHandlerThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-app-heartbeat") private var appHeartbeat: ScheduledFuture[_] = _ @@ -96,8 +99,31 @@ class ApplicationHeartbeater( } } + private def unregisterApplication(): Unit = { + try { + // Then unregister Application + val response = masterClient.askSync[ApplicationLostResponse]( + ApplicationLost(appId), + classOf[ApplicationLostResponse]) + logInfo(s"Unregister Application $appId with response status: ${response.status}") + } catch { + case e: Exception => + logWarning("AskSync unRegisterApplication failed.", e) + } + } + def stop(): Unit = { - appHeartbeat.cancel(true) - ThreadUtils.shutdown(appHeartbeatHandlerThread, 800.millis) + stopped.synchronized { + if (!stopped) { + // Stop appHeartbeat first + logInfo(s"Stop Application heartbeat $appId") + appHeartbeat.cancel(true) + ThreadUtils.shutdown(appHeartbeatHandlerThread, 800.millis) + if (applicationUnregisterEnabled) { + unregisterApplication() + } + stopped = true + } + } } } diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 9f1513c327d..9f0ca9da47e 100644 --- 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -1202,4 +1202,12 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends // Initialize at the end of LifecycleManager construction. initialize() + + /** + * A convenient method to stop [[RpcEndpoint]]. + */ + override def stop(): Unit = { + heartbeater.stop() + super.stop() + } } diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 06e815f9814..2e31f78277d 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -729,6 +729,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT) def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT) def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL) + def applicationUnregisterEnabled: Boolean = get(APPLICATION_UNREGISTER_ENABLED) + def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS) def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT) def clientExcludeReplicaOnFailureEnabled: Boolean = @@ -2900,6 +2902,15 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("10s") + val APPLICATION_UNREGISTER_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.client.application.unregister.enabled") + .categories("client") + .version("0.3.2") + .doc("When true, Celeborn client will inform celeborn master the application is already shutdown during client " + + "exit, this allows the cluster to release resources immediately, resulting in resource savings.") + .booleanConf + .createWithDefault(true) + val 
CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] = buildConf("celeborn.client.excludePeerWorkerOnFailure.enabled") .categories("client") diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala index 4dc466af993..a0e0803963d 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala @@ -128,7 +128,7 @@ trait RpcEndpoint { /** * A convenient method to stop [[RpcEndpoint]]. */ - final def stop(): Unit = { + def stop(): Unit = { val _self = self if (_self != null) { rpcEnv.stop(_self) diff --git a/docs/configuration/client.md b/docs/configuration/client.md index f16e43193da..1c1d4e7a620 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -20,6 +20,7 @@ license: | | Key | Default | Description | Since | | --- | ------- | ----------- | ----- | | celeborn.client.application.heartbeatInterval | 10s | Interval for client to send heartbeat message to master. | 0.3.0 | +| celeborn.client.application.unregister.enabled | true | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 | | celeborn.client.closeIdleConnections | true | Whether client will close idle connections. | 0.3.0 | | celeborn.client.commitFiles.ignoreExcludedWorker | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 | | celeborn.client.eagerlyCreateInputStream.threads | 32 | Threads count for streamCreatorPool in CelebornShuffleReader. 
| 0.3.1 | diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 9f2d5d26b06..1c03903a748 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -372,7 +372,8 @@ private[celeborn] class Master( handleUnregisterShuffle(context, applicationId, shuffleId, requestId)) case ApplicationLost(appId, requestId) => - logDebug(s"Received ApplicationLost request $requestId, $appId.") + logDebug( + s"Received ApplicationLost request $requestId, $appId from ${context.senderAddress}.") executeWithLeaderChecker(context, handleApplicationLost(context, appId, requestId)) case HeartbeatFromWorker(