Skip to content

Commit

Permalink
[CELEBORN-1092] Introduce JVM monitoring in Celeborn Worker using JVM…
Browse files Browse the repository at this point in the history
…Quake
  • Loading branch information
SteNicholas committed Nov 1, 2023
1 parent b45b63f commit ea365fe
Show file tree
Hide file tree
Showing 6 changed files with 375 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,21 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED)
def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED)
def workerActiveConnectionMax: Option[Long] = get(WORKER_ACTIVE_CONNECTION_MAX)
def workerJvmQuakeEnabled: Boolean = get(WORKER_JVM_QUAKE_ENABLED)
def workerJvmQuakeCheckInterval: Long = get(WORKER_JVM_QUAKE_CHECK_INTERVAL)
def workerJvmQuakeRuntimeWeight: Double = get(WORKER_JVM_QUAKE_RUNTIME_WEIGHT)
def workerJvmQuakeDumpEnabled: Boolean = get(WORKER_JVM_QUAKE_DUMP_ENABLED)
def workerJvmQuakeDumpPath: Option[String] = get(WORKER_JVM_QUAKE_DUMP_PATH)

def workerJvmQuakeDumpThreshold: Duration =
getTimeAsMs(
WORKER_JVM_QUAKE_DUMP_THRESHOLD.key,
WORKER_JVM_QUAKE_DUMP_THRESHOLD.defaultValueString).microsecond
def workerJvmQuakeKillThreshold: Duration =
getTimeAsMs(
WORKER_JVM_QUAKE_KILL_THRESHOLD.key,
WORKER_JVM_QUAKE_KILL_THRESHOLD.defaultValueString).microsecond
def workerJvmQuakeExitCode: Int = get(WORKER_JVM_QUAKE_EXIT_CODE)

// //////////////////////////////////////////////////////
// Metrics System //
Expand Down Expand Up @@ -2835,6 +2850,75 @@ object CelebornConf extends Logging {
.longConf
.createOptional

val WORKER_JVM_QUAKE_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvm.quake.enabled")
.categories("worker")
.version("0.3.2")
.doc("When true, Celeborn worker will start the jvm quake to monitor of gc behavior, " +
"which enables early detection of memory management issues and facilitates fast failure.")
.booleanConf
.createWithDefault(false)

val WORKER_JVM_QUAKE_CHECK_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.jvm.quake.check.interval")
.categories("worker")
.version("0.3.2")
.doc("Interval of gc behavior checking for worker jvm quake.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")

val WORKER_JVM_QUAKE_RUNTIME_WEIGHT: ConfigEntry[Double] =
buildConf("celeborn.worker.jvm.quake.runtime.weight")
.categories("worker")
.version("0.3.2")
.doc(
"The factor by which to multiply running JVM time, when weighing it against GCing time. " +
"\"Deficit\" is accumulated as gc_time - runtime * runtime_weight, and is compared against threshold " +
"to determine whether to take action.")
.doubleConf
.createWithDefault(5)

val WORKER_JVM_QUAKE_DUMP_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.worker.jvm.quake.dump.threshold")
.categories("worker")
.version("0.3.2")
.doc("The threshold of heap dump for the maximum GC \"deficit\" which can be accumulated before jvmquake takes action.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")

val WORKER_JVM_QUAKE_KILL_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.worker.jvm.quake.kill.threshold")
.categories("worker")
.version("0.3.2")
.doc("The threshold of system kill for the maximum GC \"deficit\" which can be accumulated before jvmquake takes action.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("60s")

val WORKER_JVM_QUAKE_DUMP_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.jvm.quake.dump.enabled")
.categories("worker")
.version("0.3.2")
.doc("Whether to heap dump for the maximum GC \"deficit\" during worker jvm quake.")
.booleanConf
.createWithDefault(true)

val WORKER_JVM_QUAKE_DUMP_PATH: OptionalConfigEntry[String] =
buildConf("celeborn.worker.jvm.quake.dump.path")
.categories("worker")
.version("0.3.2")
.doc("The path of heap dump for the maximum GC \"deficit\" during worker jvm quake, " +
"which default value is [java.io.tmpdir]/celeborn/jvm-quake/dump/[pid].")
.stringConf
.createOptional

val WORKER_JVM_QUAKE_EXIT_CODE: ConfigEntry[Int] =
buildConf("celeborn.worker.jvm.quake.exit.code")
.categories("worker")
.version("0.3.2")
.doc("The exit code of system kill for the maximum GC \"deficit\" during worker jvm quake.")
.intConf
.createWithDefault(502)

val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.application.heartbeatInterval")
.withAlternative("celeborn.application.heartbeatInterval")
Expand Down
8 changes: 8 additions & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ license: |
| celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful shutdown timeout time. | 0.2.0 |
| celeborn.worker.http.host | <localhost> | Worker's http host. | 0.4.0 |
| celeborn.worker.http.port | 9096 | Worker's http port. | 0.4.0 |
| celeborn.worker.jvm.quake.check.interval | 1s | Interval of gc behavior checking for worker jvm quake. | 0.3.2 |
| celeborn.worker.jvm.quake.dump.enabled | true | Whether to heap dump for the maximum GC "deficit" during worker jvm quake. | 0.3.2 |
| celeborn.worker.jvm.quake.dump.path | <undefined> | The path of heap dump for the maximum GC "deficit" during worker jvm quake, which default value is [java.io.tmpdir]/celeborn/jvm-quake/dump/[pid]. | 0.3.2 |
| celeborn.worker.jvm.quake.dump.threshold | 30s | The threshold of heap dump for the maximum GC "deficit" which can be accumulated before jvmquake takes action. | 0.3.2 |
| celeborn.worker.jvm.quake.enabled | false | When true, Celeborn worker will start the jvm quake to monitor of gc behavior, which enables early detection of memory management issues and facilitates fast failure. | 0.3.2 |
| celeborn.worker.jvm.quake.exit.code | 502 | The exit code of system kill for the maximum GC "deficit" during worker jvm quake. | 0.3.2 |
| celeborn.worker.jvm.quake.kill.threshold | 60s | The threshold of system kill for the maximum GC "deficit" which can be accumulated before jvmquake takes action. | 0.3.2 |
| celeborn.worker.jvm.quake.runtime.weight | 5.0 | The factor by which to multiply running JVM time, when weighing it against GCing time. "Deficit" is accumulated as gc_time - runtime * runtime_weight, and is compared against threshold to determine whether to take action. | 0.3.2 |
| celeborn.worker.monitor.disk.check.interval | 30s | Intervals between device monitor to check disk. | 0.3.0 |
| celeborn.worker.monitor.disk.check.timeout | 30s | Timeout time for worker check device status. | 0.3.0 |
| celeborn.worker.monitor.disk.checklist | readwrite,diskusage | Monitor type for disk, available items are: iohang, readwrite and diskusage. | 0.2.0 |
Expand Down
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
--add-opens=java.base/sun.jvmstat.monitor=ALL-UNNAMED
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED
--add-opens=java.base/sun.security.action=ALL-UNNAMED
Expand Down Expand Up @@ -1134,6 +1135,13 @@
<properties>
<java.version>8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.github.olivergondza</groupId>
<artifactId>maven-jdk-tools-wrapper</artifactId>
<version>0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Based on https://github.com/google/error-prone/blob/f8e33bc460be82ab22256a7ef8b979d7a2cacaba/docs/installation.md#jdk-8 -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import org.apache.celeborn.service.deploy.worker.WorkerSource.ACTIVE_CONNECTION_
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, MemoryManager}
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState
import org.apache.celeborn.service.deploy.worker.monitor.JVMQuake
import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager}

