From ff4e642c5a2bfc7ffb2ac7369013eff2ca7fa933 Mon Sep 17 00:00:00 2001 From: davidl Date: Thu, 26 Sep 2024 16:26:27 +0200 Subject: [PATCH] Fix broken client body streaming --- .../main/scala/zio/http/netty/AsyncBodyReader.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala b/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala index 981b845c0f..709e74c7cb 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala @@ -95,12 +95,12 @@ private[netty] abstract class AsyncBodyReader extends SimpleChannelInboundHandle onLastMessage() } - val streaming = + val readMore = state match { case State.Buffering => // `connect` method hasn't been called yet, add all incoming content to the buffer buffer0.addAll(content) - false + true case State.Direct(callback) if isLast && buffer0.knownSize == 0 => // Buffer is empty, we can just use the array directly callback(Chunk.fromArray(content), isLast = true) @@ -109,6 +109,11 @@ private[netty] abstract class AsyncBodyReader extends SimpleChannelInboundHandle // We're aggregating the full response, only call the callback on the last message buffer0.addAll(content) if (isLast) callback(result(buffer0), isLast = true) + !isLast + case State.Direct(callback: UnsafeAsync.Streaming) => + // We're streaming, emit chunks as they come + callback(Chunk.fromArray(content), isLast) + // ctx.read will be called when the chunk is consumed false case State.Direct(callback) => // We're streaming, emit chunks as they come @@ -116,7 +121,7 @@ private[netty] abstract class AsyncBodyReader extends SimpleChannelInboundHandle true } - if (!isLast && !streaming) ctx.read(): Unit + if (readMore) ctx.read(): Unit } }