diff --git a/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java index 6de841a890..4d48ec5f29 100644 --- a/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java +++ b/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java @@ -46,6 +46,7 @@ public interface ExecutionEngine { class QueryResponse { private final Schema schema; private final List results; + private final String cursor; } @Data diff --git a/core/src/main/java/org/opensearch/sql/planner/PlanContext.java b/core/src/main/java/org/opensearch/sql/planner/PlanContext.java new file mode 100644 index 0000000000..9d943e397b --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/PlanContext.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.planner; + +import lombok.Getter; +import lombok.Setter; + +/** + * The context used for planner. + */ +public class PlanContext { + @Getter + @Setter + private int fetchSize; + + public PlanContext() { + this.fetchSize = 0; + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/Planner.java b/core/src/main/java/org/opensearch/sql/planner/Planner.java index 803b2d1931..6d6f761b56 100644 --- a/core/src/main/java/org/opensearch/sql/planner/Planner.java +++ b/core/src/main/java/org/opensearch/sql/planner/Planner.java @@ -10,6 +10,7 @@ import java.util.List; import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor; import org.opensearch.sql.planner.logical.LogicalRelation; @@ -39,7 +40,7 @@ public class Planner { * @param plan logical plan * @return optimal physical plan */ - public PhysicalPlan plan(LogicalPlan plan) { + public PhysicalPlan plan(LogicalPlan plan, PlanContext planContext) { String tableName = findTableName(plan); if (isNullOrEmpty(tableName)) { return plan.accept(new DefaultImplementor<>(), null); @@ -47,7 +48,12 @@ public PhysicalPlan plan(LogicalPlan plan) { Table table = storageEngine.getTable(tableName); return table.implement( - table.optimize(optimize(plan))); + table.optimize(optimize(plan)), + planContext); + } + + public PhysicalPlan plan(LogicalPlan plan) { + return plan(plan, new PlanContext()); } private String findTableName(LogicalPlan plan) { diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java index a067f5b3f9..eda0fe2a4d 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java @@ -32,6 +32,14 @@ public void open() { getChild().forEach(PhysicalPlan::open); } + public String getCursor() { + try { + return getChild().get(0).getCursor(); + } catch (IndexOutOfBoundsException e) { + return getCursor(); + } + } + public void close() { getChild().forEach(PhysicalPlan::close); } diff --git a/core/src/main/java/org/opensearch/sql/storage/Table.java b/core/src/main/java/org/opensearch/sql/storage/Table.java index 731cf878c6..33dc4cd5e8 100644 --- a/core/src/main/java/org/opensearch/sql/storage/Table.java +++ b/core/src/main/java/org/opensearch/sql/storage/Table.java @@ -8,6 +8,7 @@ import java.util.Map; import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; @@ -29,6 +30,8 @@ public interface Table { */ PhysicalPlan implement(LogicalPlan plan); + PhysicalPlan implement(LogicalPlan plan, PlanContext planContext); + /** * Optimize the {@link LogicalPlan} by storage engine rule. * The default optimize solution is no optimization. diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java index 51484feda7..b21077e2fe 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java @@ -29,7 +29,9 @@ import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.opensearch.security.SecurityAccess; +import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.protocol.response.QueryResult; import org.opensearch.sql.protocol.response.format.CsvResponseFormatter; @@ -101,9 +103,10 @@ public RestChannelConsumer prepareRequest(SQLQueryRequest request, NodeClient no try { // For now analyzing and planning stage may throw syntax exception as well // which hints the fallback to legacy code is necessary here. - plan = sqlService.plan( - sqlService.analyze( - sqlService.parse(request.getQuery()))); + LogicalPlan logicalPlan = sqlService.analyze(sqlService.parse(request.getQuery())); + PlanContext planContext = new PlanContext(); + planContext.setFetchSize(request.getFetchSize()); + plan = sqlService.plan(logicalPlan, planContext); } catch (SyntaxCheckException e) { // When explain, print info log for what unsupported syntax is causing fallback to old engine if (request.isExplainRequest()) { @@ -167,7 +170,7 @@ private ResponseListener createQueryResponseListener(RestChannel @Override public void onResponse(QueryResponse response) { sendResponse(channel, OK, - formatter.format(new QueryResult(response.getSchema(), response.getResults()))); + formatter.format(new QueryResult(response.getSchema(), response.getResults(), response.getCursor()))); } @Override diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java index 10d9dab0fa..03fa43a9dc 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java @@ -149,7 +149,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli // Route request to new query engine if it's supported already SQLQueryRequest newSqlRequest = new SQLQueryRequest(sqlRequest.getJsonContent(), - sqlRequest.getSql(), request.path(), request.params()); + sqlRequest.getSql(), request.path(), request.params(), sqlRequest.fetchSize()); RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client); if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) { LOG.info("[{}] Request is handled by new SQL query engine", LogUtils.getRequestId()); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index bb00fbb68b..4785d5503f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -40,7 +40,7 @@ public void execute(PhysicalPlan physicalPlan, ResponseListener l result.add(plan.next()); } - QueryResponse response = new QueryResponse(physicalPlan.schema(), result); + QueryResponse response = new QueryResponse(physicalPlan.schema(), result, plan.getCursor()); listener.onResponse(response); } catch (Exception e) { listener.onFailure(e); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/ResourceMonitorPlan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/ResourceMonitorPlan.java index 9c59e4acaf..e9a42fd28b 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/ResourceMonitorPlan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/ResourceMonitorPlan.java @@ -69,6 +69,11 @@ public List getChild() { return delegate.getChild(); } + @Override + public String getCursor() { + return delegate.getCursor(); + } + @Override public boolean hasNext() { return delegate.hasNext(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollCursorRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollCursorRequest.java new file mode 100644 index 0000000000..4a38d96177 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollCursorRequest.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.opensearch.request; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; +import org.opensearch.sql.opensearch.response.OpenSearchResponse; + +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * OpenSearch scroll cursor request. + */ +@EqualsAndHashCode +@Getter +@ToString +public class OpenSearchScrollCursorRequest implements OpenSearchRequest { + + /** Default scroll context timeout in minutes. */ + public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(1L); + + /** Index name. */ + @EqualsAndHashCode.Exclude + @ToString.Exclude + private final OpenSearchExprValueFactory exprValueFactory; + + /** + * Scroll id. + */ + private String scrollId; + + /** Search request source builder. */ + private final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + + public OpenSearchScrollCursorRequest(String scrollId, OpenSearchExprValueFactory exprValueFactory) { + this.scrollId = scrollId; + this.exprValueFactory = exprValueFactory; + } + + @Override + public OpenSearchResponse search(Function searchAction, + Function scrollAction) { + SearchResponse openSearchResponse; + openSearchResponse = scrollAction.apply(scrollRequest()); + + return new OpenSearchResponse(openSearchResponse, exprValueFactory); + } + + @Override + public void clean(Consumer cleanAction) { + try { + cleanAction.accept(getScrollId()); + } + finally { + + } + } + + /** + * Generate OpenSearch scroll request by scroll id maintained. + * + * @return scroll request + */ + public SearchScrollRequest scrollRequest() { + Objects.requireNonNull(scrollId, "Scroll id cannot be null"); + return new SearchScrollRequest().scroll(DEFAULT_SCROLL_TIMEOUT).scrollId(scrollId); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollQueryRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollQueryRequest.java new file mode 100644 index 0000000000..4b79c472e6 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollQueryRequest.java @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.opensearch.request; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; +import org.opensearch.sql.opensearch.response.OpenSearchResponse; + +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * OpenSearch scroll search request. This has to be stateful because it needs to: + * + *

1) Accumulate search source builder when visiting logical plan to push down operation 2) + * Maintain scroll ID between calls to client search method + */ +@EqualsAndHashCode +@Getter +@ToString +public class OpenSearchScrollQueryRequest implements OpenSearchRequest { + + /** Default scroll context timeout in minutes. */ + public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(1L); + + /** + * {@link IndexName}. + */ + private final IndexName indexName; + + /** Index name. */ + @EqualsAndHashCode.Exclude + @ToString.Exclude + private final OpenSearchExprValueFactory exprValueFactory; + + /** Search request source builder. */ + private final SearchSourceBuilder sourceBuilder; + + public OpenSearchScrollQueryRequest(IndexName indexName, int size, OpenSearchExprValueFactory exprValueFactory) { + this.indexName = indexName; + this.sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.from(0); + sourceBuilder.size(size); + this.exprValueFactory = exprValueFactory; + } + + public OpenSearchScrollQueryRequest(String indexName, int size, OpenSearchExprValueFactory exprValueFactory) { + this(new IndexName(indexName), size, exprValueFactory); + } + + @Override + public OpenSearchResponse search(Function searchAction, + Function scrollAction) { + SearchResponse openSearchResponse; + openSearchResponse = searchAction.apply(searchRequest()); + + return new OpenSearchResponse(openSearchResponse, exprValueFactory); + } + + @Override + public void clean(Consumer cleanAction) { + // do nothing + } + + /** + * Generate OpenSearch search request. + * + * @return search request + */ + public SearchRequest searchRequest() { + return new SearchRequest() + .indices(indexName.getIndexNames()) + .scroll(DEFAULT_SCROLL_TIMEOUT) + .source(sourceBuilder); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java index 7dc77d7d29..262d5aa21b 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/OpenSearchResponse.java @@ -11,6 +11,7 @@ import java.util.Iterator; import java.util.Map; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.ToString; import org.opensearch.action.search.SearchResponse; import org.opensearch.search.SearchHits; @@ -36,6 +37,12 @@ public class OpenSearchResponse implements Iterable { */ private final Aggregations aggregations; + /** + * Search scrollId result. + */ + @Getter + private final String scrollId; + /** * ElasticsearchExprValueFactory used to build ExprValue from search result. */ @@ -49,6 +56,7 @@ public OpenSearchResponse(SearchResponse searchResponse, OpenSearchExprValueFactory exprValueFactory) { this.hits = searchResponse.getHits(); this.aggregations = searchResponse.getAggregations(); + this.scrollId = searchResponse.getScrollId(); this.exprValueFactory = exprValueFactory; } @@ -58,6 +66,7 @@ public OpenSearchResponse(SearchResponse searchResponse, public OpenSearchResponse(SearchHits hits, OpenSearchExprValueFactory exprValueFactory) { this.hits = hits; this.aggregations = null; + this.scrollId = null; this.exprValueFactory = exprValueFactory; } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index 49301cbf53..6aef5badcf 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -32,6 +32,7 @@ import org.opensearch.sql.opensearch.storage.script.sort.SortQueryBuilder; import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer; import org.opensearch.sql.planner.DefaultImplementor; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.logical.LogicalAD; import org.opensearch.sql.planner.logical.LogicalMLCommons; import org.opensearch.sql.planner.logical.LogicalPlan; @@ -83,9 +84,9 @@ public Map getFieldTypes() { * TODO: Push down operations to index scan operator as much as possible in future. */ @Override - public PhysicalPlan implement(LogicalPlan plan) { + public PhysicalPlan implement(LogicalPlan plan, PlanContext planContext) { OpenSearchIndexScan indexScan = new OpenSearchIndexScan(client, settings, indexName, - new OpenSearchExprValueFactory(getFieldTypes())); + new OpenSearchExprValueFactory(getFieldTypes()), planContext.getFetchSize()); /* * Visit logical plan with index scan as context so logical operators visited, such as @@ -95,6 +96,12 @@ public PhysicalPlan implement(LogicalPlan plan) { return plan.accept(new OpenSearchDefaultImplementor(indexScan, client), indexScan); } + @Override + public PhysicalPlan implement(LogicalPlan plan) { + return this.implement(plan, new PlanContext()); + } + + @Override public LogicalPlan optimize(LogicalPlan plan) { return OpenSearchLogicalPlanOptimizerFactory.create().optimize(plan); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java index c35a5ba9db..0c1ea25be7 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexScan.java @@ -33,6 +33,9 @@ import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory; import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest; +import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest; +import org.opensearch.sql.opensearch.request.OpenSearchScrollCursorRequest; +import org.opensearch.sql.opensearch.request.OpenSearchScrollQueryRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; import org.opensearch.sql.opensearch.response.agg.OpenSearchAggregationResponseParser; @@ -57,13 +60,25 @@ public class OpenSearchIndexScan extends TableScanOperator { /** Search response for current batch. */ private Iterator iterator; + /** Cursor response. */ + private String cursor; + /** * Constructor. */ public OpenSearchIndexScan(OpenSearchClient client, Settings settings, String indexName, OpenSearchExprValueFactory exprValueFactory) { - this(client, settings, new OpenSearchRequest.IndexName(indexName), exprValueFactory); + this(client, settings, new OpenSearchRequest.IndexName(indexName), exprValueFactory, 0); + } + + /** + * Constructor. + */ + public OpenSearchIndexScan(OpenSearchClient client, + Settings settings, String indexName, + OpenSearchExprValueFactory exprValueFactory, int fetchSize) { + this(client, settings, new OpenSearchRequest.IndexName(indexName), exprValueFactory, fetchSize); } /** @@ -72,23 +87,39 @@ public OpenSearchIndexScan(OpenSearchClient client, public OpenSearchIndexScan(OpenSearchClient client, Settings settings, OpenSearchRequest.IndexName indexName, OpenSearchExprValueFactory exprValueFactory) { + this(client, settings, indexName, exprValueFactory, 0); + } + + /** + * Constructor. + */ + public OpenSearchIndexScan(OpenSearchClient client, + Settings settings, OpenSearchRequest.IndexName indexName, + OpenSearchExprValueFactory exprValueFactory, int fetchSize) { this.client = client; - this.request = new OpenSearchQueryRequest(indexName, - settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT), exprValueFactory); + if (fetchSize > 0) { + this.request = new OpenSearchScrollQueryRequest(indexName, fetchSize, exprValueFactory); + } else { + this.request = new OpenSearchQueryRequest(indexName, + settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT), exprValueFactory); + } } + //public setScanMode + @Override public void open() { super.open(); // For now pull all results immediately once open - List responses = new ArrayList<>(); OpenSearchResponse response = client.search(request); - while (!response.isEmpty()) { - responses.add(response); - response = client.search(request); - } - iterator = Iterables.concat(responses.toArray(new OpenSearchResponse[0])).iterator(); + cursor = response.getScrollId(); + iterator = response.iterator(); + } + + @Override + public String getCursor() { + return cursor; } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java index edd5593f4d..82102e24a9 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/system/OpenSearchSystemIndex.java @@ -11,6 +11,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Map; import lombok.RequiredArgsConstructor; +import lombok.extern.java.Log; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.opensearch.client.OpenSearchClient; @@ -18,6 +19,7 @@ import org.opensearch.sql.opensearch.request.system.OpenSearchDescribeIndexRequest; import org.opensearch.sql.opensearch.request.system.OpenSearchSystemRequest; import org.opensearch.sql.planner.DefaultImplementor; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalRelation; import org.opensearch.sql.planner.physical.PhysicalPlan; @@ -48,6 +50,11 @@ public PhysicalPlan implement(LogicalPlan plan) { return plan.accept(new OpenSearchSystemIndexDefaultImplementor(), null); } + @Override + public PhysicalPlan implement(LogicalPlan plan, PlanContext planContext) { + return implement(plan); + } + @VisibleForTesting @RequiredArgsConstructor public class OpenSearchSystemIndexDefaultImplementor diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index e4699b6f9f..f50f23df31 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -158,7 +158,7 @@ private ResponseListener createListener( @Override public void onResponse(ExecutionEngine.QueryResponse response) { String responseContent = - formatter.format(new QueryResult(response.getSchema(), response.getResults())); + formatter.format(new QueryResult(response.getSchema(), response.getResults(), response.getCursor())); listener.onResponse(new TransportPPLQueryResponse(responseContent)); } diff --git a/protocol/src/main/java/org/opensearch/sql/protocol/response/QueryResult.java b/protocol/src/main/java/org/opensearch/sql/protocol/response/QueryResult.java index 915a61f361..c6d995914f 100644 --- a/protocol/src/main/java/org/opensearch/sql/protocol/response/QueryResult.java +++ b/protocol/src/main/java/org/opensearch/sql/protocol/response/QueryResult.java @@ -32,6 +32,9 @@ public class QueryResult implements Iterable { */ private final Collection exprValues; + @Getter + private final String cursor; + /** * size of results. diff --git a/protocol/src/main/java/org/opensearch/sql/protocol/response/format/JdbcResponseFormatter.java b/protocol/src/main/java/org/opensearch/sql/protocol/response/format/JdbcResponseFormatter.java index 943287cb62..1eea09631b 100644 --- a/protocol/src/main/java/org/opensearch/sql/protocol/response/format/JdbcResponseFormatter.java +++ b/protocol/src/main/java/org/opensearch/sql/protocol/response/format/JdbcResponseFormatter.java @@ -38,6 +38,12 @@ protected Object buildJsonObject(QueryResult response) { response.getSchema().getColumns().forEach(col -> json.column(fetchColumn(col))); json.datarows(fetchDataRows(response)); + // Fetch cursor + String cursor = response.getCursor(); + if (cursor != null) { + json.cursor(cursor); + } + // Populate other fields json.total(response.size()) .size(response.size()) @@ -92,6 +98,7 @@ public static class JdbcResponse { @Singular("column") private final List schema; private final Object[][] datarows; + private final String cursor; private final long total; private final long size; private final int status; diff --git a/sql/src/main/java/org/opensearch/sql/sql/SQLService.java b/sql/src/main/java/org/opensearch/sql/sql/SQLService.java index 991e9df12a..bdf81b8360 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/SQLService.java +++ b/sql/src/main/java/org/opensearch/sql/sql/SQLService.java @@ -17,6 +17,7 @@ import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.function.BuiltinFunctionRepository; +import org.opensearch.sql.planner.PlanContext; import org.opensearch.sql.planner.Planner; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; @@ -107,4 +108,12 @@ public PhysicalPlan plan(LogicalPlan logicalPlan) { .plan(logicalPlan); } + /** + * Generate optimal physical plan from logical plan and context. + */ + public PhysicalPlan plan(LogicalPlan logicalPlan, PlanContext planContext) { + return new Planner(storageEngine, LogicalPlanOptimizer.create(new DSL(repository))) + .plan(logicalPlan, planContext); + } + } diff --git a/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java b/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java index 508f80cee4..228e6bdfab 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java +++ b/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java @@ -60,19 +60,42 @@ public class SQLQueryRequest { */ private Map params = Collections.emptyMap(); + /** + * Request params. + */ + @Getter + private final int fetchSize; + @Getter @Accessors(fluent = true) private boolean sanitize = true; + /** + * Constructor of SQLQueryRequest that passes request params. + */ + public SQLQueryRequest( + JSONObject jsonContent, String query, String path, String params) { + this(jsonContent, query, path, params, 0); + } + /** * Constructor of SQLQueryRequest that passes request params. */ public SQLQueryRequest( JSONObject jsonContent, String query, String path, Map params) { + this(jsonContent, query, path, params, 0); + } + + /** + * Constructor of SQLQueryRequest that passes request params. + */ + public SQLQueryRequest( + JSONObject jsonContent, String query, String path, Map params, int fetchSize) { this.jsonContent = jsonContent; this.query = query; this.path = path; this.params = params; + this.fetchSize = fetchSize; this.format = getFormat(params); this.sanitize = shouldSanitize(params); } @@ -87,7 +110,6 @@ public SQLQueryRequest( */ public boolean isSupported() { return isOnlySupportedFieldInPayload() - && isFetchSizeZeroIfPresent() && isSupportedFormat(); } @@ -116,10 +138,6 @@ private boolean isOnlySupportedFieldInPayload() { return SUPPORTED_FIELDS.containsAll(jsonContent.keySet()); } - private boolean isFetchSizeZeroIfPresent() { - return (jsonContent.optInt("fetch_size") == 0); - } - private boolean isSupportedFormat() { return Strings.isNullOrEmpty(format) || "jdbc".equalsIgnoreCase(format) || "csv".equalsIgnoreCase(format) || "raw".equalsIgnoreCase(format);