Skip to content

Commit

Permalink
Support switch aliases in shrink action.
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Kravchuk <[email protected]>
  • Loading branch information
ikibo authored and ikibo committed Oct 25, 2023
1 parent 6aad0be commit 5b63304
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ShrinkAction(
val percentageOfSourceShards: Double?,
val targetIndexTemplate: Script?,
val aliases: List<Alias>?,
val switchAliases: Boolean = false,
val forceUnsafe: Boolean?,
index: Int
) : Action(name, index) {
Expand Down Expand Up @@ -104,6 +105,7 @@ class ShrinkAction(
if (percentageOfSourceShards != null) builder.field(PERCENTAGE_OF_SOURCE_SHARDS_FIELD, percentageOfSourceShards)
if (targetIndexTemplate != null) builder.field(TARGET_INDEX_TEMPLATE_FIELD, targetIndexTemplate)
if (aliases != null) { builder.aliasesField(aliases) }
builder.field(SWITCH_ALIASES, switchAliases)
if (forceUnsafe != null) builder.field(FORCE_UNSAFE_FIELD, forceUnsafe)
builder.endObject()
}
Expand All @@ -120,6 +122,7 @@ class ShrinkAction(
} else {
out.writeBoolean(false)
}
out.writeBoolean(switchAliases)
out.writeOptionalBoolean(forceUnsafe)
out.writeInt(actionIndex)
}
Expand All @@ -131,6 +134,7 @@ class ShrinkAction(
const val MAX_SHARD_SIZE_FIELD = "max_shard_size"
const val TARGET_INDEX_TEMPLATE_FIELD = "target_index_name_template"
const val ALIASES_FIELD = "aliases"
const val SWITCH_ALIASES = "switch_aliases"
const val FORCE_UNSAFE_FIELD = "force_unsafe"
const val LOCK_SOURCE_JOB_ID = "shrink-node_name"
fun getSecurityFailureMessage(failure: String) = "Shrink action failed because of missing permissions: $failure"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.C
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.MAX_SHARD_SIZE_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.NUM_NEW_SHARDS_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.PERCENTAGE_OF_SOURCE_SHARDS_FIELD
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.SWITCH_ALIASES
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction.Companion.TARGET_INDEX_TEMPLATE_FIELD
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
Expand All @@ -27,10 +28,11 @@ class ShrinkActionParser : ActionParser() {
val percentageOfSourceShards = sin.readOptionalDouble()
val targetIndexTemplate = if (sin.readBoolean()) Script(sin) else null
val aliases = if (sin.readBoolean()) sin.readList(::Alias) else null
val switchAliases = sin.readBoolean()
val forceUnsafe = sin.readOptionalBoolean()
val index = sin.readInt()

return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, forceUnsafe, index)
return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, switchAliases, forceUnsafe, index)
}

@Suppress("NestedBlockDepth")
Expand All @@ -40,6 +42,7 @@ class ShrinkActionParser : ActionParser() {
var percentageOfSourceShards: Double? = null
var targetIndexTemplate: Script? = null
var aliases: List<Alias>? = null
var switchAliases = false
var forceUnsafe: Boolean? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
Expand All @@ -63,12 +66,13 @@ class ShrinkActionParser : ActionParser() {
}
}
}
SWITCH_ALIASES -> switchAliases = xcp.booleanValue()
FORCE_UNSAFE_FIELD -> forceUnsafe = xcp.booleanValue()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ShrinkAction.")
}
}

return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, forceUnsafe, index)
return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, switchAliases, forceUnsafe, index)
}

override fun getActionType(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@

package org.opensearch.indexmanagement.indexstatemanagement.step.shrink

import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkAction
import org.opensearch.indexmanagement.indexstatemanagement.util.resetReadOnlyAndRouting
import org.opensearch.indexmanagement.indexstatemanagement.util.deleteShrinkLock
import org.opensearch.indexmanagement.indexstatemanagement.util.getActionStartTime
import org.opensearch.indexmanagement.indexstatemanagement.util.issueUpdateSettingsRequest
import org.opensearch.indexmanagement.indexstatemanagement.util.resetReadOnlyAndRouting
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ShrinkActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import java.time.Duration
Expand All @@ -45,8 +48,15 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
if (!deleteShrinkLock(localShrinkActionProperties, context.lockService, logger)) {
logger.error("Failed to delete Shrink action lock on node [${localShrinkActionProperties.nodeName}]")
}
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to SUCCESS_MESSAGE)

if (switchAliases(context, localShrinkActionProperties)) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to SUCCESS_MESSAGE)
} else {
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Shrink failed due to aliases switch failure.")

Check warning on line 57 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt#L56-L57

Added lines #L56 - L57 were not covered by tests
}

