Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Refactoring for tags usage in test files. #2384

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class SparkQueryDispatcher {
private static final Logger LOG = LogManager.getLogger();
public static final String INDEX_TAG_KEY = "index";
public static final String DATASOURCE_TAG_KEY = "datasource";
public static final String CLUSTER_NAME_TAG_KEY = "cluster";
public static final String CLUSTER_NAME_TAG_KEY = "domain_ident";
public static final String JOB_TYPE_TAG_KEY = "type";

private EMRServerlessClient emrServerlessClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

package org.opensearch.sql.spark.asyncquery;

import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_ENABLED_SETTING;
import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING;
import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.*;
import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_REQUEST_INDEX;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID;
Expand All @@ -28,12 +27,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import lombok.Getter;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -104,9 +98,18 @@ public List<Setting<?>> getSettings() {
@Before
public void setup() {
clusterService = clusterService();
client = (NodeClient) cluster().client();
client
.admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder()
.putList(DATASOURCE_URI_HOSTS_DENY_LIST.getKey(), Collections.emptyList())
.build())
.get();
clusterSettings = clusterService.getClusterSettings();
pluginSettings = new OpenSearchSettings(clusterSettings);
client = (NodeClient) cluster().client();
dataSourceService = createDataSourceService();
dataSourceService.createDataSource(
new DataSourceMetadata(
Expand Down Expand Up @@ -143,6 +146,13 @@ public void clean() {
.setTransientSettings(
Settings.builder().putNull(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey()).build())
.get();
client
.admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().putNull(DATASOURCE_URI_HOSTS_DENY_LIST.getKey()).build())
.get();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.*;

import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
Expand Down Expand Up @@ -120,9 +121,9 @@ void setUp() {
@Test
void testDispatchSelectQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.BATCH.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "select * from my_glue.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -175,9 +176,9 @@ void testDispatchSelectQuery() {
@Test
void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.BATCH.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "select * from my_glue.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -231,9 +232,9 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
@Test
void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.BATCH.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "select * from my_glue.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -366,10 +367,10 @@ void testDispatchSelectQueryFailedCreateSession() {
@Test
void testDispatchIndexQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.STREAMING.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(INDEX_TAG_KEY, "flint_my_glue_default_http_logs_elb_and_requesturi_index");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
String query =
"CREATE INDEX elb_and_requestUri ON my_glue.default.http_logs(l_orderkey, l_quantity) WITH"
+ " (auto_refresh = true)";
Expand Down Expand Up @@ -425,9 +426,9 @@ void testDispatchIndexQuery() {
@Test
void testDispatchWithPPLQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.BATCH.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "source = my_glue.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -480,9 +481,9 @@ void testDispatchWithPPLQuery() {
@Test
void testDispatchQueryWithoutATableAndDataSourceName() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.BATCH.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
String query = "show tables";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -535,10 +536,10 @@ void testDispatchQueryWithoutATableAndDataSourceName() {
@Test
void testDispatchIndexQueryWithoutADatasourceName() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("index", "flint_my_glue_default_http_logs_elb_and_requesturi_index");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.STREAMING.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(INDEX_TAG_KEY, "flint_my_glue_default_http_logs_elb_and_requesturi_index");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
String query =
"CREATE INDEX elb_and_requestUri ON default.http_logs(l_orderkey, l_quantity) WITH"
+ " (auto_refresh = true)";
Expand Down Expand Up @@ -594,10 +595,10 @@ void testDispatchIndexQueryWithoutADatasourceName() {
@Test
void testDispatchMaterializedViewQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("index", "flint_mv_1");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put("type", JobType.STREAMING.getText());
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(INDEX_TAG_KEY, "flint_mv_1");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText());
String query =
"CREATE MATERIALIZED VIEW mv_1 AS query=select * from my_glue.default.logs WITH"
+ " (auto_refresh = true)";
Expand Down Expand Up @@ -653,8 +654,8 @@ void testDispatchMaterializedViewQuery() {
@Test
void testDispatchShowMVQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
String query = "SHOW MATERIALIZED VIEW IN mys3.default";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -707,8 +708,8 @@ void testDispatchShowMVQuery() {
@Test
void testRefreshIndexQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
String query = "REFRESH SKIPPING INDEX ON my_glue.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down Expand Up @@ -761,8 +762,8 @@ void testRefreshIndexQuery() {
@Test
void testDispatchDescribeIndexQuery() {
HashMap<String, String> tags = new HashMap<>();
tags.put("datasource", "my_glue");
tags.put("cluster", TEST_CLUSTER_NAME);
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
String query = "DESCRIBE SKIPPING INDEX ON mys3.default.http_logs";
String sparkSubmitParameters =
constructExpectedSparkSubmitParameterString(
Expand Down
Loading