Skip to content

Commit

Permalink
add Compression (resolve #4)
Browse files Browse the repository at this point in the history
  • Loading branch information
okapies committed Jun 25, 2014
1 parent a5ae252 commit 9e256f5
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 32 deletions.
110 changes: 110 additions & 0 deletions src/main/scala/okapies/finagle/kafka/protocol/Compression.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package okapies.finagle.kafka.protocol

import java.io.{DataOutputStream, InputStream, OutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import org.jboss.netty.buffer._
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

sealed trait Compression {
def id: Int

def compress(msgs: Seq[Message]): Message
def decompress(msg: Message): Seq[MessageWithOffset]
}

case object NoCompression extends Compression {
final val id = 0x00

def compress(msgs: Seq[Message]) = Compression.compress(this, msgs)
def decompress(msg: Message) = Compression.decompress(this, msg)
}

case object GZIPCompression extends Compression {
final val id = 0x01

def compress(msgs: Seq[Message]) = Compression.compress(this, msgs)
def decompress(msg: Message) = Compression.decompress(this, msg)
}

case object SnappyCompression extends Compression {
final val id = 0x02

def compress(msgs: Seq[Message]) = Compression.compress(this, msgs)
def decompress(msg: Message) = Compression.decompress(this, msg)
}

object Compression {

import Spec._

private[this] final val WriteBufferSize = 8192

def apply(codec: Int): Compression = codec match {
case NoCompression.id => NoCompression
case GZIPCompression.id => GZIPCompression
case SnappyCompression.id => SnappyCompression
case _ => throw new IllegalArgumentException(s"Unknown codec: $codec")
}

private[protocol] def compress(compression: Compression,
msgs: Seq[Message]): Message = {
val compressed = ChannelBuffers.dynamicBuffer(estimateSize(msgs))
val out = new DataOutputStream(
codecStream(compression, new ChannelBufferOutputStream(compressed)))
try {
msgs.foldLeft(0) { case (offset, msg) =>
val buf = msg.underlying
val size = buf.readableBytes // == msg.size
out.writeLong(offset) // Offset => int64; TODO: Is it compatible behavior?
out.writeInt(size) // MessageSize => int32
if (buf.hasArray) { // Message
out.write(buf.array, buf.arrayOffset, size)
} else {
val array = new Array[Byte](size)
buf.readBytes(array)
out.write(array, 0, array.length)
}

offset + 1
}
} finally {
out.close()
}

Message.create(value = compressed, key = None, compression = compression)
}

private[protocol] def codecStream(compression: Compression,
out: OutputStream): OutputStream = compression match {
case NoCompression => out
case GZIPCompression => new GZIPOutputStream(out)
case SnappyCompression => new SnappyOutputStream(out)
}

private[this] def estimateSize(msgs: Seq[Message]): Int =
msgs.foldLeft(0)(_ + OffsetLength + MessageSizeLength + _.size)

private[protocol] def decompress(compression: Compression,
msg: Message): Seq[MessageWithOffset] = {
val decompressed = ChannelBuffers.dynamicBuffer
val in = codecStream(compression, new ChannelBufferInputStream(msg.value))
try {
while (in.available > 0) {
decompressed.writeBytes(in, WriteBufferSize)
}
} finally {
in.close()
}

decompressed.decodeMessageSetElements()
}

private[protocol] def codecStream(compression: Compression,
in: InputStream): InputStream = compression match {
case NoCompression => in
case GZIPCompression => new GZIPInputStream(in)
case SnappyCompression => new SnappyInputStream(in)
}

}
9 changes: 7 additions & 2 deletions src/main/scala/okapies/finagle/kafka/protocol/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class Message(private[this] val _underlying: ChannelBuffer) {

def attributes: Byte = _underlying.getByte(AttributesOffset)

def compression: Compression = Compression(attributes & CompressionCodeMask)

def key: Option[ChannelBuffer] = _underlying.getInt(KeySizeOffset) match {
case keySize if keySize >= 0 => Some(_underlying.slice(KeyOffset, keySize))
case _ => None
Expand Down Expand Up @@ -85,19 +87,22 @@ object Message {
// Magic value
final val CurrentMagicValue = 0: Int8

// The mask for compression codec
final val CompressionCodeMask: Int = 0x03

def apply(buffer: ChannelBuffer) = new Message(buffer)

def create(
value: ChannelBuffer,
key: Option[ChannelBuffer] = None,
attributes: Byte = 0): Message = {
compression: Compression = NoCompression): Message = {

val buf = ChannelBuffers.dynamicBuffer() // TODO: estimatedLength

// Message => Crc MagicByte Attributes Key Value
buf.writerIndex(MagicOffset) // skip the Crc field
buf.encodeInt8(CurrentMagicValue)
buf.encodeInt8(attributes)
buf.encodeInt8(compression.id.asInstanceOf[Int8]) // only use lowest 2 bits
key match {
case Some(b) => buf.encodeBytes(b)
case None => buf.encodeBytes(null: ChannelBuffer)
Expand Down
64 changes: 38 additions & 26 deletions src/main/scala/okapies/finagle/kafka/protocol/Spec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package okapies.finagle.kafka.protocol
import java.nio.charset.Charset

import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable

import org.jboss.netty.buffer.ChannelBuffer

Expand Down Expand Up @@ -52,6 +52,8 @@ private[protocol] object Spec {
*/
private[protocol] implicit class KafkaChannelBuffer(val buf: ChannelBuffer) extends AnyVal {

import KafkaChannelBuffer._

/*
* Methods for encoding value and writing into buffer.
*/
Expand Down Expand Up @@ -221,34 +223,44 @@ private[protocol] object Spec {

@inline
def decodeMessageSet(): Seq[MessageWithOffset] = {
@tailrec
def decodeMessages(msgSetBuf: ChannelBuffer,
msgSet: ArrayBuffer[MessageWithOffset]): Seq[MessageWithOffset] = {
if (msgSetBuf.readableBytes < 12) { // 12 = length(Offset+MessageSize)
msgSet
} else {
val offset = msgSetBuf.decodeInt64() // Offset => int64
val size = msgSetBuf.decodeInt32() // MessageSize => int32
if (size < Message.MinHeaderSize) {
throw new InvalidMessageException(s"Message size is corrupted: $size")
}

if (msgSetBuf.readableBytes < size) {
// ignore a partial message at the end of the message set
msgSet
} else {
val bytes = msgSetBuf.readBytes(size) // Message
msgSet.append(MessageWithOffset(offset, Message(bytes)))

decodeMessages(msgSetBuf, msgSet)
}
}
}

val size = buf.decodeInt32() // MessageSetSize => int32
val msgSetBuf = buf.readBytes(size) // MessageSet

decodeMessages(msgSetBuf, ArrayBuffer[MessageWithOffset]())
decodeMessageSetElementsImpl(msgSetBuf, mutable.ArrayBuffer())
}

@inline
def decodeMessageSetElements(): Seq[MessageWithOffset] =
decodeMessageSetElementsImpl(buf, mutable.ArrayBuffer())

}

private[protocol] object KafkaChannelBuffer {

// Can't implement @tailrec methods in value classes in Scala 2.10 (SI-6574)
@tailrec
private[KafkaChannelBuffer] def decodeMessageSetElementsImpl(
buf: ChannelBuffer,
msgs: mutable.Buffer[MessageWithOffset]): Seq[MessageWithOffset] = {
if (buf.readableBytes < 12) { // 12 = length(Offset+MessageSize)
msgs
} else {
val offset = buf.decodeInt64() // Offset => int64
val size = buf.decodeInt32() // MessageSize => int32
if (size < Message.MinHeaderSize) {
throw new InvalidMessageException(s"Message size is corrupted: $size")
}

if (buf.readableBytes < size) {
// ignore a partial message at the end of the message set
msgs
} else {
val bytes = buf.readBytes(size) // Message
msgs += MessageWithOffset(offset, Message(bytes))

decodeMessageSetElementsImpl(buf, msgs)
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ class MessageSetTest extends FlatSpec with Matchers {
Message.create(
ChannelBuffers.wrappedBuffer("value1".getBytes(utf8)),
Some(ChannelBuffers.wrappedBuffer("key1".getBytes(utf8))),
0
NoCompression
),
Message.create(
ChannelBuffers.wrappedBuffer("value2".getBytes(utf8)),
Some(ChannelBuffers.wrappedBuffer("key2".getBytes(utf8))),
0
NoCompression
)
)
val buf1 = ChannelBuffers.dynamicBuffer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class MessageTest extends FlatSpec with Matchers {
val msg1 = Message.create(
ChannelBuffers.wrappedBuffer("value1".getBytes(utf8)), // value
Some(ChannelBuffers.wrappedBuffer("key1".getBytes(utf8))), // key
0 // codec
NoCompression // codec
)
val kafkaMsg1 = new KafkaMessage(msg1.underlying.toByteBuffer)

Expand All @@ -33,7 +33,7 @@ class MessageTest extends FlatSpec with Matchers {
val msg2 = Message.create(
ChannelBuffers.wrappedBuffer("value2".getBytes(utf8)), // value
None, // key
0 // codec
NoCompression // codec
)
val kafkaMsg2 = new KafkaMessage(msg2.underlying.toByteBuffer)

Expand Down

0 comments on commit 9e256f5

Please sign in to comment.