Skip to content

Commit

Permalink
Merge pull request #8 from mkhq/master
Browse files Browse the repository at this point in the history
Support for ConsumerMetadataRequest/Response and add new error codes for 0.8.2
  • Loading branch information
okapies committed Mar 24, 2015
2 parents e972431 + f4b09f0 commit bb59073
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 15 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ net.virtualvoid.sbt.graph.Plugin.graphSettings

libraryDependencies ++= List(
"com.twitter" % "finagle-core_2.10" % "6.24.0",
"org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
"org.apache.kafka" % "kafka_2.10" % "0.8.2.1"
exclude("com.101tec", "zkclient")
exclude("com.yammer.metrics", "metrics-core")
exclude("net.sf.jopt-simple", "jopt-simple")
Expand All @@ -26,7 +26,7 @@ libraryDependencies ++= List(
"org.apache.curator" % "curator-test" % "2.7.1" % "test",
"com.101tec" % "zkclient" % "0.4" % "test",
"com.yammer.metrics" % "metrics-core" % "2.2.0" % "test",
"org.apache.kafka" % "kafka_2.10" % "0.8.1.1" % "test" classifier "test"
"org.apache.kafka" % "kafka_2.10" % "0.8.2.1" % "test" classifier "test"
)

publishTo := {
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/okapies/finagle/kafka/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ class Client(
case MetadataResponse(_, _, topics) => Future.value(topics)
}

def consumerMetadata(consumerGroup: String): Future[ConsumerMetadataResult] =
doRequest(ConsumerMetadataRequest(
nextCorrelationId(),
clientId,
consumerGroup)) {
case ConsumerMetadataResponse(_, result) => Future.value(result)
}

def close(deadline: Time): Future[Unit] = service.close(deadline)

private[this] def doRequest[T](req: Request)(
Expand Down
12 changes: 10 additions & 2 deletions src/main/scala/okapies/finagle/kafka/protocol/KafkaError.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ case class KafkaError(code: Short /* int16 */) {
case ErrorMapping.ReplicaNotAvailableCode => "ReplicaNotAvailable"
case ErrorMapping.MessageSizeTooLargeCode => "MessageSizeTooLarge"
case ErrorMapping.StaleControllerEpochCode => "StaleControllerEpoch"
//case ErrorMapping.OffsetMetadataTooLargeCode => "OffsetMetadataTooLarge"
case ErrorMapping.OffsetMetadataTooLargeCode => "OffsetMetadataTooLarge"
case ErrorMapping.OffsetsLoadInProgressCode => "OffsetsLoadInProgress"
case ErrorMapping.ConsumerCoordinatorNotAvailableCode => "ConsumerCoordinatorNotAvailable"
case ErrorMapping.NotCoordinatorForConsumerCode => "NotCoordinatorForConsumer"
case _ => super.toString
}

Expand Down Expand Up @@ -54,8 +57,13 @@ object KafkaError {

final val StaleControllerEpoch = KafkaError(ErrorMapping.StaleControllerEpochCode)

//val OffsetMetadataTooLarge = KafkaError(ErrorMapping.OffsetMetadataTooLargeCode)
final val OffsetMetadataTooLarge = KafkaError(ErrorMapping.OffsetMetadataTooLargeCode)

final val OffsetsLoadInProgress = KafkaError(ErrorMapping.OffsetsLoadInProgressCode)

final val ConsumerCoordinatorNotAvailable = KafkaError(ErrorMapping.ConsumerCoordinatorNotAvailableCode)

final val NotCoordinatorForConsumer = KafkaError(ErrorMapping.NotCoordinatorForConsumerCode)
}

class KafkaCodecException(message: String = null, cause: Throwable = null)
Expand Down
10 changes: 9 additions & 1 deletion src/main/scala/okapies/finagle/kafka/protocol/Request.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ object Request {
classOf[OffsetRequest] -> ApiKeyOffset,
classOf[MetadataRequest] -> ApiKeyMetadata,
classOf[OffsetCommitRequest] -> ApiKeyOffsetCommit,
classOf[OffsetFetchRequest] -> ApiKeyOffsetFetch
classOf[OffsetFetchRequest] -> ApiKeyOffsetFetch,
classOf[ConsumerMetadataRequest] -> ApiKeyConsumerMetadata
)

}
Expand Down Expand Up @@ -89,3 +90,10 @@ case class OffsetFetchRequest(
consumerGroup: String, // string
partitions: Map[String, Seq[Int]]
) extends Request

// ConsumerMetadataRequest
case class ConsumerMetadataRequest(
correlationId: Int, // int32
clientId: String, // string
consumerGroup: String // string
) extends Request
18 changes: 18 additions & 0 deletions src/main/scala/okapies/finagle/kafka/protocol/RequestCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class RequestEncoder(logger: RequestLogger) extends SimpleChannelDownstreamHandl
case req: MetadataRequest => encodeMetadataRequest(req)
case req: OffsetCommitRequest => encodeOffsetCommitRequest(req)
case req: OffsetFetchRequest => encodeOffsetFetchRequest(req)
case req: ConsumerMetadataRequest => encodeConsumerMetadataRequest(req)
}

logger.add(req)
Expand Down Expand Up @@ -198,4 +199,21 @@ class RequestEncoder(logger: RequestLogger) extends SimpleChannelDownstreamHandl
buf
}