private[celeborn] class Worker(
Expand Down Expand Up @@ -271,6 +272,9 @@ private[celeborn] class Worker(
private val userResourceConsumptions =
JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]()

private val jvmQuake = JVMQuake.create(conf)
jvmQuake.start()

workerSource.addGauge(WorkerSource.REGISTERED_SHUFFLE_COUNT) { () =>
workerInfo.getShuffleKeySet.size
}
Expand Down Expand Up @@ -430,6 +434,8 @@ private[celeborn] class Worker(
if (!stopped) {
logInfo("Stopping Worker.")

jvmQuake.stop()

if (sendHeartbeatTask != null) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
sendHeartbeatTask.cancel(false)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.service.deploy.worker.monitor

import java.io.File
import java.lang.management.ManagementFactory
import java.nio.file.Files
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import com.google.common.annotations.VisibleForTesting
import com.sun.management.HotSpotDiagnosticMXBean
import org.apache.commons.io.FileUtils
import sun.jvmstat.monitor._

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.util.{ThreadUtils, Utils}

/**
* The JVM quake provides granular monitoring of GC behavior, which enables early detection of memory management
* issues and facilitates fast failure.
*
* Note: The principle is in alignment with GC instability detection algorithm for jvmquake project of Netflix:
* https://github.com/Netflix-Skunkworks/jvmquake.
*
* @param conf Celeborn configuration with jvm quake config.
*/
class JVMQuake(conf: CelebornConf) extends Logging {

import JVMQuake._

val dumpFile = "worker-quake-heapdump.hprof"
var heapDumped: Boolean = false

private[this] val enabled = conf.workerJvmQuakeEnabled
private[this] val checkInterval = conf.workerJvmQuakeCheckInterval
private[this] val runtimeWeight = conf.workerJvmQuakeRuntimeWeight
private[this] val dumpThreshold = conf.workerJvmQuakeDumpThreshold.toNanos
private[this] val killThreshold = conf.workerJvmQuakeKillThreshold.toNanos
private[this] val dumpEnabled = conf.workerJvmQuakeDumpEnabled
private[this] val dumpPath = conf.workerJvmQuakeDumpPath
private[this] val exitCode = conf.workerJvmQuakeExitCode
private[this] val dumpDir =
s"${Utils.createDirectory(FileUtils.getTempDirectoryPath).getPath}/jvm-quake/dump"

private[this] var lastExitTime: Long = 0L
private[this] var lastGCTime: Long = 0L
private[this] var bucket: Long = 0L
private[this] var scheduler: ScheduledExecutorService = _

def start(): Unit = {
if (enabled) {
lastExitTime = getLastExitTime
lastGCTime = getLastGCTime
scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("jvm-quake")
scheduler.scheduleWithFixedDelay(
new Runnable() {
override def run(): Unit = {
JVMQuake.this.run()
}
},
0,
checkInterval,
TimeUnit.MILLISECONDS)
}
}

def stop(): Unit = {
if (enabled) {
scheduler.shutdown()
}
}

private def run(): Unit = {
val currentExitTime = getLastExitTime
val currentGCTime = getLastGCTime
val gcTime = currentGCTime - lastGCTime
val runTime = currentExitTime - lastExitTime - gcTime

bucket = Math.max(0, bucket + gcTime - BigDecimal(runTime * runtimeWeight).toLong)
logDebug(s"Time: (gc time: $gcTime, execution time: $runTime)")
logDebug(
s"Capacity: (bucket: $bucket, dump threshold: $dumpThreshold, kill threshold: $killThreshold)")

if (bucket > dumpThreshold) {
logError(s"JVM GC has reached the threshold: bucket: $bucket, dumpThreshold: $dumpThreshold.")
if (shouldHeapDump) {
val savePath = getHeapDumpSavePath
val linkPath = getHeapDumpLinkPath
heapDump(savePath, linkPath)
} else if (bucket > killThreshold) {
System.exit(exitCode)
}
}
lastExitTime = currentExitTime
lastGCTime = currentGCTime
}

def shouldHeapDump: Boolean = {
dumpEnabled && !heapDumped
}

@VisibleForTesting
def getHeapDumpSavePath: String =
dumpPath.getOrElse(s"$dumpDir/$getProcessId/hprof")

@VisibleForTesting
def getHeapDumpLinkPath: String =
s"$dumpDir/$getProcessId/link"

private def heapDump(savePath: String, linkPath: String, live: Boolean = false): Unit = {
val saveDir = new File(savePath)
if (!saveDir.exists()) {
saveDir.mkdirs()
}
val heapDump = new File(saveDir, dumpFile)
if (heapDump.exists()) {
logInfo(s"Heap dump of worker exists: $heapDump.")
heapDumped = true
return
}
logInfo(s"Starting heap dump: $heapDump.")
ManagementFactory.newPlatformMXBeanProxy(
ManagementFactory.getPlatformMBeanServer,
"com.sun.management:type=HotSpotDiagnostic",
classOf[HotSpotDiagnosticMXBean]).dumpHeap(heapDump.getAbsolutePath, live)
val linkDir = new File(linkPath)
if (linkDir.exists()) {
logInfo(s"Symbolic link of heap dump exists: $linkPath.")
} else if (!linkDir.getParentFile.exists()) {
linkDir.getParentFile.mkdirs()
}
try {
Files.createSymbolicLink(linkDir.toPath, saveDir.toPath)
logInfo(s"Created symbolic link: $linkPath.")
} catch {
case e: Exception =>
logError("Create symbolic link failed.", e)
} finally {
heapDumped = true
logInfo(s"Finished heap dump: $dumpFile.")
}
}
}

object JVMQuake {

private[this] var quake: JVMQuake = _

def create(conf: CelebornConf): JVMQuake = {
set(new JVMQuake(conf))
quake
}

def get: JVMQuake = {
quake
}

def set(quake: JVMQuake): Unit = {
this.quake = quake
}

private[this] lazy val monitoredVm: MonitoredVm = {
val host = MonitoredHost.getMonitoredHost(new HostIdentifier("localhost"))
host.getMonitoredVm(new VmIdentifier(
"local://%s@localhost".format(getProcessId)))
}

private[this] lazy val ygcExitTimeMonitor: Monitor =
monitoredVm.findByName("sun.gc.collector.0.lastExitTime")
private[this] lazy val fgcExitTimeMonitor: Monitor =
monitoredVm.findByName("sun.gc.collector.1.lastExitTime")
private[this] lazy val ygcTimeMonitor: Monitor = monitoredVm.findByName("sun.gc.collector.0.time")
private[this] lazy val fgcTimeMonitor: Monitor = monitoredVm.findByName("sun.gc.collector.1.time")

private def getLastExitTime: Long = Math.max(
ygcExitTimeMonitor.getValue.asInstanceOf[Long],
fgcExitTimeMonitor.getValue.asInstanceOf[Long])

private def getLastGCTime: Long =
ygcTimeMonitor.getValue.asInstanceOf[Long] + fgcTimeMonitor.getValue.asInstanceOf[Long]

private def getProcessId: String = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
}
Loading

0 comments on commit ea365fe

Please sign in to comment.