From 6a7d1a59a07da3d7450f4fc69971c1309c5a91db Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Wed, 1 Nov 2023 15:59:57 +0800 Subject: [PATCH] [CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVMQuake --- LICENSE-binary | 2 + .../apache/celeborn/common/CelebornConf.scala | 84 ++++++++ dev/deps/dependencies-client-flink-1.14 | 1 + dev/deps/dependencies-client-flink-1.15 | 1 + dev/deps/dependencies-client-flink-1.17 | 1 + dev/deps/dependencies-client-mr | 1 + dev/deps/dependencies-client-spark-2.4 | 1 + dev/deps/dependencies-client-spark-3.0 | 1 + dev/deps/dependencies-client-spark-3.1 | 1 + dev/deps/dependencies-client-spark-3.2 | 1 + dev/deps/dependencies-client-spark-3.3 | 1 + dev/deps/dependencies-client-spark-3.4 | 1 + dev/deps/dependencies-client-spark-3.5 | 1 + dev/deps/dependencies-server | 1 + docs/configuration/worker.md | 8 + licenses-binary/LISENCE-jdktools.txt | 19 ++ pom.xml | 8 + project/CelebornBuild.scala | 6 + .../service/deploy/worker/Worker.scala | 6 + .../deploy/worker/monitor/JVMQuake.scala | 199 ++++++++++++++++++ .../deploy/monitor/JVMQuakeSuite.scala | 70 ++++++ 21 files changed, 414 insertions(+) create mode 100644 licenses-binary/LISENCE-jdktools.txt create mode 100644 worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala create mode 100644 worker/src/test/scala/org/apache/celeborn/service/deploy/monitor/JVMQuakeSuite.scala diff --git a/LICENSE-binary b/LICENSE-binary index 585c150c312..61838e78a2f 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -304,6 +304,8 @@ javax.servlet:javax.servlet-api MIT License ------------ +See license/LISENCE-jdktools.txt for details. +com.github.olivergondza:maven-jdk-tools-wrapper See license/LICENSE-slf4j.txt for details. org.slf4j:jul-to-slf4j org.slf4j:slf4j-api diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 52e2a414b07..679f4c1b069 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -671,6 +671,21 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED) def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED) def workerActiveConnectionMax: Option[Long] = get(WORKER_ACTIVE_CONNECTION_MAX) + def workerJvmQuakeEnabled: Boolean = get(WORKER_JVM_QUAKE_ENABLED) + def workerJvmQuakeCheckInterval: Long = get(WORKER_JVM_QUAKE_CHECK_INTERVAL) + def workerJvmQuakeRuntimeWeight: Double = get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT) + def workerJvmQuakeDumpEnabled: Boolean = get(WORKER_JVM_QUAKE_DUMP_ENABLED) + def workerJvmQuakeDumpPath: Option[String] = get(WORKER_JVM_QUAKE_DUMP_PATH) + + def workerJvmQuakeDumpThreshold: Duration = + getTimeAsMs( + WORKER_JVM_QUAKE_DUMP_THRESHOLD.key, + WORKER_JVM_QUAKE_DUMP_THRESHOLD.defaultValueString).microsecond + def workerJvmQuakeKillThreshold: Duration = + getTimeAsMs( + WORKER_JVM_QUAKE_KILL_THRESHOLD.key, + WORKER_JVM_QUAKE_KILL_THRESHOLD.defaultValueString).microsecond + def workerJvmQuakeExitCode: Int = get(WORKER_JVM_QUAKE_EXIT_CODE) // ////////////////////////////////////////////////////// // Metrics System // @@ -2835,6 +2850,75 @@ object CelebornConf extends Logging { .longConf .createOptional + val WORKER_JVM_QUAKE_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.worker.jvm.quake.enabled") + .categories("worker") + .version("0.3.2") + .doc("When true, Celeborn worker will start the jvm quake to monitor of gc behavior, " + + "which enables early detection of memory management issues and facilitates fast failure.") + .booleanConf + .createWithDefault(false) + + val WORKER_JVM_QUAKE_CHECK_INTERVAL: ConfigEntry[Long] = + buildConf("celeborn.worker.jvm.quake.check.interval") + .categories("worker") + .version("0.3.2") + .doc("Interval of gc behavior checking for worker jvm quake.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + val WORKER_JVM_QUAKE_RUNTIME_WEIGHT: ConfigEntry[Double] = + buildConf("celeborn.worker.jvm.quake.runtime.weight") + .categories("worker") + .version("0.3.2") + .doc( + "The factor by which to multiply running JVM time, when weighing it against GCing time. " + + "\"Deficit\" is accumulated as gc_time - runtime * runtime_weight, and is compared against threshold " + + "to determine whether to take action.") + .doubleConf + .createWithDefault(5) + + val WORKER_JVM_QUAKE_DUMP_THRESHOLD: ConfigEntry[Long] = + buildConf("celeborn.worker.jvm.quake.dump.threshold") + .categories("worker") + .version("0.3.2") + .doc("The threshold of heap dump for the maximum GC \"deficit\" which can be accumulated before jvmquake takes action.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("30s") + + val WORKER_JVM_QUAKE_KILL_THRESHOLD: ConfigEntry[Long] = + buildConf("celeborn.worker.jvm.quake.kill.threshold") + .categories("worker") + .version("0.3.2") + .doc("The threshold of system kill for the maximum GC \"deficit\" which can be accumulated before jvmquake takes action.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("60s") + + val WORKER_JVM_QUAKE_DUMP_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.worker.jvm.quake.dump.enabled") + .categories("worker") + .version("0.3.2") + .doc("Whether to heap dump for the maximum GC \"deficit\" during worker jvm quake.") + .booleanConf + .createWithDefault(true) + + val WORKER_JVM_QUAKE_DUMP_PATH: OptionalConfigEntry[String] = + buildConf("celeborn.worker.jvm.quake.dump.path") + .categories("worker") + .version("0.3.2") + .doc("The path of heap dump for the maximum GC \"deficit\" during worker jvm quake, " + + "which default value is [java.io.tmpdir]/celeborn/jvm-quake/dump/[pid].") + .stringConf + .createOptional + + val WORKER_JVM_QUAKE_EXIT_CODE: ConfigEntry[Int] = + buildConf("celeborn.worker.jvm.quake.exit.code") + .categories("worker") + .version("0.3.2") + .doc("The exit code of system kill for the maximum GC \"deficit\" during worker jvm quake.") + .intConf + .createWithDefault(502) + val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] = buildConf("celeborn.client.application.heartbeatInterval") .withAlternative("celeborn.application.heartbeatInterval") diff --git a/dev/deps/dependencies-client-flink-1.14 b/dev/deps/dependencies-client-flink-1.14 index 7e8c870ff48..a6b65304e1a 100644 --- a/dev/deps/dependencies-client-flink-1.14 +++ b/dev/deps/dependencies-client-flink-1.14 @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-flink-1.15 b/dev/deps/dependencies-client-flink-1.15 index 7e8c870ff48..a6b65304e1a 100644 --- a/dev/deps/dependencies-client-flink-1.15 +++ b/dev/deps/dependencies-client-flink-1.15 @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-flink-1.17 b/dev/deps/dependencies-client-flink-1.17 index 7e8c870ff48..a6b65304e1a 100644 --- a/dev/deps/dependencies-client-flink-1.17 +++ b/dev/deps/dependencies-client-flink-1.17 @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-mr b/dev/deps/dependencies-client-mr index 8f6a629affa..b93c8c918f6 100644 --- a/dev/deps/dependencies-client-mr +++ b/dev/deps/dependencies-client-mr @@ -121,6 +121,7 @@ kerby-util/1.0.1//kerby-util-1.0.1.jar kerby-xdr/1.0.1//kerby-xdr-1.0.1.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-2.4 b/dev/deps/dependencies-client-spark-2.4 index b759797500a..85cc01ad372 100644 --- a/dev/deps/dependencies-client-spark-2.4 +++ b/dev/deps/dependencies-client-spark-2.4 @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.4.0//lz4-java-1.4.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.0 b/dev/deps/dependencies-client-spark-3.0 index 5005ba525cf..1fd1889b9bf 100644 --- a/dev/deps/dependencies-client-spark-3.0 +++ b/dev/deps/dependencies-client-spark-3.0 @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.7.1//lz4-java-1.7.1.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.1 b/dev/deps/dependencies-client-spark-3.1 index 6e8fd970315..94b649f6efa 100644 --- a/dev/deps/dependencies-client-spark-3.1 +++ b/dev/deps/dependencies-client-spark-3.1 @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.7.1//lz4-java-1.7.1.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.2 b/dev/deps/dependencies-client-spark-3.2 index 6fbebf79d10..899fe568ec6 100644 --- a/dev/deps/dependencies-client-spark-3.2 +++ b/dev/deps/dependencies-client-spark-3.2 @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.7.1//lz4-java-1.7.1.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.3 b/dev/deps/dependencies-client-spark-3.3 index 7e8c870ff48..a6b65304e1a 100644 --- a/dev/deps/dependencies-client-spark-3.3 +++ b/dev/deps/dependencies-client-spark-3.3 @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.4 b/dev/deps/dependencies-client-spark-3.4 index c6a87cc07ac..90e3b1954ad 100644 --- a/dev/deps/dependencies-client-spark-3.4 +++ b/dev/deps/dependencies-client-spark-3.4 @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.5 b/dev/deps/dependencies-client-spark-3.5 index 98abc8a5fb9..9f34bfdd677 100644 --- a/dev/deps/dependencies-client-spark-3.5 +++ b/dev/deps/dependencies-client-spark-3.5 @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server index f93601cc352..38199368544 100644 --- a/dev/deps/dependencies-server +++ b/dev/deps/dependencies-server @@ -36,6 +36,7 @@ log4j-api/2.17.2//log4j-api-2.17.2.jar log4j-core/2.17.2//log4j-core-2.17.2.jar log4j-slf4j-impl/2.17.2//log4j-slf4j-impl-2.17.2.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 984be9f4fd4..6911feb280a 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -64,6 +64,14 @@ license: | | celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful shutdown timeout time. | 0.2.0 | | celeborn.worker.http.host | <localhost> | Worker's http host. | 0.4.0 | | celeborn.worker.http.port | 9096 | Worker's http port. | 0.4.0 | +| celeborn.worker.jvm.quake.check.interval | 1s | Interval of gc behavior checking for worker jvm quake. | 0.3.2 | +| celeborn.worker.jvm.quake.dump.enabled | true | Whether to heap dump for the maximum GC "deficit" during worker jvm quake. | 0.3.2 | +| celeborn.worker.jvm.quake.dump.path | <undefined> | The path of heap dump for the maximum GC "deficit" during worker jvm quake, which default value is [java.io.tmpdir]/celeborn/jvm-quake/dump/[pid]. | 0.3.2 | +| celeborn.worker.jvm.quake.dump.threshold | 30s | The threshold of heap dump for the maximum GC "deficit" which can be accumulated before jvmquake takes action. | 0.3.2 | +| celeborn.worker.jvm.quake.enabled | false | When true, Celeborn worker will start the jvm quake to monitor of gc behavior, which enables early detection of memory management issues and facilitates fast failure. | 0.3.2 | +| celeborn.worker.jvm.quake.exit.code | 502 | The exit code of system kill for the maximum GC "deficit" during worker jvm quake. | 0.3.2 | +| celeborn.worker.jvm.quake.kill.threshold | 60s | The threshold of system kill for the maximum GC "deficit" which can be accumulated before jvmquake takes action. | 0.3.2 | +| celeborn.worker.jvm.quake.runtime.weight | 5.0 | The factor by which to multiply running JVM time, when weighing it against GCing time. "Deficit" is accumulated as gc_time - runtime * runtime_weight, and is compared against threshold to determine whether to take action. | 0.3.2 | | celeborn.worker.monitor.disk.check.interval | 30s | Intervals between device monitor to check disk. | 0.3.0 | | celeborn.worker.monitor.disk.check.timeout | 30s | Timeout time for worker check device status. | 0.3.0 | | celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 | diff --git a/licenses-binary/LISENCE-jdktools.txt b/licenses-binary/LISENCE-jdktools.txt new file mode 100644 index 00000000000..32ce42631ee --- /dev/null +++ b/licenses-binary/LISENCE-jdktools.txt @@ -0,0 +1,19 @@ +Copyright (c) 2016 Red Hat, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/pom.xml b/pom.xml index 0510f59c44c..c18c6fb24e0 100644 --- a/pom.xml +++ b/pom.xml @@ -122,6 +122,7 @@ -XX:+IgnoreUnrecognizedVMOptions + --add-exports=java.base/sun.jvmstat.monitor=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED @@ -1134,6 +1135,13 @@ 8 + + + com.github.olivergondza + maven-jdk-tools-wrapper + 0.1 + + diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala index f27001ce473..910f67b3a2b 100644 --- a/project/CelebornBuild.scala +++ b/project/CelebornBuild.scala @@ -50,6 +50,7 @@ object Dependencies { val junitVersion = "4.13.2" val leveldbJniVersion = "1.8" val log4j2Version = "2.17.2" + val jdkToolsVersion = "0.1" val metricsVersion = "3.2.6" val mockitoVersion = "4.11.0" val nettyVersion = "4.1.93.Final" @@ -71,6 +72,7 @@ object Dependencies { val commonsIo = "commons-io" % "commons-io" % commonsIoVersion val commonsLang3 = "org.apache.commons" % "commons-lang3" % commonsLang3Version val commonsLogging = "commons-logging" % "commons-logging" % commonsLoggingVersion + val jdkTools = "com.github.olivergondza" % "maven-jdk-tools-wrapper" % jdkToolsVersion val findbugsJsr305 = "com.google.code.findbugs" % "jsr305" % findbugsVersion val guava = "com.google.guava" % "guava" % guavaVersion val hadoopClientApi = "org.apache.hadoop" % "hadoop-client-api" % hadoopVersion @@ -156,12 +158,14 @@ object CelebornCommonSettings { Dependencies.commonsCompress, Dependencies.commonsLogging, Dependencies.findbugsJsr305, + Dependencies.jdkTools, Dependencies.slf4jApi), // Make sure any tests in any project that uses Spark is configured for running well locally Test / javaOptions ++= Seq( "-Xmx4g", "-XX:+IgnoreUnrecognizedVMOptions", + "--add-exports=java.base/sun.jvmstat.monitor=ALL-UNNAMED", "--add-opens=java.base/java.lang=ALL-UNNAMED", "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED", @@ -316,6 +320,7 @@ object CelebornCommon { Dependencies.commonsLang3, Dependencies.hadoopClientApi, Dependencies.hadoopClientRuntime, + Dependencies.jdkTools, Dependencies.ratisClient, Dependencies.ratisCommon, Dependencies.leveldbJniAll, @@ -429,6 +434,7 @@ object CelebornWorker { Dependencies.guava, Dependencies.commonsIo, Dependencies.ioNetty, + Dependencies.jdkTools, Dependencies.log4j12Api, Dependencies.log4jSlf4jImpl, Dependencies.leveldbJniAll, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index b3bef5688a7..d4c5129a234 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -51,6 +51,7 @@ import org.apache.celeborn.service.deploy.worker.WorkerSource.ACTIVE_CONNECTION_ import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, MemoryManager} import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState +import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager} private[celeborn] class Worker( @@ -271,6 +272,9 @@ private[celeborn] class Worker( private val userResourceConsumptions = JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]() + private val jvmQuake = JVMQuake.create(conf) + jvmQuake.start() + workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () => workerInfo.getShuffleKeySet.size } @@ -430,6 +434,8 @@ private[celeborn] class Worker( if (!stopped) { logInfo("Stopping Worker.") + jvmQuake.stop() + if (sendHeartbeatTask != null) { if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { sendHeartbeatTask.cancel(false) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala new file mode 100644 index 00000000000..e1227c7e435 --- /dev/null +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker.monitor + +import java.io.File +import java.lang.management.ManagementFactory +import java.nio.file.Files +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import com.google.common.annotations.VisibleForTesting +import com.sun.management.HotSpotDiagnosticMXBean +import org.apache.commons.io.FileUtils +import sun.jvmstat.monitor._ + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.util.{ThreadUtils, Utils} + +/** + * The JVM quake provides granular monitoring of GC behavior, which enables early detection of memory management + * issues and facilitates fast failure. + * + * Note: The principle is in alignment with GC instability detection algorithm for jvmquake project of Netflix: + * https://github.com/Netflix-Skunkworks/jvmquake. + * + * @param conf Celeborn configuration with jvm quake config. + */ +class JVMQuake(conf: CelebornConf) extends Logging { + + import JVMQuake._ + + val dumpFile = "worker-quake-heapdump.hprof" + var heapDumped: Boolean = false + + private[this] val enabled = conf.workerJvmQuakeEnabled + private[this] val checkInterval = conf.workerJvmQuakeCheckInterval + private[this] val runtimeWeight = conf.workerJvmQuakeRuntimeWeight + private[this] val dumpThreshold = conf.workerJvmQuakeDumpThreshold.toNanos + private[this] val killThreshold = conf.workerJvmQuakeKillThreshold.toNanos + private[this] val dumpEnabled = conf.workerJvmQuakeDumpEnabled + private[this] val dumpPath = conf.workerJvmQuakeDumpPath + private[this] val exitCode = conf.workerJvmQuakeExitCode + private[this] val dumpDir = + s"${Utils.createDirectory(FileUtils.getTempDirectoryPath).getPath}/jvm-quake/dump" + + private[this] var lastExitTime: Long = 0L + private[this] var lastGCTime: Long = 0L + private[this] var bucket: Long = 0L + private[this] var scheduler: ScheduledExecutorService = _ + + def start(): Unit = { + if (enabled) { + lastExitTime = getLastExitTime + lastGCTime = getLastGCTime + scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("jvm-quake") + scheduler.scheduleWithFixedDelay( + new Runnable() { + override def run(): Unit = { + JVMQuake.this.run() + } + }, + 0, + checkInterval, + TimeUnit.MILLISECONDS) + } + } + + def stop(): Unit = { + if (enabled) { + scheduler.shutdown() + } + } + + private def run(): Unit = { + val currentExitTime = getLastExitTime + val currentGCTime = getLastGCTime + val gcTime = currentGCTime - lastGCTime + val runTime = currentExitTime - lastExitTime - gcTime + + bucket = Math.max(0, bucket + gcTime - BigDecimal(runTime * runtimeWeight).toLong) + logDebug(s"Time: (gc time: $gcTime, execution time: $runTime)") + logDebug( + s"Capacity: (bucket: $bucket, dump threshold: $dumpThreshold, kill threshold: $killThreshold)") + + if (bucket > dumpThreshold) { + logError(s"JVM GC has reached the threshold: bucket: $bucket, dumpThreshold: $dumpThreshold.") + if (shouldHeapDump) { + val savePath = getHeapDumpSavePath + val linkPath = getHeapDumpLinkPath + heapDump(savePath, linkPath) + } else if (bucket > killThreshold) { + System.exit(exitCode) + } + } + lastExitTime = currentExitTime + lastGCTime = currentGCTime + } + + def shouldHeapDump: Boolean = { + dumpEnabled && !heapDumped + } + + @VisibleForTesting + def getHeapDumpSavePath: String = + dumpPath.getOrElse(s"$dumpDir/$getProcessId/hprof") + + @VisibleForTesting + def getHeapDumpLinkPath: String = + s"$dumpDir/$getProcessId/link" + + private def heapDump(savePath: String, linkPath: String, live: Boolean = false): Unit = { + val saveDir = new File(savePath) + if (!saveDir.exists()) { + saveDir.mkdirs() + } + val heapDump = new File(saveDir, dumpFile) + if (heapDump.exists()) { + logInfo(s"Heap dump of worker exists: $heapDump.") + heapDumped = true + return + } + logInfo(s"Starting heap dump: $heapDump.") + ManagementFactory.newPlatformMXBeanProxy( + ManagementFactory.getPlatformMBeanServer, + "com.sun.management:type=HotSpotDiagnostic", + classOf[HotSpotDiagnosticMXBean]).dumpHeap(heapDump.getAbsolutePath, live) + val linkDir = new File(linkPath) + if (linkDir.exists()) { + logInfo(s"Symbolic link of heap dump exists: $linkPath.") + } else if (!linkDir.getParentFile.exists()) { + linkDir.getParentFile.mkdirs() + } + try { + Files.createSymbolicLink(linkDir.toPath, saveDir.toPath) + logInfo(s"Created symbolic link: $linkPath.") + } catch { + case e: Exception => + logError("Create symbolic link failed.", e) + } finally { + heapDumped = true + logInfo(s"Finished heap dump: $dumpFile.") + } + } +} + +object JVMQuake { + + private[this] var quake: JVMQuake = _ + + def create(conf: CelebornConf): JVMQuake = { + set(new JVMQuake(conf)) + quake + } + + def get: JVMQuake = { + quake + } + + def set(quake: JVMQuake): Unit = { + this.quake = quake + } + + private[this] lazy val monitoredVm: MonitoredVm = { + val host = MonitoredHost.getMonitoredHost(new HostIdentifier("localhost")) + host.getMonitoredVm(new VmIdentifier( + "local://%s@localhost".format(getProcessId))) + } + + private[this] lazy val ygcExitTimeMonitor: Monitor = + monitoredVm.findByName("sun.gc.collector.0.lastExitTime") + private[this] lazy val fgcExitTimeMonitor: Monitor = + monitoredVm.findByName("sun.gc.collector.1.lastExitTime") + private[this] lazy val ygcTimeMonitor: Monitor = monitoredVm.findByName("sun.gc.collector.0.time") + private[this] lazy val fgcTimeMonitor: Monitor = monitoredVm.findByName("sun.gc.collector.1.time") + + private def getLastExitTime: Long = Math.max( + ygcExitTimeMonitor.getValue.asInstanceOf[Long], + fgcExitTimeMonitor.getValue.asInstanceOf[Long]) + + private def getLastGCTime: Long = + ygcTimeMonitor.getValue.asInstanceOf[Long] + fgcTimeMonitor.getValue.asInstanceOf[Long] + + private def getProcessId: String = ManagementFactory.getRuntimeMXBean.getName.split("@")(0) +} diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/monitor/JVMQuakeSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/monitor/JVMQuakeSuite.scala new file mode 100644 index 00000000000..dc508049353 --- /dev/null +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/monitor/JVMQuakeSuite.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.monitor + +import java.io.File + +import scala.collection.mutable.ArrayBuffer + +import org.junit.Assert.assertTrue + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.CelebornConf._ +import org.apache.celeborn.common.util.JavaUtils +import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake + +class JVMQuakeSuite extends CelebornFunSuite { + + private val allocation = new ArrayBuffer[Array[Byte]]() + + override def afterEach(): Unit = { + allocation.clear() + System.gc() + } + + test("[CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVMQuake") { + val quake = new JVMQuake(new CelebornConf().set(WORKER_JVM_QUAKE_ENABLED.key, "true") + .set(WORKER_JVM_QUAKE_RUNTIME_WEIGHT.key, "1") + .set(WORKER_JVM_QUAKE_DUMP_THRESHOLD.key, "1s") + .set(WORKER_JVM_QUAKE_KILL_THRESHOLD.key, "2s")) + quake.start() + allocateMemory(quake) + quake.stop() + + assertTrue(quake.heapDumped) + val heapDump = new File(s"${quake.getHeapDumpSavePath}/${quake.dumpFile}") + assert(heapDump.exists()) + JavaUtils.deleteRecursively(heapDump) + JavaUtils.deleteRecursively(new File(quake.getHeapDumpLinkPath)) + } + + def allocateMemory(quake: JVMQuake): Unit = { + val capacity = 1024 * 100 + while (allocation.size * capacity < Runtime.getRuntime.maxMemory / 4) { + val bytes = new Array[Byte](capacity) + allocation.append(bytes) + } + while (quake.shouldHeapDump) { + for (index <- allocation.indices) { + val bytes = new Array[Byte](capacity) + allocation(index) = bytes + } + } + } +}