return this
}

Expand Down Expand Up @@ -91,6 +101,64 @@ class WaitForShrinkStep(private val action: ShrinkAction) : ShrinkStep(name, tru
}
}

private suspend fun switchAliases(context: StepContext, shrinkActionProperties: ShrinkActionProperties): Boolean {

val sourceIndexName = context.metadata.index
val targetIndexName = shrinkActionProperties.targetIndexName

if (!action.switchAliases) {
logger.info("Switch aliases disabled from [$sourceIndexName] to [$targetIndexName].")
return true
}

logger.info("Switching aliases from [$sourceIndexName] to [$targetIndexName].")

val targetIndexAliasesNames = context
.clusterService
.state()
.metadata()
.index(targetIndexName)
.aliases
.keys
val sourceIndexAliases = context
.clusterService
.state()
.metadata()
.index(sourceIndexName)
.aliases
.values

val req = IndicesAliasesRequest()
sourceIndexAliases.map { it.alias }.forEach { req.addAliasAction(AliasActions(AliasActions.Type.REMOVE).index(sourceIndexName).alias(it)) }

sourceIndexAliases
.filterNot { targetIndexAliasesNames.contains(it.alias) }
.map {
AliasActions(AliasActions.Type.ADD)
.index(targetIndexName)
.alias(it.alias)
.filter(it.filter?.string())
.indexRouting(it.indexRouting)
.searchRouting(it.searchRouting)
.isHidden(it.isHidden)
.writeIndex(it.writeIndex())
}
.forEach { req.addAliasAction(it) }

return try {
val response: AcknowledgedResponse = context.client.admin().indices().suspendUntil { aliases(req, it) }
if (response.isAcknowledged) {
logger.info("Aliases switched successfully from [$sourceIndexName] to [$targetIndexName].")
} else {
logger.error("Switching aliases from [$sourceIndexName] to [$targetIndexName] failed.")

Check warning on line 153 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt#L153

Added line #L153 was not covered by tests
}
response.isAcknowledged
} catch (e: Exception) {
logger.error("Switching aliases from [$sourceIndexName] to [$targetIndexName] failed due to exception.", e)

Check warning on line 157 in src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt

View check run for this annotation

Codecov / codecov/patch

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/shrink/WaitForShrinkStep.kt#L156-L157

Added lines #L156 - L157 were not covered by tests
false
}
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetadata.copy(
actionMetaData = currentMetadata.actionMetaData?.copy(
Expand Down
5 changes: 4 additions & 1 deletion src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 20
"schema_version": 21
},
"dynamic": "strict",
"properties": {
Expand Down Expand Up @@ -551,6 +551,9 @@
"type": "object",
"enabled": false
},
"switch_aliases": {
"type": "boolean"
},
"force_unsafe": {
"type": "boolean"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import javax.management.remote.JMXServiceURL

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

val configSchemaVersion = 20
val configSchemaVersion = 21
val historySchemaVersion = 7

// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,17 @@ fun randomShrinkAction(
percentageOfSourceShards: Double? = null,
targetIndexTemplate: Script? = if (randomBoolean()) randomTemplateScript(randomAlphaOfLength(10)) else null,
aliases: List<Alias>? = if (randomBoolean()) randomList(10) { randomAlias() } else null,
switchAliases: Boolean = randomBoolean(),
forceUnsafe: Boolean? = if (randomBoolean()) randomBoolean() else null
): ShrinkAction {
if (numNewShards == null && maxShardSize == null && percentageOfSourceShards == null) {
when (randomInt(2)) {
0 -> return ShrinkAction(abs(randomInt()) + 1, null, null, targetIndexTemplate, aliases, forceUnsafe, 0)
1 -> return ShrinkAction(null, randomByteSizeValue(), null, targetIndexTemplate, aliases, forceUnsafe, 0)
2 -> return ShrinkAction(null, null, randomDoubleBetween(0.0, 1.0, true), targetIndexTemplate, aliases, forceUnsafe, 0)
0 -> return ShrinkAction(abs(randomInt()) + 1, null, null, targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0)
1 -> return ShrinkAction(null, randomByteSizeValue(), null, targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0)
2 -> return ShrinkAction(null, null, randomDoubleBetween(0.0, 1.0, true), targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0)
}
}
return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, forceUnsafe, 0)
return ShrinkAction(numNewShards, maxShardSize, percentageOfSourceShards, targetIndexTemplate, aliases, switchAliases, forceUnsafe, 0)
}

fun randomReadOnlyActionConfig(): ReadOnlyAction {
Expand Down
Loading

0 comments on commit 5b63304

Please sign in to comment.