Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Design for LIMIT in pagination. #1752

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* Currently, V2 engine does not support queries with:
* - aggregation (GROUP BY clause or aggregation functions like min/max)
* - in memory aggregation (window function)
* - LIMIT/OFFSET clause(s)
* - OFFSET clause
* - without FROM clause
* - JOIN
* - a subquery
Expand Down Expand Up @@ -103,10 +103,13 @@ public Boolean visitValues(Values node, Object context) {
return Boolean.TRUE;
}

// Queries with LIMIT/OFFSET clauses are unsupported
// Limit can't be pushed down in pagination, so has to be implemented by `LimitOperator`.
// OpenSearch rejects scroll query with `from` parameter, so offset can't be pushed down.
// Non-zero offset produces incomplete or even empty pages, so making it not supported.
@Override
public Boolean visitLimit(Limit node, Object context) {
return Boolean.FALSE;
// TODO open a GH ticket for offset
return node.getOffset() == 0 && canPaginate(node, context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@
import static com.facebook.presto.matching.DefaultMatcher.DEFAULT_MATCHER;

import com.facebook.presto.matching.Match;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
Expand All @@ -28,20 +32,20 @@
*/
public class LogicalPlanOptimizer {

private final List<Rule<?>> rules;
private final List<Rule<LogicalPlan>> rules;

/**
* Create {@link LogicalPlanOptimizer} with customized rules.
*/
public LogicalPlanOptimizer(List<Rule<?>> rules) {
public LogicalPlanOptimizer(List<Rule<LogicalPlan>> rules) {
this.rules = rules;
}

/**
* Create {@link LogicalPlanOptimizer} with pre-defined rules.
*/
public static LogicalPlanOptimizer create() {
return new LogicalPlanOptimizer(Arrays.asList(
return new LogicalPlanOptimizer(Stream.of(
/*
* Phase 1: Transformations that rely on relational algebra equivalence
*/
Expand All @@ -51,47 +55,48 @@ public static LogicalPlanOptimizer create() {
* Phase 2: Transformations that rely on data source push down capability
*/
new CreateTableScanBuilder(),
new PushDownPageSize(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
new PushDownPageSize(),
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_NESTED,
TableScanPushDown.PUSH_DOWN_PROJECT,
new CreateTableWriteBuilder()));
new CreateTableWriteBuilder())
.map(r -> (Rule<LogicalPlan>)r).collect(Collectors.toList()));
}

/**
* Optimize {@link LogicalPlan}.
*/
public LogicalPlan optimize(LogicalPlan plan) {
LogicalPlan optimized = internalOptimize(plan);
var node = plan;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe just rename the argument?

for (Rule<LogicalPlan> rule : rules) {
node = traverseAndOptimize(node, rule);
}
return node;
}

private LogicalPlan traverseAndOptimize(LogicalPlan plan, Rule<LogicalPlan> rule) {
LogicalPlan optimized = internalOptimize(plan, rule);
optimized.replaceChildPlans(
optimized.getChild().stream().map(this::optimize).collect(
Collectors.toList()));
return internalOptimize(optimized);
optimized.getChild().stream().map(p -> traverseAndOptimize(p, rule))
.collect(Collectors.toList()));
return internalOptimize(optimized, rule);
}

private LogicalPlan internalOptimize(LogicalPlan plan) {
private LogicalPlan internalOptimize(LogicalPlan plan, Rule<LogicalPlan> rule) {
LogicalPlan node = plan;
boolean done = false;
while (!done) {
done = true;
for (Rule rule : rules) {
Match match = DEFAULT_MATCHER.match(rule.pattern(), node);
if (match.isPresent()) {
node = rule.apply(match.value(), match.captures());

// For new TableScanPushDown impl, pattern match doesn't necessarily cause
// push down to happen. So reiterate all rules against the node only if the node
// is actually replaced by any rule.
// TODO: may need to introduce fixed point or maximum iteration limit in future
if (node != match.value()) {
done = false;
}
}
}
Match<LogicalPlan> match = DEFAULT_MATCHER.match(rule.pattern(), node);
if (match.isPresent()) {
node = rule.apply(match.value(), match.captures());

// For new TableScanPushDown impl, pattern match doesn't necessarily cause
// push down to happen. So reiterate all rules against the node only if the node
// is actually replaced by any rule.
// TODO: may need to introduce fixed point or maximum iteration limit in future
}
return node;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class TableScanPushDown<T extends LogicalPlan> implements Rule<T> {

@SuppressWarnings("unchecked")
private TableScanPushDown(WithPattern<T> pattern,
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
BiFunction<T, TableScanBuilder, Boolean> pushDownFunction) {
this.pattern = pattern;
this.capture = ((CapturePattern<TableScanBuilder>) pattern.getPattern()).capture();
this.pushDownFunction = pushDownFunction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,17 @@
package org.opensearch.sql.planner.physical;

import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.exception.NoCursorException;
import org.opensearch.sql.planner.SerializablePlan;

/**
* The limit operator sets a window, to and block the rows out of the window
Expand All @@ -25,16 +30,22 @@
* it occurs when the original result set has a size smaller than {index + limit},
* or even not greater than the offset. The latter results in an empty output.</p>
*/
@RequiredArgsConstructor
@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
public class LimitOperator extends PhysicalPlan {
private final PhysicalPlan input;
private final Integer limit;
private final Integer offset;
@AllArgsConstructor
public class LimitOperator extends PhysicalPlan implements SerializablePlan {
private PhysicalPlan input;
private Integer limit;
private Integer offset;
private Integer count = 0;

public LimitOperator(PhysicalPlan input, Integer limit, Integer offset) {
this.input = input;
this.limit = limit;
this.offset = offset;
}

@Override
public void open() {
super.open();
Expand Down Expand Up @@ -67,4 +78,29 @@ public List<PhysicalPlan> getChild() {
return ImmutableList.of(input);
}

/** Don't use, it is for deserialization needs only. */
@Deprecated
public LimitOperator() {
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
limit = in.readInt();
count = in.readInt();
// note: offset aren't serialized and deserialized, because not supported in pagination
// TODO open a GH ticket and post here link
offset = 0;
input = (PhysicalPlan) in.readObject();
}

@Override
public void writeExternal(ObjectOutput out) throws IOException {
if (count == limit) {
// paging is finished
throw new NoCursorException();
}
out.writeInt(limit);
out.writeInt(count);
out.writeObject(((SerializablePlan) input).getPlanForSerialization());
}
}
62 changes: 62 additions & 0 deletions docs/dev/limit-in-pagination.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
## Background

Scrolled search is performed by request to `http://localhost:9200/<index>/_search?scroll=<timeout>` endpoint and it has a mandatory field `size`, which defines page size:
```json
{
"size" : 5
}
```
Regular (non-scrolled/non-paged) search is performed by request to `http://localhost:9200/<index</_search` endpoint and it has not mandatory fields. Parameter `size` though defines maximum number of docs to be returned by search.

## Problem statement

`LIMIT` clause is being converted to `size` by SQL plugin during push down operation.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to properly define what we want to accomplish, but discuss what is currently the problem with the code. I would change this to talk about LIMIT and size conflict, but also include the use cases:

  1. When LIMIT > size
  2. When size < LIMIT
  3. When size == LIMIT (this is kind of naive case, but maybe just mention it anyways)
    Then discuss exactly how we expect the system to behave (which does it return a cursor, when does it NOT return a cursor, where does it break (max_window_size), and does this override the default fetch size.
    If there is anything to add in terms of how the JDBC driver will behave, then include it here.

Hereby comes the conflict in using `LIMIT` with pagination.

## Solution

Don't do push down for `LIMIT`. `LimitOperator` Physical Plan Tree node will cut off yielding search results with minimal overhead.
Copy link
Collaborator

@forestmvey forestmvey Jun 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we utilize the limit operator how will this work with any other operator that needs to do post-processing. This also asks the question of how we will continue to add operators and how will they work with each other. We may need some way to chain operators if we don't want to have the limitation of one operator for post-processing per query.

Rather than have an operator to limit the output with post processing why not include the functionality as part of the base class. We can have an optional limit if push down isn't available that can be performed in the PhysicalPlan base class prior to the inheriting class's next() call. Another option would be to have the limit post-processing performed in the ResourceMonitorPlan prior to calling the delegate next().

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the solution we talked about in the walk-through... I think you just need to reverse the business logic to execute all the rules on nodes before proceeding to the next node...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this section is similar to Fix below

It seems simple, but here comes another problem.

## Impediments

`Optimizer` which calls push down operations sets page size after limit, which overwrites this settings. `Optimizer` does not apply rules in the order they are given.

## Fix

Rework [`Optimizer`](https://github.com/opensearch-project/sql/blob/57ce303740f64fe0279fc34aab0116f33f11fbe6/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java).
Current behavior:
```py
def Optimize:
for node in PlanTree: # Traverse the Logical Plan Tree
for rule in rules: # Enumerate rules
tryApplyRule()
```

Expected behavior:
```py
def Optimize:
for rule in rules: # Enumerate rules
for node in PlanTree: # Traverse the Logical Plan Tree
tryApplyRule()
```
Rules list:
```
...
CreateTableScanBuilder
PushDownPageSize
...
PUSH_DOWN_LIMIT
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this ordering important?

...
```

This gives us warranty that `pushDownLimit` operation would be rejected if `pushPageSize` called before. Then, not optimized Logical Plan Tree node `LogicalLimit` will be converted to `LimitOperator` Physical Plan tree node.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
This gives us warranty that `pushDownLimit` operation would be rejected if `pushPageSize` called before. Then, not optimized Logical Plan Tree node `LogicalLimit` will be converted to `LimitOperator` Physical Plan tree node.
This gives us guarantee that `pushDownLimit` operation will be rejected if `pushPageSize` already called. In that case, the not-optimized Logical Plan Tree node `LogicalLimit` will be converted to `LimitOperator` Physical Plan tree node.


## Other changes

1. Make `Optimizer` rules applied only once.
2. Reorder `Optimizer` rules.
3. Make `LimitOperator` properly serialized and deserialized.
4. Make `OpenSearchIndexScanBuilder::pushDownLimit` return `false` if `pushDownPageSize` was called before.
5. (Optional) Groom `Optimizer` to reduce amount of unchecked casts and uses raw classes.
6. (Optional) Rework `Optimizer` to make it a tree visitor.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of scope work. Please raise a follow-up ticket to do these later. You can label this as maintenance ticket.

Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ public PhysicalPlan visitLimit(LimitOperator node, Object context) {
return new LimitOperator(
visitInput(node.getInput(), context),
node.getLimit(),
node.getOffset());
node.getOffset(),
node.getCount());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ public OpenSearchRequestBuilder(int requestedTotalSize,
*/
public OpenSearchRequest build(OpenSearchRequest.IndexName indexName,
int maxResultWindow, TimeValue scrollTimeout) {
int size = requestedTotalSize;
if (pageSize == null) {
if (startFrom + size > maxResultWindow) {
if (startFrom + requestedTotalSize > maxResultWindow) {
sourceBuilder.size(maxResultWindow - startFrom);
return new OpenSearchScrollRequest(
indexName, scrollTimeout, sourceBuilder, exprValueFactory);
Expand Down Expand Up @@ -182,7 +181,6 @@ public void pushDownSort(List<SortBuilder<?>> sortBuilders) {
public void pushDownLimit(Integer limit, Integer offset) {
requestedTotalSize = limit;
startFrom = offset;
sourceBuilder.from(offset).size(limit);
}

public void pushDownTrackedScore(boolean trackScores) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public class OpenSearchIndexScanBuilder extends TableScanBuilder {
/** Is limit operator pushed down. */
private boolean isLimitPushedDown = false;

/** Is page size set. */
private boolean isPageSizePushedDown = false;

/**
* Constructor used during query execution.
*/
Expand Down Expand Up @@ -80,6 +83,7 @@ public boolean pushDownAggregation(LogicalAggregation aggregation) {

@Override
public boolean pushDownPageSize(LogicalPaginate paginate) {
isPageSizePushedDown = true;
return delegate.pushDownPageSize(paginate);
}

Expand All @@ -93,8 +97,12 @@ public boolean pushDownSort(LogicalSort sort) {

@Override
public boolean pushDownLimit(LogicalLimit limit) {
if (isPageSizePushedDown) {
return false;
}
// Assume limit push down happening on OpenSearchIndexScanQueryBuilder
isLimitPushedDown = true;
// TODO move this logic to OpenSearchRequestBuilder?
return delegate.pushDownLimit(limit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@


import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.experimental.UtilityClass;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.prometheus.planner.logical.rules.MergeAggAndIndexScan;
import org.opensearch.sql.prometheus.planner.logical.rules.MergeAggAndRelation;
import org.opensearch.sql.prometheus.planner.logical.rules.MergeFilterAndRelation;
Expand All @@ -23,10 +28,10 @@ public class PrometheusLogicalPlanOptimizerFactory {
* Create Prometheus storage specified logical plan optimizer.
*/
public static LogicalPlanOptimizer create() {
return new LogicalPlanOptimizer(Arrays.asList(
return new LogicalPlanOptimizer(Stream.of(
new MergeFilterAndRelation(),
new MergeAggAndIndexScan(),
new MergeAggAndRelation()
));
).map(r -> (Rule<LogicalPlan>)r).collect(Collectors.toList()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is messy, and we can update the Rules classes to extend LogicalPlan instead.

}
}
Loading