diff --git a/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala b/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala index 981b845c0f..709e74c7cb 100644 --- a/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala +++ b/zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala @@ -95,12 +95,12 @@ private[netty] abstract class AsyncBodyReader extends SimpleChannelInboundHandle onLastMessage() } - val streaming = + val readMore = state match { case State.Buffering => // `connect` method hasn't been called yet, add all incoming content to the buffer buffer0.addAll(content) - false + true case State.Direct(callback) if isLast && buffer0.knownSize == 0 => // Buffer is empty, we can just use the array directly callback(Chunk.fromArray(content), isLast = true) @@ -109,6 +109,11 @@ private[netty] abstract class AsyncBodyReader extends SimpleChannelInboundHandle // We're aggregating the full response, only call the callback on the last message buffer0.addAll(content) if (isLast) callback(result(buffer0), isLast = true) + !isLast + case State.Direct(callback: UnsafeAsync.Streaming) => + // We're streaming, emit chunks as they come + callback(Chunk.fromArray(content), isLast) + // ctx.read will be called when the chunk is consumed false case State.Direct(callback) => // We're streaming, emit chunks as they come @@ -116,7 +121,7 @@ private[netty] abstract class AsyncBodyReader extends SimpleChannelInboundHandle true } - if (!isLast && !streaming) ctx.read(): Unit + if (readMore) ctx.read(): Unit } }