Create a reactive websocket server with akka-streams
In the previous article we looked at how backpressure works when working with akka-streams. Akka also uses akka-streams as the base for akka-http, with which you can easily create an HTTP server and client, where each request is processed using a stream materialized using actors. In the latest release akka-http also supports websockets. In this article we’ll show you how you can create a websocket server using akka-http.
We’ll show and explain the following subjects:
- Respond to messages using a simple flow created with the flow API.
- Respond to messages with a flow graph, created with the flow graph DSL.
- Proactively push messages to the client by introducing an additional source to the flow.
- Create a custom publisher from an Akka Actor.
When writing this article, it became a bit longer than initially planned. I’ll write a follow-up on how you can see that backpressure and rate control also work with websockets, so watch for that one in a couple of weeks. The source files for this article are available as GitHub Gists.
So first let’s look at how to set up the basic skeleton of the application.
Getting started
Let’s start by looking at the dependencies we need. For all the examples we use the following simple sbt file:
As you can see we use the RC2 version of akka-streams and akka-http, which was released at the end of April. Creating a websocket server with akka-http is very easy, and works pretty much the same way as the HTTP server we created before:
In this example we bind a set of handlers (more on these later) to port 9001 on localhost. Whenever a request comes in, we try to match it using pattern matching. To detect a WebSocket request we check whether the request carries an UpgradeToWebsocket header. We do this in the WSRequest extractor. So when we have a WebSocket request and a specific Uri.Path, we handle the request using the specified flow. If we can’t match the request, we just return a 400. Note that the binding itself is a future, so we wait a second for the server to start, and otherwise assume something went wrong.
In our pattern matching we match four different patterns, each with its own flow. In the next sections we’ll look a bit closer at each of the flows to see how they work. Let’s start simple with the “/echo” flow, which also allows us to introduce some test tools.
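To make the description above a bit more concrete, here is a minimal sketch of such a server. Note the assumptions: it targets a later Akka HTTP release (10.2 or newer), where the upgrade is exposed through the webSocketUpgrade request attribute rather than the UpgradeToWebsocket header used in RC2, and it only wires up a placeholder echo flow on a single path. The names WSServerSketch and handler are made up for this example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.AttributeKeys.webSocketUpgrade
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.model.ws.Message
import akka.stream.scaladsl.Flow

import scala.concurrent.Await
import scala.concurrent.duration._

object WSServerSketch {
  // Placeholder flow so the sketch is self-contained: it echoes every message.
  val echoFlow: Flow[Message, Message, NotUsed] = Flow[Message]

  // Plays the role of the WSRequest extractor (assumption: Akka HTTP 10.2+ API).
  // A request is only handled when it is a websocket upgrade on a known path.
  val handler: HttpRequest => HttpResponse = { req =>
    req.attribute(webSocketUpgrade) match {
      case Some(upgrade) if req.uri.path.toString == "/echo" =>
        upgrade.handleMessages(echoFlow)
      case _ =>
        HttpResponse(StatusCodes.BadRequest, entity = "Invalid websocket request")
    }
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("websockets")
    // The binding is a Future: wait a second for it, or fail with an exception.
    val binding = Http().newServerAt("localhost", 9001).bindSync(handler)
    Await.result(binding, 1.second)
    println("Listening on ws://localhost:9001/echo")
  }
}
```

A plain HTTP request to any path falls through to the last case and gets the 400 response.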
Echo flow
Before we look at the flow, let’s look a bit closer at what our handler functions require. The signature of “req.header[UpgradeToWebsocket].get.handleMessages” looks like this:
def handleMessages(handlerFlow: Flow[Message, Message, Any], subprotocol: Option[String] = None)(implicit mat: FlowMaterializer): HttpResponse
As you can see, this function requires a Flow with an open input which accepts a Message and an open output which emits a Message. Akka-streams will attach the created websocket as a Source and pass any messages sent by the client into this flow. Akka-streams will also use the same websocket as a Sink and pass the resulting messages from this flow to it. The result of this function is an HttpResponse that will be sent to the client.
Now let’s look at the echo flow. For this flow we defined the following case:
So whenever we receive a websocket message on “/echo”, we run it through the Flows.echoFlow:
Calling Flow[Message] like this returns a minimal flow, which just passes each message it receives on its input directly to its output. So in our case any websocket message received is passed back to the client. Now, let’s see this in action. To test this we need a websocket client. We can, of course, write one ourselves, but for now we’ll just use a Chrome plugin (Simple Web Socket Client). Connect this client to ws://localhost:9001/echo and send a message:
The flow configured at this endpoint responds with the same text as was entered. We didn’t really do anything with this message. Let’s add some custom functionality to the flow and see what happens.
SimpleFlow
When we connect a client to “/simple” the Flows.reverseFlow is used to handle incoming websocket messages:
This time we create a simple Flow using the standard streams API:
We once again start with Flow[Message], and this time call map on the result. In the provided map function we check whether we have a standard TextMessage (we’re ignoring streaming and binary messages in this article), and if we do, we reverse the text and return a new TextMessage. The result is pretty much what you’d expect:
Anything you enter here is returned to the client, but reversed.
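As a rough illustration of the map step described above, a reverse flow could look like the sketch below. It assumes the Message and TextMessage types from a later Akka HTTP release (10.x); the object name ReverseFlowSketch and the fallback text for unsupported messages are invented for the example.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.Flow

object ReverseFlowSketch {
  val reverseFlow: Flow[Message, Message, NotUsed] =
    Flow[Message].map {
      // A strict text message has its complete payload available, so we can reverse it.
      case TextMessage.Strict(txt) => TextMessage(txt.reverse)
      // Streamed and binary messages are out of scope here (assumption: answer with a fixed text).
      case _ => TextMessage("Message type unsupported")
    }
}
```

Because the flow is just a value, you can try it without a websocket at all by running a Source of messages through it into a Sink.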
So far we’ve only created very simple flows, using the flow API directly. If you’ve already looked a bit closer at akka-streams, you probably know that there is an alternative way of creating flows. You can also use the Graph DSL, as we’ll show in the next example.
The Graph flow
With a graph flow it is very easy to create more complex message processing graphs. In the following sample we’ll show you how you can use a couple of standard flow constructs to easily process and filter incoming messages. This sample will be run when we access the server on the following endpoint:
Let’s first look at the source code:
In this graph we first filter out messages we don’t want by using a collect step. This step only passes on the incoming message when there is a match in the provided partial function. Next we send the message to a broadcast step. The broadcast step allows you to duplicate the message and send it to multiple downstream steps. In our case we send it to three downstream compute steps. The compute step itself is just a simple map function where we append a number to the message. With the zip node we create a single String from the three Strings created by the compute nodes. Finally, since our flow requires a Message, we map the String to a Message in the mapToMessage step. To complete this flow, we need to return a tuple with the entry and exit points of this flow. At this point, we can run the sample again.
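The steps described above can be sketched with the graph DSL as follows. This is a sketch under assumptions, not the original listing: it uses the GraphDSL API of later Akka Streams releases (2.4 and newer), where the flow's entry and exit points are returned as a FlowShape instead of a tuple, and the separator used by the zip step is made up.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, ZipWith}

object GraphFlowSketch {
  // One compute step: append a number to the text.
  private def compute(n: Int): Flow[String, String, NotUsed] =
    Flow[String].map(s => s"$s:$n")

  val graphFlow: Flow[Message, Message, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // collect: only strict text messages pass, and they become plain Strings.
      val collect = builder.add(Flow[Message].collect { case TextMessage.Strict(txt) => txt })
      // broadcast: duplicate each String to three outputs.
      val broadcast = builder.add(Broadcast[String](3))
      // zip: combine the three computed Strings into a single String.
      val zip = builder.add(ZipWith[String, String, String, String]((x, y, z) => s"$x, $y, $z"))
      val mapToMessage = builder.add(Flow[String].map[Message](s => TextMessage(s)))

      collect ~> broadcast
      broadcast ~> compute(1) ~> zip.in0
      broadcast ~> compute(2) ~> zip.in1
      broadcast ~> compute(3) ~> zip.in2
      zip.out ~> mapToMessage

      // Entry and exit points of the flow.
      FlowShape(collect.in, mapToMessage.out)
    })
}
```

With this sketch, sending the text "a" comes back as "a:1, a:2, a:3".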
At this point we’ve only responded to messages from the client, but didn’t push anything to the client from the server proactively. In the following sample, we’ll introduce an additional source that can push messages to the client regardless of whether the client requested them.
Pushing messages to the client
One of the matched patterns uses the Flows.graphFlowWithExtraSource flow:
This flow ignores incoming messages, and just sends out 2000 random strings to the connected client. The complete code for this flow is shown next:
In this graph we use a Merge step to connect the source we want to use to the rest of the flow. A merge step takes an element from an upstream source when it becomes available. To make sure that we only take elements from our source (the rangeSource), we add a filter on the messages we receive from the websocket client. Besides these steps we’ve got a couple of map steps we’ve already seen before, and that makes up our flow.
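A sketch of this merge-based graph is shown below. As before it assumes the GraphDSL API of later Akka Streams releases; the object name PushFlowSketch, the randomString helper and the length of the random strings are made up for the example.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Source}

import scala.util.Random

object PushFlowSketch {
  // Hypothetical helper: a random alphanumeric string of 10 characters.
  private def randomString(): String = Random.alphanumeric.take(10).mkString

  val graphFlowWithExtraSource: Flow[Message, Message, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // The extra source: the numbers 1 to 2000.
      val rangeSource = builder.add(Source(1 to 2000))
      // Log each number, then replace it with a random string.
      val logAndRandomise = builder.add(Flow[Int].map { n => println(n); randomString() })
      // Client messages are converted to Strings and then all filtered out.
      val mapMsgToString = builder.add(Flow[Message].map(_ => ""))
      val dropAll = builder.add(Flow[String].filter(_ => false))
      val merge = builder.add(Merge[String](2))
      val mapStringToMsg = builder.add(Flow[String].map[Message](s => TextMessage(s)))

      mapMsgToString ~> dropAll ~> merge
      rangeSource ~> logAndRandomise ~> merge
      merge.out ~> mapStringToMsg

      FlowShape(mapMsgToString.in, mapStringToMsg.out)
    })
}
```

The filter on the client side is what keeps the output limited to the elements of rangeSource: the merge step itself would happily pass on elements from either input.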
When we connect a client to this flow, we should see 2000 messages being pushed to the client as fast as the client can process them:
Which is exactly what happens. As soon as the connection is created, 2000 messages are pushed to the client. Any messages sent from the client are ignored, as you can see in the following screenshot:
We’ve also added a small logging step to the flow. This will just print out all numbers from 1 to 2000, to give us an idea of how everything is running. At this point we’ve only used the standard components provided by akka-streams. In the next section we’re going to create a custom publisher that pushes VM information, such as memory usage, to a websocket client.
Pushing messages to the client with a custom publisher
We’ll need to take a couple of steps before we can get this to work correctly, and this will involve creating a couple of actors:
- We'll need an actor that feeds our stream. For this we'll use an actor that, together with a scheduler, sends VMStat messages at a configured interval.
- In akka-streams you can't connect a new subscriber to a running publisher. To work around this we'll have the first actor send its messages to a router. This router will then broadcast the messages to the actors that inject them into a flow.
- Finally we need the actor that connects the messages to the akka flow. For this we create an actor for each websocket request, which acts like a publisher, and passes on messages received from the router into the flow.
Let’s start with the first one.
The VMActor
The VMActor is a simple actor, which, when started, sends a message every period to the provided actorRef like this:
The code for this actor isn’t that special. It’s just a basic actor which collects some information and passes it on in a map to the provided actorRef. (I know, I know, should have made that a case class…). Now, let’s look at the router.
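For reference, an actor along the lines described above could be sketched like this. It assumes Akka 2.6, where the scheduler offers scheduleWithFixedDelay; the map keys, the Tick message and the props helper are invented for the example and are not necessarily what the original actor uses.

```scala
import java.lang.management.ManagementFactory

import akka.actor.{Actor, ActorRef, Cancellable, Props}

import scala.concurrent.duration.FiniteDuration

object VMActorSketch {
  // Internal message the scheduler sends to the actor itself (hypothetical name).
  private case object Tick

  // Collects a few JVM figures into a map; the keys are illustrative.
  def vmStats(): Map[String, Long] = {
    val runtime = Runtime.getRuntime
    Map(
      "count.procs"  -> runtime.availableProcessors().toLong,
      "count.thread" -> ManagementFactory.getThreadMXBean.getThreadCount.toLong,
      "mem.total"    -> runtime.totalMemory(),
      "mem.free"     -> runtime.freeMemory()
    )
  }

  def props(target: ActorRef, initialDelay: FiniteDuration, interval: FiniteDuration): Props =
    Props(new VMActor(target, initialDelay, interval))

  class VMActor(target: ActorRef, initialDelay: FiniteDuration, interval: FiniteDuration)
      extends Actor {
    private var tick: Option[Cancellable] = None

    // Start ticking when the actor starts, and stop when it is stopped.
    override def preStart(): Unit = {
      import context.dispatcher
      tick = Some(context.system.scheduler.scheduleWithFixedDelay(initialDelay, interval, self, Tick))
    }
    override def postStop(): Unit = tick.foreach(_.cancel())

    // On every tick, send a fresh snapshot to the provided actorRef.
    def receive: Receive = { case Tick => target ! vmStats() }
  }
}
```

Keeping vmStats as a plain function makes it easy to check what is sent without starting an actor system.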
The router
For the router, initially, I wanted to use the standard BroadcastGroup router. But this router is immutable and doesn’t really allow dynamically adding new routees. So for this use case we create a very simple alternative router, which we create like this:
Here we create both actors, and pass the actorRef of the router to the VMActor. The implementation of this router looks like this:
The router uses the same case classes that Akka’s default routers use. The VMActor will send an update 50 times per second, with an initial delay of 2 seconds. All that is left to do is create an actor that registers itself with the router as a routee and can publish to the flow.
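A minimal sketch of such a router is shown below. It assumes that the case classes mentioned above are AddRoutee and RemoveRoutee from akka.routing, which is what Akka's own routers accept as management messages; the class name RouterActor is chosen for the example.

```scala
import akka.actor.Actor
import akka.routing.{AddRoutee, RemoveRoutee, Routee}

class RouterActor extends Actor {
  // The current set of routees; changed only from within receive.
  private var routees = Set.empty[Routee]

  def receive: Receive = {
    case AddRoutee(routee)    => routees += routee
    case RemoveRoutee(routee) => routees -= routee
    // Anything else is broadcast to every registered routee.
    case msg                  => routees.foreach(_.send(msg, sender()))
  }
}
```

An actor can then register itself by sending AddRoutee(ActorRefRoutee(self)) to the router, and deregister with the matching RemoveRoutee message.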
VMStatsPublisher
Akka-streams provides an ActorPublisher[T] trait which your actors must mix in so that they can be used as a publisher inside a flow. Before we look at the implementation of this actor, let’s first look at the flow that uses this actor:
If you’ve looked at the other flows, this one shouldn’t come as a surprise. It looks a lot like the previous one, only this time we define the source like this:
This means that every time a websocket connection is made, a new VMStatsPublisher actor is created and the router actorRef is passed into the constructor. So, finally, let’s look at this publisher. We will first look at the complete code of this actor and then we’ll highlight a couple of small things in the discussion afterwards:
The comments inline should give you a fairly good idea of what is happening here, but let’s look at a couple of items. First let’s look at how we register this actor with the router:
In preStart we register, and we must also make sure to deregister before we’re stopped. In the receive method we can receive three types of objects. We can receive stats, which we add to an internal queue; we can receive a request from downstream for more messages (the Request message); or we can get a Cancel message when the subscriber closes in an orderly fashion. To deliver messages to our downstream subscriber we use the onNext call in the deliver function.
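The interplay between the internal queue and the downstream demand is the core of this actor, so here is a plain Scala model of it, without any Akka dependency. Everything in it is hypothetical: DemandBuffer, offer and request are names invented for this sketch, the emit callback stands in for onNext, the demand counter stands in for totalDemand, and dropping the oldest element when the buffer is full is an assumption, not necessarily what the publisher does.

```scala
import scala.annotation.tailrec
import scala.collection.mutable

// Hypothetical stand-in for the publisher's buffering logic; not an Akka class.
final class DemandBuffer[A](maxSize: Int, emit: A => Unit) {
  private val queue = mutable.Queue.empty[A]
  private var demand = 0L

  /** New stats arrived from the router: buffer them (dropping the oldest when full). */
  def offer(a: A): Unit = {
    if (queue.size == maxSize) queue.dequeue()
    queue.enqueue(a)
    deliver()
  }

  /** The subscriber signalled that it can take n more elements (the Request message). */
  def request(n: Long): Unit = {
    demand += n
    deliver()
  }

  def pending: Int = queue.size
  def totalDemand: Long = demand

  // Emit while there is both demand and buffered data, one element per unit of demand.
  @tailrec private def deliver(): Unit =
    if (demand > 0 && queue.nonEmpty) {
      emit(queue.dequeue())
      demand -= 1
      deliver()
    }
}
```

With a buffer of size 2, offering 1, 2 and 3 without any demand leaves 2 and 3 in the queue. A request for one element then emits 2, and nothing more is emitted until the subscriber asks again.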
As long as there is a demand (totalDemand property which is managed by akka-streams), and we’ve got messages in our queue we’ll continue sending messages. This function also outputs a console message when there is no more demand from the subscriber to this publisher. When we connect to this flow using our client we see the following in our websocket client:
Cool, right? There are a couple of other topics to explore regarding websockets and akka-streams, most importantly backpressure. I’ll write a separate article on that one in the next couple of weeks to show that slow websocket clients trigger backpressure with akka-streams.