Skip to content

Commit

Permalink
Merge branch 'main' into transform_API_error_handling
Browse files Browse the repository at this point in the history
  • Loading branch information
n-dohrmann authored Oct 18, 2023
2 parents 8b3e2c6 + d42cc69 commit 6a37e2b
Show file tree
Hide file tree
Showing 43 changed files with 1,838 additions and 286 deletions.
2 changes: 1 addition & 1 deletion DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ However, to build the `index management` plugin project, we also use the OpenSea

### Building from the command line

1. `./gradlew build` builds and tests project.
1. `./gradlew build` builds and tests project.
2. `./gradlew run` launches a single node cluster with the index management (and job-scheduler) plugin installed.
3. `./gradlew run -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed.
4. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
## Version 2.11.0.0 2023-10-11

Compatible with OpenSearch 2.10.0

### Maintenance
* Increment version to 2.11.0-SNAPSHOT. ([#922](https://github.com/opensearch-project/index-management/pull/922))

### Enhancements
* Provide unique id for each rollup job and add debug logs. ([#968](https://github.com/opensearch-project/index-management/pull/968))

### Bug fixes
* Fix auto managed index always have -2 seqNo bug. ([#924](https://github.com/opensearch-project/index-management/pull/924))

### Infrastructure
* Upload docker test cluster log. ([#964](https://github.com/opensearch-project/index-management/pull/964))
* Reduce test running time. ([#965](https://github.com/opensearch-project/index-management/pull/965))
* Parallel test run. ([#966](https://github.com/opensearch-project/index-management/pull/966))
* Security test filtered. ([#969](https://github.com/opensearch-project/index-management/pull/969))

### Documentation
* Added 2.11 release notes. ([#1004](https://github.com/opensearch-project/index-management/pull/1004))
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ data class ActionProperties(
val snapshotName: String? = null,
val rollupId: String? = null,
val hasRollupFailed: Boolean? = null,
val shrinkActionProperties: ShrinkActionProperties? = null
val shrinkActionProperties: ShrinkActionProperties? = null,
val transformActionProperties: TransformActionProperties? = null
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
Expand All @@ -32,6 +33,7 @@ data class ActionProperties(
out.writeOptionalString(rollupId)
out.writeOptionalBoolean(hasRollupFailed)
out.writeOptionalWriteable(shrinkActionProperties)
out.writeOptionalWriteable(transformActionProperties)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -40,6 +42,7 @@ data class ActionProperties(
if (rollupId != null) builder.field(Properties.ROLLUP_ID.key, rollupId)
if (hasRollupFailed != null) builder.field(Properties.HAS_ROLLUP_FAILED.key, hasRollupFailed)
if (shrinkActionProperties != null) builder.addObject(ShrinkActionProperties.SHRINK_ACTION_PROPERTIES, shrinkActionProperties, params)
if (transformActionProperties != null) builder.addObject(TransformActionProperties.TRANSFORM_ACTION_PROPERTIES, transformActionProperties, params)
return builder
}

Expand All @@ -52,7 +55,8 @@ data class ActionProperties(
val rollupId: String? = si.readOptionalString()
val hasRollupFailed: Boolean? = si.readOptionalBoolean()
val shrinkActionProperties: ShrinkActionProperties? = si.readOptionalWriteable { ShrinkActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
val transformActionProperties: TransformActionProperties? = si.readOptionalWriteable { TransformActionProperties.fromStreamInput(it) }
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties)
}

fun parse(xcp: XContentParser): ActionProperties {
Expand All @@ -61,6 +65,7 @@ data class ActionProperties(
var rollupId: String? = null
var hasRollupFailed: Boolean? = null
var shrinkActionProperties: ShrinkActionProperties? = null
var transformActionProperties: TransformActionProperties? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -75,10 +80,13 @@ data class ActionProperties(
ShrinkActionProperties.SHRINK_ACTION_PROPERTIES -> {
shrinkActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else ShrinkActionProperties.parse(xcp)
}
TransformActionProperties.TRANSFORM_ACTION_PROPERTIES -> {
transformActionProperties = if (xcp.currentToken() == Token.VALUE_NULL) null else TransformActionProperties.parse(xcp)
}
}
}

return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties)
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed, shrinkActionProperties, transformActionProperties)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.spi.indexstatemanagement.model

import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentFragment
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser

data class TransformActionProperties(
val transformId: String?
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
out.writeOptionalString(transformId)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
if (transformId != null) builder.field(Properties.TRANSFORM_ID.key, transformId)
return builder
}

companion object {
const val TRANSFORM_ACTION_PROPERTIES = "transform_action_properties"

fun fromStreamInput(sin: StreamInput): TransformActionProperties {
val transformId: String? = sin.readOptionalString()
return TransformActionProperties(transformId)
}

fun parse(xcp: XContentParser): TransformActionProperties {
var transformId: String? = null

ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
Properties.TRANSFORM_ID.key -> transformId = xcp.text()
}
}

return TransformActionProperties(transformId)
}
}

enum class Properties(val key: String) {
TRANSFORM_ID("transform_id")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin

val managedIndexCoordinator = ManagedIndexCoordinator(
environment.settings(),
client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider
client, clusterService, threadPool, indexManagementIndices, indexMetadataProvider, xContentRegistry
)

val smRunner = SMRunner.init(client, threadPool, settings, indexManagementIndices, clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.IndexMetadataService
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ISMIndexMetadata

class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : IndexMetadataService {
class DefaultIndexMetadataService(private val customUUIDSetting: String? = null) : IndexMetadataService {

/**
* Returns the default index metadata needed for ISM
Expand All @@ -39,7 +39,7 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index

response.state.metadata.indices.forEach {
// TODO waiting to add document count until it is definitely needed
val uuid = getCustomIndexUUID(it.value)
val uuid = getIndexUUID(it.value)
val indexMetadata = ISMIndexMetadata(uuid, it.value.creationDate, -1)
indexNameToMetadata[it.key] = indexMetadata
}
Expand All @@ -48,11 +48,10 @@ class DefaultIndexMetadataService(val customUUIDSetting: String? = null) : Index
}

/*
* If an extension wants Index Management to determine cluster state indices UUID based on a custom index setting if
* present of cluster state, the extension will override this customUUID setting. This allows an index to migrate off
* cluster and back while using this persistent uuid.
* This method prioritize the custom index setting provided by extension to decide the index UUID
* Custom index UUID is needed when index moved out of cluster and re-attach back, it will get a new UUID in cluster metadata
*/
fun getCustomIndexUUID(indexMetadata: IndexMetadata): String {
fun getIndexUUID(indexMetadata: IndexMetadata): String {
return if (customUUIDSetting != null) {
indexMetadata.settings.get(customUUIDSetting, indexMetadata.indexUUID)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser
import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry
Expand All @@ -49,7 +50,8 @@ class ISMActionsParser private constructor() {
RollupActionParser(),
RolloverActionParser(),
ShrinkActionParser(),
SnapshotActionParser()
SnapshotActionParser(),
TransformActionParser(),
)

val customActionExtensionMap = mutableMapOf<String, String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.opensearch.common.regex.Regex
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.authuser.User
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.index.Index
import org.opensearch.index.query.QueryBuilders
import org.opensearch.indexmanagement.IndexManagementIndices
Expand Down Expand Up @@ -110,7 +111,8 @@ class ManagedIndexCoordinator(
private val clusterService: ClusterService,
private val threadPool: ThreadPool,
indexManagementIndices: IndexManagementIndices,
private val indexMetadataProvider: IndexMetadataProvider
private val indexMetadataProvider: IndexMetadataProvider,
private val xContentRegistry: NamedXContentRegistry
) : ClusterStateListener,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")),
LifecycleListener() {
Expand Down Expand Up @@ -299,7 +301,7 @@ class ManagedIndexCoordinator(
// If there is a custom index uuid associated with the index, we do not auto manage it
// This is because cold index uses custom uuid, and we do not auto manage cold-to-warm index
val indexMetadata = clusterState.metadata.index(indexName)
val wasOffCluster = defaultIndexMetadataService.getCustomIndexUUID(indexMetadata) != indexMetadata.indexUUID
val wasOffCluster = defaultIndexMetadataService.getIndexUUID(indexMetadata) != indexMetadata.indexUUID
val ismIndexMetadata = ismIndicesMetadata[indexName]
// We try to find lookup name instead of using index name as datastream indices need the alias to match policy
val lookupName = findIndexLookupName(indexName, clusterState)
Expand Down Expand Up @@ -422,7 +424,7 @@ class ManagedIndexCoordinator(

return try {
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
parseFromSearchResponse(response = response, parse = Policy.Companion::parse)
parseFromSearchResponse(response, xContentRegistry, Policy.Companion::parse)
} catch (ex: IndexNotFoundException) {
emptyList()
} catch (ex: ClusterBlockException) {
Expand Down Expand Up @@ -603,7 +605,7 @@ class ManagedIndexCoordinator(
}
mRes.forEach {
if (it.response.isExists) {
result[it.id] = contentParser(it.response.sourceAsBytesRef).parseWithType(
result[it.id] = contentParser(it.response.sourceAsBytesRef, xContentRegistry).parseWithType(
it.response.id, it.response.seqNo, it.response.primaryTerm, ManagedIndexConfig.Companion::parse
)
}
Expand Down
Loading

0 comments on commit 6a37e2b

Please sign in to comment.