Skip to content

Commit

Permalink
always use transform data native for all results
Browse files Browse the repository at this point in the history
  • Loading branch information
KSDaemon committed Dec 17, 2024
1 parent 1fdc9af commit 5f5cc6c
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 160 deletions.
135 changes: 36 additions & 99 deletions packages/cubejs-api-gateway/src/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import {
QueryAlias,
} from '@cubejs-backend/shared';
import {
getFinalCubestoreResult,
getFinalCubestoreResultArray,
getFinalCubestoreResultMulti,
getFinalQueryResult,
getFinalQueryResultArray,
getFinalQueryResultMulti,
transformData as transformDataNative,

Check warning on line 17 in packages/cubejs-api-gateway/src/gateway.ts

View workflow job for this annotation

GitHub Actions / lint

'transformDataNative' is defined but never used. Allowed unused vars must match /^_.*/u
TransformDataResponse

Check warning on line 18 in packages/cubejs-api-gateway/src/gateway.ts

View workflow job for this annotation

GitHub Actions / lint

'TransformDataResponse' is defined but never used. Allowed unused vars must match /^_.*/u
} from '@cubejs-backend/native';
Expand Down Expand Up @@ -123,10 +123,8 @@ function systemAsyncHandler(handler: (req: Request & { context: ExtendedRequestC
function cleanupResult(result) {
return {
...result,
dataCb: undefined,
rawData: undefined,
transformDataParams: undefined,
isNative: undefined,
};
}

Expand Down Expand Up @@ -1625,23 +1623,8 @@ class ApiGateway {
};

// We postpone data transformation until the last minute
// in case when all responses are native - we process them in native part
const dataCb: TransformDataResponseCb = response.data.isNative ?
async () => {
const jsonData = await transformDataNative(
transformDataParams, response.data.getNativeRef()
);
return JSON.parse(jsonData.result) as TransformDataResponse;
}
:
async () => transformData({
...transformDataParams,
data: response.data,
});

return {
query: normalizedQuery,
dataCb,
rawData: response.data,
transformDataParams,
lastRefreshTime: response.lastRefreshTime?.toISOString(),
Expand All @@ -1663,7 +1646,6 @@ class ApiGateway {
external: response.external,
slowQuery: Boolean(response.slowQuery),
total: normalizedQuery.total ? response.total : null,
isNative: response.data.isNative
};
}

Expand Down Expand Up @@ -1822,57 +1804,30 @@ class ApiGateway {
context,
);

const allNative = results.every(r => r.isNative);

if (props.queryType === 'multi') {
// If all query results are from Cubestore (are native)
// we prepare the final json result on native side
if (allNative) {
const [transformDataJson, rawDataRef, cleanResultList] = results.reduce<[Object[], any[], Object[]]>(
([transformList, rawList, resultList], r) => {
transformList.push(r.transformDataParams);
rawList.push(r.rawData.getNativeRef());
resultList.push(cleanupResult(r));
return [transformList, rawList, resultList];
},
[[], [], []]
);
// We prepare the final json result on native side
const [transformDataJson, rawDataRef, cleanResultList] = results.reduce<[Object[], any[], Object[]]>(
([transformList, rawList, resultList], r) => {
transformList.push(r.transformDataParams);
rawList.push(r.rawData.isNative ? r.rawData.getNativeRef() : r.rawData);
resultList.push(cleanupResult(r));
return [transformList, rawList, resultList];
},
[[], [], []]
);

const responseDataObj = {
queryType,
results: cleanResultList,
slowQuery
};
const responseDataObj = {
queryType,
results: cleanResultList,
slowQuery
};

res(await getFinalCubestoreResultMulti(transformDataJson, rawDataRef, responseDataObj));
} else {
// if we have mixed query results (there are js and native)
// we prepare results separately: on js and native sides
// and serve final response from JS side
res({
queryType,
results: await Promise.all(results.map(async (r) => {
const data = await r.dataCb();
return {
...cleanupResult(r),
data,
};
})),
pivotQuery: getPivotQuery(queryType, normalizedQueries),
slowQuery
});
}
} else if (allNative) {
res(await getFinalQueryResultMulti(transformDataJson, rawDataRef, responseDataObj));
} else {
// We prepare the full final json result on native side
const r = results[0];
const rawDataRef = r.rawData.getNativeRef();
res(await getFinalCubestoreResult(r.transformDataParams, rawDataRef, cleanupResult(r)));
} else {
const data = await results[0].dataCb();
res({
...cleanupResult(results[0]),
data,
});
const rawData = r.rawData.isNative ? r.rawData.getNativeRef() : r.rawData;
res(await getFinalQueryResult(r.transformDataParams, rawData, cleanupResult(r)));
}
} catch (e: any) {
this.handleError({
Expand Down Expand Up @@ -2002,40 +1957,22 @@ class ApiGateway {
})
);

const allNative = results.every(r => r.isNative);

if (!request.streaming) {
// If all query results are from Cubestore (are native)
// we prepare the final json result on native side
if (allNative) {
const [transformDataJson, rawDataRef, resultDataJson] = (results as {
transformDataParams: any;
rawData: { getNativeRef: () => any };
}[]).reduce<[Object[], any[], Object[]]>(
([transformList, rawList, resultList], r) => {
transformList.push(r.transformDataParams);
rawList.push(r.rawData.getNativeRef());
resultList.push(cleanupResult(r));
return [transformList, rawList, resultList];
},
[[], [], []]
);
// We prepare the final json result on native side
const [transformDataJson, rawData, resultDataJson] = (results as {
transformDataParams: any;
rawData: { isNative: boolean, getNativeRef: () => any };
}[]).reduce<[Object[], any[], Object[]]>(
([transformList, rawList, resultList], r) => {
transformList.push(r.transformDataParams);
rawList.push(r.rawData.isNative ? r.rawData.getNativeRef() : r.rawData);
resultList.push(cleanupResult(r));
return [transformList, rawList, resultList];
},
[[], [], []]
);

res(await getFinalCubestoreResultArray(transformDataJson, rawDataRef, resultDataJson));
} else {
// if we have mixed query results (there are js and native)
// we prepare results separately: on js and native sides
// and serve final response from JS side
res({
results: await Promise.all(results.map(async (r) => {
const data = await r.dataCb();
return {
...cleanupResult(r),
data,
};
})),
});
}
res(await getFinalQueryResultArray(transformDataJson, rawData, resultDataJson));
} else {
res(results[0]);
}
Expand Down
12 changes: 6 additions & 6 deletions packages/cubejs-backend-native/js/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,22 +389,22 @@ export const transformData = (transformDataObj: Object, rows: any): Promise<Tran
return native.transformQueryData(transformDataObj, rows);
};

export const getFinalCubestoreResult = (transformDataObj: Object, rows: any, resultData: Object): Promise<ArrayBuffer> => {
export const getFinalQueryResult = (transformDataObj: Object, rows: any, resultData: Object): Promise<ArrayBuffer> => {
const native = loadNative();

return native.getFinalCubestoreResult(transformDataObj, rows, resultData);
return native.getFinalQueryResult(transformDataObj, rows, resultData);
};

export const getFinalCubestoreResultArray = (transformDataArr: Object[], rows: any[], resultDataArr: Object[]): Promise<ArrayBuffer> => {
export const getFinalQueryResultArray = (transformDataArr: Object[], rows: any[], resultDataArr: Object[]): Promise<ArrayBuffer> => {
const native = loadNative();

return native.getFinalCubestoreResultArray(transformDataArr, rows, resultDataArr);
return native.getFinalQueryResultArray(transformDataArr, rows, resultDataArr);
};

export const getFinalCubestoreResultMulti = (transformDataArr: Object[], rows: any[], responseData: Object): Promise<ArrayBuffer> => {
export const getFinalQueryResultMulti = (transformDataArr: Object[], rows: any[], responseData: Object): Promise<ArrayBuffer> => {
const native = loadNative();

return native.getFinalCubestoreResultMulti(transformDataArr, rows, responseData);
return native.getFinalQueryResultMulti(transformDataArr, rows, responseData);
};

export interface PyConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl<'de, 'a, 'b> Deserializer<'de> for JsValueDeserializer<'a, 'b> {
}

Err(JsDeserializationError(
"Unsupported type for deserialization".to_string(),
"Unsupported number type for deserialization".to_string(),
))
} else if self.value.is_a::<JsBoolean, _>(self.cx) {
let value = self
Expand Down
88 changes: 54 additions & 34 deletions packages/cubejs-backend-native/src/orchestrator.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::node_obj_deserializer::JsValueDeserializer;
use cubeorchestrator::cubestore_message_parser::CubeStoreResult;
use cubeorchestrator::cubestore_result_transform::{
use cubeorchestrator::query_message_parser::QueryResult;
use cubeorchestrator::query_result_transform::{
get_final_cubestore_result_array, RequestResultArray, RequestResultData,
RequestResultDataMulti, TransformedData,
};
use cubeorchestrator::transport::TransformDataRequest;
use cubeorchestrator::transport::{JsRawData, TransformDataRequest};
use neon::context::{Context, FunctionContext, ModuleContext};
use neon::handle::Handle;
use neon::object::Object;
Expand All @@ -22,9 +22,9 @@ pub fn register_module(cx: &mut ModuleContext) -> NeonResult<()> {
)?;
cx.export_function("getCubestoreResult", get_cubestore_result)?;
cx.export_function("transformQueryData", transform_query_data)?;
cx.export_function("getFinalCubestoreResult", final_cubestore_result)?;
cx.export_function("getFinalCubestoreResultMulti", final_cubestore_result_multi)?;
cx.export_function("getFinalCubestoreResultArray", final_cubestore_result_array)?;
cx.export_function("getFinalQueryResult", final_query_result)?;
cx.export_function("getFinalQueryResultMulti", final_query_result_multi)?;
cx.export_function("getFinalQueryResultArray", final_query_result_array)?;

Ok(())
}
Expand All @@ -50,12 +50,32 @@ where
}
}

fn extract_query_result(
cx: &mut FunctionContext<'_>,
data_arg: Handle<JsValue>,
) -> Result<Arc<QueryResult>, anyhow::Error> {
if let Ok(js_box) = data_arg.downcast::<JsBox<Arc<QueryResult>>, _>(cx) {
Ok(Arc::clone(&js_box))
} else if let Ok(js_array) = data_arg.downcast::<JsArray, _>(cx) {
let deserializer = JsValueDeserializer::new(cx, js_array.upcast());
let js_raw_data: JsRawData = Deserialize::deserialize(deserializer)?;

QueryResult::from_js_raw_data(js_raw_data)
.map(Arc::new)
.map_err(anyhow::Error::from)
} else {
Err(anyhow::anyhow!(
"Second argument must be an Array of JsBox<Arc<QueryResult>> or JsArray"
))
}
}

pub fn parse_cubestore_result_message(mut cx: FunctionContext) -> JsResult<JsPromise> {
let msg = cx.argument::<JsBuffer>(0)?;
let msg_data = msg.as_slice(&cx).to_vec();

let promise = cx
.task(move || CubeStoreResult::from_fb(&msg_data))
.task(move || QueryResult::from_cubestore_fb(&msg_data))
.promise(move |mut cx, res| match res {
Ok(result) => Ok(cx.boxed(Arc::new(result))),
Err(err) => cx.throw_error(err.to_string()),
Expand All @@ -65,7 +85,7 @@ pub fn parse_cubestore_result_message(mut cx: FunctionContext) -> JsResult<JsPro
}

pub fn get_cubestore_result(mut cx: FunctionContext) -> JsResult<JsValue> {
let result = cx.argument::<JsBox<Arc<CubeStoreResult>>>(0)?;
let result = cx.argument::<JsBox<Arc<QueryResult>>>(0)?;

let js_array = cx.execute_scoped(|mut cx| {
let js_array = JsArray::new(&mut cx, result.rows.len());
Expand Down Expand Up @@ -99,7 +119,7 @@ pub fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsPromise> {
Err(err) => return cx.throw_error(err.to_string()),
};

let cube_store_result = cx.argument::<JsBox<Arc<CubeStoreResult>>>(1)?;
let cube_store_result = cx.argument::<JsBox<Arc<QueryResult>>>(1)?;
let cube_store_result = Arc::clone(&cube_store_result);

let promise = cx
Expand All @@ -126,7 +146,7 @@ pub fn transform_query_data(mut cx: FunctionContext) -> JsResult<JsPromise> {
Ok(promise)
}

pub fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
pub fn final_query_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
let transform_data_js_object = cx.argument::<JsValue>(0)?;
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_js_object);
let transform_request_data: TransformDataRequest = match Deserialize::deserialize(deserializer)
Expand All @@ -135,8 +155,12 @@ pub fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
Err(err) => return cx.throw_error(err.to_string()),
};

let cube_store_result = cx.argument::<JsBox<Arc<CubeStoreResult>>>(1)?;
let cube_store_result = Arc::clone(&cube_store_result);
let data_arg = cx.argument::<JsValue>(1)?;
let cube_store_result: Arc<QueryResult> = match extract_query_result(&mut cx, data_arg) {
Ok(query_result) => query_result,
Err(err) => return cx.throw_error(err.to_string()),
};

let result_data_js_object = cx.argument::<JsValue>(2)?;
let deserializer = JsValueDeserializer::new(&mut cx, result_data_js_object);
let mut result_data: RequestResultData = match Deserialize::deserialize(deserializer) {
Expand All @@ -158,7 +182,7 @@ pub fn final_cubestore_result(mut cx: FunctionContext) -> JsResult<JsPromise> {
Ok(promise)
}

pub fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult<JsPromise> {
pub fn final_query_result_array(mut cx: FunctionContext) -> JsResult<JsPromise> {
let transform_data_array = cx.argument::<JsValue>(0)?;
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_array);
let transform_requests: Vec<TransformDataRequest> = match Deserialize::deserialize(deserializer)
Expand All @@ -167,16 +191,14 @@ pub fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult<JsPromi
Err(err) => return cx.throw_error(err.to_string()),
};

let cube_store_array = cx.argument::<JsArray>(1)?;
let cube_store_results_boxed: Vec<Handle<JsBox<Arc<CubeStoreResult>>>> = cube_store_array
.to_vec(&mut cx)?
.into_iter()
.map(|js_value| js_value.downcast_or_throw::<JsBox<Arc<CubeStoreResult>>, _>(&mut cx))
.collect::<Result<_, _>>()?;
let cube_store_results: Vec<Arc<CubeStoreResult>> = cube_store_results_boxed
.iter()
.map(|handle| (**handle).clone())
.collect();
let data_array = cx.argument::<JsArray>(1)?;
let mut cube_store_results: Vec<Arc<QueryResult>> = vec![];
for data_arg in data_array.to_vec(&mut cx)? {
match extract_query_result(&mut cx, data_arg) {
Ok(query_result) => cube_store_results.push(query_result),
Err(err) => return cx.throw_error(err.to_string()),
};
}

let results_data_array = cx.argument::<JsValue>(2)?;
let deserializer = JsValueDeserializer::new(&mut cx, results_data_array);
Expand Down Expand Up @@ -207,7 +229,7 @@ pub fn final_cubestore_result_array(mut cx: FunctionContext) -> JsResult<JsPromi
Ok(promise)
}

pub fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult<JsPromise> {
pub fn final_query_result_multi(mut cx: FunctionContext) -> JsResult<JsPromise> {
let transform_data_array = cx.argument::<JsValue>(0)?;
let deserializer = JsValueDeserializer::new(&mut cx, transform_data_array);
let transform_requests: Vec<TransformDataRequest> = match Deserialize::deserialize(deserializer)
Expand All @@ -216,16 +238,14 @@ pub fn final_cubestore_result_multi(mut cx: FunctionContext) -> JsResult<JsPromi
Err(err) => return cx.throw_error(err.to_string()),
};

let cube_store_array = cx.argument::<JsArray>(1)?;
let cube_store_results_boxed: Vec<Handle<JsBox<Arc<CubeStoreResult>>>> = cube_store_array
.to_vec(&mut cx)?
.into_iter()
.map(|js_value| js_value.downcast_or_throw::<JsBox<Arc<CubeStoreResult>>, _>(&mut cx))
.collect::<Result<_, _>>()?;
let cube_store_results: Vec<Arc<CubeStoreResult>> = cube_store_results_boxed
.iter()
.map(|handle| (**handle).clone())
.collect();
let data_array = cx.argument::<JsArray>(1)?;
let mut cube_store_results: Vec<Arc<QueryResult>> = vec![];
for data_arg in data_array.to_vec(&mut cx)? {
match extract_query_result(&mut cx, data_arg) {
Ok(query_result) => cube_store_results.push(query_result),
Err(err) => return cx.throw_error(err.to_string()),
};
}

let result_data_js_object = cx.argument::<JsValue>(2)?;
let deserializer = JsValueDeserializer::new(&mut cx, result_data_js_object);
Expand Down
Loading

0 comments on commit 5f5cc6c

Please sign in to comment.