From 3ddd75a85e3600cee7177c6f1d2acc4da660a161 Mon Sep 17 00:00:00 2001 From: Tanqiu Liu Date: Tue, 10 Oct 2023 19:34:26 -0700 Subject: [PATCH 1/6] [Feature] Support Transform as an ISM action (#760) * Initial impl Signed-off-by: Tanqiu Liu * fix style Signed-off-by: Tanqiu Liu * end to end functional Signed-off-by: Tanqiu Liu * ISM transform unit tests & integ tests Signed-off-by: Tanqiu Liu * Fix after core #8157 (#857) Signed-off-by: bowenlan-amzn * Upgrade the backport workflow (#862) Signed-off-by: Ashish Agrawal Signed-off-by: Tanqiu Liu * Added 2.9 release notes. (#851) * Added 2.9 release notes. Signed-off-by: AWSHurneyt * Added 2.9 release notes. Signed-off-by: AWSHurneyt * Added 2.9 release notes. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt Signed-off-by: Tanqiu Liu * Handle NPE in isRollupIndex (#855) * Handle NPE in isRollupIndex `metadata.index()` can return `null`, so handle that case by returning `false`. Signed-off-by: Bryce Lampe * unit test Signed-off-by: Bryce Lampe --------- Signed-off-by: Bryce Lampe Co-authored-by: bowenlan-amzn Signed-off-by: Tanqiu Liu * Fix core XcontentType refactor (#873) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * fix for max & min aggregations when no metric property exist (#870) Signed-off-by: Subhobrata Dey Signed-off-by: Tanqiu Liu * core refactor change (#884) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * update backport branch name (#885) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * core refactor change (#887) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * Fix breaking change by core refactor (#888) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * fix core breaking (#906) Signed-off-by: bowenlan-amzn Signed-off-by: Tanqiu Liu * Support copy alias in rollover (#907) * Support copy alias in rollover Signed-off-by: bowenlan-amzn * 2.10 Signed-off-by: bowenlan-amzn --------- Signed-off-by: bowenlan-amzn Signed-off-by: Tanqiu Liu * Set preference to _primary when searching control-center index (#911) * Set preference to _primary when searching control-center index Signed-off-by: gaobinlong * Use _primary_first instead Signed-off-by: gaobinlong --------- Signed-off-by: gaobinlong Signed-off-by: Tanqiu Liu * Add primary first preference to all search requests (#912) Signed-off-by: Tanqiu Liu * fix intelliJ IDEA gradle sync error (#916) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * make control center index as system index (#919) Signed-off-by: Hailong Cui Signed-off-by: Tanqiu Liu * Updates demo certs used in integ tests (#921) Signed-off-by: Darshit Chanpura Signed-off-by: Tanqiu Liu * Added 2.10 release notes (#925) Signed-off-by: Ashish Agrawal Signed-off-by: Tanqiu Liu * Bump bwc version (#930) Signed-off-by: bowenlan-amzn Signed-off-by: Tanqiu Liu * fix integ tests; upgrade mappings versions Signed-off-by: Tanqiu Liu * Fix DCO Signed-off-by: Tanqiu Liu * Addressed pr comments; Add integ test case for re-execute the same transform action Signed-off-by: Tanqiu Liu * Addressed detekt error Signed-off-by: Tanqiu Liu * Added ISMTransform writeable test Signed-off-by: Tanqiu Liu * Addressed comments; Moved updateTransformStartTime to IndexManagementRestTestCase Signed-off-by: Tanqiu Liu --------- Signed-off-by: Tanqiu Liu Signed-off-by: bowenlan-amzn Signed-off-by: Ashish Agrawal Signed-off-by: AWSHurneyt Signed-off-by: Bryce Lampe Signed-off-by: Hailong Cui Signed-off-by: Subhobrata Dey Signed-off-by: gaobinlong Signed-off-by: Darshit Chanpura Co-authored-by: bowenlan-amzn Co-authored-by: Ashish Agrawal Co-authored-by: AWSHurneyt Co-authored-by: Bryce Lampe Co-authored-by: Hailong Cui Co-authored-by: Subhobrata Dey Co-authored-by: gaobinlong Co-authored-by: Darshit Chanpura <35282393+DarshitChanpura@users.noreply.github.com> --- DEVELOPER_GUIDE.md | 2 +- .../model/ActionProperties.kt | 14 +- .../model/TransformActionProperties.kt | 58 +++ .../indexmanagement/IndexManagementPlugin.kt | 2 +- .../indexstatemanagement/ISMActionsParser.kt | 4 +- .../ManagedIndexCoordinator.kt | 8 +- .../action/TransformAction.kt | 63 +++ .../action/TransformActionParser.kt | 42 ++ .../AttemptCreateTransformJobStep.kt | 127 ++++++ .../WaitForTransformCompletionStep.kt | 132 ++++++ .../opensearchapi/OpenSearchExtensions.kt | 4 +- .../transform/model/ISMTransform.kt | 208 ++++++++++ .../mappings/opendistro-ism-config.json | 83 +++- .../mappings/opendistro-ism-history.json | 6 +- .../IndexManagementRestTestCase.kt | 34 +- .../indexmanagement/SecurityRestTestCase.kt | 6 - .../IndexStateManagementRestTestCase.kt | 70 ++++ .../indexstatemanagement/TestHelpers.kt | 6 + .../action/TransformActionIT.kt | 382 ++++++++++++++++++ .../ManagedIndexCoordinatorTests.kt | 4 +- .../AttemptCreateTransformJobStepTests.kt | 65 +++ .../WaitForTransformCompletionStepTests.kt | 171 ++++++++ .../indexmanagement/transform/TestHelpers.kt | 13 + .../transform/TransformRestTestCase.kt | 34 -- .../transform/model/ISMTransformTests.kt | 49 +++ .../transform/model/WriteableTests.kt | 8 + .../cached-opendistro-ism-config.json | 83 +++- .../cached-opendistro-ism-history.json | 6 +- 28 files changed, 1626 insertions(+), 58 deletions(-) create mode 100644 spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/TransformActionProperties.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionParser.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/AttemptCreateTransformJobStep.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransform.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateTransformJobStepTests.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForTransformCompletionStepTests.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransformTests.kt diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index 61ba3f9f9..c37625806 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -48,7 +48,7 @@ However, to build the `index management` plugin project, we also use the OpenSea ### Building from the command line -1. `./gradlew build` builds and tests project. +1. `./gradlew build` builds and tests project. 2. `./gradlew run` launches a single node cluster with the index management (and job-scheduler) plugin installed. 3. `./gradlew run -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed. 4. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests. diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionProperties.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionProperties.kt index e957b7724..1b75aec11 100644 --- a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionProperties.kt +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/ActionProperties.kt @@ -23,7 +23,8 @@ data class ActionProperties( val snapshotName: String? = null, val rollupId: String? = null, val hasRollupFailed: Boolean? = null, - val shrinkActionProperties: ShrinkActionProperties? = null + val shrinkActionProperties: ShrinkActionProperties? = null, + val transformActionProperties: TransformActionProperties? = null ) : Writeable, ToXContentFragment { override fun writeTo(out: StreamOutput) { @@ -32,6 +33,7 @@ data class ActionProperties( out.writeOptionalString(rollupId) out.writeOptionalBoolean(hasRollupFailed) out.writeOptionalWriteable(shrinkActionProperties) + out.writeOptionalWriteable(transformActionProperties) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { @@ -40,6 +42,7 @@ data class ActionProperties( if (rollupId != null) builder.field(Properties.ROLLUP_ID.key, rollupId) if (hasRollupFailed != null) builder.field(Properties.HAS_ROLLUP_FAILED.key, hasRollupFailed) if (shrinkActionProperties != null) builder.addObject(ShrinkActionProperties.SHRINK_ACTION_PROPERTIES, shrinkActionProperties, params) + if (transformActionProperties != null) builder.addObject(TransformActionProperties.TRANSFORM_ACTION_PROPERTIES, transformActionProperties, params) return builder } @@ -52,7 +55,8 @@ data class ActionProperties( val rollupId: String? = si.readOptionalString() val hasRollupFailed: Boolean? = si.readOptionalBoolean() val shrinkActionProperties: ShrinkActionProperties? = si.readOptionalWriteable { ShrinkActionProperties.fromStreamInput(it) } - return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties) + val transformActionProperties: TransformActionProperties? = si.readOptionalWriteable { TransformActionProperties.fromStreamInput(it) } + return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties) } fun parse(xcp: XContentParser): ActionProperties { @@ -61,6 +65,7 @@ data class ActionProperties( var rollupId: String? = null var hasRollupFailed: Boolean? = null var shrinkActionProperties: ShrinkActionProperties? = null + var transformActionProperties: TransformActionProperties? = null ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { @@ -75,10 +80,13 @@ data class ActionProperties( ShrinkActionProperties.SHRINK_ACTION_PROPERTIES -> { shrinkActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else ShrinkActionProperties.parse(xcp) } + TransformActionProperties.TRANSFORM_ACTION_PROPERTIES -> { + transformActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else TransformActionProperties.parse(xcp) + } } } - return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties) + return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties) } } diff --git a/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/TransformActionProperties.kt b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/TransformActionProperties.kt new file mode 100644 index 000000000..70b593750 --- /dev/null +++ b/spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/model/TransformActionProperties.kt @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.spi.indexstatemanagement.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentFragment +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser + +data class TransformActionProperties( + val transformId: String? +) : Writeable, ToXContentFragment { + + override fun writeTo(out: StreamOutput) { + out.writeOptionalString(transformId) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + if (transformId != null) builder.field(Properties.TRANSFORM_ID.key, transformId) + return builder + } + + companion object { + const val TRANSFORM_ACTION_PROPERTIES = "transform_action_properties" + + fun fromStreamInput(sin: StreamInput): TransformActionProperties { + val transformId: String? = sin.readOptionalString() + return TransformActionProperties(transformId) + } + + fun parse(xcp: XContentParser): TransformActionProperties { + var transformId: String? = null + + ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + Properties.TRANSFORM_ID.key -> transformId = xcp.text() + } + } + + return TransformActionProperties(transformId) + } + } + + enum class Properties(val key: String) { + TRANSFORM_ID("transform_id") + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 00e25ce86..23d2655c1 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -453,7 +453,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin val managedIndexCoordinator = ManagedIndexCoordinator( environment.settings(), - client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider + client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider, xContentRegistry ) val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt index 9db2b0f67..a45cfddae 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser +import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry @@ -49,7 +50,8 @@ class ISMActionsParser private constructor() { RollupActionParser(), RolloverActionParser(), ShrinkActionParser(), - SnapshotActionParser() + SnapshotActionParser(), + TransformActionParser(), ) val customActionExtensionMap = mutableMapOf() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index 961a56057..a4689f411 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -42,6 +42,7 @@ import org.opensearch.common.regex.Regex import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.commons.authuser.User +import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.index.Index import org.opensearch.index.query.QueryBuilders import org.opensearch.indexmanagement.IndexManagementIndices @@ -110,7 +111,8 @@ class ManagedIndexCoordinator( private val clusterService: ClusterService, private val threadPool: ThreadPool, indexManagementIndices: IndexManagementIndices, - private val indexMetadataProvider: IndexMetadataProvider + private val indexMetadataProvider: IndexMetadataProvider, + private val xContentRegistry: NamedXContentRegistry ) : ClusterStateListener, CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")), LifecycleListener() { @@ -422,7 +424,7 @@ class ManagedIndexCoordinator( return try { val response: SearchResponse = client.suspendUntil { search(searchRequest, it) } - parseFromSearchResponse(response = response, parse = Policy.Companion::parse) + parseFromSearchResponse(response, xContentRegistry, Policy.Companion::parse) } catch (ex: IndexNotFoundException) { emptyList() } catch (ex: ClusterBlockException) { @@ -603,7 +605,7 @@ class ManagedIndexCoordinator( } mRes.forEach { if (it.response.isExists) { - result[it.id] = contentParser(it.response.sourceAsBytesRef).parseWithType( + result[it.id] = contentParser(it.response.sourceAsBytesRef, xContentRegistry).parseWithType( it.response.id, it.response.seqNo, it.response.primaryTerm, ManagedIndexConfig.Companion::parse ) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt new file mode 100644 index 000000000..0de34c15b --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformAction.kt @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.WaitForTransformCompletionStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.transform.model.ISMTransform + +class TransformAction( + val ismTransform: ISMTransform, + index: Int +) : Action(name, index) { + + companion object { + const val name = "transform" + const val ISM_TRANSFORM_FIELD = "ism_transform" + } + + private val attemptCreateTransformJobStep = AttemptCreateTransformJobStep(this) + private val waitForTransformCompletionStep = WaitForTransformCompletionStep() + private val steps = listOf(attemptCreateTransformJobStep, waitForTransformCompletionStep) + + @Suppress("ReturnCount") + override fun getStepToExecute(context: StepContext): Step { + // if stepMetaData is null, return first step + val stepMetaData = context.metadata.stepMetaData ?: return attemptCreateTransformJobStep + + // if the current step has completed, return the next step + if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) { + return when (stepMetaData.name) { + AttemptCreateTransformJobStep.name -> waitForTransformCompletionStep + else -> attemptCreateTransformJobStep + } + } + + return when (stepMetaData.name) { + AttemptCreateTransformJobStep.name -> attemptCreateTransformJobStep + else -> waitForTransformCompletionStep + } + } + + override fun getSteps(): List = steps + + override fun populateAction(builder: XContentBuilder, params: ToXContent.Params) { + builder.startObject(type) + builder.field(ISM_TRANSFORM_FIELD, ismTransform) + builder.endObject() + } + + override fun populateAction(out: StreamOutput) { + ismTransform.writeTo(out) + out.writeInt(actionIndex) + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionParser.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionParser.kt new file mode 100644 index 000000000..9d7eb1360 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionParser.kt @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.indexmanagement.spi.indexstatemanagement.Action +import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser +import org.opensearch.indexmanagement.transform.model.ISMTransform + +class TransformActionParser : ActionParser() { + override fun fromStreamInput(sin: StreamInput): Action { + val ismTransform = ISMTransform(sin) + val index = sin.readInt() + return TransformAction(ismTransform, index) + } + + override fun fromXContent(xcp: XContentParser, index: Int): Action { + var ismTransform: ISMTransform? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + TransformAction.ISM_TRANSFORM_FIELD -> ismTransform = ISMTransform.parse(xcp) + else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in TransformAction.") + } + } + + return TransformAction(ismTransform = requireNotNull(ismTransform) { "TransformAction transform is null." }, index) + } + + override fun getActionType(): String { + return TransformAction.name + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/AttemptCreateTransformJobStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/AttemptCreateTransformJobStep.kt new file mode 100644 index 000000000..806803e3c --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/AttemptCreateTransformJobStep.kt @@ -0,0 +1,127 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step.transform + +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.OpenSearchException +import org.opensearch.action.support.WriteRequest +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.Client +import org.opensearch.index.engine.VersionConflictEngineException +import org.opensearch.indexmanagement.indexstatemanagement.action.TransformAction +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.TransformActionProperties +import org.opensearch.indexmanagement.transform.action.index.IndexTransformAction +import org.opensearch.indexmanagement.transform.action.index.IndexTransformRequest +import org.opensearch.indexmanagement.transform.action.index.IndexTransformResponse +import org.opensearch.indexmanagement.transform.action.start.StartTransformAction +import org.opensearch.indexmanagement.transform.action.start.StartTransformRequest +import org.opensearch.transport.RemoteTransportException + +class AttemptCreateTransformJobStep( + private val action: TransformAction +) : Step(name) { + + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + private var transformId: String? = null + + override suspend fun execute(): Step { + val context = this.context ?: return this + val indexName = context.metadata.index + val managedIndexMetadata = context.metadata + val startedTransformId = managedIndexMetadata.actionMetaData?.actionProperties?.transformActionProperties?.transformId + + // Creating a transform job + val transform = action.ismTransform.toTransform(indexName, context.user) + transformId = transform.id + logger.info("Attempting to create a transform job $transformId for index $indexName") + + val indexTransformRequest = IndexTransformRequest(transform, WriteRequest.RefreshPolicy.IMMEDIATE) + + try { + val response: IndexTransformResponse = context.client.suspendUntil { execute(IndexTransformAction.INSTANCE, indexTransformRequest, it) } + logger.info("Received status ${response.status.status} on trying to create transform job $transformId") + + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getSuccessMessage(transform.id, indexName)) + } catch (e: VersionConflictEngineException) { + val message = getTransformJobAlreadyExistsMessage(transform.id, indexName) + logger.info(message) + if (startedTransformId == null) { + // restart the transform job when this is another execution of the same action in the ISM policy + startTransformJob(transform.id, context) + } else { + // directly mark as complete when this is a retry of this step + stepStatus = StepStatus.COMPLETED + info = mapOf("info" to message) + } + } catch (e: RemoteTransportException) { + processFailure(transform.id, indexName, ExceptionsHelper.unwrapCause(e) as Exception) + } catch (e: OpenSearchException) { + processFailure(transform.id, indexName, e) + } catch (e: Exception) { + processFailure(transform.id, indexName, e) + } + + return this + } + + fun processFailure(transformId: String, indexName: String, e: Exception) { + val message = getFailedMessage(transformId, indexName) + logger.error(message, e) + this.transformId = null + stepStatus = StepStatus.FAILED + info = mapOf("message" to message, "cause" to "${e.message}") + } + + private suspend fun startTransformJob(transformId: String, context: StepContext) { + val indexName = context.metadata.index + val client = context.client + logger.info("Attempting to re-start the transform job $transformId") + try { + val startTransformRequest = StartTransformRequest(transformId) + client.suspendUntil { execute(StartTransformAction.INSTANCE, startTransformRequest, it) } + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getSuccessRestartMessage(transformId, indexName)) + } catch (e: Exception) { + val message = getFailedToStartMessage(transformId, indexName) + logger.error(message, e) + stepStatus = StepStatus.FAILED + info = mapOf("message" to message) + } + } + + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { + val currentActionMetaData = currentMetadata.actionMetaData + val transformActionProperties = TransformActionProperties(transformId) + return currentMetadata.copy( + actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(transformActionProperties = transformActionProperties)), + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + transitionTo = null, + info = info + ) + } + + override fun isIdempotent(): Boolean = true + + companion object { + const val name = "attempt_create_transform" + fun getFailedMessage(transformId: String, index: String) = "Failed to create the transform job [$transformId] [index=$index]" + fun getTransformJobAlreadyExistsMessage(transformId: String, index: String) = + "Transform job [$transformId] already exists, skipping creation [index=$index]" + fun getFailedToStartMessage(transformId: String, index: String) = "Failed to start the transform job [$transformId] [index=$index]" + fun getSuccessMessage(transformId: String, index: String) = "Successfully created the transform job [$transformId] [index=$index]" + fun getSuccessRestartMessage(transformId: String, index: String) = "Successfully restarted the transform job [$transformId] [index=$index]" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt new file mode 100644 index 000000000..78aa41f3d --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/transform/WaitForTransformCompletionStep.kt @@ -0,0 +1,132 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step.transform + +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData +import org.opensearch.indexmanagement.transform.action.explain.ExplainTransformAction +import org.opensearch.indexmanagement.transform.action.explain.ExplainTransformRequest +import org.opensearch.indexmanagement.transform.action.explain.ExplainTransformResponse +import org.opensearch.indexmanagement.transform.model.TransformMetadata +import org.opensearch.transport.RemoteTransportException + +@Suppress("ReturnCount") +class WaitForTransformCompletionStep : Step(name) { + + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + + override suspend fun execute(): Step { + val context = this.context ?: return this + val indexName = context.metadata.index + val managedIndexMetadata = context.metadata + val transformJobId = managedIndexMetadata.actionMetaData?.actionProperties?.transformActionProperties?.transformId + + if (transformJobId == null) { + logger.error("No transform job id passed down.") + stepStatus = StepStatus.FAILED + info = mapOf("message" to getMissingTransformJobMessage(indexName)) + return this + } + + val explainTransformResponse = explainTransformJob(transformJobId, indexName, context) + // if explainTransform call failed, return early + explainTransformResponse ?: return this + + val explainTransform = explainTransformResponse.getIdsToExplain()[transformJobId] + if (explainTransform == null) { + logger.warn("Job $transformJobId is not found, mark step as COMPLETED.") + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getJobNotFoundMessage(transformJobId, indexName)) + return this + } + + if (explainTransform.metadata?.status == null) { + logger.warn("Job $transformJobId has not started yet") + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to getJobProcessingMessage(transformJobId, indexName)) + return this + } + + processTransformMetadataStatus(transformJobId, indexName, explainTransform.metadata) + return this + } + + private suspend fun explainTransformJob(transformJobId: String, indexName: String, context: StepContext): ExplainTransformResponse? { + val explainTransformRequest = ExplainTransformRequest(listOf(transformJobId)) + try { + val response = context.client.suspendUntil { + execute(ExplainTransformAction.INSTANCE, explainTransformRequest, it) + } + logger.info("Received the status for jobs [${response.getIdsToExplain().keys}]") + return response + } catch (e: RemoteTransportException) { + processFailure(transformJobId, indexName, ExceptionsHelper.unwrapCause(e) as Exception) + } catch (e: Exception) { + processFailure(transformJobId, indexName, e) + } + return null + } + + fun processTransformMetadataStatus(transformJobId: String, indexName: String, transformMetadata: TransformMetadata) { + when (transformMetadata.status) { + TransformMetadata.Status.INIT, TransformMetadata.Status.STARTED -> { + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to getJobProcessingMessage(transformJobId, indexName)) + } + TransformMetadata.Status.FAILED -> { + stepStatus = StepStatus.FAILED + info = mapOf("message" to getJobFailedMessage(transformJobId, indexName), "cause" to "${transformMetadata.failureReason}") + } + TransformMetadata.Status.FINISHED -> { + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to getJobCompletionMessage(transformJobId, indexName)) + } + TransformMetadata.Status.STOPPED -> { + stepStatus = StepStatus.FAILED + info = mapOf("message" to getJobFailedMessage(transformJobId, indexName), "cause" to JOB_STOPPED_MESSAGE) + } + } + } + + fun processFailure(transformJobId: String, indexName: String, e: Exception) { + stepStatus = StepStatus.FAILED + val message = getFailedMessage(transformJobId, indexName) + logger.error(message, e) + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + + override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData { + return currentMetadata.copy( + actionMetaData = currentMetadata.actionMetaData, + stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus), + transitionTo = null, + info = info + ) + } + + override fun isIdempotent(): Boolean = true + + companion object { + const val name = "wait_for_transform_completion" + const val JOB_STOPPED_MESSAGE = "Transform job was stopped" + fun getFailedMessage(transformJob: String, index: String) = "Failed to get the status of transform job [$transformJob] [index=$index]" + fun getJobProcessingMessage(transformJob: String, index: String) = "Transform job [$transformJob] is still processing [index=$index]" + fun getJobCompletionMessage(transformJob: String, index: String) = "Transform job [$transformJob] completed [index=$index]" + fun getJobFailedMessage(transformJob: String, index: String) = "Transform job [$transformJob] failed [index=$index]" + fun getMissingTransformJobMessage(index: String) = "Transform job was not found [index=$index]" + fun getJobNotFoundMessage(transformJob: String, index: String) = "Transform job [$transformJob] is not found [index=$index]" + } +} diff --git a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt index 09461cfb0..80f460028 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/opensearchapi/OpenSearchExtensions.kt @@ -64,9 +64,9 @@ import kotlin.coroutines.suspendCoroutine const val OPENDISTRO_SECURITY_PROTECTED_INDICES_CONF_REQUEST = "_opendistro_security_protected_indices_conf_request" -fun contentParser(bytesReference: BytesReference): XContentParser { +fun contentParser(bytesReference: BytesReference, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): XContentParser { return XContentHelper.createParser( - NamedXContentRegistry.EMPTY, + xContentRegistry, LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransform.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransform.kt new file mode 100644 index 000000000..ecc092ac9 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransform.kt @@ -0,0 +1,208 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform.model + +import org.apache.commons.codec.digest.DigestUtils +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.authuser.User +import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.query.AbstractQueryBuilder +import org.opensearch.index.query.MatchAllQueryBuilder +import org.opensearch.index.query.QueryBuilder +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indexmanagement.common.model.dimension.DateHistogram +import org.opensearch.indexmanagement.common.model.dimension.Dimension +import org.opensearch.indexmanagement.common.model.dimension.Histogram +import org.opensearch.indexmanagement.common.model.dimension.Terms +import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule +import org.opensearch.search.aggregations.AggregatorFactories +import java.io.IOException +import java.lang.StringBuilder +import java.time.Instant +import java.time.temporal.ChronoUnit +import kotlin.jvm.Throws + +data class ISMTransform( + val description: String, + val targetIndex: String, + val pageSize: Int, + val dataSelectionQuery: QueryBuilder = MatchAllQueryBuilder(), + val groups: List, + val aggregations: AggregatorFactories.Builder = AggregatorFactories.builder() +) : ToXContentObject, Writeable { + + init { + require(pageSize in Transform.MINIMUM_PAGE_SIZE..Transform.MAXIMUM_PAGE_SIZE) { + "Page size must be between ${Transform.MINIMUM_PAGE_SIZE} and ${Transform.MAXIMUM_PAGE_SIZE}" + } + require(description.isNotEmpty()) { "Description cannot be empty" } + require(targetIndex.isNotEmpty()) { "TargetIndex cannot be empty" } + require(groups.isNotEmpty()) { "Groups cannot be empty" } + aggregations.aggregatorFactories.forEach { + require(Transform.supportedAggregations.contains(it.type)) { + "Unsupported aggregation [${it.type}]" + } + } + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + .field(Transform.DESCRIPTION_FIELD, description) + .field(Transform.TARGET_INDEX_FIELD, targetIndex) + .field(Transform.PAGE_SIZE_FIELD, pageSize) + .field(Transform.DATA_SELECTION_QUERY_FIELD, dataSelectionQuery) + .field(Transform.GROUPS_FIELD, groups) + .field(Transform.AGGREGATIONS_FIELD, aggregations) + builder.endObject() + return builder + } + + fun toTransform(sourceIndex: String, user: User? = null): Transform { + val id = sourceIndex + toString() + val currentTime = Instant.now() + return Transform( + id = DigestUtils.sha1Hex(id), + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + schemaVersion = IndexUtils.DEFAULT_SCHEMA_VERSION, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + metadataId = null, + updatedAt = currentTime, + enabled = true, + enabledAt = currentTime, + description = this.description, + sourceIndex = sourceIndex, + dataSelectionQuery = this.dataSelectionQuery, + targetIndex = this.targetIndex, + pageSize = pageSize, + continuous = false, + groups = this.groups, + aggregations = this.aggregations, + user = user + ) + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + description = sin.readString(), + targetIndex = sin.readString(), + pageSize = sin.readInt(), + dataSelectionQuery = requireNotNull(sin.readOptionalNamedWriteable(QueryBuilder::class.java)) { "Query cannot be null" }, + groups = sin.let { + val dimensionList = mutableListOf() + val size = it.readVInt() + repeat(size) { _ -> + val type = it.readEnum(Dimension.Type::class.java) + dimensionList.add( + when (requireNotNull(type) { "Dimension type cannot be null" }) { + Dimension.Type.DATE_HISTOGRAM -> DateHistogram(sin) + Dimension.Type.TERMS -> Terms(sin) + Dimension.Type.HISTOGRAM -> Histogram(sin) + } + ) + } + dimensionList.toList() + }, + aggregations = requireNotNull(sin.readOptionalWriteable { AggregatorFactories.Builder(it) }) { "Aggregations cannot be null" } + ) + + override fun toString(): String { + val sbd = StringBuilder() + sbd.append(targetIndex) + sbd.append(pageSize) + sbd.append(dataSelectionQuery) + groups.forEach { + sbd.append(it.type) + sbd.append(it.sourceField) + } + sbd.append(aggregations) + + return sbd.toString() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(description) + out.writeString(targetIndex) + out.writeInt(pageSize) + out.writeOptionalNamedWriteable(dataSelectionQuery) + out.writeVInt(groups.size) + for (group in groups) { + out.writeEnum(group.type) + when (group) { + is DateHistogram -> group.writeTo(out) + is Terms -> group.writeTo(out) + is Histogram -> group.writeTo(out) + } + } + out.writeOptionalWriteable(aggregations) + } + + companion object { + @Suppress("ComplexMethod", "LongMethod") + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): ISMTransform { + var description = "" + var targetIndex = "" + var pageSize = 0 + var dataSelectionQuery: QueryBuilder = MatchAllQueryBuilder() + val groups = mutableListOf() + var aggregations: AggregatorFactories.Builder = AggregatorFactories.builder() + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + Transform.DESCRIPTION_FIELD -> description = xcp.text() + Transform.TARGET_INDEX_FIELD -> targetIndex = xcp.text() + Transform.PAGE_SIZE_FIELD -> pageSize = xcp.intValue() + Transform.DATA_SELECTION_QUERY_FIELD -> { + val registry = xcp.xContentRegistry + val source = xcp.mapOrdered() + val xContentBuilder = XContentFactory.jsonBuilder().map(source) + val sourceParser = XContentType.JSON.xContent().createParser( + registry, LoggingDeprecationHandler.INSTANCE, + BytesReference + .bytes(xContentBuilder).streamInput() + ) + dataSelectionQuery = AbstractQueryBuilder.parseInnerQueryBuilder(sourceParser) + } + Transform.GROUPS_FIELD -> { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + groups.add(Dimension.parse(xcp)) + } + } + Transform.AGGREGATIONS_FIELD -> aggregations = AggregatorFactories.parseAggregators(xcp) + else -> throw IllegalArgumentException("Invalid field [$fieldName] found in ISM Transform.") + } + } + + return ISMTransform( + description = description, + targetIndex = targetIndex, + pageSize = pageSize, + dataSelectionQuery = dataSelectionQuery, + groups = groups, + aggregations = aggregations + ) + } + } +} diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 588e886b4..bdcdcea31 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 19 + "schema_version": 20 }, "dynamic": "strict", "properties": { @@ -455,6 +455,83 @@ } } }, + "transform": { + "properties": { + "ism_transform": { + "properties": { + "description": { + "type": "text" + }, + "target_index": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "page_size": { + "type": "long" + }, + "data_selection_query": { + "type": "object", + "enabled": false + }, + "groups": { + "properties": { + "date_histogram": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + }, + "fixed_interval": { + "type": "keyword" + }, + "calendar_interval": { + "type": "keyword" + }, + "timezone": { + "type": "keyword" + } + } + }, + "terms": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + } + } + }, + "histogram": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + }, + "interval": { + "type": "double" + } + } + } + } + }, + "aggregations": { + "type": "object", + "enabled": false + } + } + } + } + }, "shrink": { "properties": { "num_new_shards": { @@ -789,6 +866,10 @@ "shrink_action_properties": { "type": "object", "enabled": false + }, + "transform_action_properties": { + "type": "object", + "enabled": false } } } diff --git a/src/main/resources/mappings/opendistro-ism-history.json b/src/main/resources/mappings/opendistro-ism-history.json index 0e7db6d40..6ce6e1779 100644 --- a/src/main/resources/mappings/opendistro-ism-history.json +++ b/src/main/resources/mappings/opendistro-ism-history.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 6 + "schema_version": 7 }, "dynamic": "strict", "properties": { @@ -115,6 +115,10 @@ "shrink_action_properties": { "type": "object", "enabled": false + }, + "transform_action_properties": { + "type": "object", + "enabled": false } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index b20fb9f0f..80813c19a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -28,6 +28,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.MediaType import org.opensearch.indexmanagement.rollup.model.Rollup +import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import java.io.IOException import java.nio.file.Files @@ -41,8 +42,8 @@ import javax.management.remote.JMXServiceURL abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 19 - val historySchemaVersion = 6 + val configSchemaVersion = 20 + val historySchemaVersion = 7 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as // they do not go through the pending task queue. Ideally this should probably be written in a way to wait for the @@ -229,6 +230,35 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) } + protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) { + // Before updating start time of a job always make sure there are no unassigned shards that could cause the config + // index to move to a new node and negate this forced start + if (isMultiNode) { + waitFor { + try { + client().makeRequest("GET", "_cluster/allocation/explain") + fail("Expected 400 Bad Request when there are no unassigned shards to explain") + } catch (e: ResponseException) { + assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) + } + } + } + val intervalSchedule = (update.jobSchedule as IntervalSchedule) + val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() + val startTimeMillis = desiredStartTimeMillis ?: (Instant.now().toEpochMilli() - millis) + val waitForActiveShards = if (isMultiNode) "all" else "1" + val response = client().makeRequest( + "POST", "${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}/_update/${update.id}?wait_for_active_shards=$waitForActiveShards", + StringEntity( + "{\"doc\":{\"transform\":{\"schedule\":{\"interval\":{\"start_time\":" + + "\"$startTimeMillis\"}}}}}", + ContentType.APPLICATION_JSON + ) + ) + + assertEquals("Request failed", RestStatus.OK, response.restStatus()) + } + override fun preserveIndicesUponCompletion(): Boolean = true companion object { val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 diff --git a/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt index 87cbb21c0..f77a9be21 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt @@ -109,9 +109,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() { private object TransformRestTestCaseExt : TransformRestTestCase() { - fun updateTransformStartTimeExt(update: Transform, desiredStartTimeMillis: Long? = null) = - super.updateTransformStartTime(update, desiredStartTimeMillis) - fun createTransformExt( transform: Transform, transformId: String = randomAlphaOfLength(10), @@ -310,9 +307,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() { executeRequest(request, expectedStatus, userClient) } - protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) = - TransformRestTestCaseExt.updateTransformStartTimeExt(update, desiredStartTimeMillis) - protected fun createTransform( transform: Transform, transformId: String = randomAlphaOfLength(10), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index e6be5ce83..8f65fd46b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -19,6 +19,7 @@ import org.opensearch.client.Request import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.RestClient +import org.opensearch.cluster.ClusterModule import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue @@ -60,12 +61,15 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetry import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ValidationResult +import org.opensearch.indexmanagement.transform.model.Transform +import org.opensearch.indexmanagement.transform.model.TransformMetadata import org.opensearch.indexmanagement.util._ID import org.opensearch.indexmanagement.util._PRIMARY_TERM import org.opensearch.indexmanagement.util._SEQ_NO import org.opensearch.indexmanagement.waitFor import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.rest.RestRequest +import org.opensearch.search.SearchModule import org.opensearch.core.rest.RestStatus import org.opensearch.indexmanagement.rollup.randomTermQuery import org.opensearch.test.OpenSearchTestCase @@ -843,6 +847,63 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() return metadata } + protected fun getTransform( + transformId: String, + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ): Transform { + val response = client().makeRequest("GET", "${IndexManagementPlugin.TRANSFORM_BASE_URI}/$transformId", null, header) + assertEquals("Unable to get transform $transformId", RestStatus.OK, response.restStatus()) + + val parser = createParser(XContentType.JSON.xContent(), response.entity.content) + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser) + + lateinit var id: String + var primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + var seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO + lateinit var transform: Transform + + while (parser.nextToken() != Token.END_OBJECT) { + parser.nextToken() + + when (parser.currentName()) { + _ID -> id = parser.text() + _SEQ_NO -> seqNo = parser.longValue() + _PRIMARY_TERM -> primaryTerm = parser.longValue() + Transform.TRANSFORM_TYPE -> transform = Transform.parse(parser, id, seqNo, primaryTerm) + } + } + return transform + } + + protected fun getTransformMetadata( + metadataId: String, + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ): TransformMetadata { + val response = client().makeRequest("GET", "$INDEX_MANAGEMENT_INDEX/_doc/$metadataId", null, header) + assertEquals("Unable to get transform metadata $metadataId", RestStatus.OK, response.restStatus()) + + val parser = createParser(XContentType.JSON.xContent(), response.entity.content) + ensureExpectedToken(Token.START_OBJECT, parser.nextToken(), parser) + + lateinit var id: String + var primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + var seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO + lateinit var metadata: TransformMetadata + + while (parser.nextToken() != Token.END_OBJECT) { + parser.nextToken() + + when (parser.currentName()) { + _ID -> id = parser.text() + _SEQ_NO -> seqNo = parser.longValue() + _PRIMARY_TERM -> primaryTerm = parser.longValue() + TransformMetadata.TRANSFORM_METADATA_TYPE -> metadata = TransformMetadata.parse(parser, id, seqNo, primaryTerm) + } + } + + return metadata + } + protected fun deleteSnapshot(repository: String, snapshotName: String) { val response = client().makeRequest("DELETE", "_snapshot/$repository/$snapshotName") assertEquals("Unable to delete snapshot", RestStatus.OK, response.restStatus()) @@ -1080,4 +1141,13 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() throw OpenSearchParseException("Failed to parse content to list", e) } } + + override fun xContentRegistry(): NamedXContentRegistry { + return NamedXContentRegistry( + listOf( + ClusterModule.getNamedXWriteables(), + SearchModule(Settings.EMPTY, emptyList()).namedXContents + ).flatten() + ) + } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index 7bb838f0a..2ad77d312 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -40,6 +40,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import org.opensearch.indexmanagement.common.model.notification.Channel +import org.opensearch.indexmanagement.indexstatemanagement.action.TransformAction import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Chime import org.opensearch.indexmanagement.indexstatemanagement.model.destination.CustomWebhook import org.opensearch.indexmanagement.indexstatemanagement.model.destination.Destination @@ -49,6 +50,7 @@ import org.opensearch.indexmanagement.opensearchapi.string import org.opensearch.indexmanagement.rollup.randomISMRollup import org.opensearch.indexmanagement.spi.indexstatemanagement.Action import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.transform.randomISMTransform import org.opensearch.jobscheduler.spi.schedule.CronSchedule import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.jobscheduler.spi.schedule.Schedule @@ -200,6 +202,10 @@ fun randomRollupActionConfig(): RollupAction { return RollupAction(ismRollup = randomISMRollup(), index = 0) } +fun randomTransformActionConfig(): TransformAction { + return TransformAction(ismTransform = randomISMTransform(), index = 0) +} + fun randomCloseActionConfig(): CloseAction { return CloseAction(index = 0) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt new file mode 100644 index 000000000..a054d94bc --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt @@ -0,0 +1,382 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.action + +import org.apache.hc.core5.http.ContentType +import org.apache.hc.core5.http.io.entity.StringEntity +import org.opensearch.cluster.metadata.DataStream +import org.opensearch.index.query.MatchAllQueryBuilder +import org.opensearch.indexmanagement.common.model.dimension.DateHistogram +import org.opensearch.indexmanagement.common.model.dimension.Terms +import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase +import org.opensearch.indexmanagement.indexstatemanagement.model.ISMTemplate +import org.opensearch.indexmanagement.indexstatemanagement.model.Policy +import org.opensearch.indexmanagement.indexstatemanagement.model.State +import org.opensearch.indexmanagement.indexstatemanagement.model.Transition +import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.WaitForTransformCompletionStep +import org.opensearch.indexmanagement.indexstatemanagement.step.transition.AttemptTransitionStep +import org.opensearch.indexmanagement.makeRequest +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry +import org.opensearch.indexmanagement.transform.avgAggregation +import org.opensearch.indexmanagement.transform.maxAggregation +import org.opensearch.indexmanagement.transform.minAggregation +import org.opensearch.indexmanagement.transform.model.ISMTransform +import org.opensearch.indexmanagement.transform.model.TransformMetadata +import org.opensearch.indexmanagement.transform.sumAggregation +import org.opensearch.indexmanagement.transform.valueCountAggregation +import org.opensearch.indexmanagement.waitFor +import org.opensearch.search.aggregations.AggregatorFactories +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Locale + +class TransformActionIT : IndexStateManagementRestTestCase() { + + private val testPrefix = javaClass.simpleName.lowercase(Locale.ROOT) + + companion object { + const val SOURCE_INDEX_MAPPING = """ + "properties": { + "timestamp": { + "type": "date", + "format": "strict_date_optional_time||epoch_millis" + }, + "category": { + "type": "keyword" + }, + "value": { + "type": "long" + } + } + """ + } + + fun `test transform action`() { + val indexName = "${testPrefix}_index_basic" + val targetIndex = "${testPrefix}_target" + val policyId = "${testPrefix}_policy_basic" + + val ismTransform = prepareISMTransform(targetIndex) + val policy = preparePolicyContainingTransform(indexName, ismTransform, policyId) + createPolicy(policy, policyId) + createIndex(indexName, policyId, mapping = SOURCE_INDEX_MAPPING) + + assertIndexTransformSucceeded(indexName, policyId, ismTransform) + } + + fun `test data stream transform action`() { + val dataStreamName = "${testPrefix}_data_stream" + val targetIndex = "${testPrefix}_target_data_stream" + val policyId = "${testPrefix}_policy_data_stream" + + val ismTransform = prepareISMTransform(targetIndex) + val policy = preparePolicyContainingTransform(dataStreamName, ismTransform, policyId) + createPolicy(policy, policyId) + createDataStream(dataStreamName) + + // assert transform works on backing indices of a data stream + val indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1L) + assertIndexTransformSucceeded(indexName, policyId, ismTransform) + } + + fun `test transform action failure due to wrong source field`() { + val indexName = "${testPrefix}_index_failure" + val targetIndex = "${testPrefix}_target_failure" + val policyId = "${testPrefix}_policy_failure" + + val ismTransform = ISMTransform( + description = "test transform", + targetIndex = targetIndex, + pageSize = 100, + dataSelectionQuery = MatchAllQueryBuilder(), + groups = listOf( + DateHistogram(sourceField = "timestamp", fixedInterval = "1d"), + Terms(sourceField = "wrong_field", targetField = "wrong_field") + ), + aggregations = AggregatorFactories.builder() + .addAggregator(sumAggregation()) + .addAggregator(maxAggregation()) + .addAggregator(minAggregation()) + .addAggregator(avgAggregation()) + .addAggregator(valueCountAggregation()) + ) + val policy = preparePolicyContainingTransform(indexName, ismTransform, policyId) + createPolicy(policy, policyId) + createIndex(indexName, policyId, mapping = SOURCE_INDEX_MAPPING) + + assertIndexTransformFailedInAttemptCreateTransformStep(indexName, policyId, ismTransform) + } + + fun `test transform action failed step got retried`() { + val indexName = "${testPrefix}_index_retry" + val targetIndex = "${testPrefix}_target_retry" + val policyId = "${testPrefix}_policy_retry" + + val ismTransform = ISMTransform( + description = "test transform", + targetIndex = targetIndex, + pageSize = 100, + dataSelectionQuery = MatchAllQueryBuilder(), + groups = listOf( + DateHistogram(sourceField = "timestamp", fixedInterval = "1d"), + Terms(sourceField = "wrong_field", targetField = "wrong_field") + ), + aggregations = AggregatorFactories.builder() + .addAggregator(sumAggregation()) + .addAggregator(maxAggregation()) + .addAggregator(minAggregation()) + .addAggregator(avgAggregation()) + .addAggregator(valueCountAggregation()) + ) + val transform = ismTransform.toTransform(indexName) + val policy = preparePolicyContainingTransform(indexName, ismTransform, policyId, retry = 1) + createPolicy(policy, policyId) + createIndex(indexName, policyId, mapping = SOURCE_INDEX_MAPPING) + + assertIndexTransformFailedInAttemptCreateTransformStep(indexName, policyId, ismTransform) + + // verify the wait for transform completion step will be retried and failed again. + updateManagedIndexConfigStartTime(getExistingManagedIndexConfig(indexName)) + waitFor { + assertEquals( + AttemptCreateTransformJobStep.getFailedMessage(transform.id, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + } + + fun `test policy succeeded when run the same transform job twice`() { + val indexName = "${testPrefix}_index_two_transforms" + val targetIndex = "${testPrefix}_target_two_transforms" + val policyId = "${testPrefix}_policy_two_transforms" + + val ismTransform = prepareISMTransform(targetIndex) + val policy = preparePolicyContainingTransformTwice(indexName, ismTransform, policyId) + createPolicy(policy, policyId) + createIndex(indexName, policyId, mapping = SOURCE_INDEX_MAPPING) + + assertIndexTransformSucceededTwice(indexName, policyId, ismTransform) + } + + // create an ISMTransform that matches SOURCE_INDEX_MAPPING + private fun prepareISMTransform(targetIndex: String): ISMTransform { + return ISMTransform( + description = "test transform", + targetIndex = targetIndex, + pageSize = 100, + dataSelectionQuery = MatchAllQueryBuilder(), + groups = listOf( + DateHistogram(sourceField = "timestamp", fixedInterval = "1d"), + Terms(sourceField = "category", targetField = "category") + ), + aggregations = AggregatorFactories.builder() + .addAggregator(sumAggregation()) + .addAggregator(maxAggregation()) + .addAggregator(minAggregation()) + .addAggregator(avgAggregation()) + .addAggregator(valueCountAggregation()) + ) + } + + private fun preparePolicyContainingTransform(indexName: String, ismTransform: ISMTransform, policyId: String, retry: Long = 0): Policy { + val actionConfig = TransformAction(ismTransform, 0) + actionConfig.configRetry = ActionRetry(retry) + val states = listOf( + State("transform", listOf(actionConfig), listOf()) + ) + return Policy( + id = policyId, + description = "test description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states, + ismTemplate = listOf( + ISMTemplate( + indexPatterns = listOf(indexName), + priority = 100, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS) + ) + ) + ) + } + + private fun preparePolicyContainingTransformTwice( + indexName: String, + ismTransform: ISMTransform, + policyId: String, + retry: Long = 0 + ): Policy { + val actionConfig = TransformAction(ismTransform, 0) + actionConfig.configRetry = ActionRetry(retry) + val states = listOf( + State("transform1", listOf(actionConfig), listOf(Transition(stateName = "transform2", conditions = null))), + State("transform2", listOf(actionConfig), listOf()) + ) + return Policy( + id = policyId, + description = "test description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states, + ismTemplate = listOf( + ISMTemplate( + indexPatterns = listOf(indexName), + priority = 100, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS) + ) + ) + ) + } + + private fun createDataStream(dataStreamName: String) { + // create an index template for data stream + client().makeRequest( + "PUT", + "/_index_template/${dataStreamName}_template", + StringEntity( + "{ " + + "\"index_patterns\": [ \"$dataStreamName\" ], " + + "\"data_stream\": { \"timestamp_field\": { \"name\": \"timestamp\" } }, " + + "\"template\": { \"mappings\": { $SOURCE_INDEX_MAPPING } } }", + ContentType.APPLICATION_JSON + ) + ) + // create data stream + client().makeRequest("PUT", "/_data_stream/$dataStreamName") + } + + private fun assertIndexTransformSucceeded(indexName: String, policyId: String, ismTransform: ISMTransform) { + val transform = ismTransform.toTransform(indexName) + val transformId = transform.id + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so that the policy will be initialized. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyId, getExplainManagedIndexMetaData(indexName).policyID) } + + // Change the start time so that the transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + AttemptCreateTransformJobStep.getSuccessMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + updateTransformStartTime(transform) + + // Change the start time so that the transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + WaitForTransformCompletionStep.getJobCompletionMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + val transformJob = getTransform(transformId = transformId) + waitFor { + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) + assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } + } + + private fun assertIndexTransformSucceededTwice(indexName: String, policyId: String, ismTransform: ISMTransform) { + val transform = ismTransform.toTransform(indexName) + val transformId = transform.id + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so that the policy will be initialized. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyId, getExplainManagedIndexMetaData(indexName).policyID) } + + // Change the start time so that the transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + AttemptCreateTransformJobStep.getSuccessMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + updateTransformStartTime(transform) + + // Change the start time so that the transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + WaitForTransformCompletionStep.getJobCompletionMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + var transformJob = getTransform(transformId = transformId) + waitFor { + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) + assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } + + // Change the start time so that the transition attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + AttemptTransitionStep.getSuccessMessage(indexName, "transform2"), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + // Change the start time so that the second transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + AttemptCreateTransformJobStep.getSuccessRestartMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + updateTransformStartTime(transform) + + // Change the start time so that the second transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + WaitForTransformCompletionStep.getJobCompletionMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + + transformJob = getTransform(transformId = transformId) + waitFor { + assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) + val transformMetadata = getTransformMetadata(transformJob.metadataId!!) + assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) + } + } + + private fun assertIndexTransformFailedInAttemptCreateTransformStep(indexName: String, policyId: String, ismTransform: ISMTransform) { + val transformId = ismTransform.toTransform(indexName).id + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so that the policy will be initialized. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyId, getExplainManagedIndexMetaData(indexName).policyID) } + + // Change the start time so that the transform action will be attempted. + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + AttemptCreateTransformJobStep.getFailedMessage(transformId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt index 178e93918..62be3917e 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorTests.kt @@ -20,6 +20,7 @@ import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.indexstatemanagement.IndexMetadataProvider import org.opensearch.indexmanagement.indexstatemanagement.ManagedIndexCoordinator import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import org.opensearch.search.SearchModule import org.opensearch.test.ClusterServiceUtils import org.opensearch.test.OpenSearchTestCase import org.opensearch.threadpool.Scheduler @@ -68,7 +69,8 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() { clusterService = Mockito.spy(originClusterService) indexMetadataProvider = IndexMetadataProvider(settings, client, clusterService, mutableMapOf()) coordinator = ManagedIndexCoordinator( - settings, client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider + settings, client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider, + NamedXContentRegistry(SearchModule(Settings.EMPTY, emptyList()).namedXContents) ) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateTransformJobStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateTransformJobStepTests.kt new file mode 100644 index 000000000..02945979e --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/AttemptCreateTransformJobStepTests.kt @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step + +import org.opensearch.indexmanagement.indexstatemanagement.randomTransformActionConfig +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.TransformActionProperties +import org.opensearch.test.OpenSearchTestCase + +class AttemptCreateTransformJobStepTests : OpenSearchTestCase() { + + private val transformAction = randomTransformActionConfig() + private val indexName = "test" + private val transformId: String = transformAction.ismTransform.toTransform(indexName).id + private val metadata = ManagedIndexMetaData( + indexName, + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + ActionMetaData( + AttemptCreateTransformJobStep.name, 1, 0, false, 0, null, + ActionProperties(transformActionProperties = TransformActionProperties(transformId)) + ), + null, + null, + null + ) + private val step = AttemptCreateTransformJobStep(transformAction) + + fun `test process failure`() { + step.processFailure(transformId, indexName, Exception("dummy-error")) + val updatedManagedIndexMedaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals( + "Step status is not FAILED", + Step.StepStatus.FAILED, + updatedManagedIndexMedaData.stepMetaData?.stepStatus + ) + assertEquals( + "Error message is not expected", + AttemptCreateTransformJobStep.getFailedMessage(transformId, indexName), + updatedManagedIndexMedaData.info?.get("message") + ) + assertNull( + "TransformId in action properties is not cleaned up", + updatedManagedIndexMedaData.actionMetaData?.actionProperties?.transformActionProperties?.transformId + ) + } + + fun `test isIdempotent`() { + assertTrue(step.isIdempotent()) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForTransformCompletionStepTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForTransformCompletionStepTests.kt new file mode 100644 index 000000000..d536c7449 --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/WaitForTransformCompletionStepTests.kt @@ -0,0 +1,171 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.indexstatemanagement.step + +import com.nhaarman.mockitokotlin2.mock +import kotlinx.coroutines.runBlocking +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.AttemptCreateTransformJobStep +import org.opensearch.indexmanagement.indexstatemanagement.step.transform.WaitForTransformCompletionStep +import org.opensearch.indexmanagement.transform.model.TransformMetadata +import org.opensearch.indexmanagement.transform.model.TransformStats +import org.opensearch.indexmanagement.spi.indexstatemanagement.Step +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext +import org.opensearch.indexmanagement.spi.indexstatemanagement.model.TransformActionProperties +import org.opensearch.indexmanagement.util.NO_ID +import org.opensearch.jobscheduler.spi.utils.LockService +import org.opensearch.script.ScriptService +import org.opensearch.test.OpenSearchTestCase +import java.time.Instant + +class WaitForTransformCompletionStepTests : OpenSearchTestCase() { + + private val clusterService: ClusterService = mock() + private val scriptService: ScriptService = mock() + private val settings: Settings = Settings.EMPTY + private val transformId: String = "dummy-id" + private val indexName: String = "test" + private val metadata = ManagedIndexMetaData( + indexName, + "indexUuid", + "policy_id", + null, + null, + null, + null, + null, + null, + null, + ActionMetaData( + AttemptCreateTransformJobStep.name, 1, 0, false, 0, null, + ActionProperties(transformActionProperties = TransformActionProperties(transformId)) + ), + null, + null, + null + ) + private val transformMetadata = TransformMetadata( + id = NO_ID, + transformId = transformId, + lastUpdatedAt = Instant.now(), + status = TransformMetadata.Status.FINISHED, + stats = TransformStats(1, 1, 1, 1, 1) + ) + private val client: Client = mock() + private val step = WaitForTransformCompletionStep() + private val lockService: LockService = LockService(mock(), clusterService) + + fun `test wait for transform when missing transform id`() { + val actionMetadata = metadata.actionMetaData!!.copy(actionProperties = ActionProperties()) + val metadata = metadata.copy(actionMetaData = actionMetadata) + val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) + val step = WaitForTransformCompletionStep() + + runBlocking { + step.preExecute(logger, context).execute() + } + + val updatedManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updatedManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals( + "Missing failure message", + WaitForTransformCompletionStep.getMissingTransformJobMessage(indexName), + updatedManagedIndexMetaData.info?.get("message") + ) + } + + fun `test process transform metadata FAILED status`() { + val transformMetadata = transformMetadata.copy(status = TransformMetadata.Status.FAILED) + step.processTransformMetadataStatus(transformId, indexName, transformMetadata) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updateManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals( + "Missing failure message", + WaitForTransformCompletionStep.getJobFailedMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + } + + fun `test process transform metadata STOPPED status`() { + val transformMetadata = transformMetadata.copy(status = TransformMetadata.Status.STOPPED) + step.processTransformMetadataStatus(transformId, indexName, transformMetadata) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updateManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals( + "Missing failure message", + WaitForTransformCompletionStep.getJobFailedMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + assertEquals("Mismatch in cause", WaitForTransformCompletionStep.JOB_STOPPED_MESSAGE, updateManagedIndexMetaData.info?.get("cause")) + } + + fun `test process transform metadata INIT status`() { + val transformMetadata = transformMetadata.copy(status = TransformMetadata.Status.INIT) + step.processTransformMetadataStatus(transformId, indexName, transformMetadata) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals( + "Step status is not CONDITION_NOT_MET", + Step.StepStatus.CONDITION_NOT_MET, + updateManagedIndexMetaData.stepMetaData?.stepStatus + ) + assertEquals( + "Missing processing message", + WaitForTransformCompletionStep.getJobProcessingMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + } + + fun `test process transform metadata STARTED status`() { + val transformMetadata = transformMetadata.copy(status = TransformMetadata.Status.STARTED) + step.processTransformMetadataStatus(transformId, indexName, transformMetadata) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Step status is not CONDITION_NOT_MET", Step.StepStatus.CONDITION_NOT_MET, updateManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals( + "Missing processing message", + WaitForTransformCompletionStep.getJobProcessingMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + } + + fun `test process transform metadata FINISHED status`() { + val transformMetadata = transformMetadata.copy(status = TransformMetadata.Status.FINISHED) + step.processTransformMetadataStatus(transformId, indexName, transformMetadata) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Step status is not COMPLETED", Step.StepStatus.COMPLETED, updateManagedIndexMetaData.stepMetaData?.stepStatus) + assertEquals( + "Missing processing message", + WaitForTransformCompletionStep.getJobCompletionMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + } + + fun `test process failure`() { + step.processFailure(transformId, indexName, Exception("dummy-exception")) + + val updateManagedIndexMetaData = step.getUpdatedManagedIndexMetadata(metadata) + assertEquals("Mismatch in cause", "dummy-exception", updateManagedIndexMetaData.info?.get("cause")) + assertEquals( + "Mismatch in message", + WaitForTransformCompletionStep.getFailedMessage(transformId, indexName), + updateManagedIndexMetaData.info?.get("message") + ) + assertEquals("Step status is not FAILED", Step.StepStatus.FAILED, updateManagedIndexMetaData.stepMetaData?.stepStatus) + } + + fun `test isIdempotent`() { + assertTrue(step.isIdempotent()) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt index 880f00238..415e44718 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TestHelpers.kt @@ -21,8 +21,10 @@ import org.opensearch.indexmanagement.randomSchedule import org.opensearch.indexmanagement.randomUser import org.opensearch.indexmanagement.rollup.randomAfterKey import org.opensearch.indexmanagement.rollup.randomDimension +import org.opensearch.indexmanagement.rollup.randomTermQuery import org.opensearch.indexmanagement.transform.model.ContinuousTransformStats import org.opensearch.indexmanagement.transform.model.ExplainTransform +import org.opensearch.indexmanagement.transform.model.ISMTransform import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.model.TransformMetadata import org.opensearch.indexmanagement.transform.model.TransformStats @@ -159,6 +161,17 @@ fun randomExplainTransform(): ExplainTransform { return ExplainTransform(metadataID = metadata.id, metadata = metadata) } +fun randomISMTransform(): ISMTransform { + return ISMTransform( + description = OpenSearchRestTestCase.randomAlphaOfLength(10), + targetIndex = OpenSearchRestTestCase.randomAlphaOfLength(10).lowercase(Locale.ROOT), + pageSize = OpenSearchRestTestCase.randomIntBetween(1, 10000), + groups = randomGroups(), + dataSelectionQuery = randomTermQuery(), + aggregations = randomAggregationFactories() + ) +} + fun Transform.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String = this.toXContent(XContentFactory.jsonBuilder(), params).string() fun TransformMetadata.toJsonString(params: ToXContent.Params = ToXContent.EMPTY_PARAMS): String = this.toXContent(XContentFactory.jsonBuilder(), params) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt index 4d01e49b2..aaba661c3 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRestTestCase.kt @@ -12,7 +12,6 @@ import org.apache.hc.core5.http.io.entity.StringEntity import org.apache.hc.core5.http.message.BasicHeader import org.junit.AfterClass import org.opensearch.client.Response -import org.opensearch.client.ResponseException import org.opensearch.client.RestClient import org.opensearch.common.settings.Settings import org.opensearch.core.xcontent.NamedXContentRegistry @@ -30,12 +29,8 @@ import org.opensearch.indexmanagement.transform.model.TransformMetadata import org.opensearch.indexmanagement.util._ID import org.opensearch.indexmanagement.util._PRIMARY_TERM import org.opensearch.indexmanagement.util._SEQ_NO -import org.opensearch.indexmanagement.waitFor -import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.core.rest.RestStatus import org.opensearch.search.SearchModule -import java.time.Duration -import java.time.Instant abstract class TransformRestTestCase : IndexManagementRestTestCase() { @@ -221,35 +216,6 @@ abstract class TransformRestTestCase : IndexManagementRestTestCase() { return continuousStats["documents_behind"] as Map } - protected fun updateTransformStartTime(update: Transform, desiredStartTimeMillis: Long? = null) { - // Before updating start time of a job always make sure there are no unassigned shards that could cause the config - // index to move to a new node and negate this forced start - if (isMultiNode) { - waitFor { - try { - client().makeRequest("GET", "_cluster/allocation/explain") - fail("Expected 400 Bad Request when there are no unassigned shards to explain") - } catch (e: ResponseException) { - assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) - } - } - } - val intervalSchedule = (update.jobSchedule as IntervalSchedule) - val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() - val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis - val waitForActiveShards = if (isMultiNode) "all" else "1" - val response = client().makeRequest( - "POST", "$INDEX_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards", - StringEntity( - "{\"doc\":{\"transform\":{\"schedule\":{\"interval\":{\"start_time\":" + - "\"$startTimeMillis\"}}}}}", - ContentType.APPLICATION_JSON - ) - ) - - assertEquals("Request failed", RestStatus.OK, response.restStatus()) - } - protected fun Transform.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), ContentType.APPLICATION_JSON) override fun xContentRegistry(): NamedXContentRegistry { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransformTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransformTests.kt new file mode 100644 index 000000000..6129f453f --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/model/ISMTransformTests.kt @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.transform.model + +import org.opensearch.indexmanagement.transform.randomISMTransform +import org.opensearch.test.OpenSearchTestCase +import kotlin.test.assertFailsWith + +class ISMTransformTests : OpenSearchTestCase() { + + fun `test ism transform requires non empty description`() { + assertFailsWith(IllegalArgumentException::class, "Requires non empty description") { + randomISMTransform().copy(description = "") + } + } + + fun `test ism transform requires non empty target index`() { + assertFailsWith(IllegalArgumentException::class, "Requires non empty targetIndex") { + randomISMTransform().copy(targetIndex = "") + } + } + + fun `test ism transform requires non empty groups`() { + assertFailsWith(IllegalArgumentException::class, "Requires non empty groups") { + randomISMTransform().copy(groups = listOf()) + } + } + + fun `test ism transform requires page size between 1 and 10K`() { + assertFailsWith(IllegalArgumentException:: class, "Page size cannot be less than 1") { + randomISMTransform().copy(pageSize = -1) + } + + assertFailsWith(IllegalArgumentException:: class, "Page size cannot be less than 1") { + randomISMTransform().copy(pageSize = 0) + } + + assertFailsWith(IllegalArgumentException:: class, "Page size cannot be greater than 10000") { + randomISMTransform().copy(pageSize = 10001) + } + + randomISMTransform().copy(pageSize = 1) + randomISMTransform().copy(pageSize = 500) + randomISMTransform().copy(pageSize = 10000) + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt index 06c3a6b71..af1053641 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/model/WriteableTests.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.transform.model import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.indexmanagement.transform.buildStreamInputForTransforms +import org.opensearch.indexmanagement.transform.randomISMTransform import org.opensearch.indexmanagement.transform.randomTransform import org.opensearch.indexmanagement.transform.randomTransformMetadata import org.opensearch.test.OpenSearchTestCase @@ -36,4 +37,11 @@ class WriteableTests : OpenSearchTestCase() { @Suppress("DEPRECATION") assertTrue("roles field in transform model is deprecated and should be parsed to empty list.", streamedTransform.roles.isEmpty()) } + + fun `test ism transform as stream`() { + val ismTransform = randomISMTransform() + val out = BytesStreamOutput().also { ismTransform.writeTo(it) } + val streamedISMTransform = ISMTransform(buildStreamInputForTransforms(out)) + assertEquals("Round tripping ISMTransform stream doesn't work", ismTransform, streamedISMTransform) + } } diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 588e886b4..bdcdcea31 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 19 + "schema_version": 20 }, "dynamic": "strict", "properties": { @@ -455,6 +455,83 @@ } } }, + "transform": { + "properties": { + "ism_transform": { + "properties": { + "description": { + "type": "text" + }, + "target_index": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "page_size": { + "type": "long" + }, + "data_selection_query": { + "type": "object", + "enabled": false + }, + "groups": { + "properties": { + "date_histogram": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + }, + "fixed_interval": { + "type": "keyword" + }, + "calendar_interval": { + "type": "keyword" + }, + "timezone": { + "type": "keyword" + } + } + }, + "terms": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + } + } + }, + "histogram": { + "properties": { + "source_field": { + "type": "keyword" + }, + "target_field": { + "type": "keyword" + }, + "interval": { + "type": "double" + } + } + } + } + }, + "aggregations": { + "type": "object", + "enabled": false + } + } + } + } + }, "shrink": { "properties": { "num_new_shards": { @@ -789,6 +866,10 @@ "shrink_action_properties": { "type": "object", "enabled": false + }, + "transform_action_properties": { + "type": "object", + "enabled": false } } } diff --git a/src/test/resources/mappings/cached-opendistro-ism-history.json b/src/test/resources/mappings/cached-opendistro-ism-history.json index 0e7db6d40..6ce6e1779 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-history.json +++ b/src/test/resources/mappings/cached-opendistro-ism-history.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 6 + "schema_version": 7 }, "dynamic": "strict", "properties": { @@ -115,6 +115,10 @@ "shrink_action_properties": { "type": "object", "enabled": false + }, + "transform_action_properties": { + "type": "object", + "enabled": false } } } From 2241a292755466f4a29b0a38f7606f3f9be57c3d Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 11 Oct 2023 09:47:25 -0700 Subject: [PATCH 2/6] [Test] increase the wait time after transform job triggered (#999) Signed-off-by: bowenlan-amzn --- .../action/TransformActionIT.kt | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt index a054d94bc..df1efd98b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/TransformActionIT.kt @@ -26,6 +26,7 @@ import org.opensearch.indexmanagement.transform.avgAggregation import org.opensearch.indexmanagement.transform.maxAggregation import org.opensearch.indexmanagement.transform.minAggregation import org.opensearch.indexmanagement.transform.model.ISMTransform +import org.opensearch.indexmanagement.transform.model.Transform import org.opensearch.indexmanagement.transform.model.TransformMetadata import org.opensearch.indexmanagement.transform.sumAggregation import org.opensearch.indexmanagement.transform.valueCountAggregation @@ -274,6 +275,8 @@ class TransformActionIT : IndexStateManagementRestTestCase() { updateTransformStartTime(transform) + assertTransformCompleted(transform) + // Change the start time so that the transform action will be attempted. updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { @@ -282,13 +285,6 @@ class TransformActionIT : IndexStateManagementRestTestCase() { getExplainManagedIndexMetaData(indexName).info?.get("message") ) } - - val transformJob = getTransform(transformId = transformId) - waitFor { - assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) - val transformMetadata = getTransformMetadata(transformJob.metadataId!!) - assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) - } } private fun assertIndexTransformSucceededTwice(indexName: String, policyId: String, ismTransform: ISMTransform) { @@ -308,7 +304,8 @@ class TransformActionIT : IndexStateManagementRestTestCase() { getExplainManagedIndexMetaData(indexName).info?.get("message") ) } - updateTransformStartTime(transform) + + assertTransformCompleted(transform) // Change the start time so that the transform action will be attempted. updateManagedIndexConfigStartTime(managedIndexConfig) @@ -319,13 +316,6 @@ class TransformActionIT : IndexStateManagementRestTestCase() { ) } - var transformJob = getTransform(transformId = transformId) - waitFor { - assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) - val transformMetadata = getTransformMetadata(transformJob.metadataId!!) - assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) - } - // Change the start time so that the transition attempted. updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { @@ -343,7 +333,8 @@ class TransformActionIT : IndexStateManagementRestTestCase() { getExplainManagedIndexMetaData(indexName).info?.get("message") ) } - updateTransformStartTime(transform) + + assertTransformCompleted(transform) // Change the start time so that the second transform action will be attempted. updateManagedIndexConfigStartTime(managedIndexConfig) @@ -353,9 +344,12 @@ class TransformActionIT : IndexStateManagementRestTestCase() { getExplainManagedIndexMetaData(indexName).info?.get("message") ) } + } - transformJob = getTransform(transformId = transformId) - waitFor { + private fun assertTransformCompleted(transform: Transform) { + updateTransformStartTime(transform) + waitFor(timeout = Instant.ofEpochSecond(60)) { + val transformJob = getTransform(transformId = transform.id) assertNotNull("Transform job doesn't have metadata set", transformJob.metadataId) val transformMetadata = getTransformMetadata(transformJob.metadataId!!) assertEquals("Transform is not finished", TransformMetadata.Status.FINISHED, transformMetadata.status) From 64bf2504ef82906114a6017195bc4d6341c101c6 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Wed, 11 Oct 2023 16:18:13 -0700 Subject: [PATCH 3/6] Drafted 2.11 release notes. (#1004) * Drafted 2.11 release notes. Signed-off-by: AWSHurneyt * Drafted 2.11 release notes. Signed-off-by: AWSHurneyt --------- Signed-off-by: AWSHurneyt --- ...index-management.release-notes-2.11.0.0.md | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 release-notes/opensearch-index-management.release-notes-2.11.0.0.md diff --git a/release-notes/opensearch-index-management.release-notes-2.11.0.0.md b/release-notes/opensearch-index-management.release-notes-2.11.0.0.md new file mode 100644 index 000000000..3997113d0 --- /dev/null +++ b/release-notes/opensearch-index-management.release-notes-2.11.0.0.md @@ -0,0 +1,21 @@ +## Version 2.11.0.0 2023-10-11 + +Compatible with OpenSearch 2.10.0 + +### Maintenance +* Increment version to 2.11.0-SNAPSHOT. ([#922](https://github.com/opensearch-project/index-management/pull/922)) + +### Enhancements +* Provide unique id for each rollup job and add debug logs. ([#968](https://github.com/opensearch-project/index-management/pull/968)) + +### Bug fixes +* Fix auto managed index always have -2 seqNo bug. ([#924](https://github.com/opensearch-project/index-management/pull/924)) + +### Infrastructure +* Upload docker test cluster log. ([#964](https://github.com/opensearch-project/index-management/pull/964)) +* Reduce test running time. ([#965](https://github.com/opensearch-project/index-management/pull/965)) +* Parallel test run. ([#966](https://github.com/opensearch-project/index-management/pull/966)) +* Security test filtered. ([#969](https://github.com/opensearch-project/index-management/pull/969)) + +### Documentation +* Added 2.11 release notes. ([#1004](https://github.com/opensearch-project/index-management/pull/1004)) \ No newline at end of file From b7dcd7a98c69dde0e9085b6e8e4ceb8bb31511b5 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Thu, 12 Oct 2023 15:17:31 -0700 Subject: [PATCH 4/6] Refactor change policy API and the policy in managed index to be non-null (#967) * Refactor the policy to be non null in managed index config Signed-off-by: bowenlan-amzn * Update Signed-off-by: bowenlan-amzn * fix bug Signed-off-by: bowenlan-amzn --------- Signed-off-by: bowenlan-amzn --- .../DefaultIndexMetadataService.kt | 11 +- .../ManagedIndexCoordinator.kt | 2 +- .../ManagedIndexRunner.kt | 218 +++++++----------- .../model/ManagedIndexConfig.kt | 14 +- .../opensearchapi/OpenSearchExtensions.kt | 2 +- .../LegacyOpenDistroManagedIndexSettings.kt | 11 +- .../settings/ManagedIndexSettings.kt | 2 + .../TransportChangePolicyAction.kt | 2 +- .../action/explain/TransportExplainAction.kt | 4 +- .../TransportRetryFailedManagedIndexAction.kt | 2 +- .../util/ManagedIndexUtils.kt | 50 ++-- .../RollupFieldValueExpressionResolver.kt | 1 + .../indexstatemanagement/TestHelpers.kt | 11 +- .../resthandler/RestChangePolicyActionIT.kt | 16 +- .../util/ManagedIndexUtilsTests.kt | 3 +- 15 files changed, 145 insertions(+), 204 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt index fb6fa43c8..fdaa3f1be 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt @@ -16,7 +16,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata -class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : IndexMetadataService { +class DefaultIndexMetadataService(private val customUUIDSetting: String? = null) : IndexMetadataService { /** * Returns the default index metadata needed for ISM @@ -39,7 +39,7 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index response.state.metadata.indices.forEach { // TODO waiting to add document count until it is definitely needed - val uuid = getCustomIndexUUID(it.value) + val uuid = getIndexUUID(it.value) val indexMetadata = ISMIndexMetadata(uuid, it.value.creationDate, -1) indexNameToMetadata[it.key] = indexMetadata } @@ -48,11 +48,10 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index } /* - * If an extension wants Index Management to determine cluster state indices UUID based on a custom index setting if - * present of cluster state, the extension will override this customUUID setting. This allows an index to migrate off - * cluster and back while using this persistent uuid. + * This method prioritize the custom index setting provided by extension to decide the index UUID + * Custom index UUID is needed when index moved out of cluster and re-attach back, it will get a new UUID in cluster metadata */ - fun getCustomIndexUUID(indexMetadata: IndexMetadata): String { + fun getIndexUUID(indexMetadata: IndexMetadata): String { return if (customUUIDSetting != null) { indexMetadata.settings.get(customUUIDSetting, indexMetadata.indexUUID) } else { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index a4689f411..341ccf62b 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -301,7 +301,7 @@ class ManagedIndexCoordinator( // If there is a custom index uuid associated with the index, we do not auto manage it // This is because cold index uses custom uuid, and we do not auto manage cold-to-warm index val indexMetadata = clusterState.metadata.index(indexName) - val wasOffCluster = defaultIndexMetadataService.getCustomIndexUUID(indexMetadata) != indexMetadata.indexUUID + val wasOffCluster = defaultIndexMetadataService.getIndexUUID(indexMetadata) != indexMetadata.indexUUID val ismIndexMetadata = ismIndicesMetadata[indexName] // We try to find lookup name instead of using index name as datastream indices need the alias to match policy val lookupName = findIndexLookupName(indexName, clusterState) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 3ad6d1be5..3825fa509 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -95,7 +95,7 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.ScheduledJobRunner import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.core.rest.RestStatus -import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict +import org.opensearch.indexmanagement.indexstatemanagement.util.hasDifferentPolicyVersion import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript @@ -262,12 +262,11 @@ object ManagedIndexRunner : // Check the cluster state for the index metadata val clusterStateIndexMetadata = getIndexMetadata(managedIndexConfig.index) val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService - val clusterStateIndexUUID = clusterStateIndexMetadata?.let { defaultIndexMetadataService.getCustomIndexUUID(it) } - // If the index metadata is null, the index is not in the cluster state. If the index metadata is not null, but - // the cluster state index uuid differs from the one in the managed index config then the config is referring - // to a different index which does not exist in the cluster. We need to check all the extensions to confirm an index exists + val clusterStateIndexUUID = clusterStateIndexMetadata?.let { defaultIndexMetadataService.getIndexUUID(it) } + // If the index metadata is null, the index is not in the cluster state. + // If the index metadata is not null, and the index uuid differs from the one in the managed index config + // These mean this managed index could be a different index type and should use extensions to check if (clusterStateIndexMetadata == null || clusterStateIndexUUID != managedIndexConfig.indexUuid) { - // If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE } val multiTypeIndexNameToMetaData = indexMetadataProvider.getMultiTypeISMIndexMetadata(nonDefaultIndexTypes, listOf(managedIndexConfig.index)) @@ -281,20 +280,18 @@ object ManagedIndexRunner : } } - // If policy or managedIndexMetaData is null then initialize - val policy = managedIndexConfig.policy - if (policy == null || managedIndexMetaData == null) { - initManagedIndex(managedIndexConfig, managedIndexMetaData) + if (managedIndexMetaData == null) { + initManagedIndex(managedIndexConfig) return } - // If the policy was completed or failed then return early and disable job so it stops scheduling work + // If the policy was completed or failed then return early and disable job, so it stops scheduling work if (managedIndexMetaData.policyCompleted == true || managedIndexMetaData.isFailed) { disableManagedIndexConfig(managedIndexConfig) return } - if (managedIndexMetaData.hasVersionConflict(managedIndexConfig)) { + if (managedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig)) { val info = mapOf("message" to "There is a version conflict between your previous execution and your managed index") val result = updateManagedIndexMetaData( managedIndexMetaData.copy( @@ -308,6 +305,7 @@ object ManagedIndexRunner : return } + val policy = managedIndexConfig.policy val state = policy.getStateToExecute(managedIndexMetaData) val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider) val stepContext = StepContext( @@ -330,11 +328,12 @@ object ManagedIndexRunner : managedIndexMetaData .copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info) ) - if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) + if (updated.metadataSaved) + disableManagedIndexConfig(managedIndexConfig) return } - if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) { + if (managedIndexConfig.shouldChangePolicy(action)) { initChangePolicy(managedIndexConfig, managedIndexMetaData, action) return } @@ -375,7 +374,7 @@ object ManagedIndexRunner : } // If this action is not allowed and the step to be executed is the first step in the action then we will fail - // as this action has been removed from the AllowList, but if its not the first step we will let it finish as it's already inflight + // as this action has been removed from the AllowList, but if it's not the first step we will let it finish as it's already inflight if (action?.isAllowed(allowList) == false && step != null && action.isFirstStep(step.name) && action.type != TransitionsAction.name) { val info = mapOf("message" to "Attempted to execute action=${action.type} which is not allowed.") val updated = updateManagedIndexMetaData( @@ -387,8 +386,8 @@ object ManagedIndexRunner : return } - // If any of State, Action, Step components come back as null then we are moving to error in ManagedIndexMetaData val startingManagedIndexMetaData = managedIndexMetaData.getStartingManagedIndexMetaData(state, action, step) + // If any of State, Action, Step components come back as null, then we are moving to error in ManagedIndexMetaData val updateResult = updateManagedIndexMetaData(startingManagedIndexMetaData) @Suppress("ComplexCondition", "MaxLineLength") @@ -402,7 +401,7 @@ object ManagedIndexRunner : actionValidation.validate(action.type, stepContext.metadata.index) } if (validationResult.validationStatus == Validate.ValidationStatus.RE_VALIDATING) { - logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.\", action.type, state.name, step.name") + logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.", action.type, state.name, step.name) publishErrorNotification(policy, managedIndexMetaData) return } @@ -464,32 +463,25 @@ object ManagedIndexRunner : } } - private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig, managedIndexMetaData: ManagedIndexMetaData?) { - var policy: Policy? = managedIndexConfig.policy - val policyID = managedIndexConfig.changePolicy?.policyID ?: managedIndexConfig.policyID - // If policy does not currently exist, we need to save the policy on the ManagedIndexConfig for the first time - // or if a change policy exists then we will also execute the change as we are still in initialization phase - if (policy == null || managedIndexConfig.changePolicy != null) { - // Get the policy by the name unless a ChangePolicy exists then allow the change to happen during initialization - policy = getPolicy(policyID) - // Attempt to save the policy - if (policy != null) { - val saved = savePolicyToManagedIndexConfig(managedIndexConfig, policy) - // If we failed to save the policy, don't initialize ManagedIndexMetaData - if (!saved) return + private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig) { + val policy: Policy = managedIndexConfig.policy + var metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy, policy.id) + + // User may change policy before first metadata initialization + if (managedIndexConfig.changePolicy != null) { + val policyID = managedIndexConfig.changePolicy.policyID + val newPolicy = getPolicy(policyID) + if (newPolicy != null) { + val saved = savePolicyToManagedIndexConfig(managedIndexConfig, newPolicy) + if (!saved) { + logger.error("Failed to save policy to ManagedIndexConfig(${managedIndexConfig.index})") + return + } } - // If we failed to get the policy then we will update the ManagedIndexMetaData with error info - } - - // at this point we either successfully saved the policy or we failed to get the policy - val updatedManagedIndexMetaData = if (policy == null) { - getFailedInitializedManagedIndexMetaData(managedIndexMetaData, managedIndexConfig, policyID) - } else { - // Initializing ManagedIndexMetaData for the first time - getInitializedManagedIndexMetaData(managedIndexMetaData, managedIndexConfig, policy) + metadata = getInitializedManagedIndexMetaData(managedIndexConfig, newPolicy, policyID) } - updateManagedIndexMetaData(updatedManagedIndexMetaData, create = managedIndexMetaData == null) + updateManagedIndexMetaData(metadata, create = true) } @Suppress("ReturnCount", "BlockingMethodInNonBlockingContext") @@ -568,44 +560,35 @@ object ManagedIndexRunner : } } - private suspend fun getFailedInitializedManagedIndexMetaData( - managedIndexMetaData: ManagedIndexMetaData?, - managedIndexConfig: ManagedIndexConfig, - policyID: String - ): ManagedIndexMetaData { - // we either haven't initialized any metadata yet or we have already initialized metadata but still have no policy - return managedIndexMetaData?.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf("message" to "Fail to load policy: $policyID") - ) ?: ManagedIndexMetaData( - index = managedIndexConfig.index, - indexUuid = managedIndexConfig.indexUuid, - policyID = policyID, - policySeqNo = null, - policyPrimaryTerm = null, - policyCompleted = false, - rolledOver = false, - indexCreationDate = getIndexCreationDate(managedIndexConfig), - transitionTo = null, - stateMetaData = null, - actionMetaData = null, - stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf("message" to "Fail to load policy: $policyID") - ) - } - @Suppress("ComplexMethod") private suspend fun getInitializedManagedIndexMetaData( - managedIndexMetaData: ManagedIndexMetaData?, managedIndexConfig: ManagedIndexConfig, - policy: Policy + policy: Policy?, + policyID: String, ): ManagedIndexMetaData { - val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState - val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) - - return when { - managedIndexMetaData == null -> ManagedIndexMetaData( + val indexCreationDate = getIndexCreationDate(managedIndexConfig) + if (policy == null) { + // We check policy existence in change policy API, but it maybe deleted after that + return ManagedIndexMetaData( + index = managedIndexConfig.index, + indexUuid = managedIndexConfig.indexUuid, + policyID = policyID, + policySeqNo = null, + policyPrimaryTerm = null, + policyCompleted = false, + rolledOver = false, + indexCreationDate = indexCreationDate, + transitionTo = null, + stateMetaData = null, + actionMetaData = null, + stepMetaData = null, + policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), + info = mapOf("message" to "Fail to load policy: $policyID") + ) + } else { + val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState + val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) + return ManagedIndexMetaData( index = managedIndexConfig.index, indexUuid = managedIndexConfig.indexUuid, policyID = policy.id, @@ -613,7 +596,7 @@ object ManagedIndexRunner : policyPrimaryTerm = policy.primaryTerm, policyCompleted = false, rolledOver = false, - indexCreationDate = getIndexCreationDate(managedIndexConfig), + indexCreationDate = indexCreationDate, transitionTo = null, stateMetaData = stateMetaData, actionMetaData = null, @@ -621,39 +604,6 @@ object ManagedIndexRunner : policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), info = mapOf("message" to "Successfully initialized policy: ${policy.id}") ) - managedIndexMetaData.policySeqNo == null || managedIndexMetaData.policyPrimaryTerm == null -> - // If there is seqNo and PrimaryTerm it is first time populating Policy. - managedIndexMetaData.copy( - policyID = policy.id, - policySeqNo = policy.seqNo, - policyPrimaryTerm = policy.primaryTerm, - stateMetaData = stateMetaData, - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) - // this is an edge case where a user deletes the job config or index and we already have a policySeqNo/primaryTerm - // in the metadata, in this case we just want to say we successfully initialized the policy again but we will not - // modify the state, action, etc. so it can resume where it left off - managedIndexMetaData.policySeqNo == policy.seqNo && - managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm && - managedIndexMetaData.policyID == policy.id -> - // If existing PolicySeqNo and PolicyPrimaryTerm is equal to cached Policy then no issue. - managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) - else -> - // else this means we either tried to load a policy with a different id, seqno, or primaryterm from what is - // in the metadata and we cannot guarantee it will work with the current state in managedIndexMetaData - managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf( - "message" to "Fail to load policy: ${policy.id} with " + - "seqNo ${policy.seqNo} and primaryTerm ${policy.primaryTerm} as it" + - " does not match what's in the metadata [policyID=${managedIndexMetaData.policyID}," + - " policySeqNo=${managedIndexMetaData.policySeqNo}, policyPrimaryTerm=${managedIndexMetaData.policyPrimaryTerm}]" - ) - ) } } @@ -717,17 +667,17 @@ object ManagedIndexRunner : managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action? ) { - - // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null val changePolicy = managedIndexConfig.changePolicy + // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non-null if (changePolicy == null) { - logger.debug("initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: $managedIndexConfig") + logger.debug( + "initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: {}", + managedIndexConfig + ) return } - // get the policy we'll attempt to change to val policy = getPolicy(changePolicy.policyID) - // update the ManagedIndexMetaData with new information val updatedManagedIndexMetaData = if (policy == null) { managedIndexMetaData.copy( @@ -735,17 +685,18 @@ object ManagedIndexRunner : policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0) ) } else { - // if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are - // in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase - val newTransitionMetaData = ActionMetaData( - TransitionsAction.name, Instant.now().toEpochMilli(), -1, - false, 0, 0, null - ) + // 1. entering transition action in this run + // 2. has been in transition action + // Refresh the transition action metadata, meaning we start the transition for change policy val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) { - newTransitionMetaData + ActionMetaData( + TransitionsAction.name, Instant.now().toEpochMilli(), -1, + false, 0, 0, null + ) } else { managedIndexMetaData.actionMetaData } + managedIndexMetaData.copy( info = mapOf("message" to "Attempting to change policy to ${policy.id}"), transitionTo = changePolicy.state, @@ -758,31 +709,32 @@ object ManagedIndexRunner : ) } - // check if the safe flag was set by the Change Policy REST API, if it was then do a second validation - // before allowing a change to happen + /** + * The freshness of isSafe may change between runs, and we use it to decide whether to enter this method + * n [shouldChangePolicy]. So here we check the safeness again + */ if (changePolicy.isSafe) { - // if policy is null then we are only updating error information in metadata so its fine to continue + // if policy is null then we are only updating error information in metadata, so it's fine to continue if (policy != null) { - // current policy being null should never happen as we have a check at the top of runner - // if it is unsafe to change then we set safe back to false so we don't keep doing this check every execution - if (managedIndexConfig.policy?.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy) != true) { + // if it is unsafe to change then we set safe back to false, so we don't keep doing this check every execution + if (!managedIndexConfig.policy.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy)) { updateManagedIndexConfig(managedIndexConfig.copy(changePolicy = managedIndexConfig.changePolicy.copy(isSafe = false))) return } } } - /* - * Try to update the ManagedIndexMetaData in cluster state, we need to do this first before updating the - * ManagedIndexConfig because if this fails we can fail early and still retry this whole process on the next - * execution whereas if we do the update to ManagedIndexConfig first we lose the ChangePolicy on the job and - * could fail to update the ManagedIndexMetaData which would put us in a bad state - * */ + /** + * Try to update the ManagedIndexMetaData, we need to do this first before updating the + * ManagedIndexConfig because if this fails we can fail early and still retry this whole process on the next + * execution whereas if we do the update to ManagedIndexConfig first we lose the ChangePolicy on the job and + * could fail to update the ManagedIndexMetaData which would put us in a bad state + */ val updated = updateManagedIndexMetaData(updatedManagedIndexMetaData) if (!updated.metadataSaved || policy == null) return - // Change the policy and user stored on the job from changePolicy, this will also set the changePolicy to null on the job + // Change the policy and user stored on the job, this will also set the changePolicy to null on the job savePolicyToManagedIndexConfig(managedIndexConfig, policy.copy(user = changePolicy.user)) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt index 7e8d36268..d92af725e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt @@ -35,7 +35,7 @@ data class ManagedIndexConfig( val policyID: String, val policySeqNo: Long?, val policyPrimaryTerm: Long?, - val policy: Policy?, + val policy: Policy, val changePolicy: ChangePolicy?, val jobJitter: Double? ) : ScheduledJobParameter { @@ -177,11 +177,13 @@ data class ManagedIndexConfig( policyID = requireNotNull(policyID) { "ManagedIndexConfig policy id is null" }, policySeqNo = policySeqNo, policyPrimaryTerm = policyPrimaryTerm, - policy = policy?.copy( - id = policyID, - seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO, - primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM - ), + policy = requireNotNull( + policy?.copy( + id = policyID, + seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ) + ) { "ManagedIndexConfig policy is null" }, changePolicy = changePolicy, jobJitter = jitter ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt index 60b84bf7b..ceba19187 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt @@ -67,7 +67,7 @@ fun getUuidsForClosedIndices(state: ClusterState, defaultIndexMetadataService: D indexMetadatas.forEach { // it.key is index name if (it.value.state == IndexMetadata.State.CLOSE) { - closeList.add(defaultIndexMetadataService.getCustomIndexUUID(it.value)) + closeList.add(defaultIndexMetadataService.getIndexUUID(it.value)) } } return closeList diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt index 101177465..7858ef93d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt @@ -7,19 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings import org.opensearch.common.settings.Setting import org.opensearch.common.unit.TimeValue -import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ISM_ENABLED +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_JOB_INTERVAL +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST_ALL +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST_NONE import java.util.concurrent.TimeUnit import java.util.function.Function @Suppress("UtilityClassWithPublicConstructor") class LegacyOpenDistroManagedIndexSettings { companion object { - const val DEFAULT_ISM_ENABLED = true - const val DEFAULT_JOB_INTERVAL = 5 - private val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList() - val ALLOW_LIST_NONE = emptyList() - val SNAPSHOT_DENY_LIST_NONE = emptyList() - val INDEX_STATE_MANAGEMENT_ENABLED: Setting = Setting.boolSetting( "opendistro.index_state_management.enabled", DEFAULT_ISM_ENABLED, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt index 62f6408ff..125844f4a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings import org.opensearch.common.settings.Setting import org.opensearch.common.unit.TimeValue import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser import java.util.function.Function @Suppress("UtilityClassWithPublicConstructor") @@ -19,6 +20,7 @@ class ManagedIndexSettings { const val DEFAULT_JITTER = 0.6 const val DEFAULT_RESTRICTED_PATTERN = "\\.opendistro_security|\\.kibana.*|\\$INDEX_MANAGEMENT_INDEX" val ALLOW_LIST_NONE = emptyList() + val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList() val SNAPSHOT_DENY_LIST_NONE = emptyList() val INDEX_STATE_MANAGEMENT_ENABLED: Setting = Setting.boolSetting( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt index b697aeea7..1e9d09a0c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt @@ -238,7 +238,7 @@ class TransportChangePolicyAction @Inject constructor( val clusterState = response.state val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService clusterState.metadata.indices.forEach { - val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value) + val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value) indexUuidToIndexMetadata[indexUUID] = it.value } // ISMIndexMetadata from the default index metadata service uses lenient expand, we want to use strict expand, filter diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index ec32bcbd9..f30a0422a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -199,10 +199,10 @@ class TransportExplainAction @Inject constructor( "enabled" to managedIndex.enabled.toString() ) if (showPolicy) { - managedIndex.policy?.let { appliedPolicies[managedIndex.index] = it } + managedIndex.policy.let { appliedPolicies[managedIndex.index] = it } } if (validateAction) { - managedIndex.policy?.let { policiesforValidation[managedIndex.index] = it } + managedIndex.policy.let { policiesforValidation[managedIndex.index] = it } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt index b12b73468..4d85b1c02 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt @@ -167,7 +167,7 @@ class TransportRetryFailedManagedIndexAction @Inject constructor( override fun onResponse(response: ClusterStateResponse) { val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService response.state.metadata.indices.forEach { - val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value) + val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value) indexUuidToIndexMetadata[indexUUID] = it.value } processResponse() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index ee2762375..ebb2d7ee5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -61,7 +61,7 @@ fun managedIndexConfigIndexRequest( uuid: String, policyID: String, jobInterval: Int, - policy: Policy? = null, + policy: Policy, jobJitter: Double? ): IndexRequest { val managedIndexConfig = ManagedIndexConfig( @@ -74,8 +74,8 @@ fun managedIndexConfigIndexRequest( jobEnabledTime = Instant.now(), policyID = policyID, policy = policy, - policySeqNo = policy?.seqNo, - policyPrimaryTerm = policy?.primaryTerm, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, changePolicy = null, jobJitter = jobJitter ) @@ -380,18 +380,16 @@ val ManagedIndexMetaData.isPolicyCompleted: Boolean get() = this.policyCompleted == true /** - * We will change the policy if a change policy exists and if we are currently in - * a Transitions action (which means we're safely at the end of a state). If a - * transitionTo exists on the [ManagedIndexMetaData] it should still be fine to - * change policy as we have not actually transitioned yet. If the next action is Transition - * or if the rest API determined it was "safe", meaning the new policy has the same structure + * We will change the policy if a change policy exists and if we are currently in a Transitions action + * which means we're safely at the end of a state. + * + * If the next action is Transition or if the rest API determined it was "safe", meaning the new policy has the same structure * of the current state, it should be safe to immediately change (even in the middle of the state). * - * @param managedIndexMetaData current [ManagedIndexMetaData] * @return {@code true} if we should change policy, {@code false} if not */ @Suppress("ReturnCount") -fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action?): Boolean { +fun ManagedIndexConfig.shouldChangePolicy(actionToExecute: Action?): Boolean { if (this.changePolicy == null) { return false } @@ -400,25 +398,14 @@ fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMeta return true } - // we need this in so that we can change policy before the first transition happens so policy doesnt get completed - // before we have a chance to change policy - if (actionToExecute?.type == TransitionsAction.name) { - return true - } - - if (managedIndexMetaData.actionMetaData?.name != TransitionsAction.name) { - return false - } - - return true + return actionToExecute?.type == TransitionsAction.name } -fun ManagedIndexMetaData.hasVersionConflict(managedIndexConfig: ManagedIndexConfig): Boolean = +fun ManagedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig: ManagedIndexConfig): Boolean = this.policySeqNo != managedIndexConfig.policySeqNo || this.policyPrimaryTerm != managedIndexConfig.policyPrimaryTerm fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { - val schedule = this.schedule - when (schedule) { + when (val schedule = this.schedule) { is IntervalSchedule -> { return schedule.interval != jobInterval } @@ -427,13 +414,13 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { } /** - * A policy is safe to change to a new policy when each policy has the current state - * the [ManagedIndexConfig] is in and that state has the same actions in the same order. + * A policy is safe to change to a new policy when + * both policies have the current state the [ManagedIndexConfig] is in and that state has the same actions in the same order. * This allows simple things like configuration updates to happen which won't break the execution/contract * between [ManagedIndexMetaData] and [ManagedIndexConfig] as the metadata only knows about the current state. - * We never consider a policy safe to immediately change if the ChangePolicy contains a state to transition to - * as this could transition a user into a different state from the middle of the current state which we do not - * want to allow. + * + * If the ChangePolicy contains a state to transition to, we don't consider it's safe to change here + * as this may transition a user into a different state from the middle of the current state. * * @param stateName the name of the state the [ManagedIndexConfig] is currently in * @param newPolicy the new (actual data model) policy we will eventually try to change to @@ -442,20 +429,19 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { */ @Suppress("ReturnCount") fun Policy.isSafeToChange(stateName: String?, newPolicy: Policy, changePolicy: ChangePolicy): Boolean { - // if stateName is null it means we either have not initialized the job (no metadata to pull stateName from) + // if stateName is null it means we either have not initialized the job // or we failed to load the initial policy, both cases its safe to change the policy if (stateName == null) return true if (changePolicy.state != null) return false + val currentState = this.states.find { it.name == stateName } val newState = newPolicy.states.find { it.name == stateName } if (currentState == null || newState == null) { return false } - if (currentState.actions.size != newState.actions.size) { return false } - currentState.actions.forEachIndexed { index, action -> val newStateAction = newState.actions[index] if (action.type != newStateAction.type) return@isSafeToChange false diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt index f800b54cd..8446be029 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt @@ -24,6 +24,7 @@ object RollupFieldValueExpressionResolver { private lateinit var scriptService: ScriptService private lateinit var clusterService: ClusterService lateinit var indexAliasUtils: IndexAliasUtils + fun resolve(rollup: Rollup, fieldValue: String): String { val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf()) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index 2ad77d312..c5c261c47 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -325,8 +325,7 @@ fun randomManagedIndexConfig( schedule: Schedule = IntervalSchedule(Instant.ofEpochMilli(Instant.now().toEpochMilli()), 5, ChronoUnit.MINUTES), lastUpdatedTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, - policyID: String = OpenSearchRestTestCase.randomAlphaOfLength(10), - policy: Policy? = randomPolicy(), + policy: Policy = randomPolicy(), changePolicy: ChangePolicy? = randomChangePolicy(), jitter: Double? = 0.0 ): ManagedIndexConfig { @@ -338,10 +337,10 @@ fun randomManagedIndexConfig( jobSchedule = schedule, jobLastUpdatedTime = lastUpdatedTime, jobEnabledTime = enabledTime, - policyID = policy?.id ?: policyID, - policySeqNo = policy?.seqNo, - policyPrimaryTerm = policy?.primaryTerm, - policy = policy?.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM), + policyID = policy.id, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, + policy = policy.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM), changePolicy = changePolicy, jobJitter = jitter ) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt index 17f96cda6..eba50e92b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt @@ -220,7 +220,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // Will use the unique generated description to ensure they are the same policies, the cached policy does not have // id, seqNo, primaryTerm on the policy itself so cannot directly compare // TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds - assertEquals("Initialized policy is not the change policy", newPolicy.description, updatedManagedIndexConfig.policy?.description) + assertEquals( + "Initialized policy is not the change policy", newPolicy.description, + updatedManagedIndexConfig.policy.description + ) } fun `test changing policy on a valid index and log pattern`() { @@ -301,7 +304,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig) // After first execution we should expect the change policy to still be null (since we haven't called it yet) - // and the initial policy should of been cached + // and the initial policy should have been cached val executedManagedIndexConfig: ManagedIndexConfig = waitFor { val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Executed managed index config is null", config) @@ -346,7 +349,6 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // speed up to second execution we will have a ChangePolicy but not be in Transitions yet // which means we should still execute the ReadOnlyAction updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Next managed index config is null", config) @@ -386,7 +388,6 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // speed up to third execution so that we try to move to transitions and trigger a change policy updateManagedIndexConfigStartTime(managedIndexConfig) - val changedManagedIndexConfig: ManagedIndexConfig = waitFor { val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Changed managed index config is null", config) @@ -512,9 +513,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { RestRequest.Method.POST.toString(), "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity() ) - assertAffectedIndicesResponseIsEqual(mapOf(FAILURES to false, FAILED_INDICES to emptyList(), UPDATED_INDICES to 1), response.asMap()) - waitFor { assertNotNull(getExistingManagedIndexConfig(index).changePolicy) } // speed up to first execution where we initialize the policy on the job @@ -529,7 +528,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // Will use the unique generated description to ensure they are the same policies, the cached policy does not have // id, seqNo, primaryTerm on the policy itself so cannot directly compare // TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds - assertEquals("Initialized policy is not the change policy", newPolicy.description, config.policy?.description) + assertEquals( + "Initialized policy is not the change policy", newPolicy.description, + config.policy.description + ) config } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt index 3dd7b20ec..d6dfed16a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.randomChangePolicy import org.opensearch.indexmanagement.indexstatemanagement.randomClusterStateManagedIndexConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy import org.opensearch.indexmanagement.indexstatemanagement.randomSweptManagedIndexConfig import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.test.OpenSearchTestCase @@ -34,7 +35,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { val index = randomAlphaOfLength(10) val uuid = randomAlphaOfLength(10) val policyID = randomAlphaOfLength(10) - val createRequest = managedIndexConfigIndexRequest(index, uuid, policyID, 5, jobJitter = 0.0) + val createRequest = managedIndexConfigIndexRequest(index, uuid, policyID, 5, randomPolicy(), jobJitter = 0.0) assertNotNull("IndexRequest not created", createRequest) assertEquals("Incorrect ism index used in request", INDEX_MANAGEMENT_INDEX, createRequest.index()) From 13601ebdd565612c3204d7a70c214e97001035ee Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Thu, 12 Oct 2023 18:30:13 -0700 Subject: [PATCH 5/6] Add more error notification at fail points (#1000) * Add more error notification at fail points Signed-off-by: bowenlan-amzn * Handle exception gracefully Signed-off-by: bowenlan-amzn * small fix Signed-off-by: bowenlan-amzn --------- Signed-off-by: bowenlan-amzn --- .../ManagedIndexRunner.kt | 69 +++++++++++++------ 1 file changed, 48 insertions(+), 21 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 3825fa509..e3bf7e4aa 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -291,6 +291,7 @@ object ManagedIndexRunner : return } + val policy = managedIndexConfig.policy if (managedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig)) { val info = mapOf("message" to "There is a version conflict between your previous execution and your managed index") val result = updateManagedIndexMetaData( @@ -301,11 +302,11 @@ object ManagedIndexRunner : ) if (result.metadataSaved) { disableManagedIndexConfig(managedIndexConfig) + publishErrorNotification(policy, managedIndexMetaData) } return } - val policy = managedIndexConfig.policy val state = policy.getStateToExecute(managedIndexMetaData) val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider) val stepContext = StepContext( @@ -328,8 +329,10 @@ object ManagedIndexRunner : managedIndexMetaData .copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info) ) - if (updated.metadataSaved) + if (updated.metadataSaved) { disableManagedIndexConfig(managedIndexConfig) + publishErrorNotification(policy, managedIndexMetaData) + } return } @@ -355,7 +358,10 @@ object ManagedIndexRunner : policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info ) ) - if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) + if (updated.metadataSaved) { + disableManagedIndexConfig(managedIndexConfig) + publishErrorNotification(policy, managedIndexMetaData) + } return } } @@ -369,7 +375,10 @@ object ManagedIndexRunner : policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info ) ) - if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) + if (updated.metadataSaved) { + disableManagedIndexConfig(managedIndexConfig) + publishErrorNotification(policy, managedIndexMetaData) + } return } @@ -382,7 +391,10 @@ object ManagedIndexRunner : policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info ) ) - if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) + if (updated.metadataSaved) { + disableManagedIndexConfig(managedIndexConfig) + publishErrorNotification(policy, managedIndexMetaData) + } return } @@ -424,16 +436,7 @@ object ManagedIndexRunner : var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step) if (executedManagedIndexMetaData.isFailed) { - try { - // if the policy has no error_notification this will do nothing otherwise it will try to send the configured error message - publishErrorNotification(policy, executedManagedIndexMetaData) - } catch (e: Exception) { - logger.error("Failed to publish error notification", e) - val errorMessage = e.message ?: "Failed to publish error notification" - val mutableInfo = executedManagedIndexMetaData.info?.toMutableMap() ?: mutableMapOf() - mutableInfo["errorNotificationFailure"] = errorMessage - executedManagedIndexMetaData = executedManagedIndexMetaData.copy(info = mutableInfo.toMap()) - } + executedManagedIndexMetaData = publishErrorNotification(policy, executedManagedIndexMetaData) } if (executedManagedIndexMetaData.isSuccessfulDelete) { @@ -753,13 +756,37 @@ object ManagedIndexRunner : } } - private suspend fun publishErrorNotification(policy: Policy, managedIndexMetaData: ManagedIndexMetaData) { - policy.errorNotification?.run { - errorNotificationRetryPolicy.retry(logger) { - val compiledMessage = compileTemplate(messageTemplate, managedIndexMetaData) - destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client) - channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage, policy.user) + private suspend fun publishErrorNotification(policy: Policy, metadata: ManagedIndexMetaData): ManagedIndexMetaData { + return try { + val errorNotification = policy.errorNotification + if (errorNotification != null) { + policy.errorNotification.run { + errorNotificationRetryPolicy.retry(logger) { + val compiledMessage = compileTemplate(messageTemplate, metadata) + destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client) + channel?.sendNotification( + client, + ErrorNotification.CHANNEL_TITLE, + metadata, + compiledMessage, + policy.user + ) + } + } + val message = "Successfully published error notification [index = ${metadata.index}]" + logger.info(message) + val mutableInfo = metadata.info?.toMutableMap() ?: mutableMapOf() + mutableInfo["error_notification"] = message + metadata.copy(info = mutableInfo.toMap()) + } else { + return metadata } + } catch (e: Exception) { + logger.error("Failed to publish error notification", e) + val errorMessage = e.message ?: "Failed to publish error notification" + val mutableInfo = metadata.info?.toMutableMap() ?: mutableMapOf() + mutableInfo["error_notification"] = errorMessage + metadata.copy(info = mutableInfo.toMap()) } } From d42cc69f8202ea5c7732f6db6c2543a7376d0f30 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Thu, 12 Oct 2023 18:30:48 -0700 Subject: [PATCH 6/6] fix the race condition in test reset action start time (#1007) Signed-off-by: bowenlan-amzn --- .../RestRetryFailedManagedIndexActionIT.kt | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt index 0cf0e97cf..431115c1a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestRetryFailedManagedIndexActionIT.kt @@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde import org.opensearch.indexmanagement.waitFor import org.opensearch.rest.RestRequest import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.indexstatemanagement.step.forcemerge.AttemptSetReadOnlyStep import java.time.Instant import java.util.Locale @@ -257,6 +258,7 @@ class RestRetryFailedManagedIndexActionIT : IndexStateManagementRestTestCase() { val action = randomForceMergeActionConfig(maxNumSegments = 1) action.configRetry = ActionRetry(0) val policy = randomPolicy(states = listOf(randomState(actions = listOf(action)))) + createPolicy(policy, policyId = policyID) createIndex(indexName, policyID) @@ -264,13 +266,11 @@ class RestRetryFailedManagedIndexActionIT : IndexStateManagementRestTestCase() { // init policy on job updateManagedIndexConfigStartTime(managedIndexConfig) - // verify we have policy waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } // speed up to execute set read only force merge step updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertPredicatesOnMetaData( listOf( @@ -294,7 +294,6 @@ class RestRetryFailedManagedIndexActionIT : IndexStateManagementRestTestCase() { // speed up to execute attempt call force merge step updateManagedIndexConfigStartTime(managedIndexConfig) - // verify failed and save the startTime var firstStartTime: Long = Long.MAX_VALUE waitFor { @@ -331,7 +330,6 @@ class RestRetryFailedManagedIndexActionIT : IndexStateManagementRestTestCase() { FAILED_INDICES to emptyList>() ) assertAffectedIndicesResponseIsEqual(expectedErrorMessage, response.asMap()) - // verify actionStartTime was reset to null assertPredicatesOnMetaData( listOf( @@ -348,7 +346,6 @@ class RestRetryFailedManagedIndexActionIT : IndexStateManagementRestTestCase() { // should execute and set the startTime again updateManagedIndexConfigStartTime(managedIndexConfig) - // the new startTime should be greater than the first start time waitFor { assertPredicatesOnMetaData( @@ -363,6 +360,10 @@ class RestRetryFailedManagedIndexActionIT : IndexStateManagementRestTestCase() { ), getExplainMap(indexName), false ) + assertEquals( + AttemptSetReadOnlyStep.getSuccessMessage(indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) } } }