/**
* Implemented in Kafka 0.8.2.0
*
* {{{
* ConsumerMetadataRequest => ConsumerGroup
* }}}
*/
private def encodeConsumerMetadataRequest(req: ConsumerMetadataRequest) = {
val buf = ChannelBuffers.dynamicBuffer() // TODO: estimatedLength
encodeRequestHeader(buf, ApiKeyConsumerMetadata, req)

buf.encodeString(req.consumerGroup)

buf
}


}
13 changes: 13 additions & 0 deletions src/main/scala/okapies/finagle/kafka/protocol/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ case class OffsetFetchResult(
error: KafkaError // int16
)

// ConsumerMetadataResponse
case class ConsumerMetadataResponse(
correlationId: Int, // int32
result: ConsumerMetadataResult
) extends Response

case class ConsumerMetadataResult(
error: KafkaError, // int16
id: Int, // int32
host: String, // string
port: Int // int32
)

/**
* A message frame for responses.
*/
Expand Down
17 changes: 17 additions & 0 deletions src/main/scala/okapies/finagle/kafka/protocol/ResponseCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ object ResponseDecoder {
case ApiKeyStopReplica => null
case ApiKeyOffsetCommit => decodeOffsetCommitResponse(correlationId, frame)
case ApiKeyOffsetFetch => decodeOffsetFetchResponse(correlationId, frame)
case ApiKeyConsumerMetadata => decodeConsumerMetadataResponse(correlationId, frame)
}

/**
Expand Down Expand Up @@ -250,6 +251,22 @@ object ResponseDecoder {
OffsetFetchResponse(correlationId, results)
}

/**
* Implemented in Kafka 0.8.2.0
*
* {{{
* ConsumerMetadataResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort
* }}}
*/
private[this] def decodeConsumerMetadataResponse(correlationId: Int, buf: ChannelBuffer) = {
val error: KafkaError = buf.decodeInt16()
val id = buf.decodeInt32()
val host = buf.decodeString()
val port = buf.decodeInt32()

val result = ConsumerMetadataResult(error, id, host, port)
ConsumerMetadataResponse(correlationId, result)
}
}

/**
Expand Down
17 changes: 9 additions & 8 deletions src/main/scala/okapies/finagle/kafka/protocol/Spec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ private[protocol] object Spec {
final val ApiVersion0 = 0: Int16

// Api Keys
final val ApiKeyProduce = 0: Int16
final val ApiKeyFetch = 1: Int16
final val ApiKeyOffset = 2: Int16
final val ApiKeyMetadata = 3: Int16
final val ApiKeyLeaderAndIsr = 4: Int16
final val ApiKeyStopReplica = 5: Int16
final val ApiKeyOffsetCommit = 8: Int16
final val ApiKeyOffsetFetch = 9: Int16
final val ApiKeyProduce = 0: Int16
final val ApiKeyFetch = 1: Int16
final val ApiKeyOffset = 2: Int16
final val ApiKeyMetadata = 3: Int16
final val ApiKeyLeaderAndIsr = 4: Int16
final val ApiKeyStopReplica = 5: Int16
final val ApiKeyOffsetCommit = 8: Int16
final val ApiKeyOffsetFetch = 9: Int16
final val ApiKeyConsumerMetadata = 10: Int16

// length of fields
final val CorrelationIdLength = 4
Expand Down
29 changes: 27 additions & 2 deletions src/test/scala/okapies/finagle/kafka/ClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package okapies.finagle.kafka

import org.scalatest._
import org.scalatest.matchers._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Span, Seconds, Millis}

import java.util.Properties
import java.nio.charset.Charset
Expand Down Expand Up @@ -41,6 +43,10 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite =>
kafkaConfig = TestUtils.createBrokerConfig(1)
kafkaConfig.put("zookeeper.connect", zkConn)
kafkaConfig.put("host.name", "127.0.0.1")

// https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/
// kafka/server/OffsetCommitTest.scala#L49
kafkaConfig.put("offsets.topic.replication.factor", "1")
kafkaConn = s"""${kafkaConfig.get("host.name")}:${kafkaConfig.get("port")}"""
kafkaServer = TestUtils.createServer(new KafkaConfig(kafkaConfig))

Expand All @@ -57,7 +63,11 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite =>

}

class ClientTest extends FlatSpec with Matchers with KafkaTest {
class ClientTest
extends FlatSpec
with Matchers
with Eventually
with KafkaTest {
import ClientTestUtils._
import Await.result
import KafkaError._
Expand All @@ -67,6 +77,10 @@ class ClientTest extends FlatSpec with Matchers with KafkaTest {
val group = "test-group"
val msg = Message.create("hello".cb)

override implicit val patienceConfig =
PatienceConfig(timeout = scaled(Span(5, Seconds)), interval = scaled(Span(500, Millis)))


override def beforeAll {
// start zookeeper and kafka
super.beforeAll
Expand Down Expand Up @@ -170,9 +184,20 @@ class ClientTest extends FlatSpec with Matchers with KafkaTest {
commit.error should equal (NoError)
val fetch = result(client.offsetFetch(group, topic, 0))
fetch.error should equal(NoError)
fetch.error should equal (NoError)
fetch.metadata should equal(metadata)
}
*/

it should "return consumer group metadata" in {
eventually {
val resp = result(client.consumerMetadata(group))

resp.error should equal (NoError)
resp.id should equal (1)
resp.host should equal (kafkaConfig.get("host.name"))
resp.port should equal (kafkaConfig.get("port").asInstanceOf[String].toInt)
}
}
}

0 comments on commit bb59073

Please sign in to comment.