Skip to content

Commit

Permalink
batch query interface is added
Browse files Browse the repository at this point in the history
  • Loading branch information
ikishore committed Jul 11, 2022
1 parent 01e63fe commit a41bb74
Show file tree
Hide file tree
Showing 22 changed files with 404 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public boolean run()
boolean isFromFile = !isNullOrEmpty(cliOptions.sqlFile);

String query = cliOptions.execute;

if (hasQuery) {
query += ";";
}
Expand Down
3 changes: 3 additions & 0 deletions presto-cli/src/main/java/io/prestosql/cli/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ public class ClientOptions
@Option(name = "--max-batch-process-size", title = "Maximum Batch Process Size (Rows)", description = "Maximum Batch Process Size as the number of Rows which can be processed")
public String maxBatchProcessSize = "50000000";

@Option(name = {"-b", "--batch"}, title = "batch query", description = "Execute batch query")
public String batchQuery;

public enum OutputFormat
{
ALIGNED,
Expand Down
15 changes: 14 additions & 1 deletion presto-cli/src/main/java/io/prestosql/cli/Console.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public boolean run()
boolean hasQuery = !isNullOrEmpty(clientOptions.execute);
boolean isFromFile = !isNullOrEmpty(clientOptions.file);
initializeLogging(clientOptions.logLevelsFile);
boolean hasBatchQuery = !isNullOrEmpty(clientOptions.batchQuery);

String query = clientOptions.execute;
if (hasQuery) {
Expand All @@ -103,6 +104,9 @@ public boolean run()
if (hasQuery) {
throw new RuntimeException("both --execute and --file specified");
}
if (hasBatchQuery) {
throw new RuntimeException("both --batch and --file specified");
}
try {
query = asCharSource(new File(clientOptions.file), UTF_8).read();
hasQuery = true;
Expand All @@ -112,6 +116,14 @@ public boolean run()
}
}

if (hasBatchQuery) {
if (hasQuery) {
throw new RuntimeException("both --execute and --batch specified");
}
query = clientOptions.batchQuery;
hasQuery = true;
}

// abort any running query if the CLI is terminated
AtomicBoolean exiting = new AtomicBoolean();
ThreadInterruptor interruptor = new ThreadInterruptor();
Expand Down Expand Up @@ -140,7 +152,8 @@ public boolean run()
Optional.ofNullable(clientOptions.krb5ConfigPath),
Optional.ofNullable(clientOptions.krb5KeytabPath),
Optional.ofNullable(clientOptions.krb5CredentialCachePath),
!clientOptions.krb5DisableRemoteServiceHostnameCanonicalization)) {
!clientOptions.krb5DisableRemoteServiceHostnameCanonicalization,
hasBatchQuery)) {
if (hasQuery) {
return executeCommand(
queryRunner,
Expand Down
7 changes: 5 additions & 2 deletions presto-cli/src/main/java/io/prestosql/cli/QueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class QueryRunner
{
private final AtomicReference<ClientSession> session;
private final boolean debug;
private final boolean isBatchQuery;
private final OkHttpClient httpClient;
private final Consumer<OkHttpClient.Builder> sslSetup;
private CubeConsole cubeConsole;
Expand All @@ -67,10 +68,12 @@ public QueryRunner(
Optional<String> kerberosConfigPath,
Optional<String> kerberosKeytabPath,
Optional<String> kerberosCredentialCachePath,
boolean kerberosUseCanonicalHostname)
boolean kerberosUseCanonicalHostname,
boolean isBatchQuery)
{
this.session = new AtomicReference<>(requireNonNull(session, "session is null"));
this.debug = debug;
this.isBatchQuery = isBatchQuery;

this.sslSetup = builder -> setupSsl(builder, keystorePath, keystorePassword, truststorePath, truststorePassword);

Expand Down Expand Up @@ -153,7 +156,7 @@ private StatementClient startInternalQuery(ClientSession session, String query)
sslSetup.accept(builder);
OkHttpClient client = builder.build();

return newStatementClient(client, session, query);
return newStatementClient(client, session, query, isBatchQuery);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ static QueryRunner createQueryRunner(ClientSession clientSession)
Optional.empty(),
Optional.empty(),
Optional.empty(),
false,
false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public final class PrestoHeaders
public static final String PRESTO_PAGE_TOKEN = "X-Presto-Page-Sequence-Id";
public static final String PRESTO_PAGE_NEXT_TOKEN = "X-Presto-Page-End-Sequence-Id";
public static final String PRESTO_BUFFER_COMPLETE = "X-Presto-Buffer-Complete";
public static final String PRESTO_BATCH_QUERY = "X-Presto-Is_Batch_Query";

private PrestoHeaders() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ private StatementClientFactory() {}

public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query)
{
return new StatementClientV1(httpClient, session, query);
return new StatementClientV1(httpClient, session, query, false);
}

public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query, boolean isBatchQuery)
{
return new StatementClientV1(httpClient, session, query, isBatchQuery);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static io.prestosql.client.HttpSecurityHeadersConstants.HTTP_SECURITY_XPCDP_VALUE;
import static io.prestosql.client.HttpSecurityHeadersConstants.HTTP_SECURITY_XXP;
import static io.prestosql.client.HttpSecurityHeadersConstants.HTTP_SECURITY_XXP_VALUE;
import static io.prestosql.client.PrestoHeaders.PRESTO_BATCH_QUERY;
import static java.lang.String.format;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
Expand Down Expand Up @@ -97,10 +98,10 @@ class StatementClientV1
private final String user;
private final String clientCapabilities;
private final boolean timeInMilliseconds;

private boolean isBatchQuery;
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query)
public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query, boolean isBatchQuery)
{
requireNonNull(httpClient, "httpClient is null");
requireNonNull(session, "session is null");
Expand All @@ -113,6 +114,7 @@ public StatementClientV1(OkHttpClient httpClient, ClientSession session, String
this.user = session.getUser();
this.clientCapabilities = Joiner.on(",").join(ClientCapabilities.values());
this.timeInMilliseconds = session.isTimeInMilliseconds();
this.isBatchQuery = isBatchQuery;

Request request = buildQueryRequest(session, query);

Expand Down Expand Up @@ -233,6 +235,9 @@ private Request buildQueryRequest(ClientSession session, String query)
else {
builder.addHeader(HTTP_SECURITY_XXP, HTTP_SECURITY_XXP_VALUE);
}
if (isBatchQuery) {
builder.addHeader(PRESTO_BATCH_QUERY, "1");
}

return builder.build();
}
Expand Down
113 changes: 111 additions & 2 deletions presto-main/src/main/java/io/prestosql/dispatcher/DispatchManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.prestosql.spi.resourcegroups.SelectionContext;
import io.prestosql.spi.resourcegroups.SelectionCriteria;
import io.prestosql.spi.service.PropertyService;
import io.prestosql.sql.tree.Statement;
import io.prestosql.statestore.SharedQueryState;
import io.prestosql.statestore.StateCacheStore;
import io.prestosql.statestore.StateFetcher;
Expand All @@ -57,10 +58,12 @@
import javax.inject.Inject;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.StringTokenizer;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -170,7 +173,7 @@ public QueryId createQueryId()
return queryIdGenerator.createNextQueryId();
}

public ListenableFuture<?> createQuery(QueryId queryId, String slug, SessionContext sessionContext, String query)
public ListenableFuture<?> createQuery(QueryId queryId, String slug, SessionContext sessionContext, String query, boolean isBatchQuery)
{
requireNonNull(queryId, "queryId is null");
requireNonNull(sessionContext, "sessionFactory is null");
Expand All @@ -181,7 +184,12 @@ public ListenableFuture<?> createQuery(QueryId queryId, String slug, SessionCont
DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture();
queryExecutor.execute(() -> {
try {
createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
if (!isBatchQuery) {
createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
}
else {
createBatchQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
}
}
finally {
queryCreationFuture.set(null);
Expand Down Expand Up @@ -279,6 +287,107 @@ private <C> void createQueryInternal(QueryId queryId, String slug, SessionContex
}
}

private <C> void createBatchQueryInternal(QueryId queryId, String slug, SessionContext sessionContext, String inputQuery, ResourceGroupManager<C> resourceGroupManager)
{
String query = inputQuery;
Session session = null;
DispatchQuery dispatchQuery = null;
List<String> queryList = new ArrayList<>();
List<PreparedQuery> preparedQueryList = new ArrayList<>();
boolean isTransactionControlStatement = false;

try {
if (query.length() > maxQueryLength) {
int queryLength = query.length();
query = query.substring(0, maxQueryLength);
throw new PrestoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
}

// decode session
session = sessionSupplier.createSession(queryId, sessionContext);
StringTokenizer tokenizer = new StringTokenizer(query, ";");
while (tokenizer.hasMoreTokens()) {
String curQuery = tokenizer.nextToken();
queryList.add(curQuery);
// prepare query
preparedQueryList.add(queryPreparer.prepareQuery(session, curQuery));
}

// select resource group
SelectionContext<C> selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
sessionContext.getIdentity().getPrincipal().isPresent(),
sessionContext.getIdentity().getUser(),
Optional.ofNullable(sessionContext.getSource()),
sessionContext.getClientTags(),
sessionContext.getResourceEstimates(),
Optional.empty()));

// apply system default session properties (does not override user set properties)
session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, Optional.empty(), selectionContext.getResourceGroupId());

// Check if any query is transaction control statement
for (PreparedQuery preparedQuery : preparedQueryList) {
Statement statement = preparedQuery.getStatement();
isTransactionControlStatement = isTransactionControlStatement(statement);
if (isTransactionControlStatement) {
break;
}
}
// mark existing transaction as active
transactionManager.activateTransaction(session, isTransactionControlStatement, accessControl);

dispatchQuery = dispatchQueryFactory.createDispatchQuery(
session,
queryList,
preparedQueryList,
slug,
selectionContext.getResourceGroupId(),
resourceGroupManager,
isTransactionControlStatement);

boolean queryAdded = queryCreated(dispatchQuery);
if (queryAdded && !dispatchQuery.isDone()) {
try {
resourceGroupManager.submit(dispatchQuery, selectionContext, queryExecutor);

if (PropertyService.getBooleanProperty(HetuConstant.MULTI_COORDINATOR_ENABLED) && stateUpdater != null) {
stateUpdater.registerQuery(StateStoreConstants.QUERY_STATE_COLLECTION_NAME, dispatchQuery);
}

if (LOG.isDebugEnabled()) {
long now = System.currentTimeMillis();
LOG.debug("query:%s submission started at %s, ended at %s, total time use: %sms",
dispatchQuery.getQueryId(),
new SimpleDateFormat("HH:mm:ss:SSS").format(dispatchQuery.getCreateTime().toDate()),
new SimpleDateFormat("HH:mm:ss:SSS").format(new Date(now)),
now - dispatchQuery.getCreateTime().getMillis());
}
}
catch (Throwable e) {
// dispatch query has already been registered, so just fail it directly
dispatchQuery.fail(e);
}
}
}
catch (Throwable throwable) {
// creation must never fail, so register a failed query in this case
if (dispatchQuery == null) {
if (session == null) {
session = Session.builder(new SessionPropertyManager())
.setQueryId(queryId)
.setIdentity(sessionContext.getIdentity())
.setSource(sessionContext.getSource())
.build();
}
DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, Optional.empty(), throwable);
queryCreated(failedDispatchQuery);
}
else {
dispatchQuery.fail(throwable);
}
}
}

private boolean queryCreated(DispatchQuery dispatchQuery)
{
boolean queryAdded = queryTracker.addQuery(dispatchQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.prestosql.execution.resourcegroups.ResourceGroupManager;
import io.prestosql.spi.resourcegroups.ResourceGroupId;

import java.util.List;

public interface DispatchQueryFactory
{
DispatchQuery createDispatchQuery(
Expand All @@ -27,4 +29,13 @@ DispatchQuery createDispatchQuery(
String slug,
ResourceGroupId resourceGroup,
ResourceGroupManager resourceGroupManager);

DispatchQuery createDispatchQuery(
Session session,
List<String> queryList,
List<PreparedQuery> preparedQueryList,
String slug,
ResourceGroupId resourceGroup,
ResourceGroupManager resourceGroupManager,
boolean isTransactionControlStatement);
}
Loading

0 comments on commit a41bb74

Please sign in to comment.