diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 8feeddcafc..b60e18d81a 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -55,7 +55,7 @@ public class SparkQueryDispatcher { private static final Logger LOG = LogManager.getLogger(); public static final String INDEX_TAG_KEY = "index"; public static final String DATASOURCE_TAG_KEY = "datasource"; - public static final String CLUSTER_NAME_TAG_KEY = "cluster"; + public static final String CLUSTER_NAME_TAG_KEY = "domain_ident"; public static final String JOB_TYPE_TAG_KEY = "type"; private EMRServerlessClient emrServerlessClient; diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index d0ad9c89cc..3a806cad1f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -5,8 +5,7 @@ package org.opensearch.sql.spark.asyncquery; -import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_ENABLED_SETTING; -import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.*; import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_REQUEST_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID; @@ -28,12 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import lombok.Getter; import org.junit.After; import org.junit.Before; @@ -104,9 +98,18 @@ public List> getSettings() { @Before public void setup() { clusterService = clusterService(); + client = (NodeClient) cluster().client(); + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder() + .putList(DATASOURCE_URI_HOSTS_DENY_LIST.getKey(), Collections.emptyList()) + .build()) + .get(); clusterSettings = clusterService.getClusterSettings(); pluginSettings = new OpenSearchSettings(clusterSettings); - client = (NodeClient) cluster().client(); dataSourceService = createDataSourceService(); dataSourceService.createDataSource( new DataSourceMetadata( @@ -143,6 +146,13 @@ public void clean() { .setTransientSettings( Settings.builder().putNull(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey()).build()) .get(); + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().putNull(DATASOURCE_URI_HOSTS_DENY_LIST.getKey()).build()) + .get(); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 743274d46c..5139a3003d 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -33,6 +33,7 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; +import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.*; import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; @@ -120,9 +121,9 @@ void setUp() { @Test void testDispatchSelectQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.BATCH.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -175,9 +176,9 @@ void testDispatchSelectQuery() { @Test void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.BATCH.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -231,9 +232,9 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { @Test void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.BATCH.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); String query = "select * from my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -366,10 +367,10 @@ void testDispatchSelectQueryFailedCreateSession() { @Test void testDispatchIndexQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.STREAMING.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(INDEX_TAG_KEY, "flint_my_glue_default_http_logs_elb_and_requesturi_index"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); String query = "CREATE INDEX elb_and_requestUri ON my_glue.default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; @@ -425,9 +426,9 @@ void testDispatchIndexQuery() { @Test void testDispatchWithPPLQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.BATCH.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); String query = "source = my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -480,9 +481,9 @@ void testDispatchWithPPLQuery() { @Test void testDispatchQueryWithoutATableAndDataSourceName() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.BATCH.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); String query = "show tables"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -535,10 +536,10 @@ void testDispatchQueryWithoutATableAndDataSourceName() { @Test void testDispatchIndexQueryWithoutADatasourceName() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.STREAMING.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(INDEX_TAG_KEY, "flint_my_glue_default_http_logs_elb_and_requesturi_index"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); String query = "CREATE INDEX elb_and_requestUri ON default.http_logs(l_orderkey, l_quantity) WITH" + " (auto_refresh = true)"; @@ -594,10 +595,10 @@ void testDispatchIndexQueryWithoutADatasourceName() { @Test void testDispatchMaterializedViewQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("index", "flint_mv_1"); - tags.put("cluster", TEST_CLUSTER_NAME); - tags.put("type", JobType.STREAMING.getText()); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(INDEX_TAG_KEY, "flint_mv_1"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); + tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); String query = "CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH" + " (auto_refresh = true)"; @@ -653,8 +654,8 @@ void testDispatchMaterializedViewQuery() { @Test void testDispatchShowMVQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); String query = "SHOW MATERIALIZED VIEW IN mys3.default"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -707,8 +708,8 @@ void testDispatchShowMVQuery() { @Test void testRefreshIndexQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); String query = "REFRESH SKIPPING INDEX ON my_glue.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString( @@ -761,8 +762,8 @@ void testRefreshIndexQuery() { @Test void testDispatchDescribeIndexQuery() { HashMap tags = new HashMap<>(); - tags.put("datasource", "my_glue"); - tags.put("cluster", TEST_CLUSTER_NAME); + tags.put(DATASOURCE_TAG_KEY, "my_glue"); + tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); String query = "DESCRIBE SKIPPING INDEX ON mys3.default.http_logs"; String sparkSubmitParameters = constructExpectedSparkSubmitParameterString(