From af10e33fddd21ac4f1033b889f449ca2a14c6b32 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Tue, 31 Oct 2023 18:28:24 -0400 Subject: [PATCH 1/4] tester code: pipeline aggr. transform job Signed-off-by: n-dohrmann --- .../transform/TransformRunnerIT.kt | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index e4c7daad1..10a866fe7 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -27,6 +27,7 @@ import org.opensearch.script.ScriptType import org.opensearch.search.aggregations.AggregationBuilders import org.opensearch.search.aggregations.AggregatorFactories import org.opensearch.search.aggregations.metrics.ScriptedMetricAggregationBuilder +import org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder import java.lang.Integer.min import java.time.Instant import java.time.temporal.ChronoUnit @@ -688,6 +689,51 @@ class TransformRunnerIT : TransformRestTestCase() { assertTrue("Expected failure message to be present", !metadata.failureReason.isNullOrBlank()) } + fun `test transform with invalid pipeline aggregation triggering search failure`() { + validateSourceIndex("transform-source-index") + + val aggregatorFactories = AggregatorFactories.builder() + aggregatorFactories.addPipelineAggregator( + BucketScriptPipelineAggregationBuilder( + "test_pipeline_aggregation", + Script("1") + ) + ) + + val transform = Transform( + id = "id_17", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform", + metadataId = null, + sourceIndex = "transform-source-index", + targetIndex = "transform-target-index", + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = "store_and_fwd_flag", targetField = "flag"), + Histogram(sourceField = "passenger_count", targetField = "count", interval = 2.0), + DateHistogram(sourceField = "tpep_pickup_datetime", targetField = "date", fixedInterval = "1d") + ), + aggregations = aggregatorFactories + ).let { createTransform(it, it.id) } + + updateTransformStartTime(transform) + + val metadata = waitFor { + val job = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", job.metadataId) + val transformMetadata = getTransformMetadata(job.metadataId!!) + assertEquals("Transform has not failed", TransformMetadata.Status.FAILED, transformMetadata.status) + transformMetadata + } + + assertTrue("Expected failure message to be present", !metadata.failureReason.isNullOrBlank()) + } + fun `test transform with data stream`() { // Create a data stream. val dataStreamName = "transform-data-stream" From 937f89cf7215e288f05bd290018cb3661cec9893 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Tue, 31 Oct 2023 19:19:48 -0400 Subject: [PATCH 2/4] made test case for pipeline aggregator in transform job Signed-off-by: n-dohrmann --- .../transform/TransformRunnerIT.kt | 79 ++++++++++--------- 1 file changed, 41 insertions(+), 38 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 10a866fe7..7912d2d22 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -31,6 +31,7 @@ import org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregati import java.lang.Integer.min import java.time.Instant import java.time.temporal.ChronoUnit +import kotlin.test.assertFailsWith class TransformRunnerIT : TransformRestTestCase() { @@ -690,48 +691,50 @@ class TransformRunnerIT : TransformRestTestCase() { } fun `test transform with invalid pipeline aggregation triggering search failure`() { - validateSourceIndex("transform-source-index") - - val aggregatorFactories = AggregatorFactories.builder() - aggregatorFactories.addPipelineAggregator( - BucketScriptPipelineAggregationBuilder( - "test_pipeline_aggregation", - Script("1") + assertFailsWith(IllegalArgumentException::class, "Bucket-script aggregation must fail!") { + validateSourceIndex("transform-source-index") + + val aggregatorFactories = AggregatorFactories.builder() + aggregatorFactories.addPipelineAggregator( + BucketScriptPipelineAggregationBuilder( + "test_pipeline_aggregation", + Script("1") + ) ) - ) - val transform = Transform( - id = "id_17", - schemaVersion = 1L, - enabled = true, - enabledAt = Instant.now(), - updatedAt = Instant.now(), - jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), - description = "test transform", - metadataId = null, - sourceIndex = "transform-source-index", - targetIndex = "transform-target-index", - roles = emptyList(), - pageSize = 1, - groups = listOf( - Terms(sourceField = "store_and_fwd_flag", targetField = "flag"), - Histogram(sourceField = "passenger_count", targetField = "count", interval = 2.0), - DateHistogram(sourceField = "tpep_pickup_datetime", targetField = "date", fixedInterval = "1d") - ), - aggregations = aggregatorFactories - ).let { createTransform(it, it.id) } - - updateTransformStartTime(transform) + val transform = Transform( + id = "id_17", + schemaVersion = 1L, + enabled = true, + enabledAt = Instant.now(), + updatedAt = Instant.now(), + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + description = "test transform", + metadataId = null, + sourceIndex = "transform-source-index", + targetIndex = "transform-target-index", + roles = emptyList(), + pageSize = 1, + groups = listOf( + Terms(sourceField = "store_and_fwd_flag", targetField = "flag"), + Histogram(sourceField = "passenger_count", targetField = "count", interval = 2.0), + DateHistogram(sourceField = "tpep_pickup_datetime", targetField = "date", fixedInterval = "1d") + ), + aggregations = aggregatorFactories + ).let { createTransform(it, it.id) } + + updateTransformStartTime(transform) + + val metadata = waitFor { + val job = getTransform(transformId = transform.id) + assertNotNull("Transform job doesn't have metadata set", job.metadataId) + val transformMetadata = getTransformMetadata(job.metadataId!!) + assertEquals("Transform has not failed", TransformMetadata.Status.FAILED, transformMetadata.status) + transformMetadata + } - val metadata = waitFor { - val job = getTransform(transformId = transform.id) - assertNotNull("Transform job doesn't have metadata set", job.metadataId) - val transformMetadata = getTransformMetadata(job.metadataId!!) - assertEquals("Transform has not failed", TransformMetadata.Status.FAILED, transformMetadata.status) - transformMetadata + assertTrue("Expected failure message to be present", !metadata.failureReason.isNullOrBlank()) } - - assertTrue("Expected failure message to be present", !metadata.failureReason.isNullOrBlank()) } fun `test transform with data stream`() { From c75ca60dbd2d699864bae82bc85f1dc7319358a4 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Tue, 31 Oct 2023 20:08:51 -0400 Subject: [PATCH 3/4] removed unnec. test lines Signed-off-by: n-dohrmann --- .../indexmanagement/transform/TransformRunnerIT.kt | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 7912d2d22..466dd09b2 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -722,18 +722,6 @@ class TransformRunnerIT : TransformRestTestCase() { ), aggregations = aggregatorFactories ).let { createTransform(it, it.id) } - - updateTransformStartTime(transform) - - val metadata = waitFor { - val job = getTransform(transformId = transform.id) - assertNotNull("Transform job doesn't have metadata set", job.metadataId) - val transformMetadata = getTransformMetadata(job.metadataId!!) - assertEquals("Transform has not failed", TransformMetadata.Status.FAILED, transformMetadata.status) - transformMetadata - } - - assertTrue("Expected failure message to be present", !metadata.failureReason.isNullOrBlank()) } } From 3445ec9fc857842424b6cd380fc62f75963cf6f1 Mon Sep 17 00:00:00 2001 From: n-dohrmann Date: Tue, 31 Oct 2023 20:27:14 -0400 Subject: [PATCH 4/4] re-added method call on Transform obj Signed-off-by: n-dohrmann --- .../opensearch/indexmanagement/transform/TransformRunnerIT.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt index 466dd09b2..a948899ff 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/transform/TransformRunnerIT.kt @@ -722,6 +722,7 @@ class TransformRunnerIT : TransformRestTestCase() { ), aggregations = aggregatorFactories ).let { createTransform(it, it.id) } + updateTransformStartTime(transform) } }