Skip to content

Commit

Permalink
Step Metadata Update on Index Rollover Timeout (opensearch-project#1174)
Browse files Browse the repository at this point in the history
Co-authored-by: Harshita Kaushik <[email protected]>
Signed-off-by: harycash <[email protected]>
  • Loading branch information
harshitakaushik-dev and Harshita Kaushik committed May 30, 2024
1 parent 59a4fe6 commit bd0a742
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {
CONDITION_NOT_MET("condition_not_met"),
FAILED("failed"),
COMPLETED("completed"),
TIMED_OUT("timed_out"),
;

override fun toString(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.LockModel
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
Expand Down Expand Up @@ -339,13 +340,18 @@ object ManagedIndexRunner :
if (action?.hasTimedOut(currentActionMetaData) == true) {
val info = mapOf("message" to "Action timed out")
logger.error("Action=${action.type} has timed out")
val updated = updateManagedIndexMetaData(
managedIndexMetaData
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info),

val updatedIndexMetaData = managedIndexMetaData.copy(
actionMetaData = currentActionMetaData?.copy(failed = true),
stepMetaData = step?.let { StepMetaData(it.name, System.currentTimeMillis(), Step.StepStatus.TIMED_OUT) },
info = info,
)

val updated = updateManagedIndexMetaData(updatedIndexMetaData)

if (updated.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
publishErrorNotification(policy, managedIndexMetaData)
publishErrorNotification(policy, updatedIndexMetaData)
}
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ package org.opensearch.indexmanagement.indexstatemanagement.action
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.step.open.AttemptOpenStep
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.indexmanagement.waitFor
import java.time.Instant
import java.util.Locale
Expand All @@ -20,11 +22,12 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
fun `test failed action`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val testPolicy = """
val testPolicy =
"""
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
""".trimIndent()
""".trimIndent()

createPolicyJson(testPolicy, policyID)

Expand Down Expand Up @@ -52,16 +55,32 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
waitFor {
assertPredicatesOnMetaData(
listOf(
indexName to listOf(
ActionMetaData.ACTION to fun(actionMetaDataMap: Any?): Boolean =
assertActionEquals(
ActionMetaData(
name = RolloverAction.name, startTime = Instant.now().toEpochMilli(), index = 0,
failed = true, consumedRetries = 0, lastRetryTime = null, actionProperties = null,
),
actionMetaDataMap,
),
),
indexName to
listOf(
ActionMetaData.ACTION to

fun(actionMetaDataMap: Any?): Boolean =
assertActionEquals(
ActionMetaData(
name = RolloverAction.name,
startTime = Instant.now().toEpochMilli(),
index = 0,
failed = true,
consumedRetries = 0,
lastRetryTime = null,
actionProperties = null,
),
actionMetaDataMap,
),
StepMetaData.STEP to
fun(stepMetaDataMap: Any?): Boolean =
assertStepEquals(
StepMetaData(
"attempt_rollover", Instant.now().toEpochMilli(), Step.StepStatus.TIMED_OUT,
),
stepMetaDataMap,
),
),
),
getExplainMap(indexName),
strict = false,
Expand All @@ -73,11 +92,12 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
fun `test action timeout doesn't bleed over into next action`() {
val indexName = "${testIndexName}_index_2"
val policyID = "${testIndexName}_testPolicyName_2"
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
""".trimIndent()
val testPolicy =
"""
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
""".trimIndent()

createPolicyJson(testPolicy, policyID)

Expand All @@ -96,7 +116,14 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
val expectedOpenInfoString = mapOf("message" to AttemptOpenStep.getSuccessMessage(indexName)).toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedOpenInfoString == info.toString())),
listOf(
indexName to
listOf(
ManagedIndexMetaData.INFO to

fun(info: Any?): Boolean = expectedOpenInfoString == info.toString(),
),
),
getExplainMap(indexName),
strict = false,
)
Expand Down

0 comments on commit bd0a742

Please sign in to comment.