From db3b06cf17c72d4fc53240de31d432f5849411c3 Mon Sep 17 00:00:00 2001 From: Yuta Okamoto Date: Thu, 26 Jun 2014 02:59:11 +0900 Subject: [PATCH] add Compression (resolve #4) --- .../finagle/kafka/protocol/Compression.scala | 110 ++++++++++++++++++ .../finagle/kafka/protocol/Message.scala | 9 +- .../okapies/finagle/kafka/protocol/Spec.scala | 64 +++++----- .../kafka/protocol/MessageSetTest.scala | 4 +- .../finagle/kafka/protocol/MessageTest.scala | 4 +- 5 files changed, 159 insertions(+), 32 deletions(-) create mode 100644 src/main/scala/okapies/finagle/kafka/protocol/Compression.scala diff --git a/src/main/scala/okapies/finagle/kafka/protocol/Compression.scala b/src/main/scala/okapies/finagle/kafka/protocol/Compression.scala new file mode 100644 index 0000000..886dce6 --- /dev/null +++ b/src/main/scala/okapies/finagle/kafka/protocol/Compression.scala @@ -0,0 +1,110 @@ +package okapies.finagle.kafka.protocol + +import java.io.{DataOutputStream, InputStream, OutputStream} +import java.util.zip.{GZIPInputStream, GZIPOutputStream} + +import org.jboss.netty.buffer._ +import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream} + +sealed trait Compression { + def id: Int + + def compress(msgs: Seq[Message]): Message + def decompress(msg: Message): Seq[MessageWithOffset] +} + +case object NoCompression extends Compression { + final val id = 0x00 + + def compress(msgs: Seq[Message]) = Compression.compress(this, msgs) + def decompress(msg: Message) = Compression.decompress(this, msg) +} + +case object GZIPCompression extends Compression { + final val id = 0x01 + + def compress(msgs: Seq[Message]) = Compression.compress(this, msgs) + def decompress(msg: Message) = Compression.decompress(this, msg) +} + +case object SnappyCompression extends Compression { + final val id = 0x02 + + def compress(msgs: Seq[Message]) = Compression.compress(this, msgs) + def decompress(msg: Message) = Compression.decompress(this, msg) +} + +object Compression { + + import Spec._ + + private[this] final val WriteBuffer = 8192 + + def apply(codec: Int): Compression = codec match { + case NoCompression.id => NoCompression + case GZIPCompression.id => GZIPCompression + case SnappyCompression.id => SnappyCompression + case _ => throw new IllegalArgumentException(s"Unknown codec: $codec") + } + + private[protocol] def compress(compression: Compression, + msgs: Seq[Message]): Message = { + val compressed = ChannelBuffers.dynamicBuffer(estimateSize(msgs)) + val out = new DataOutputStream( + codecStream(compression, new ChannelBufferOutputStream(compressed))) + try { + msgs.foldLeft(0) { case (offset, msg) => + val buf = msg.underlying + val size = buf.readableBytes // == msg.size + out.writeLong(offset) // Offset => int64; TODO: Is it compatible behavior? 
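+          // (The broker reassigns absolute offsets to the inner messages of a
+          // compressed set when appending them to the log, so the sequential,
+          // 0-based placeholder offsets written here should be compatible.)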
+ out.writeInt(size) // MessageSize => int32 + if (buf.hasArray) { // Message + out.write(buf.array, buf.arrayOffset, size) + } else { + val array = new Array[Byte](size) + buf.readBytes(array) + out.write(array, 0, array.length) + } + + offset + 1 + } + } finally { + out.close() + } + + Message.create(value = compressed, key = None, compression = compression) + } + + private[protocol] def codecStream(compression: Compression, + out: OutputStream): OutputStream = compression match { + case NoCompression => out + case GZIPCompression => new GZIPOutputStream(out) + case SnappyCompression => new SnappyOutputStream(out) + } + + private[this] def estimateSize(msgs: Seq[Message]): Int = + msgs.foldLeft(0)(_ + OffsetLength + MessageSizeLength + _.size) + + private[protocol] def decompress(compression: Compression, + msg: Message): Seq[MessageWithOffset] = { + val decompressed = ChannelBuffers.dynamicBuffer + val in = codecStream(compression, new ChannelBufferInputStream(msg.value)) + try { + while (in.available > 0) { + decompressed.writeBytes(in, WriteBuffer) + } + } finally { + in.close() + } + + decompressed.decodeMessageSetElements() + } + + private[protocol] def codecStream(compression: Compression, + in: InputStream): InputStream = compression match { + case NoCompression => in + case GZIPCompression => new GZIPInputStream(in) + case SnappyCompression => new SnappyInputStream(in) + } + +} diff --git a/src/main/scala/okapies/finagle/kafka/protocol/Message.scala b/src/main/scala/okapies/finagle/kafka/protocol/Message.scala index b130416..5012a1d 100644 --- a/src/main/scala/okapies/finagle/kafka/protocol/Message.scala +++ b/src/main/scala/okapies/finagle/kafka/protocol/Message.scala @@ -32,6 +32,8 @@ class Message(private[this] val _underlying: ChannelBuffer) { def attributes: Byte = _underlying.getByte(AttributesOffset) + def compression: Compression = Compression(attributes & CompressionCodeMask) + def key: Option[ChannelBuffer] = _underlying.getInt(KeySizeOffset) match { case keySize if keySize >= 0 => Some(_underlying.slice(KeyOffset, keySize)) case _ => None @@ -85,19 +87,22 @@ object Message { // Magic value final val CurrentMagicValue = 0: Int8 + // The mask for compression codec + final val CompressionCodeMask: Int = 0x03 + def apply(buffer: ChannelBuffer) = new Message(buffer) def create( value: ChannelBuffer, key: Option[ChannelBuffer] = None, - attributes: Byte = 0): Message = { + compression: Compression = NoCompression): Message = { val buf = ChannelBuffers.dynamicBuffer() // TODO: estimatedLength // Message => Crc MagicByte Attributes Key Value buf.writerIndex(MagicOffset) // skip the Crc field buf.encodeInt8(CurrentMagicValue) - buf.encodeInt8(attributes) + buf.encodeInt8(compression.id.asInstanceOf[Int8]) // only use lowest 2 bits key match { case Some(b) => buf.encodeBytes(b) case None => buf.encodeBytes(null: ChannelBuffer) diff --git a/src/main/scala/okapies/finagle/kafka/protocol/Spec.scala b/src/main/scala/okapies/finagle/kafka/protocol/Spec.scala index aba2220..c64fbe8 100644 --- a/src/main/scala/okapies/finagle/kafka/protocol/Spec.scala +++ b/src/main/scala/okapies/finagle/kafka/protocol/Spec.scala @@ -3,7 +3,7 @@ package okapies.finagle.kafka.protocol import java.nio.charset.Charset import scala.annotation.tailrec -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import org.jboss.netty.buffer.ChannelBuffer @@ -52,6 +52,8 @@ private[protocol] object Spec { */ private[protocol] implicit class KafkaChannelBuffer(val buf: ChannelBuffer) extends 
AnyVal { + import KafkaChannelBuffer._ + /* * Methods for encoding value and writing into buffer. */ @@ -221,34 +223,44 @@ private[protocol] object Spec { @inline def decodeMessageSet(): Seq[MessageWithOffset] = { - @tailrec - def decodeMessages(msgSetBuf: ChannelBuffer, - msgSet: ArrayBuffer[MessageWithOffset]): Seq[MessageWithOffset] = { - if (msgSetBuf.readableBytes < 12) { // 12 = length(Offset+MessageSize) - msgSet - } else { - val offset = msgSetBuf.decodeInt64() // Offset => int64 - val size = msgSetBuf.decodeInt32() // MessageSize => int32 - if (size < Message.MinHeaderSize) { - throw new InvalidMessageException(s"Message size is corrupted: $size") - } - - if (msgSetBuf.readableBytes < size) { - // ignore a partial message at the end of the message set - msgSet - } else { - val bytes = msgSetBuf.readBytes(size) // Message - msgSet.append(MessageWithOffset(offset, Message(bytes))) - - decodeMessages(msgSetBuf, msgSet) - } - } - } - val size = buf.decodeInt32() // MessageSetSize => int32 val msgSetBuf = buf.readBytes(size) // MessageSet - decodeMessages(msgSetBuf, ArrayBuffer[MessageWithOffset]()) + decodeMessageSetElementsImpl(msgSetBuf, mutable.ArrayBuffer()) + } + + @inline + def decodeMessageSetElements(): Seq[MessageWithOffset] = + decodeMessageSetElementsImpl(buf, mutable.ArrayBuffer()) + + } + + private[protocol] object KafkaChannelBuffer { + + // Can't implement @tailrec methods in value classes in Scala 2.10 (SI-6574) + @tailrec + private[KafkaChannelBuffer] def decodeMessageSetElementsImpl( + buf: ChannelBuffer, + msgs: mutable.Buffer[MessageWithOffset]): Seq[MessageWithOffset] = { + if (buf.readableBytes < 12) { // 12 = length(Offset+MessageSize) + msgs + } else { + val offset = buf.decodeInt64() // Offset => int64 + val size = buf.decodeInt32() // MessageSize => int32 + if (size < Message.MinHeaderSize) { + throw new InvalidMessageException(s"Message size is corrupted: $size") + } + + if (buf.readableBytes < size) { + // ignore a partial message at the end of the message set + msgs + } else { + val bytes = buf.readBytes(size) // Message + msgs += MessageWithOffset(offset, Message(bytes)) + + decodeMessageSetElementsImpl(buf, msgs) + } + } } } diff --git a/src/test/scala/okapies/finagle/kafka/protocol/MessageSetTest.scala b/src/test/scala/okapies/finagle/kafka/protocol/MessageSetTest.scala index ae060d2..6484d8a 100644 --- a/src/test/scala/okapies/finagle/kafka/protocol/MessageSetTest.scala +++ b/src/test/scala/okapies/finagle/kafka/protocol/MessageSetTest.scala @@ -26,12 +26,12 @@ class MessageSetTest extends FlatSpec with Matchers { Message.create( ChannelBuffers.wrappedBuffer("value1".getBytes(utf8)), Some(ChannelBuffers.wrappedBuffer("key1".getBytes(utf8))), - 0 + NoCompression ), Message.create( ChannelBuffers.wrappedBuffer("value2".getBytes(utf8)), Some(ChannelBuffers.wrappedBuffer("key2".getBytes(utf8))), - 0 + NoCompression ) ) val buf1 = ChannelBuffers.dynamicBuffer() diff --git a/src/test/scala/okapies/finagle/kafka/protocol/MessageTest.scala b/src/test/scala/okapies/finagle/kafka/protocol/MessageTest.scala index 729ed72..0175af6 100644 --- a/src/test/scala/okapies/finagle/kafka/protocol/MessageTest.scala +++ b/src/test/scala/okapies/finagle/kafka/protocol/MessageTest.scala @@ -20,7 +20,7 @@ class MessageTest extends FlatSpec with Matchers { val msg1 = Message.create( ChannelBuffers.wrappedBuffer("value1".getBytes(utf8)), // value Some(ChannelBuffers.wrappedBuffer("key1".getBytes(utf8))), // key - 0 // codec + NoCompression // codec ) val kafkaMsg1 = new 
KafkaMessage(msg1.underlying.toByteBuffer) @@ -33,7 +33,7 @@ class MessageTest extends FlatSpec with Matchers { val msg2 = Message.create( ChannelBuffers.wrappedBuffer("value2".getBytes(utf8)), // value None, // key - 0 // codec + NoCompression // codec ) val kafkaMsg2 = new KafkaMessage(msg2.underlying.toByteBuffer)
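
---

A round-trip sketch of the new API, for reference (illustrative only: the
object name `CompressionExample` and the sample keys/values are made up;
everything else relies on the classes touched by this patch, plus the
pre-existing `MessageWithOffset` case class). `compress` on a codec object
wraps a whole message set into a single compressed wrapper Message, and
`decompress` recovers the inner messages along with the sequential offsets
written during compression:

    import java.nio.charset.Charset

    import org.jboss.netty.buffer.ChannelBuffers

    import okapies.finagle.kafka.protocol._

    object CompressionExample extends App {
      val utf8 = Charset.forName("UTF-8")

      // Two plain messages, built the same way as in MessageSetTest.
      val msgs = Seq(
        Message.create(
          ChannelBuffers.wrappedBuffer("value1".getBytes(utf8)),
          Some(ChannelBuffers.wrappedBuffer("key1".getBytes(utf8))),
          NoCompression
        ),
        Message.create(
          ChannelBuffers.wrappedBuffer("value2".getBytes(utf8)),
          None,
          NoCompression
        )
      )

      // Wrap both messages into one GZIP-compressed wrapper message. The
      // codec id (0x01) lands in the lowest 2 bits of the attributes byte,
      // which is what Message.compression reads back via CompressionCodeMask.
      val wrapper: Message = GZIPCompression.compress(msgs)
      assert(wrapper.compression == GZIPCompression)

      // Unwrap it again; the offsets are the sequential values (0, 1, ...)
      // that compress wrote in front of each inner message.
      GZIPCompression.decompress(wrapper).foreach {
        case MessageWithOffset(offset, msg) =>
          println(s"$offset: ${msg.value.toString(utf8)}")
      }
    }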