Skip to content

Commit

Permalink
[CELEBORN-1122] Metrics supports json format
Browse files Browse the repository at this point in the history
  • Loading branch information
RuiQin7 committed Dec 5, 2023
1 parent 1c7cd1b commit af01170
Show file tree
Hide file tree
Showing 29 changed files with 601 additions and 51 deletions.
5 changes: 5 additions & 0 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ org.scala-lang:scala-reflect
org.slf4j:jcl-over-slf4j
org.yaml:snakeyaml
org.rocksdb:rocksdbjni
com.fasterxml.jackson.module:jackson-module-scala
com.fasterxml.jackson.core:jackson-databind
com.fasterxml.jackson.core:jackson-annotations
com.fasterxml.jackson.core:jackson-core
com.thoughtworks.paranamer:paranamer


------------------------------------------------------------------------------------
Expand Down
16 changes: 16 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,22 @@
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def metricsAppTopDiskUsageInterval: Long = get(METRICS_APP_TOP_DISK_USAGE_INTERVAL)
def metricsWorkerForceAppendPauseSpentTimeThreshold: Int =
get(METRICS_WORKER_PAUSE_SPENT_TIME_FORCE_APPEND_THRESHOLD)
def metricsJsonPrettyEnabled: Boolean = get(METRICS_JSON_PRETTY_ENABLED)

// //////////////////////////////////////////////////////
// Quota //
Expand Down Expand Up @@ -3900,6 +3901,23 @@ object CelebornConf extends Logging {
.checkValue(path => path.startsWith("/"), "Context path must start with '/'")
.createWithDefault("/metrics/prometheus")

val METRICS_JSON_PATH: ConfigEntry[String] =
buildConf("celeborn.metrics.json.path")
.categories("metrics")
.doc("URI context path of json metrics HTTP server.")
.version("0.4.0")
.stringConf
.checkValue(path => path.startsWith("/"), "Context path must start with '/'")
.createWithDefault("/metrics/json")

val METRICS_JSON_PRETTY_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.metrics.json.pretty.enabled")
.categories("metrics")
.doc("When true, view metrics in json pretty format")
.version("0.4.0")
.booleanConf
.createWithDefault(true)

val QUOTA_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.quota.enabled")
.categories("quota")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,34 @@ import scala.util.matching.Regex
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.{METRICS_JSON_PATH, METRICS_PROMETHEUS_PATH}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.sink.{PrometheusHttpRequestHandler, PrometheusServlet, Sink}
import org.apache.celeborn.common.metrics.sink.{JsonServlet, PrometheusServlet, ServletHttpRequestHandler, Sink}
import org.apache.celeborn.common.metrics.source.Source
import org.apache.celeborn.common.util.Utils

class MetricsSystem(
val instance: String,
conf: CelebornConf,
val servletPath: String) extends Logging {
conf: CelebornConf) extends Logging {
private[this] val metricsConfig = new MetricsConfig(conf)

private val sinks = new ArrayBuffer[Sink]
private val sources = new CopyOnWriteArrayList[Source]
private val registry = new MetricRegistry()
private val prometheusServletPath = conf.get(METRICS_PROMETHEUS_PATH)
private val jsonServletPath = conf.get(METRICS_JSON_PATH)

private var prometheusServlet: Option[PrometheusServlet] = None
private var jsonServlet: Option[JsonServlet] = None

var running: Boolean = false

metricsConfig.initialize()

def getPrometheusHandler: PrometheusHttpRequestHandler = {
def getServletHandlers: Array[ServletHttpRequestHandler] = {
require(running, "Can only call getServletHandlers on a running MetricsSystem")
prometheusServlet.map(_.getHandler(conf)).orNull
prometheusServlet.map(_.getHandlers(conf)).getOrElse(Array()) ++
jsonServlet.map(_.getHandlers(conf)).getOrElse(Array())
}

def start(registerStaticSources: Boolean = true) {
Expand Down Expand Up @@ -132,8 +136,25 @@ class MetricsSystem(
classOf[MetricRegistry],
classOf[Seq[Source]],
classOf[String])
.newInstance(kv._2, registry, sources.asScala, servletPath)
prometheusServlet = Some(servlet.asInstanceOf[PrometheusServlet])
prometheusServlet = Some(servlet.newInstance(
kv._2,
registry,
sources.asScala,
prometheusServletPath).asInstanceOf[PrometheusServlet])
} else if (kv._1 == "jsonServlet") {
val servlet = Utils.classForName(classPath)
.getConstructor(
classOf[Properties],
classOf[MetricRegistry],
classOf[Seq[Source]],
classOf[String],
classOf[Boolean])
jsonServlet = Some(servlet.newInstance(
kv._2,
registry,
sources.asScala,
jsonServletPath,
conf.metricsJsonPrettyEnabled.asInstanceOf[Object]).asInstanceOf[JsonServlet])
} else {
val sink = Utils.classForName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry])
Expand Down Expand Up @@ -171,8 +192,7 @@ object MetricsSystem {

def createMetricsSystem(
instance: String,
conf: CelebornConf,
servletPath: String): MetricsSystem = {
new MetricsSystem(instance, conf, servletPath)
conf: CelebornConf): MetricsSystem = {
new MetricsSystem(instance, conf)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.celeborn.common.metrics.sink

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.source.Source

abstract class AbstractServlet(sources: Seq[Source]) extends Sink with Logging {
def getHandlers(conf: CelebornConf): Array[ServletHttpRequestHandler] = {
Array[ServletHttpRequestHandler](
createHttpRequestHandler())
}

def createHttpRequestHandler(): ServletHttpRequestHandler

def getMetricsSnapshot: String = {
sources.map(_.getMetrics).mkString
}

override def start(): Unit = {}

override def stop(): Unit = {}

override def report(): Unit = {}
}

abstract class ServletHttpRequestHandler(path: String) extends Logging {

def handleRequest(uri: String): String

def getServletPath(): String = path

}
Loading

0 comments on commit af01170

Please sign in to comment.