Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DH-18086: gRPC transport implementation for nodejs backed by http2 #2339

Merged
merged 5 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/jsapi-nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
"build:babel": "babel ./src --out-dir ./dist --extensions \".ts,.tsx,.js,.jsx\" --source-maps --root-mode upward"
},
"dependencies": {
"@deephaven/log": "file:../log",
"@deephaven/utils": "file:../utils",
"ws": "^8.18.0"
},
"devDependencies": {
Expand Down
194 changes: 194 additions & 0 deletions packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import http2 from 'node:http2';
import type { dh as DhcType } from '@deephaven/jsapi-types';
import Log from '@deephaven/log';
import { assertNotNull } from '@deephaven/utils';

const logger = Log.module('@deephaven/jsapi-nodejs.NodeHttp2gRPCTransport');

type GrpcTransport = DhcType.grpc.GrpcTransport;
type GrpcTransportFactory = DhcType.grpc.GrpcTransportFactory;
type GrpcTransportOptions = DhcType.grpc.GrpcTransportOptions;

/**
* A gRPC transport implementation using Node.js's built-in HTTP/2 client. This
* can be passed to the CoreClient constructor to adapt the underlying transport
* to use http2. This addresses a limitation of nodejs `fetch` implementation
* which currently uses http1.
*
* e.g.
* const client = new dhc.CoreClient(dhServerUrl, {
* transportFactory: NodeHttp2gRPCTransport.factory,
* })
*/
export class NodeHttp2gRPCTransport implements GrpcTransport {
private static sessionMap: Map<string, http2.ClientHttp2Session> = new Map();

/**
* Factory for creating new NodeHttp2gRPCTransport instances.
*/
static readonly factory: GrpcTransportFactory = {
/**
* Create a new transport instance.
* @param options - options for creating the transport
* @return a transport instance to use for gRPC communication
*/
create: (options: GrpcTransportOptions): GrpcTransport => {
const { origin } = new URL(options.url);

if (!NodeHttp2gRPCTransport.sessionMap.has(origin)) {
const session = http2.connect(origin);
session.on('error', err => {
logger.error('Session error', err);
});
NodeHttp2gRPCTransport.sessionMap.set(origin, session);
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const session = NodeHttp2gRPCTransport.sessionMap.get(origin)!;

return new NodeHttp2gRPCTransport(options, session);
},

/**
* Return true to signal that created transports may have {@link GrpcTransport.sendMessage}
* called on it more than once before {@link GrpcTransport.finishSend} should
* be called.
* @return true to signal that the implementation can stream multiple messages,
* false otherwise indicating that Open/Next gRPC calls should be used
*/
get supportsClientStreaming(): boolean {
return true;
},
};

/**
* Private constructor to limit instantiation to the static factory method.
* @param options Transport options.
* @param session node:http2 session to use for data transport.
*/
private constructor(
options: GrpcTransportOptions,
session: http2.ClientHttp2Session
) {
this.options = options;
this.session = session;
}

private readonly options: GrpcTransportOptions;

private readonly session: http2.ClientHttp2Session;

private request: http2.ClientHttp2Stream | null = null;

/**
* Create an http2 client stream that can send requests to the server and pass
* responses to callbacks defined on the transport options.
* @param headers Request headers
* @returns The created http2 client stream
*/
createRequest = (
headers: Record<string, string> | null
): http2.ClientHttp2Stream => {
const url = new URL(this.options.url);

logger.debug('createRequest', url.pathname);

const req = this.session.request({
...headers,
':method': 'POST',
':path': url.pathname,
});

req.on('response', (responseHeaders, _flags) => {
const headersRecord: Record<string, string | string[]> = {};

// strip any undefined headers or keys that start with `:`
Object.keys(responseHeaders).forEach(name => {
if (responseHeaders[name] != null && !name.startsWith(':')) {
headersRecord[name] = responseHeaders[name];
}
});

this.options.onHeaders(headersRecord, Number(responseHeaders[':status']));
});

// Note that `chunk` is technically a `Buffer`, but the `Buffer` type defined
// in @types/pouchdb-core is outdated and incompatible with latest `Uint8Array`
// types. Since `Buffer` inherits from `Uint8Array`, we can get around this
// by just declaring it as a `Uint8Array`.
req.on('data', (chunk: Uint8Array) => {
this.options.onChunk(chunk);
});
req.on('end', () => {
this.options.onEnd();
});
req.on('error', err => {
this.options.onEnd(err);
});

return req;
};

/**
* Starts the stream, sending metadata to the server.
* @param metadata - the headers to send the server when opening the connection
*/
start(metadata: { [key: string]: string | Array<string> }): void {
logger.debug('start', metadata.headersMap);

if (this.request != null) {
throw new Error('start called more than once');
}

const headers: Record<string, string> = {};
Object.entries(metadata).forEach(([key, value]) => {
headers[key] = typeof value === 'string' ? value : value.join(', ');
});

this.request = this.createRequest(headers);
}

/**
* Sends a message to the server.
* @param msgBytes - bytes to send to the server
*/
sendMessage(msgBytes: Uint8Array): void {
logger.debug('sendMessage', msgBytes);
assertNotNull(this.request, 'request is required');

this.request.write(msgBytes);
}

/**
* "Half close" the stream, signaling to the server that no more messages will
* be sent, but that the client is still open to receiving messages.
*/
finishSend(): void {
logger.debug('finishSend');
assertNotNull(this.request, 'request is required');
this.request.end();
}

/**
* End the stream, both notifying the server that no more messages will be
* sent nor received, and preventing the client from receiving any more events.
*/
cancel(): void {
logger.debug('cancel');
assertNotNull(this.request, 'request is required');
this.request.close();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we reset request to null here? Probably not needed, if request.write handles closed requests fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. @niloc132 do you have any thoughts on this one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks safe to me to not null out - but nulling it out does come with the benefit of clearly showing why an error happened if it is reused somehow. I'm certainly not against that style of defensive coding, but I think we'd want to update a few other places too to reflect this.

GC-wise, I don't see any issue with the current implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opting to leave as-is for now to avoid adding complexity. Setting to null will require changes to start which assumes this stays set to know if it has already been called.

}

/**
* Cleanup.
*/
static dispose(): void {
// eslint-disable-next-line no-restricted-syntax
for (const session of NodeHttp2gRPCTransport.sessionMap.values()) {
session.close();
}
NodeHttp2gRPCTransport.sessionMap.clear();
}
}

export default NodeHttp2gRPCTransport;
1 change: 1 addition & 0 deletions packages/jsapi-nodejs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export * from './errorUtils.js';
export * from './fsUtils.js';
export * from './loaderUtils.js';
export * from './polyfillWs.js';
export * from './NodeHttp2gRPCTransport.js';
export * from './serverUtils.js';
3 changes: 2 additions & 1 deletion packages/jsapi-nodejs/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
"outDir": "dist/"
},
"include": ["src/**/*.ts", "src/**/*.tsx", "src/**/*.js", "src/**/*.jsx"],
"exclude": ["node_modules", "src/**/*.test.*", "src/**/__mocks__/*"]
"exclude": ["node_modules", "src/**/*.test.*", "src/**/__mocks__/*"],
"references": [{ "path": "../log" }, { "path": "../utils" }]
}
Loading