Skip to content

Commit

Permalink
add support for partial message (fix #3)
Browse files Browse the repository at this point in the history
  • Loading branch information
okapies committed Jun 16, 2014
1 parent c3ca18b commit 4ef1210
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 10 deletions.
3 changes: 3 additions & 0 deletions src/main/scala/okapies/finagle/kafka/protocol/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ object Message {
final val KeyOffset = KeySizeOffset + KeySizeLength
final val ValueSizeLength = 4

final val MinHeaderSize =
CrcLength + MagicLength + AttributesLength + KeySizeLength + ValueSizeLength

// Magic value
final val CurrentMagicValue = 0: Int8

Expand Down
37 changes: 28 additions & 9 deletions src/main/scala/okapies/finagle/kafka/protocol/Spec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package okapies.finagle.kafka.protocol

import java.nio.charset.Charset

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer

import org.jboss.netty.buffer.ChannelBuffer

import _root_.kafka.common.KafkaException
import _root_.kafka.message.InvalidMessageException

private[protocol] object Spec {

Expand Down Expand Up @@ -219,17 +221,34 @@ private[protocol] object Spec {

@inline
def decodeMessageSet(): Seq[MessageWithOffset] = {
val size = buf.decodeInt32()
val messageSetBuf = buf.readBytes(size)

val messages = ArrayBuffer[MessageWithOffset]()
while(messageSetBuf.readableBytes > 0) {
val offset = messageSetBuf.decodeInt64()
val bytes = messageSetBuf.decodeBytes()
messages.append(MessageWithOffset(offset, Message(bytes)))
@tailrec
def decodeMessages(msgSetBuf: ChannelBuffer,
msgSet: ArrayBuffer[MessageWithOffset]): Seq[MessageWithOffset] = {
if (msgSetBuf.readableBytes < 12) { // 12 = length(Offset+MessageSize)
msgSet
} else {
val offset = msgSetBuf.decodeInt64() // Offset => int64
val size = msgSetBuf.decodeInt32() // MessageSize => int32
if (size < Message.MinHeaderSize) {
throw new InvalidMessageException(s"Message size is corrupted: $size")
}

if (msgSetBuf.readableBytes < size) {
// ignore a partial message at the end of the message set
msgSet
} else {
val bytes = msgSetBuf.readBytes(size) // Message
msgSet.append(MessageWithOffset(offset, Message(bytes)))

decodeMessages(msgSetBuf, msgSet)
}
}
}

messages
val size = buf.decodeInt32() // MessageSetSize => int32
val msgSetBuf = buf.readBytes(size) // MessageSet

decodeMessages(msgSetBuf, ArrayBuffer[MessageWithOffset]())
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ class MessageSetTest extends FlatSpec with Matchers {
buf1.put(kafkaMsgs1.buffer)
buf1.rewind()

val msgs1 = ChannelBuffers.wrappedBuffer(buf1).decodeMessageSet()
val chBuf1 = ChannelBuffers.wrappedBuffer(buf1)
val msgs1 = chBuf1.decodeMessageSet()
assert(chBuf1.readableBytes === 0)

val msg11 = msgs1(0)
assert(msg11.offset === 1)
Expand All @@ -88,4 +90,30 @@ class MessageSetTest extends FlatSpec with Matchers {
assert(msg12.message.value.toString(utf8) === "value2")
}

it should "decode bytes including a partial message into a MessageSet" in {
val kafkaMsgs1 = new ByteBufferMessageSet(
NoCompressionCodec,
new AtomicLong(1),
new KafkaMessage("value1".getBytes(utf8), "key1".getBytes(utf8)),
new KafkaMessage("value2".getBytes(utf8), "key2".getBytes(utf8))
)
val size1 = kafkaMsgs1.sizeInBytes - 5 // make 2nd message partial
val buf1 = ByteBuffer.allocateDirect(4 /* Size */ + size1)
buf1.putInt(size1)
buf1.put(kafkaMsgs1.buffer.array, 0, size1)
buf1.rewind()

val chBuf1 = ChannelBuffers.wrappedBuffer(buf1)
val msgs1 = chBuf1.decodeMessageSet()
assert(chBuf1.readableBytes === 0)

val msg11 = msgs1(0)
assert(msg11.offset === 1)
assert(msg11.message.key.get.toString(utf8) === "key1")
assert(msg11.message.value.toString(utf8) === "value1")

// 2nd message must be ignored
assert(msgs1.length === 1)
}

}

0 comments on commit 4ef1210

Please sign in to comment.