From 4dfcd9b56b9343594de682a219fd5ae2de7e3fd5 Mon Sep 17 00:00:00 2001 From: SteNicholas Date: Tue, 28 Nov 2023 20:45:08 +0800 Subject: [PATCH] [CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVMQuake ### What changes were proposed in this pull request? Introduce JVM monitoring in Celeborn Worker using JVMQuake to enable early detection of memory management issues and facilitate fast failure. ### Why are the changes needed? When facing out-of-control memory management in Celeborn Worker we typically use JVMkill as a remedy by killing the process and generating a heap dump for post-analysis. However, even with jvmkill protection, we may still encounter issues caused by JVM running out of memory, such as repeated execution of Full GC without performing any useful work during the pause time. Since the JVM does not exhaust 100% of resources, JVMkill will not be triggered. Therefore JVMQuake is introduced to provide more granular monitoring of GC behavior, enabling early detection of memory management issues and facilitating fast failure. Refers to the principle of [jvmquake](https://github.com/Netflix-Skunkworks/jvmquake) which is a JVMTI agent that attaches to your JVM and automatically signals and kills it when the program has become unstable. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? `JVMQuakeSuite` Closes #2061 from SteNicholas/CELEBORN-1092. Authored-by: SteNicholas Signed-off-by: Fu Chen --- LICENSE-binary | 2 + .../apache/celeborn/common/CelebornConf.scala | 86 ++++++++ .../apache/celeborn/common/util/Utils.scala | 2 + dev/dependencies.sh | 8 +- dev/deps/dependencies-client-flink-1.14 | 1 + dev/deps/dependencies-client-flink-1.15 | 1 + dev/deps/dependencies-client-flink-1.17 | 1 + dev/deps/dependencies-client-flink-1.18 | 1 + dev/deps/dependencies-client-mr | 1 + dev/deps/dependencies-client-spark-2.4 | 1 + dev/deps/dependencies-client-spark-3.0 | 1 + dev/deps/dependencies-client-spark-3.1 | 1 + dev/deps/dependencies-client-spark-3.2 | 1 + dev/deps/dependencies-client-spark-3.3 | 1 + dev/deps/dependencies-client-spark-3.4 | 1 + dev/deps/dependencies-client-spark-3.5 | 1 + dev/deps/dependencies-server | 1 + docs/configuration/worker.md | 8 + licenses-binary/LISENCE-jdktools.txt | 21 ++ pom.xml | 8 + project/CelebornBuild.scala | 4 + project/JDKTools.scala | 77 +++++++ .../service/deploy/worker/Worker.scala | 10 + .../deploy/worker/monitor/JVMQuake.scala | 197 ++++++++++++++++++ .../deploy/worker/monitor/JVMQuakeSuite.scala | 69 ++++++ 25 files changed, 501 insertions(+), 4 deletions(-) create mode 100644 licenses-binary/LISENCE-jdktools.txt create mode 100644 project/JDKTools.scala create mode 100644 worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala create mode 100644 worker/src/test/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuakeSuite.scala diff --git a/LICENSE-binary b/LICENSE-binary index 585c150c312..61838e78a2f 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -304,6 +304,8 @@ javax.servlet:javax.servlet-api MIT License ------------ +See license/LISENCE-jdktools.txt for details. +com.github.olivergondza:maven-jdk-tools-wrapper See license/LICENSE-slf4j.txt for details. org.slf4j:jul-to-slf4j org.slf4j:slf4j-api diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 247727df210..422d0f55521 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -689,6 +689,21 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED) def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED) def workerActiveConnectionMax: Option[Long] = get(WORKER_ACTIVE_CONNECTION_MAX) + def workerJvmQuakeEnabled: Boolean = get(WORKER_JVM_QUAKE_ENABLED) + def workerJvmQuakeCheckInterval: Long = get(WORKER_JVM_QUAKE_CHECK_INTERVAL) + def workerJvmQuakeRuntimeWeight: Double = get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT) + def workerJvmQuakeDumpEnabled: Boolean = get(WORKER_JVM_QUAKE_DUMP_ENABLED) + def workerJvmQuakeDumpPath: String = get(WORKER_JVM_QUAKE_DUMP_PATH) + + def workerJvmQuakeDumpThreshold: Duration = + getTimeAsMs( + WORKER_JVM_QUAKE_DUMP_THRESHOLD.key, + WORKER_JVM_QUAKE_DUMP_THRESHOLD.defaultValueString).microsecond + def workerJvmQuakeKillThreshold: Duration = + getTimeAsMs( + WORKER_JVM_QUAKE_KILL_THRESHOLD.key, + WORKER_JVM_QUAKE_KILL_THRESHOLD.defaultValueString).microsecond + def workerJvmQuakeExitCode: Int = get(WORKER_JVM_QUAKE_EXIT_CODE) // ////////////////////////////////////////////////////// // Metrics System // @@ -2896,6 +2911,77 @@ object CelebornConf extends Logging { .longConf .createOptional + val WORKER_JVM_QUAKE_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.worker.jvmQuake.enabled") + .categories("worker") + .version("0.4.0") + .doc("When true, Celeborn worker will start the jvm quake to monitor of gc behavior, " + + "which enables early detection of memory management issues and facilitates fast failure.") + .booleanConf + .createWithDefault(false) + + val WORKER_JVM_QUAKE_CHECK_INTERVAL: ConfigEntry[Long] = + buildConf("celeborn.worker.jvmQuake.check.interval") + .categories("worker") + .version("0.4.0") + .doc("Interval of gc behavior checking for worker jvm quake.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + val WORKER_JVM_QUAKE_RUNTIME_WEIGHT: ConfigEntry[Double] = + buildConf("celeborn.worker.jvmQuake.runtimeWeight") + .categories("worker") + .version("0.4.0") + .doc( + "The factor by which to multiply running JVM time, when weighing it against GCing time. " + + "'Deficit' is accumulated as `gc_time - runtime * runtime_weight`, and is compared against threshold " + + "to determine whether to take action.") + .doubleConf + .createWithDefault(5) + + val WORKER_JVM_QUAKE_DUMP_THRESHOLD: ConfigEntry[Long] = + buildConf("celeborn.worker.jvmQuake.dump.threshold") + .categories("worker") + .version("0.4.0") + .doc("The threshold of heap dump for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. " + + "Meanwhile, there is no heap dump generated when dump threshold is greater than kill threshold.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("30s") + + val WORKER_JVM_QUAKE_KILL_THRESHOLD: ConfigEntry[Long] = + buildConf("celeborn.worker.jvmQuake.kill.threshold") + .categories("worker") + .version("0.4.0") + .doc("The threshold of system kill for the maximum GC 'deficit' which can be accumulated before jvmquake takes action.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("60s") + + val WORKER_JVM_QUAKE_DUMP_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.worker.jvmQuake.dump.enabled") + .categories("worker") + .version("0.4.0") + .doc("Whether to heap dump for the maximum GC 'deficit' during worker jvm quake.") + .booleanConf + .createWithDefault(true) + + val WORKER_JVM_QUAKE_DUMP_PATH: ConfigEntry[String] = + buildConf("celeborn.worker.jvmQuake.dump.path") + .categories("worker") + .version("0.4.0") + .doc("The path of heap dump for the maximum GC 'deficit' during worker jvm quake.") + .stringConf + .transform(_.replace("", System.getProperty("java.io.tmpdir")) + .replace("", Utils.getProcessId)) + .createWithDefault(s"/jvm-quake/dump/") + + val WORKER_JVM_QUAKE_EXIT_CODE: ConfigEntry[Int] = + buildConf("celeborn.worker.jvmQuake.exitCode") + .categories("worker") + .version("0.4.0") + .doc("The exit code of system kill for the maximum GC 'deficit' during worker jvm quake.") + .intConf + .createWithDefault(502) + val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] = buildConf("celeborn.client.application.heartbeatInterval") .withAlternative("celeborn.application.heartbeatInterval") diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 5704f3f88cc..47c26ae09dc 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -1091,4 +1091,6 @@ object Utils extends Logging { } labelPart(0).trim -> labelPart(1).trim } + + def getProcessId: String = ManagementFactory.getRuntimeMXBean.getName.split("@")(0) } diff --git a/dev/dependencies.sh b/dev/dependencies.sh index 73c6c41c462..ae44b2e4516 100755 --- a/dev/dependencies.sh +++ b/dev/dependencies.sh @@ -56,9 +56,9 @@ function mvn_build_classpath() { } function sbt_build_client_classpath() { - PATTERN="$SBT_PROJECT / Runtime / externalDependencyClasspath" + PATTERN="$SBT_PROJECT / Runtime / managedClasspath" deps=$( - $SBT -P$MODULE "clean; export Runtime/externalDependencyClasspath" | \ + $SBT -P$MODULE "clean; export Runtime/managedClasspath" | \ awk -v pat="$PATTERN" '$0 ~ pat { found=1 } found { print }' | \ awk 'NR==2' | \ tr ":" "\n" @@ -96,8 +96,8 @@ function sbt_build_client_classpath() { } function sbt_build_server_classpath() { - $SBT "error; clean; export externalDependencyClasspath" | \ - awk '/externalDependencyClasspath/ { found=1 } found { print }' | \ + $SBT "error; clean; export managedClasspath" | \ + awk '/managedClasspath/ { found=1 } found { print }' | \ awk 'NR % 2 == 0' | \ # This will skip the last line sed '$d' | \ diff --git a/dev/deps/dependencies-client-flink-1.14 b/dev/deps/dependencies-client-flink-1.14 index 456250bb961..be6b7d17fb4 100644 --- a/dev/deps/dependencies-client-flink-1.14 +++ b/dev/deps/dependencies-client-flink-1.14 @@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-flink-1.15 b/dev/deps/dependencies-client-flink-1.15 index 456250bb961..be6b7d17fb4 100644 --- a/dev/deps/dependencies-client-flink-1.15 +++ b/dev/deps/dependencies-client-flink-1.15 @@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-flink-1.17 b/dev/deps/dependencies-client-flink-1.17 index 456250bb961..be6b7d17fb4 100644 --- a/dev/deps/dependencies-client-flink-1.17 +++ b/dev/deps/dependencies-client-flink-1.17 @@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-flink-1.18 b/dev/deps/dependencies-client-flink-1.18 index 456250bb961..be6b7d17fb4 100644 --- a/dev/deps/dependencies-client-flink-1.18 +++ b/dev/deps/dependencies-client-flink-1.18 @@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-mr b/dev/deps/dependencies-client-mr index cc8ef1ade7c..b3764b2eaf7 100644 --- a/dev/deps/dependencies-client-mr +++ b/dev/deps/dependencies-client-mr @@ -122,6 +122,7 @@ kerby-util/1.0.1//kerby-util-1.0.1.jar kerby-xdr/1.0.1//kerby-xdr-1.0.1.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-2.4 b/dev/deps/dependencies-client-spark-2.4 index c439d7e08fe..9beaa1f8ba7 100644 --- a/dev/deps/dependencies-client-spark-2.4 +++ b/dev/deps/dependencies-client-spark-2.4 @@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.4.0//lz4-java-1.4.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.0 b/dev/deps/dependencies-client-spark-3.0 index 8ad78eccb8a..03618f785a4 100644 --- a/dev/deps/dependencies-client-spark-3.0 +++ b/dev/deps/dependencies-client-spark-3.0 @@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.7.1//lz4-java-1.7.1.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.1 b/dev/deps/dependencies-client-spark-3.1 index 1cbd01b35fc..da8331b10b2 100644 --- a/dev/deps/dependencies-client-spark-3.1 +++ b/dev/deps/dependencies-client-spark-3.1 @@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.7.1//lz4-java-1.7.1.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.2 b/dev/deps/dependencies-client-spark-3.2 index e83df4a7192..1feb81a8ed3 100644 --- a/dev/deps/dependencies-client-spark-3.2 +++ b/dev/deps/dependencies-client-spark-3.2 @@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.7.1//lz4-java-1.7.1.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.3 b/dev/deps/dependencies-client-spark-3.3 index 456250bb961..be6b7d17fb4 100644 --- a/dev/deps/dependencies-client-spark-3.3 +++ b/dev/deps/dependencies-client-spark-3.3 @@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.4 b/dev/deps/dependencies-client-spark-3.4 index 35698f48756..eabdee5a80e 100644 --- a/dev/deps/dependencies-client-spark-3.4 +++ b/dev/deps/dependencies-client-spark-3.4 @@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-client-spark-3.5 b/dev/deps/dependencies-client-spark-3.5 index 481caffcc68..4aa62aef53b 100644 --- a/dev/deps/dependencies-client-spark-3.5 +++ b/dev/deps/dependencies-client-spark-3.5 @@ -30,6 +30,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar leveldbjni-all/1.8//leveldbjni-all-1.8.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/dev/deps/dependencies-server b/dev/deps/dependencies-server index 89ac60462c3..1be1e05dd05 100644 --- a/dev/deps/dependencies-server +++ b/dev/deps/dependencies-server @@ -37,6 +37,7 @@ log4j-api/2.17.2//log4j-api-2.17.2.jar log4j-core/2.17.2//log4j-core-2.17.2.jar log4j-slf4j-impl/2.17.2//log4j-slf4j-impl-2.17.2.jar lz4-java/1.8.0//lz4-java-1.8.0.jar +maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar metrics-core/3.2.6//metrics-core-3.2.6.jar metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index a40cc9c5405..aa0e555de2f 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -70,6 +70,14 @@ license: | | celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful shutdown timeout time. | 0.2.0 | | celeborn.worker.http.host | <localhost> | Worker's http host. | 0.4.0 | | celeborn.worker.http.port | 9096 | Worker's http port. | 0.4.0 | +| celeborn.worker.jvmQuake.check.interval | 1s | Interval of gc behavior checking for worker jvm quake. | 0.4.0 | +| celeborn.worker.jvmQuake.dump.enabled | true | Whether to heap dump for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | +| celeborn.worker.jvmQuake.dump.path | <tmp>/jvm-quake/dump/<pid> | The path of heap dump for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | +| celeborn.worker.jvmQuake.dump.threshold | 30s | The threshold of heap dump for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. Meanwhile, there is no heap dump generated when dump threshold is greater than kill threshold. | 0.4.0 | +| celeborn.worker.jvmQuake.enabled | false | When true, Celeborn worker will start the jvm quake to monitor of gc behavior, which enables early detection of memory management issues and facilitates fast failure. | 0.4.0 | +| celeborn.worker.jvmQuake.exitCode | 502 | The exit code of system kill for the maximum GC 'deficit' during worker jvm quake. | 0.4.0 | +| celeborn.worker.jvmQuake.kill.threshold | 60s | The threshold of system kill for the maximum GC 'deficit' which can be accumulated before jvmquake takes action. | 0.4.0 | +| celeborn.worker.jvmQuake.runtimeWeight | 5.0 | The factor by which to multiply running JVM time, when weighing it against GCing time. 'Deficit' is accumulated as `gc_time - runtime * runtime_weight`, and is compared against threshold to determine whether to take action. | 0.4.0 | | celeborn.worker.monitor.disk.check.interval | 30s | Intervals between device monitor to check disk. | 0.3.0 | | celeborn.worker.monitor.disk.check.timeout | 30s | Timeout time for worker check device status. | 0.3.0 | | celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 | diff --git a/licenses-binary/LISENCE-jdktools.txt b/licenses-binary/LISENCE-jdktools.txt new file mode 100644 index 00000000000..08cc1ab0a30 --- /dev/null +++ b/licenses-binary/LISENCE-jdktools.txt @@ -0,0 +1,21 @@ +The MIT License + +Copyright (c) 2016 Red Hat, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/pom.xml b/pom.xml index 34dfaa680a2..e1c96c70437 100644 --- a/pom.xml +++ b/pom.xml @@ -122,6 +122,7 @@ -XX:+IgnoreUnrecognizedVMOptions + --add-exports=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED @@ -1156,6 +1157,13 @@ 8 + + + com.github.olivergondza + maven-jdk-tools-wrapper + 0.1 + + diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala index e940702cd3d..b4f56782a96 100644 --- a/project/CelebornBuild.scala +++ b/project/CelebornBuild.scala @@ -50,6 +50,7 @@ object Dependencies { val junitVersion = "4.13.2" val leveldbJniVersion = "1.8" val log4j2Version = "2.17.2" + val jdkToolsVersion = "0.1" val metricsVersion = "3.2.6" val mockitoVersion = "4.11.0" val nettyVersion = "4.1.93.Final" @@ -71,6 +72,7 @@ object Dependencies { val commonsIo = "commons-io" % "commons-io" % commonsIoVersion val commonsLang3 = "org.apache.commons" % "commons-lang3" % commonsLang3Version val commonsLogging = "commons-logging" % "commons-logging" % commonsLoggingVersion + val jdkTools = "com.github.olivergondza" % "maven-jdk-tools-wrapper" % jdkToolsVersion val findbugsJsr305 = "com.google.code.findbugs" % "jsr305" % findbugsVersion val guava = "com.google.guava" % "guava" % guavaVersion excludeAll( ExclusionRule("org.checkerframework", "checker-qual"), @@ -167,6 +169,7 @@ object CelebornCommonSettings { Test / javaOptions ++= Seq( "-Xmx4g", "-XX:+IgnoreUnrecognizedVMOptions", + "--add-exports=jdk.internal.jvmstat/sun.jvmstat.monitor=ALL-UNNAMED", "--add-opens=java.base/java.lang=ALL-UNNAMED", "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED", "--add-opens=java.base/java.lang.reflect=ALL-UNNAMED", @@ -322,6 +325,7 @@ object CelebornCommon { Dependencies.commonsLang3, Dependencies.hadoopClientApi, Dependencies.hadoopClientRuntime, + Dependencies.jdkTools, Dependencies.ratisClient, Dependencies.ratisCommon, Dependencies.leveldbJniAll, diff --git a/project/JDKTools.scala b/project/JDKTools.scala new file mode 100644 index 00000000000..4dd2a5d616d --- /dev/null +++ b/project/JDKTools.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.build + +import java.io.FileNotFoundException + +import sbt._ +import sbt.Keys._ + +import scala.sys.process._ + +import scala.util.Try + +/** + * Represents the main plugin to add tooling settings to projects using libraries from the Scala Debugger project. + */ +object JDKTools extends AutoPlugin { + override def requires = plugins.JvmPlugin + + override def trigger = allRequirements + + override def projectSettings: Seq[Def.Setting[_]] = inConfig(Compile)(settings) + + lazy val settings = baseScalaDebuggerToolsSettings + + lazy val baseScalaDebuggerToolsSettings: Seq[Def.Setting[_]] = + if (System.getProperty("java.specification.version").startsWith("1.")) + Seq( + // JDK Dependency (just for sbt, must exist on classpath for execution, cannot be redistributed) + unmanagedJars += { + Attributed.blank(JavaTools) + } + ) + else + // on Java 9+, we don't need to do anything at all + Seq() + + // + // NOTE: Taken from Ensime Server project (when under BSD 3-clause) + // https://github.com/ensime/ensime-server/blob/master/project/EnsimeBuild.scala + // + // WORKAROUND: https://github.com/typelevel/scala/issues/75 + lazy val JavaTools: File = List( + // manual + sys.env.get("JAVA_HOME"), + sys.env.get("JDK_HOME"), + // osx + Try("/usr/libexec/java_home".!!).toOption, + // fallback + // sys.props.get("java.home") returns jre home for JDK8 + sys.props.get("java.home").map(new File(_).getParent), + sys.props.get("java.home") + ).flatten.map { n => + new File(n.trim + "/lib/tools.jar") + }.find(_.exists()).getOrElse( + throw new FileNotFoundException( + """ + |Could not automatically find the JDK/lib/tools.jar. + |You must explicitly set JDK_HOME or JAVA_HOME. + """.stripMargin) + ) +} diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index d8c7049e586..ecf867b3e0f 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -51,6 +51,7 @@ import org.apache.celeborn.service.deploy.worker.WorkerSource.ACTIVE_CONNECTION_ import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, MemoryManager} import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState +import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager} private[celeborn] class Worker( @@ -275,6 +276,12 @@ private[celeborn] class Worker( private val userResourceConsumptions = JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]() + private var jvmQuake: JVMQuake = _ + if (conf.workerJvmQuakeEnabled) { + jvmQuake = JVMQuake.create(conf, workerInfo.toUniqueId().replace(":", "-")) + jvmQuake.start() + } + workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () => workerInfo.getShuffleKeySet.size } @@ -431,6 +438,9 @@ private[celeborn] class Worker( if (!stopped) { logInfo("Stopping Worker.") + if (jvmQuake != null) { + jvmQuake.stop() + } if (sendHeartbeatTask != null) { if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { sendHeartbeatTask.cancel(false) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala new file mode 100644 index 00000000000..a88a39e22fc --- /dev/null +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuake.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker.monitor + +import java.io.File +import java.lang.management.ManagementFactory +import java.nio.file.Files +import java.util.UUID +import java.util.concurrent.{ScheduledExecutorService, TimeUnit} + +import com.google.common.annotations.VisibleForTesting +import com.sun.management.HotSpotDiagnosticMXBean +import sun.jvmstat.monitor._ + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.util.{ThreadUtils, Utils} + +/** + * The JVM quake provides granular monitoring of GC behavior, which enables early detection of memory management + * issues and facilitates fast failure. + * + * Note: The principle is in alignment with GC instability detection algorithm for jvmquake project of Netflix: + * https://github.com/Netflix-Skunkworks/jvmquake. + * + * @param conf Celeborn configuration with jvm quake config. + */ +class JVMQuake(conf: CelebornConf, uniqueId: String = UUID.randomUUID().toString) extends Logging { + + import JVMQuake._ + + val dumpFile = s"worker-quake-heapdump-$uniqueId.hprof" + var heapDumped: Boolean = false + + private[this] val enabled = conf.workerJvmQuakeEnabled + private[this] val checkInterval = conf.workerJvmQuakeCheckInterval + private[this] val runtimeWeight = conf.workerJvmQuakeRuntimeWeight + private[this] val dumpThreshold = conf.workerJvmQuakeDumpThreshold.toNanos + private[this] val killThreshold = conf.workerJvmQuakeKillThreshold.toNanos + private[this] val dumpEnabled = conf.workerJvmQuakeDumpEnabled + private[this] val dumpPath = conf.workerJvmQuakeDumpPath + private[this] val exitCode = conf.workerJvmQuakeExitCode + + private[this] var lastExitTime: Long = 0L + private[this] var lastGCTime: Long = 0L + private[this] var bucket: Long = 0L + private[this] var scheduler: ScheduledExecutorService = _ + + def start(): Unit = { + if (enabled) { + lastExitTime = getLastExitTime + lastGCTime = getLastGCTime + scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("jvm-quake") + scheduler.scheduleWithFixedDelay( + new Runnable() { + override def run(): Unit = { + JVMQuake.this.run() + } + }, + 0, + checkInterval, + TimeUnit.MILLISECONDS) + } + } + + def stop(): Unit = { + if (enabled) { + scheduler.shutdown() + } + } + + private def run(): Unit = { + val currentExitTime = getLastExitTime + val currentGCTime = getLastGCTime + val gcTime = currentGCTime - lastGCTime + val runTime = currentExitTime - lastExitTime - gcTime + + bucket = Math.max(0, bucket + gcTime - BigDecimal(runTime * runtimeWeight).toLong) + logDebug(s"Time: (gc time: ${Utils.msDurationToString(gcTime)}, execution time: ${Utils.msDurationToString(runTime)})") + logDebug( + s"Capacity: (bucket: $bucket, dump threshold: $dumpThreshold, kill threshold: $killThreshold)") + + if (bucket > dumpThreshold) { + logError(s"JVM GC has reached the threshold: bucket: $bucket, dumpThreshold: $dumpThreshold.") + if (shouldHeapDump) { + val savePath = getHeapDumpSavePath + val linkPath = getHeapDumpLinkPath + heapDump(savePath, linkPath) + } else if (bucket > killThreshold) { + logError(s"Exit JVM with $exitCode. JVM GC has reached the threshold: bucket: $bucket, killThreshold: $killThreshold.") + System.exit(exitCode) + } + } + lastExitTime = currentExitTime + lastGCTime = currentGCTime + } + + def shouldHeapDump: Boolean = { + dumpEnabled && !heapDumped + } + + @VisibleForTesting + def getHeapDumpSavePath: String = + dumpPath + + @VisibleForTesting + def getHeapDumpLinkPath: String = + s"${new File(dumpPath).getParent}/link/${Utils.getProcessId}" + + private def heapDump(savePath: String, linkPath: String, live: Boolean = false): Unit = { + val saveDir = new File(savePath) + if (!saveDir.exists()) { + saveDir.mkdirs() + } + val heapDump = new File(saveDir, dumpFile) + if (heapDump.exists()) { + // Each worker process only generates one heap dump. Skip when heap dump of worker already exists. + logWarning(s"Skip because heap dump of worker already exists: $heapDump.") + heapDumped = true + return + } + logInfo(s"Starting heap dump: $heapDump.") + ManagementFactory.newPlatformMXBeanProxy( + ManagementFactory.getPlatformMBeanServer, + "com.sun.management:type=HotSpotDiagnostic", + classOf[HotSpotDiagnosticMXBean]).dumpHeap(heapDump.getAbsolutePath, live) + val linkDir = new File(linkPath) + if (linkDir.exists()) { + // Each worker process only generates one heap dump. Skip when symbolic link of heap dump exists. + logWarning(s"Skip because symbolic link of heap dump exists: $linkPath.") + } else if (!linkDir.getParentFile.exists()) { + linkDir.getParentFile.mkdirs() + } + try { + Files.createSymbolicLink(linkDir.toPath, saveDir.toPath) + logInfo(s"Created symbolic link: $linkPath.") + } catch { + case e: Exception => + logError("Create symbolic link failed.", e) + } finally { + heapDumped = true + logInfo(s"Finished heap dump: $dumpFile.") + } + } +} + +object JVMQuake { + + private[this] var quake: JVMQuake = _ + + def create(conf: CelebornConf, uniqueId: String): JVMQuake = { + set(new JVMQuake(conf, uniqueId)) + quake + } + + def get: JVMQuake = { + quake + } + + def set(quake: JVMQuake): Unit = { + this.quake = quake + } + + private[this] lazy val monitoredVm: MonitoredVm = { + val host = MonitoredHost.getMonitoredHost(new HostIdentifier("localhost")) + host.getMonitoredVm(new VmIdentifier("local://%s@localhost".format(Utils.getProcessId))) + } + + private[this] lazy val ygcExitTimeMonitor: Monitor = + monitoredVm.findByName("sun.gc.collector.0.lastExitTime") + private[this] lazy val fgcExitTimeMonitor: Monitor = + monitoredVm.findByName("sun.gc.collector.1.lastExitTime") + private[this] lazy val ygcTimeMonitor: Monitor = monitoredVm.findByName("sun.gc.collector.0.time") + private[this] lazy val fgcTimeMonitor: Monitor = monitoredVm.findByName("sun.gc.collector.1.time") + + private def getLastExitTime: Long = Math.max( + ygcExitTimeMonitor.getValue.asInstanceOf[Long], + fgcExitTimeMonitor.getValue.asInstanceOf[Long]) + + private def getLastGCTime: Long = + ygcTimeMonitor.getValue.asInstanceOf[Long] + fgcTimeMonitor.getValue.asInstanceOf[Long] +} diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuakeSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuakeSuite.scala new file mode 100644 index 00000000000..7ef49f22588 --- /dev/null +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/monitor/JVMQuakeSuite.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.worker.monitor + +import java.io.File + +import scala.collection.mutable.ArrayBuffer + +import org.junit.Assert.assertTrue + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.CelebornConf._ +import org.apache.celeborn.common.util.JavaUtils + +class JVMQuakeSuite extends CelebornFunSuite { + + private val allocation = new ArrayBuffer[Array[Byte]]() + + override def afterEach(): Unit = { + allocation.clear() + System.gc() + } + + test("[CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVMQuake") { + val quake = new JVMQuake(new CelebornConf().set(WORKER_JVM_QUAKE_ENABLED.key, "true") + .set(WORKER_JVM_QUAKE_RUNTIME_WEIGHT.key, "1") + .set(WORKER_JVM_QUAKE_DUMP_THRESHOLD.key, "1s") + .set(WORKER_JVM_QUAKE_KILL_THRESHOLD.key, "2s")) + quake.start() + allocateMemory(quake) + quake.stop() + + assertTrue(quake.heapDumped) + val heapDump = new File(s"${quake.getHeapDumpSavePath}/${quake.dumpFile}") + assert(heapDump.exists()) + JavaUtils.deleteRecursively(heapDump) + JavaUtils.deleteRecursively(new File(quake.getHeapDumpLinkPath)) + } + + def allocateMemory(quake: JVMQuake): Unit = { + val capacity = 1024 * 100 + while (allocation.size * capacity < Runtime.getRuntime.maxMemory / 4) { + val bytes = new Array[Byte](capacity) + allocation.append(bytes) + } + while (quake.shouldHeapDump) { + for (index <- allocation.indices) { + val bytes = new Array[Byte](capacity) + allocation(index) = bytes + } + } + } +}