Skip to content

Commit

Permalink
[CELEBORN-1122] Metrics supports json format
Browse files Browse the repository at this point in the history
  • Loading branch information
RuiQin7 committed Nov 29, 2023
1 parent 71a360e commit 4ce6f37
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 73 deletions.
5 changes: 5 additions & 0 deletions LICENSE-binary
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ org.scala-lang:scala-reflect
org.slf4j:jcl-over-slf4j
org.yaml:snakeyaml
org.rocksdb:rocksdbjni
com.fasterxml.jackson.module:jackson-module-scala_2.12
com.fasterxml.jackson.module:jackson-module-scala_2.11
com.fasterxml.jackson.core:jackson-databind
com.fasterxml.jackson.core:jackson-annotations
com.fasterxml.jackson.core:jackson-core


------------------------------------------------------------------------------------
Expand Down
8 changes: 6 additions & 2 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@
<artifactId>RoaringBitmap</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand All @@ -123,6 +123,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,49 @@

package org.apache.celeborn.common.metrics.sink

import java.util
import java.util.Properties

import scala.beans.BeanProperty
import scala.collection.mutable.ArrayBuffer

import com.codahale.metrics.MetricRegistry
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
import com.fasterxml.jackson.annotation.PropertyAccessor
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
import io.netty.channel.ChannelHandler.Sharable

import org.apache.celeborn.common.metrics.{CelebornHistogram, CelebornTimer, ResettableSlidingWindowReservoir}
import org.apache.celeborn.common.metrics.source.{AbstractSource, NamedCounter, NamedGauge, NamedHistogram, NamedTimer, Source}

class MetricData(
@BeanProperty var name: String,
@BeanProperty var value: Any,
@BeanProperty var timestampMs: Long,
@BeanProperty var labelNames: util.List[String],
@BeanProperty var labelValues: util.List[String]) {}
object JsonConverter {
val mapper = new ObjectMapper() with ClassTagExtensions
mapper.registerModule(DefaultScalaModule)

def toJson(value: ArrayBuffer[Any]): String = {
mapper.writeValueAsString(value)
}
}

case class MetricData(
name: String,
value: Any,
timestampMs: Long,
labelNames: ArrayBuffer[String],
labelValues: ArrayBuffer[String])

