Reactive Database Access – Part 2 – Actors

We’re very happy to continue our guest post series on the jOOQ blog by Manuel Bernhardt. In this blog series, Manuel will explain the motivation behind so-called reactive technologies and, after introducing the concepts of Futures and Actors, use them in order to access a relational database in combination with jOOQ.

Manuel Bernhardt is an independent software consultant with a passion for building web-based systems, both back-end and front-end. He is the author of “Reactive Web Applications” (Manning) and he started working with Scala, Akka and the Play Framework in 2010 after spending a long time with Java. He lives in Vienna, where he is co-organiser of the local Scala User Group. He is enthusiastic about Scala-based technologies and the vibrant community, and is looking for ways to spread their usage in the industry. He has also been scuba-diving since the age of 6, and can’t quite get used to the lack of sea in Austria.

This series is split into three parts, which we’ll publish over the next month:

  • Part 1 – Why “Async”
  • Part 2 – Actors
  • Part 3 – Using Futures and Actors for reactive database access with jOOQ

Introduction

In our last post we introduced the concept of reactive applications, explained the merits of asynchronous programming and introduced Futures, a tool for expressing and manipulating asynchronous values.

In this post we will look into another tool for building asynchronous programs based on the concept of message-driven communication: actors.

The Actor-based concurrency model was popularized by the Erlang programming language and its most popular implementation on the JVM is the Akka concurrency toolkit.

In one way, the Actor model is object-orientation done “right”: the state of an actor can be mutable, but it is never exposed directly to the outside world. Instead, actors communicate with each other on the basis of asynchronous message-passing in which the messages themselves are immutable. An actor can only do one of three things:

  • send and receive any number of messages
  • change its behaviour or state in response to a message arriving
  • start new child actors

It is always in the hands of an actor to decide what state it is ready to share, and when to mutate it. This model therefore makes it much easier for us humans to write concurrent programs that are not riddled with race conditions or deadlocks introduced by accidentally reading or writing outdated state, or by using locks as a means to avoid doing so.

In what follows we are going to see how Actors work and how to combine them with Futures.

Actor fundamentals

Actors are lightweight objects that communicate with each other by sending and receiving messages. Each actor has a mailbox in which incoming messages are queued before they get processed.

Two actors talking to each other

Actors have different states: they can be started, resumed, stopped and restarted. Resuming or restarting an actor is useful when an actor crashes as we will see later on.

Actors also have an actor reference, which is a means for one actor to reach another. Like a phone number, the actor reference is a pointer to an actor. If the actor were to crash and be replaced by a new incarnation upon restart, it would make no difference to other actors attempting to send messages to it, since the only thing they know about the actor is its reference, not the identity of one particular incarnation.

Sending and receiving messages

Let’s start by creating a simple actor:

import akka.actor._

class Luke extends Actor {
  def receive = {
    case _ => // do nothing
  }
}

This is really all it takes to create an actor. But that’s not very interesting. Let’s spice things up a little and define a reaction to a given message:

import akka.actor._

case object RevelationOfFathership

class Luke extends Actor {
  def receive = {
    case RevelationOfFathership =>
      System.err.println("Noooooooooo")
  }
}   

Here we go! RevelationOfFathership is a case object, i.e. an immutable message. This detail is rather important: your messages should always be self-contained and should not reference the internal state of any actor, since doing so would effectively leak that state to the outside and break the guarantee that only an actor can change its own internal state. This guarantee is paramount for actors to offer a better, more human-friendly concurrency model without surprises.

Now that Luke knows how to appropriately respond to the inconvenient truth that Darth Vader is his father, all we need is the dark lord himself.

import akka.actor._

class Vader extends Actor {

  override def preStart(): Unit =
    context.actorSelection("akka://application/user/luke") ! RevelationOfFathership

  def receive = {
    case _ => // ...
  }
}

The Vader actor uses the preStart lifecycle method to send the message to his son as soon as he is started. We’re using the actor’s context to look Luke up by his actor path and send him a message.

The entire sequence for running this example would look as follows:

import akka.actor._

val system = ActorSystem("application")
val luke = system.actorOf(Props[Luke], name = "luke")
val vader = system.actorOf(Props[Vader], name = "vader")

Props are a means of describing how to obtain an instance of an actor. Since they are immutable they can be freely shared, for example across different JVMs running on different machines (which is useful, for example, when operating an Akka cluster).
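
If an actor takes constructor arguments, Props can describe those as well. Continuing the snippet above, here is a small sketch: the Droid actor and its designation are purely hypothetical and only serve to illustrate how Props capture a class together with its constructor arguments.

import akka.actor._

// hypothetical actor with a constructor argument, for illustration only
class Droid(designation: String) extends Actor {
  def receive = {
    case _ => println(s"$designation beeps")
  }
}

// Props capture the class and its constructor arguments so that the
// actor system can create (and re-create) instances whenever needed
val droidProps: Props = Props(classOf[Droid], "R2-D2")
val r2d2 = system.actorOf(droidProps, name = "r2d2")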

Actor supervision

Actors do not merely exist in the wild, but instead are part of an actor hierarchy and each actor has a parent. Actors that we create are supervised by the User Guardian of the application’s ActorSystem which is a special actor provided by Akka and responsible for supervising all actors in user space. The role of a supervising actor is to decide how to deal with the failure of a child actor and to act accordingly.

The User Guardian itself is supervised by the Root Guardian (which also supervises another special actor internal to Akka), and is itself supervised by a special actor reference. Legend says that this reference was there before all other actor references came into existence and is called “the one who walks the bubbles of space-time” (if you don’t believe me, check the official Akka documentation).

Organizing actors in hierarchies offers the advantage of encoding error handling right into the hierarchy. Each parent is responsible for the actions of their children. Should something go wrong and a child crash, the parent would have the opportunity to restart it.

Vader, for example, has a few storm troopers:

import akka.actor._
import akka.routing._

class Vader extends Actor {

  // StormTrooper is a plain Actor, defined elsewhere
  val troopers: ActorRef = context.actorOf(
    RoundRobinPool(8).props(Props[StormTrooper])
  )

  def receive = {
    case _ => // ...
  }
}

The RoundRobinPool is a means of expressing the fact that messages sent to troopers will be dispatched to each trooper child one after the other. Routers encode strategies for sending messages to several actors at once; Akka provides many predefined routers.
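
To see how such a pool is used, here is a small sketch. The AttackRebels message and the trooper behaviour are assumptions made for this illustration and are not part of the original example; the point is that Vader only ever talks to the troopers reference, and the router dispatches each message to the next child in round-robin order.

import akka.actor._
import akka.routing._

// hypothetical message, introduced only for this sketch
case object AttackRebels

class StormTrooper extends Actor {
  def receive = {
    case AttackRebels =>
      // each trooper handles the messages routed to it
      println(s"${self.path.name} attacks (and probably misses)")
  }
}

class Vader extends Actor {

  val troopers: ActorRef = context.actorOf(
    RoundRobinPool(8).props(Props[StormTrooper])
  )

  def receive = {
    case AttackRebels =>
      // the router forwards the message to the next trooper in the pool
      troopers ! AttackRebels
  }
}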

Crashed Stormtroopers

Ultimately, actors can crash, and it is then the job of the supervisor to decide what to do. The decision-making mechanism is represented by a so-called supervision strategy. For example, Vader could decide to retry restarting a storm trooper 3 times before giving up and stopping it:

import akka.actor._
import akka.routing._

class Vader extends Actor with ActorLogging {

  val troopers: ActorRef = context.actorOf(
    RoundRobinPool(8).props(Props[StormTrooper])
  )

  override def supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3) {
      case t: Throwable =>
        log.error(t, "StormTrooper down!")
        SupervisorStrategy.Restart
    }

  def receive = {
    case _ => // ...
  }
}

This supervision strategy is rather crude since it deals with all types of Throwable in the same fashion. We will see in our next post that supervision strategies are an effective means of reacting to different types of failures in different ways.
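
To give a flavour of what that looks like, here is a hedged sketch: the exception types and the reactions chosen below are illustrative assumptions, not the strategy from the upcoming post. The idea is simply that different kinds of Throwable can be mapped to different supervision directives.

import akka.actor._
import akka.routing._
import java.sql.SQLException

class Vader extends Actor with ActorLogging {

  val troopers: ActorRef = context.actorOf(
    RoundRobinPool(8).props(Props[StormTrooper])
  )

  override def supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3) {
      // a transient problem: restart the trooper and let it try again
      case e: SQLException =>
        log.warning("Transient trooper failure: {}", e.getMessage)
        SupervisorStrategy.Restart
      // a programming error: retrying will not help, stop the trooper
      case e: IllegalStateException =>
        log.error(e, "StormTrooper in an unrecoverable state")
        SupervisorStrategy.Stop
      // anything else: let our own supervisor decide
      case _: Throwable =>
        SupervisorStrategy.Escalate
    }

  def receive = {
    case _ => // ...
  }
}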

Combining Futures and Actors

There is one golden rule of working with actors: you should not perform any blocking operation inside an actor, such as a blocking network call. The reason is simple: if the actor blocks, it cannot process incoming messages, which may lead to a full mailbox (or rather, since the default mailbox used by actors is unbounded, to an OutOfMemoryError).

This is why it may be useful to be able to use Futures within actors. The pipe pattern is designed to do just that: it sends the result of a Future to an actor:

import akka.actor._
import akka.pattern.pipe
import scala.concurrent.Future

class Luke extends Actor {
  import context.dispatcher // execution context required by pipeTo

  def receive = {
    case RevelationOfFathership =>
      sendTweet("Nooooooo") pipeTo self
    case tsr: TweetSendingResult =>
      // ...
  }

  def sendTweet(msg: String): Future[TweetSendingResult] = ...
}

In this example we call the sendTweet method, which returns a Future, upon reception of RevelationOfFathership, and use the pipeTo method to indicate that we would like the result of that Future to be sent to ourselves.

There is just one problem with the code above: if the Future were to fail, we would receive the failed throwable in a rather inconvenient format, wrapped in a message of type akka.actor.Status.Failure, without any useful context. This is why it may be more appropriate to recover failures before piping the result:

import akka.actor._
import akka.pattern.pipe
import scala.concurrent.Future
import scala.util.control.NonFatal

class Luke extends Actor {
  import context.dispatcher // execution context required by recover and pipeTo

  def receive = {
    case RevelationOfFathership =>
      val message = "Nooooooo"
      sendTweet(message) recover {
        case NonFatal(t) => TweetSendingFailure(message, t)
      } pipeTo self
    case tsr: TweetSendingResult =>
      // ...
    case tsf: TweetSendingFailure =>
      // ...
  }

  def sendTweet(msg: String): Future[TweetSendingResult] = ...
}

With this failure handling in place we now know which message failed to be sent to Twitter and can take an appropriate action (e.g. re-try sending it).

That’s it for this short introduction to Actors. In the next and last post of this series we will see how to use Futures and Actors in combination for reactive database access.

Read on

Stay tuned as we’ll publish Part 3 shortly as a part of this series.

Reactive Database Access – Part 1 – Why “Async”


Reactive?

The concept of Reactive Applications is getting increasingly popular these days and chances are that you have already heard of it someplace on the Internet. If not, you could read the Reactive Manifesto, or we could perhaps agree on the following simple summary: in a nutshell, Reactive Applications are applications that:

  • make optimal use of computational resources (in terms of CPU and memory usage) by leveraging asynchronous programming techniques
  • know how to deal with failure, degrading gracefully instead of, well, just crashing and becoming unavailable to their users
  • can adapt to intensive workloads, scaling out on several machines / nodes as load increases (and scaling back in)

Reactive Applications do not exist merely in a wild, pristine green field. At some point they will need to store and access data in order to do something meaningful and chances are that the data happens to live in a relational database.

Latency and database access

When an application talks to a database, more often than not the database server is not going to be running on the same machine as the application. If you’re unlucky, the server (or set of servers) hosting the application might even live in a different data centre than the database server. Here is what this means in terms of latency:

Latency numbers every programmer should know

Say that you have an application that runs a simple SELECT query on its front page (let’s not debate whether or not this is a good idea here). If your application and database servers live in the same data centre, you are looking at a latency of the order of 500 µs (depending on how much data comes back). Now compare this to all that your CPU could do during that time (all those green and black squares on the figure above) and keep this in mind – we’ll come back to it in just a minute.

The cost of threads

Let’s suppose that you run your welcome page query in a synchronous manner (which is what JDBC does) and wait for the result to come back from the database. During all of this time you will be monopolizing a thread that waits for the result to come back. A Java thread that just exists (without doing anything at all) can take up to 1 MB of stack memory, so if you use a threaded server that will allocate one thread per user (I’m looking at you, Tomcat) then it is in your best interest to have quite a bit of memory available for your application in order for it to still work when featured on Hacker News (1 MB / concurrent user).

Reactive applications such as the ones built with the Play Framework make use of a server that follows the evented server model: instead of following the “one user, one thread” mantra, it will treat requests as a set of events (accessing the database would be one of these events) and run them through an event loop:

Evented server and its event loop

Such a server will not use many threads. For example, the default configuration of the Play Framework is to create one thread per CPU core, with a maximum of 24 threads in the pool. And yet, this type of server model can deal with many more concurrent requests than the threaded model given the same hardware. The trick, as it turns out, is to hand over the thread to other events when a task needs to do some waiting – or in other words: to program in an asynchronous fashion.
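
One way to make this concrete is sketched below, under the assumption that some synchronous call (say, a JDBC query) is unavoidable: the blocking work is wrapped in a Future that runs on a small, dedicated thread pool, so the event loop’s own threads are never held up. The pool size and the findUserName method are assumptions made for this illustration.

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future, blocking}

// a small, dedicated pool for the few unavoidable blocking calls;
// the size of 10 is an arbitrary choice for this sketch
val blockingEc: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10))

// hypothetical example: a synchronous lookup exposed as a Future
def findUserName(id: Long): Future[String] = Future {
  blocking {
    // imagine a plain JDBC call here: it occupies one of the 10 pool
    // threads while it waits, but never a thread of the event loop
    s"user-$id"
  }
}(blockingEc)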

Painful asynchronous programming

Asynchronous programming is not really new and programming paradigms for dealing with it have been around since the 70s and have quietly evolved since then. And yet, asynchronous programming is not necessarily something that brings back happy memories to most developers. Let’s look at a few of the typical tools and their drawbacks.

Callbacks

Some languages (I’m looking at you, JavaScript) have been stuck in the 70s with callbacks as their only tool for asynchronous programming up until recently (ECMAScript 6 introduced Promises). This is also known as christmas tree programming:

Christmas tree of hell

Ho ho ho.

Threads

As a Java developer, the word asynchronous may not necessarily have a very positive connotation, and is often associated with the infamous synchronized keyword.

Working with threads in Java is hard, especially when using mutable state – it is so much more convenient to let an underlying application server abstract all of the asynchronous stuff away and not worry about it, right? Unfortunately, as we have just seen, this comes at quite a hefty cost in terms of performance.
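
To make the pain point concrete, here is a minimal sketch of the kind of shared mutable state that calls for synchronized. The example is written in Scala, where the keyword works just as in Java, and the counter itself is of course an illustrative assumption.

// a classic shared, mutable counter guarded by a lock
class VisitCounter {
  private var count = 0

  // every thread calling this competes for the same monitor
  def increment(): Unit = synchronized {
    count += 1
  }

  def current: Int = synchronized {
    count
  }
}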

And I mean, just look at this stack trace:

Abstraction is the mother of all trees

In one way, threaded servers are to asynchronous programming what Hibernate is to SQL – a leaky abstraction that will cost you dearly in the long run. And once you realize it, it is often too late: you are trapped in your abstraction, fighting against it by all means in order to increase performance. Whilst for database access it is relatively easy to let go of the abstraction (just use plain SQL, or even better, jOOQ), for asynchronous programming the better tooling is only starting to gain in popularity.

Let’s turn to a programming model that finds its roots in functional programming: Futures.

Futures: the SQL of asynchronous programming

Futures as they can be found in Scala leverage functional programming techniques that have been around for decades in order to make asynchronous programming enjoyable again.

Future fundamentals

A scala.concurrent.Future[T] can be seen as a box that will eventually contain a value of type T if it succeeds. If it fails, the Throwable at the origin of the failure will be kept. A Future is said to have succeeded once the computation it is waiting for has yielded a result, or failed if there was an error during the computation. In either case, once the Future is done computing, it is said to be completed.

Welcome to the Futures

As soon as a Future is declared, it will start running, which means that the computation it tries to achieve will be executed asynchronously. For example, we can use the WS library of the Play Framework in order to execute a GET request against the Play Framework website:

val response: Future[WSResponse] = 
  WS.url("http://www.playframework.com").get()

This call will return immediately and lets us continue to do other things. At some point in the future, the call may have been executed, in which case we could access the result and do something with it. Unlike Java’s java.util.concurrent.Future<V>, which lets one check whether a Future is done or block while retrieving it with the get() method, Scala’s Future makes it possible to specify what to do with the result of an execution.
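
For instance, a callback can be registered that runs whenever the Future completes, on whichever thread happens to be available. This is a small sketch reusing the response value from above; the imported global execution context is an assumption made to keep the example self-contained.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

response.onComplete {
  case Success(r) => println(s"Got a response with status ${r.status}")
  case Failure(t) => println(s"The call failed: ${t.getMessage}")
}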

Transforming Futures

Manipulating what’s inside of the box is easy as well and we do not need to wait for the result to be available in order to do so:

val response: Future[WSResponse] = 
  WS.url("http://www.playframework.com").get()

val siteOnline: Future[Boolean] = 
  response.map { r =>
    r.status == 200
  }

siteOnline.foreach { isOnline =>
  if(isOnline) {
    println("The Play site is up")
  } else {
    println("The Play site is down")
  }
}

In this example, we turn our Future[WSResponse] into a Future[Boolean] by checking the status of the response. It is important to understand that this code will not block at any point: only once the response is available will a thread be used to process it and execute the code inside of the map function.

Recovering failed Futures

Failure recovery is quite convenient as well:

val response: Future[WSResponse] =
  WS.url("http://www.playframework.com").get()

val siteAvailable: Future[Option[Boolean]] = 
  response.map { r =>
    Some(r.status == 200)
  } recover {
    case ce: java.net.ConnectException => None
  }

At the very end of the Future we call the recover method, which will deal with a certain type of exception and limit the damage. In this example we are only handling the unfortunate case of a java.net.ConnectException by returning a None value.

Composing Futures

The killer feature of Futures is their composability. A very typical use-case when building asynchronous programming workflows is to combine the results of several concurrent operations. Futures (and Scala) make this rather easy:

def siteAvailable(url: String): Future[Boolean] =
  WS.url(url).get().map { r =>
    r.status == 200
  }

val playSiteAvailable =
  siteAvailable("http://www.playframework.com")

val playGithubAvailable =
  siteAvailable("https://github.com/playframework")

val allSitesAvailable: Future[Boolean] = for {
  siteAvailable <- playSiteAvailable
  githubAvailable <- playGithubAvailable
} yield (siteAvailable && githubAvailable)

The allSitesAvailable Future is built using a for comprehension, which will wait until both Futures have completed. The two Futures playSiteAvailable and playGithubAvailable start running as soon as they are declared, and the for comprehension composes them together. If one of those Futures were to fail, the resulting Future[Boolean] would fail directly as a result (without waiting for the other Future to complete).
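
The same idea scales to any number of Futures. Here is a sketch that reuses the siteAvailable helper from above; the list of URLs and the use of the global execution context are assumptions made for this illustration.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val urls = List(
  "http://www.playframework.com",
  "https://github.com/playframework"
)

// start all checks concurrently and collapse the List[Future[Boolean]]
// into a single Future[List[Boolean]]
val allChecks: Future[List[Boolean]] =
  Future.traverse(urls)(siteAvailable)

val allAvailable: Future[Boolean] =
  allChecks.map(results => results.forall(identity))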

This is it for the first part of this series. In the next post we will look at another tool for reactive programming and then finally at how to use those tools in combination in order to access a relational database in a reactive fashion.

Read on

Stay tuned as we’ll publish Parts 2 and 3 shortly as a part of this series.

A SQL query DSL for Scala by ScalikeJDBC

There is a tremendous number of SQL APIs natively written in Scala. Manuel Bernhardt has summarised a nice collection in his post. Another collection of Scala SQL APIs can be seen in this Stack Overflow question.

One API that we want to focus on in particular is ScalikeJDBC (licensed ASL 2.0), which has recently published a SQL query DSL API similar to that of jOOQ. See the full documentation here:

http://scalikejdbc.org/documentation/query-dsl.html

A couple of examples:

val orders: List[Order] = withSQL {
  select
    .from(Order as o)
    .innerJoin(Product as p).on(o.productId, p.id)
    .leftJoin(Account as a).on(o.accountId, a.id)
    .where.eq(o.productId, 123)
    .orderBy(o.id).desc
    .limit(4)
    .offset(0)
}.map(Order(o, p, a)).list.apply()

The above example looks very similar to jOOQ code, except that the SELECT DSL seems to be a bit more rigid than jOOQ’s. For instance, it is not immediately obvious how to connect several complex predicates in that WHERE clause, or if complex predicates are available at all.

What’s really nice, however, is their way of leveraging Scala language features to provide a very fluent way of constructing dynamic SQL, as can be seen in this example:

def findOrder(id: Long, accountRequired: Boolean) =
  withSQL {
    select
      .from[Order](Order as o)
      .innerJoin(Product as p).on(o.productId, p.id)
      .map { sql =>
        if (accountRequired)
          sql.leftJoin(Account as a)
             .on(o.accountId, a.id)
        else
          sql
      }.where.eq(o.id, id)
  }.map { rs =>
    if (accountRequired)
      Order(o, p, a)(rs)
    else
      Order(o, p)(rs)
  }.single.apply()

From how we understand things, the map method that is invoked in the middle of the SQL statement (between innerJoin and where) can transform the intermediate DSL state using a lambda expression that allows for appending a leftJoin if needed. Obviously, this can be done in a more procedural fashion as well, by assigning that intermediate DSL state to a local variable.
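
A rough sketch of that procedural variant might look as follows. It is untested and assumes that the left-joined builder has the same DSL type as the base one, which is also what the map-based version above relies on.

def findOrder(id: Long, accountRequired: Boolean) =
  withSQL {
    // build the part of the query that is always the same
    val base = select
      .from[Order](Order as o)
      .innerJoin(Product as p).on(o.productId, p.id)

    // keep the intermediate DSL state in a local value and
    // extend it only when the account is required
    val joined =
      if (accountRequired)
        base.leftJoin(Account as a).on(o.accountId, a.id)
      else
        base

    joined.where.eq(o.id, id)
  }.map { rs =>
    if (accountRequired) Order(o, p, a)(rs) else Order(o, p)(rs)
  }.single.apply()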

The need for SQL query DSLs

We’ve blogged about many of these similar SQL query DSLs in the past. The fact that they constantly pop up in various APIs is no coincidence. SQL is a very typesafe and composable language that is hard to use dynamically through string-based APIs such as JDBC, ODBC, etc.

Having a typesafe internal domain-specific language to model SQL in a host language like Java or Scala brings great advantages. But the disadvantages may shine through quickly when the DSL is not carefully crafted in a completely foreseeable way. Take the following ScalikeJDBC QueryDSL example, for instance:

// productId and accountId are Option values defined elsewhere;
// a condition is only added for those that are defined
val ids = withSQL {
  select(o.result.id).from(Order as o)
    .where(sqls.toAndConditionOpt(
      productId.map(id => sqls.eq(o.productId, id)),
      accountId.map(id => sqls.eq(o.accountId, id))
    ))
    .orderBy(o.id)
}.map(_.int(1)).list.apply()

This toAndConditionOpt method is really unexpected and doesn’t follow the principle of least astonishment.

This is why jOOQ’s API design is based on a formal BNF that closely mimics SQL itself. Read more about that here.