Listen to notifications from PostgreSQL with Scala


In the past I’ve written a couple of articles (Building a REST service in Scala with Akka HTTP, Akka Streams and reactive mongo and ReactiveMongo with Akka, Scala and websockets) which used MongoDB to push updates directly from the database to a Scala application. This is a very nice feature if you just want to subscribe your application to a stream of events where it doesn’t really matter if you miss one while your application is down. While MongoDB is a great database, it isn’t the right fit for all purposes. Sometimes you want a relational database with a well-defined schema, or a database that can combine the SQL and NoSQL worlds. Personally I’ve always really liked PostgreSQL. It’s one of the best relational databases, has great GIS support (which I like a lot), and is getting more and more JSON/schema-less support (which I need to dive into sometime). One of the features of PostgreSQL I didn’t know about was that it provides a kind of subscribe mechanism. I learned about it when reading the “Listening to generic JSON notifications from PostgreSQL in Go” article, which shows how to use this feature from Go. In this article we’ll see what you need to do to get something similar working in Scala (the approach for Java is pretty much the same).

How does this work in PostgreSQL

It is actually very easy to listen to notifications in PostgreSQL. All you have to do is the following:

LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.

The connection that wants to listen to events calls LISTEN with the name of the channel it wants to listen on. The sending connection just runs NOTIFY with the name of the channel and an optional payload.

Preparing the database

The cool thing from the article on Go I mentioned in the introduction is that it provides a stored procedure which automatically sends a notification whenever a table row is INSERTed, UPDATEd, or DELETEd. The following, taken from Listening to generic JSON notifications from PostgreSQL in Go, creates a stored procedure which sends notifications when called.
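The stored procedure looks along these lines (adapted from the Go article; the function name notify_event and the channel name events come from there):

```sql
-- Trigger function that publishes the affected row as JSON
-- on the "events" channel.
CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
DECLARE
    data json;
    notification json;
BEGIN
    -- convert the old or new row to JSON, based on the kind of action
    IF (TG_OP = 'DELETE') THEN
        data = row_to_json(OLD);
    ELSE
        data = row_to_json(NEW);
    END IF;

    -- construct the notification payload
    notification = json_build_object(
                     'table', TG_TABLE_NAME,
                     'action', TG_OP,
                     'data', data);

    -- send the notification on the "events" channel
    PERFORM pg_notify('events', notification::text);

    -- the result is ignored since this is an AFTER trigger
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
```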

The really cool thing about this stored procedure is that the data is converted to JSON, so we can easily process it in our application. For this example I’ll use the same tables and data used in the Go article, so first create a table:

CREATE TABLE products (
  id SERIAL,
  name TEXT,
  quantity FLOAT
);

And create a trigger that fires whenever something happens to the table:

CREATE TRIGGER products_notify_event
AFTER INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE PROCEDURE notify_event();

At this point, whenever a row is inserted, updated or deleted on the products table, a notification is sent. We can easily test this using the psql command line:

triggers=# LISTEN events;
triggers=# INSERT INTO products(name, quantity) VALUES ('Something', 99999);
Asynchronous notification "events" with payload "{"table" : "products", "action" : "INSERT", "data" : {"id":50,"name":"Something","quantity":99999}}" received from server process with PID 24131.

As you can see, the INSERT resulted in an asynchronous event which contains the data. So far we’ve pretty much followed the steps outlined in the Go article. Now let’s look at how we can access the notifications from Scala.

Accessing notifications from Scala

First let’s set up our project’s dependencies. As always we use SBT. The build.sbt for this project looks like this:
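A minimal build.sbt could look like this (the version numbers are illustrative; pick whatever recent versions work together in your setup):

```scala
name := "postgres-notifications"

version := "1.0"

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // easy-to-use wrapper around JDBC
  "org.scalikejdbc"   %% "scalikejdbc"    % "2.2.7",
  // actor framework used to poll the connection
  "com.typesafe.akka" %% "akka-actor"     % "2.3.12",
  // simple JSON parsing
  "org.json4s"        %% "json4s-native"  % "3.2.11",
  // PostgreSQL JDBC driver
  "org.postgresql"    %  "postgresql"     % "9.4-1201-jdbc41"
)
```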

A quick summary of the dependencies:

  • scalikeJDBC: This project provides an easy-to-use wrapper around JDBC, so we don't have to deal with the Java way of handling connections ourselves.
  • akka: We use the Akka framework to manage the connection with the database. Since the JDBC driver isn't asynchronous and can't push data to us, we need to poll at a set interval.
  • json4s: This is just a simple Scala JSON library. We use it to quickly convert the incoming data into a simple case class.

We’ll first show you the complete source code for this example, and then explain the various parts:
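Note that this is a sketch rather than a drop-in program: the connection URL and credentials are placeholders, and I'm assuming scalikeJDBC pools connections with commons-dbcp2 (which is what the DelegatingConnection cast relies on); the details may differ for your versions.

```scala
import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import org.apache.commons.dbcp2.{BasicDataSource, DelegatingConnection}
import org.json4s._
import org.json4s.native.JsonMethods.parse
import org.postgresql.PGConnection
import scalikejdbc._

import scala.concurrent.duration._

// case class to which the "data" element of the notification is mapped
case class Product(id: Long, name: String, quantity: Double)

object PostgresNotifications extends App {

  // standard scalikeJDBC connection pool setup
  Class.forName("org.postgresql.Driver")
  ConnectionPool.singleton("jdbc:postgresql://localhost:5432/triggers", "user", "password")

  // allow access to the real PGConnection wrapped by the pool;
  // without this, getInnermostDelegate silently returns null
  ConnectionPool.dataSource()
    .asInstanceOf[BasicDataSource]
    .setAccessToUnderlyingConnectionAllowed(true)

  val system = ActorSystem("postgres-notifications")
  system.actorOf(Props[Poller], "poller")
}

class Poller extends Actor {

  implicit val formats = DefaultFormats
  import context.dispatcher

  // borrow a connection from the pool and keep it open between polls
  val db = DB(ConnectionPool.borrow())
  db.autoClose(false)

  // the JDBC driver can't push data to us, so check for notifications every 500 ms
  val timer: Cancellable =
    context.system.scheduler.schedule(0.millis, 500.millis, self, "poll")

  override def preStart(): Unit =
    // tell PostgreSQL this connection listens on the "events" channel
    db.autoCommit { implicit session => sql"LISTEN events".execute().apply() }

  override def postStop(): Unit = {
    timer.cancel()
    db.close()
  }

  def receive = {
    case "poll" =>
      // unwrap the pooled connection to get at the underlying PGConnection
      val pgConnection = db.conn
        .asInstanceOf[DelegatingConnection[_]]
        .getInnermostDelegate
        .asInstanceOf[PGConnection]

      // getNotifications returns null when nothing is pending
      val notifications = Option(pgConnection.getNotifications).getOrElse(Array.empty)

      notifications.foreach { n =>
        println(s"Received for: ${n.getName} from process with PID: ${n.getPID}")
        println(s"Received data: ${n.getParameter}")
        val product = (parse(n.getParameter) \ "data").extract[Product]
        println(s"Received as object: $product")
      }
  }
}
```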

If you’ve worked with Akka and scalikeJDBC before, the code will look familiar. We start with some general setup stuff:
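Something like the following (the database URL and credentials are placeholders, and commons-dbcp2 as the pool implementation is an assumption on my part):

```scala
import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import org.apache.commons.dbcp2.{BasicDataSource, DelegatingConnection}
import org.json4s._
import org.json4s.native.JsonMethods.parse
import org.postgresql.PGConnection
import scalikejdbc._

import scala.concurrent.duration._

// case class to which the "data" element of the notification is mapped
case class Product(id: Long, name: String, quantity: Double)

object PostgresNotifications extends App {

  // standard scalikeJDBC connection pool setup
  Class.forName("org.postgresql.Driver")
  ConnectionPool.singleton("jdbc:postgresql://localhost:5432/triggers", "user", "password")

  // allow access to the real PGConnection wrapped by the pool;
  // without this, getInnermostDelegate silently returns null
  ConnectionPool.dataSource()
    .asInstanceOf[BasicDataSource]
    .setAccessToUnderlyingConnectionAllowed(true)

  val system = ActorSystem("postgres-notifications")
  system.actorOf(Props[Poller], "poller")
}
```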

Here we define the case class to which we’ll transform the incoming JSON, set up a connection pool, define the Akka system and start our Poller actor. Nothing too special here; the only special part is the call to setAccessToUnderlyingConnectionAllowed. To add a listener from Scala we need access to the underlying JDBC Connection. Since scalikeJDBC uses connection pooling, we need to explicitly call setAccessToUnderlyingConnectionAllowed to make sure we’re allowed to access the actual connection when we call getInnermostDelegate, and not just the wrapper from the connection pool. Interesting to note: if we don’t set this, we don’t get an error message or anything, getInnermostDelegate simply returns null….

With this out of the way, and our Actor started, let’s see what it does:
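A sketch of the actor (the names of the timer and the poll message are my own; the 500 ms interval matches what's described below):

```scala
class Poller extends Actor {

  implicit val formats = DefaultFormats
  import context.dispatcher

  // borrow a connection from the pool and keep it open between polls
  val db = DB(ConnectionPool.borrow())
  db.autoClose(false)

  // the JDBC driver can't push data to us, so check for notifications every 500 ms
  val timer: Cancellable =
    context.system.scheduler.schedule(0.millis, 500.millis, self, "poll")

  override def preStart(): Unit =
    // tell PostgreSQL this connection listens on the "events" channel
    db.autoCommit { implicit session => sql"LISTEN events".execute().apply() }

  override def postStop(): Unit = {
    timer.cancel()
    db.close()
  }

  def receive = {
    case "poll" =>
      // fetch and process any pending notifications from the connection
  }
}
```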

The first thing we do in our actor is set some properties needed by scalikeJDBC, and set up a timer which fires a message every 500 ms. Also note the preStart and postStop functions. In preStart we execute a small piece of SQL which tells PostgreSQL that this connection will be listening for notifications on the “events” channel. We also set autoClose to false, to avoid the session pooling mechanism closing the session and connection. We want to keep these alive so we can receive events. When the actor is terminated we make sure to clean up the timer and connection.

In the receive function we first get the real PGConnection and then get the notifications from the connection:
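Assuming commons-dbcp2 is the pool implementation, the unwrapping looks like this:

```scala
// unwrap the pooled connection to get at the underlying PGConnection
val pgConnection = db.conn
  .asInstanceOf[DelegatingConnection[_]]
  .getInnermostDelegate
  .asInstanceOf[PGConnection]

// getNotifications returns null when nothing is pending
val notifications = Option(pgConnection.getNotifications).getOrElse(Array.empty)
```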

If there are no notifications, null is returned, so we wrap the result in an Option and fall back to an empty array in the null case. If there are any notifications we process them in a foreach loop and print out the result:
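The loop itself is straightforward (getName, getPID and getParameter are the accessors of the JDBC driver's PGNotification):

```scala
notifications.foreach { n =>
  println(s"Received for: ${n.getName} from process with PID: ${n.getPID}")
  println(s"Received data: ${n.getParameter}")

  // take the "data" element from the payload and map it to our case class
  val product = (parse(n.getParameter) \ "data").extract[Product]
  println(s"Received as object: $product")
}
```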

Here you can also see that we just take the “data” element from the notification and convert it to our Product case class for further processing. All you have to do now is start the application and insert some rows from the same psql terminal. If all went well, you’ll see output similar to this in your console:

Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":47,"name":"pen","quantity":10200}} 
Received as object: Product(47,pen,10200)
Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":48,"name":"pen","quantity":10200}} 
Received as object: Product(48,pen,10200)
Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":49,"name":"pen","quantity":10200}} 
Received as object: Product(49,pen,10200)
Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":50,"name":"Something","quantity":99999}} 
Received as object: Product(50,Something,99999)

Now that you’ve got this basic construct working, it’s trivial to use it, for instance, as a source for reactive streams, or to propagate these events further over websockets.