Skip to content

Commit

Permalink
Complete rework on serialization and deserialization. (#1498)
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored Apr 14, 2023
1 parent b9cb0d0 commit 9a1a17c
Show file tree
Hide file tree
Showing 45 changed files with 607 additions and 685 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.exception;

/**
 * Thrown while serializing a PhysicalPlan tree when paging is finished.
 * Handling this exception should result in no cursor being returned to the user.
 */
public class NoCursorException extends RuntimeException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,17 @@

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@EqualsAndHashCode
@RequiredArgsConstructor
public class Cursor {
public static final Cursor None = new Cursor();
public static final Cursor None = new Cursor(null);

@Getter
private final byte[] raw;

private Cursor() {
raw = new byte[] {};
}

public Cursor(byte[] raw) {
this.raw = raw;
}
private final String data;

public String toString() {
return new String(raw);
return data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.io.InputStream;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
import org.opensearch.sql.planner.physical.PaginateOperator;
import org.opensearch.sql.exception.NoCursorException;
import org.opensearch.sql.planner.SerializablePlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.ProjectOperator;
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.storage.TableScanOperator;

/**
* This class is the entry point for paged requests. It is responsible for cursor serialization
Expand All @@ -30,132 +31,101 @@
@RequiredArgsConstructor
public class PlanSerializer {
public static final String CURSOR_PREFIX = "n:";
private final StorageEngine storageEngine;

private final StorageEngine engine;

public boolean canConvertToCursor(UnresolvedPlan plan) {
return plan.accept(new CanPaginateVisitor(), null);
}

/**
* Converts a physical plan tree to a cursor. May cache plan related data somewhere.
* Converts a physical plan tree to a cursor.
*/
public Cursor convertToCursor(PhysicalPlan plan) throws IOException {
if (plan instanceof PaginateOperator) {
var cursor = plan.toCursor();
if (cursor == null) {
return Cursor.None;
}
var raw = CURSOR_PREFIX + compress(cursor);
return new Cursor(raw.getBytes());
public Cursor convertToCursor(PhysicalPlan plan) {
try {
return new Cursor(CURSOR_PREFIX
+ serialize(((SerializablePlan) plan).getPlanForSerialization()));
// ClassCastException thrown when a plan in the tree doesn't implement SerializablePlan
} catch (NotSerializableException | ClassCastException | NoCursorException e) {
return Cursor.None;
}
return Cursor.None;
}

/**
* Compress serialized query plan.
* @param str string representing a query plan
* @return str compressed with gzip.
* Serializes and compresses the object.
* @param object The object.
* @return Encoded binary data.
*/
String compress(String str) throws IOException {
if (str == null || str.length() == 0) {
return "";
protected String serialize(Serializable object) throws NotSerializableException {
try {
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutputStream objectOutput = new ObjectOutputStream(output);
objectOutput.writeObject(object);
objectOutput.flush();

ByteArrayOutputStream out = new ByteArrayOutputStream();
// GZIP provides 35-45%, lzma from apache commons-compress has few % better compression
GZIPOutputStream gzip = new GZIPOutputStream(out) { {
this.def.setLevel(Deflater.BEST_COMPRESSION);
} };
gzip.write(output.toByteArray());
gzip.close();

return HashCode.fromBytes(out.toByteArray()).toString();
} catch (NotSerializableException e) {
throw e;
} catch (IOException e) {
throw new IllegalStateException("Failed to serialize: " + object, e);
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(str.getBytes());
gzip.close();
return HashCode.fromBytes(out.toByteArray()).toString();
}

/**
* Decompresses a query plan that was compress with {@link PlanSerializer#compress}.
* @param input compressed query plan
* @return decompressed string
* Decompresses and deserializes the binary data.
* @param code Encoded binary data.
* @return An object.
*/
String decompress(String input) throws IOException {
if (input == null || input.length() == 0) {
return "";
protected Serializable deserialize(String code) {
try {
GZIPInputStream gzip = new GZIPInputStream(
new ByteArrayInputStream(HashCode.fromString(code).asBytes()));
ObjectInputStream objectInput = new CursorDeserializationStream(
new ByteArrayInputStream(gzip.readAllBytes()));
return (Serializable) objectInput.readObject();
} catch (Exception e) {
throw new IllegalStateException("Failed to deserialize object", e);
}
GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(
HashCode.fromString(input).asBytes()));
return new String(gzip.readAllBytes());
}

/**
* Parse `NamedExpression`s from cursor.
* @param listToFill List to fill with data.
* @param cursor Cursor to parse.
* @return Remaining part of the cursor.
* Converts a cursor to a physical plan tree.
*/
private String parseNamedExpressions(List<NamedExpression> listToFill, String cursor) {
var serializer = new DefaultExpressionSerializer();
if (cursor.startsWith(")")) { //empty list
return cursor.substring(cursor.indexOf(',') + 1);
}
while (!cursor.startsWith("(")) {
listToFill.add((NamedExpression)
serializer.deserialize(cursor.substring(0,
Math.min(cursor.indexOf(','), cursor.indexOf(')')))));
cursor = cursor.substring(cursor.indexOf(',') + 1);
}
return cursor;
}

/**
* Converts a cursor to a physical plan tree.
*/
public PhysicalPlan convertToPlan(String cursor) {
if (!cursor.startsWith(CURSOR_PREFIX)) {
throw new UnsupportedOperationException("Unsupported cursor");
}
try {
cursor = cursor.substring(CURSOR_PREFIX.length());
cursor = decompress(cursor);

// TODO Parse with ANTLR or serialize as JSON/XML
if (!cursor.startsWith("(Paginate,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
// TODO add checks for > 0
cursor = cursor.substring(cursor.indexOf(',') + 1);
final int currentPageIndex = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

cursor = cursor.substring(cursor.indexOf(',') + 1);
final int pageSize = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(Project,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(namedParseExpressions,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}

cursor = cursor.substring(cursor.indexOf(',') + 1);
List<NamedExpression> namedParseExpressions = new ArrayList<>();
cursor = parseNamedExpressions(namedParseExpressions, cursor);
return (PhysicalPlan) deserialize(cursor.substring(CURSOR_PREFIX.length()));
} catch (Exception e) {
throw new UnsupportedOperationException("Unsupported cursor", e);
}
}

List<NamedExpression> projectList = new ArrayList<>();
if (!cursor.startsWith("(projectList,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
cursor = parseNamedExpressions(projectList, cursor);
/**
 * Test-only accessor: wraps the given input in a new {@link CursorDeserializationStream}.
 *
 * @param in stream the new {@link CursorDeserializationStream} reads from
 * @return a new stream instance tied to this serializer
 * @throws IOException if the stream constructor fails
 */
public CursorDeserializationStream getCursorDeserializationStream(InputStream in)
    throws IOException {
  final CursorDeserializationStream stream = new CursorDeserializationStream(in);
  return stream;
}

if (!cursor.startsWith("(OpenSearchPagedIndexScan,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
var indexName = cursor.substring(0, cursor.indexOf(','));
cursor = cursor.substring(cursor.indexOf(',') + 1);
var scrollId = cursor.substring(0, cursor.indexOf(')'));
TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId);
public class CursorDeserializationStream extends ObjectInputStream {
public CursorDeserializationStream(InputStream in) throws IOException {
super(in);
}

return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions),
pageSize, currentPageIndex);
} catch (Exception e) {
throw new UnsupportedOperationException("Unsupported cursor", e);
/**
 * Swaps the placeholder string {@code "engine"} for the enclosing serializer's
 * {@code engine} field; any other object is returned unchanged.
 *
 * @param obj object to resolve; may be {@code null}
 * @return the enclosing serializer's {@code engine} when {@code obj} equals {@code "engine"},
 *     otherwise {@code obj} itself (including {@code null})
 */
@Override
public Object resolveObject(Object obj) throws IOException {
  // Constant-first comparison: a null argument is passed through instead of
  // raising a NullPointerException.
  return "engine".equals(obj) ? engine : obj;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.commons.lang3.NotImplementedException;
import org.opensearch.sql.executor.pagination.PlanSerializer;

/**
 * All subtypes of PhysicalPlan which need to be serialized (in a cursor, for the pagination
 * feature) should follow one of the following options.
 * <ul>
 *   <li>Both:
 *     <ul>
 *       <li>Override both methods from {@link Externalizable}.</li>
 *       <li>Define a public no-arg constructor.</li>
 *     </ul>
 *   </li>
 *   <li>
 *     Override {@link #getPlanForSerialization} to return
 *     another instance of {@link SerializablePlan}.
 *   </li>
 * </ul>
 */
public interface SerializablePlan extends Externalizable {

  /**
   * Restores the plan from the stream. This default implementation always throws;
   * plans that are serialized directly must override it.
   * Argument is an instance of {@link PlanSerializer.CursorDeserializationStream}.
   *
   * @param in stream to read the plan state from
   * @throws NotImplementedException always, unless overridden
   */
  @Override
  default void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
    throw new NotImplementedException(String.format("`readExternal` is not implemented in %s",
        getClass().getSimpleName()));
  }

  /**
   * Writes the plan to the stream. This default implementation always throws;
   * plans that are serialized directly must override it.
   * Each plan which has a child plan should serialize it like this:
   * <pre>{@code
   * out.writeObject(input.getPlanForSerialization());
   * }</pre>
   *
   * @param out stream to write the plan state to
   * @throws NotImplementedException always, unless overridden
   */
  @Override
  default void writeExternal(ObjectOutput out) throws IOException {
    // The message must name `writeExternal`: reporting `readExternal` here would point
    // whoever debugs a missing override at the wrong method.
    throw new NotImplementedException(String.format("`writeExternal` is not implemented in %s",
        getClass().getSimpleName()));
  }

  /**
   * Override to return a child or delegated plan, so that the parent plan skips this one
   * for serialization and tries to serialize the grandchild plan instead.
   * Imagine a plan structure like this:
   * <pre>
   *    A         -> this
   *    `- B      -> child
   *      `- C    -> this
   * </pre>
   * In that case only plans A and C should be attempted to serialize.
   * It is needed to skip a `ResourceMonitorPlan` instance only, actually.
   *
   * @return Next plan for serialization; this default implementation returns {@code this}.
   */
  default SerializablePlan getPlanForSerialization() {
    return this;
  }
}
Loading

0 comments on commit 9a1a17c

Please sign in to comment.