Use reactive streams API to combine akka-streams with rxJava
Just a quick article this time, since I’m still experimenting with this stuff. There is a lot of talk around reactive programming. In Java 8 we’ve got the Stream API, we got rxJava we got ratpack and Akka has got akka-streams.
The main issue with these implementations is that they aren’t compatible. You can’t connect the subscriber of one implementation to the publisher of another. Luckily an initiative has started to provide a way that these different implementations can work together:
"It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application."
From - http://www.reactive-streams.org/
How does this work
Now how do we do this? Lets look at a quick example based on the akka-stream provided examples (from here). In the following listing
package sample.stream
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.{Flow, SubscriberSink, PublisherSource, Source}
import rx.internal.reactivestreams.RxSubscriberToRsSubscriberAdapter
import rx.{Subscriber, RxReactiveStreams, Observable}
import scala.collection.JavaConverters._
object BasicTransformation {
def main(args: Array[String]): Unit = {
// define an implicit actorsystem and import the implicit dispatcher
implicit val system = ActorSystem("Sys")
// flow materializer determines how the stream is realized.
// this time as a flow between actors.
implicit val materializer = FlowMaterializer()
// input text for the stream.
val text =
"""|Lorem Ipsum is simply dummy text of the printing and typesetting industry.
|Lorem Ipsum has been the industry's standard dummy text ever since the 1500s,
|when an unknown printer took a galley of type and scrambled it to make a type
|specimen book.""".stripMargin
// create an observable from a simple list (this is in rxjava style)
val first = Observable.from(text.split("\\s").toList.asJava);
// convert the rxJava observable to a publisher
val publisher = RxReactiveStreams.toPublisher(first);
// based on the publisher create an akka source
val source = PublisherSource(publisher);
// create a subscriber
val sub = new Subscriber[String]() {
override def onCompleted(): Unit = println("The observerd Akka-Stream is completed")
override def onError(e: Throwable): Unit = println("The observered Akka-Stream caused on error")
override def onNext(t: String): Unit = println("Message received:" + t)
}
val subSink = SubscriberSink(new RxSubscriberToRsSubscriberAdapter(sub));
// we create a simple flow of three steps
val flow = Flow[String]
.map(_.toUpperCase). // step
filter(_.length > 3). // step
take(5) // step
// we want to run this flow with our specified source and sink
// both the source and the sink are rxJava components converted
// used with 'reactive streams'
flow.runWith(source,subSink)
}
}
The code comments in this example explain pretty much what is happening. What we do here is we create a rxJava based Observable. Convert this Observable to a “reactive streams” publisher and use this publisher to create an akka-streams source. For the rest of the code we can use the akka-stream style flow API to model the stream. In this case we just do some filtering and print out the result.