class JsonServlet(
val property: Properties,
val registry: MetricRegistry,
val sources: Seq[Source],
val servletPath: String) extends AbstractServlet(sources) {

val mapper = new ObjectMapper()
mapper.setVisibility(PropertyAccessor.FIELD, Visibility.ANY)

override def getMetricsSnapshot: String = {
val metricDatas = new util.ArrayList[MetricData]
val metricDatas = new ArrayBuffer[MetricData]
try {
sources.map(source => metricDatas.addAll(getMetrics(source)))
mapper.writeValueAsString(metricDatas)
sources.map(source => metricDatas ++= getMetrics(source))
JsonConverter.toJson(metricDatas.map(_.asInstanceOf[Any]))
} catch {
case e: Throwable =>
logError("failed to get json data for metrics", e)
mapper.writeValueAsString(new util.ArrayList[MetricData])
JsonConverter.toJson(new ArrayBuffer())
}
}

Expand All @@ -65,8 +69,8 @@ class JsonServlet(

override def stop(): Unit = {}

def getMetrics(source: Source): util.List[MetricData] = {
val metricDatas = new util.ArrayList[MetricData]
def getMetrics(source: Source): ArrayBuffer[MetricData] = {
val metricDatas = new ArrayBuffer[MetricData]
val absSource = source.asInstanceOf[AbstractSource]
absSource.counters().foreach(c => recordCounter(absSource, c, metricDatas))
absSource.gauges().foreach(g => recordGauge(absSource, g, metricDatas))
Expand All @@ -86,49 +90,54 @@ class JsonServlet(
def recordCounter(
absSource: AbstractSource,
nc: NamedCounter,
metricDatas: util.List[MetricData]): Unit = {
metricDatas: ArrayBuffer[MetricData]): Unit = {
val timestamp = System.currentTimeMillis
val labelNames = new util.ArrayList[String]
val labelValues = new util.ArrayList[String]
val labelNames = new ArrayBuffer[String]
val labelValues = new ArrayBuffer[String]
nc.labels.map { case (k, v) =>
labelNames.add(k)
labelValues.add(v)
labelNames += k
labelValues += v
}
val metricData =
new MetricData(nc.name, nc.counter.getCount, timestamp, labelNames, labelValues)
MetricData(
nc.name,
nc.counter.getCount,
timestamp,
labelNames,
labelValues)
updateInnerMetrics(absSource, metricData, metricDatas)
}

def recordGauge(
absSource: AbstractSource,
ng: NamedGauge[_],
metricDatas: util.List[MetricData]): Unit = {
metricDatas: ArrayBuffer[MetricData]): Unit = {
val timestamp = System.currentTimeMillis
val labelNames = new util.ArrayList[String]
val labelValues = new util.ArrayList[String]
val labelNames = new ArrayBuffer[String]
val labelValues = new ArrayBuffer[String]
ng.labels.map { case (k, v) =>
labelNames.add(k)
labelValues.add(v)
labelNames += k
labelValues += v
}
val metricData = new MetricData(ng.name, ng.gauge.getValue, timestamp, labelNames, labelValues)
val metricData = MetricData(ng.name, ng.gauge.getValue, timestamp, labelNames, labelValues)
updateInnerMetrics(absSource, metricData, metricDatas)
}

def recordHistogram(
absSource: AbstractSource,
nh: NamedHistogram,
metricDatas: util.List[MetricData]): Unit = {
metricDatas: ArrayBuffer[MetricData]): Unit = {
val timestamp = System.currentTimeMillis
val labelNames = new util.ArrayList[String]
val labelValues = new util.ArrayList[String]
val labelNames = new ArrayBuffer[String]
val labelValues = new ArrayBuffer[String]
nh.labels.map { case (k, v) =>
labelNames.add(k)
labelValues.add(v)
labelNames += k
labelValues += v
}
val snapshot = nh.histogram.getSnapshot
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nh.name}_Count",
nh.histogram.getCount,
timestamp,
Expand All @@ -137,7 +146,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nh.name}_Max",
absSource.reportNanosAsMills(snapshot.getMax),
timestamp,
Expand All @@ -146,7 +155,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nh.name}_Mean",
absSource.reportNanosAsMills(snapshot.getMean),
timestamp,
Expand All @@ -155,7 +164,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nh.name}_Min",
absSource.reportNanosAsMills(snapshot.getMin),
timestamp,
Expand All @@ -164,7 +173,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nh.name}_50thPercentile",
absSource.reportNanosAsMills(snapshot.getMedian),
timestamp,
Expand All @@ -173,7 +182,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nh.name}_75thPercentile",
absSource.reportNanosAsMills(snapshot.get75thPercentile),
timestamp,
Expand All @@ -182,7 +191,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nh.name}_95thPercentile",
absSource.reportNanosAsMills(snapshot.get95thPercentile),
timestamp,
Expand All @@ -191,7 +200,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nh.name}_98thPercentile",
absSource.reportNanosAsMills(snapshot.get98thPercentile),
timestamp,
Expand All @@ -200,7 +209,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nh.name}_99thPercentile",
absSource.reportNanosAsMills(snapshot.get99thPercentile),
timestamp,
Expand All @@ -209,7 +218,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nh.name}_999thPercentile",
absSource.reportNanosAsMills(snapshot.get999thPercentile),
timestamp,
Expand All @@ -221,22 +230,22 @@ class JsonServlet(
def recordTimer(
absSource: AbstractSource,
nt: NamedTimer,
metricDatas: util.List[MetricData]): Unit = {
metricDatas: ArrayBuffer[MetricData]): Unit = {
val timestamp = System.currentTimeMillis
val labelNames = new util.ArrayList[String]
val labelValues = new util.ArrayList[String]
val labelNames = new ArrayBuffer[String]
val labelValues = new ArrayBuffer[String]
nt.labels.map { case (k, v) =>
labelNames.add(k)
labelValues.add(v)
labelNames += k
labelValues += v
}
val snapshot = nt.timer.getSnapshot
updateInnerMetrics(
absSource,
new MetricData(s"${nt.name}_Count", nt.timer.getCount, timestamp, labelNames, labelValues),
MetricData(s"${nt.name}_Count", nt.timer.getCount, timestamp, labelNames, labelValues),
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nt.name}_Max",
absSource.reportNanosAsMills(snapshot.getMax),
timestamp,
Expand All @@ -245,7 +254,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nt.name}_Mean",
absSource.reportNanosAsMills(snapshot.getMean),
timestamp,
Expand All @@ -254,7 +263,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nt.name}_Min",
absSource.reportNanosAsMills(snapshot.getMin),
timestamp,
Expand All @@ -263,7 +272,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nt.name}_50thPercentile",
absSource.reportNanosAsMills(snapshot.getMedian),
timestamp,
Expand All @@ -272,7 +281,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nt.name}_75thPercentile",
absSource.reportNanosAsMills(snapshot.get75thPercentile),
timestamp,
Expand All @@ -281,7 +290,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nt.name}_95thPercentile",
absSource.reportNanosAsMills(snapshot.get95thPercentile),
timestamp,
Expand All @@ -290,7 +299,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nt.name}_98thPercentile",
absSource.reportNanosAsMills(snapshot.get98thPercentile),
timestamp,
Expand All @@ -299,7 +308,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nt.name}_99thPercentile",
absSource.reportNanosAsMills(snapshot.get99thPercentile),
timestamp,
Expand All @@ -308,7 +317,7 @@ class JsonServlet(
metricDatas)
updateInnerMetrics(
absSource,
new MetricData(
MetricData(
s"${nt.name}_999thPercentile",
absSource.reportNanosAsMills(snapshot.get999thPercentile),
timestamp,
Expand All @@ -320,9 +329,9 @@ class JsonServlet(
private def updateInnerMetrics(
absSource: AbstractSource,
metricData: MetricData,
metricDatas: util.List[MetricData]): Unit = {
if (metricDatas.size() < absSource.metricsCapacity) {
metricDatas.add(metricData)
metricDatas: ArrayBuffer[MetricData]): Unit = {
if (metricDatas.size < absSource.metricsCapacity) {
metricDatas += metricData
}
}
}
Expand Down
1 change: 1 addition & 0 deletions conf/metrics.properties.template
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
#

*.sink.prometheusServlet.class=org.apache.celeborn.common.metrics.sink.PrometheusServlet
*.sink.jsonServlet.class=org.apache.celeborn.common.metrics.sink.JsonServlet
Loading

0 comments on commit 4ce6f37

Please sign in to comment.