Skip to content

Commit

Permalink
[CELEBORN-1112] Inform celeborn application is shutdown, then celebor…
Browse files Browse the repository at this point in the history
…n cluster can release resource immediately

### What changes were proposed in this pull request?
Unregister the application from the Celeborn master after the application has stopped, so that the master expires the related shuffle resources immediately, resulting in resource savings.

### Why are the changes needed?
Currently, the Celeborn master expires an application's shuffle resources only once its periodic timeout check finds that the application has timed out.
This greatly delays the release of resources, which is not conducive to saving resources.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
PASS GA

Closes #2075 from RexXiong/CELEBORN-1112.

Authored-by: Shuang <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
  • Loading branch information
RexXiong authored and waitinfuture committed Nov 8, 2023
1 parent b7e4dc4 commit 931880a
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.concurrent.duration.DurationInt
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.protocol.message.ControlMessages.{HeartbeatFromApplication, HeartbeatFromApplicationResponse, ZERO_UUID}
import org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, ApplicationLostResponse, HeartbeatFromApplication, HeartbeatFromApplicationResponse, ZERO_UUID}
import org.apache.celeborn.common.protocol.message.StatusCode
import org.apache.celeborn.common.util.{ThreadUtils, Utils}

Expand All @@ -36,8 +36,11 @@ class ApplicationHeartbeater(
shuffleMetrics: () => (Long, Long),
workerStatusTracker: WorkerStatusTracker) extends Logging {

// Set to true at the end of stop() so that the shutdown work runs only once.
private var stopped = false

// Use independent app heartbeat threads to avoid being blocked by other operations.
private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs
// Cached from conf; when true, stop() also unregisters the application from the master.
private val applicationUnregisterEnabled = conf.applicationUnregisterEnabled
// Scheduled executor dedicated to heartbeats; its thread is named "celeborn-app-heartbeat".
private val appHeartbeatHandlerThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("celeborn-app-heartbeat")
// Handle of the scheduled heartbeat task; starts out null and is cancelled in stop().
private var appHeartbeat: ScheduledFuture[_] = _
Expand Down Expand Up @@ -96,8 +99,31 @@ class ApplicationHeartbeater(
}
}

/**
 * Sends an ApplicationLost message for this application through the master client and
 * logs the status carried by the reply.
 *
 * Best effort: any Exception raised by the call is logged as a warning and not rethrown.
 */
private def unregisterApplication(): Unit =
  try {
    val lostRequest = ApplicationLost(appId)
    val reply =
      masterClient.askSync[ApplicationLostResponse](lostRequest, classOf[ApplicationLostResponse])
    logInfo(s"Unregister Application $appId with response status: ${reply.status}")
  } catch {
    case failure: Exception =>
      logWarning("AskSync unRegisterApplication failed.", failure)
  }

/**
 * Stops the application heartbeat and, when unregistering is enabled, tells the master
 * that the application is gone.
 *
 * The shutdown work runs at most once; later calls are no-ops because `stopped` is set
 * at the end of the first successful pass.
 */
def stop(): Unit = {
  // Lock on this instance instead of on `stopped`: calling synchronized on a Boolean var
  // locks the boxed java.lang.Boolean, which is a JVM-wide shared constant and changes
  // identity as soon as the flag flips, so it provides no reliable mutual exclusion.
  this.synchronized {
    if (!stopped) {
      // Stop appHeartbeat first
      logInfo(s"Stop Application heartbeat $appId")
      // appHeartbeat is declared with `= _` (null); guard in case stop() runs before it
      // has been assigned.
      if (appHeartbeat != null) {
        appHeartbeat.cancel(true)
      }
      ThreadUtils.shutdown(appHeartbeatHandlerThread, 800.millis)
      // Unregister only after heartbeating has been shut down.
      if (applicationUnregisterEnabled) {
        unregisterApplication()
      }
      stopped = true
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1202,4 +1202,12 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends

// Initialize at the end of LifecycleManager construction.
initialize()

/**
 * Stops this LifecycleManager: first stops the application heartbeater, then stops the
 * underlying [[RpcEndpoint]] through `super.stop()`.
 */
override def stop(): Unit = {
  try {
    heartbeater.stop()
  } finally {
    // Always stop the endpoint, even when stopping the heartbeater throws; the
    // exception still propagates to the caller afterwards.
    super.stop()
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// Typed accessors: each one reads the named ConfigEntry through get(...).
def appHeartbeatTimeoutMs: Long = get(APPLICATION_HEARTBEAT_TIMEOUT)
def hdfsExpireDirsTimeoutMS: Long = get(HDFS_EXPIRE_DIRS_TIMEOUT)
def appHeartbeatIntervalMs: Long = get(APPLICATION_HEARTBEAT_INTERVAL)
// Whether the client informs the master about application shutdown on exit
// (key: celeborn.client.application.unregister.enabled).
def applicationUnregisterEnabled: Boolean = get(APPLICATION_UNREGISTER_ENABLED)

def clientCheckedUseAllocatedWorkers: Boolean = get(CLIENT_CHECKED_USE_ALLOCATED_WORKERS)
def clientExcludedWorkerExpireTimeout: Long = get(CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT)
def clientExcludeReplicaOnFailureEnabled: Boolean =
Expand Down Expand Up @@ -2900,6 +2902,15 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10s")

// Client-side boolean switch (default: true), exposed as CelebornConf.applicationUnregisterEnabled.
// NOTE(review): the .doc text also appears verbatim in docs/configuration/client.md —
// presumably the two must stay in sync; confirm before rewording either one.
val APPLICATION_UNREGISTER_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.application.unregister.enabled")
.categories("client")
.version("0.3.2")
.doc("When true, Celeborn client will inform celeborn master the application is already shutdown during client " +
"exit, this allows the cluster to release resources immediately, resulting in resource savings.")
.booleanConf
.createWithDefault(true)

val CLIENT_EXCLUDE_PEER_WORKER_ON_FAILURE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.excludePeerWorkerOnFailure.enabled")
.categories("client")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ trait RpcEndpoint {
/**
* A convenient method to stop [[RpcEndpoint]].
*/
final def stop(): Unit = {
def stop(): Unit = {
val _self = self
if (_self != null) {
rpcEnv.stop(_self)
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ license: |
| Key | Default | Description | Since |
| --- | ------- | ----------- | ----- |
| celeborn.client.application.heartbeatInterval | 10s | Interval for client to send heartbeat message to master. | 0.3.0 |
| celeborn.client.application.unregister.enabled | true | When true, Celeborn client will inform celeborn master the application is already shutdown during client exit, this allows the cluster to release resources immediately, resulting in resource savings. | 0.3.2 |
| celeborn.client.closeIdleConnections | true | Whether client will close idle connections. | 0.3.0 |
| celeborn.client.commitFiles.ignoreExcludedWorker | false | When true, LifecycleManager will skip workers which are in the excluded list. | 0.3.0 |
| celeborn.client.eagerlyCreateInputStream.threads | 32 | Threads count for streamCreatorPool in CelebornShuffleReader. | 0.3.1 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ private[celeborn] class Master(
handleUnregisterShuffle(context, applicationId, shuffleId, requestId))

case ApplicationLost(appId, requestId) =>
logDebug(s"Received ApplicationLost request $requestId, $appId.")
logDebug(
s"Received ApplicationLost request $requestId, $appId from ${context.senderAddress}.")
executeWithLeaderChecker(context, handleApplicationLost(context, appId, requestId))

case HeartbeatFromWorker(
Expand Down

0 comments on commit 931880a

Please sign in to comment.