Skip to content

Commit

Permalink
[CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVM…
Browse files Browse the repository at this point in the history
…Quake
  • Loading branch information
SteNicholas committed Nov 1, 2023
1 parent b45b63f commit 6a7d1a5
Show file tree
Hide file tree
Showing 21 changed files with 414 additions and 0 deletions.
2 changes: 2 additions & 0 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ javax.servlet:javax.servlet-api

MIT License
------------
See license/LISENCE-jdktools.txt for details.
com.github.olivergondza:maven-jdk-tools-wrapper
See license/LICENSE-slf4j.txt for details.
org.slf4j:jul-to-slf4j
org.slf4j:slf4j-api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,21 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED)
def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED)
def workerActiveConnectionMax: Option[Long] = get(WORKER_ACTIVE_CONNECTION_MAX)
// Accessors for the celeborn.worker.jvm.quake.* settings.
// Whether the worker's JVM quake GC monitor is switched on.
def workerJvmQuakeEnabled: Boolean = get(WORKER_JVM_QUAKE_ENABLED)
// Interval between GC behaviour checks; the entry is declared with timeConf(TimeUnit.MILLISECONDS).
def workerJvmQuakeCheckInterval: Long = get(WORKER_JVM_QUAKE_CHECK_INTERVAL)
// Factor applied to JVM running time when it is weighed against GC time.
def workerJvmQuakeRuntimeWeight: Double = get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT)
// Whether a heap dump should be taken for the GC "deficit".
def workerJvmQuakeDumpEnabled: Boolean = get(WORKER_JVM_QUAKE_DUMP_ENABLED)
// Optional heap dump path; the entry is created with createOptional, so no default is registered.
def workerJvmQuakeDumpPath: Option[String] = get(WORKER_JVM_QUAKE_DUMP_PATH)

/**
 * GC "deficit" threshold for taking a heap dump, as a [[Duration]].
 *
 * The entry is declared with `timeConf(TimeUnit.MILLISECONDS)` and read through `getTimeAsMs`,
 * so the raw value is in milliseconds and must be wrapped as milliseconds. Wrapping it as
 * microseconds would shrink the configured threshold by a factor of 1000 (the "30s" default
 * would become 30 ms).
 *
 * @return the configured dump threshold
 */
def workerJvmQuakeDumpThreshold: Duration =
  getTimeAsMs(
    WORKER_JVM_QUAKE_DUMP_THRESHOLD.key,
    WORKER_JVM_QUAKE_DUMP_THRESHOLD.defaultValueString).milliseconds
/**
 * GC "deficit" threshold for killing the worker process, as a [[Duration]].
 *
 * The entry is declared with `timeConf(TimeUnit.MILLISECONDS)` and read through `getTimeAsMs`,
 * so the raw value is in milliseconds and must be wrapped as milliseconds. Wrapping it as
 * microseconds would shrink the configured threshold by a factor of 1000 (the "60s" default
 * would become 60 ms).
 *
 * @return the configured kill threshold
 */
def workerJvmQuakeKillThreshold: Duration =
  getTimeAsMs(
    WORKER_JVM_QUAKE_KILL_THRESHOLD.key,
    WORKER_JVM_QUAKE_KILL_THRESHOLD.defaultValueString).milliseconds
// Exit code used when the worker is killed because of the GC "deficit".
def workerJvmQuakeExitCode: Int = get(WORKER_JVM_QUAKE_EXIT_CODE)

