diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 1d0308c3e06..878bf92e448 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -497,7 +497,7 @@ abstract class AbstractSource(conf: CelebornConf, role: String) metricsSnapshot.foreach(metric => sb.append(metric)) if (leftMetricsNum <= 0) { logWarning( - s"The number of metrics exceed the output metrics strings capacity! Full metrics num: ${getAllMetricsNum}") + s"The number of metrics exceed the output metrics strings capacity! Full metrics num: $getAllMetricsNum") } sb.toString() } @@ -514,35 +514,42 @@ abstract class AbstractSource(conf: CelebornConf, role: String) val appCount0Metrics = ArrayBuffer[String]() for (m <- metricList) { if (addNum >= leftNum) breakOut + var strMetrics = "" + var isApp = false m match { case c: NamedCounter => - val counterMetric = getCounterMetrics(c) + strMetrics = getCounterMetrics(c) if (c.isApp) { + isApp = true if (c.counter.getCount > 0) { - appMetricsSnapshot += counterMetric + appMetricsSnapshot += strMetrics } else { - appCount0Metrics += counterMetric + appCount0Metrics += strMetrics } - } else metricsSnapshot += counterMetric + } case g: NamedGauge[_] => - val gaugeMetric = getGaugeMetrics(g) + strMetrics = getGaugeMetrics(g) if (g.isApp) { - appMetricsSnapshot += gaugeMetric - } else metricsSnapshot += gaugeMetric + appMetricsSnapshot += strMetrics + isApp = true + } case m: NamedMeter => - metricsSnapshot += getMeterMetrics(m) + strMetrics = getMeterMetrics(m) case h: NamedHistogram => - metricsSnapshot += getHistogramMetrics(h) + strMetrics = getHistogramMetrics(h) h.asInstanceOf[CelebornHistogram].reservoir .asInstanceOf[ResettableSlidingWindowReservoir].reset() case t: NamedTimer => - metricsSnapshot += getTimerMetrics(t) + strMetrics = getTimerMetrics(t) t.timer.asInstanceOf[CelebornTimer].reservoir .asInstanceOf[ResettableSlidingWindowReservoir].reset() case s => - metricsSnapshot += s.toString + strMetrics = s.toString + } + if (!isApp) { + metricsSnapshot += strMetrics + addNum = addNum + 1 } - addNum = addNum + 1 } appMetricsSnapshot ++= appCount0Metrics leftNum - addNum diff --git a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala index d6eeb23581d..1afab5a1c1f 100644 --- a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala @@ -22,16 +22,10 @@ import org.apache.celeborn.common.CelebornConf class CelebornSourceSuite extends CelebornFunSuite { - test("test getMetrics with customized label") { - val conf = new CelebornConf() - createAbstractSourceAndCheck(conf, "", Role.MASTER) - createAbstractSourceAndCheck(conf, "", Role.WORKER) - } - - def createAbstractSourceAndCheck( + def createAbstractSource( conf: CelebornConf, extraLabels: String, - role: String = "mock"): Unit = { + role: String = "mock"): (String, List[String]) = { val mockSource = new AbstractSource(conf, role) { override def sourceName: String = "mockSource" } @@ -39,12 +33,13 @@ class CelebornSourceSuite extends CelebornFunSuite { val user2 = Map("user" -> "user2") val user3 = Map("user" -> "user3") mockSource.addGauge("Gauge1") { () => 1000 } - mockSource.addGauge("Gauge2", user1) { () => 2000 } - mockSource.addCounter("Counter1") - mockSource.addCounter("Counter2", user2) + mockSource.addGauge("Gauge2", user1, true) { () => 2000 } + mockSource.addCounter("Counter1", Map.empty[String, String], true) + mockSource.addCounter("Counter2", user2, true) // test operation with and without label mockSource.incCounter("Counter1", 3000) mockSource.incCounter("Counter2", 4000, user2) + mockSource.incCounter("Counter2", -4000, user2) mockSource.addTimer("Timer1") mockSource.addTimer("Timer2", user3) // ditto @@ -66,37 +61,83 @@ class CelebornSourceSuite extends CelebornFunSuite { s"""metrics_Gauge2_Value{${extraLabelsStr}${instanceLabelStr}role="$role",user="user1"} 2000""" val exp3 = s"""metrics_Counter1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 3000""" val exp4 = - s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 4000""" + s"""metrics_Counter2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user2"} 0""" val exp5 = s"""metrics_Timer1_Count{${extraLabelsStr}${instanceLabelStr}role="$role"} 1""" val exp6 = s"""metrics_Timer2_Count{${extraLabelsStr}${instanceLabelStr}role="$role",user="user3"} 1""" - assert(res.contains(exp1)) - assert(res.contains(exp2)) - assert(res.contains(exp3)) - assert(res.contains(exp4)) - assert(res.contains(exp5)) - assert(res.contains(exp6)) + val expList = List[String](exp1, exp2, exp3, exp4, exp5, exp6) + (res, expList) + } + + def checkMetricsRes(res: String, labelList: List[String]): Unit = { + labelList.foreach { exp => + assert(res.contains(exp)) + } } test("test getMetrics with customized label by conf") { val conf = new CelebornConf() + val (resM, expsM) = createAbstractSource(conf, "", Role.MASTER) + checkMetricsRes(resM, expsM) + val (resW, expsW) = createAbstractSource(conf, "", Role.WORKER) + checkMetricsRes(resW, expsW) + // label's is normal conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=v2,l3=v3") val extraLabels = """l1="v1",l2="v2",l3="v3"""" - createAbstractSourceAndCheck(conf, extraLabels) + val (res, exps) = createAbstractSource(conf, extraLabels) + checkMetricsRes(res, exps) // labels' kv not correct assertThrows[IllegalArgumentException] { conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, "l1=v1,l2=") val extraLabels2 = """l1="v1",l2="v2",l3="v3"""" - createAbstractSourceAndCheck(conf, extraLabels2) + val (res2, exps2) = createAbstractSource(conf, extraLabels2) + checkMetricsRes(res2, exps2) } // there are spaces in labels conf.set(CelebornConf.METRICS_EXTRA_LABELS.key, " l1 = v1, l2 =v2 ,l3 =v3 ") val extraLabels3 = """l1="v1",l2="v2",l3="v3"""" - createAbstractSourceAndCheck(conf, extraLabels3) + val (res3, exps3) = createAbstractSource(conf, extraLabels3) + checkMetricsRes(res3, exps3) + } + + test("test getMetrics with full capacity and isAppEnable false") { + val conf = new CelebornConf() + // metrics won't contain appMetrics + conf.set(CelebornConf.METRICS_APP_ENABLED.key, "false") + conf.set(CelebornConf.METRICS_CAPACITY.key, "6") + val (res1, exps1) = createAbstractSource(conf, "") + List[Int](0, 4, 5).foreach { i => + assert(res1.contains(exps1(i))) + } + List[Int](1, 2, 3).foreach { i => + assert(!res1.contains(exps1(i))) + } + + // app metrics will fall behind when it reaches capacity + conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true") + conf.set(CelebornConf.METRICS_CAPACITY.key, "3") + val (res2, exps2) = createAbstractSource(conf, "") + List[Int](0, 4, 5).foreach { i => + assert(res2.contains(exps2(i))) + } + List[Int](1, 2, 3).foreach { i => + assert(!res2.contains(exps2(i))) + } + + // app metrics count0 will fall behind + conf.set(CelebornConf.METRICS_APP_ENABLED.key, "true") + conf.set(CelebornConf.METRICS_CAPACITY.key, "5") + val (res3, exps3) = createAbstractSource(conf, "") + List[Int](0, 4, 5, 1, 2).foreach { i => + assert(res3.contains(exps3(i))) + } + List[Int](3).foreach { i => + assert(!res3.contains(exps3(i))) + } } }