From c25f9cccb715a12a8116fb37dd29294fc5a9d134 Mon Sep 17 00:00:00 2001 From: Brian Ingles Date: Thu, 9 Jan 2025 16:37:29 -0600 Subject: [PATCH 1/5] Implemented NodeHttp2gRPCTransport (DH-18086-2) --- package-lock.json | 4 + packages/jsapi-nodejs/package.json | 2 + .../src/NodeHttp2gRPCTransport.ts | 188 ++++++++++++++++++ packages/jsapi-nodejs/src/index.ts | 1 + packages/jsapi-nodejs/tsconfig.json | 3 +- 5 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts diff --git a/package-lock.json b/package-lock.json index 6dff24c6ed..fcdc64ee1f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -30212,6 +30212,8 @@ "version": "0.102.0", "license": "Apache-2.0", "dependencies": { + "@deephaven/log": "file:../log", + "@deephaven/utils": "file:../utils", "ws": "^8.18.0" }, "devDependencies": { @@ -32508,6 +32510,8 @@ "@deephaven/jsapi-nodejs": { "version": "file:packages/jsapi-nodejs", "requires": { + "@deephaven/log": "file:../log", + "@deephaven/utils": "file:../utils", "@types/node": "^22.7.5", "@types/ws": "^8.5.12", "ws": "^8.18.0" diff --git a/packages/jsapi-nodejs/package.json b/packages/jsapi-nodejs/package.json index bf4149de26..042a633e87 100644 --- a/packages/jsapi-nodejs/package.json +++ b/packages/jsapi-nodejs/package.json @@ -21,6 +21,8 @@ "build:babel": "babel ./src --out-dir ./dist --extensions \".ts,.tsx,.js,.jsx\" --source-maps --root-mode upward" }, "dependencies": { + "@deephaven/log": "file:../log", + "@deephaven/utils": "file:../utils", "ws": "^8.18.0" }, "devDependencies": { diff --git a/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts new file mode 100644 index 0000000000..e61e619c2a --- /dev/null +++ b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts @@ -0,0 +1,188 @@ +import http2 from 'node:http2'; +import type { dh as DhcType } from '@deephaven/jsapi-types'; +import Log from '@deephaven/log'; +import { assertNotNull } from '@deephaven/utils'; + +const logger = Log.module('@deephaven/jsapi-nodejs.NodeHttp2gRPCTransport'); + +type GrpcTransport = DhcType.grpc.GrpcTransport; +type GrpcTransportFactory = DhcType.grpc.GrpcTransportFactory; +type GrpcTransportOptions = DhcType.grpc.GrpcTransportOptions; + +/** + * A gRPC transport implementation using Node.js's built-in HTTP/2 client. This + * can be passed to the CoreClient constructor to adapt the underlying transport + * to use http2. This addresses a limitation of nodejs `fetch` implementation + * which currently uses http1. + * + * e.g. + * const client = new dhc.CoreClient(dhServerUrl, { + * transportFactory: NodeHttp2gRPCTransport.factory, + * }) + */ +export class NodeHttp2gRPCTransport implements GrpcTransport { + private static sessionMap: Map = new Map(); + + /** + * Factory for creating new NodeHttp2gRPCTransport instances. + */ + static readonly factory: GrpcTransportFactory = { + /** + * Create a new transport instance. + * @param options - options for creating the transport + * @return a transport instance to use for gRPC communication + */ + create: (options: GrpcTransportOptions): GrpcTransport => { + const { origin } = new URL(options.url); + + if (!NodeHttp2gRPCTransport.sessionMap.has(origin)) { + const session = http2.connect(origin); + session.on('error', err => { + logger.error('Session error', err); + }); + NodeHttp2gRPCTransport.sessionMap.set(origin, session); + } + + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const session = NodeHttp2gRPCTransport.sessionMap.get(origin)!; + + return new NodeHttp2gRPCTransport(options, session); + }, + + /** + * Return true to signal that created transports may have {@link GrpcTransport.sendMessage} + * called on it more than once before {@link GrpcTransport.finishSend} should + * be called. + * @return true to signal that the implementation can stream multiple messages, + * false otherwise indicating that Open/Next gRPC calls should be used + */ + get supportsClientStreaming(): boolean { + return false; + }, + }; + + /** + * Private constructor to limit instantiation to the static factory method. + * @param options Transport options. + * @param session node:http2 session. + */ + private constructor( + options: GrpcTransportOptions, + session: http2.ClientHttp2Session + ) { + this.options = options; + this.session = session; + } + + private readonly options: GrpcTransportOptions; + + private readonly session: http2.ClientHttp2Session; + + private request: http2.ClientHttp2Stream | null = null; + + createRequest = ( + headers: Record | null + ): http2.ClientHttp2Stream => { + const url = new URL(this.options.url); + + logger.debug('createRequest', url.pathname); + + const req = this.session.request({ + ...headers, + ':method': 'POST', + ':path': url.pathname, + }); + + req.on('response', (responseHeaders, _flags) => { + const headersRecord: Record = {}; + + // strip any undefined headers or keys that start with `:` + Object.keys(responseHeaders).forEach(name => { + if (responseHeaders[name] != null && !name.startsWith(':')) { + headersRecord[name] = responseHeaders[name]; + } + }); + + this.options.onHeaders(headersRecord, Number(responseHeaders[':status'])); + }); + + // Note that `chunk` is technically a `Buffer`, but the `Buffer` type defined + // in @types/pouchdb-core is outdated and incompatible with latest `Uint8Array` + // types. Since `Buffer` inherits from `Uint8Array`, we can get around this + // by just declaring it as a `Uint8Array`. + req.on('data', (chunk: Uint8Array) => { + this.options.onChunk(chunk); + }); + req.on('end', () => { + this.options.onEnd(); + }); + req.on('error', err => { + this.options.onEnd(err); + }); + + return req; + }; + + /** + * Starts the stream, sending metadata to the server. + * @param metadata - the headers to send the server when opening the connection + */ + start(metadata: { [key: string]: string | Array }): void { + logger.debug('start', metadata.headersMap); + + if (this.request != null) { + throw new Error('start called more than once'); + } + + const headers: Record = {}; + Object.entries(metadata).forEach(([key, value]) => { + headers[key] = typeof value === 'string' ? value : value.join(', '); + }); + + this.request = this.createRequest(headers); + } + + /** + * Sends a message to the server. + * @param msgBytes - bytes to send to the server + */ + sendMessage(msgBytes: Uint8Array): void { + logger.debug('sendMessage', msgBytes); + assertNotNull(this.request, 'request is required'); + + this.request.write(msgBytes); + } + + /** + * "Half close" the stream, signaling to the server that no more messages will + * be sent, but that the client is still open to receiving messages. + */ + finishSend(): void { + logger.debug('finishSend'); + assertNotNull(this.request, '_request'); + this.request.end(); + } + + /** + * End the stream, both notifying the server that no more messages will be + * sent nor received, and preventing the client from receiving any more events. + */ + cancel(): void { + logger.debug('cancel'); + assertNotNull(this.request, '_request'); + this.request.close(); + } + + /** + * Cleanup. + */ + static dispose(): void { + // eslint-disable-next-line no-restricted-syntax + for (const session of NodeHttp2gRPCTransport.sessionMap.values()) { + session.close(); + } + NodeHttp2gRPCTransport.sessionMap.clear(); + } +} + +export default NodeHttp2gRPCTransport; diff --git a/packages/jsapi-nodejs/src/index.ts b/packages/jsapi-nodejs/src/index.ts index a3ec855315..75a93b3a32 100644 --- a/packages/jsapi-nodejs/src/index.ts +++ b/packages/jsapi-nodejs/src/index.ts @@ -2,4 +2,5 @@ export * from './errorUtils.js'; export * from './fsUtils.js'; export * from './loaderUtils.js'; export * from './polyfillWs.js'; +export * from './NodeHttp2gRPCTransport.js'; export * from './serverUtils.js'; diff --git a/packages/jsapi-nodejs/tsconfig.json b/packages/jsapi-nodejs/tsconfig.json index e064ffa6b2..ee5de70bcb 100644 --- a/packages/jsapi-nodejs/tsconfig.json +++ b/packages/jsapi-nodejs/tsconfig.json @@ -12,5 +12,6 @@ "outDir": "dist/" }, "include": ["src/**/*.ts", "src/**/*.tsx", "src/**/*.js", "src/**/*.jsx"], - "exclude": ["node_modules", "src/**/*.test.*", "src/**/__mocks__/*"] + "exclude": ["node_modules", "src/**/*.test.*", "src/**/__mocks__/*"], + "references": [{ "path": "../log" }, { "path": "../utils" }] } From 74d9057758f64928478bfe7e1a6a3aae6fbbf9de Mon Sep 17 00:00:00 2001 From: Brian Ingles Date: Tue, 14 Jan 2025 10:43:23 -0600 Subject: [PATCH 2/5] Comment (DH-18086-2) --- packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts index e61e619c2a..a13c540a43 100644 --- a/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts +++ b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts @@ -64,7 +64,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport { /** * Private constructor to limit instantiation to the static factory method. * @param options Transport options. - * @param session node:http2 session. + * @param session node:http2 session to use for data transport. */ private constructor( options: GrpcTransportOptions, From 7c0ad0db39dec3fc155cbedc7d5ba7fd251424b8 Mon Sep 17 00:00:00 2001 From: Brian Ingles Date: Tue, 14 Jan 2025 11:20:17 -0600 Subject: [PATCH 3/5] Comments (DH-18086-2) --- packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts index a13c540a43..d322c66087 100644 --- a/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts +++ b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts @@ -80,6 +80,12 @@ export class NodeHttp2gRPCTransport implements GrpcTransport { private request: http2.ClientHttp2Stream | null = null; + /** + * Create an http2 client stream that can send requests to the server and pass + * responses to callbacks defined on the transport options. + * @param headers Request headers + * @returns The created http2 client stream + */ createRequest = ( headers: Record | null ): http2.ClientHttp2Stream => { @@ -159,7 +165,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport { */ finishSend(): void { logger.debug('finishSend'); - assertNotNull(this.request, '_request'); + assertNotNull(this.request, 'request is required'); this.request.end(); } @@ -169,7 +175,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport { */ cancel(): void { logger.debug('cancel'); - assertNotNull(this.request, '_request'); + assertNotNull(this.request, 'request is required'); this.request.close(); } From 057f592834949e0859f33dd92ca1165f33c5a0a2 Mon Sep 17 00:00:00 2001 From: Brian Ingles Date: Thu, 16 Jan 2025 10:50:51 -0600 Subject: [PATCH 4/5] Remove reference to request instance after cancel (DH-18086-2) --- packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts index d322c66087..3b80dc02e3 100644 --- a/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts +++ b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts @@ -57,7 +57,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport { * false otherwise indicating that Open/Next gRPC calls should be used */ get supportsClientStreaming(): boolean { - return false; + return true; }, }; @@ -177,6 +177,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport { logger.debug('cancel'); assertNotNull(this.request, 'request is required'); this.request.close(); + this.request = null; } /** From 2cd6ce220cb56f3a0297e997b63da438c6b5cbfa Mon Sep 17 00:00:00 2001 From: Brian Ingles Date: Thu, 16 Jan 2025 11:35:35 -0600 Subject: [PATCH 5/5] Reverted change (DH-18086-2) --- packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts index 3b80dc02e3..6c8d8e5a6f 100644 --- a/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts +++ b/packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts @@ -177,7 +177,6 @@ export class NodeHttp2gRPCTransport implements GrpcTransport { logger.debug('cancel'); assertNotNull(this.request, 'request is required'); this.request.close(); - this.request = null; } /**