Webflux with Kotlin and Arrow

11 minute read

A month into the new year and I’m already lagging behind my goal of two articles per month. So for February I’m going to try to do three articles. To kick off the month, in this article we’re going to look at a setup for using Kotlin together with WebFlux and Arrow to create a simple REST service. Nothing too fancy yet, just a basic setup.

The code for this example can be found on GitHub: https://github.com/josdirksen/arrow-flux. I’ll skip the basic Gradle setup for now and just show how everything is connected.

Application structure

For the application structure we’ll use the simplest layered approach we can think of:

  • REST layer: Based on Spring WebFlux, which just offers some JSON endpoints.
  • Service layer: Plain Kotlin service layer, no Spring dependencies, which simply orchestrates the calls to the various repositories.
  • Repository layer: We’ll use MongoDB for storing the data, since that also provides a reactive API.

For more information on how to set up Gradle and import the correct libraries, see: kotlin-arrow-typeclasses

Repository layer

We’ll start at the bottom of the stack, and look at how we define our repositories.

/**
 * @param F Wrapper for the single results, e.g. a ForMonoK or a ForId
 * @param S Wrapper for the streaming results, e.g. a ForFluxK or a ForListK
 */
interface ItemRepository<F, S> {

    fun storeItem(item: Item): Kind<F, Item>
    fun getItem(itemId: UUID): Kind<F, Item>
    fun getAllItems(): Kind<F, List<Item>>
    fun getAllItemsStreaming(): Kind<S, Item>
}

The previous code fragment shows how we can define an interface to a repository. This repository allows us to store and retrieve Items, where an Item is just a simple data class: data class Item(val id: UUID = UUID.randomUUID(), val name: String, val description: String). The repository is parameterized with two types, F and S, which define the box in which the items are returned. For this project the idea is to use F for normal (single) results and S for streaming results. You don’t have to use async types, though: for testing, for instance, we could just use the Id monad. For this simple service we’ll implement it with Arrow’s wrappers around a Mono and a Flux. A Mono returns a single async result and is done after that single result; a Flux can return multiple results. I won’t go too deep into the details of these classes here, since that’s a bit out of scope for this article. More information can be found in the excellent Reactor documentation: Mono-Flux

Before we look a bit closer at how Arrow can help us in implementing and working with Mono and Flux, a quick note on using Kind<F, Item>. We need this because Kotlin doesn’t allow us to specify that the F and S types we define in the interface have to be type constructors. To work around that, Arrow introduces the Kind<F, A> construct, which lets us represent a type constructor applied to a type in a different way. In our example, Kind<F, Item> means the box F containing an Item, so with F set to ForMonoK it stands for a MonoK<Item>. More on this can be found here: https://arrow-kt.io/docs/patterns/glossary/#type-constructors
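
To make the Kind idea a bit more concrete, here is a minimal sketch of the encoding in plain Kotlin. None of this is Arrow’s actual code: Kind, ForBox, Box and fix() are hand-rolled stand-ins (Box plays roughly the role of Arrow’s Id), and SimpleRepository is a cut-down, hypothetical version of the ItemRepository above. It also shows how a synchronous in-memory implementation for tests could look.

```kotlin
import java.util.UUID

// Hand-rolled stand-in for arrow.Kind: "the type constructor F applied to A".
interface Kind<out F, out A>

// Marker type that names the Box type constructor (Arrow has ForId, ForMonoK, ...).
class ForBox private constructor()

// A trivial synchronous wrapper, comparable in spirit to Arrow's Id.
data class Box<out A>(val value: A) : Kind<ForBox, A>

// Downcast from the generic Kind back to the concrete Box.
// Safe as long as Box is the only implementation of Kind<ForBox, A>.
@Suppress("UNCHECKED_CAST")
fun <A> Kind<ForBox, A>.fix(): Box<A> = this as Box<A>

data class Item(val id: UUID = UUID.randomUUID(), val name: String, val description: String)

// Cut-down, hypothetical version of the repository, abstracted over the wrapper F.
interface SimpleRepository<F> {
    fun storeItem(item: Item): Kind<F, Item>
    fun getAllItems(): Kind<F, List<Item>>
}

// In-memory implementation that could back a unit test.
class InMemoryRepository : SimpleRepository<ForBox> {
    private val items = linkedMapOf<UUID, Item>()

    override fun storeItem(item: Item): Kind<ForBox, Item> {
        items[item.id] = item
        return Box(item)
    }

    override fun getAllItems(): Kind<ForBox, List<Item>> = Box(items.values.toList())
}

fun main() {
    val repo = InMemoryRepository()
    val stored = repo.storeItem(Item(name = "book", description = "a paperback")).fix().value
    println(repo.getAllItems().fix().value == listOf(stored)) // prints "true"
}
```

Code written against SimpleRepository<F> never sees Box directly; only the caller that knows F is ForBox uses fix() to get the concrete value back.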

One of the great features of Arrow is that it allows us to write Scala-like for-comprehensions over the monads that it provides. This lets us write clean code instead of heavily nested flatMap/map chains. For instance, let’s say that we’ve got a number of optional values (e.g. values in Kotlin that are nullable; since I don’t find working with nullables in Kotlin intuitive, I’d usually just wrap them in an Arrow Option).

As an example we’ll take the Item data class. Say that I’m getting the fields as nullables, and want to create an Item from it. The code you can use is something like this:

val optionUUID: Option<UUID> = ...
val optionName: Option<String> = ...
val optionDescription: Option<String> = ...

val optionItem = optionUUID.flatMap { uuid ->
    optionName.flatMap { name ->
        optionDescription.map { desc ->
            Item(uuid, name, desc)
        }
    }
}

This will result in an Option<Item>, but as more steps are added and the nesting becomes deeper, this gets really hard to read and reason about. Arrow also offers something called a binding. With it, we can rewrite the above piece of code like this:

val optionItem = binding {
    val uuid = optionUUID.bind()
    val name = optionName.bind()
    val description = optionDescription.bind()

    Item(uuid, name, description)
}

Which is much more readable and accomplishes exactly the same. More on using binding (and bindingCatch, which is useful when interacting with code that can throw exceptions) can be found in the Arrow documentation: https://arrow-kt.io/docs/patterns/monad_comprehensions/, but the previous code fragment nicely captures the general idea. Arrow provides a binding for all of its data types, and also provides extensions for a couple of external libraries. One of them is Reactor, which is where the Flux and Mono classes come from. With Arrow we get a MonoK and a FluxK that wrap the Mono and Flux classes and allow us to use the same binding approach.
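
If you’re wondering how a bind() call can stop a block halfway, here is a toy sketch of the idea in plain Kotlin. This is not how Arrow implements it (as far as I know Arrow builds its comprehensions on coroutines); Opt, OptBinding, optBinding and ShortCircuit are all made-up names for this illustration, and the short-circuit is done with a plain exception.

```kotlin
import java.util.UUID

// Hand-rolled Option, standing in for Arrow's Option in this sketch.
sealed class Opt<out A> {
    data class Some<out A>(val value: A) : Opt<A>()
    object None : Opt<Nothing>()
}

// Thrown internally by bind() to abandon the block when a value is missing.
object ShortCircuit : RuntimeException()

// Receiver of the binding block; it brings bind() into scope.
class OptBinding {
    // Unwraps a Some, or aborts the surrounding optBinding block on None.
    fun <A> Opt<A>.bind(): A = when (this) {
        is Opt.Some -> this.value
        is Opt.None -> throw ShortCircuit
    }
}

// Runs the block; the result is Some(...) only if every bind() succeeded.
fun <A> optBinding(block: OptBinding.() -> A): Opt<A> =
    try {
        Opt.Some(OptBinding().block())
    } catch (e: ShortCircuit) {
        Opt.None
    }

data class Item(val id: UUID, val name: String, val description: String)

fun buildItem(id: Opt<UUID>, name: Opt<String>, description: Opt<String>): Opt<Item> =
    optBinding {
        Item(id.bind(), name.bind(), description.bind())
    }

fun main() {
    val id = UUID.randomUUID()
    println(buildItem(Opt.Some(id), Opt.Some("book"), Opt.Some("a paperback")))
    println(buildItem(Opt.Some(id), Opt.None, Opt.Some("a paperback")) == Opt.None) // prints "true"
}
```

The point is only that the block reads top to bottom, while a missing value anywhere turns the whole result into None, just like the nested flatMap version.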

For me, at least, this makes it possible to write cleaner code, avoid unexpected exceptions, and avoid nesting flatMap calls. An example from the Arrow documentation shows this nicely. Without the Arrow wrappers we’ve got this:

getSongUrlAsync()
  .map { songUrl -> MediaPlayer.load(songUrl) }
  .flatMap {
    val totalTime = musicPlayer.getTotaltime()
    Flux.interval(Duration.ofMillis(100))
      .flatMap {
        Flux.create { musicPlayer.getCurrentTime() }
          .map { tick -> (tick / totalTime * 100).toInt() }
      }
      .takeUntil { percent -> percent >= 100 }
  }

And with the Arrow wrappers we get this:

bindingCatch {
  val (songUrl) = getSongUrlAsync()
  val musicPlayer = MediaPlayer.load(songUrl)
  val totalTime = musicPlayer.getTotaltime()
    
  val end = DirectProcessor.create<Unit>()
  Flux.interval(Duration.ofMillis(100)).takeUntilOther(end).bind()
    
  val (tick) = musicPlayer.getCurrentTime()
  val percent = (tick / totalTime * 100).toInt()
  if (percent >= 100) {
    end.onNext(Unit)
  }
  percent
}

Much more readable, and it even catches unexpected exceptions. After this small sidestep, back to the repository. We’re going to implement it by storing the data in MongoDB. The implementation looks like this:

class ReactiveItemRepository(val mongoTemplate: ReactiveMongoTemplate) : ItemRepository<ForMonoK, ForFluxK>, ReactiveMongoOperations by mongoTemplate {

    val ITEM_COLLECTION_NAME = "items"

    override fun getItem(itemId: UUID): Kind<ForMonoK, Item> = 
        StoreUtil.asMono {
            findById(itemId, Item::class.java, ITEM_COLLECTION_NAME)
                .errorIfEmpty(ITEM_COLLECTION_NAME, itemId.toString())
        }

    override fun getAllItems(): Kind<ForMonoK, List<Item>> =
        StoreUtil.asMono {
            findAll(Item::class.java, ITEM_COLLECTION_NAME)
                .collectList()
        }

    override fun storeItem(order: Item): Kind<ForMonoK, Item> =
        StoreUtil.asMono {
            insert(order, ITEM_COLLECTION_NAME)
                .mapToError(ITEM_COLLECTION_NAME, order.id.toString())
        }

    override fun getAllItemsStreaming(): Kind<ForFluxK, Item> =
        StoreUtil.asFlux {
            findAll(Item::class.java, ITEM_COLLECTION_NAME)
        }
}

We use the Arrow-provided ForMonoK and ForFluxK types as the F and S parameters. For each wrapper Arrow also defines a typealias such as typealias MonoKOf<A> = arrow.Kind<ForMonoK, A>, and since MonoK<T> is itself a MonoKOf<T>, our functions can simply return a MonoK<T>. The functions on the mongoTemplate return either a Mono or a Flux; to convert these to the relevant MonoK or FluxK we’ve got a simple helper in the StoreUtil class:

    fun <T> asMono(thunk: () -> Mono<T>): MonoK<T> {
        return monokBindingCatch {
            thunk().k().bind()
        }
    }

    fun <T> asFlux(thunk: () -> Flux<T>): FluxK<T> {
        return fluxkBindingCatch {
            thunk().k().bind()
        }
    }

We also use some other helper functions (errorIfEmpty and mapToError) to map the Mongo-specific errors to our own domain-specific errors. Since a Mono and a Flux can carry these errors, there is no need to wrap the result of these functions in an Either or a Try.

Service layer

Since we don’t want to call the repository layer directly, let’s wrap it with a (in this example kind of useless) service layer. First the interface and the implementation, which are very similar to the repository we just saw:

interface ItemService<F> {

    fun getItem(itemId: UUID): Kind<F, Item>
    fun createItem(order: Item): Kind<F, Item>
    fun getAllItems(): Kind<F, List<Item>>
}

class ReactiveItemService(val itemRepository: ItemRepository<ForMonoK, ForFluxK>) :
    ItemService<ForMonoK> {

    override fun getAllItems(): Kind<ForMonoK, List<Item>> = itemRepository.getAllItems()
    override fun createItem(item: Item): Kind<ForMonoK, Item> = itemRepository.storeItem(item)
    override fun getItem(itemId: UUID): Kind<ForMonoK, Item> = itemRepository.getItem(itemId)
}

Since we don’t really have any business logic, don’t connect to other components, brokers etc., and don’t need to combine results from multiple repositories, this is a really basic implementation. But let’s pretend we also want to create the following function:

fun updateItemIExistsAndReturnAllItems(toUpdate: UUID, description: String): Kind<F, List<Item>>

An implementation of this function would look something like this:

    override fun updateItemIExistsAndReturnAllItems(toUpdate: UUID, description: String): Kind<ForMonoK, List<Item>> = binding {
        val existingItem = itemRepository.getItem(toUpdate).bind()
        val updatedItem = existingItem.copy(description = description)
        val storedItem = itemRepository.storeItem(updatedItem).bind()

        itemRepository.getAllItems().bind()
    }

As you can see, this is just a sequence of steps, even though the steps themselves all use Reactor elements. If one of the intermediate steps fails, the Mono as a whole will fail. With this approach we can write clean, easily understandable code without having to deal with coroutines or Reactor internals ourselves.
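
The fail-fast behaviour can be imitated without Reactor. The sketch below is an analogy only: it uses Kotlin’s standard Result type (as a return type this needs Kotlin 1.5 or later) instead of MonoK, getOrThrow() in the role of bind(), and a hypothetical synchronous FakeRepository. It shows that when the first step fails, the later steps never run.

```kotlin
import java.util.UUID

data class Item(val id: UUID = UUID.randomUUID(), val name: String, val description: String)

// Hypothetical domain error for this sketch.
class ItemNotFound(id: UUID) : RuntimeException("No item with id $id")

// Hypothetical synchronous repository; each call reports failure through Result.
class FakeRepository {
    private val items = linkedMapOf<UUID, Item>()

    // Counts store calls so we can see whether a step was skipped.
    var storeCalls = 0
        private set

    fun getItem(id: UUID): Result<Item> {
        val found = items[id]
        return if (found != null) Result.success(found) else Result.failure(ItemNotFound(id))
    }

    fun storeItem(item: Item): Result<Item> {
        storeCalls++
        items[item.id] = item
        return Result.success(item)
    }

    fun getAllItems(): Result<List<Item>> = Result.success(items.values.toList())
}

// Same three steps as the service function above; the first failure ends the block.
fun updateAndReturnAll(repo: FakeRepository, toUpdate: UUID, description: String): Result<List<Item>> =
    runCatching {
        val existing = repo.getItem(toUpdate).getOrThrow()
        repo.storeItem(existing.copy(description = description)).getOrThrow()
        repo.getAllItems().getOrThrow()
    }

fun main() {
    val repo = FakeRepository()
    val item = repo.storeItem(Item(name = "book", description = "old")).getOrThrow()

    val ok = updateAndReturnAll(repo, item.id, "new")
    println(ok.getOrThrow().single().description) // prints "new"

    val failed = updateAndReturnAll(repo, UUID.randomUUID(), "ignored")
    println(failed.exceptionOrNull() is ItemNotFound) // prints "true"
}
```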

Rest layer with WebFlux

The final step is exposing the service through a REST API. For this demo I’ve just used Spring WebFlux, since that nicely ties into the whole Reactor ecosystem. Let’s start with the code:

object ItemsRoute {

    const val ITEMS_PATH = "items"
    const val PATH_ID = "id"

    class ItemRoutes(itemsRouteHandler: ItemsRouteHandler) {

        val route: RouterFunction<ServerResponse> = router {

            (path("$BASE_ROUTE/$ITEMS_PATH") and accept(APPLICATION_JSON)).nest {
                POST("", itemsRouteHandler::createOrder)
                GET("", itemsRouteHandler::getAllOrders)
                GET("/{$PATH_ID}", itemsRouteHandler::getOrder)
            }
        }
    }

    class ItemsRouteHandler(val itemService: ItemService<ForMonoK>) {

        fun getAllOrders(request: ServerRequest): Mono<ServerResponse> = okJsonList {
            itemService.getAllItems().fix()
        }

        fun getOrder(request: ServerRequest): Mono<ServerResponse> = okJson {
            binding {
                val uuid = pathParamToUUID(request, PATH_ID).bind()
                itemService.getItem(uuid).bind()
            }
        }

        fun createOrder(request: ServerRequest): Mono<ServerResponse> = okJson {
            binding {
                val item = request.bodyToMono<Item>().k().bind()
                itemService.createItem(item).bind()
            }
        }
    }
}

Based on what we’ve seen so far, this shouldn’t be too difficult to understand, since everywhere throughout this example we use the same approach:

  1. Make sure we get MonoK instances.
  2. Sequence them through a binding.
  3. Return the result of the last bind, which results in a new MonoK.

There are a couple of specifics here, for instance the helper okJson function:

    inline fun <reified T> okJson(p: () -> MonoK<T>) = ServerResponse.ok()
        .contentType(MediaType.APPLICATION_JSON)
        .body(BodyInserters
            .fromPublisher(p().mono, T::class.java))

This just returns a 200 and converts the result from the MonoK using the default marshallers. Another custom piece is the val uuid = pathParamToUUID(request, PATH_ID).bind() part, where we convert an incoming path parameter to a MonoK:

    fun pathParamToUUID(request: ServerRequest, param: String): MonoK<UUID> = Mono
            .just(request.pathVariable(param))
            .map { UUID.fromString(it) }
            .onErrorMap { t -> PropertyValidationException(
                invalidProperty = param,
                cause = t,
                message = "Parameter {$param} can't be converted to a UUID")
            }.k()

Here we get the variable from the path and wrap it in a normal Mono. If the conversion to a UUID fails, we map the error to our own exception. Finally, we use the k() function to convert the Mono to a MonoK. The result of this validation step is then used in a sequence of MonoKs:

binding {
    val uuid = pathParamToUUID(request, PATH_ID).bind()
    itemService.getItem(uuid).bind()
}
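
The PropertyValidationException used by pathParamToUUID isn’t shown in this article. As a rough sketch, assuming it is a plain exception class, it could look like the one below. Only the parameter names (invalidProperty, cause, message) come from the call site above; the rest is my guess. parseUuidParam is a hypothetical, synchronous counterpart of pathParamToUUID, included so the mapping can be tried without Reactor.

```kotlin
import java.util.UUID

// Assumed shape of the exception; only the parameter names come from the call site.
class PropertyValidationException(
    val invalidProperty: String,
    message: String,
    cause: Throwable? = null
) : RuntimeException(message, cause)

// Synchronous counterpart of pathParamToUUID: parse, and translate a parse failure
// into the domain-specific exception.
fun parseUuidParam(param: String, rawValue: String): UUID =
    try {
        UUID.fromString(rawValue)
    } catch (e: IllegalArgumentException) {
        throw PropertyValidationException(
            invalidProperty = param,
            cause = e,
            message = "Parameter {$param} can't be converted to a UUID"
        )
    }

fun main() {
    val valid = UUID.randomUUID()
    println(parseUuidParam("id", valid.toString()) == valid) // prints "true"

    try {
        parseUuidParam("id", "abc")
    } catch (e: PropertyValidationException) {
        println(e.invalidProperty) // prints "id"
    }
}
```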

Another important part is where we define the route:

val route: RouterFunction<ServerResponse> = router {

    (path("$BASE_ROUTE/$ITEMS_PATH") and accept(APPLICATION_JSON)).nest {
        POST("", itemsRouteHandler::createOrder)
        GET("", itemsRouteHandler::getAllOrders)
        GET("/{$PATH_ID}", itemsRouteHandler::getOrder)
    }
}

This maps each URL path to a handler function, which in turn calls into the service.

Setting up the application

So far we’ve pretty much avoided Spring and Spring Boot. We just define services and use constructors to inject the correct dependencies. WebFlux, however, does require a Spring bean context to tie everything together. Since I don’t really like having all kinds of annotations and Spring-specific stuff in the core of the application, we only use Spring in this setup to connect the beans together.

class Application {

    val log = LoggerFactory.getLogger(Application::class.java)

    private val httpHandler: HttpHandler
    private val server: HttpServer

    /**
     * Setup the http server. Delegates to the bean named "webHandler" defined by
     * the beans function.
     */
    constructor() {

        // setup the context containing all our spring stuff
        val context = GenericApplicationContext().apply {
            allBeans().initialize(this)
            refresh()
        }
        server = HttpServer.create().port(Config.SERVER_PORT)

        // create the http handler, and assign filters
        httpHandler = WebHttpHandlerBuilder
                .applicationContext(context)
                .build()
    }

    fun startAndAwait() {
        server.handle(ReactorHttpHandlerAdapter(httpHandler))
            .bindUntilJavaShutdown(Duration.ofSeconds(Config.SERVER_SHUTDOWN_TIMEOUT)) { t ->
            log.info("Server Started")
        }
    }
}

/**
 * Starts the service, and waits until shutdown signal is received.
 */
fun main(args: Array<String>) {
    Application().startAndAwait()
}

The setup you see above starts a simple server. The important part is httpHandler = WebHttpHandlerBuilder.applicationContext(context).build(), which uses the supplied application context to create the reactive HTTP handler. To create this application context you can use any approach that Spring provides. For Kotlin, Spring provides a more functional DSL to set up this context declaratively. We do this in the allBeans() function, which is called when this class is instantiated. This function looks like this:

fun allBeans() = beans {
    routeBeans(this)
    filterBeans(this)
    serviceBeans(this)
    repoBeans(this)
    errorHandlerBeans(this)
}

fun filterBeans(ctx: BeanDefinitionDsl) = with (ctx) {
    bean { CorsWebFilter { CorsConfiguration().applyPermitDefaultValues() } }
}

fun routeBeans(ctx: BeanDefinitionDsl) = with (ctx) {
    bean(WebHttpHandlerBuilder.WEB_HANDLER_BEAN_NAME) {
        RouterFunctions.toWebHandler(ref<ItemRoutes>().route, HandlerStrategies.withDefaults())
    }

    bean<ItemRoutes>()
    bean<ItemsRouteHandler>()
}

fun serviceBeans(ctx: BeanDefinitionDsl) = with (ctx) {
    bean<ReactiveItemService>()
}

fun repoBeans(ctx: BeanDefinitionDsl) = with (ctx) {
    bean { ReactiveMongoTemplate(ref(), Config.DATABASE_NAME) }
    bean<MongoClient> { MongoClients
        .create(
            MongoClientSettings.builder()
                .applyToClusterSettings {it.hosts(listOf(ServerAddress(Config.DATABASE_HOST, Config.DATABASE_PORT)))}
                .build())
    }
    bean<ReactiveItemRepository>()
}

fun errorHandlerBeans(ctx: BeanDefinitionDsl) = with (ctx) {
    bean<ErrorWebFluxAutoConfiguration>()
    bean<ErrorWebExceptionHandler> {
        ref<ErrorWebFluxAutoConfiguration>().errorWebExceptionHandler(ref<DefaultErrorAttributes>())
    }

    bean<DefaultErrorAttributes> {
        ErrorMapping.registerExceptions()
        object: DefaultErrorAttributes() {
             override fun getError(request: ServerRequest?): Throwable {
                 val originalError = super.getError(request)
                 val originalErrorClass = originalError.javaClass

                 return ErrorMapping.mappings.get(originalErrorClass)?.invoke(originalError) ?: originalError
            }
        }
    }
    bean<ResourceProperties>()
    bean<ErrorProperties>()
    bean<ServerProperties>()
    bean<DefaultServerCodecConfigurer>()
}

Here we use the bean<> and ref() functions to create all the beans that are needed to set up a working WebFlux application. Note that there is also a Spring Boot way of doing this, but I decided that I wanted to have control over how everything was set up, so I created the needed beans using this approach.
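
The ErrorMapping object referenced in errorHandlerBeans isn’t shown here. Going by how it is used (registerExceptions() and a mappings lookup by exception class), a minimal sketch could look like this. The registered mapping, ItemNotFoundException and the translate helper are assumptions for illustration, not the code from the repository.

```kotlin
// Hypothetical domain error used for the demonstration.
class ItemNotFoundException(message: String, cause: Throwable) : RuntimeException(message, cause)

object ErrorMapping {
    // Exact exception class -> function that translates it into the error we want to expose.
    val mappings: MutableMap<Class<out Throwable>, (Throwable) -> Throwable> = mutableMapOf()

    // Registers the translations; the single entry here is just an example.
    fun registerExceptions() {
        mappings[NoSuchElementException::class.java] = { e -> ItemNotFoundException("Item not found", e) }
    }

    // Same lookup as in the getError override: fall back to the original error when
    // nothing is registered for its class.
    fun translate(error: Throwable): Throwable =
        mappings[error.javaClass]?.invoke(error) ?: error
}

fun main() {
    ErrorMapping.registerExceptions()
    println(ErrorMapping.translate(NoSuchElementException("missing")) is ItemNotFoundException) // prints "true"

    val other = IllegalStateException("boom")
    println(ErrorMapping.translate(other) === other) // prints "true"
}
```

Note that a lookup by exact class like this does not match subclasses of a registered exception; whether that matters depends on which errors you want to translate.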

Conclusion

I haven’t added the tests to the GitHub project yet, so I’ll get back to that in a later article. The easiest way to learn a bit more about how these tools work together might be to just check out the sample: https://github.com/josdirksen/arrow-flux, and play around with it a bit.

Should you have any questions, just let me know.
