From 86b1ae88421efe8da72dc6bb5b17432245ee451e Mon Sep 17 00:00:00 2001 From: dpiercey Date: Tue, 10 Dec 2024 09:03:10 -0700 Subject: [PATCH] fix: improve repsonse body normalization --- .../micro-frame-component/node.marko | 7 ++--- .../stream-source-component/node.marko | 6 ++--- src/util/consume-body.ts | 26 +++++++++++++++++++ 3 files changed, 31 insertions(+), 8 deletions(-) create mode 100644 src/util/consume-body.ts diff --git a/src/node_modules/@internal/micro-frame-component/node.marko b/src/node_modules/@internal/micro-frame-component/node.marko index c94a0eb..75c215c 100644 --- a/src/node_modules/@internal/micro-frame-component/node.marko +++ b/src/node_modules/@internal/micro-frame-component/node.marko @@ -1,6 +1,7 @@ import path from "path"; import https from "https"; import fetch from "make-fetch-happen"; +import consumeResponseBody from "../../../util/consume-body"; static const { ca } = https.globalAgent.options; static const cachePath = path.resolve("node_modules/.cache/fetch"); static const strictSSL = process.env.NODE_TLS_REJECT_UNAUTHORIZED !== "0"; @@ -81,11 +82,7 @@ static async function fetchBody(input, out, buffer) { if (buffer) return res.text(); - if (!res.body || !res.body[Symbol.asyncIterator]) { - throw new Error("Response body must be a stream."); - } - - return res.body[Symbol.asyncIterator](); + return consumeResponseBody(res); }
diff --git a/src/node_modules/@internal/stream-source-component/node.marko b/src/node_modules/@internal/stream-source-component/node.marko index 0bc8fe2..071a57e 100644 --- a/src/node_modules/@internal/stream-source-component/node.marko +++ b/src/node_modules/@internal/stream-source-component/node.marko @@ -2,6 +2,7 @@ import fetch from "make-fetch-happen"; import https from "https"; import path from "path"; import { getSource } from "../../../util/stream"; +import consumeResponseBody from "../../../util/consume-body"; static const { ca } = https.globalAgent.options; static const cachePath = path.resolve("node_modules/.cache/fetch"); static const strictSSL = process.env.NODE_TLS_REJECT_UNAUTHORIZED !== "0"; @@ -85,9 +86,8 @@ $ const streamSource = getSource(input.name, out);
$ out.bf("@_", component, true); - <@then|{ body }|> - $ const iter = input.parser(body[Symbol.asyncIterator]()); - + <@then|res|> + <@catch|err|> $ streamSource.close(err); diff --git a/src/util/consume-body.ts b/src/util/consume-body.ts new file mode 100644 index 0000000..1d5a2a1 --- /dev/null +++ b/src/util/consume-body.ts @@ -0,0 +1,26 @@ +const decoder = new TextDecoder(); +export default function consumeResponseBody( + res: Response +): AsyncIterator | undefined { + if (res.body) { + if ((res.body as any).getReader) { + return consumeBodyReader(res.body.getReader()); + } + + if ((res.body as any)[Symbol.asyncIterator]) { + return (res.body as any)[Symbol.asyncIterator](); + } + } + + throw new Error("Response body must be a stream."); +} + +async function* consumeBodyReader( + reader: ReadableStreamDefaultReader +) { + do { + const next = await reader.read(); + if (next.done) break; + yield decoder.decode(next.value); + } while (true); +}