Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-502598: Add async query execution #672

Merged
merged 20 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f0abbf9
SNOW-502598: Add async query execution
sfc-gh-ext-simba-lf Oct 20, 2023
903a8d8
SNOW-502598: Add tests
sfc-gh-ext-simba-lf Oct 20, 2023
922acf6
SNOW-502598: Add tests
sfc-gh-ext-simba-lf Oct 20, 2023
a2e92cb
SNOW-502598: Remove redundant requestAsync() function
sfc-gh-ext-simba-lf Oct 23, 2023
1ee54ed
SNOW-502598: Replace for loop with while loop
sfc-gh-ext-simba-lf Oct 23, 2023
b79a595
SNOW-502598: Edit comment for query status retry
sfc-gh-ext-simba-lf Oct 23, 2023
0a7b7c8
SNOW-502598: Remove test comment
sfc-gh-ext-simba-lf Oct 23, 2023
8d8bd87
SNOW-502598: Parse data back into JSON
sfc-gh-ext-simba-lf Oct 23, 2023
a479942
SNOW-502598: Revert normalizing response
sfc-gh-ext-simba-lf Oct 23, 2023
a5b0fd3
Merge branch 'master' of https://github.com/snowflakedb/snowflake-con…
sfc-gh-ext-simba-lf Oct 24, 2023
038f781
Merge branch 'master' of https://github.com/snowflakedb/snowflake-con…
sfc-gh-ext-simba-lf Oct 26, 2023
72751cb
SNOW-502598: Use async for before/after tests
sfc-gh-ext-simba-lf Oct 27, 2023
5ecda9b
SNOW-502598: Extract running and error statuses
sfc-gh-ext-simba-lf Oct 27, 2023
e2240a0
Merge branch 'master' of https://github.com/snowflakedb/snowflake-con…
sfc-gh-ext-simba-lf Oct 27, 2023
3624ae7
SNOW-502598: Refactor error into the correct type
sfc-gh-ext-simba-lf Oct 27, 2023
924052c
SNOW-502598: Keep asyncExec option for execute only
sfc-gh-ext-simba-lf Oct 27, 2023
6c00000
SNOW-502598: Format unused params in tests
sfc-gh-ext-simba-lf Oct 28, 2023
357359a
SNOW-502598: Refactor getQueryStatusThrowIfError
sfc-gh-ext-simba-lf Oct 31, 2023
46748bb
Merge branch 'master' of https://github.com/snowflakedb/snowflake-con…
sfc-gh-ext-simba-lf Oct 31, 2023
6944e99
Merge branch 'master' into SNOW-502598-async-query-exec
sfc-gh-ext-simba-lf Nov 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 187 additions & 0 deletions lib/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
const Url = require('url');
const QueryString = require('querystring');
const GSErrors = require('../constants/gs_errors')
const QueryStatus = require('../constants/query_status').code;

var Util = require('../util');
var Errors = require('../errors');
Expand Down Expand Up @@ -37,6 +38,15 @@
// generate an id for the connection
var id = uuidv4();

// async max retry and retry pattern from python connector
const asyncNoDataMaxRetry = 24;
const asyncRetryPattern = [1, 1, 2, 3, 4, 8, 10];
const asyncRetryInMilliseconds = 500;

// Custom regex based on uuid validate
// Unable to directly use uuid validate because the queryId returned from the server doesn't match the regex
const queryIdRegex = new RegExp(/^(?:[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}|00000000-0000-0000-0000-000000000000)$/i);

//Make session tokens available for testing
this.getTokens = function ()
{
Expand Down Expand Up @@ -411,6 +421,183 @@
return this;
};

/**
* Gets the response containing the status of the query based on queryId.
*
* @param {String} queryId
*
* @returns {Object} the query response
*/
async function getQueryResponse(queryId) {
// Check if queryId exists and is valid uuid
Errors.checkArgumentExists(Util.exists(queryId),
ErrorCodes.ERR_CONN_FETCH_RESULT_MISSING_QUERY_ID);
Errors.checkArgumentValid(queryIdRegex.test(queryId),
ErrorCodes.ERR_GET_RESPONSE_QUERY_INVALID_UUID, queryId);

// Form the request options
const options =
{
method: 'GET',
url: Url.format(
{
pathname: `/monitoring/queries/${queryId}`
}),
};

// Get the response containing the query status
const response = await services.sf.requestAsync(options);

return response['data'];
}

