Skip to content

Commit

Permalink
Implemented protocol support for ConsumerMetadataRequest and Consumer…
Browse files Browse the repository at this point in the history
…MetadataResponse

- New client API function consumerMetadata
  • Loading branch information
Mikael Högqvist committed Mar 23, 2015
1 parent 88e4e58 commit f4b09f0
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 11 deletions.
8 changes: 8 additions & 0 deletions src/main/scala/okapies/finagle/kafka/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ class Client(
case MetadataResponse(_, _, topics) => Future.value(topics)
}

def consumerMetadata(consumerGroup: String): Future[ConsumerMetadataResult] =
doRequest(ConsumerMetadataRequest(
nextCorrelationId(),
clientId,
consumerGroup)) {
case ConsumerMetadataResponse(_, result) => Future.value(result)
}

def close(deadline: Time): Future[Unit] = service.close(deadline)

private[this] def doRequest[T](req: Request)(
Expand Down
10 changes: 9 additions & 1 deletion src/main/scala/okapies/finagle/kafka/protocol/Request.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ object Request {
classOf[OffsetRequest] -> ApiKeyOffset,
classOf[MetadataRequest] -> ApiKeyMetadata,
classOf[OffsetCommitRequest] -> ApiKeyOffsetCommit,
classOf[OffsetFetchRequest] -> ApiKeyOffsetFetch
classOf[OffsetFetchRequest] -> ApiKeyOffsetFetch,
classOf[ConsumerMetadataRequest] -> ApiKeyConsumerMetadata
)

}
Expand Down Expand Up @@ -89,3 +90,10 @@ case class OffsetFetchRequest(
consumerGroup: String, // string
partitions: Map[String, Seq[Int]]
) extends Request

// ConsumerMetadataRequest
case class ConsumerMetadataRequest(
correlationId: Int, // int32
clientId: String, // string
consumerGroup: String // string
) extends Request
18 changes: 18 additions & 0 deletions src/main/scala/okapies/finagle/kafka/protocol/RequestCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class RequestEncoder(logger: RequestLogger) extends SimpleChannelDownstreamHandl
case req: MetadataRequest => encodeMetadataRequest(req)
case req: OffsetCommitRequest => encodeOffsetCommitRequest(req)
case req: OffsetFetchRequest => encodeOffsetFetchRequest(req)
case req: ConsumerMetadataRequest => encodeConsumerMetadataRequest(req)
}

logger.add(req)
Expand Down Expand Up @@ -198,4 +199,21 @@ class RequestEncoder(logger: RequestLogger) extends SimpleChannelDownstreamHandl
buf
}

/**
* Implemented in Kafka 0.8.2.0
*
* {{{
* ConsumerMetadataRequest => ConsumerGroup
* }}}
*/
private def encodeConsumerMetadataRequest(req: ConsumerMetadataRequest) = {
val buf = ChannelBuffers.dynamicBuffer() // TODO: estimatedLength
encodeRequestHeader(buf, ApiKeyConsumerMetadata, req)

buf.encodeString(req.consumerGroup)

buf
}


}
13 changes: 13 additions & 0 deletions src/main/scala/okapies/finagle/kafka/protocol/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ case class OffsetFetchResult(
error: KafkaError // int16
)

// ConsumerMetadataResponse
case class ConsumerMetadataResponse(
correlationId: Int, // int32
result: ConsumerMetadataResult
) extends Response

case class ConsumerMetadataResult(
error: KafkaError, // int16
id: Int, // int32
host: String, // string
port: Int // int32
)

/**
* A message frame for responses.
*/
Expand Down
17 changes: 17 additions & 0 deletions src/main/scala/okapies/finagle/kafka/protocol/ResponseCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ object ResponseDecoder {
case ApiKeyStopReplica => null
case ApiKeyOffsetCommit => decodeOffsetCommitResponse(correlationId, frame)
case ApiKeyOffsetFetch => decodeOffsetFetchResponse(correlationId, frame)
case ApiKeyConsumerMetadata => decodeConsumerMetadataResponse(correlationId, frame)
}

