Skip to content

Commit

Permalink
@connectrpc/connect-web unlock client streaming and bidi streaming fo…
Browse files Browse the repository at this point in the history
…r browsers which support it, such as Chromium-based browsers. All other browsers simply get an error thrown, which was the existing behavior before this commit.

Signed-off-by: David Fiala <[email protected]>
  • Loading branch information
davidfiala committed Dec 30, 2024
1 parent ebc1880 commit d2359d1
Showing 1 changed file with 56 additions and 8 deletions.
64 changes: 56 additions & 8 deletions packages/connect-web/src/connect-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,24 @@ const fetchOptions: RequestInit = {
redirect: "error",
};

const supportsRequestStreams = (() => {
let duplexAccessed = false;

// explicit casting. RequestInit does not standardly support `duplex` yet
const hasContentType = new Request("https://example.com/", {
body: new ReadableStream(),
method: "POST",
get duplex() {
duplexAccessed = true;
return "half";
},
} as RequestInit).headers.has("Content-Type");

// eslint is unaware that `get duplex()` may be called
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
return duplexAccessed && !hasContentType;
})();

/**
* Create a Transport for the Connect protocol, which makes unary and
* server-streaming methods available to web browsers. It uses the fetch
Expand Down Expand Up @@ -305,17 +323,42 @@ export function createConnectTransport(
}
}

function createStreamingRequestBody(
input: AsyncIterable<MessageShape<I>>,
serialize: (msg: MessageShape<I>) => Uint8Array,
): ReadableStream<Uint8Array> {
return new ReadableStream({
async start(controller) {
try {
for await (const message of input) {
const encoded = encodeEnvelope(0, serialize(message));
controller.enqueue(encoded);
}
controller.close();
} catch (e) {
controller.error(e);
}
},
});
}

async function createRequestBody(
input: AsyncIterable<MessageShape<I>>,
): Promise<Uint8Array> {
if (method.methodKind != "server_streaming") {
throw "The fetch API does not support streaming request bodies";
method: DescMethodStreaming<I, O>,
): Promise<BodyInit> {
if (method.methodKind === "server_streaming") {
const r = await input[Symbol.asyncIterator]().next();
if (r.done == true) {
throw "missing request message";
}
return encodeEnvelope(0, serialize(r.value));
}
const r = await input[Symbol.asyncIterator]().next();
if (r.done == true) {
throw "missing request message";

if (!supportsRequestStreams) {
throw "The fetch API does not support streaming request bodies in this browser";
}
return encodeEnvelope(0, serialize(r.value));

return createStreamingRequestBody(input, serialize);
}

timeoutMs =
Expand Down Expand Up @@ -346,12 +389,17 @@ export function createConnectTransport(
},
next: async (req) => {
const fetch = options.fetch ?? globalThis.fetch;
const clientDuplex =
method.methodKind !== "server_streaming"
? ({ duplex: "half" } as RequestInit)
: undefined;
const fRes = await fetch(req.url, {
...fetchOptions,
...clientDuplex,
method: req.requestMethod,
headers: req.header,
signal: req.signal,
body: await createRequestBody(req.message),
body: await createRequestBody(req.message, method),
});
validateResponse(
method.methodKind,
Expand Down

0 comments on commit d2359d1

Please sign in to comment.