diff --git a/src/main/scala/okapies/finagle/kafka/Client.scala b/src/main/scala/okapies/finagle/kafka/Client.scala
index 9b28a7f..e923f9c 100644
--- a/src/main/scala/okapies/finagle/kafka/Client.scala
+++ b/src/main/scala/okapies/finagle/kafka/Client.scala
@@ -137,6 +137,14 @@ class Client(
     case MetadataResponse(_, _, topics) => Future.value(topics)
   }
 
+  def consumerMetadata(consumerGroup: String): Future[ConsumerMetadataResult] =
+    doRequest(ConsumerMetadataRequest(
+      nextCorrelationId(),
+      clientId,
+      consumerGroup)) {
+      case ConsumerMetadataResponse(_, result) => Future.value(result)
+    }
+
   def close(deadline: Time): Future[Unit] = service.close(deadline)
 
   private[this] def doRequest[T](req: Request)(
diff --git a/src/main/scala/okapies/finagle/kafka/protocol/Request.scala b/src/main/scala/okapies/finagle/kafka/protocol/Request.scala
index 6cd5c22..3199cdd 100644
--- a/src/main/scala/okapies/finagle/kafka/protocol/Request.scala
+++ b/src/main/scala/okapies/finagle/kafka/protocol/Request.scala
@@ -17,7 +17,8 @@ object Request {
     classOf[OffsetRequest] -> ApiKeyOffset,
     classOf[MetadataRequest] -> ApiKeyMetadata,
     classOf[OffsetCommitRequest] -> ApiKeyOffsetCommit,
-    classOf[OffsetFetchRequest] -> ApiKeyOffsetFetch
+    classOf[OffsetFetchRequest] -> ApiKeyOffsetFetch,
+    classOf[ConsumerMetadataRequest] -> ApiKeyConsumerMetadata
   )
 }
 
@@ -89,3 +90,10 @@ case class OffsetFetchRequest(
   consumerGroup: String, // string
   partitions: Map[String, Seq[Int]]
 ) extends Request
+
+// ConsumerMetadataRequest
+case class ConsumerMetadataRequest(
+  correlationId: Int,   // int32
+  clientId: String,     // string
+  consumerGroup: String // string
+) extends Request
diff --git a/src/main/scala/okapies/finagle/kafka/protocol/RequestCodec.scala b/src/main/scala/okapies/finagle/kafka/protocol/RequestCodec.scala
index f85f529..f4ebcc8 100644
--- a/src/main/scala/okapies/finagle/kafka/protocol/RequestCodec.scala
+++ b/src/main/scala/okapies/finagle/kafka/protocol/RequestCodec.scala
@@ -32,6 +32,7 @@ class RequestEncoder(logger: RequestLogger) extends SimpleChannelDownstreamHandl
       case req: MetadataRequest => encodeMetadataRequest(req)
       case req: OffsetCommitRequest => encodeOffsetCommitRequest(req)
       case req: OffsetFetchRequest => encodeOffsetFetchRequest(req)
+      case req: ConsumerMetadataRequest => encodeConsumerMetadataRequest(req)
     }
 
     logger.add(req)
@@ -198,4 +199,21 @@ class RequestEncoder(logger: RequestLogger) extends SimpleChannelDownstreamHandl
     buf
   }
 
+  /**
+   * Implemented in Kafka 0.8.2.0
+   *
+   * {{{
+   * ConsumerMetadataRequest => ConsumerGroup
+   * }}}
+   */
+  private def encodeConsumerMetadataRequest(req: ConsumerMetadataRequest) = {
+    val buf = ChannelBuffers.dynamicBuffer() // TODO: estimatedLength
+    encodeRequestHeader(buf, ApiKeyConsumerMetadata, req)
+
+    buf.encodeString(req.consumerGroup)
+
+    buf
+  }
+
+
 }
diff --git a/src/main/scala/okapies/finagle/kafka/protocol/Response.scala b/src/main/scala/okapies/finagle/kafka/protocol/Response.scala
index b7489c1..e086947 100644
--- a/src/main/scala/okapies/finagle/kafka/protocol/Response.scala
+++ b/src/main/scala/okapies/finagle/kafka/protocol/Response.scala
@@ -102,6 +102,19 @@ case class OffsetFetchResult(
   error: KafkaError // int16
 )
 
+// ConsumerMetadataResponse
+case class ConsumerMetadataResponse(
+  correlationId: Int, // int32
+  result: ConsumerMetadataResult
+) extends Response
+
+case class ConsumerMetadataResult(
+  error: KafkaError, // int16
+  id: Int,           // int32
+  host: String,      // string
+  port: Int          // int32
+)
+
 /**
  * A message frame for responses.
  */
diff --git a/src/main/scala/okapies/finagle/kafka/protocol/ResponseCodec.scala b/src/main/scala/okapies/finagle/kafka/protocol/ResponseCodec.scala
index 3152f8f..1c4298e 100644
--- a/src/main/scala/okapies/finagle/kafka/protocol/ResponseCodec.scala
+++ b/src/main/scala/okapies/finagle/kafka/protocol/ResponseCodec.scala
@@ -79,6 +79,7 @@ object ResponseDecoder {
     case ApiKeyStopReplica => null
     case ApiKeyOffsetCommit => decodeOffsetCommitResponse(correlationId, frame)
     case ApiKeyOffsetFetch => decodeOffsetFetchResponse(correlationId, frame)
+    case ApiKeyConsumerMetadata => decodeConsumerMetadataResponse(correlationId, frame)
   }
 
   /**
@@ -250,6 +251,22 @@
     OffsetFetchResponse(correlationId, results)
   }
 
+  /**
+   * Implemented in Kafka 0.8.2.0
+   *
+   * {{{
+   * ConsumerMetadataResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort
+   * }}}
+   */
+  private[this] def decodeConsumerMetadataResponse(correlationId: Int, buf: ChannelBuffer) = {
+    val error: KafkaError = buf.decodeInt16()
+    val id = buf.decodeInt32()
+    val host = buf.decodeString()
+    val port = buf.decodeInt32()
+
+    val result = ConsumerMetadataResult(error, id, host, port)
+    ConsumerMetadataResponse(correlationId, result)
+  }
 }
 
 /**
diff --git a/src/main/scala/okapies/finagle/kafka/protocol/Spec.scala b/src/main/scala/okapies/finagle/kafka/protocol/Spec.scala
index aba2220..2d8a66b 100644
--- a/src/main/scala/okapies/finagle/kafka/protocol/Spec.scala
+++ b/src/main/scala/okapies/finagle/kafka/protocol/Spec.scala
@@ -22,14 +22,15 @@ private[protocol] object Spec {
   final val ApiVersion0 = 0: Int16
 
   // Api Keys
-  final val ApiKeyProduce = 0: Int16
-  final val ApiKeyFetch = 1: Int16
-  final val ApiKeyOffset = 2: Int16
-  final val ApiKeyMetadata = 3: Int16
-  final val ApiKeyLeaderAndIsr = 4: Int16
-  final val ApiKeyStopReplica = 5: Int16
-  final val ApiKeyOffsetCommit = 8: Int16
-  final val ApiKeyOffsetFetch = 9: Int16
+  final val ApiKeyProduce          = 0: Int16
+  final val ApiKeyFetch            = 1: Int16
+  final val ApiKeyOffset           = 2: Int16
+  final val ApiKeyMetadata         = 3: Int16
+  final val ApiKeyLeaderAndIsr     = 4: Int16
+  final val ApiKeyStopReplica      = 5: Int16
+  final val ApiKeyOffsetCommit     = 8: Int16
+  final val ApiKeyOffsetFetch      = 9: Int16
+  final val ApiKeyConsumerMetadata = 10: Int16
 
   // length of fields
   final val CorrelationIdLength = 4
diff --git a/src/test/scala/okapies/finagle/kafka/ClientTest.scala b/src/test/scala/okapies/finagle/kafka/ClientTest.scala
index 6ecb0c3..2c56f75 100644
--- a/src/test/scala/okapies/finagle/kafka/ClientTest.scala
+++ b/src/test/scala/okapies/finagle/kafka/ClientTest.scala
@@ -2,6 +2,8 @@ package okapies.finagle.kafka
 
 import org.scalatest._
 import org.scalatest.matchers._
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.{Span, Seconds, Millis}
 
 import java.util.Properties
 import java.nio.charset.Charset
@@ -41,6 +43,10 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite =>
    kafkaConfig = TestUtils.createBrokerConfig(1)
    kafkaConfig.put("zookeeper.connect", zkConn)
    kafkaConfig.put("host.name", "127.0.0.1")
+
+    // https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/
+    // kafka/server/OffsetCommitTest.scala#L49
+    kafkaConfig.put("offsets.topic.replication.factor", "1")
    kafkaConn = s"""${kafkaConfig.get("host.name")}:${kafkaConfig.get("port")}"""
 
    kafkaServer = TestUtils.createServer(new KafkaConfig(kafkaConfig))
@@ -57,7 +63,11 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite =>
 
 }
 
-class ClientTest extends FlatSpec with Matchers with KafkaTest {
+class ClientTest
+extends FlatSpec
+with Matchers
+with Eventually
+with KafkaTest {
   import ClientTestUtils._
   import Await.result
   import KafkaError._
@@ -67,6 +77,10 @@ class ClientTest extends FlatSpec with Matchers with KafkaTest {
   val group = "test-group"
   val msg = Message.create("hello".cb)
 
+  override implicit val patienceConfig =
+    PatienceConfig(timeout = scaled(Span(5, Seconds)), interval = scaled(Span(500, Millis)))
+
+
   override def beforeAll {
     // start zookeeper and kafka
     super.beforeAll
@@ -170,9 +184,20 @@ class ClientTest extends FlatSpec with Matchers with KafkaTest {
     commit.error should equal (NoError)
 
     val fetch = result(client.offsetFetch(group, topic, 0))
-    fetch.error should equal(NoError)
+    fetch.error should equal (NoError)
     fetch.metadata should equal(metadata)
   }
   */
 
+  it should "return consumer group metadata" in {
+    eventually {
+      val resp = result(client.consumerMetadata(group))
+
+      resp.error should equal (NoError)
+      resp.id should equal (1)
+      resp.host should equal (kafkaConfig.get("host.name"))
+      resp.port should equal (kafkaConfig.get("port").asInstanceOf[String].toInt)
+    }
+  }
 }
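
Not part of the patch: a minimal usage sketch of the new call from application code. It assumes `Kafka.newRichClient(...)` as the client factory for this library and a 0.8.2+ broker on `localhost:9092` (both are illustrative assumptions); the `error`, `id`, `host`, and `port` fields are the ones defined on `ConsumerMetadataResult` above.

```scala
import com.twitter.util.{Await, Time}
import okapies.finagle.Kafka

object ConsumerMetadataExample extends App {
  // Assumed endpoint; point this at a real broker.
  val client = Kafka.newRichClient("localhost:9092")

  // Ask the cluster which broker coordinates offsets for this consumer group.
  val result = Await.result(client.consumerMetadata("test-group"))

  println(s"coordinator=${result.host}:${result.port} " +
    s"(broker id=${result.id}, error=${result.error})")

  Await.result(client.close(Time.now)) // close(deadline) as defined on Client
}
```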
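For reference, the `{{{ ... }}}` notes in the codec only name the fields. The sketch below spells out the ConsumerMetadata v0 body layout in plain `java.nio`, independent of the library's ChannelBuffer helpers; the request header written by `encodeRequestHeader` and the response framing are omitted, and all names here are made up for illustration.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object ConsumerMetadataWireFormat {
  // Request body is just the consumer group, encoded as a Kafka "string"
  // (int16 length followed by UTF-8 bytes).
  def encodeRequestBody(consumerGroup: String): Array[Byte] = {
    val group = consumerGroup.getBytes(StandardCharsets.UTF_8)
    val buf = ByteBuffer.allocate(2 + group.length)
    buf.putShort(group.length.toShort)
    buf.put(group)
    buf.array()
  }

  // Response body: ErrorCode (int16), CoordinatorId (int32),
  // CoordinatorHost (string), CoordinatorPort (int32).
  def decodeResponseBody(bytes: Array[Byte]): (Short, Int, String, Int) = {
    val buf = ByteBuffer.wrap(bytes)
    val error = buf.getShort()
    val id = buf.getInt()
    val hostBytes = new Array[Byte](buf.getShort().toInt)
    buf.get(hostBytes)
    val host = new String(hostBytes, StandardCharsets.UTF_8)
    val port = buf.getInt()
    (error, id, host, port)
  }
}
```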