Skip to content

Commit

Permalink
Fix broken client body streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlar committed Sep 30, 2024
1 parent dc29b2e commit ff4e642
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions zio-http/jvm/src/main/scala/zio/http/netty/AsyncBodyReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ private[netty] abstract class AsyncBodyReader extends SimpleChannelInboundHandle
onLastMessage()
}

val streaming =
val readMore =
state match {
case State.Buffering =>
// `connect` method hasn't been called yet, add all incoming content to the buffer
buffer0.addAll(content)
false
true
case State.Direct(callback) if isLast && buffer0.knownSize == 0 =>
// Buffer is empty, we can just use the array directly
callback(Chunk.fromArray(content), isLast = true)
Expand All @@ -109,14 +109,19 @@ private[netty] abstract class AsyncBodyReader extends SimpleChannelInboundHandle
// We're aggregating the full response, only call the callback on the last message
buffer0.addAll(content)
if (isLast) callback(result(buffer0), isLast = true)
!isLast
case State.Direct(callback: UnsafeAsync.Streaming) =>
// We're streaming, emit chunks as they come
callback(Chunk.fromArray(content), isLast)
// ctx.read will be called when the chunk is consumed
false
case State.Direct(callback) =>
// We're streaming, emit chunks as they come
callback(Chunk.fromArray(content), isLast)
true
}

if (!isLast && !streaming) ctx.read(): Unit
if (readMore) ctx.read(): Unit
}
}

Expand Down

0 comments on commit ff4e642

Please sign in to comment.