Exploring ZIO - Part II - ZStream and modules

16 minute read

In the previous article we looked at the basics of ZIO. In this part we’re going to build on that example and add the following:

  • ZStream integration: we’re going to create our own stream that polls an external service, and we’re going to use the reactive-streams integration to query a Mongo database.
  • Storage layer: we’ll add persistence to the example, where everything we get from the external service is stored in a database (MongoDB in our case).
  • Module pattern 2.0: we’ve rewritten a couple of services to the new module pattern. Not all services could be rewritten, though; I’ll explain my reasoning for each case.
  • Real REST endpoint: we’ll connect the HTTP4S endpoint to the MongoDB server to actually retrieve some data.

We won’t look at all the code (which you can find here: https://github.com/josdirksen/zio-playground), but will focus on the main data flow and how the parts are connected. The main steps we’ll explore in this article are:

  1. Poll an external REST endpoint to retrieve temperature data.
  2. This data is then stored in MongoDB.
  3. Expose a REST endpoint to query all the stored data.

Getting started

First off, let’s quickly look at the sbt dependencies so you get a sense of which libraries we’ll be using:

import Dependencies._

ThisBuild / scalaVersion := "2.13.4"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / organization := "org.smartjava"
ThisBuild / organizationName := "smartjava"

// need to use an older version, since the newest version of http4s
// supports the latest cats version, while the zio-interop-cats one
// doesn't support this yet.
val Http4sVersion = "1.0.0-M4"
val ZioVersion = "1.0.8"

lazy val root = (project in file("."))
  .settings(
    name := "zio-playground",
    libraryDependencies += scalaTest % Test,
    libraryDependencies ++= Seq(
      "org.http4s" %% "http4s-blaze-server" % Http4sVersion,
      "org.http4s" %% "http4s-dsl" % Http4sVersion,
      "org.http4s" %% "http4s-blaze-client" % Http4sVersion,
      "org.http4s" %% "http4s-circe" % Http4sVersion,
      // JSON Mapping
      "io.circe" %% "circe-generic" % "0.12.3",
      "io.circe" %% "circe-literal" % "0.12.3",
      // ZIO stuff
      "dev.zio" %% "zio" % ZioVersion,
      "dev.zio" %% "zio-streams" % ZioVersion,
      "dev.zio" %% "zio-interop-reactivestreams" % "1.3.5",
      "dev.zio" %% "zio-interop-cats" % "2.4.0.0",
      "com.github.pureconfig" %% "pureconfig" % "0.15.0",
      "org.slf4j" % "slf4j-api" % "1.7.5",
      "org.slf4j" % "slf4j-simple" % "1.7.5",
      "dev.zio" %% "zio-test" % ZioVersion % "test",
      "dev.zio" %% "zio-test-sbt" % ZioVersion % "test",
      "org.mongodb.scala" %% "mongo-scala-driver" % "4.2.3"
    ),
    testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
  )

Nothing too special. We use circe for JSON, the standard mongo-scala-driver for accessing the database, and we’ve added the relevant zio-streams dependencies to create streams and to interact with reactive streams.

Creating our first stream

We’re first going to create our own stream: a stream of Temperature values retrieved from an external API. It polls the external service every so often and passes these values on to anyone who is interested.

Everything happens in the TempClient.scala file (we’ll explain the interesting parts below the code):

/** The temperature client calls a remote webservice every so often and gives
  * access to a stream of temperatures.
  */
object tempClient {

  trait TempClient {
    val temperatureStream: ZStream[Any, Throwable, Temperature]
  }

  object TempClient {

    /** get the zstream within the context of the provided environment. This will
      * return a stream that, when drained, will provide a temperature update every
      * tick. The only dependency here is the TempClient, retrieving this stream
      * won't result in any errors. Note that we lift the temperatureStream into
      * a UIO, since the serviceWith function expects the function we pass it to
      * return an effect.
      */
    val temperatureStream: URIO[Has[TempClient], ZStream[Any, Throwable, Temperature]] =
      ZIO.serviceWith[TempClient](s => UIO.succeed(s.temperatureStream))
  }

  case class TempClientLive(
      client: Client[Task],
      console: Console.Service,
      clock: Clock.Service,
      configuration: Configuration
  ) extends TempClient {

    import org.http4s.circe._
    import io.circe.generic.auto._
    import TempClientLive.OpenWeather._
    implicit val userDecoder = jsonOf[Task, OWResult]

    val tempConfig = configuration.config.temperatureConfig

    override val temperatureStream: ZStream[Any, Throwable, Temperature] =
      // emit one right away and then do the rest in an interval
      (ZStream.succeed(-1L) ++ ZStream.fromSchedule(Schedule.spaced(tempConfig.interval.toJava)))
        .mapM(_ => makeTemperatureCall(tempConfig.endpoint + tempConfig.apiKey))
        .tap { p => console.putStrLn(p.toString()) }
        // We still have to provide the clock for the schedule to eliminate all R
        // and just use everything provided to this service.
        .provide(Has(clock))

    /** Make the call using the injected HTTP4S client, and return a Temperature
      */
    private def makeTemperatureCall(url: String): ZIO[Any, Throwable, Temperature] = {
      for {
        // to decode the JSON response into an OWResult we use the EntityDecoder from HTTP4S (the implicit
        // jsonOf decoder above) together with zio.interop.catz._ for mapping the Cats stuff to ZIO
        res <- client.expect[OWResult](url)
      } yield (Temperature(res.dt, res.main.temp))
    }
  }

  object TempClientLive {

    object OpenWeather {
      // the openweather model
      case class OWResult(coord: OWCoord, main: OWMain, visibility: Integer, wind: OWWind, dt: Long)
      case class OWCoord(lat: Double, lon: Double)
      case class OWMain(
          temp: Double,
          feels_like: Double,
          temp_min: Double,
          temp_max: Double,
          pressure: Int,
          humidity: Int
      )
      case class OWWind(speed: Double, deg: Long, gust: Double)
    }

    // the dependencies for this service
    type TempClientLiveDeps = Has[Client[Task]]
      with Has[Console.Service]
      with Has[Clock.Service]
      with Has[Configuration]

    // the layer that can be fed to other services, and which specifies what is needed by this layer
    val layer: URLayer[TempClientLiveDeps, Has[TempClient]] = (TempClientLive(_, _, _, _)).toLayer
  }
}

Let’s start by looking at the structure. This setup uses the Module pattern 2.0:

object tempClient {

  trait TempClient {
    val temperatureStream: ZStream[Any, Throwable, Temperature]
  }

  object TempClient {
    val temperatureStream: URIO[Has[TempClient], ZStream[Any, Throwable, Temperature]] =
      ZIO.serviceWith[TempClient](s => UIO.succeed(s.temperatureStream))
  }

  case class TempClientLive(
      client: Client[Task],
      console: Console.Service,
      clock: Clock.Service,
      configuration: Configuration
  ) extends TempClient {

    ...
  }


  object TempClientLive {

    ...

    // the dependencies for this service
    type TempClientLiveDeps = Has[Client[Task]]
      with Has[Console.Service]
      with Has[Clock.Service]
      with Has[Configuration]

    // the layer that can be fed to other services, and which specifies what is needed by this layer
    val layer: URLayer[TempClientLiveDeps, Has[TempClient]] = (TempClientLive(_, _, _, _)).toLayer
  }
}

Follow the previous link for a detailed explanation of this pattern. The main change is that we no longer introduce a type alias for the Has[A] part; instead we define our module as a plain trait (TempClient), and implementations of this trait (TempClientLive) are exposed as layers. In our example, the implementation depends on four other modules (Has[Client[Task]], Has[Console.Service], Has[Clock.Service] and Has[Configuration]). Notice another change here: we no longer refer to type aliases, but reference the Has[A] type explicitly. I like this, since it makes it much clearer that we’re talking about layers and dependencies, not actual implementations or traits. And since our implementation is now a simple case class, we can call toLayer on its constructor to build a layer that automatically injects the dependencies our service needs.
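To make the structure concrete without HTTP4S or a configuration file, here is a minimal sketch of the same pattern, assuming ZIO 1.0.8 as in the build above. Thermometer, ThermometerLive and ThermometerConfig are hypothetical names used only for illustration, and for a single dependency the layer is built with ZLayer.fromService instead of toLayer.

```scala
import zio._

object ModulePatternSketch {

  // Hypothetical configuration value, standing in for the article's Configuration.
  final case class ThermometerConfig(fixedReading: Double)

  // The module is a plain trait; there is no type alias wrapping it in Has.
  trait Thermometer {
    def current: UIO[Double]
  }

  // Accessor in the companion object: callers only need Has[Thermometer].
  object Thermometer {
    val current: URIO[Has[Thermometer], Double] =
      ZIO.serviceWith[Thermometer](_.current)
  }

  // The implementation receives its dependencies through the constructor.
  final case class ThermometerLive(config: ThermometerConfig) extends Thermometer {
    override def current: UIO[Double] = UIO.succeed(config.fixedReading)
  }

  object ThermometerLive {
    // Input: Has[ThermometerConfig]; output: Has[Thermometer].
    val layer: URLayer[Has[ThermometerConfig], Has[Thermometer]] =
      ZLayer.fromService[ThermometerConfig, Thermometer](ThermometerLive(_))
  }

  // Feed a config value into the implementation's layer with >>>.
  val appLayer: ULayer[Has[Thermometer]] =
    ZLayer.succeed(ThermometerConfig(fixedReading = 285.05)) >>> ThermometerLive.layer

  // Run the accessor with the wired-up layer.
  val reading: Double =
    Runtime.default.unsafeRun(Thermometer.current.provideLayer(appLayer))
}
```

Callers only ever see Thermometer.current and Has[Thermometer]; swapping ThermometerLive.layer for another layer with the same output type changes the implementation without touching them.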

We’ve already looked at the layers in the previous article, so the only interesting part that is left here is the creation of the stream in the TempClientLive implementation:

  case class TempClientLive(
      client: Client[Task],
      console: Console.Service,
      clock: Clock.Service,
      configuration: Configuration
  ) extends TempClient {

    import org.http4s.circe._
    import io.circe.generic.auto._
    import TempClientLive.OpenWeather._
    implicit val userDecoder = jsonOf[Task, OWResult]

    val tempConfig = configuration.config.temperatureConfig

    override val temperatureStream: ZStream[Any, Throwable, Temperature] =
      // emit one right away and then do the rest in an interval
      (ZStream.succeed(-1L) ++ ZStream.fromSchedule(Schedule.spaced(tempConfig.interval.toJava)))
        .mapM(_ => makeTemperatureCall(tempConfig.endpoint + tempConfig.apiKey))
        .tap { p => console.putStrLn(p.toString()) }
        // We still have to provide the clock for the schedule to eliminate all R
        // and just use everything provided to this service.
        .provide(Has(clock))

    /** Make the call using the injected HTTP4S client, and return a Temperature
      */
    private def makeTemperatureCall(url: String): ZIO[Any, Throwable, Temperature] = {
      for {
        // to decode the JSON response into an OWResult we use the EntityDecoder from HTTP4S (the implicit
        // jsonOf decoder above) together with zio.interop.catz._ for mapping the Cats stuff to ZIO
        res <- client.expect[OWResult](url)
      } yield (Temperature(res.dt, res.main.temp))
    }
  }

The stream is exposed as the value temperatureStream when we create this service. We first create a ZStream from a single value with ZStream.succeed, and concatenate it with ZStream.fromSchedule(Schedule.spaced(tempConfig.interval.toJava)). The second stream emits an element once every tempConfig.interval (once every minute in our setup), where the interval comes in through the configuration dependency. Since fromSchedule only emits after the first interval has passed, prepending the single value makes sure the first call happens right away. Once we start consuming this stream, at each tick we use mapM to call makeTemperatureCall, which uses the client dependency to make a REST call and returns a Temperature case class that the consumer can then process (in our case, the consumer stores it in MongoDB). We use mapM here, since makeTemperatureCall returns a ZIO:

def mapM[R1 <: R, E1 >: E, O2](f: O => ZIO[R1, E1, O2]): ZStream[R1, E1, O2]
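The same stream shape can be tried without an API key or network access. The sketch below assumes ZIO 1.0.8; fakeTemperatureCall is a hypothetical stand-in for makeTemperatureCall that just counts its calls, and the 50 ms interval plus take(3) only exist so the example finishes quickly.

```scala
import zio._
import zio.clock.Clock
import zio.duration._
import zio.stream.ZStream

object PollingStreamSketch {

  // Local stand-in for the article's Temperature case class.
  final case class Temperature(timestamp: Long, temperature: Double)

  // Hypothetical replacement for makeTemperatureCall: no HTTP call, it just
  // increments a counter and builds a Temperature from it.
  def fakeTemperatureCall(calls: Ref[Int]): Task[Temperature] =
    calls.updateAndGet(_ + 1).map(n => Temperature(n.toLong, 280.0 + n))

  // One element right away, then one per 50 ms; every tick triggers a call via mapM.
  def polling(calls: Ref[Int]): ZStream[Clock, Throwable, Temperature] =
    (ZStream.succeed(-1L) ++ ZStream.fromSchedule(Schedule.spaced(50.millis)))
      .mapM(_ => fakeTemperatureCall(calls)) // the tick value itself is ignored
      .take(3)                               // the real stream never ends; stop after three here

  // The default runtime provides the live Clock the schedule needs.
  val readings: List[Temperature] = Runtime.default.unsafeRun(
    for {
      calls <- Ref.make(0)
      chunk <- polling(calls).runCollect
    } yield chunk.toList
  )
}
```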

Finally, you can see that we use provide(Has(clock)) on the ZStream we created. Doing that eliminates the stream’s dependency on Has[Clock.Service], and results in a value that, when run by the consumer, doesn’t require anything specific in the environment. We could have left out the provide, but then the type of this value would change:

// with provide
val temperatureStream: ZStream[Any, Throwable, Temperature]
// without provide
val temperatureStream: ZStream[Has[Clock.Service], Throwable, Temperature]
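The effect of provide on the environment type can be seen in isolation in the following sketch (ZIO 1.0.8 assumed). The clock service is taken from the default runtime's environment, playing the role of the clock constructor parameter of TempClientLive; withClockProvided is a hypothetical helper name.

```scala
import zio._
import zio.clock.Clock
import zio.duration._
import zio.stream.ZStream

object ProvideClockSketch {

  // Without provide: the schedule needs a Clock, and that shows up in R.
  val ticks: ZStream[Clock, Nothing, Long] =
    ZStream.fromSchedule(Schedule.spaced(10.millis)).take(3)

  // With provide: the Clock.Service is supplied up front, so R becomes Any.
  def withClockProvided(clock: Clock.Service): ZStream[Any, Nothing, Long] =
    ticks.provide(Has(clock))

  val collected: List[Long] = Runtime.default.unsafeRun(
    for {
      clock <- ZIO.service[Clock.Service]          // taken from the runtime's ZEnv
      out   <- withClockProvided(clock).runCollect // needs nothing from the environment
    } yield out.toList
  )
}
```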

It’s debatable whether we should leave the dependency in the signature or remove it. I think it is cleaner to pass the dependency in when creating the Live layer, since it is an implementation detail and has nothing to do with the business logic. Dependencies that do belong to the business logic (e.g. a UserContext) I would add explicitly to the trait.

For completeness’ sake, I’ll also show the implementation of the Has[Client[Task]] layer (the HTTP4S client):

object http4sClient {

  object Http4sClientLive {
    val layer: ZLayer[Any, Throwable, Has[Client[Task]]] = {
      implicit val runtime: Runtime[ZEnv] = Runtime.default
      val res = BlazeClientBuilder[Task](runtime.platform.executor.asEC).resource.toManagedZIO
      ZLayer.fromManaged(res)
    }
  }
}

Here it isn’t really useful to create a trait and use constructor-based injection of dependencies, so we just create the implementation directly when we access the layer.

Storing data in the database

Before we connect all the different parts, let’s look at the MongoDB code. To connect to Mongo we’re going to need a MongoClient:

object mongo {

  object MongoDBConnectionLive {

    /** Slightly different approach for when we're using case classes, since we don't really expose
      * the functions on the service, but want to expose a mongoDB connection directly.
      */
    val managedMongoClient: ZManaged[Has[Configuration], Throwable, MongoClient] = for {
      config <- Configuration.config.toManaged_
      mongoClient <- ZManaged.make(acquireConnection(config.dbConfig.endpoint))(releaseConnection(_))
    } yield (mongoClient)

    /** Try to connect to the database
      */
    private def acquireConnection(databaseUri: String): Task[MongoClient] = ZIO.fromTry(Try {
      MongoClient(databaseUri)
    })

    /** Release the connection. If an error occurs during releasing, we just ignore it
      * for now. It would probably be better to check the error, whether it can be ignored
      * and log some stuff. But for now this should be enough
      *
      * @param mongoClient the client for which we want to release the connection
      * @return effect that will always succeed
      */
    private def releaseConnection(mongoClient: MongoClient): ZIO[Any, Nothing, Unit] =
      ZIO.fromTry(Try(mongoClient.close())).orElse(ZIO.unit)

    /** For this component, we're not going to create a case class, since the construction of this layer
      * can fail, and we've got a managed resource for which we want to create a connection.
      */
    val layer: ZLayer[Has[Configuration], Throwable, Has[MongoClient]] = ZLayer.fromManaged(managedMongoClient)
  }
}

The code above should look rather familiar by now. We create a ZLayer from a managed resource: we specify how to acquire a connection, and what to do when we release it. Beyond that, there’s nothing too exciting. Once we have this wrapped resource, we can use it in our repository implementation.
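Before moving on to the repository, the acquire/release behaviour can be observed without a running MongoDB. In this sketch (ZIO 1.0.8 assumed) FakeConnection is a hypothetical stand-in for MongoClient, and the events buffer exists only to record the order in which things happen.

```scala
import zio._
import scala.collection.mutable.ListBuffer

object ManagedLayerSketch {

  // Records the order of acquire / use / release (illustration only).
  val events: ListBuffer[String] = ListBuffer.empty[String]

  // Hypothetical stand-in for MongoClient.
  final class FakeConnection(val uri: String) {
    def close(): Unit = events += s"closed $uri"
  }

  // Acquire and release, mirroring acquireConnection / releaseConnection.
  val managedConnection: ZManaged[Any, Throwable, FakeConnection] =
    ZManaged.make(
      ZIO.effect { events += "acquired"; new FakeConnection("mongodb://localhost:27017") }
    )(connection => ZIO.effect(connection.close()).orElse(ZIO.unit))

  // The layer owns the resource: it is released when the layer is torn down.
  val layer: ZLayer[Any, Throwable, Has[FakeConnection]] =
    ZLayer.fromManaged(managedConnection)

  // A program that only needs the connection from its environment.
  val program: URIO[Has[FakeConnection], String] =
    ZIO.service[FakeConnection].flatMap { connection =>
      ZIO.effectTotal { events += "used"; connection.uri }
    }

  val result: String = Runtime.default.unsafeRun(program.provideLayer(layer))
}
```

The release step runs once the program that uses the layer has finished, which is the guarantee the MongoDB layer relies on to close its connection.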

object storage {

  /** The trait for storing.
    */
  trait TemperatureStorage {
    def insert(temperature: Temperature): ZIO[Any, Throwable, Unit]
    def getAll(): ZIO[Any, Throwable, List[Temperature]]
  }

  /** The implementation of temp storage, very naive for now, just to show the different parts connected to one another
    *
    * @param configuration the application configuration
    * @param mongoClient the client used to access MongoDB
    */
  case class TemperatureStorageLive(configuration: Configuration, mongoClient: MongoClient) extends TemperatureStorage {

    val temperatureCodecProvider = Macros.createCodecProvider[Temperature]()
    val codecRegistry = fromRegistries(fromProviders(temperatureCodecProvider), DEFAULT_CODEC_REGISTRY)

    /** Try to store the temperature
      * @param temperature element to store
      * @return ZIO containing the result
      */
    override def insert(temperature: Temperature): ZIO[Any, Throwable, Unit] = withCollection[Unit, Temperature] {
      collection =>
        collection
          // insert the entry
          .insertOne(temperature)
          // convert to a single result, since the result is a singleObservable
          .toStream()
          .runHead
          // we should do some more error handling here in a real world scenario
          .flatMap {
            case Some(_) => ZIO.unit
            case None    => ZIO.fail(new IllegalArgumentException("Expected result from mongodb"))
          }
    }

    /** Return all the elements we currently have
      *
      * @return list of all the temperatures we've got stored
      */
    override def getAll(): ZIO[Any, Throwable, List[Temperature]] = withCollection[List[Temperature], Temperature] {
      _.find()
        .toStream()
        .runCollect
        .map(_.toList)
    }

    /** Get the database and collection to which to store.
      *
      * @param f function to call within the context of this collection
      * @return the result of calling f with the collection
      */
    private def withCollection[A, T: ClassTag](
        f: MongoCollection[T] => ZIO[Any, Throwable, A]
    ): ZIO[Any, Throwable, A] = {
      val collectionZIO = ZIO.fromTry(Try {
        // TODO: we should get the database name at least from the configuration
        mongoClient.getDatabase("sampleservice").withCodecRegistry(codecRegistry).getCollection[T]("temperatures")
      })
      collectionZIO.flatMap(f)
    }
  }

  object TemperatureStorageLive {
    val layer: URLayer[Has[Configuration] with Has[MongoClient], Has[TemperatureStorage]] =
      (TemperatureStorageLive(_, _)).toLayer
  }
}

Disclaimer: there are already other MongoDB clients out there that use ZIO; I just created one from scratch to show how easy it is to interoperate with existing libraries and tools.

The code above follows the same module pattern: we define a trait and a case class implementing the trait, and pass in any required dependencies using the toLayer function. Most of the code above just makes sure we can work with MongoDB; the only interesting stuff happens in the insert and getAll functions.

    override def insert(temperature: Temperature): ZIO[Any, Throwable, Unit] = withCollection[Unit, Temperature] {
      collection =>
        collection
          // insert the entry
          .insertOne(temperature)
          // convert to a single result, since the result is a singleObservable
          .toStream()
          .runHead
          // we should do some more error handling here in a real world scenario
          .flatMap {
            case Some(_) => ZIO.unit
            case None    => ZIO.fail(new IllegalArgumentException("Expected result from mongodb"))
          }
    }

When you look at the insertOne signature:

 def insertOne(document: TResult): SingleObservable[InsertOneResult] = wrapped.insertOne(document)

You’ll see that it returns a SingleObservable. This type is a trait defined like this:

trait SingleObservable[T] extends Observable[T] { ... }
trait Observable[T] extends Publisher[T] { ... }

Publisher is part of the org.reactivestreams library and provides a generic interface for integrating different kinds of streams. This means that we can easily convert the result of the insertOne call to a ZStream by calling toStream() (an extension method provided by the zio-interop-reactivestreams module). The result is a ZStream that emits just a single element. To get this element we call runHead, which returns a ZIO that yields an Option[T], which we process in the normal way. And that’s it: without any complex integration we can reuse existing libraries.

The getAll function is pretty much the same:

    override def getAll(): ZIO[Any, Throwable, List[Temperature]] = withCollection[List[Temperature], Temperature] {
      _.find()
        .toStream()
        .runCollect
        .map(_.toList)
    }

Here we convert the resulting stream from the Mongo driver to a ZStream, collect all the elements in the stream using runCollect, and finally map the Chunk[T] we get back from runCollect to a List[T]. I was really impressed by how easy ZStream is to work with, by its interoperability with existing libraries, and by how well the ZIO and ZStream types work together.
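Because the conversion only relies on the org.reactivestreams.Publisher interface, it can be tried without MongoDB. The sketch below assumes zio-streams 1.0.8 and zio-interop-reactivestreams 1.3.5 from the build; it turns ordinary ZStreams into Publishers with toPublisher, standing in for the Observables the Mongo driver returns, and reads them back with toStream() the same way insert and getAll do.

```scala
import zio._
import zio.stream.ZStream
import zio.interop.reactivestreams._
import org.reactivestreams.Publisher

object PublisherInteropSketch {

  // Stand-ins for what the Mongo driver returns: one element (like insertOne),
  // several elements (like find()), and no elements at all.
  val singleResult: UIO[Publisher[String]] = ZStream.succeed("inserted").toPublisher
  val manyResults: UIO[Publisher[Int]]     = ZStream(1, 2, 3).toPublisher
  val noResult: UIO[Publisher[Int]]        = ZStream.fromIterable(List.empty[Int]).toPublisher

  val program: Task[(Option[String], List[Int], Option[Int])] =
    for {
      single <- singleResult
      head   <- single.toStream().runHead                // like insert: Some(element)
      many   <- manyResults
      all    <- many.toStream().runCollect.map(_.toList) // like getAll: Chunk -> List
      empty  <- noResult
      none   <- empty.toStream().runHead                 // an empty Publisher gives None
    } yield (head, all, none)

  val result: (Option[String], List[Int], Option[Int]) = Runtime.default.unsafeRun(program)
}
```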

The REST endpoint to get all the data

So we’ve covered pretty much everything I wanted to explain in this article. The only thing left is the REST route to access the stored data. For this we first define some routes:

object Routes {

  trait TemperatureRoute {
    val routes: HttpRoutes[Task]
  }

  object TemperatureRoute {
    // helper function which accesses the correct resource from our environment, and lifts
    // it into an effect.
    val temperatureRoutes: URIO[Has[TemperatureRoute], HttpRoutes[Task]] = ZIO.access[Has[TemperatureRoute]](_.get.routes)
  }

  object TemperatureRouteLive {

    import org.http4s.dsl.Http4sDsl
    import org.http4s.circe._
    import io.circe.generic.auto._, io.circe.syntax._
    import zio.interop.catz._

    // create the DSL after the imports it depends on
    private val dsl = Http4sDsl[Task]
    import dsl._

    /** A simple layer which returns the routes for the temperature.
      */
    val layer: ZLayer[Has[storage.TemperatureStorage], Nothing, Has[TemperatureRoute]] =
      ZLayer.fromService { storage =>
        new TemperatureRoute {

          val routes = HttpRoutes
            .of[Task] {
              case GET -> Root / "temperatures" => {
                storage.getAll().flatMap { all => Ok(all.asJson) }
              }
            }
        }
      }
  }
}

As you can see, I defined these routes as a separate layer, which requires access to the database to get the list of stored data. This way we can define the routes in isolation, and to add them we just mark them as a dependency of the HTTP4S server.

With all this out of the way, we can start the server and make an HTTP call to get the (very boring looking) data.

Combining everything

To start this, we first set up the layers and then define a simple program:

object Application extends zio.App {

  // combine the configuration layer and the standard environment into
  // a new layer.
  val defaultLayer = ConfigurationLive.layer ++ ZEnv.live

  // the storage layer
  val storageLayer = defaultLayer >+> db.mongo.MongoDBConnectionLive.layer >>> storage.TemperatureStorageLive.layer
  val httpsLayer =
    storageLayer >+> TemperatureRouteLive.layer ++ defaultLayer >+> http4sServer.Http4sServerLive.layer ++ http4sClient.Http4sClientLive.layer

  // combine the layers into a single layer to feed into the program
  val applicationLayer =
    httpsLayer >+> tempClient.TempClientLive.layer ++ storageLayer

  /** Provide the layer to the program and run it
    *
    * @param args
    * @return
    */
  override def run(args: List[String]): URIO[ZEnv, ExitCode] =
    pp.provideLayer(applicationLayer).exitCode

  /** Helper type to indicate which dependencies are needed to run the program
    */
  type ProgramDeps = Has[Server]
    with Has[storage.TemperatureStorage]
    with Has[tempClient.TempClient]
    with Has[Client[Task]]
    with zio.clock.Clock
    with zio.console.Console

  /** The program that starts polling the https endpoint and writes the results to
    * the database.
    */
  val pp: ZIO[ProgramDeps, Throwable, Unit] = {
    for {
      // start the stream, which polls the temperature endpoint
      stream <- tempClient.TempClient.temperatureStream
      mapped = stream.mapM(temperature =>
        ZIO.serviceWith[storage.TemperatureStorage](_.insert(temperature))
      )
      _ <- mapped.runDrain
    } yield ()
  }
}

All this program does is get the stream of temperature updates (tempClient.TempClient.temperatureStream) and, using mapM, store each element of the stream in the database. We run this stream by calling runDrain, which keeps waiting for new elements in the stream and processes them as they arrive.

When we run this, we see output like this:

[zio-default-async-1] INFO org.mongodb.driver.cluster - Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms'}
[cluster-ClusterId{value='60ad3ce890de0d65166e20ab', description='null'}-localhost:27017] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:2, serverValue:7}] to localhost:27017
[cluster-ClusterId{value='60ad3ce890de0d65166e20ab', description='null'}-localhost:27017] INFO org.mongodb.driver.cluster - Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=8, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=18662711}
[cluster-rtt-ClusterId{value='60ad3ce890de0d65166e20ab', description='null'}-localhost:27017] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:1, serverValue:8}] to localhost:27017
[zio-default-async-5] INFO org.http4s.blaze.channel.nio1.NIO1SocketServerGroup - Service bound to address /127.0.0.1:8081
[zio-default-async-5] INFO org.http4s.server.blaze.BlazeServerBuilder -
  _   _   _        _ _
 | |_| |_| |_ _ __| | | ___
 | ' \  _|  _| '_ \_  _(_-<
 |_||_\__|\__| .__/ |_|/__/
             |_|
[zio-default-async-5] INFO org.http4s.server.blaze.BlazeServerBuilder - http4s v1.0.0-M4 on blaze v0.14.13 started at http://127.0.0.1:8081/
Temperature(1621966031,285.05)
[InnocuousThread-2] INFO org.mongodb.driver.connection - Opened connection [connectionId{localValue:3, serverValue:9}] to localhost:27017

And when we call the web service, the result looks something like this:

$ curl -v localhost:8081/temperatures
*   Trying 127.0.0.1:8081...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8081 (#0)
> GET /temperatures HTTP/1.1
> Host: localhost:8081
> User-Agent: curl/7.68.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Content-Type: application/json
< Date: Tue, 25 May 2021 18:08:54 GMT
< Content-Length: 780
<
* Connection #0 to host localhost left intact
[{"timestamp":1621876805,"temperature":284.12},{"timestamp":1621876918,"temperature":284.1},{"timestamp":1621876805,"temperature":284.12},{"timestamp":1621876805,"temperature":284.12},{"timestamp":1621876918,"temperature":284.1},{"timestamp":1621876918,"temperature":284.1},{"timestamp":1621877523,"temperature":283.79},{"timestamp":1621877523,"temperature":283.79},{"timestamp":1621877644,"temperature":283.76},{"timestamp":1621877644,"temperature":283.76},{"timestamp":1621877644,"temperature":283.76},{"timestamp":1621877644,"temperature":283.76},{"timestamp":1621965947,"temperature":285.05},{"timestamp":1621965947,"temperature":285.05},{"timestamp":1621966031,"temperature":285.05},{"timestamp":1621966031,"temperature":285.05},{"timestamp":1621966031,"temperature":285.05}]
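The program above can also be exercised end to end without MongoDB or the weather API. This sketch assumes ZIO 1.0.8; InMemoryStorage is a hypothetical Ref-backed implementation of a simplified TemperatureStorage trait, and the polling stream is replaced by a finite ZStream, so runDrain completes instead of running forever.

```scala
import zio._
import zio.stream.ZStream

object ProgramSketch {

  // Local stand-in for the article's Temperature case class.
  final case class Temperature(timestamp: Long, temperature: Double)

  // Simplified version of the article's storage module.
  trait TemperatureStorage {
    def insert(temperature: Temperature): Task[Unit]
    def getAll(): Task[List[Temperature]]
  }

  // Hypothetical in-memory implementation, used here instead of MongoDB.
  final case class InMemoryStorage(ref: Ref[List[Temperature]]) extends TemperatureStorage {
    override def insert(temperature: Temperature): Task[Unit] = ref.update(_ :+ temperature)
    override def getAll(): Task[List[Temperature]]           = ref.get
  }

  object InMemoryStorage {
    val layer: ULayer[Has[TemperatureStorage]] =
      ZLayer.fromEffect(Ref.make(List.empty[Temperature]).map(ref => InMemoryStorage(ref): TemperatureStorage))
  }

  // Finite stand-in for tempClient.TempClient.temperatureStream.
  val temperatures: ZStream[Any, Throwable, Temperature] =
    ZStream(Temperature(1L, 284.12), Temperature(2L, 285.05))

  // Same shape as pp: store every element via the service, then drain the stream.
  val storeAll: ZIO[Has[TemperatureStorage], Throwable, Unit] =
    temperatures
      .mapM(temperature => ZIO.serviceWith[TemperatureStorage](_.insert(temperature)))
      .runDrain

  // provideLayer builds the layer once, so storing and reading share the same Ref.
  val stored: List[Temperature] = Runtime.default.unsafeRun(
    (storeAll *> ZIO.serviceWith[TemperatureStorage](_.getAll())).provideLayer(InMemoryStorage.layer)
  )
}
```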
