Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

poc: scroll query request #693

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public interface ExecutionEngine {
class QueryResponse {
private final Schema schema;
private final List<ExprValue> results;
private final String cursor;
}

@Data
Expand Down
23 changes: 23 additions & 0 deletions core/src/main/java/org/opensearch/sql/planner/PlanContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.planner;

import lombok.Getter;
import lombok.Setter;

/**
* The context used for planner.
*/
public class PlanContext {
@Getter
@Setter
private int fetchSize;

public PlanContext() {
this.fetchSize = 0;
}
}
10 changes: 8 additions & 2 deletions core/src/main/java/org/opensearch/sql/planner/Planner.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.java.Log;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
import org.opensearch.sql.planner.logical.LogicalRelation;
Expand Down Expand Up @@ -39,15 +40,20 @@ public class Planner {
* @param plan logical plan
* @return optimal physical plan
*/
public PhysicalPlan plan(LogicalPlan plan) {
public PhysicalPlan plan(LogicalPlan plan, PlanContext planContext) {
String tableName = findTableName(plan);
if (isNullOrEmpty(tableName)) {
return plan.accept(new DefaultImplementor<>(), null);
}

Table table = storageEngine.getTable(tableName);
return table.implement(
table.optimize(optimize(plan)));
table.optimize(optimize(plan)),
planContext);
}

public PhysicalPlan plan(LogicalPlan plan) {
return plan(plan, new PlanContext());
}

private String findTableName(LogicalPlan plan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ public void open() {
getChild().forEach(PhysicalPlan::open);
}

public String getCursor() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now only the OpenSearchIndexScan, which is the leaf of the AST, has the cursor from the response. This is a workaround to get that cursor when reading the results.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It breaks PhysicalPlan interface. Does it means our existing PhysicalPlan abstraction is not fit into cursor.

try {
return getChild().get(0).getCursor();
} catch (IndexOutOfBoundsException e) {
return getCursor();
Comment on lines +38 to +39
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably not a good idea to use try-catch as control flow in Java

}
}

public void close() {
getChild().forEach(PhysicalPlan::close);
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/opensearch/sql/storage/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.util.Map;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;

Expand All @@ -29,6 +30,8 @@ public interface Table {
*/
PhysicalPlan implement(LogicalPlan plan);
Copy link
Collaborator Author

@seankao-az seankao-az Jul 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May want to get rid of this eventually.

Copy link
Member

@vamsimanohar vamsimanohar Jul 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reason?..is there a better way of doing this? Has anything been proposed on this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I was referring to the line above that doesn't use the PlanContext. All implement calls should use PlanContext.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the current implement only accepts logical plan. Unless we can encode cursor info in the plan, we have to pass in one more argument. If it seems so, I'm wondering why we still need the original method?


PhysicalPlan implement(LogicalPlan plan, PlanContext planContext);

/**
* Optimize the {@link LogicalPlan} by storage engine rule.
* The default optimize solution is no optimization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.opensearch.security.SecurityAccess;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.protocol.response.QueryResult;
import org.opensearch.sql.protocol.response.format.CsvResponseFormatter;
Expand Down Expand Up @@ -101,9 +103,10 @@ public RestChannelConsumer prepareRequest(SQLQueryRequest request, NodeClient no
try {
// For now analyzing and planning stage may throw syntax exception as well
// which hints the fallback to legacy code is necessary here.
plan = sqlService.plan(
sqlService.analyze(
sqlService.parse(request.getQuery())));
LogicalPlan logicalPlan = sqlService.analyze(sqlService.parse(request.getQuery()));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future we may want to also carry planContext when generating the logical plan. Example use case: ML command can set the plan context to specify it needs pagination. Then when building the physical plan, the Planner knows to use scroll instead of regular query.

PlanContext planContext = new PlanContext();
planContext.setFetchSize(request.getFetchSize());
plan = sqlService.plan(logicalPlan, planContext);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar should be done for PPL

} catch (SyntaxCheckException e) {
// When explain, print info log for what unsupported syntax is causing fallback to old engine
if (request.isExplainRequest()) {
Expand Down Expand Up @@ -167,7 +170,7 @@ private ResponseListener<QueryResponse> createQueryResponseListener(RestChannel
@Override
public void onResponse(QueryResponse response) {
sendResponse(channel, OK,
formatter.format(new QueryResult(response.getSchema(), response.getResults())));
formatter.format(new QueryResult(response.getSchema(), response.getResults(), response.getCursor())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli

// Route request to new query engine if it's supported already
SQLQueryRequest newSqlRequest = new SQLQueryRequest(sqlRequest.getJsonContent(),
sqlRequest.getSql(), request.path(), request.params());
sqlRequest.getSql(), request.path(), request.params(), sqlRequest.fetchSize());
RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client);
if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) {
LOG.info("[{}] Request is handled by new SQL query engine", LogUtils.getRequestId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void execute(PhysicalPlan physicalPlan, ResponseListener<QueryResponse> l
result.add(plan.next());
}

QueryResponse response = new QueryResponse(physicalPlan.schema(), result);
QueryResponse response = new QueryResponse(physicalPlan.schema(), result, plan.getCursor());
Copy link
Collaborator Author

@seankao-az seankao-az Jul 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels wrong, feels out of place.

The original design for PhysicalPlan assumes that the only results meaningful when executing it, is the data rows it fetched (and filtered, sorted, projected, etc.), hence the plan.next() syntax above makes sense.
Now there's other thing (the cursor) that we also need from the execution result.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it is strange to have getCursor added to base class and force each physical operator to implement it. We need to figure out better solution..

listener.onResponse(response);
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public List<PhysicalPlan> getChild() {
return delegate.getChild();
}

@Override
public String getCursor() {
return delegate.getCursor();
}

@Override
public boolean hasNext() {
return delegate.hasNext();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.opensearch.request;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;

import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* OpenSearch scroll cursor request.
*/
@EqualsAndHashCode
@Getter
@ToString
public class OpenSearchScrollCursorRequest implements OpenSearchRequest {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between this new class and existing OpenSearchScrollRequest? Can we reuse and modify the existing one?

And same question for OpenSearchScrollQueryRequest below. Wondering why we need two new classes for cursor?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing OpenSearchScrollRequest is stateful, and assumes its search will be called multiple times. I used it in another PR.
In this PR, though, the initial scroll query and the subsequent cursor queries happen at different requests. The OpenSearchScrollQueryRequest is for invoking the initial scroll query, which specifies both the fetch_size and query. OpenSearchScrollCursorRequest uses a cursor to fetch the next page.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turned out that in order to paginate some PPL queries, we indeed need to store some states, so using the existing OpenSearchScrollRequest makes more sense than adding these two new classes.


/** Default scroll context timeout in minutes. */
public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(1L);

/** Index name. */
@EqualsAndHashCode.Exclude
@ToString.Exclude
private final OpenSearchExprValueFactory exprValueFactory;

/**
* Scroll id.
*/
private String scrollId;

/** Search request source builder. */
private final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor request doesn't need this, but OpenSearchRequest requires this


public OpenSearchScrollCursorRequest(String scrollId, OpenSearchExprValueFactory exprValueFactory) {
this.scrollId = scrollId;
this.exprValueFactory = exprValueFactory;
}

@Override
public OpenSearchResponse search(Function<SearchRequest, SearchResponse> searchAction,
Function<SearchScrollRequest, SearchResponse> scrollAction) {
SearchResponse openSearchResponse;
openSearchResponse = scrollAction.apply(scrollRequest());

return new OpenSearchResponse(openSearchResponse, exprValueFactory);
}

@Override
public void clean(Consumer<String> cleanAction) {
try {
cleanAction.accept(getScrollId());
}
finally {

}
}

/**
* Generate OpenSearch scroll request by scroll id maintained.
*
* @return scroll request
*/
public SearchScrollRequest scrollRequest() {
Objects.requireNonNull(scrollId, "Scroll id cannot be null");
return new SearchScrollRequest().scroll(DEFAULT_SCROLL_TIMEOUT).scrollId(scrollId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.opensearch.request;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.opensearch.data.value.OpenSearchExprValueFactory;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;

import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* OpenSearch scroll search request. This has to be stateful because it needs to:
*
* <p>1) Accumulate search source builder when visiting logical plan to push down operation 2)
* Maintain scroll ID between calls to client search method
*/
@EqualsAndHashCode
@Getter
@ToString
public class OpenSearchScrollQueryRequest implements OpenSearchRequest {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it cover aggregation pagination also?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it doesn't


/** Default scroll context timeout in minutes. */
public static final TimeValue DEFAULT_SCROLL_TIMEOUT = TimeValue.timeValueMinutes(1L);

/**
* {@link IndexName}.
*/
private final IndexName indexName;

/** Index name. */
@EqualsAndHashCode.Exclude
@ToString.Exclude
private final OpenSearchExprValueFactory exprValueFactory;

/** Search request source builder. */
private final SearchSourceBuilder sourceBuilder;

public OpenSearchScrollQueryRequest(IndexName indexName, int size, OpenSearchExprValueFactory exprValueFactory) {
this.indexName = indexName;
this.sourceBuilder = new SearchSourceBuilder();
sourceBuilder.from(0);
sourceBuilder.size(size);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conflicts with LIMIT
We may want to forbid mix usage of LIMIT and scroll

this.exprValueFactory = exprValueFactory;
}

public OpenSearchScrollQueryRequest(String indexName, int size, OpenSearchExprValueFactory exprValueFactory) {
this(new IndexName(indexName), size, exprValueFactory);
}

@Override
public OpenSearchResponse search(Function<SearchRequest, SearchResponse> searchAction,
Function<SearchScrollRequest, SearchResponse> scrollAction) {
SearchResponse openSearchResponse;
openSearchResponse = searchAction.apply(searchRequest());

return new OpenSearchResponse(openSearchResponse, exprValueFactory);
}

@Override
public void clean(Consumer<String> cleanAction) {
// do nothing
}

/**
* Generate OpenSearch search request.
*
* @return search request
*/
public SearchRequest searchRequest() {
return new SearchRequest()
.indices(indexName.getIndexNames())
.scroll(DEFAULT_SCROLL_TIMEOUT)
Copy link
Collaborator Author

@seankao-az seankao-az Jul 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May want this to be configurable. In OpenSearch Setting? In Request body with fetch_size?

.source(sourceBuilder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Iterator;
import java.util.Map;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.search.SearchHits;
Expand All @@ -36,6 +37,12 @@ public class OpenSearchResponse implements Iterable<ExprValue> {
*/
private final Aggregations aggregations;

/**
* Search scrollId result.
*/
@Getter
private final String scrollId;

/**
* ElasticsearchExprValueFactory used to build ExprValue from search result.
*/
Expand All @@ -49,6 +56,7 @@ public OpenSearchResponse(SearchResponse searchResponse,
OpenSearchExprValueFactory exprValueFactory) {
this.hits = searchResponse.getHits();
this.aggregations = searchResponse.getAggregations();
this.scrollId = searchResponse.getScrollId();
this.exprValueFactory = exprValueFactory;
}

Expand All @@ -58,6 +66,7 @@ public OpenSearchResponse(SearchResponse searchResponse,
public OpenSearchResponse(SearchHits hits, OpenSearchExprValueFactory exprValueFactory) {
this.hits = hits;
this.aggregations = null;
this.scrollId = null;
this.exprValueFactory = exprValueFactory;
}

Expand Down
Loading