Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial integration of TelemetryAwarePlugin with Rollover Step Integration #1186

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
Expand All @@ -25,7 +26,7 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {
return this
}

abstract suspend fun execute(): Step
abstract suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): Step

fun postExecute(logger: Logger): Step {
logger.info("Finished executing $name for ${context?.metadata?.index}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics.RolloverActionMetrics
import org.opensearch.telemetry.metrics.MetricsRegistry

abstract class ActionMetrics {
abstract val actionName: String
}

class IndexManagementActionsMetrics private constructor() {
harshitakaushik-dev marked this conversation as resolved.
Show resolved Hide resolved
private lateinit var metricsRegistry: MetricsRegistry
private lateinit var actionMetricsMap: Map<String, ActionMetrics>

fun initialize(metricsRegistry: MetricsRegistry) {
this.metricsRegistry = metricsRegistry

RolloverActionMetrics.instance.initializeCounters(metricsRegistry, "rollover")
harshitakaushik-dev marked this conversation as resolved.
Show resolved Hide resolved

actionMetricsMap = mapOf(
"rollover" to RolloverActionMetrics.instance,
harshitakaushik-dev marked this conversation as resolved.
Show resolved Hide resolved
// Add other action metrics here
)
}

fun getActionMetrics(actionName: String): ActionMetrics? {
return actionMetricsMap[actionName]
}

companion object {
val instance: IndexManagementActionsMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = IndexManagementActionsMetrics()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.actionmetrics

import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.ActionMetrics
import org.opensearch.telemetry.metrics.Counter
import org.opensearch.telemetry.metrics.MetricsRegistry

class RolloverActionMetrics private constructor() : ActionMetrics() {
override val actionName: String = "rollover_action"
lateinit var successes: Counter
lateinit var failures: Counter

fun initializeCounters(metricsRegistry: MetricsRegistry, actionName: String) {
successes = metricsRegistry.createCounter("${actionName}_successes", "Rollover Action Successes", "count")
failures = metricsRegistry.createCounter("${actionName}_failures", "Rollover Action Failures", "count")
}

companion object {
val instance: RolloverActionMetrics by lazy { HOLDER.instance }
}

private object HOLDER {
val instance = RolloverActionMetrics()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManage
import org.opensearch.indexmanagement.spi.IndexManagementExtension
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.StatusChecker
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.transform.TargetIndexMappingService
import org.opensearch.indexmanagement.transform.TransformRunner
Expand Down Expand Up @@ -184,10 +185,13 @@ import org.opensearch.plugins.ExtensiblePlugin
import org.opensearch.plugins.NetworkPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.plugins.SystemIndexPlugin
import org.opensearch.plugins.TelemetryAwarePlugin
import org.opensearch.repositories.RepositoriesService
import org.opensearch.rest.RestController
import org.opensearch.rest.RestHandler
import org.opensearch.script.ScriptService
import org.opensearch.telemetry.metrics.MetricsRegistry
import org.opensearch.telemetry.tracing.Tracer
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.RemoteClusterService
import org.opensearch.transport.TransportInterceptor
Expand All @@ -196,7 +200,8 @@ import org.opensearch.watcher.ResourceWatcherService
import java.util.function.Supplier

@Suppress("TooManyFunctions")
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, Plugin() {
class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin, ExtensiblePlugin, SystemIndexPlugin,
TelemetryAwarePlugin, Plugin() {
private val logger = LogManager.getLogger(javaClass)
lateinit var indexManagementIndices: IndexManagementIndices
lateinit var actionValidation: ActionValidation
Expand All @@ -210,6 +215,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
private val extensions = mutableSetOf<String>()
private val extensionCheckerMap = mutableMapOf<String, StatusChecker>()
lateinit var indexOperationActionFilter: IndexOperationActionFilter
private lateinit var metricsRegistry: MetricsRegistry

companion object {
const val PLUGINS_BASE_URI = "/_plugins"
Expand Down Expand Up @@ -374,8 +380,11 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
namedWriteableRegistry: NamedWriteableRegistry,
indexNameExpressionResolver: IndexNameExpressionResolver,
repositoriesServiceSupplier: Supplier<RepositoriesService>,
tracer: Tracer,
metricsRegistry: MetricsRegistry,
): Collection<Any> {
val settings = environment.settings()
this.metricsRegistry = metricsRegistry
this.clusterService = clusterService
QueryShardContextFactory.init(
client,
Expand All @@ -385,6 +394,8 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
namedWriteableRegistry,
environment,
)

IndexManagementActionsMetrics.instance.initialize(metricsRegistry)
rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver)
val jvmService = JvmService(environment.settings())
val transformRunner =
Expand Down Expand Up @@ -453,6 +464,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
.registerThreadPool(threadPool)
.registerExtensionChecker(extensionChecker)
.registerIndexMetadataProvider(indexMetadataProvider)
.registerIndexManagementActionMetrics(IndexManagementActionsMetrics.instance)

val managedIndexCoordinator =
ManagedIndexCoordinator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ import org.opensearch.indexmanagement.opensearchapi.withClosableContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData
Expand Down Expand Up @@ -121,6 +122,7 @@ object ManagedIndexRunner :
private lateinit var skipExecFlag: SkipExecution
private lateinit var threadPool: ThreadPool
private lateinit var extensionStatusChecker: ExtensionStatusChecker
lateinit var indexManagementActionMetrics: IndexManagementActionsMetrics
private lateinit var indexMetadataProvider: IndexMetadataProvider
private var indexStateManagementEnabled: Boolean = DEFAULT_ISM_ENABLED
private var validationServiceEnabled: Boolean = DEFAULT_ACTION_VALIDATION_ENABLED
Expand Down Expand Up @@ -221,6 +223,11 @@ object ManagedIndexRunner :
return this
}

fun registerIndexManagementActionMetrics(indexManagementActionsMetrics: IndexManagementActionsMetrics): Any {
this.indexManagementActionMetrics = indexManagementActionsMetrics
return this
}

override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) {
if (job !is ManagedIndexConfig) {
throw IllegalArgumentException("Invalid job type, found ${job.javaClass.simpleName} with id: ${context.jobId}")
Expand Down Expand Up @@ -446,7 +453,7 @@ object ManagedIndexRunner :
managedIndexConfig.id, settings, threadPool.threadContext, managedIndexConfig.policy.user,
),
) {
step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute().postExecute(logger)
step.preExecute(logger, stepContext.getUpdatedContext(startingManagedIndexMetaData)).execute(indexManagementActionMetrics).postExecute(logger)
}
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.indexmanagement.indexstatemanagement.action.AliasAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData

Expand All @@ -19,7 +20,7 @@ class AttemptAliasActionsStep(private val action: AliasAction) : Step(name) {
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
override suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.action.AllocationAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData

Expand All @@ -20,7 +21,7 @@ class AttemptAllocationStep(private val action: AllocationAction) : Step(name) {
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
override suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.action.admin.indices.close.CloseIndexRequest
import org.opensearch.action.admin.indices.close.CloseIndexResponse
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotInProgressException
Expand All @@ -21,7 +22,7 @@ class AttemptCloseStep : Step(name) {
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
override suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotInProgressException
Expand All @@ -21,7 +22,7 @@ class AttemptDeleteStep : Step(name) {
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
override suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeActi
import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
Expand All @@ -31,7 +32,7 @@ class AttemptCallForceMergeStep(private val action: ForceMergeAction) : Step(nam
private var info: Map<String, Any>? = null

@Suppress("TooGenericExceptionCaught", "ComplexMethod")
override suspend fun execute(): AttemptCallForceMergeStep {
override suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): AttemptCallForceMergeStep {
val context = this.context ?: return this
val indexName = context.metadata.index
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
Expand All @@ -25,7 +26,7 @@ class AttemptSetReadOnlyStep(private val action: ForceMergeAction) : Step(name)
private var info: Map<String, Any>? = null

@Suppress("ReturnCount")
override suspend fun execute(): Step {
override suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
val indexSetToReadOnly = setIndexToReadOnly(indexName, context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeActi
import org.opensearch.indexmanagement.opensearchapi.getUsefulCauseString
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
Expand All @@ -26,7 +27,7 @@ class WaitForForceMergeStep(private val action: ForceMergeAction) : Step(name, f
private var info: Map<String, Any>? = null

@Suppress("TooGenericExceptionCaught", "ReturnCount")
override suspend fun execute(): WaitForForceMergeStep {
override suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): WaitForForceMergeStep {
val context = this.context ?: return this
val indexName = context.metadata.index
// Retrieve maxNumSegments value from ActionProperties. If ActionProperties is null, update failed info and return early.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.action.IndexPriorityAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.transport.RemoteTransportException
Expand All @@ -24,7 +25,7 @@ class AttemptSetIndexPriorityStep(private val action: IndexPriorityAction) : Ste
private var info: Map<String, Any>? = null

@Suppress("TooGenericExceptionCaught")
override suspend fun execute(): Step {
override suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
val managedIndexMetaData = context.metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.publishLegacyNot
import org.opensearch.indexmanagement.indexstatemanagement.util.sendNotification
import org.opensearch.indexmanagement.opensearchapi.convertToMap
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.metrics.IndexManagementActionsMetrics
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.script.Script
Expand All @@ -22,7 +23,7 @@ class AttemptNotificationStep(private val action: NotificationAction) : Step(nam
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override suspend fun execute(): Step {
override suspend fun execute(indexManagementActionMetrics: IndexManagementActionsMetrics): Step {
val context = this.context ?: return this
val indexName = context.metadata.index
val scriptService = context.scriptService
Expand Down
Loading
Loading