Skip to content

Commit

Permalink
Requested Changes and Addition of Metrics to all the remaining Actions
Browse files Browse the repository at this point in the history
Signed-off-by: harycash <[email protected]>
  • Loading branch information
harycash committed Jul 1, 2024
1 parent 55ff6dd commit aaa80d5
Show file tree
Hide file tree
Showing 49 changed files with 567 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,106 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {
return this
}

abstract suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): Step
abstract suspend fun execute(): Step

fun postExecute(logger: Logger): Step {
fun postExecute(
logger: Logger,
indexManagementActionMetrics: IndexManagementActionsMetrics,
step: Step,
startingManagedIndexMetaData: ManagedIndexMetaData,
): Step {
logger.info("Finished executing $name for ${context?.metadata?.index}")
val updatedStepMetaData = step.getUpdatedManagedIndexMetadata(startingManagedIndexMetaData)
when (context?.metadata?.actionMetaData?.name) {
IndexManagementActionsMetrics.ROLLOVER -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.ROLLOVER,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.FORCE_MERGE -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.FORCE_MERGE,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.DELETE -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.DELETE,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.REPLICA_COUNT -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.REPLICA_COUNT,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.TRANSITION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.TRANSITION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
IndexManagementActionsMetrics.NOTIFICATION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.NOTIFICATION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.CLOSE -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.CLOSE,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SET_INDEX_PRIORITY -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SET_INDEX_PRIORITY,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.OPEN -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.OPEN,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.CREATE_ROLLUP -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.CREATE_ROLLUP,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
IndexManagementActionsMetrics.CREATE_TRANSFORM -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.CREATE_TRANSFORM,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.MOVE_SHARD -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.MOVE_SHARD,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SET_READ_ONLY -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SET_READ_ONLY,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SHRINK -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SHRINK,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.SNAPSHOT -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.SNAPSHOT,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.ALIAS_ACTION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.ALIAS_ACTION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)

IndexManagementActionsMetrics.ALLOCATION -> indexManagementActionMetrics.getActionMetrics(
IndexManagementActionsMetrics.ALLOCATION,
)
?.emitMetrics(context!!, indexManagementActionMetrics, updatedStepMetaData.stepMetaData)
else -> {
logger.info(
"Action Metrics is not supported for this action [%s]",
context?.metadata?.actionMetaData?.name,
)
}
}
this.context = null
return this
}
Expand All @@ -50,7 +146,8 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {
}
}

final fun getStartingStepMetaData(metadata: ManagedIndexMetaData): StepMetaData = StepMetaData(name, getStepStartTime(metadata).toEpochMilli(), StepStatus.STARTING)
final fun getStartingStepMetaData(metadata: ManagedIndexMetaData): StepMetaData =
StepMetaData(name, getStepStartTime(metadata).toEpochMilli(), StepStatus.STARTING)

enum class StepStatus(val status: String) : Writeable {
STARTING("starting"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,25 @@

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AliasActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.AllocationActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.CloseActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.CreateRollupActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.CreateTransformActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.DeleteActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ForceMergeActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.MoveShardActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.NotificationActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.OpenActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ReplicaCountActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.RolloverActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetIndexPriorityActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SetReadOnlyActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.ShrinkActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.SnapshotActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.TransitionActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.MetricsRegistry
import org.opensearch.telemetry.metrics.tags.Tags

Expand All @@ -23,8 +35,15 @@ abstract class ActionMetrics {
.addTag("index_name", context.metadata.index)
.addTag("policy_id", context.metadata.policyID)
.addTag("node_id", context.clusterService.nodeName ?: "")
.addTag("Index_uuid", context.metadata.indexUuid)
return tags
}

abstract fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
)
}