/**
* Extracts the status of the query from the query response.
*
* @param {Object} queryResponse
*
* @returns {String} the query status.
*/
function extractQueryStatus(queryResponse) {
const queries = queryResponse['data']['queries'];
let status = QueryStatus.NO_DATA; // default status
if (queries.length > 0) {
status = queries[0]['status'];
}

return status;
}

/**
* Gets the status of the query based on queryId.
*
* @param {String} queryId
*
* @returns {String} the query status.
*/
this.getQueryStatus = async function (queryId) {
return extractQueryStatus(await getQueryResponse(queryId));
};

/**
* Gets the status of the query based on queryId and throws if there's an error.
*
* @param {String} queryId
*
* @returns {String} the query status.
*/
this.getQueryStatusThrowIfError = async function (queryId) {
sfc-gh-pmotacki marked this conversation as resolved.
Show resolved Hide resolved
const response = await getQueryResponse(queryId);
const queries = response['data']['queries'];
const status = extractQueryStatus(response);
sfc-gh-pmotacki marked this conversation as resolved.
Show resolved Hide resolved
let message, code, sqlState = null;

if (this.isAnError(status)) {
message = response['message'] || '';
code = response['code'] || -1;

if (response['data']) {
message += queries.length > 0 ? queries[0]['errorMessage'] : '';
sqlState = response['data']['sqlState'];
}

throw Errors.createOperationFailedError(
code, response, message, sqlState);
}

return status;
};

/**
* Gets the results from a previously ran query based on queryId
*
* @param {Object} options
*
* @returns {Object}
*/
this.getResultsFromQueryId = async function (options) {
const queryId = options.queryId;
let status, noDataCounter = 0, retryPatternPos = 0;

// Wait until query has finished executing
let queryStillExecuting = true;
while (queryStillExecuting) {
// Check if query is still running and trigger exception if it failed
status = await this.getQueryStatusThrowIfError(queryId);
queryStillExecuting = this.isStillRunning(status);
if (!queryStillExecuting) {
break;
}

// Timeout based on query status retry rules
await new Promise((resolve) => {
setTimeout(() => resolve(), asyncRetryInMilliseconds * asyncRetryPattern[retryPatternPos]);
});

// If no data, increment the no data counter
if (QueryStatus[status] == QueryStatus.NO_DATA) {
noDataCounter++;

Check warning on line 539 in lib/connection/connection.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/connection.js#L539

Added line #L539 was not covered by tests
// Check if retry for no data is exceeded
if (noDataCounter > asyncNoDataMaxRetry) {
throw Errors.createClientError(

Check warning on line 542 in lib/connection/connection.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/connection.js#L541-L542

Added lines #L541 - L542 were not covered by tests
ErrorCodes.ERR_GET_RESULTS_QUERY_ID_NO_DATA, true, queryId);
}
}

if (retryPatternPos < asyncRetryPattern.length - 1) {
retryPatternPos++;
}
}

if (QueryStatus[status] != QueryStatus.SUCCESS) {
throw Errors.createClientError(
ErrorCodes.ERR_GET_RESULTS_QUERY_ID_NOT_SUCCESS_STATUS, true, queryId, status);
}

return this.fetchResult(options);
};

/**
* Checks whether the given status is currently running.
*
* @param {String} status
*
* @returns {Boolean}
*/
this.isStillRunning = function (status) {
const stillRunning =
sfc-gh-pbulawa marked this conversation as resolved.
Show resolved Hide resolved
[
QueryStatus.RUNNING,
QueryStatus.RESUMING_WAREHOUSE,
QueryStatus.QUEUED,
QueryStatus.QUEUED_REPARING_WAREHOUSE,
QueryStatus.NO_DATA,
].includes(QueryStatus[status]);

return stillRunning;
};

/**
* Checks whether the given status means that there has been an error.
*
* @param {String} status
*
* @returns {Boolean}
*/
this.isAnError = function (status) {
const isAnError =
[
QueryStatus.ABORTING,
QueryStatus.FAILED_WITH_ERROR,
QueryStatus.ABORTED,
QueryStatus.FAILED_WITH_INCIDENT,
QueryStatus.DISCONNECTED,
QueryStatus.BLOCKED,
].includes(QueryStatus[status]);

return isAnError;
};

