From df40a28959d6845e9ddb91a433eae4f062194b54 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Fri, 27 Oct 2023 11:20:28 +0800 Subject: [PATCH] [CELEBORN-1032][FOLLOWUP] Use scheduleWithFixedDelay instead of scheduleAtFixedRate in threads pool of master and worker ### What changes were proposed in this pull request? Use `scheduleWithFixedDelay` instead of `scheduleAtFixedRate` in thread pool of Celeborn Master and Worker. ### Why are the changes needed? Follow up #1970. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Internal tests. Closes #2048 from SteNicholas/CELEBORN-1032. Authored-by: SteNicholas Signed-off-by: zky.zhoukeyong --- .../apache/spark/shuffle/celeborn/SendBufferPool.java | 2 +- .../java/org/apache/celeborn/client/ReviveManager.java | 2 +- .../celeborn/client/ApplicationHeartbeater.scala | 2 +- .../celeborn/client/ChangePartitionManager.scala | 2 +- .../org/apache/celeborn/client/CommitManager.scala | 2 +- .../org/apache/celeborn/client/LifecycleManager.scala | 2 +- .../celeborn/client/ReleasePartitionManager.scala | 2 +- .../common/network/server/TransportChannelHandler.java | 2 +- .../celeborn/common/meta/AppDiskUsageMetric.scala | 2 +- .../apache/celeborn/service/deploy/master/Master.scala | 10 +++++----- .../apache/celeborn/service/deploy/worker/Worker.scala | 4 ++-- .../service/deploy/worker/storage/DeviceMonitor.scala | 2 +- .../service/deploy/worker/storage/StorageManager.scala | 4 ++-- 13 files changed, 19 insertions(+), 19 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java index 849694518da..9731a654b40 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/celeborn/SendBufferPool.java @@ -57,7 +57,7 @@ private SendBufferPool(int capacity, long checkInterval, long timeout) { pushTaskQueues = new LinkedList<>(); lastAquireTime = System.currentTimeMillis(); - cleaner.scheduleAtFixedRate( + cleaner.scheduleWithFixedDelay( () -> { if (System.currentTimeMillis() - lastAquireTime > timeout) { synchronized (this) { diff --git a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java index 7f8082a73ad..47d87878d46 100644 --- a/client/src/main/java/org/apache/celeborn/client/ReviveManager.java +++ b/client/src/main/java/org/apache/celeborn/client/ReviveManager.java @@ -46,7 +46,7 @@ public ReviveManager(ShuffleClientImpl shuffleClient, CelebornConf conf) { this.interval = conf.clientPushReviveInterval(); this.batchSize = conf.clientPushReviveBatchSize(); - batchReviveRequestScheduler.scheduleAtFixedRate( + batchReviveRequestScheduler.scheduleWithFixedDelay( () -> { Map> shuffleMap = new HashMap<>(); do { diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala index 38dc2a47d9e..e117ec3e1a2 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala @@ -43,7 +43,7 @@ class ApplicationHeartbeater( private var appHeartbeat: ScheduledFuture[_] = _ def start(): Unit = { - appHeartbeat = appHeartbeatHandlerThread.scheduleAtFixedRate( + appHeartbeat = appHeartbeatHandlerThread.scheduleWithFixedDelay( new Runnable { override def run(): Unit = { try { diff --git a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala index ca3b2077eef..e7b1a45b2de 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala @@ -70,7 +70,7 @@ class ChangePartitionManager( def start(): Unit = { batchHandleChangePartition = batchHandleChangePartitionSchedulerThread.map { // noinspection ConvertExpressionToSAM - _.scheduleAtFixedRate( + _.scheduleWithFixedDelay( new Runnable { override def run(): Unit = { try { diff --git a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala index 1322f02d871..91efcd30ec9 100644 --- a/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/CommitManager.scala @@ -94,7 +94,7 @@ class CommitManager(appUniqueId: String, val conf: CelebornConf, lifecycleManage lifecycleManager.registerWorkerStatusListener(new ShutdownWorkerListener) batchHandleCommitPartition = batchHandleCommitPartitionSchedulerThread.map { - _.scheduleAtFixedRate( + _.scheduleWithFixedDelay( new Runnable { override def run(): Unit = { committedPartitionInfo.asScala.foreach { case (shuffleId, shuffleCommittedInfo) => diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 807c4a6601a..ea1517a21f9 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -147,7 +147,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends override def onStart(): Unit = { // noinspection ConvertExpressionToSAM - checkForShuffleRemoval = forwardMessageThread.scheduleAtFixedRate( + checkForShuffleRemoval = forwardMessageThread.scheduleWithFixedDelay( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(RemoveExpiredShuffle) diff --git a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala index e28b9a46a99..aabb965ae56 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ReleasePartitionManager.scala @@ -54,7 +54,7 @@ class ReleasePartitionManager( def start(): Unit = { batchHandleReleasePartition = batchHandleReleasePartitionSchedulerThread.map { // noinspection ConvertExpressionToSAM - _.scheduleAtFixedRate( + _.scheduleWithFixedDelay( new Runnable { override def run(): Unit = { try { diff --git a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java index f0eae1a7ef0..01219516e09 100644 --- a/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java +++ b/common/src/main/java/org/apache/celeborn/common/network/server/TransportChannelHandler.java @@ -106,7 +106,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { if (enableHeartbeat) { heartbeatFuture = ctx.executor() - .scheduleAtFixedRate( + .scheduleWithFixedDelay( () -> { logger.debug("send heartbeat"); ctx.writeAndFlush(new Heartbeat()); diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala b/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala index 46b7452d523..f66756ba157 100644 --- a/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala +++ b/common/src/main/scala/org/apache/celeborn/common/meta/AppDiskUsageMetric.scala @@ -136,7 +136,7 @@ class AppDiskUsageMetric(conf: CelebornConf) extends Logging { }) } - logExecutor.scheduleAtFixedRate( + logExecutor.scheduleWithFixedDelay( new Runnable { override def run(): Unit = { if (currentSnapShot.get() != null) { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index a94989dd6e8..1e9c813e609 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -136,7 +136,7 @@ private[celeborn] class Master( conf.estimatedPartitionSizeForEstimationUpdateInterval private val partitionSizeUpdateService = ThreadUtils.newDaemonSingleThreadScheduledExecutor("partition-size-updater") - partitionSizeUpdateService.scheduleAtFixedRate( + partitionSizeUpdateService.scheduleWithFixedDelay( new Runnable { override def run(): Unit = { executeWithLeaderChecker( @@ -199,7 +199,7 @@ private[celeborn] class Master( // start threads to check timeout for workers and applications override def onStart(): Unit = { - checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate( + checkForWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(ControlMessages.pbCheckForWorkerTimeout) @@ -209,7 +209,7 @@ private[celeborn] class Master( workerHeartbeatTimeoutMs, TimeUnit.MILLISECONDS) - checkForApplicationTimeOutTask = forwardMessageThread.scheduleAtFixedRate( + checkForApplicationTimeOutTask = forwardMessageThread.scheduleWithFixedDelay( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(CheckForApplicationTimeOut) @@ -219,7 +219,7 @@ private[celeborn] class Master( appHeartbeatTimeoutMs / 2, TimeUnit.MILLISECONDS) - checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate( + checkForUnavailableWorkerTimeOutTask = forwardMessageThread.scheduleWithFixedDelay( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(CheckForWorkerUnavailableInfoTimeout) @@ -230,7 +230,7 @@ private[celeborn] class Master( TimeUnit.MILLISECONDS) if (hasHDFSStorage) { - checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleAtFixedRate( + checkForHDFSRemnantDirsTimeOutTask = forwardMessageThread.scheduleWithFixedDelay( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { self.send(CheckForHDFSExpiredDirsTimeout) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 1c15efb4bb5..f958084379a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -372,7 +372,7 @@ private[celeborn] class Worker( registerWithMaster() // start heartbeat - sendHeartbeatTask = forwardMessageScheduler.scheduleAtFixedRate( + sendHeartbeatTask = forwardMessageScheduler.scheduleWithFixedDelay( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { heartbeatToMaster() } }, @@ -380,7 +380,7 @@ private[celeborn] class Worker( heartbeatInterval, TimeUnit.MILLISECONDS) - checkFastFailTask = forwardMessageScheduler.scheduleAtFixedRate( + checkFastFailTask = forwardMessageScheduler.scheduleWithFixedDelay( new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { unavailablePeers.entrySet().forEach { entry: JMap.Entry[WorkerInfo, Long] => diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala index 714529c589c..4681140b969 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala @@ -103,7 +103,7 @@ class LocalDeviceMonitor( } override def startCheck(): Unit = { - diskChecker.scheduleAtFixedRate( + diskChecker.scheduleWithFixedDelay( new Runnable { override def run(): Unit = { logDebug("Device check start") diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 79390ee77af..56c82d02d63 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -214,7 +214,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs saveCommittedFileInfosExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor( "StorageManager-save-committed-fileinfo-thread") - saveCommittedFileInfosExecutor.scheduleAtFixedRate( + saveCommittedFileInfosExecutor.scheduleWithFixedDelay( new Runnable { override def run(): Unit = { if (!committedFileInfos.isEmpty) { @@ -588,7 +588,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs private val storageScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("storage-scheduler") - storageScheduler.scheduleAtFixedRate( + storageScheduler.scheduleWithFixedDelay( new Runnable { override def run(): Unit = { try {