Skip to content

Commit

Permalink
Add endpoint for retrieving checkpoint previews
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Jan 13, 2025
1 parent 34f51d2 commit c81ee71
Show file tree
Hide file tree
Showing 14 changed files with 250 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -167,6 +168,8 @@ public interface Workflow {

void reset( StorageManager sm );

void resetFailedExecutionInit( StorageManager sm );

ActivityWrapper addActivity( String activityType, RenderModel renderModel );

ActivityWrapper cloneActivity( UUID activityId, double posX, double posY );
Expand Down Expand Up @@ -194,6 +197,10 @@ public interface Workflow {

void validateStructure( StorageManager sm, AttributedDirectedGraph<UUID, ExecutionEdge> subDag ) throws IllegalStateException;

default ActivityWrapper getActivityOrThrow( UUID activityId ) {
return Objects.requireNonNull( getActivity( activityId ), "Activity does not exist: " + activityId );
}


/**
* Returns a WorkflowModel of this workflow.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
Expand Down Expand Up @@ -118,11 +117,6 @@ public ActivityWrapper getActivity( UUID activityId ) {
}


private ActivityWrapper getActivityOrThrow( UUID activityId ) {
return Objects.requireNonNull( activities.get( activityId ), "Activity does not exist: " + activityId );
}


@Override
public List<Edge> getEdges() {
return edges.values()
Expand Down Expand Up @@ -339,6 +333,16 @@ public void reset( StorageManager sm ) {
}


@Override
public void resetFailedExecutionInit( StorageManager sm ) {
resetAll( getActivities().stream()
.filter( a -> a.getState() == ActivityState.QUEUED )
.map( ActivityWrapper::getId ).toList(), sm
);
setState( WorkflowState.IDLE );
}


@Override
public ActivityWrapper addActivity( String activityType, RenderModel renderModel ) {
ActivityWrapper wrapper = ActivityWrapper.fromModel( new ActivityModel( activityType, renderModel ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
import org.polypheny.db.config.RuntimeConfig;
import org.polypheny.db.workflow.dag.Workflow;
import org.polypheny.db.workflow.dag.Workflow.WorkflowState;
import org.polypheny.db.workflow.engine.execution.Executor.ExecutorException;
import org.polypheny.db.workflow.engine.monitoring.ExecutionInfo;
import org.polypheny.db.workflow.engine.monitoring.ExecutionInfo.ExecutionState;
Expand Down Expand Up @@ -100,7 +99,7 @@ public synchronized ExecutionMonitor startExecution( Workflow workflow, StorageM
scheduler = new WorkflowScheduler( workflow, sm, monitor, globalWorkerCount, targetActivity );
submissions = scheduler.startExecution();
} catch ( Exception e ) {
workflow.setState( WorkflowState.IDLE );
workflow.resetFailedExecutionInit( sm );
monitor.stop();
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,27 @@

package org.polypheny.db.workflow.engine.storage;

import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.polypheny.db.PolyImplementation;
import org.polypheny.db.ResultIterator;
import org.polypheny.db.adapter.AdapterManager;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.AlgRoot;
import org.polypheny.db.algebra.constant.Kind;
import org.polypheny.db.algebra.core.common.Modify;
import org.polypheny.db.algebra.core.common.Scan;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.algebra.type.AlgDataTypeField;
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.entity.allocation.AllocationPlacement;
import org.polypheny.db.catalog.entity.logical.LogicalCollection;
import org.polypheny.db.catalog.entity.logical.LogicalColumn;
import org.polypheny.db.catalog.entity.logical.LogicalEntity;
import org.polypheny.db.catalog.entity.logical.LogicalGraph;
import org.polypheny.db.catalog.entity.logical.LogicalIndex;
Expand All @@ -57,6 +63,14 @@
import org.polypheny.db.type.entity.graph.PolyPath;
import org.polypheny.db.type.entity.relational.PolyMap;
import org.polypheny.db.util.Pair;
import org.polypheny.db.webui.crud.LanguageCrud;
import org.polypheny.db.webui.models.SortState;
import org.polypheny.db.webui.models.catalog.UiColumnDefinition;
import org.polypheny.db.webui.models.catalog.UiColumnDefinition.UiColumnDefinitionBuilder;
import org.polypheny.db.webui.models.requests.UIRequest;
import org.polypheny.db.webui.models.results.QueryType;
import org.polypheny.db.webui.models.results.RelationalResult;
import org.polypheny.db.webui.models.results.Result;

public class QueryUtils {

Expand Down Expand Up @@ -135,7 +149,7 @@ public static String quotedIdentifier( LogicalEntity entity ) {
} else if ( entity instanceof LogicalCollection collection ) {
return "\"" + collection.getNamespaceName() + "\".\"" + collection.getName() + "\"";
} else if ( entity instanceof LogicalGraph graph ) {
return "\"" + graph.getNamespaceName() + "\"";
return "\"" + Catalog.snapshot().getNamespace( graph.getNamespaceId() ).orElseThrow().getName() + "\"";
}
throw new IllegalArgumentException( "Encountered unknown entity type" );
}
Expand Down Expand Up @@ -252,6 +266,66 @@ public static long estimateByteSize( PolyValue[] tuple ) {
}


public static Result<?, ?> getRelResult( ExecutedContext context, UIRequest request, Statement statement ) {
// TODO decide whether to use this method or LanguageCrud directly
if ( context.getException().isPresent() ) {
return LanguageCrud.buildErrorResult( statement.getTransaction(), context, context.getException().get() ).build();
}

Catalog catalog = Catalog.getInstance();
ResultIterator iterator = context.getIterator();
List<List<PolyValue>> rows;
try {
rows = iterator.getAllRowsAndClose();
} catch ( Exception e ) {
return LanguageCrud.buildErrorResult( statement.getTransaction(), context, e ).build();
}

LogicalTable table = null;
if ( request.entityId != null ) {
table = Catalog.snapshot().rel().getTable( request.entityId ).orElseThrow();
}

List<UiColumnDefinition> header = new ArrayList<>();
for ( AlgDataTypeField field : context.getIterator().getImplementation().tupleType.getFields() ) {
String columnName = field.getName();

UiColumnDefinitionBuilder<?, ?> dbCol = UiColumnDefinition.builder()
.name( field.getName() )
.dataType( field.getType().getFullTypeString() )
.nullable( field.getType().isNullable() == (ResultSetMetaData.columnNullable == 1) )
.precision( field.getType().getPrecision() )
.sort( new SortState() )
.filter( "" );

// Get column default values
if ( table != null ) {
Optional<LogicalColumn> optional = catalog.getSnapshot().rel().getColumn( table.id, columnName );
if ( optional.isPresent() ) {
if ( optional.get().defaultValue != null ) {
dbCol.defaultValue( optional.get().defaultValue.value.toJson() );
}
}
}
header.add( dbCol.build() );
}

List<String[]> data = LanguageCrud.computeResultData( rows, header, statement.getTransaction() );

return RelationalResult
.builder()
.header( header.toArray( new UiColumnDefinition[0] ) )
.data( data.toArray( new String[0][] ) )
.dataModel( context.getIterator().getImplementation().getDataModel() )
.namespace( request.namespace )
.language( context.getQuery().getLanguage() )
.affectedTuples( data.size() )
.queryType( QueryType.from( context.getImplementation().getKind() ) )
.xid( statement.getTransaction().getXid().toString() )
.query( context.getQuery().getQuery() ).build();
}


private static long estimateByteSize( Collection<? extends PolyValue> values ) {
return estimateByteSize( values.toArray( new PolyValue[0] ) );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public synchronized LpgWriter createLpgCheckpoint( UUID activityId, int outputId
LogicalGraph graph = Catalog.snapshot().graph().getGraph( graphId ).orElseThrow();
register( activityId, outputIdx, graph );
registeredNamespaces.put( graphId, graphName );
return new LpgWriter( graph, QueryUtils.startTransaction( docNamespace, "DocWrite" ) );
return new LpgWriter( graph, QueryUtils.startTransaction( graphId, "LpgWrite" ) );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.util.Pair;
import org.polypheny.db.util.Triple;
import org.polypheny.db.webui.models.results.Result;
import org.polypheny.db.workflow.engine.storage.QueryUtils;

public abstract class CheckpointReader implements AutoCloseable {
Expand Down Expand Up @@ -111,6 +113,14 @@ public Pair<AlgDataType, Iterator<List<PolyValue>>> getIteratorFromQuery( Checkp
}


/**
* Reads a subset of this checkpoint and returns it as a Result that can be sent to the frontend.
*
* @return a preview of this checkpoint, followed by the limit on the number of tuples and the total count for this checkpoint (for graphs: node-count)
*/
public abstract Triple<Result<?, ?>, Integer, Long> getPreview();


/**
* Convenience method that wraps the iterator from {@code getIteratorFromQuery(query)} into an Iterable
* to enable enhanced for loops.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,17 @@
import org.polypheny.db.type.entity.PolyString;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.type.entity.document.PolyDocument;
import org.polypheny.db.util.Triple;
import org.polypheny.db.webui.crud.LanguageCrud;
import org.polypheny.db.webui.models.requests.UIRequest;
import org.polypheny.db.webui.models.results.Result;
import org.polypheny.db.workflow.engine.storage.QueryUtils;

public class DocReader extends CheckpointReader {

public static final int PREVIEW_DOCS_LIMIT = 100;


public DocReader( LogicalCollection collection, Transaction transaction ) {
super( collection, transaction );
}
Expand Down Expand Up @@ -103,6 +110,23 @@ public long getTupleCount() {
}


@Override
public Triple<Result<?, ?>, Integer, Long> getPreview() {
LogicalCollection collection = getCollection();
String query = "db.\"" + collection.getName() + "\".find({}).limit(" + PREVIEW_DOCS_LIMIT + ")";
UIRequest request = UIRequest.builder()
.entityId( collection.id )
.namespace( collection.getNamespaceName() )
.build();
ExecutedContext executedContext = QueryUtils.parseAndExecuteQuery(
query, "MQL", collection.getNamespaceId(), transaction );

return Triple.of( LanguageCrud.getDocResult( executedContext, request, executedContext.getStatement() ).build(),
PREVIEW_DOCS_LIMIT,
getDocCount() );
}


private LogicalCollection getCollection() {
return (LogicalCollection) entity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.NoSuchElementException;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.logical.lpg.LogicalLpgScan;
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.entity.logical.LogicalGraph;
import org.polypheny.db.plan.AlgCluster;
import org.polypheny.db.plan.AlgTraitSet;
Expand All @@ -29,10 +30,17 @@
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.type.entity.graph.PolyEdge;
import org.polypheny.db.type.entity.graph.PolyNode;
import org.polypheny.db.util.Triple;
import org.polypheny.db.webui.crud.LanguageCrud;
import org.polypheny.db.webui.models.requests.UIRequest;
import org.polypheny.db.webui.models.results.Result;
import org.polypheny.db.workflow.engine.storage.QueryUtils;

public class LpgReader extends CheckpointReader {

public static final int PREVIEW_NODES_LIMIT = 100;


public LpgReader( LogicalGraph graph, Transaction transaction ) {
super( graph, transaction );
}
Expand Down Expand Up @@ -136,6 +144,22 @@ public long getTupleCount() {
}


@Override
public Triple<Result<?, ?>, Integer, Long> getPreview() {
LogicalGraph graph = getGraph();
String query = "MATCH (n) RETURN n LIMIT " + PREVIEW_NODES_LIMIT;
UIRequest request = UIRequest.builder()
.namespace( Catalog.snapshot().getNamespace( graph.getNamespaceId() ).orElseThrow().getName() )
.build();
ExecutedContext executedContext = QueryUtils.parseAndExecuteQuery(
query, "cypher", graph.getNamespaceId(), transaction );

return Triple.of( LanguageCrud.getGraphResult( executedContext, request, executedContext.getStatement() ).build(),
PREVIEW_NODES_LIMIT,
getNodeCount() );
}


private Iterator<PolyValue[]> executeCypherQuery( String query ) {
ExecutedContext executedContext = QueryUtils.parseAndExecuteQuery(
query, "cypher", getGraph().getNamespaceId(), transaction );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,15 @@
import org.polypheny.db.schema.trait.ModelTrait;
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.util.Triple;
import org.polypheny.db.webui.models.requests.UIRequest;
import org.polypheny.db.webui.models.results.Result;
import org.polypheny.db.workflow.engine.storage.QueryUtils;

public class RelReader extends CheckpointReader {

public static final int PREVIEW_ROW_LIMIT = 100;


public RelReader( LogicalTable table, Transaction transaction ) {
super( table, transaction );
Expand Down Expand Up @@ -75,6 +80,25 @@ public long getTupleCount() {
}


@Override
public Triple<Result<?, ?>, Integer, Long> getPreview() {
LogicalTable table = getTable();
String query = "SELECT " + QueryUtils.quoteAndJoin( table.getColumnNames() ) + " FROM " + QueryUtils.quotedIdentifier( table )
+ " LIMIT " + PREVIEW_ROW_LIMIT;
UIRequest request = UIRequest.builder()
.entityId( table.id )
.namespace( table.getNamespaceName() )
.build();
ExecutedContext executedContext = QueryUtils.parseAndExecuteQuery(
query, "SQL", table.getNamespaceId(), transaction );

return Triple.of(
QueryUtils.getRelResult( executedContext, request, executedContext.getStatement() ),
PREVIEW_ROW_LIMIT,
getRowCount() );
}


private LogicalTable getTable() {
return (LogicalTable) entity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public enum RequestType {
RESET,
UPDATE_CONFIG, // workflow config
UPDATE_VARIABLES,
GET_CHECKPOINT,
KEEPALIVE
}

Expand Down Expand Up @@ -132,4 +133,12 @@ public static class UpdateVariablesRequest extends WsRequest {

}


public static class GetCheckpointRequest extends WsRequest {

public UUID activityId;
public int outputIndex;

}

}
Loading

0 comments on commit c81ee71

Please sign in to comment.