Skip to content

Commit

Permalink
Merge pull request #14 from mkhq/finagle-6-35
Browse files Browse the repository at this point in the history
Updated to finagle 6.35.0
  • Loading branch information
okapies authored Jul 4, 2016
2 parents bb17775 + 8413e3f commit ce9bc3a
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 58 deletions.
12 changes: 5 additions & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,12 @@ organizationHomepage := Some(url("https://github.com/okapies"))

version := "0.2.2-SNAPSHOT"

scalaVersion := "2.11.7"
scalaVersion := "2.11.8"

crossScalaVersions := Seq("2.10.5", "2.11.7")

net.virtualvoid.sbt.graph.Plugin.graphSettings
crossScalaVersions := Seq("2.10.5", "2.11.8")

libraryDependencies ++= List(
"com.twitter" %% "finagle-core" % "6.30.0",
"com.twitter" %% "finagle-core" % "6.35.0",
"org.apache.kafka" %% "kafka" % "0.8.2.1"
exclude("com.101tec", "zkclient")
exclude("com.yammer.metrics", "metrics-core")
Expand All @@ -25,8 +23,8 @@ libraryDependencies ++= List(
"org.scalatest" %% "scalatest" % "2.2.4" % "test",
// dependencies for kafka-test
"junit" % "junit" % "4.11" % "test",
"org.apache.curator" % "curator-test" % "2.8.0" % "test",
"com.101tec" % "zkclient" % "0.4" % "test",
"org.apache.curator" % "curator-test" % "2.11.0" % "test",
"com.101tec" % "zkclient" % "0.8" % "test",
"com.yammer.metrics" % "metrics-core" % "2.2.0" % "test",
"org.apache.kafka" %% "kafka" % "0.8.2.1" % "test" classifier "test"
)
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.5")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.8.2")

addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.5.1")
124 changes: 75 additions & 49 deletions src/main/scala/okapies/finagle/Kafka.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package okapies.finagle

import com.twitter.util.{Future, Promise}
import com.twitter.finagle.{Client, Name, Server, Service, ServiceFactory}
import com.twitter.finagle.client.{Bridge, DefaultClient}
import com.twitter.finagle.server.DefaultServer
import com.twitter.util.{Closable, Future, Promise}
import com.twitter.finagle._
import com.twitter.finagle.client.{StackClient, Transporter, StdStackClient}
import com.twitter.finagle.server.{StackServer, StdStackServer, Listener}
import com.twitter.finagle.dispatch.{PipeliningDispatcher, GenSerialServerDispatcher}
import com.twitter.finagle.netty3.{Netty3Transporter, Netty3Listener}
import com.twitter.finagle.pool.SingletonPool
Expand All @@ -21,61 +21,87 @@ trait KafkaRichClient { self: Client[Request, Response] =>

}

object KafkaTransporter extends Netty3Transporter[Request, Response](
name = "kafka",
pipelineFactory = KafkaBatchClientPipelineFactory
)

object KafkaClient extends DefaultClient[Request, Response](
name = "kafka",
endpointer =
Bridge[Request, Response, Request, Response](KafkaTransporter, new PipeliningDispatcher(_)),
pool = (sr: StatsReceiver) => new SingletonPool(_, sr)
) with KafkaRichClient

class KafkaServerDispatcher(
trans: Transport[Response, Request],
service: Service[Request, Response])
extends GenSerialServerDispatcher[Request, Response, Response, Request](trans) {

trans.onClose ensure {
service.close()
}
object Kafka
extends Client[Request, Response]
with KafkaRichClient
with Server[Request, Response] {

case class Client(
stack: Stack[ServiceFactory[Request, Response]] = StackClient.newStack,
params: Stack.Params = StackClient.defaultParams
) extends StdStackClient[Request, Response, Client] {
protected type In = Request
protected type Out = Response

protected def dispatch(msg: Request, eos: Promise[Unit]) = msg match {
case req: Request =>
service(req) ensure eos.setDone()
case failure =>
eos.setDone
Future.exception(new IllegalArgumentException(s"Invalid message $failure"))
protected def copy1(
stack: Stack[ServiceFactory[Request, Response]],
params: Stack.Params): Client =
copy(stack, params)

protected def newTransporter(): Transporter[Request, Response] =
Netty3Transporter(KafkaBatchClientPipelineFactory, params)

protected def newDispatcher(
transport: Transport[Request, Response]): Service[Request, Response] =
new PipeliningDispatcher(transport)
}

protected def handle(resp: Response) = resp match {
case r: NilResponse => Future.Unit // write no response to the transport
case anyResp => trans.write(resp)

val client = Client()

def newClient(dest: Name, label: String): ServiceFactory[Request, Response] =
client.newClient(dest, label)

def newService(dest: Name, label: String): Service[Request, Response] =
client.newService(dest, label)


private[finagle] class KafkaServerDispatcher(
trans: Transport[Response, Request],
service: Service[Request, Response])
extends GenSerialServerDispatcher[Request, Response, Response, Request](trans) {

trans.onClose ensure {
service.close()
}

protected def dispatch(msg: Request, eos: Promise[Unit]): Future[Response] = msg match {
case req: Request =>
service(req) ensure eos.setDone()
case failure =>
eos.setDone
Future.exception(new IllegalArgumentException(s"Invalid message $failure"))
}

protected def handle(resp: Response): Future[Unit] = resp match {
case r: NilResponse => Future.Unit // write no response to the transport
case anyResp => trans.write(resp)
}
}
}


object Kafka
extends Client[Request, Response]
with KafkaRichClient
with Server[Request, Response] {
case class Server(
stack: Stack[ServiceFactory[Request, Response]] = StackServer.newStack,
params: Stack.Params = StackServer.defaultParams
) extends StdStackServer[Request, Response, Server] {
protected type In = Response
protected type Out = Request

def newClient(dest: Name, label: String) = KafkaClient.newClient(dest, label)
protected def copy1(
stack: Stack[ServiceFactory[Request, Response]],
params: Stack.Params): Server =
copy(stack, params)

def newService(dest: Name, label: String) = KafkaClient.newService(dest, label)
protected def newListener(): Listener[In, Out] =
Netty3Listener(new KafkaServerPipelineFactory, params)

class Server() extends DefaultServer[Request, Response, Response, Request](
"kafka-server",
new Netty3Listener(
"kafka-server-listener",
new KafkaServerPipelineFactory
),
new KafkaServerDispatcher(_, _)
)
protected def newDispatcher(
transport: Transport[In, Out],
service: Service[Request, Response]): Closable =
new KafkaServerDispatcher(transport, service)
}

private val server = new Server()
val server = Server()

def serve(addr: SocketAddress, service: ServiceFactory[Request, Response]) =
server.serve(addr, service)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,10 @@ class KafkaCodecFactory(stats: StatsReceiver) extends CodecFactory[Request, Resp
new Codec[Request, Response] {
def pipelineFactory = KafkaStreamClientPipelineFactory

override def prepareConnFactory(underlying: ServiceFactory[Request, Response]) = {
override def prepareConnFactory(
underlying: ServiceFactory[Request, Response],
params: Stack.Params
): ServiceFactory[Request, Response] = {
new KafkaTracingFilter() andThen new KafkaLoggingFilter(stats) andThen underlying
}
}
Expand Down

0 comments on commit ce9bc3a

Please sign in to comment.