From e4a86e4825d1ff1879e92fbed0b8b4e91dd4e838 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Sun, 1 Oct 2023 12:07:38 -0700 Subject: [PATCH 1/3] Refactor the policy to be non null in managed index config Signed-off-by: bowenlan-amzn --- .../DefaultIndexMetadataService.kt | 11 +- .../ManagedIndexCoordinator.kt | 2 +- .../ManagedIndexRunner.kt | 201 +++++++----------- .../model/ManagedIndexConfig.kt | 14 +- .../opensearchapi/OpenSearchExtensions.kt | 2 +- .../LegacyOpenDistroManagedIndexSettings.kt | 11 +- .../settings/ManagedIndexSettings.kt | 2 + .../TransportChangePolicyAction.kt | 2 +- .../action/explain/TransportExplainAction.kt | 4 +- .../TransportRetryFailedManagedIndexAction.kt | 2 +- .../util/ManagedIndexUtils.kt | 23 +- .../RollupFieldValueExpressionResolver.kt | 1 + .../indexstatemanagement/TestHelpers.kt | 11 +- .../resthandler/RestChangePolicyActionIT.kt | 16 +- .../util/ManagedIndexUtilsTests.kt | 3 +- 15 files changed, 130 insertions(+), 175 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt index fb6fa43c8..fdaa3f1be 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/DefaultIndexMetadataService.kt @@ -16,7 +16,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata -class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : IndexMetadataService { +class DefaultIndexMetadataService(private val customUUIDSetting: String? = null) : IndexMetadataService { /** * Returns the default index metadata needed for ISM @@ -39,7 +39,7 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index response.state.metadata.indices.forEach { // TODO waiting to add document count until it is definitely needed - val uuid = getCustomIndexUUID(it.value) + val uuid = getIndexUUID(it.value) val indexMetadata = ISMIndexMetadata(uuid, it.value.creationDate, -1) indexNameToMetadata[it.key] = indexMetadata } @@ -48,11 +48,10 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index } /* - * If an extension wants Index Management to determine cluster state indices UUID based on a custom index setting if - * present of cluster state, the extension will override this customUUID setting. This allows an index to migrate off - * cluster and back while using this persistent uuid. + * This method prioritize the custom index setting provided by extension to decide the index UUID + * Custom index UUID is needed when index moved out of cluster and re-attach back, it will get a new UUID in cluster metadata */ - fun getCustomIndexUUID(indexMetadata: IndexMetadata): String { + fun getIndexUUID(indexMetadata: IndexMetadata): String { return if (customUUIDSetting != null) { indexMetadata.settings.get(customUUIDSetting, indexMetadata.indexUUID) } else { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index 961a56057..9f6a070c5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -299,7 +299,7 @@ class ManagedIndexCoordinator( // If there is a custom index uuid associated with the index, we do not auto manage it // This is because cold index uses custom uuid, and we do not auto manage cold-to-warm index val indexMetadata = clusterState.metadata.index(indexName) - val wasOffCluster = defaultIndexMetadataService.getCustomIndexUUID(indexMetadata) != indexMetadata.indexUUID + val wasOffCluster = defaultIndexMetadataService.getIndexUUID(indexMetadata) != indexMetadata.indexUUID val ismIndexMetadata = ismIndicesMetadata[indexName] // We try to find lookup name instead of using index name as datastream indices need the alias to match policy val lookupName = findIndexLookupName(indexName, clusterState) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 3ad6d1be5..d30e5bdfc 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -95,7 +95,7 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter import org.opensearch.jobscheduler.spi.ScheduledJobRunner import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.core.rest.RestStatus -import org.opensearch.indexmanagement.indexstatemanagement.util.hasVersionConflict +import org.opensearch.indexmanagement.indexstatemanagement.util.hasDifferentPolicyVersion import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript @@ -262,12 +262,11 @@ object ManagedIndexRunner : // Check the cluster state for the index metadata val clusterStateIndexMetadata = getIndexMetadata(managedIndexConfig.index) val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService - val clusterStateIndexUUID = clusterStateIndexMetadata?.let { defaultIndexMetadataService.getCustomIndexUUID(it) } - // If the index metadata is null, the index is not in the cluster state. If the index metadata is not null, but - // the cluster state index uuid differs from the one in the managed index config then the config is referring - // to a different index which does not exist in the cluster. We need to check all the extensions to confirm an index exists + val clusterStateIndexUUID = clusterStateIndexMetadata?.let { defaultIndexMetadataService.getIndexUUID(it) } + // If the index metadata is null, the index is not in the cluster state. + // If the index metadata is not null, and the index uuid differs from the one in the managed index config + // These mean this managed index could be a different index type and should use extensions to check if (clusterStateIndexMetadata == null || clusterStateIndexUUID != managedIndexConfig.indexUuid) { - // If the cluster state/default index type didn't have an index with a matching name and uuid combination, try all other index types val nonDefaultIndexTypes = indexMetadataProvider.services.keys.filter { it != DEFAULT_INDEX_TYPE } val multiTypeIndexNameToMetaData = indexMetadataProvider.getMultiTypeISMIndexMetadata(nonDefaultIndexTypes, listOf(managedIndexConfig.index)) @@ -281,20 +280,18 @@ object ManagedIndexRunner : } } - // If policy or managedIndexMetaData is null then initialize - val policy = managedIndexConfig.policy - if (policy == null || managedIndexMetaData == null) { - initManagedIndex(managedIndexConfig, managedIndexMetaData) + if (managedIndexMetaData == null) { + initManagedIndex(managedIndexConfig) return } - // If the policy was completed or failed then return early and disable job so it stops scheduling work + // If the policy was completed or failed then return early and disable job, so it stops scheduling work if (managedIndexMetaData.policyCompleted == true || managedIndexMetaData.isFailed) { disableManagedIndexConfig(managedIndexConfig) return } - if (managedIndexMetaData.hasVersionConflict(managedIndexConfig)) { + if (managedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig)) { val info = mapOf("message" to "There is a version conflict between your previous execution and your managed index") val result = updateManagedIndexMetaData( managedIndexMetaData.copy( @@ -308,6 +305,7 @@ object ManagedIndexRunner : return } + val policy = managedIndexConfig.policy val state = policy.getStateToExecute(managedIndexMetaData) val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider) val stepContext = StepContext( @@ -318,6 +316,7 @@ object ManagedIndexRunner : // If Index State Management is disabled and the current step is not null and safe to disable on // then disable the job and return early + // TODO are there any step not safe to disable on? if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) { disableManagedIndexConfig(managedIndexConfig) return @@ -334,7 +333,11 @@ object ManagedIndexRunner : return } + logger.info("change policy $managedIndexConfig") + logger.info("change policy $managedIndexMetaData") + logger.info("change policy $action") if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) { + logger.info("Change policy for index ${managedIndexConfig.index}") initChangePolicy(managedIndexConfig, managedIndexMetaData, action) return } @@ -375,7 +378,7 @@ object ManagedIndexRunner : } // If this action is not allowed and the step to be executed is the first step in the action then we will fail - // as this action has been removed from the AllowList, but if its not the first step we will let it finish as it's already inflight + // as this action has been removed from the AllowList, but if it's not the first step we will let it finish as it's already inflight if (action?.isAllowed(allowList) == false && step != null && action.isFirstStep(step.name) && action.type != TransitionsAction.name) { val info = mapOf("message" to "Attempted to execute action=${action.type} which is not allowed.") val updated = updateManagedIndexMetaData( @@ -387,8 +390,8 @@ object ManagedIndexRunner : return } - // If any of State, Action, Step components come back as null then we are moving to error in ManagedIndexMetaData val startingManagedIndexMetaData = managedIndexMetaData.getStartingManagedIndexMetaData(state, action, step) + // If any of State, Action, Step components come back as null, then we are moving to error in ManagedIndexMetaData val updateResult = updateManagedIndexMetaData(startingManagedIndexMetaData) @Suppress("ComplexCondition", "MaxLineLength") @@ -402,7 +405,7 @@ object ManagedIndexRunner : actionValidation.validate(action.type, stepContext.metadata.index) } if (validationResult.validationStatus == Validate.ValidationStatus.RE_VALIDATING) { - logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.\", action.type, state.name, step.name") + logger.warn("Validation Status is: RE_VALIDATING. The action is {}, state is {}, step is {}.", action.type, state.name, step.name) publishErrorNotification(policy, managedIndexMetaData) return } @@ -464,32 +467,44 @@ object ManagedIndexRunner : } } - private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig, managedIndexMetaData: ManagedIndexMetaData?) { - var policy: Policy? = managedIndexConfig.policy - val policyID = managedIndexConfig.changePolicy?.policyID ?: managedIndexConfig.policyID - // If policy does not currently exist, we need to save the policy on the ManagedIndexConfig for the first time - // or if a change policy exists then we will also execute the change as we are still in initialization phase - if (policy == null || managedIndexConfig.changePolicy != null) { - // Get the policy by the name unless a ChangePolicy exists then allow the change to happen during initialization - policy = getPolicy(policyID) - // Attempt to save the policy - if (policy != null) { + private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig) { + var policy: Policy = managedIndexConfig.policy + lateinit var metadata: ManagedIndexMetaData + + metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy) + if (managedIndexConfig.changePolicy != null) { + val policyID = managedIndexConfig.changePolicy.policyID + val newPolicy = getPolicy(policyID) + if (newPolicy != null) { + policy = newPolicy val saved = savePolicyToManagedIndexConfig(managedIndexConfig, policy) - // If we failed to save the policy, don't initialize ManagedIndexMetaData - if (!saved) return + if (!saved) { + logger.error("Failed to save policy to ManagedIndexConfig(${managedIndexConfig.index})") + return + } + metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy) + } else { + // no policy found for change policy TODO can we check this in change policy API + metadata = ManagedIndexMetaData( + index = managedIndexConfig.index, + indexUuid = managedIndexConfig.indexUuid, + policyID = policyID, + policySeqNo = null, + policyPrimaryTerm = null, + policyCompleted = false, + rolledOver = false, + indexCreationDate = getIndexCreationDate(managedIndexConfig), + transitionTo = null, + stateMetaData = null, + actionMetaData = null, + stepMetaData = null, + policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), + info = mapOf("message" to "Fail to load policy: $policyID") + ) } - // If we failed to get the policy then we will update the ManagedIndexMetaData with error info - } - - // at this point we either successfully saved the policy or we failed to get the policy - val updatedManagedIndexMetaData = if (policy == null) { - getFailedInitializedManagedIndexMetaData(managedIndexMetaData, managedIndexConfig, policyID) - } else { - // Initializing ManagedIndexMetaData for the first time - getInitializedManagedIndexMetaData(managedIndexMetaData, managedIndexConfig, policy) } - updateManagedIndexMetaData(updatedManagedIndexMetaData, create = managedIndexMetaData == null) + updateManagedIndexMetaData(metadata, create = true) } @Suppress("ReturnCount", "BlockingMethodInNonBlockingContext") @@ -568,95 +583,32 @@ object ManagedIndexRunner : } } - private suspend fun getFailedInitializedManagedIndexMetaData( - managedIndexMetaData: ManagedIndexMetaData?, + @Suppress("ComplexMethod") + private suspend fun getInitializedManagedIndexMetaData( managedIndexConfig: ManagedIndexConfig, - policyID: String + policy: Policy ): ManagedIndexMetaData { - // we either haven't initialized any metadata yet or we have already initialized metadata but still have no policy - return managedIndexMetaData?.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf("message" to "Fail to load policy: $policyID") - ) ?: ManagedIndexMetaData( + val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState + val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) + + return ManagedIndexMetaData( index = managedIndexConfig.index, indexUuid = managedIndexConfig.indexUuid, - policyID = policyID, - policySeqNo = null, - policyPrimaryTerm = null, + policyID = policy.id, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, policyCompleted = false, rolledOver = false, indexCreationDate = getIndexCreationDate(managedIndexConfig), transitionTo = null, - stateMetaData = null, + stateMetaData = stateMetaData, actionMetaData = null, stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf("message" to "Fail to load policy: $policyID") + policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), + info = mapOf("message" to "Successfully initialized policy: ${policy.id}") ) } - @Suppress("ComplexMethod") - private suspend fun getInitializedManagedIndexMetaData( - managedIndexMetaData: ManagedIndexMetaData?, - managedIndexConfig: ManagedIndexConfig, - policy: Policy - ): ManagedIndexMetaData { - val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState - val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) - - return when { - managedIndexMetaData == null -> ManagedIndexMetaData( - index = managedIndexConfig.index, - indexUuid = managedIndexConfig.indexUuid, - policyID = policy.id, - policySeqNo = policy.seqNo, - policyPrimaryTerm = policy.primaryTerm, - policyCompleted = false, - rolledOver = false, - indexCreationDate = getIndexCreationDate(managedIndexConfig), - transitionTo = null, - stateMetaData = stateMetaData, - actionMetaData = null, - stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) - managedIndexMetaData.policySeqNo == null || managedIndexMetaData.policyPrimaryTerm == null -> - // If there is seqNo and PrimaryTerm it is first time populating Policy. - managedIndexMetaData.copy( - policyID = policy.id, - policySeqNo = policy.seqNo, - policyPrimaryTerm = policy.primaryTerm, - stateMetaData = stateMetaData, - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) - // this is an edge case where a user deletes the job config or index and we already have a policySeqNo/primaryTerm - // in the metadata, in this case we just want to say we successfully initialized the policy again but we will not - // modify the state, action, etc. so it can resume where it left off - managedIndexMetaData.policySeqNo == policy.seqNo && - managedIndexMetaData.policyPrimaryTerm == policy.primaryTerm && - managedIndexMetaData.policyID == policy.id -> - // If existing PolicySeqNo and PolicyPrimaryTerm is equal to cached Policy then no issue. - managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) - else -> - // else this means we either tried to load a policy with a different id, seqno, or primaryterm from what is - // in the metadata and we cannot guarantee it will work with the current state in managedIndexMetaData - managedIndexMetaData.copy( - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf( - "message" to "Fail to load policy: ${policy.id} with " + - "seqNo ${policy.seqNo} and primaryTerm ${policy.primaryTerm} as it" + - " does not match what's in the metadata [policyID=${managedIndexMetaData.policyID}," + - " policySeqNo=${managedIndexMetaData.policySeqNo}, policyPrimaryTerm=${managedIndexMetaData.policyPrimaryTerm}]" - ) - ) - } - } - /** * update metadata in config index, and save metadata in history after update * this can be called 2 times in one job run, so need to save seqNo & primeTerm @@ -717,11 +669,13 @@ object ManagedIndexRunner : managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action? ) { - - // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non null + // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non-null val changePolicy = managedIndexConfig.changePolicy if (changePolicy == null) { - logger.debug("initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: $managedIndexConfig") + logger.debug( + "initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: {}", + managedIndexConfig + ) return } @@ -737,12 +691,11 @@ object ManagedIndexRunner : } else { // if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are // in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase - val newTransitionMetaData = ActionMetaData( - TransitionsAction.name, Instant.now().toEpochMilli(), -1, - false, 0, 0, null - ) val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) { - newTransitionMetaData + ActionMetaData( + TransitionsAction.name, Instant.now().toEpochMilli(), -1, + false, 0, 0, null + ) } else { managedIndexMetaData.actionMetaData } @@ -761,11 +714,11 @@ object ManagedIndexRunner : // check if the safe flag was set by the Change Policy REST API, if it was then do a second validation // before allowing a change to happen if (changePolicy.isSafe) { - // if policy is null then we are only updating error information in metadata so its fine to continue + // if policy is null then we are only updating error information in metadata, so it's fine to continue if (policy != null) { // current policy being null should never happen as we have a check at the top of runner - // if it is unsafe to change then we set safe back to false so we don't keep doing this check every execution - if (managedIndexConfig.policy?.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy) != true) { + // if it is unsafe to change then we set safe back to false, so we don't keep doing this check every execution + if (!managedIndexConfig.policy.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy)) { updateManagedIndexConfig(managedIndexConfig.copy(changePolicy = managedIndexConfig.changePolicy.copy(isSafe = false))) return } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt index 7e8d36268..d92af725e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/model/ManagedIndexConfig.kt @@ -35,7 +35,7 @@ data class ManagedIndexConfig( val policyID: String, val policySeqNo: Long?, val policyPrimaryTerm: Long?, - val policy: Policy?, + val policy: Policy, val changePolicy: ChangePolicy?, val jobJitter: Double? ) : ScheduledJobParameter { @@ -177,11 +177,13 @@ data class ManagedIndexConfig( policyID = requireNotNull(policyID) { "ManagedIndexConfig policy id is null" }, policySeqNo = policySeqNo, policyPrimaryTerm = policyPrimaryTerm, - policy = policy?.copy( - id = policyID, - seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO, - primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM - ), + policy = requireNotNull( + policy?.copy( + id = policyID, + seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ) + ) { "ManagedIndexConfig policy is null" }, changePolicy = changePolicy, jobJitter = jitter ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt index 60b84bf7b..ceba19187 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/opensearchapi/OpenSearchExtensions.kt @@ -67,7 +67,7 @@ fun getUuidsForClosedIndices(state: ClusterState, defaultIndexMetadataService: D indexMetadatas.forEach { // it.key is index name if (it.value.state == IndexMetadata.State.CLOSE) { - closeList.add(defaultIndexMetadataService.getCustomIndexUUID(it.value)) + closeList.add(defaultIndexMetadataService.getIndexUUID(it.value)) } } return closeList diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt index 101177465..7858ef93d 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/LegacyOpenDistroManagedIndexSettings.kt @@ -7,19 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings import org.opensearch.common.settings.Setting import org.opensearch.common.unit.TimeValue -import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_ISM_ENABLED +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.DEFAULT_JOB_INTERVAL +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST_ALL +import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SNAPSHOT_DENY_LIST_NONE import java.util.concurrent.TimeUnit import java.util.function.Function @Suppress("UtilityClassWithPublicConstructor") class LegacyOpenDistroManagedIndexSettings { companion object { - const val DEFAULT_ISM_ENABLED = true - const val DEFAULT_JOB_INTERVAL = 5 - private val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList() - val ALLOW_LIST_NONE = emptyList() - val SNAPSHOT_DENY_LIST_NONE = emptyList() - val INDEX_STATE_MANAGEMENT_ENABLED: Setting = Setting.boolSetting( "opendistro.index_state_management.enabled", DEFAULT_ISM_ENABLED, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt index 62f6408ff..125844f4a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/settings/ManagedIndexSettings.kt @@ -8,6 +8,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.settings import org.opensearch.common.settings.Setting import org.opensearch.common.unit.TimeValue import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.indexstatemanagement.ISMActionsParser import java.util.function.Function @Suppress("UtilityClassWithPublicConstructor") @@ -19,6 +20,7 @@ class ManagedIndexSettings { const val DEFAULT_JITTER = 0.6 const val DEFAULT_RESTRICTED_PATTERN = "\\.opendistro_security|\\.kibana.*|\\$INDEX_MANAGEMENT_INDEX" val ALLOW_LIST_NONE = emptyList() + val ALLOW_LIST_ALL = ISMActionsParser.instance.parsers.map { it.getActionType() }.toList() val SNAPSHOT_DENY_LIST_NONE = emptyList() val INDEX_STATE_MANAGEMENT_ENABLED: Setting = Setting.boolSetting( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt index b697aeea7..1e9d09a0c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/changepolicy/TransportChangePolicyAction.kt @@ -238,7 +238,7 @@ class TransportChangePolicyAction @Inject constructor( val clusterState = response.state val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService clusterState.metadata.indices.forEach { - val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value) + val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value) indexUuidToIndexMetadata[indexUUID] = it.value } // ISMIndexMetadata from the default index metadata service uses lenient expand, we want to use strict expand, filter diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt index ec32bcbd9..f30a0422a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/explain/TransportExplainAction.kt @@ -199,10 +199,10 @@ class TransportExplainAction @Inject constructor( "enabled" to managedIndex.enabled.toString() ) if (showPolicy) { - managedIndex.policy?.let { appliedPolicies[managedIndex.index] = it } + managedIndex.policy.let { appliedPolicies[managedIndex.index] = it } } if (validateAction) { - managedIndex.policy?.let { policiesforValidation[managedIndex.index] = it } + managedIndex.policy.let { policiesforValidation[managedIndex.index] = it } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt index b12b73468..4d85b1c02 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/transport/action/retryfailedmanagedindex/TransportRetryFailedManagedIndexAction.kt @@ -167,7 +167,7 @@ class TransportRetryFailedManagedIndexAction @Inject constructor( override fun onResponse(response: ClusterStateResponse) { val defaultIndexMetadataService = indexMetadataProvider.services[DEFAULT_INDEX_TYPE] as DefaultIndexMetadataService response.state.metadata.indices.forEach { - val indexUUID = defaultIndexMetadataService.getCustomIndexUUID(it.value) + val indexUUID = defaultIndexMetadataService.getIndexUUID(it.value) indexUuidToIndexMetadata[indexUUID] = it.value } processResponse() diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index ee2762375..cda41cf40 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -61,7 +61,7 @@ fun managedIndexConfigIndexRequest( uuid: String, policyID: String, jobInterval: Int, - policy: Policy? = null, + policy: Policy, jobJitter: Double? ): IndexRequest { val managedIndexConfig = ManagedIndexConfig( @@ -74,8 +74,8 @@ fun managedIndexConfigIndexRequest( jobEnabledTime = Instant.now(), policyID = policyID, policy = policy, - policySeqNo = policy?.seqNo, - policyPrimaryTerm = policy?.primaryTerm, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, changePolicy = null, jobJitter = jobJitter ) @@ -400,25 +400,24 @@ fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMeta return true } - // we need this in so that we can change policy before the first transition happens so policy doesnt get completed + // we need this in so that we can change policy before the first transition happens so policy doesn't get completed // before we have a chance to change policy if (actionToExecute?.type == TransitionsAction.name) { return true } - if (managedIndexMetaData.actionMetaData?.name != TransitionsAction.name) { - return false - } - - return true + // TODO actionToExecute is correlate to the actionMetadata? + // actionToExecute is found out by checking the metadata, it can be current unfinished one or the next + // actionMetadata has already been updated, it can be current unfinished one or the next + // In change policy context, we only accept unfinished transition or the new transition + return managedIndexMetaData.actionMetaData?.name == TransitionsAction.name } -fun ManagedIndexMetaData.hasVersionConflict(managedIndexConfig: ManagedIndexConfig): Boolean = +fun ManagedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig: ManagedIndexConfig): Boolean = this.policySeqNo != managedIndexConfig.policySeqNo || this.policyPrimaryTerm != managedIndexConfig.policyPrimaryTerm fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { - val schedule = this.schedule - when (schedule) { + when (val schedule = this.schedule) { is IntervalSchedule -> { return schedule.interval != jobInterval } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt index f800b54cd..8446be029 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt @@ -24,6 +24,7 @@ object RollupFieldValueExpressionResolver { private lateinit var scriptService: ScriptService private lateinit var clusterService: ClusterService lateinit var indexAliasUtils: IndexAliasUtils + fun resolve(rollup: Rollup, fieldValue: String): String { val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf()) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt index 7bb838f0a..cbf3b0976 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -319,8 +319,7 @@ fun randomManagedIndexConfig( schedule: Schedule = IntervalSchedule(Instant.ofEpochMilli(Instant.now().toEpochMilli()), 5, ChronoUnit.MINUTES), lastUpdatedTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, - policyID: String = OpenSearchRestTestCase.randomAlphaOfLength(10), - policy: Policy? = randomPolicy(), + policy: Policy = randomPolicy(), changePolicy: ChangePolicy? = randomChangePolicy(), jitter: Double? = 0.0 ): ManagedIndexConfig { @@ -332,10 +331,10 @@ fun randomManagedIndexConfig( jobSchedule = schedule, jobLastUpdatedTime = lastUpdatedTime, jobEnabledTime = enabledTime, - policyID = policy?.id ?: policyID, - policySeqNo = policy?.seqNo, - policyPrimaryTerm = policy?.primaryTerm, - policy = policy?.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM), + policyID = policy.id, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, + policy = policy.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM), changePolicy = changePolicy, jobJitter = jitter ) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt index 17f96cda6..eba50e92b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt @@ -220,7 +220,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // Will use the unique generated description to ensure they are the same policies, the cached policy does not have // id, seqNo, primaryTerm on the policy itself so cannot directly compare // TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds - assertEquals("Initialized policy is not the change policy", newPolicy.description, updatedManagedIndexConfig.policy?.description) + assertEquals( + "Initialized policy is not the change policy", newPolicy.description, + updatedManagedIndexConfig.policy.description + ) } fun `test changing policy on a valid index and log pattern`() { @@ -301,7 +304,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig) // After first execution we should expect the change policy to still be null (since we haven't called it yet) - // and the initial policy should of been cached + // and the initial policy should have been cached val executedManagedIndexConfig: ManagedIndexConfig = waitFor { val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Executed managed index config is null", config) @@ -346,7 +349,6 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // speed up to second execution we will have a ChangePolicy but not be in Transitions yet // which means we should still execute the ReadOnlyAction updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Next managed index config is null", config) @@ -386,7 +388,6 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // speed up to third execution so that we try to move to transitions and trigger a change policy updateManagedIndexConfigStartTime(managedIndexConfig) - val changedManagedIndexConfig: ManagedIndexConfig = waitFor { val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Changed managed index config is null", config) @@ -512,9 +513,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { RestRequest.Method.POST.toString(), "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity() ) - assertAffectedIndicesResponseIsEqual(mapOf(FAILURES to false, FAILED_INDICES to emptyList(), UPDATED_INDICES to 1), response.asMap()) - waitFor { assertNotNull(getExistingManagedIndexConfig(index).changePolicy) } // speed up to first execution where we initialize the policy on the job @@ -529,7 +528,10 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // Will use the unique generated description to ensure they are the same policies, the cached policy does not have // id, seqNo, primaryTerm on the policy itself so cannot directly compare // TODO: figure out why the newPolicy.lastUpdatedTime and cached policy lastUpdatedTime is off by a few milliseconds - assertEquals("Initialized policy is not the change policy", newPolicy.description, config.policy?.description) + assertEquals( + "Initialized policy is not the change policy", newPolicy.description, + config.policy.description + ) config } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt index 3dd7b20ec..d6dfed16a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtilsTests.kt @@ -22,6 +22,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Transition import org.opensearch.indexmanagement.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import org.opensearch.indexmanagement.indexstatemanagement.randomChangePolicy import org.opensearch.indexmanagement.indexstatemanagement.randomClusterStateManagedIndexConfig +import org.opensearch.indexmanagement.indexstatemanagement.randomPolicy import org.opensearch.indexmanagement.indexstatemanagement.randomSweptManagedIndexConfig import org.opensearch.indexmanagement.opensearchapi.parseWithType import org.opensearch.test.OpenSearchTestCase @@ -34,7 +35,7 @@ class ManagedIndexUtilsTests : OpenSearchTestCase() { val index = randomAlphaOfLength(10) val uuid = randomAlphaOfLength(10) val policyID = randomAlphaOfLength(10) - val createRequest = managedIndexConfigIndexRequest(index, uuid, policyID, 5, jobJitter = 0.0) + val createRequest = managedIndexConfigIndexRequest(index, uuid, policyID, 5, randomPolicy(), jobJitter = 0.0) assertNotNull("IndexRequest not created", createRequest) assertEquals("Incorrect ism index used in request", INDEX_MANAGEMENT_INDEX, createRequest.index()) From 691afb2f634d4edbc70c718dbaec86a510b106ee Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Tue, 10 Oct 2023 21:16:41 -0700 Subject: [PATCH 2/3] Update Signed-off-by: bowenlan-amzn --- .../ManagedIndexRunner.kt | 126 +++++++++--------- .../util/ManagedIndexUtils.kt | 39 ++---- 2 files changed, 76 insertions(+), 89 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index d30e5bdfc..f80959661 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -316,7 +316,6 @@ object ManagedIndexRunner : // If Index State Management is disabled and the current step is not null and safe to disable on // then disable the job and return early - // TODO are there any step not safe to disable on? if (!indexStateManagementEnabled && step != null && step.isSafeToDisableOn) { disableManagedIndexConfig(managedIndexConfig) return @@ -329,15 +328,12 @@ object ManagedIndexRunner : managedIndexMetaData .copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info) ) - if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig) + if (updated.metadataSaved) + disableManagedIndexConfig(managedIndexConfig) return } - logger.info("change policy $managedIndexConfig") - logger.info("change policy $managedIndexMetaData") - logger.info("change policy $action") - if (managedIndexConfig.shouldChangePolicy(managedIndexMetaData, action)) { - logger.info("Change policy for index ${managedIndexConfig.index}") + if (managedIndexConfig.shouldChangePolicy(action)) { initChangePolicy(managedIndexConfig, managedIndexMetaData, action) return } @@ -469,9 +465,9 @@ object ManagedIndexRunner : private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig) { var policy: Policy = managedIndexConfig.policy - lateinit var metadata: ManagedIndexMetaData + var metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy, policy.id) - metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy) + // User may change policy before first metadata initialization if (managedIndexConfig.changePolicy != null) { val policyID = managedIndexConfig.changePolicy.policyID val newPolicy = getPolicy(policyID) @@ -482,26 +478,8 @@ object ManagedIndexRunner : logger.error("Failed to save policy to ManagedIndexConfig(${managedIndexConfig.index})") return } - metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy) - } else { - // no policy found for change policy TODO can we check this in change policy API - metadata = ManagedIndexMetaData( - index = managedIndexConfig.index, - indexUuid = managedIndexConfig.indexUuid, - policyID = policyID, - policySeqNo = null, - policyPrimaryTerm = null, - policyCompleted = false, - rolledOver = false, - indexCreationDate = getIndexCreationDate(managedIndexConfig), - transitionTo = null, - stateMetaData = null, - actionMetaData = null, - stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), - info = mapOf("message" to "Fail to load policy: $policyID") - ) } + metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy, policyID) } updateManagedIndexMetaData(metadata, create = true) @@ -586,27 +564,48 @@ object ManagedIndexRunner : @Suppress("ComplexMethod") private suspend fun getInitializedManagedIndexMetaData( managedIndexConfig: ManagedIndexConfig, - policy: Policy + policy: Policy?, + policyID: String, ): ManagedIndexMetaData { - val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState - val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) - - return ManagedIndexMetaData( - index = managedIndexConfig.index, - indexUuid = managedIndexConfig.indexUuid, - policyID = policy.id, - policySeqNo = policy.seqNo, - policyPrimaryTerm = policy.primaryTerm, - policyCompleted = false, - rolledOver = false, - indexCreationDate = getIndexCreationDate(managedIndexConfig), - transitionTo = null, - stateMetaData = stateMetaData, - actionMetaData = null, - stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), - info = mapOf("message" to "Successfully initialized policy: ${policy.id}") - ) + val indexCreationDate = getIndexCreationDate(managedIndexConfig) + if (policy == null) { + // We check policy existence in change policy API, but it maybe deleted after that + return ManagedIndexMetaData( + index = managedIndexConfig.index, + indexUuid = managedIndexConfig.indexUuid, + policyID = policyID, + policySeqNo = null, + policyPrimaryTerm = null, + policyCompleted = false, + rolledOver = false, + indexCreationDate = indexCreationDate, + transitionTo = null, + stateMetaData = null, + actionMetaData = null, + stepMetaData = null, + policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0), + info = mapOf("message" to "Fail to load policy: $policyID") + ) + } else { + val state = managedIndexConfig.changePolicy?.state ?: policy.defaultState + val stateMetaData = StateMetaData(state, Instant.now().toEpochMilli()) + return ManagedIndexMetaData( + index = managedIndexConfig.index, + indexUuid = managedIndexConfig.indexUuid, + policyID = policy.id, + policySeqNo = policy.seqNo, + policyPrimaryTerm = policy.primaryTerm, + policyCompleted = false, + rolledOver = false, + indexCreationDate = indexCreationDate, + transitionTo = null, + stateMetaData = stateMetaData, + actionMetaData = null, + stepMetaData = null, + policyRetryInfo = PolicyRetryInfoMetaData(failed = false, consumedRetries = 0), + info = mapOf("message" to "Successfully initialized policy: ${policy.id}") + ) + } } /** @@ -669,8 +668,8 @@ object ManagedIndexRunner : managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action? ) { - // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non-null val changePolicy = managedIndexConfig.changePolicy + // should never happen since we only call this if there is a changePolicy, but we'll do it to make changePolicy non-null if (changePolicy == null) { logger.debug( "initChangePolicy was called with a null ChangePolicy, ManagedIndexConfig: {}", @@ -679,9 +678,7 @@ object ManagedIndexRunner : return } - // get the policy we'll attempt to change to val policy = getPolicy(changePolicy.policyID) - // update the ManagedIndexMetaData with new information val updatedManagedIndexMetaData = if (policy == null) { managedIndexMetaData.copy( @@ -689,8 +686,9 @@ object ManagedIndexRunner : policyRetryInfo = PolicyRetryInfoMetaData(failed = true, consumedRetries = 0) ) } else { - // if the action to execute is transition then set the actionMetaData to a new transition metadata to reflect we are - // in transition (in case we triggered change policy from entering transition) or to reflect this is a new policy transition phase + // 1. entering transition action in this run + // 2. has been in transition action + // Refresh the transition action metadata, meaning we start the transition for change policy val actionMetaData = if (actionToExecute?.type == TransitionsAction.name) { ActionMetaData( TransitionsAction.name, Instant.now().toEpochMilli(), -1, @@ -699,6 +697,7 @@ object ManagedIndexRunner : } else { managedIndexMetaData.actionMetaData } + managedIndexMetaData.copy( info = mapOf("message" to "Attempting to change policy to ${policy.id}"), transitionTo = changePolicy.state, @@ -711,12 +710,13 @@ object ManagedIndexRunner : ) } - // check if the safe flag was set by the Change Policy REST API, if it was then do a second validation - // before allowing a change to happen + /** + * The freshness of isSafe may change between runs, and we use it to decide whether to enter this method + * n [shouldChangePolicy]. So here we check the safeness again + */ if (changePolicy.isSafe) { // if policy is null then we are only updating error information in metadata, so it's fine to continue if (policy != null) { - // current policy being null should never happen as we have a check at the top of runner // if it is unsafe to change then we set safe back to false, so we don't keep doing this check every execution if (!managedIndexConfig.policy.isSafeToChange(managedIndexMetaData.stateMetaData?.name, policy, changePolicy)) { updateManagedIndexConfig(managedIndexConfig.copy(changePolicy = managedIndexConfig.changePolicy.copy(isSafe = false))) @@ -725,17 +725,17 @@ object ManagedIndexRunner : } } - /* - * Try to update the ManagedIndexMetaData in cluster state, we need to do this first before updating the - * ManagedIndexConfig because if this fails we can fail early and still retry this whole process on the next - * execution whereas if we do the update to ManagedIndexConfig first we lose the ChangePolicy on the job and - * could fail to update the ManagedIndexMetaData which would put us in a bad state - * */ + /** + * Try to update the ManagedIndexMetaData, we need to do this first before updating the + * ManagedIndexConfig because if this fails we can fail early and still retry this whole process on the next + * execution whereas if we do the update to ManagedIndexConfig first we lose the ChangePolicy on the job and + * could fail to update the ManagedIndexMetaData which would put us in a bad state + */ val updated = updateManagedIndexMetaData(updatedManagedIndexMetaData) if (!updated.metadataSaved || policy == null) return - // Change the policy and user stored on the job from changePolicy, this will also set the changePolicy to null on the job + // Change the policy and user stored on the job, this will also set the changePolicy to null on the job savePolicyToManagedIndexConfig(managedIndexConfig, policy.copy(user = changePolicy.user)) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt index cda41cf40..ebb2d7ee5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/ManagedIndexUtils.kt @@ -380,18 +380,16 @@ val ManagedIndexMetaData.isPolicyCompleted: Boolean get() = this.policyCompleted == true /** - * We will change the policy if a change policy exists and if we are currently in - * a Transitions action (which means we're safely at the end of a state). If a - * transitionTo exists on the [ManagedIndexMetaData] it should still be fine to - * change policy as we have not actually transitioned yet. If the next action is Transition - * or if the rest API determined it was "safe", meaning the new policy has the same structure + * We will change the policy if a change policy exists and if we are currently in a Transitions action + * which means we're safely at the end of a state. + * + * If the next action is Transition or if the rest API determined it was "safe", meaning the new policy has the same structure * of the current state, it should be safe to immediately change (even in the middle of the state). * - * @param managedIndexMetaData current [ManagedIndexMetaData] * @return {@code true} if we should change policy, {@code false} if not */ @Suppress("ReturnCount") -fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMetaData, actionToExecute: Action?): Boolean { +fun ManagedIndexConfig.shouldChangePolicy(actionToExecute: Action?): Boolean { if (this.changePolicy == null) { return false } @@ -400,17 +398,7 @@ fun ManagedIndexConfig.shouldChangePolicy(managedIndexMetaData: ManagedIndexMeta return true } - // we need this in so that we can change policy before the first transition happens so policy doesn't get completed - // before we have a chance to change policy - if (actionToExecute?.type == TransitionsAction.name) { - return true - } - - // TODO actionToExecute is correlate to the actionMetadata? - // actionToExecute is found out by checking the metadata, it can be current unfinished one or the next - // actionMetadata has already been updated, it can be current unfinished one or the next - // In change policy context, we only accept unfinished transition or the new transition - return managedIndexMetaData.actionMetaData?.name == TransitionsAction.name + return actionToExecute?.type == TransitionsAction.name } fun ManagedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig: ManagedIndexConfig): Boolean = @@ -426,13 +414,13 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { } /** - * A policy is safe to change to a new policy when each policy has the current state - * the [ManagedIndexConfig] is in and that state has the same actions in the same order. + * A policy is safe to change to a new policy when + * both policies have the current state the [ManagedIndexConfig] is in and that state has the same actions in the same order. * This allows simple things like configuration updates to happen which won't break the execution/contract * between [ManagedIndexMetaData] and [ManagedIndexConfig] as the metadata only knows about the current state. - * We never consider a policy safe to immediately change if the ChangePolicy contains a state to transition to - * as this could transition a user into a different state from the middle of the current state which we do not - * want to allow. + * + * If the ChangePolicy contains a state to transition to, we don't consider it's safe to change here + * as this may transition a user into a different state from the middle of the current state. * * @param stateName the name of the state the [ManagedIndexConfig] is currently in * @param newPolicy the new (actual data model) policy we will eventually try to change to @@ -441,20 +429,19 @@ fun ManagedIndexConfig.hasDifferentJobInterval(jobInterval: Int): Boolean { */ @Suppress("ReturnCount") fun Policy.isSafeToChange(stateName: String?, newPolicy: Policy, changePolicy: ChangePolicy): Boolean { - // if stateName is null it means we either have not initialized the job (no metadata to pull stateName from) + // if stateName is null it means we either have not initialized the job // or we failed to load the initial policy, both cases its safe to change the policy if (stateName == null) return true if (changePolicy.state != null) return false + val currentState = this.states.find { it.name == stateName } val newState = newPolicy.states.find { it.name == stateName } if (currentState == null || newState == null) { return false } - if (currentState.actions.size != newState.actions.size) { return false } - currentState.actions.forEachIndexed { index, action -> val newStateAction = newState.actions[index] if (action.type != newStateAction.type) return@isSafeToChange false From a0d88f4dee9ddafaa8fa8dd769c0d15fc43e21e8 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Wed, 11 Oct 2023 10:28:55 -0700 Subject: [PATCH 3/3] fix bug Signed-off-by: bowenlan-amzn --- .../indexstatemanagement/ManagedIndexRunner.kt | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index f80959661..3825fa509 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -464,7 +464,7 @@ object ManagedIndexRunner : } private suspend fun initManagedIndex(managedIndexConfig: ManagedIndexConfig) { - var policy: Policy = managedIndexConfig.policy + val policy: Policy = managedIndexConfig.policy var metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy, policy.id) // User may change policy before first metadata initialization @@ -472,14 +472,13 @@ object ManagedIndexRunner : val policyID = managedIndexConfig.changePolicy.policyID val newPolicy = getPolicy(policyID) if (newPolicy != null) { - policy = newPolicy - val saved = savePolicyToManagedIndexConfig(managedIndexConfig, policy) + val saved = savePolicyToManagedIndexConfig(managedIndexConfig, newPolicy) if (!saved) { logger.error("Failed to save policy to ManagedIndexConfig(${managedIndexConfig.index})") return } } - metadata = getInitializedManagedIndexMetaData(managedIndexConfig, policy, policyID) + metadata = getInitializedManagedIndexMetaData(managedIndexConfig, newPolicy, policyID) } updateManagedIndexMetaData(metadata, create = true)