/**
Expand Down Expand Up @@ -250,6 +251,22 @@ object ResponseDecoder {
OffsetFetchResponse(correlationId, results)
}

/**
* Implemented in Kafka 0.8.2.0
*
* {{{
* ConsumerMetadataResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort
* }}}
*/
private[this] def decodeConsumerMetadataResponse(correlationId: Int, buf: ChannelBuffer) = {
val error: KafkaError = buf.decodeInt16()
val id = buf.decodeInt32()
val host = buf.decodeString()
val port = buf.decodeInt32()

val result = ConsumerMetadataResult(error, id, host, port)
ConsumerMetadataResponse(correlationId, result)
}
}

/**
Expand Down
17 changes: 9 additions & 8 deletions src/main/scala/okapies/finagle/kafka/protocol/Spec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ private[protocol] object Spec {
final val ApiVersion0 = 0: Int16

// Api Keys
final val ApiKeyProduce = 0: Int16
final val ApiKeyFetch = 1: Int16
final val ApiKeyOffset = 2: Int16
final val ApiKeyMetadata = 3: Int16
final val ApiKeyLeaderAndIsr = 4: Int16
final val ApiKeyStopReplica = 5: Int16
final val ApiKeyOffsetCommit = 8: Int16
final val ApiKeyOffsetFetch = 9: Int16
final val ApiKeyProduce = 0: Int16
final val ApiKeyFetch = 1: Int16
final val ApiKeyOffset = 2: Int16
final val ApiKeyMetadata = 3: Int16
final val ApiKeyLeaderAndIsr = 4: Int16
final val ApiKeyStopReplica = 5: Int16
final val ApiKeyOffsetCommit = 8: Int16
final val ApiKeyOffsetFetch = 9: Int16
final val ApiKeyConsumerMetadata = 10: Int16

// length of fields
final val CorrelationIdLength = 4
Expand Down
29 changes: 27 additions & 2 deletions src/test/scala/okapies/finagle/kafka/ClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package okapies.finagle.kafka

import org.scalatest._
import org.scalatest.matchers._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Span, Seconds, Millis}

import java.util.Properties
import java.nio.charset.Charset
Expand Down Expand Up @@ -41,6 +43,10 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite =>
kafkaConfig = TestUtils.createBrokerConfig(1)
kafkaConfig.put("zookeeper.connect", zkConn)
kafkaConfig.put("host.name", "127.0.0.1")

// https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/
// kafka/server/OffsetCommitTest.scala#L49
kafkaConfig.put("offsets.topic.replication.factor", "1")
kafkaConn = s"""${kafkaConfig.get("host.name")}:${kafkaConfig.get("port")}"""
kafkaServer = TestUtils.createServer(new KafkaConfig(kafkaConfig))

Expand All @@ -57,7 +63,11 @@ trait KafkaTest extends BeforeAndAfterAll { suite: Suite =>

}

class ClientTest extends FlatSpec with Matchers with KafkaTest {
class ClientTest
extends FlatSpec
with Matchers
with Eventually
with KafkaTest {
import ClientTestUtils._
import Await.result
import KafkaError._
Expand All @@ -67,6 +77,10 @@ class ClientTest extends FlatSpec with Matchers with KafkaTest {
val group = "test-group"
val msg = Message.create("hello".cb)

override implicit val patienceConfig =
PatienceConfig(timeout = scaled(Span(5, Seconds)), interval = scaled(Span(500, Millis)))


override def beforeAll {
// start zookeeper and kafka
super.beforeAll
Expand Down Expand Up @@ -170,9 +184,20 @@ class ClientTest extends FlatSpec with Matchers with KafkaTest {
commit.error should equal (NoError)
val fetch = result(client.offsetFetch(group, topic, 0))
fetch.error should equal(NoError)
fetch.error should equal (NoError)
fetch.metadata should equal(metadata)
}
*/

it should "return consumer group metadata" in {
eventually {
val resp = result(client.consumerMetadata(group))

resp.error should equal (NoError)
resp.id should equal (1)
resp.host should equal (kafkaConfig.get("host.name"))
resp.port should equal (kafkaConfig.get("port").asInstanceOf[String].toInt)
}
}
}

0 comments on commit f4b09f0

Please sign in to comment.