From 5f5f01b09eab82876a823033824e44c8f7171e56 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Wed, 1 Nov 2023 15:59:57 +0800 Subject: [PATCH] [CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVMQuake --- .../apache/celeborn/common/CelebornConf.scala | 84 ++++++++ dev/deps/dependencies-server | 1 + docs/configuration/worker.md | 8 + pom.xml | 8 + .../service/deploy/worker/Worker.scala | 6 + .../deploy/worker/monitor/JVMQuake.scala | 199 ++++++++++++++++++ .../deploy/monitor/JVMQuakeSuite.scala | 70 ++++++ 7 files changed, 376 insertions(+) create mode 100644 worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala create mode 100644 worker/src/test/scala/org/apache/celeborn/service/deploy/monitor/JVMQuakeSuite.scala diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 52e2a414b07..679f4c1b069 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -671,6 +671,21 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED) def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED) def workerActiveConnectionMax: Option[Long] = get(WORKER_ACTIVE_CONNECTION_MAX) + def workerJvmQuakeEnabled: Boolean = get(WORKER_JVM_QUAKE_ENABLED) + def workerJvmQuakeCheckInterval: Long = get(WORKER_JVM_QUAKE_CHECK_INTERVAL) + def workerJvmQuakeRuntimeWeight: Double = get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT) + def workerJvmQuakeDumpEnabled: Boolean = get(WORKER_JVM_QUAKE_DUMP_ENABLED) + def workerJvmQuakeDumpPath: Option[String] = get(WORKER_JVM_QUAKE_DUMP_PATH) + + def workerJvmQuakeDumpThreshold: Duration = + getTimeAsMs( + WORKER_JVM_QUAKE_DUMP_THRESHOLD.key, + WORKER_JVM_QUAKE_DUMP_THRESHOLD.defaultValueString).microsecond + def workerJvmQuakeKillThreshold: Duration = + getTimeAsMs( + WORKER_JVM_QUAKE_KILL_THRESHOLD.key, + WORKER_JVM_QUAKE_KILL_THRESHOLD.defaultValueString).microsecond + def workerJvmQuakeExitCode: Int = get(WORKER_JVM_QUAKE_EXIT_CODE) // ////////////////////////////////////////////////////// // Metrics System // @@ -2835,6 +2850,75 @@ object CelebornConf extends Logging { .longConf .createOptional + val WORKER_JVM_QUAKE_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.worker.jvm.quake.enabled") + .categories("worker") + .version("0.3.2") + .doc("When true, Celeborn worker will start the jvm quake to monitor of gc behavior, " + + "which enables early detection of memory management issues and facilitates fast failure.") + .booleanConf + .createWithDefault(false) + + val WORKER_JVM_QUAKE_CHECK_INTERVAL: ConfigEntry[Long] = + buildConf("celeborn.worker.jvm.quake.check.interval") + .categories("worker") + .version("0.3.2") + .doc("Interval of gc behavior checking for worker jvm quake.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + val WORKER_JVM_QUAKE_RUNTIME_WEIGHT: ConfigEntry[Double] = + buildConf("celeborn.worker.jvm.quake.runtime.weight") + .categories("worker") + .version("0.3.2") + .doc( + "The factor by which to multiply running JVM time, when weighing it against GCing time. " + + "\"Deficit\" is accumulated as gc_time - runtime * runtime_weight, and is compared against threshold " + + "to determine whether to take action.") + .doubleConf + .createWithDefault(5) + + val WORKER_JVM_QUAKE_DUMP_THRESHOLD: ConfigEntry[Long] = + buildConf("celeborn.worker.jvm.quake.dump.threshold") + .categories("worker") + .version("0.3.2") + .doc("The threshold of heap dump for the maximum GC \"deficit\" which can be accumulated before jvmquake takes action.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("30s") + + val WORKER_JVM_QUAKE_KILL_THRESHOLD: ConfigEntry[Long] = + buildConf("celeborn.worker.jvm.quake.kill.threshold") + .categories("worker") + .version("0.3.2") + .doc("The threshold of system kill for the maximum GC \"deficit\" which can be accumulated before jvmquake takes action.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("60s") + + val WORKER_JVM_QUAKE_DUMP_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.worker.jvm.quake.dump.enabled") + .categories("worker") + .version("0.3.2") + .doc("Whether to heap dump for the maximum GC \"deficit\" during worker jvm quake.") + .booleanConf + .createWithDefault(true) + + val WORKER_JVM_QUAKE_DUMP_PATH: OptionalConfigEntry[String] = + buildConf("celeborn.worker.jvm.quake.dump.path") + .categories("worker") + .version("0.3.2") + .doc("The path of heap dump for the maximum GC \"deficit\" during worker jvm quake, " + + "which default value is [java.io.tmpdir]/celeborn/jvm-quake/dump/[pid].") + .stringConf + .createOptional + + val WORKER_JVM_QUAKE_EXIT_CODE: ConfigEntry[Int] = + buildConf("celeborn.worker.jvm.quake.exit.code") + .categories("worker") + .version("0.3.2") + .doc("The exit code of system kill for the maximum GC \"deficit\" during worker jvm quake.") + .intConf + .createWithDefault(502) + val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] = buildConf("celeborn.client.application.heartbeatInterval") .withAlternative("celeborn.application.heartbeatInterval") diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server index f93601cc352..38199368544 100644 --- a/dev/deps/dependencies-server +++ b/dev/deps/dependencies-server @@ -36,6 +36,7 @@ log4j-api/2.17.2//log4j-api-2.17.2.jar log4j-core/2.17.2//log4j-core-2.17.2.jar log4j-slf4j-impl/2.17.2//log4j-slf4j-impl-2.17.2.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 984be9f4fd4..6911feb280a 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -64,6 +64,14 @@ license: | | celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful shutdown timeout time. | 0.2.0 | | celeborn.worker.http.host | <localhost> | Worker's http host. | 0.4.0 | | celeborn.worker.http.port | 9096 | Worker's http port. | 0.4.0 | +| celeborn.worker.jvm.quake.check.interval | 1s | Interval of gc behavior checking for worker jvm quake. | 0.3.2 | +| celeborn.worker.jvm.quake.dump.enabled | true | Whether to heap dump for the maximum GC "deficit" during worker jvm quake. | 0.3.2 | +| celeborn.worker.jvm.quake.dump.path | <undefined> | The path of heap dump for the maximum GC "deficit" during worker jvm quake, which default value is [java.io.tmpdir]/celeborn/jvm-quake/dump/[pid]. | 0.3.2 | +| celeborn.worker.jvm.quake.dump.threshold | 30s | The threshold of heap dump for the maximum GC "deficit" which can be accumulated before jvmquake takes action. | 0.3.2 | +| celeborn.worker.jvm.quake.enabled | false | When true, Celeborn worker will start the jvm quake to monitor of gc behavior, which enables early detection of memory management issues and facilitates fast failure. | 0.3.2 | +| celeborn.worker.jvm.quake.exit.code | 502 | The exit code of system kill for the maximum GC "deficit" during worker jvm quake. | 0.3.2 | +| celeborn.worker.jvm.quake.kill.threshold | 60s | The threshold of system kill for the maximum GC "deficit" which can be accumulated before jvmquake takes action. | 0.3.2 | +| celeborn.worker.jvm.quake.runtime.weight | 5.0 | The factor by which to multiply running JVM time, when weighing it against GCing time. "Deficit" is accumulated as gc_time - runtime * runtime_weight, and is compared against threshold to determine whether to take action. | 0.3.2 | | celeborn.worker.monitor.disk.check.interval | 30s | Intervals between device monitor to check disk. | 0.3.0 | | celeborn.worker.monitor.disk.check.timeout | 30s | Timeout time for worker check device status. | 0.3.0 | | celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 | diff --git a/pom.xml b/pom.xml index 0510f59c44c..a89b69ab809 100644 --- a/pom.xml +++ b/pom.xml @@ -132,6 +132,7 @@ --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED + --add-opens=java.base/sun.jvmstat.monitor=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED @@ -1134,6 +1135,13 @@ 8 + + + com.github.olivergondza + maven-jdk-tools-wrapper + 0.1 + + diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index b3bef5688a7..d4c5129a234 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -51,6 +51,7 @@ import org.apache.celeborn.service.deploy.worker.WorkerSource.ACTIVE_CONNECTION_ import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, MemoryManager} import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState +import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager} private[celeborn] class Worker( @@ -271,6 +272,9 @@ private[celeborn] class Worker( private val userResourceConsumptions = JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]() + private val jvmQuake = JVMQuake.create(conf) + jvmQuake.start() + workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () => workerInfo.getShuffleKeySet.size } @@ -430,6 +434,8 @@ private[celeborn] class Worker( if (!stopped) { logInfo("Stopping Worker.") + jvmQuake.stop() + if (sendHeartbeatTask != null) { if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { sendHeartbeatTask.cancel(false) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala new file mode 100644 index 00000000000..e1227c7e435 --- /dev/null +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker.monitor + +import java.io.File +import java.lang.management.ManagementFactory +import java.nio.file.Files +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import com.google.common.annotations.VisibleForTesting +import com.sun.management.HotSpotDiagnosticMXBean +import org.apache.commons.io.FileUtils +import sun.jvmstat.monitor._ + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.util.{ThreadUtils, Utils} + +/** + * The JVM quake provides granular monitoring of GC behavior, which enables early detection of memory management + * issues and facilitates fast failure. + * + * Note: The principle is in alignment with GC instability detection algorithm for jvmquake project of Netflix: + * https://github.com/Netflix-Skunkworks/jvmquake. + * + * @param conf Celeborn configuration with jvm quake config. + */ +class JVMQuake(conf: CelebornConf) extends Logging { + + import JVMQuake._ + + val dumpFile = "worker-quake-heapdump.hprof" + var heapDumped: Boolean = false + + private[this] val enabled = conf.workerJvmQuakeEnabled + private[this] val checkInterval = conf.workerJvmQuakeCheckInterval + private[this] val runtimeWeight = conf.workerJvmQuakeRuntimeWeight + private[this] val dumpThreshold = conf.workerJvmQuakeDumpThreshold.toNanos + private[this] val killThreshold = conf.workerJvmQuakeKillThreshold.toNanos + private[this] val dumpEnabled = conf.workerJvmQuakeDumpEnabled + private[this] val dumpPath = conf.workerJvmQuakeDumpPath + private[this] val exitCode = conf.workerJvmQuakeExitCode + private[this] val dumpDir = + s"${Utils.createDirectory(FileUtils.getTempDirectoryPath).getPath}/jvm-quake/dump" + + private[this] var lastExitTime: Long = 0L + private[this] var lastGCTime: Long = 0L + private[this] var bucket: Long = 0L + private[this] var scheduler: ScheduledExecutorService = _ + + def start(): Unit = { + if (enabled) { + lastExitTime = getLastExitTime + lastGCTime = getLastGCTime + scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("jvm-quake") + scheduler.scheduleWithFixedDelay( + new Runnable() { + override def run(): Unit = { + JVMQuake.this.run() + } + }, + 0, + checkInterval, + TimeUnit.MILLISECONDS) + } + } + + def stop(): Unit = { + if (enabled) { + scheduler.shutdown() + } + } + + private def run(): Unit = { + val currentExitTime = getLastExitTime + val currentGCTime = getLastGCTime + val gcTime = currentGCTime - lastGCTime + val runTime = currentExitTime - lastExitTime - gcTime + + bucket = Math.max(0, bucket + gcTime - BigDecimal(runTime * runtimeWeight).toLong) + logDebug(s"Time: (gc time: $gcTime, execution time: $runTime)") + logDebug( + s"Capacity: (bucket: $bucket, dump threshold: $dumpThreshold, kill threshold: $killThreshold)") + + if (bucket > dumpThreshold) { + logError(s"JVM GC has reached the threshold: bucket: $bucket, dumpThreshold: $dumpThreshold.") + if (shouldHeapDump) { + val savePath = getHeapDumpSavePath + val linkPath = getHeapDumpLinkPath + heapDump(savePath, linkPath) + } else if (bucket > killThreshold) { + System.exit(exitCode) + } + } + lastExitTime = currentExitTime + lastGCTime = currentGCTime + } + + def shouldHeapDump: Boolean = { + dumpEnabled && !heapDumped + } + + @VisibleForTesting + def getHeapDumpSavePath: String = + dumpPath.getOrElse(s"$dumpDir/$getProcessId/hprof") + + @VisibleForTesting + def getHeapDumpLinkPath: String = + s"$dumpDir/$getProcessId/link" + + private def heapDump(savePath: String, linkPath: String, live: Boolean = false): Unit = { + val saveDir = new File(savePath) + if (!saveDir.exists()) { + saveDir.mkdirs() + } + val heapDump = new File(saveDir, dumpFile) + if (heapDump.exists()) { + logInfo(s"Heap dump of worker exists: $heapDump.") + heapDumped = true + return + } + logInfo(s"Starting heap dump: $heapDump.") + ManagementFactory.newPlatformMXBeanProxy( + ManagementFactory.getPlatformMBeanServer, + "com.sun.management:type=HotSpotDiagnostic", + classOf[HotSpotDiagnosticMXBean]).dumpHeap(heapDump.getAbsolutePath, live) + val linkDir = new File(linkPath) + if (linkDir.exists()) { + logInfo(s"Symbolic link of heap dump exists: $linkPath.") + } else if (!linkDir.getParentFile.exists()) { + linkDir.getParentFile.mkdirs() + } + try { + Files.createSymbolicLink(linkDir.toPath, saveDir.toPath) + logInfo(s"Created symbolic link: $linkPath.") + } catch { + case e: Exception => + logError("Create symbolic link failed.", e) + } finally { + heapDumped = true + logInfo(s"Finished heap dump: $dumpFile.") + } + } +} + +object JVMQuake { + + private[this] var quake: JVMQuake = _ + + def create(conf: CelebornConf): JVMQuake = { + set(new JVMQuake(conf)) + quake + } + + def get: JVMQuake = { + quake + } + + def set(quake: JVMQuake): Unit = { + this.quake = quake + } + + private[this] lazy val monitoredVm: MonitoredVm = { + val host = MonitoredHost.getMonitoredHost(new HostIdentifier("localhost")) + host.getMonitoredVm(new VmIdentifier( + "local://%s@localhost".format(getProcessId))) + } + + private[this] lazy val ygcExitTimeMonitor: Monitor = + monitoredVm.findByName("sun.gc.collector.0.lastExitTime") + private[this] lazy val fgcExitTimeMonitor: Monitor = + monitoredVm.findByName("sun.gc.collector.1.lastExitTime") + private[this] lazy val ygcTimeMonitor: Monitor = monitoredVm.findByName("sun.gc.collector.0.time") + private[this] lazy val fgcTimeMonitor: Monitor = monitoredVm.findByName("sun.gc.collector.1.time") + + private def getLastExitTime: Long = Math.max( + ygcExitTimeMonitor.getValue.asInstanceOf[Long], + fgcExitTimeMonitor.getValue.asInstanceOf[Long]) + + private def getLastGCTime: Long = + ygcTimeMonitor.getValue.asInstanceOf[Long] + fgcTimeMonitor.getValue.asInstanceOf[Long] + + private def getProcessId: String = ManagementFactory.getRuntimeMXBean.getName.split("@")(0) +} diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/monitor/JVMQuakeSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/monitor/JVMQuakeSuite.scala new file mode 100644 index 00000000000..dc508049353 --- /dev/null +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/monitor/JVMQuakeSuite.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.monitor + +import java.io.File + +import scala.collection.mutable.ArrayBuffer + +import org.junit.Assert.assertTrue + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.CelebornConf._ +import org.apache.celeborn.common.util.JavaUtils +import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake + +class JVMQuakeSuite extends CelebornFunSuite { + + private val allocation = new ArrayBuffer[Array[Byte]]() + + override def afterEach(): Unit = { + allocation.clear() + System.gc() + } + + test("[CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVMQuake") { + val quake = new JVMQuake(new CelebornConf().set(WORKER_JVM_QUAKE_ENABLED.key, "true") + .set(WORKER_JVM_QUAKE_RUNTIME_WEIGHT.key, "1") + .set(WORKER_JVM_QUAKE_DUMP_THRESHOLD.key, "1s") + .set(WORKER_JVM_QUAKE_KILL_THRESHOLD.key, "2s")) + quake.start() + allocateMemory(quake) + quake.stop() + + assertTrue(quake.heapDumped) + val heapDump = new File(s"${quake.getHeapDumpSavePath}/${quake.dumpFile}") + assert(heapDump.exists()) + JavaUtils.deleteRecursively(heapDump) + JavaUtils.deleteRecursively(new File(quake.getHeapDumpLinkPath)) + } + + def allocateMemory(quake: JVMQuake): Unit = { + val capacity = 1024 * 100 + while (allocation.size * capacity < Runtime.getRuntime.maxMemory / 4) { + val bytes = new Array[Byte](capacity) + allocation.append(bytes) + } + while (quake.shouldHeapDump) { + for (index <- allocation.indices) { + val bytes = new Array[Byte](capacity) + allocation(index) = bytes + } + } + } +}