diff --git a/build.sbt b/build.sbt
index bf1f5d0..6c565e4 100644
--- a/build.sbt
+++ b/build.sbt
@@ -8,14 +8,12 @@ organizationHomepage := Some(url("https://github.com/okapies"))
 
 version := "0.2.2-SNAPSHOT"
 
-scalaVersion := "2.11.7"
+scalaVersion := "2.11.8"
 
-crossScalaVersions := Seq("2.10.5", "2.11.7")
-
-net.virtualvoid.sbt.graph.Plugin.graphSettings
+crossScalaVersions := Seq("2.10.5", "2.11.8")
 
 libraryDependencies ++= List(
-  "com.twitter" %% "finagle-core" % "6.30.0",
+  "com.twitter" %% "finagle-core" % "6.35.0",
   "org.apache.kafka" %% "kafka" % "0.8.2.1"
     exclude("com.101tec", "zkclient")
     exclude("com.yammer.metrics", "metrics-core")
@@ -25,8 +23,8 @@ libraryDependencies ++= List(
   "org.scalatest" %% "scalatest" % "2.2.4" % "test",
   // dependencies for kafka-test
   "junit" % "junit" % "4.11" % "test",
-  "org.apache.curator" % "curator-test" % "2.8.0" % "test",
-  "com.101tec" % "zkclient" % "0.4" % "test",
+  "org.apache.curator" % "curator-test" % "2.11.0" % "test",
+  "com.101tec" % "zkclient" % "0.8" % "test",
   "com.yammer.metrics" % "metrics-core" % "2.2.0" % "test",
   "org.apache.kafka" %% "kafka" % "0.8.2.1" % "test" classifier "test"
 )
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 25c27bf..92326c1 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,3 +1,3 @@
-addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.5")
+addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")
 
 addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.5.1")
diff --git a/src/main/scala/okapies/finagle/Kafka.scala b/src/main/scala/okapies/finagle/Kafka.scala
index 77668c9..506ea63 100644
--- a/src/main/scala/okapies/finagle/Kafka.scala
+++ b/src/main/scala/okapies/finagle/Kafka.scala
@@ -1,9 +1,9 @@
 package okapies.finagle
 
-import com.twitter.util.{Future, Promise}
-import com.twitter.finagle.{Client, Name, Server, Service, ServiceFactory}
-import com.twitter.finagle.client.{Bridge, DefaultClient}
-import com.twitter.finagle.server.DefaultServer
+import com.twitter.util.{Closable, Future, Promise}
+import com.twitter.finagle._
+import com.twitter.finagle.client.{StackClient, Transporter, StdStackClient}
+import com.twitter.finagle.server.{StackServer, StdStackServer, Listener}
 import com.twitter.finagle.dispatch.{PipeliningDispatcher, GenSerialServerDispatcher}
 import com.twitter.finagle.netty3.{Netty3Transporter, Netty3Listener}
 import com.twitter.finagle.pool.SingletonPool
@@ -21,61 +21,87 @@ trait KafkaRichClient { self: Client[Request, Response] =>
 }
 
-object KafkaTransporter extends Netty3Transporter[Request, Response](
-  name = "kafka",
-  pipelineFactory = KafkaBatchClientPipelineFactory
-)
-
-object KafkaClient extends DefaultClient[Request, Response](
-  name = "kafka",
-  endpointer =
-    Bridge[Request, Response, Request, Response](KafkaTransporter, new PipeliningDispatcher(_)),
-  pool = (sr: StatsReceiver) => new SingletonPool(_, sr)
-) with KafkaRichClient
-
-class KafkaServerDispatcher(
-  trans: Transport[Response, Request],
-  service: Service[Request, Response])
-extends GenSerialServerDispatcher[Request, Response, Response, Request](trans) {
-
-  trans.onClose ensure {
-    service.close()
-  }
+object Kafka
+extends Client[Request, Response]
+with KafkaRichClient
+with Server[Request, Response] {
+
+  case class Client(
+    stack: Stack[ServiceFactory[Request, Response]] = StackClient.newStack,
+    params: Stack.Params = StackClient.defaultParams
+  ) extends StdStackClient[Request, Response, Client] {
+    protected type In = Request
+    protected type Out = Response
 
-  protected def dispatch(msg: Request, eos: Promise[Unit]) = msg match {
-    case req: Request =>
-      service(req) ensure eos.setDone()
-    case failure =>
-      eos.setDone
-      Future.exception(new IllegalArgumentException(s"Invalid message $failure"))
+    protected def copy1(
+      stack: Stack[ServiceFactory[Request, Response]],
+      params: Stack.Params): Client =
+      copy(stack, params)
+
+    protected def newTransporter(): Transporter[Request, Response] =
+      Netty3Transporter(KafkaBatchClientPipelineFactory, params)
+
+    protected def newDispatcher(
+      transport: Transport[Request, Response]): Service[Request, Response] =
+      new PipeliningDispatcher(transport)
   }
-  protected def handle(resp: Response) = resp match {
-    case r: NilResponse => Future.Unit // write no response to the transport
-    case anyResp => trans.write(resp)
+
+  val client = Client()
+
+  def newClient(dest: Name, label: String): ServiceFactory[Request, Response] =
+    client.newClient(dest, label)
+
+  def newService(dest: Name, label: String): Service[Request, Response] =
+    client.newService(dest, label)
+
+
+  private[finagle] class KafkaServerDispatcher(
+    trans: Transport[Response, Request],
+    service: Service[Request, Response])
+  extends GenSerialServerDispatcher[Request, Response, Response, Request](trans) {
+
+    trans.onClose ensure {
+      service.close()
+    }
+
+    protected def dispatch(msg: Request, eos: Promise[Unit]): Future[Response] = msg match {
+      case req: Request =>
+        service(req) ensure eos.setDone()
+      case failure =>
+        eos.setDone
+        Future.exception(new IllegalArgumentException(s"Invalid message $failure"))
+    }
+
+    protected def handle(resp: Response): Future[Unit] = resp match {
+      case r: NilResponse => Future.Unit // write no response to the transport
+      case anyResp => trans.write(resp)
+    }
   }
-}
 
-object Kafka
-extends Client[Request, Response]
-with KafkaRichClient
-with Server[Request, Response] {
+  case class Server(
+    stack: Stack[ServiceFactory[Request, Response]] = StackServer.newStack,
+    params: Stack.Params = StackServer.defaultParams
+  ) extends StdStackServer[Request, Response, Server] {
+    protected type In = Response
+    protected type Out = Request
 
-  def newClient(dest: Name, label: String) = KafkaClient.newClient(dest, label)
+    protected def copy1(
+      stack: Stack[ServiceFactory[Request, Response]],
+      params: Stack.Params): Server =
+      copy(stack, params)
 
-  def newService(dest: Name, label: String) = KafkaClient.newService(dest, label)
+    protected def newListener(): Listener[In, Out] =
+      Netty3Listener(new KafkaServerPipelineFactory, params)
 
-  class Server() extends DefaultServer[Request, Response, Response, Request](
-    "kafka-server",
-    new Netty3Listener(
-      "kafka-server-listener",
-      new KafkaServerPipelineFactory
-    ),
-    new KafkaServerDispatcher(_, _)
-  )
+    protected def newDispatcher(
+      transport: Transport[In, Out],
+      service: Service[Request, Response]): Closable =
+      new KafkaServerDispatcher(transport, service)
+  }
 
-  private val server = new Server()
+  val server = Server()
 
   def serve(addr: SocketAddress, service: ServiceFactory[Request, Response]) =
     server.serve(addr, service)
diff --git a/src/main/scala/okapies/finagle/kafka/protocol/KafkaCodec.scala b/src/main/scala/okapies/finagle/kafka/protocol/KafkaCodec.scala
index a04961c..e2b84d0 100644
--- a/src/main/scala/okapies/finagle/kafka/protocol/KafkaCodec.scala
+++ b/src/main/scala/okapies/finagle/kafka/protocol/KafkaCodec.scala
@@ -142,7 +142,10 @@ class KafkaCodecFactory(stats: StatsReceiver) extends CodecFactory[Request, Resp
     new Codec[Request, Response] {
       def pipelineFactory = KafkaStreamClientPipelineFactory
 
-      override def prepareConnFactory(underlying: ServiceFactory[Request, Response]) = {
+      override def prepareConnFactory(
+        underlying: ServiceFactory[Request, Response],
+        params: Stack.Params
+      ): ServiceFactory[Request, Response] = {
        new KafkaTracingFilter() andThen new KafkaLoggingFilter(stats) andThen underlying
       }
     }
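
Not part of the diff: a minimal usage sketch of the stack-based client introduced above, assuming a broker reachable at localhost:9092 and the Request/Response types from okapies.finagle.kafka.protocol. Kafka.newService(dest, label) delegates to the new StdStackClient-based Client; the string-destination overload used here comes from Finagle's Client trait.

    import com.twitter.util.Await
    import okapies.finagle.Kafka

    // Builds a Service[Request, Response] for the given destination;
    // the label scopes stats and tracing for this client.
    val service = Kafka.newService("localhost:9092", "kafka")

    // ... issue protocol Requests through `service`, then release it.
    Await.result(service.close())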