Vertx RxScala Implementation #48
Comments
@vietj: Is this repo still active?
@willisjtc Yes, it is.
I already played around with RxJava and here are my 2 cents:
Are you thinking of doing RxScala or RxJava 2 first? I'd prefer starting with RxScala, only because we use RxJava 1.0 at work and getting adoption would be easier on the same Rx version. I'm totally fine either way, though.
The problem is that both will take quite some work to get working. I'd still be open to doing RxScala as well, but we would have to maintain both.
Any issues I could start working on?
Right now I'd like to work on #54 to be better prepared for Java 9 and for changes cutting across different modules. I am currently spending some time working with RxJava 2, trying to find a way to integrate it and get some reuse from vertx-rxjava2.
@codepitbull have you considered using Monix? It's Reactive Streams compliant and has an idiomatic Scala API. Wrapping AsyncResult handlers is as simple as this:

    // Decorate the Task companion object with a method for converting handler methods to Task
    implicit class MonixTaskCompanionVertxOps(task: Task.type) {
      def handle[A](f: Handler[AsyncResult[A]] => Unit): Task[A] = {
        // Translate the AsyncResult into a success or error signal on the Monix callback
        def handler(cb: Callback[A]): Handler[AsyncResult[A]] = { result =>
          if (result.succeeded)
            cb.onSuccess(result.result)
          else
            cb.onError(result.cause)
        }

        // Run the wrapped call, routing non-fatal exceptions to the callback
        def runnable(cb: Callback[A]): Runnable =
          () => try f(handler(cb)) catch {
            case NonFatal(e) => cb.onError(e)
          }

        Task.create { (s, cb) =>
          val scb = Callback.safe(cb)(s)
          s.execute(runnable(scb))
          Cancelable.empty
        }
      }
    }

    // Now every Handler[AsyncResult[A]] => Unit can be wrapped like this
    Task.handle[String] { handler =>
      vertx.deployVerticle(arg0, arg1, handler)
    }

or to convert ReadStream to an Observable:

    implicit class VertxReadStreamOps[A](readStream: ReadStream[A]) {
      def toObservable(vertx: Vertx): Observable[A] = {
        val writeStream = ReactiveWriteStream.writeStream[A](vertx)
        Pump.pump(readStream, writeStream).start()
        readStream.endHandler(_ => writeStream.end())
        Observable.fromReactivePublisher(writeStream)
      }
    }

    val stream = vertx.eventBus
      .consumer[String](address)
      .toObservable(vertx)
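For readers without Monix on the classpath, the same handler-wrapping idea can be sketched with only the Scala standard library, completing a `Promise` instead of a Monix callback. This is a minimal sketch and not the approach proposed in the thread: the `AsyncResult` and `Handler` traits below are hypothetical stand-ins for the Vert.x interfaces, and `FutureInterop`, `Demo`, and `deploy` are names invented for illustration.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal

// Hypothetical stand-ins for io.vertx.core.AsyncResult and io.vertx.core.Handler,
// declared here only so the sketch compiles without Vert.x.
trait AsyncResult[A] {
  def succeeded: Boolean
  def result: A
  def cause: Throwable
}

trait Handler[E] {
  def handle(event: E): Unit
}

object FutureInterop {
  // Wrap any function that accepts a Handler[AsyncResult[A]] into a Future[A].
  def handle[A](f: Handler[AsyncResult[A]] => Unit): Future[A] = {
    val promise = Promise[A]()
    val handler = new Handler[AsyncResult[A]] {
      def handle(ar: AsyncResult[A]): Unit =
        // try* variants ignore a second completion instead of throwing
        if (ar.succeeded) promise.trySuccess(ar.result)
        else promise.tryFailure(ar.cause)
    }
    // An exception thrown while registering the handler fails the Future
    try f(handler) catch { case NonFatal(e) => promise.tryFailure(e) }
    promise.future
  }
}

object Demo {
  // Hypothetical callback-style API that succeeds immediately
  def deploy(name: String, handler: Handler[AsyncResult[String]]): Unit =
    handler.handle(new AsyncResult[String] {
      def succeeded: Boolean = true
      def result: String = s"deployed-$name"
      def cause: Throwable = null
    })
}
```

A call such as `FutureInterop.handle[String](h => Demo.deploy("main", h))` then yields a `Future[String]`. Unlike the `Task` version above, a `Future` starts eagerly and offers no cancellation, which is one reason a lazy type like `Task` may be preferable.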
That's very nice. I looked at Monix quite a while ago for another project.
Yep, sure, I've actually been messing around with generating wrapper APIs to use with Monix here if you want to see what this might look like
@DavidGregory084 I've been working on and off on a value-class-based approach to integrating Vert.x and Scala. The current (working) results are here: https://github.com/codepitbull/vertx-lang-scala-vc
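As a rough illustration of what a value-class-based extension can look like, the sketch below adds a `Future`-returning method to a callback-style class through an implicit value class. It is not taken from either linked repository: `JavaStyleClient`, `fetch`, `fetchFuture`, and the `syntax` object are hypothetical names, and an `Either`-based callback stands in for a real Vert.x handler.

```scala
import scala.concurrent.{Future, Promise}

// Hypothetical callback-style class standing in for a Java API
final class JavaStyleClient {
  def fetch(key: String, callback: Either[Throwable, String] => Unit): Unit =
    callback(Right(s"value-of-$key"))
}

object syntax {
  // Extending AnyVal lets the compiler avoid allocating a wrapper in most cases
  implicit final class JavaStyleClientOps(val underlying: JavaStyleClient) extends AnyVal {
    // Future-returning variant of the callback method
    def fetchFuture(key: String): Future[String] = {
      val promise = Promise[String]()
      underlying.fetch(key, r => { promise.complete(r.toTry); () })
      promise.future
    }
  }
}
```

With `import syntax._` in scope, `new JavaStyleClient().fetchFuture("k")` resolves to the extension method while the original class stays untouched.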
@codepitbull I finally got back to experimenting with this idea again here: https://github.com/DavidGregory084/vertices It uses the value class / implicit class based approach, just like your repo. One thing that is a little awkward is that you can't add extension methods to Java classes so that they are callable like static methods, the way you can with Scala companion objects. I have used the vertx-codegen code to generate extension methods for all of the Vert.x APIs, although my approach is a little mechanical and it can get things wrong, such as generating
@DavidGregory084 you might want to check the current state of vertx-lang-scala. The master branch now contains a full implementation of my approach, and you might be able to grab a lot of stuff from there since I already get all the methods from the Java API for conversion.
Do you plan on providing an rxscala toolkit for vertx applications? If so, would you want some assistance?