// //////////////////////////////////////////////////////
// Metrics System //
Expand Down Expand Up @@ -2835,6 +2850,75 @@ object CelebornConf extends Logging {
.longConf
.createOptional

// Master switch for the worker's JVM quake GC monitoring; disabled by default.
val WORKER_JVM_QUAKE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvm.quake.enabled")
.categories("worker")
.version("0.3.2")
.doc("When true, Celeborn worker will start the jvm quake to monitor of gc behavior, " +
"which enables early detection of memory management issues and facilitates fast failure.")
.booleanConf
.createWithDefault(false)

// Period between GC behaviour checks. Declared as a time config in milliseconds; default "1s".
val WORKER_JVM_QUAKE_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.jvm.quake.check.interval")
.categories("worker")
.version("0.3.2")
.doc("Interval of gc behavior checking for worker jvm quake.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")

// Weight applied to JVM running time in the "deficit" formula given in the doc string below
// (gc_time - runtime * runtime_weight). Default is 5.
val WORKER_JVM_QUAKE_RUNTIME_WEIGHT: ConfigEntry[Double] =
buildConf("celeborn.worker.jvm.quake.runtime.weight")
.categories("worker")
.version("0.3.2")
.doc(
"The factor by which to multiply running JVM time, when weighing it against GCing time. " +
"\"Deficit\" is accumulated as gc_time - runtime * runtime_weight, and is compared against threshold " +
"to determine whether to take action.")
.doubleConf
.createWithDefault(5)

// Accumulated GC "deficit" at which a heap dump is triggered.
// Declared as a time config in milliseconds; default "30s".
val WORKER_JVM_QUAKE_DUMP_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.worker.jvm.quake.dump.threshold")
.categories("worker")
.version("0.3.2")
.doc("The threshold of heap dump for the maximum GC \"deficit\" which can be accumulated before jvmquake takes action.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")

// Accumulated GC "deficit" at which the process is killed.
// Declared as a time config in milliseconds; default "60s", i.e. twice the dump threshold default.
val WORKER_JVM_QUAKE_KILL_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.worker.jvm.quake.kill.threshold")
.categories("worker")
.version("0.3.2")
.doc("The threshold of system kill for the maximum GC \"deficit\" which can be accumulated before jvmquake takes action.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")

// Whether a heap dump is taken for the GC "deficit"; enabled by default.
val WORKER_JVM_QUAKE_DUMP_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvm.quake.dump.enabled")
.categories("worker")
.version("0.3.2")
.doc("Whether to heap dump for the maximum GC \"deficit\" during worker jvm quake.")
.booleanConf
.createWithDefault(true)

// Optional heap dump location. No default is registered on the entry itself (createOptional);
// the fallback named in the doc string is presumably applied by the code that reads this
// entry — TODO confirm.
val WORKER_JVM_QUAKE_DUMP_PATH: OptionalConfigEntry[String] =
buildConf("celeborn.worker.jvm.quake.dump.path")
.categories("worker")
.version("0.3.2")
.doc("The path of heap dump for the maximum GC \"deficit\" during worker jvm quake, " +
"which default value is [java.io.tmpdir]/celeborn/jvm-quake/dump/[pid].")
.stringConf
.createOptional

// Exit code used when the worker is killed because of the GC "deficit"; default 502.
// NOTE(review): on POSIX systems only the low 8 bits of an exit status reach the parent
// process, so 502 would presumably be observed as 246 — confirm this default is intended.
val WORKER_JVM_QUAKE_EXIT_CODE: ConfigEntry[Int] =
buildConf("celeborn.worker.jvm.quake.exit.code")
.categories("worker")
.version("0.3.2")
.doc("The exit code of system kill for the maximum GC \"deficit\" during worker jvm quake.")
.intConf
.createWithDefault(502)

val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.application.heartbeatInterval")
.withAlternative("celeborn.application.heartbeatInterval")
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-client-flink-1.14
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-client-flink-1.15
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-client-flink-1.17
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-client-mr
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ kerby-util/1.0.1//kerby-util-1.0.1.jar
kerby-xdr/1.0.1//kerby-xdr-1.0.1.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-client-spark-2.4
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.4.0//lz4-java-1.4.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-client-spark-3.0
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.7.1//lz4-java-1.7.1.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-client-spark-3.1
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.7.1//lz4-java-1.7.1.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-client-spark-3.2
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.7.1//lz4-java-1.7.1.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-client-spark-3.3
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-client-spark-3.4
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-client-spark-3.5
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jsr305/1.3.9//jsr305-1.3.9.jar
jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
leveldbjni-all/1.8//leveldbjni-all-1.8.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
1 change: 1 addition & 0 deletions dev/deps/dependencies-server
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ log4j-api/2.17.2//log4j-api-2.17.2.jar
log4j-core/2.17.2//log4j-core-2.17.2.jar
log4j-slf4j-impl/2.17.2//log4j-slf4j-impl-2.17.2.jar
lz4-java/1.8.0//lz4-java-1.8.0.jar
maven-jdk-tools-wrapper/0.1//maven-jdk-tools-wrapper-0.1.jar
metrics-core/3.2.6//metrics-core-3.2.6.jar
metrics-graphite/3.2.6//metrics-graphite-3.2.6.jar
metrics-jvm/3.2.6//metrics-jvm-3.2.6.jar
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ license: |
| celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful shutdown timeout time. | 0.2.0 |
| celeborn.worker.http.host | <localhost> | Worker's http host. | 0.4.0 |
| celeborn.worker.http.port | 9096 | Worker's http port. | 0.4.0 |
| celeborn.worker.jvm.quake.check.interval | 1s | Interval of gc behavior checking for worker jvm quake. | 0.3.2 |
| celeborn.worker.jvm.quake.dump.enabled | true | Whether to heap dump for the maximum GC "deficit" during worker jvm quake. | 0.3.2 |
| celeborn.worker.jvm.quake.dump.path | <undefined> | The path of heap dump for the maximum GC "deficit" during worker jvm quake, which default value is [java.io.tmpdir]/celeborn/jvm-quake/dump/[pid]. | 0.3.2 |
| celeborn.worker.jvm.quake.dump.threshold | 30s | The threshold of heap dump for the maximum GC "deficit" which can be accumulated before jvmquake takes action. | 0.3.2 |
| celeborn.worker.jvm.quake.enabled | false | When true, Celeborn worker will start the jvm quake to monitor of gc behavior, which enables early detection of memory management issues and facilitates fast failure. | 0.3.2 |
| celeborn.worker.jvm.quake.exit.code | 502 | The exit code of system kill for the maximum GC "deficit" during worker jvm quake. | 0.3.2 |
| celeborn.worker.jvm.quake.kill.threshold | 60s | The threshold of system kill for the maximum GC "deficit" which can be accumulated before jvmquake takes action. | 0.3.2 |
| celeborn.worker.jvm.quake.runtime.weight | 5.0 | The factor by which to multiply running JVM time, when weighing it against GCing time. "Deficit" is accumulated as gc_time - runtime * runtime_weight, and is compared against threshold to determine whether to take action. | 0.3.2 |
| celeborn.worker.monitor.disk.check.interval | 30s | Intervals between device monitor to check disk. | 0.3.0 |
| celeborn.worker.monitor.disk.check.timeout | 30s | Timeout time for worker check device status. | 0.3.0 |
| celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 |
Expand Down
19 changes: 19 additions & 0 deletions licenses-binary/LISENCE-jdktools.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Copyright (c) 2016 Red Hat, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@

<!-- for JDK-17 test-->
<extraJavaTestArgs>-XX:+IgnoreUnrecognizedVMOptions
--add-exports=java.base/sun.jvmstat.monitor=ALL-UNNAMED
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
Expand Down Expand Up @@ -1134,6 +1135,13 @@
<properties>
<java.version>8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.github.olivergondza</groupId>
<artifactId>maven-jdk-tools-wrapper</artifactId>
<version>0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Based on https://github.com/google/error-prone/blob/f8e33bc460be82ab22256a7ef8b979d7a2cacaba/docs/installation.md#jdk-8 -->
Expand Down
6 changes: 6 additions & 0 deletions project/CelebornBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object Dependencies {
val junitVersion = "4.13.2"
val leveldbJniVersion = "1.8"
val log4j2Version = "2.17.2"
// Version of the com.github.olivergondza:maven-jdk-tools-wrapper artifact.
val jdkToolsVersion = "0.1"
val metricsVersion = "3.2.6"
val mockitoVersion = "4.11.0"
val nettyVersion = "4.1.93.Final"
Expand All @@ -71,6 +72,7 @@ object Dependencies {
val commonsIo = "commons-io" % "commons-io" % commonsIoVersion
val commonsLang3 = "org.apache.commons" % "commons-lang3" % commonsLang3Version
val commonsLogging = "commons-logging" % "commons-logging" % commonsLoggingVersion
// maven-jdk-tools-wrapper dependency, pinned to jdkToolsVersion.
val jdkTools = "com.github.olivergondza" % "maven-jdk-tools-wrapper" % jdkToolsVersion
val findbugsJsr305 = "com.google.code.findbugs" % "jsr305" % findbugsVersion
val guava = "com.google.guava" % "guava" % guavaVersion
val hadoopClientApi = "org.apache.hadoop" % "hadoop-client-api" % hadoopVersion
Expand Down Expand Up @@ -156,12 +158,14 @@ object CelebornCommonSettings {
Dependencies.commonsCompress,
Dependencies.commonsLogging,
Dependencies.findbugsJsr305,
Dependencies.jdkTools,
Dependencies.slf4jApi),

// Make sure any tests in any project that uses Spark is configured for running well locally
Test / javaOptions ++= Seq(
"-Xmx4g",
"-XX:+IgnoreUnrecognizedVMOptions",
"--add-exports=java.base/sun.jvmstat.monitor=ALL-UNNAMED",
"--add-opens=java.base/java.lang=ALL-UNNAMED",
"--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
"--add-opens=java.base/java.lang.reflect=ALL-UNNAMED",
Expand Down Expand Up @@ -316,6 +320,7 @@ object CelebornCommon {
Dependencies.commonsLang3,
Dependencies.hadoopClientApi,
Dependencies.hadoopClientRuntime,
Dependencies.jdkTools,
Dependencies.ratisClient,
Dependencies.ratisCommon,
Dependencies.leveldbJniAll,
Expand Down Expand Up @@ -429,6 +434,7 @@ object CelebornWorker {
Dependencies.guava,
Dependencies.commonsIo,
Dependencies.ioNetty,
Dependencies.jdkTools,
Dependencies.log4j12Api,
Dependencies.log4jSlf4jImpl,
Dependencies.leveldbJniAll,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import org.apache.celeborn.service.deploy.worker.WorkerSource.ACTIVE_CONNECTION_
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, MemoryManager}
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState
import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager}

private[celeborn] class Worker(
Expand Down Expand Up @@ -271,6 +272,9 @@ private[celeborn] class Worker(
private val userResourceConsumptions =
JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]()

// Build the JVM quake GC monitor from the worker configuration and start it while the Worker
// is being constructed.
// NOTE(review): start() is called unconditionally here; presumably JVMQuake itself honours
// celeborn.worker.jvm.quake.enabled (default false) — confirm.
private val jvmQuake = JVMQuake.create(conf)
jvmQuake.start()

workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
workerInfo.getShuffleKeySet.size
}
Expand Down Expand Up @@ -430,6 +434,8 @@ private[celeborn] class Worker(
if (!stopped) {
logInfo("Stopping Worker.")

jvmQuake.stop()

if (sendHeartbeatTask != null) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
sendHeartbeatTask.cancel(false)
Expand Down
Loading

0 comments on commit 6a7d1a5

Please sign in to comment.