/**
* Returns a serialized version of this connection.
*
Expand Down
47 changes: 43 additions & 4 deletions lib/connection/statement.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
FILE_POST_EXEC: 'FILE_POST_EXEC'
};

const queryCodes = {
QUERY_IN_PROGRESS: '333333', // GS code: the query is in progress
QUERY_IN_PROGRESS_ASYNC: '333334' // GS code: the query is detached
};

exports.createContext = function (
options, services, connectionConfig)
{
Expand Down Expand Up @@ -187,6 +192,12 @@
JSON.stringify(fetchAsString[invalidValueIndex]));
}

// check for invalid asyncExec
if (Util.exists(statementOptions.asyncExec)) {
Errors.checkArgumentValid(Util.isBoolean(statementOptions.asyncExec),

Check warning on line 197 in lib/connection/statement.js

View check run for this annotation

Codecov / codecov/patch

lib/connection/statement.js#L197

Added line #L197 was not covered by tests
ErrorCodes.ERR_CONN_CREATE_INVALID_ASYNC_EXEC);
}

const rowMode = statementOptions.rowMode;
if (Util.exists(rowMode)) {
RowMode.checkRowModeValid(rowMode);
Expand All @@ -206,6 +217,7 @@
statementContext.multiResultIds = statementOptions.multiResultIds;
statementContext.multiCurId = statementOptions.multiCurId;
statementContext.rowMode = statementOptions.rowMode;
statementContext.asyncExec = statementOptions.asyncExec;

// set the statement type
statementContext.type = (statementContext.type == statementTypes.ROW_PRE_EXEC) ? statementTypes.ROW_POST_EXEC : statementTypes.FILE_POST_EXEC;
Expand Down Expand Up @@ -364,6 +376,12 @@
RowMode.checkRowModeValid(rowMode);
}

// if an asyncExec flag is specified, make sure it's boolean
if (Util.exists(statementOptions.asyncExec)) {
Errors.checkArgumentValid(Util.isBoolean(statementOptions.asyncExec),
ErrorCodes.ERR_CONN_CREATE_INVALID_ASYNC_EXEC);
}

// create a statement context
var statementContext = createStatementContext();

Expand All @@ -374,6 +392,7 @@
statementContext.multiResultIds = statementOptions.multiResultIds;
statementContext.multiCurId = statementOptions.multiCurId;
statementContext.rowMode = statementOptions.rowMode;
statementContext.asyncExec = statementOptions.asyncExec;

// if a binds array is specified, add it to the statement context
if (Util.exists(statementOptions.binds))
Expand Down Expand Up @@ -682,10 +701,13 @@
streamResult = context.connectionConfig.getStreamResult();
}

// if the result will be streamed later,
// if the result will be streamed later or in asyncExec mode,
// invoke the complete callback right away
if (streamResult) {
context.complete(Errors.externalize(context.resultError), statement);
} else if (context.asyncExec) {
// return the result object with the query ID inside.
context.complete(null, statement, context.result);
} else {
process.nextTick(function () {
// aggregate all the rows into an array and pass this
Expand Down Expand Up @@ -779,6 +801,12 @@
// if we don't already have a result
if (!context.result)
{
if (body.code === queryCodes.QUERY_IN_PROGRESS_ASYNC) {
context.result = {
queryId: body.data.queryId
};
return;
}
if (body.data.resultIds != undefined && body.data.resultIds.length > 0)
{
//multi statements
Expand Down Expand Up @@ -1330,6 +1358,11 @@
json.queryContextDTO = statementContext.services.sf.getQueryContextDTO();
}

// include the asyncExec flag if a value was specified
if (Util.exists(statementContext.asyncExec)) {
json.asyncExec = statementContext.asyncExec;
}

// use the snowflake service to issue the request
sendSfRequest(statementContext,
{
Expand Down Expand Up @@ -1645,9 +1678,15 @@
statementContext.queryId = body.data.queryId;

// if the result is not ready yet, extract the result url from the response
// and issue a GET request to try to fetch the result again
if (body && (body.code === '333333' || body.code === '333334'))
{
// and issue a GET request to try to fetch the result again unless asyncExec is enabled.
if (body && (body.code === queryCodes.QUERY_IN_PROGRESS
|| body.code === queryCodes.QUERY_IN_PROGRESS_ASYNC)) {

if (statementContext.asyncExec) {
await onResultAvailable.call(null, err, body);
return;
}

// extract the result url from the response and try to get the result
// again
sendSfRequest(statementContext,
Expand Down
10 changes: 8 additions & 2 deletions lib/constants/error_messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ exports[404040] = 'Invalid browser timeout value. The specified value must be a
exports[404041] = 'Invalid disablQueryContextCache. The specified value must be a boolean.';
exports[404042] = 'Invalid includeRetryReason. The specified value must be a boolean.'
exports[404043] = 'Invalid clientConfigFile value. The specified value must be a string.';
exports[404044] = 'Invalid asyncExec. The specified value must be a boolean.'

// 405001
exports[405001] = 'Invalid callback. The specified value must be a function.';
Expand Down Expand Up @@ -113,8 +114,8 @@ exports[409013] = 'Invalid requestId. The specified value must be a string.';
// 410001
exports[410001] = 'Fetch-result options must be specified.';
exports[410002] = 'Invalid options. The specified value must be an object.';
exports[410003] = 'A statement id must be specified.';
exports[410004] = 'Invalid statement id. The specified value must be a string.';
exports[410003] = 'A query id/statement id must be specified.';
exports[410004] = 'Invalid query id/statement id. The specified value must be a string.';
exports[410005] = 'Invalid complete callback. The specified value must be a function.';
exports[410006] = 'Invalid streamResult flag. The specified value must be a boolean.';
exports[410007] = 'Invalid fetchAsString value. The specified value must be an Array.';
Expand Down Expand Up @@ -151,3 +152,8 @@ exports[450004] = 'Invalid each() callback. The specified value must be a functi
exports[450005] = 'An end() callback must be specified.';
exports[450006] = 'Invalid end() callback. The specified value must be a function.';
exports[450007] = 'Operation failed because the statement is still in progress.';

// 460001
exports[460001] = 'Invalid queryId: %s';
exports[460002] = 'Cannot retrieve data. No information returned from server for query %s';
exports[460003] = 'Status of query %s is %s, results are unavailable';
22 changes: 22 additions & 0 deletions lib/constants/query_status.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2023 Snowflake Computing Inc. All rights reserved.
*/