class IndexManagementActionsMetrics private constructor() {
Expand All @@ -40,6 +59,17 @@ class IndexManagementActionsMetrics private constructor() {
const val DELETE = "delete"
const val REPLICA_COUNT = "replica_count"
const val TRANSITION = "transition"
const val CLOSE = "close"
const val SET_INDEX_PRIORITY = "set_index_priority"
const val OPEN = "open"
const val CREATE_ROLLUP = "create_rollup"
const val CREATE_TRANSFORM = "create_transform"
const val MOVE_SHARD = "move_shard"
const val SET_READ_ONLY = "set_read_only"
const val SHRINK = "shrink"
const val SNAPSHOT = "snapshot"
const val ALIAS_ACTION = "alias_action"
const val ALLOCATION = "allocation"

private object HOLDER {
val instance = IndexManagementActionsMetrics()
Expand All @@ -55,6 +85,17 @@ class IndexManagementActionsMetrics private constructor() {
DeleteActionMetrics.instance.initializeCounters(metricsRegistry)
ReplicaCountActionMetrics.instance.initializeCounters(metricsRegistry)
TransitionActionMetrics.instance.initializeCounters(metricsRegistry)
CloseActionMetrics.instance.initializeCounters(metricsRegistry)
SetIndexPriorityActionMetrics.instance.initializeCounters(metricsRegistry)
OpenActionMetrics.instance.initializeCounters(metricsRegistry)
CreateRollupActionMetrics.instance.initializeCounters(metricsRegistry)
CreateTransformActionMetrics.instance.initializeCounters(metricsRegistry)
MoveShardActionMetrics.instance.initializeCounters(metricsRegistry)
SetReadOnlyActionMetrics.instance.initializeCounters(metricsRegistry)
ShrinkActionMetrics.instance.initializeCounters(metricsRegistry)
SnapshotActionMetrics.instance.initializeCounters(metricsRegistry)
AliasActionMetrics.instance.initializeCounters(metricsRegistry)
AllocationActionMetrics.instance.initializeCounters(metricsRegistry)

actionMetricsMap = mapOf(
ROLLOVER to RolloverActionMetrics.instance,
Expand All @@ -63,6 +104,17 @@ class IndexManagementActionsMetrics private constructor() {
DELETE to DeleteActionMetrics.instance,
REPLICA_COUNT to ReplicaCountActionMetrics.instance,
TRANSITION to TransitionActionMetrics.instance,
CLOSE to CloseActionMetrics.instance,
SET_INDEX_PRIORITY to SetIndexPriorityActionMetrics.instance,
OPEN to OpenActionMetrics.instance,
CREATE_ROLLUP to CreateRollupActionMetrics.instance,
CREATE_TRANSFORM to CreateTransformActionMetrics.instance,
MOVE_SHARD to MoveShardActionMetrics.instance,
SET_READ_ONLY to SetReadOnlyActionMetrics.instance,
SHRINK to ShrinkActionMetrics.instance,
SNAPSHOT to SnapshotActionMetrics.instance,
ALIAS_ACTION to AliasActionMetrics.instance,
ALLOCATION to AllocationActionMetrics.instance,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

Expand All @@ -29,4 +32,22 @@ class AliasActionMetrics private constructor() : ActionMetrics() {
private object HOLDER {
val instance = AliasActionMetrics()
}

override fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
) {
val aliasActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALIAS_ACTION) as AliasActionMetrics
val stepStatus = stepMetaData?.stepStatus
if (stepStatus == StepStatus.COMPLETED) {
aliasActionMetrics.successes.add(1.0, context.let { aliasActionMetrics.createTags(it) })
}
if (stepStatus == StepStatus.FAILED) {
aliasActionMetrics.failures.add(1.0, context.let { aliasActionMetrics.createTags(it) })
}
val endTime = System.currentTimeMillis()
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
aliasActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { aliasActionMetrics.createTags(it) })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

Expand All @@ -29,4 +32,22 @@ class AllocationActionMetrics private constructor() : ActionMetrics() {
private object HOLDER {
val instance = AllocationActionMetrics()
}

override fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
) {
val allocationActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.ALLOCATION) as AllocationActionMetrics
val stepStatus = stepMetaData?.stepStatus
if (stepStatus == StepStatus.COMPLETED) {
allocationActionMetrics.successes.add(1.0, context.let { allocationActionMetrics.createTags(it) })
}
if (stepStatus == StepStatus.FAILED) {
allocationActionMetrics.failures.add(1.0, context.let { allocationActionMetrics.createTags(it) })
}
val endTime = System.currentTimeMillis()
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
allocationActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { allocationActionMetrics.createTags(it) })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

Expand All @@ -29,4 +32,22 @@ class CloseActionMetrics private constructor() : ActionMetrics() {
private object HOLDER {
val instance = CloseActionMetrics()
}

override fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
) {
val closeActionMetrics = indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.CLOSE) as CloseActionMetrics
val stepStatus = stepMetaData?.stepStatus
if (stepStatus == StepStatus.COMPLETED) {
closeActionMetrics.successes.add(1.0, context.let { closeActionMetrics.createTags(it) })
}
if (stepStatus == StepStatus.FAILED) {
closeActionMetrics.failures.add(1.0, context.let { closeActionMetrics.createTags(it) })
}
val endTime = System.currentTimeMillis()
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
closeActionMetrics.cumulativeLatency.add(latency.toDouble(), context.let { closeActionMetrics.createTags(it) })
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.Step.StepStatus
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

Expand All @@ -29,4 +32,26 @@ class CreateRollupActionMetrics private constructor() : ActionMetrics() {
private object HOLDER {
val instance = CreateRollupActionMetrics()
}

override fun emitMetrics(
context: StepContext,
indexManagementActionsMetrics: IndexManagementActionsMetrics,
stepMetaData: StepMetaData?,
) {
val createRollupActionMetrics =
indexManagementActionsMetrics.getActionMetrics(IndexManagementActionsMetrics.CREATE_ROLLUP) as CreateRollupActionMetrics
val stepStatus = stepMetaData?.stepStatus
if (stepStatus == StepStatus.COMPLETED) {
createRollupActionMetrics.successes.add(1.0, context.let { createRollupActionMetrics.createTags(it) })
}
if (stepStatus == StepStatus.FAILED) {
createRollupActionMetrics.failures.add(1.0, context.let { createRollupActionMetrics.createTags(it) })
}
val endTime = System.currentTimeMillis()
val latency = endTime - (context.metadata.stepMetaData?.startTime ?: endTime)
createRollupActionMetrics.cumulativeLatency.add(
latency.toDouble(),
context.let { createRollupActionMetrics.createTags(it) },
)
}
}
Loading

0 comments on commit aaa80d5

Please sign in to comment.