Skip to content

Commit

Permalink
Additional actions metrics with requested changes from previous commit
Browse files Browse the repository at this point in the history
Signed-off-by: harycash <[email protected]>
  • Loading branch information
harycash committed Jun 20, 2024
1 parent 9eaf83a commit 0c55243
Show file tree
Hide file tree
Showing 13 changed files with 307 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.DeleteActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ForceMergeActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.NotificationActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ReplicaCountActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.RolloverActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.TransitionActionMetrics
import org.opensearch.telemetry.metrics.MetricsRegistry

abstract class ActionMetrics {
Expand All @@ -11,26 +16,42 @@ class IndexManagementActionsMetrics private constructor() {
private lateinit var metricsRegistry: MetricsRegistry
private lateinit var actionMetricsMap: Map<String, ActionMetrics>

companion object {
val instance: IndexManagementActionsMetrics by lazy { HOLDER.instance }

const val ROLLOVER = "rollover"
const val NOTIFICATION = "notification"
const val FORCE_MERGE = "force_merge"
const val DELETE = "delete"
const val REPLICA_COUNT = "replica_count"
const val TRANSITION = "transition"

private object HOLDER {
val instance = IndexManagementActionsMetrics()
}
}

fun initialize(metricsRegistry: MetricsRegistry) {
this.metricsRegistry = metricsRegistry

RolloverActionMetrics.instance.initializeCounters(metricsRegistry, "rollover")
RolloverActionMetrics.instance.initializeCounters(metricsRegistry)
NotificationActionMetrics.instance.initializeCounters(metricsRegistry)
ForceMergeActionMetrics.instance.initializeCounters(metricsRegistry)
DeleteActionMetrics.instance.initializeCounters(metricsRegistry)
ReplicaCountActionMetrics.instance.initializeCounters(metricsRegistry)
TransitionActionMetrics.instance.initializeCounters(metricsRegistry)

actionMetricsMap = mapOf(
"rollover" to RolloverActionMetrics.instance,
// Add other action metrics here
ROLLOVER to RolloverActionMetrics.instance,
NOTIFICATION to NotificationActionMetrics.instance,
FORCE_MERGE to ForceMergeActionMetrics.instance,
DELETE to DeleteActionMetrics.instance,
REPLICA_COUNT to ReplicaCountActionMetrics.instance,
TRANSITION to TransitionActionMetrics.instance,
)
}

fun getActionMetrics(actionName: String): ActionMetrics? {
return actionMetricsMap[actionName]
}

companion object {
val instance: IndexManagementActionsMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = IndexManagementActionsMetrics()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class DeleteActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = IndexManagementActionsMetrics.DELETE
lateinit var successes: Counter
lateinit var failures: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Delete Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Delete Action Failures", "count")
}

companion object {
val instance: DeleteActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = DeleteActionMetrics()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class ForceMergeActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = IndexManagementActionsMetrics.FORCE_MERGE
lateinit var successes: Counter
lateinit var failures: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes =
metricsRegistry.createCounter("${actionName}_successes", "Force Merge Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Force Merge Action Failures", "count")
}

companion object {
val instance: ForceMergeActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = ForceMergeActionMetrics()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class NotificationActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = IndexManagementActionsMetrics.NOTIFICATION
lateinit var successes: Counter
lateinit var failures: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Notification Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Notification Action Failures", "count")
}

companion object {
val instance: NotificationActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = NotificationActionMetrics()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class ReplicaCountActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = IndexManagementActionsMetrics.REPLICA_COUNT
lateinit var successes: Counter
lateinit var failures: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Replica Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Replica Action Failures", "count")
}

companion object {
val instance: ReplicaCountActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = ReplicaCountActionMetrics()
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class RolloverActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = "rollover_action"
override val actionName: String = IndexManagementActionsMetrics.ROLLOVER
lateinit var successes: Counter
lateinit var failures: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry, actionName: String) {
fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Rollover Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Rollover Action Failures", "count")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class TransitionActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = IndexManagementActionsMetrics.TRANSITION
lateinit var successes: Counter
lateinit var failures: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Transition Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Transition Action Failures", "count")
}

companion object {
val instance: TransitionActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = TransitionActionMetrics()
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.step.delete

import org.apache.logging.log4j.LogManager
Expand All @@ -12,19 +7,26 @@ import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.DeleteActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotInProgressException
import org.opensearch.telemetry.metrics.tags.Tags
import org.opensearch.transport.RemoteTransportException

class AttemptDeleteStep : Step(name) {
private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private lateinit var indexManagementActionsMetrics: IndexManagementActionsMetrics
private lateinit var actionMetrics: DeleteActionMetrics

override suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
this.indexManagementActionsMetrics = indexManagementActionMetrics
this.actionMetrics = indexManagementActionMetrics.getActionMetrics(IndexManagementActionsMetrics.DELETE) as DeleteActionMetrics

try {
val response: AcknowledgedResponse =
context.client.admin().indices()
Expand All @@ -33,11 +35,21 @@ class AttemptDeleteStep : Step(name) {
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName))
actionMetrics.successes.add(
1.0,
Tags.create().addTag("index_name", context.metadata.index)
.addTag("policy_id", context.metadata.policyID).addTag("node_id", context.clusterService.nodeName),
)
} else {
val message = getFailedMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
actionMetrics.failures.add(
1.0,
Tags.create().addTag("index_name", context.metadata.index)
.addTag("policy_id", context.metadata.policyID).addTag("node_id", context.clusterService.nodeName),
)
}
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
Expand Down Expand Up @@ -66,6 +78,11 @@ class AttemptDeleteStep : Step(name) {
val message = getFailedMessage(indexName)
logger.error(message, e)
stepStatus = StepStatus.FAILED
actionMetrics.failures.add(
1.0,
Tags.create().addTag("index_name", context?.metadata?.index)
.addTag("policy_id", context?.metadata?.policyID).addTag("node_id", context?.clusterService?.nodeName),
)
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,27 @@ import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ForceMergeActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.tags.Tags
import org.opensearch.transport.RemoteTransportException
import java.time.Instant

class AttemptCallForceMergeStep(private val action: ForceMergeAction) : Step(name) {
private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private lateinit var indexManagementActionsMetrics: IndexManagementActionsMetrics
private lateinit var actionMetrics: ForceMergeActionMetrics

@Suppress("TooGenericExceptionCaught", "ComplexMethod")
override suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): AttemptCallForceMergeStep {
val context = this.context ?: return this
val indexName = context.metadata.index
this.indexManagementActionsMetrics = indexManagementActionMetrics
this.actionMetrics = indexManagementActionMetrics.getActionMetrics(IndexManagementActionsMetrics.FORCE_MERGE) as ForceMergeActionMetrics
try {
val startTime = Instant.now().toEpochMilli()
val request = ForceMergeRequest(indexName).maxNumSegments(action.maxNumSegments)
Expand Down Expand Up @@ -62,6 +68,11 @@ class AttemptCallForceMergeStep(private val action: ForceMergeAction) : Step(nam
if (shadowedResponse?.let { it.status == RestStatus.OK } != false) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to if (shadowedResponse == null) getSuccessfulCallMessage(indexName) else getSuccessMessage(indexName))
actionMetrics.successes.add(
1.0,
Tags.create().addTag("index_name", context.metadata.index)
.addTag("policy_id", context.metadata.policyID).addTag("node_id", context.clusterService.nodeName),
)
} else {
// Otherwise the request to force merge encountered some problem
stepStatus = StepStatus.FAILED
Expand All @@ -71,6 +82,11 @@ class AttemptCallForceMergeStep(private val action: ForceMergeAction) : Step(nam
"status" to shadowedResponse.status,
"shard_failures" to shadowedResponse.shardFailures.map { it.getUsefulCauseString() },
)
actionMetrics.failures.add( // Changed from actionMetrics to indexManagementActionMetrics
1.0,
Tags.create().addTag("index_name", context.metadata.index)
.addTag("policy_id", context.metadata.policyID).addTag("node_id", context.clusterService.nodeName),
)
}
} catch (e: RemoteTransportException) {
handleException(indexName, ExceptionsHelper.unwrapCause(e) as Exception)
Expand All @@ -85,6 +101,11 @@ class AttemptCallForceMergeStep(private val action: ForceMergeAction) : Step(nam
val message = getFailedMessage(indexName)
logger.error(message, e)
stepStatus = StepStatus.FAILED
actionMetrics.failures.add(
1.0,
Tags.create().addTag("index_name", context?.metadata?.index)
.addTag("policy_id", context?.metadata?.policyID).addTag("node_id", context?.clusterService?.nodeName),
)
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
Expand Down
Loading

0 comments on commit 0c55243

Please sign in to comment.