const code = {};

code.RUNNING = 'RUNNING';
code.ABORTING = 'ABORTING';
code.SUCCESS = 'SUCCESS';
code.FAILED_WITH_ERROR = 'FAILED_WITH_ERROR';
code.ABORTED = 'ABORTED';
code.QUEUED = 'QUEUED';
code.FAILED_WITH_INCIDENT = 'FAILED_WITH_INCIDENT';
code.DISCONNECTED = 'DISCONNECTED';
code.RESUMING_WAREHOUSE = 'RESUMING_WAREHOUSE';
// purposeful typo.Is present in QueryDTO.java
code.QUEUED_REPARING_WAREHOUSE = 'QUEUED_REPARING_WAREHOUSE';
code.RESTARTED = 'RESTARTED';
code.BLOCKED = 'BLOCKED';
code.NO_DATA = 'NO_DATA';

exports.code = code;
6 changes: 6 additions & 0 deletions lib/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ codes.ERR_CONN_CREATE_INVALID_BROWSER_TIMEOUT = 404040;
codes.ERR_CONN_CREATE_INVALID_DISABLED_QUERY_CONTEXT_CACHE = 404041
codes.ERR_CONN_CREATE_INVALID_INCLUDE_RETRY_REASON =404042
codes.ERR_CONN_CREATE_INVALID_CLIENT_CONFIG_FILE = 404043;
codes.ERR_CONN_CREATE_INVALID_ASYNC_EXEC = 404044

// 405001
codes.ERR_CONN_CONNECT_INVALID_CALLBACK = 405001;
Expand Down Expand Up @@ -158,6 +159,11 @@ codes.ERR_STMT_FETCH_ROWS_MISSING_END = 450005;
codes.ERR_STMT_FETCH_ROWS_INVALID_END = 450006;
codes.ERR_STMT_FETCH_ROWS_FETCHING_RESULT = 450007;

// 460001
codes.ERR_GET_RESPONSE_QUERY_INVALID_UUID = 460001;
codes.ERR_GET_RESULTS_QUERY_ID_NO_DATA = 460002;
codes.ERR_GET_RESULTS_QUERY_ID_NOT_SUCCESS_STATUS = 460003;

exports.codes = codes;

/**
Expand Down
Loading
Loading