How to Avoid the Dreaded Dead Lock when Pessimistic Locking – And some Awesome Java 8 Usage!

Sometimes you simply cannot avoid it: Pessimistic locking via SQL. In fact, it’s an awesome tool when you want to synchronise several applications on a shared, global lock.

Some may think this is abusing the database. We think use the tools you have if they can solve the problem you have. For instance, the RDBMS can be the perfect implementation for a message queue.

Let’s assume you do have that pessimistic locking use-case and you do want to choose the RDBMS. Now, how to get it right? Because it is really easy to produce a deadlock. Imagine the following setup (and I’m using Oracle for this):

CREATE TABLE locks (v NUMBER(18));

INSERT INTO locks
SELECT level
FROM dual
CONNECT BY level <= 10;

This generates 10 records, which we’ll use as 10 distinct row-level locks.

Now, let’s connect to the database from two sqlplus clients:

Instance 1

SQL> SELECT *
  2  FROM locks
  3  WHERE v = 1
  4  FOR UPDATE;

         V
----------
         1

Instance 2

SQL> SELECT *
  2  FROM locks
  3  WHERE v = 2
  4  FOR UPDATE;

         V
----------
         2

We’ve now acquired two different locks from two different sessions.

And then, let’s inverse things:

Instance 1

SQL> SELECT *
  2  FROM locks
  3  WHERE v = 2
  4  FOR UPDATE;

Instance 2

SQL> SELECT *
  2  FROM locks
  3  WHERE v = 1
  4  FOR UPDATE;

Both sessions are now locked and luckily, Oracle will detect this and fail one of the sessions:

ORA-00060: deadlock detected while waiting for resource

Avoiding deadlocks

This is a very explicit example where it is easy to see why it happens, and potentially, how to avoid it. A simple way to avoid deadlocks is to establish a rule that all locks will always have to be acquired in ascending order. If you know you need lock number 1 and 2, you must acquire them in that order. This way, you will still produce locking and thus contention, but at least the contention will eventually (probably) get resolved once load decreases. Here’s an example that shows what happens when you have more clients. This time, written as Java threads.

In the example, we’re using jOOλ for simpler lambda expressions (e.g. lambdas throwing checked exceptions). And of course, we’ll be abusing Java 8, heavily!

Class.forName("oracle.jdbc.OracleDriver");

// We want a collection of 4 threads and their
// associated execution counters
List<Tuple2<Thread, AtomicLong>> list =
IntStream
    .range(0, 4)

    // Let's use jOOλ here to wrap checked exceptions
    // we'll map the thread index to the actual tuple
    .mapToObj(Unchecked.intFunction(i -> {
        final Connection con = DriverManager.getConnection(
            "jdbc:oracle:thin:@localhost:1521:xe", 
            "TEST", "TEST");

        final AtomicLong counter = new AtomicLong();
        final Random rnd = new Random();

        return Tuple.tuple(

            // Each thread acquires a random number of
            // locks in ascending order
            new Thread(Unchecked.runnable(() -> {
                for (;;) {
                    String sql =
                      " SELECT *"
                    + " FROM locks"
                    + " WHERE v BETWEEN ? AND ?"
                    + " ORDER BY v"
                    + " FOR UPDATE";

                    try (PreparedStatement stmt = 
                             con.prepareStatement(sql)) {
                        stmt.setInt(1, rnd.nextInt(10));
                        stmt.setInt(2, rnd.nextInt(10));
                        stmt.executeUpdate();

                        counter.incrementAndGet();
                        con.commit();
                    }
                }
            })),
            counter
        );
    }))
    .collect(Collectors.toList());

// Starting each thread
list.forEach(tuple -> tuple.v1.start());

// Printing execution counts
for (;;) {
    list.forEach(tuple -> {
        System.out.print(String.format(
            "%1s:%2$-10s",
            tuple.v1.getName(),
            tuple.v2.get()
        ));
    });

    System.out.println();
    Thread.sleep(1000);
}

As the program runs, you can see that it continues progressively, with each thread taking approximately the same load as the other threads:

Thread-1:0         Thread-2:0         Thread-3:0         Thread-4:0
Thread-1:941       Thread-2:966       Thread-3:978       Thread-4:979
Thread-1:2215      Thread-2:2206      Thread-3:2244      Thread-4:2253
Thread-1:3422      Thread-2:3400      Thread-3:3466      Thread-4:3418
Thread-1:4756      Thread-2:4720      Thread-3:4855      Thread-4:4847
Thread-1:6095      Thread-2:5987      Thread-3:6250      Thread-4:6173
Thread-1:7537      Thread-2:7377      Thread-3:7644      Thread-4:7503
Thread-1:9122      Thread-2:8884      Thread-3:9176      Thread-4:9155

Now, for the sake of the argument, let’s do the forbidden thing and ORDER BY DBMS_RANDOM.VALUE

String sql =
  " SELECT *"
+ " FROM locks"
+ " WHERE v BETWEEN ? AND ?"
+ " ORDER BY DBMS_RANDOM.VALUE"
+ " FOR UPDATE";

It won’t take long and your application explodes:

Thread-1:0         Thread-2:0         Thread-3:0         Thread-4:0         
Thread-1:72        Thread-2:79        Thread-3:79        Thread-4:90        
Thread-1:72        Thread-2:79        Thread-3:79        Thread-4:90        
Thread-1:72        Thread-2:79        Thread-3:79        Thread-4:90        
Exception in thread "Thread-3" org.jooq.lambda.UncheckedException: 
java.sql.SQLException: ORA-00060: deadlock detected while waiting for resource

Thread-1:72        Thread-2:79        Thread-3:79        Thread-4:93        
Thread-1:72        Thread-2:79        Thread-3:79        Thread-4:93        
Thread-1:72        Thread-2:79        Thread-3:79        Thread-4:93        
Exception in thread "Thread-1" org.jooq.lambda.UncheckedException: 
java.sql.SQLException: ORA-00060: deadlock detected while waiting for resource

Thread-1:72        Thread-2:1268      Thread-3:79        Thread-4:1330      
Thread-1:72        Thread-2:3332      Thread-3:79        Thread-4:3455      
Thread-1:72        Thread-2:5691      Thread-3:79        Thread-4:5841      
Thread-1:72        Thread-2:8663      Thread-3:79        Thread-4:8811      
Thread-1:72        Thread-2:11307     Thread-3:79        Thread-4:11426     
Thread-1:72        Thread-2:12231     Thread-3:79        Thread-4:12348     
Thread-1:72        Thread-2:12231     Thread-3:79        Thread-4:12348     
Thread-1:72        Thread-2:12231     Thread-3:79        Thread-4:12348     
Exception in thread "Thread-4" org.jooq.lambda.UncheckedException: 
java.sql.SQLException: ORA-00060: deadlock detected while waiting for resource

Thread-1:72        Thread-2:13888     Thread-3:79        Thread-4:12348     
Thread-1:72        Thread-2:17037     Thread-3:79        Thread-4:12348     
Thread-1:72        Thread-2:20234     Thread-3:79        Thread-4:12348     
Thread-1:72        Thread-2:23495     Thread-3:79        Thread-4:12348     

And in the end, all but one of your threads have been killed (at least in our example) because of deadlock exceptions.

Beware of execution plans

The above example has worked, because in the given example, the execution plan applied locking AFTER ordering as can be seen here:

SQL_ID  bcyyxqyubp4v8, child number 0
-------------------------------------
SELECT * FROM locks WHERE v BETWEEN :v1 AND :v2 ORDER BY v FOR UPDATE
 
Plan hash value: 2944215640
 
--------------------------------------
| Id  | Operation            | Name  |
--------------------------------------
|   0 | SELECT STATEMENT     |       |
|   1 |  FOR UPDATE          |       |
|   2 |   SORT ORDER BY      |       | <-- happens before FOR UPDATE
|*  3 |    FILTER            |       |
|*  4 |     TABLE ACCESS FULL| LOCKS |
--------------------------------------
 
Predicate Information (identified by operation id):
---------------------------------------------------
 
   3 - filter(TO_NUMBER(:V1)<=TO_NUMBER(:V2))
   4 - filter(("V"=TO_NUMBER(:V1)))

(see this article to learn how to get Oracle execution plans like the above)

You should obviously not rely on this in a more real world scenario.

Beware of contention, though

The above examples have also been impressive in terms of displaying the other negative side-effects of pessimistic locking (or locking in general): Contention. The single thread that continued executing in the “bad example” was almost as fast as the four threads before. Our silly example where we used random lock ranges led to the fact that on average, almost every attempt to acquire locks did at least some blocking. How can you figure this out? By looking out for enq: TX – row lock contention events in your sessions. For instance:

SELECT blocking_session, event
FROM v$session
WHERE username = 'TEST'

The above query returns the catastrophic result, here:

BLOCKING_SESSION   EVENT
-------------------------------------
48                 enq: TX - row lock contention
54                 enq: TX - row lock contention
11                 enq: TX - row lock contention
11                 enq: TX - row lock contention

Conclusion

The conclusion can only be: Use pessimistic locking sparingly and always expect the unexpected. When doing pessimistic locking, both deadlocks and heavy contention are quite possible problems that you can run into. As a general rule of thumb, follow these rules (in order):

  • Avoid pessimistic locking if you can
  • Avoid locking more than one row per session if you can
  • Avoid locking rows in random order if you can
  • Avoid going to work to see what happened

How to Use Java 8 Streams to Swiftly Replace Elements in a List

Imagine you have a list of items:

List<String> books = Arrays.asList(
    "The Holy Cow: The Bovine Testament",
    "True Hip Hop",
    "Truth and Existence",
    "The Big Book of Green Design"
);

(Don’t judge me. Books from this random book generator)

Now you’d like to create a new list where the third item only is replaced by some new value:

List<String> books = Arrays.asList(
    "The Holy Cow: The Bovine Testament",
    "True Hip Hop",
    "Pregnancy For Dummies", // New book at index 2
    "The Big Book of Green Design"
);

Of course, you could go and either modify the original list:

books.set(2, "Pregnancy For Dummies");

… or create a copy of the original list and then modify that copy:

List<String> copy = new ArrayList<>(books);
copy.set(2, "Pregnancy For Dummies");

But if you want to write a one-liner to do the same in a functional style, you’ll write the following, using jOOλ

seq(books)
    .zipWithIndex()
    .map(t -> t.v2 == 2 
            ? "Pregnancy For Dummies"
            : t.v1)
    .toList();

With the JDK standard Streams API, things get a bit harder. You could write:

Stream.concat(
    Stream.concat(
        books.stream().limit(2),
        Stream.of("Pregnancy For Dummies")
    ),
    books.stream.skip(3)
).collect(Collectors.toList());

That would be a bit unfortunate, though, as the first part of the stream would need to be traversed twice – once for the limit and once for the skipping (see also our post about the caveats of OFFSET pagination in SQL)

Swift or not?

Clearly, the JDK APIs won’t help you to write concise functional logic, as can be seen above and the “imperative” style is more straight-forward. We’ve written about this before. This has also been our main motivation to create jOOλ.

If you’re looking for even more functional bliss, do also have a look at the following libraries:

How to Translate SQL GROUP BY and Aggregations to Java 8

I couldn’t resist. I have read this question by Hugo Prudente on Stack Overflow. And I knew there had to be a better way than what the JDK has to offer.

The question reads:

I’m looking for a lambda to refine the data already retrieved. I have a raw resultset, if the user do not change the date I want use java’s lambda to group by the results for then. And I’m new to lambdas with java.

The lambda I’m looking for works simliar to this query.

SELECT
    z, w, 
    MIN(x), MAX(x), AVG(x), 
    MIN(y), MAX(y), AVG(y) 
FROM table 
GROUP BY z, w;

SQL is declarative. Functional programming is not.

Before we go on with this discussion, let’s establish a very important fact. SQL is a completely declarative language. Functional (or “functional-ish”, to keep the Haskell-aficionados at peace) programming languages like Java 8 are not declarative. While expressing data transformation algorithms using functions is much more concise than expressing them using objects, or worse, using imperative instructions, you’re still explicitly expressing the algorithm.

When you write SQL, you don’t write any algorithm. You just describe the result you want to have. The SQL engine’s optimiser will figure out the algorithm for you – e.g. based on the fact that you may have an index on Z but not on W or on (Z, W).

While simple examples like these can easily be implemented using Java 8, you will quickly run into Java’s limitations, once you need to do more complex reporting.

Of course, as we’ve blogged before, the optimum is reached when you combine SQL and functional programming.

How can this be written in Java 8?

There are a variety of ways to do it. The essence is to understand all the participants in such a transformation. And no matter if you find this easy or hard, suitable for Java 8 or inadequate, thinking about the different, lesser-known parts of new Stream API is certainly worth the exercise.

The main participants here are:

  • Stream: If you’re using JDK 8 libraries, then the new java.util.stream.Stream type will be your first choice.
  • Collector: The JDK provides us with a rather low-level and thus very powerful new API for data aggregation (also known as “reduction”). This API is summarised by the new java.util.stream.Collector type, a new type from which we have heard only little so far in the blogosphere

Disclaimer

Some of the code displayed here might not work in your favourite IDE. Unfortunately, even if Java 7 reaches its end of life, all major IDEs (Eclipse, IntelliJ, NetBeans), and even the javac compiler still have quite a few bugs related to the combination of generic type inference and lambda expressions. Stay tuned until those bugs are fixed! And report any bug you discover. We’ll all thank you for it!

Let’s go!

Let’s review our SQL statement:

SELECT
    z, w, 
    MIN(x), MAX(x), AVG(x), 
    MIN(y), MAX(y), AVG(y) 
FROM table 
GROUP BY z, w;

In terms of the Stream API, the table itself is the Stream. Let’s just assume that we have a “table type” A as such:

class A {
    final int w;
    final int x;
    final int y;
    final int z;

    A(int w, int x, int y, int z) {
        this.w = w;
        this.x = x;
        this.y = y;
        this.z = z;
    }

    @Override
    public String toString() {
        return "A{" +
                "w=" + w +
                ", x=" + x +
                ", y=" + y +
                ", z=" + z +
                '}';
    }
}

You can also add equals() and hashCode() if you must.

We can now easily compose the Stream using Stream.of(), and some sample data:

Stream<A> stream =
Stream.of(
    new A(1, 1, 1, 1),
    new A(1, 2, 3, 1),
    new A(9, 8, 6, 4),
    new A(9, 9, 7, 4),
    new A(2, 3, 4, 5),
    new A(2, 4, 4, 5),
    new A(2, 5, 5, 5));

Now, the next step is to GROUP BY z, w. The Stream API itself, unfortunately, doesn’t contain such a convenience method. We have to resort to more low-level operations by specifying the more general Stream.collect() operation, and passing a Collector to it that does the grouping. Luckily, a variety of different grouping Collectors are already made available from the Collectors helper class.

So we add that to our stream

Stream.of(
    new A(1, 1, 1, 1),
    new A(1, 2, 3, 1),
    new A(9, 8, 6, 4),
    new A(9, 9, 7, 4),
    new A(2, 3, 4, 5),
    new A(2, 4, 4, 5),
    new A(2, 5, 5, 5))
.collect(Collectors.groupingBy(...));

jool-logo-blackNow the interesting part starts. How do we specify that we want to group by both A.z and A.w? We need to provide this groupingBy method with a function that can extract something like a SQL tuple from the A type. We could write our own tuple, or we simply use that of jOOλ, a library that we have created and open-sourced to improve our jOOQ integration tests.

The Tuple2 type roughly looks like this:

public class Tuple2<T1, T2> {

    public final T1 v1;
    public final T2 v2;

    public T1 v1() {
        return v1;
    }

    public T2 v2() {
        return v2;
    }

    public Tuple2(T1 v1, T2 v2) {
        this.v1 = v1;
        this.v2 = v2;
    }
}

public interface Tuple {
    static <T1, T2> Tuple2<T1, T2> tuple(T1 v1, T2 v2) {
        return new Tuple2<>(v1, v2);
    }
}

It has many more useful features, but these ones will be sufficient for this article.

On a side-note

Why the JDK doesn’t ship with built-in tuples like C#’s or Scala’s escapes me.

Functional programming without tuples is like coffee without sugar: A bitter punch in your face.

Anyway… back on track

So we’re grouping by the (A.z, A.w) tuple, as we would in SQL

Map<Tuple2<Integer, Integer>, List<A>> map =
Stream.of(
    new A(1, 1, 1, 1),
    new A(1, 2, 3, 1),
    new A(9, 8, 6, 4),
    new A(9, 9, 7, 4),
    new A(2, 3, 4, 5),
    new A(2, 4, 4, 5),
    new A(2, 5, 5, 5))
.collect(Collectors.groupingBy(
    a -> tuple(a.z, a.w)
));

As you can see, this produces a verbose but very descriptive type, a map containing our grouping tuple as its key, and a list of collected table records as its value.

Running the following statement

map.entrySet().forEach(System.out::println);

will yield:

(1, 1)=[A{w=1, x=1, y=1, z=1}, A{w=1, x=2, y=3, z=1}]
(4, 9)=[A{w=9, x=8, y=6, z=4}, A{w=9, x=9, y=7, z=4}]
(5, 2)=[A{w=2, x=3, y=4, z=5}, A{w=2, x=4, y=4, z=5}, A{w=2, x=5, y=5, z=5}]

That’s already quite awesome! In fact, this behaves like the SQL:2011 standard COLLECT() aggregate function, that is also available in Oracle 10g+

Now, instead of actually collecting the A records, we prefer to aggregate the individual values of x and y. The JDK provides us with a couple of interesting new types, e.g. the java.util.IntSummaryStatistics, which is available for convenience again from the Collectors type via Collectors.summarizingInt().

On a side note

For my taste, this sledge-hammer data aggregation technique is a bit quirky. The JDK libraries have been left intentionally low level and verbose, perhaps to keep the library footprint small, or to prevent “horrible” consequences when in 5-10 years (after the release of JDK 9 and 10), it becomes obvious that some features may have been added prematurely.

At the same time, there is this all-or-nothing IntSummaryStatistics, that blindly aggregates these popular aggregation values for your collection:

  • COUNT(*)
  • SUM()
  • MIN()
  • MAX()

and obviously, once you have SUM() and COUNT(*), you also have AVG() = SUM() / COUNT(*). So that’s going to be the Java way. IntSummaryStatistics.

In case you were wondering, the SQL:2011 standard specifies these aggregate functions:

AVG, MAX, MIN, SUM, EVERY, ANY, SOME, COUNT, STDDEV_POP, STDDEV_SAMP, VAR_SAMP, VAR_POP, COLLECT, FUSION, INTERSECTION, COVAR_POP, COVAR_SAMP, CORR, REGR_SLOPE, REGR_INTERCEPT, REGR_COUNT, REGR_R2, REGR_AVGX, REGR_AVGY, REGR_SXX, REGR_SYY, REGR_SXY, PERCENTILE_CONT, PERCENTILE_DISC, ARRAY_AGG

And obviously there are many other, vendor-specific aggregate and window functions in SQL. We’ve blogged about them all:

True, MIN, MAX, SUM, COUNT, AVG are certainly the most popular ones. But it would’ve been nicer if they hadn’t been included in these default aggregation types, but made available in a much more composable way.

Anyway… back on track

If you want to stay low-level and use mostly JDK API, you can use the following technique to implement aggregation over two columns:

Map<
    Tuple2<Integer, Integer>, 
    Tuple2<IntSummaryStatistics, IntSummaryStatistics>
> map = Stream.of(
    new A(1, 1, 1, 1),
    new A(1, 2, 3, 1),
    new A(9, 8, 6, 4),
    new A(9, 9, 7, 4),
    new A(2, 3, 4, 5),
    new A(2, 4, 4, 5),
    new A(2, 5, 5, 5))
.collect(Collectors.groupingBy(
    a -> tuple(a.z, a.w),
    Collector.of(

        // When collecting, we'll aggregate data
        // into two IntSummaryStatistics for x and y
        () -> tuple(new IntSummaryStatistics(), 
                    new IntSummaryStatistics()),

        // The accumulator will simply take
        // new t = (x, y) values
        (r, t) -> {
            r.v1.accept(t.x);
            r.v2.accept(t.y);
        },

        // The combiner will merge two partial
        // aggregations, in case this is executed
        // in parallel
        (r1, r2) -> {
            r1.v1.combine(r2.v1);
            r1.v2.combine(r2.v2);

            return r1;
        }
    )
));

map.entrySet().forEach(System.out::println);

The above would now print

(1, 1)=(IntSummaryStatistics{count=2, sum=3, min=1, average=1.500000, max=2}, 
        IntSummaryStatistics{count=2, sum=4, min=1, average=2.000000, max=3})
(4, 9)=(IntSummaryStatistics{count=2, sum=17, min=8, average=8.500000, max=9}, 
        IntSummaryStatistics{count=2, sum=13, min=6, average=6.500000, max=7})
(5, 2)=(IntSummaryStatistics{count=3, sum=12, min=3, average=4.000000, max=5}, 
        IntSummaryStatistics{count=3, sum=13, min=4, average=4.333333, max=5})

But obviously, no one will want to write that much code. The same thing can be achieved with jOOλ with much less code

Map<
    Tuple2<Integer, Integer>, 
    Tuple2<IntSummaryStatistics, IntSummaryStatistics>
> map =

// Seq is like a Stream, but sequential only,
// and with more features
Seq.of(
    new A(1, 1, 1, 1),
    new A(1, 2, 3, 1),
    new A(9, 8, 6, 4),
    new A(9, 9, 7, 4),
    new A(2, 3, 4, 5),
    new A(2, 4, 4, 5),
    new A(2, 5, 5, 5))

// Seq.groupBy() is just short for 
// Stream.collect(Collectors.groupingBy(...))
.groupBy(
    a -> tuple(a.z, a.w),

    // ... because once you have tuples, 
    // why not add tuple-collectors?
    Tuple.collectors(
        Collectors.summarizingInt(a -> a.x),
        Collectors.summarizingInt(a -> a.y)
    )
);

What you see above is probably as close as it gets to the original, very simmple SQL statement:

SELECT
    z, w, 
    MIN(x), MAX(x), AVG(x), 
    MIN(y), MAX(y), AVG(y) 
FROM table 
GROUP BY z, w;

The interesting part here is the fact that we have what we call “tuple-collectors”, a Collector that collects data into tuples of aggregated results for any degree of the tuple (up to 8). Here’s the code for Tuple.collectors:

// All of these generics... sheesh!
static <T, A1, A2, D1, D2> 
       Collector<T, Tuple2<A1, A2>, Tuple2<D1, D2>> 
collectors(
    Collector<T, A1, D1> collector1
  , Collector<T, A2, D2> collector2
) {
    return Collector.of(
        () -> tuple(
            collector1.supplier().get()
          , collector2.supplier().get()
        ),
        (a, t) -> {
            collector1.accumulator().accept(a.v1, t);
            collector2.accumulator().accept(a.v2, t);
        },
        (a1, a2) -> tuple(
            collector1.combiner().apply(a1.v1, a2.v1)
          , collector2.combiner().apply(a1.v2, a2.v2)
        ),
        a -> tuple(
            collector1.finisher().apply(a.v1)
          , collector2.finisher().apply(a.v2)
        )
    );
}

Where the Tuple2<D1, D2> is the aggregation result type that we derive from collector1 (which provides D1) and from collector2 (which provides D2).

That’s it. We’re done!

Conclusion

Java 8 is a first step towards functional programming in Java. Using Streams and lambda expressions, we can already achieve quite a bit. The JDK APIs, however, are extremely low level and the experience when using IDEs like Eclipse, IntelliJ, or NetBeans can still be a bit frustrating. While writing this article (and adding the Tuple.collectors() method), I have reported around 10 bugs to the different IDEs. Some javac compiler bugs are not yet fixed, prior to JDK 1.8.0_40 ea. In other words:

I just keep throwing generic type parameters at the darn thing until the compiler stops bitching at me

But we’re on a good path. I trust that more useful API will ship with JDK 9 and especially with JDK 10, when all of the above will hopefully profit from the new value types and generic type specialization.

jool-logo-blackAnd, of course, if you haven’t already, download and contribute to jOOλ here!

We have created jOOλ to add the missing pieces to the JDK libraries. If you want to go all in on functional programming, i.e. when your vocabulary includes hipster terms (couldn’t resist) like monads, monoids, functors, and all that, we suggest you skip the JDK’s Streams and jOOλ entirely, and go download functionaljava by Mark Perry or javaslang by Daniel Dietrich

Let’s Stream a Map in Java 8 with jOOλ

I wanted to find an easy way to stream a Map in Java 8. Guess what? There isn’t!

What I would’ve expected for convenience is the following method:

public interface Map<K, V> {

    default Stream<Entry<K, V>> stream() {
        return entrySet().stream();
    }    
}

But there’s no such method. There are probably a variety of reasons why such a method shouldn’t exist, e.g.:

  • There’s no “clear” preference for entrySet() being chosen over keySet() or values(), as a stream source
  • Map isn’t really a collection. It’s not even an Iterable
  • That wasn’t the design goal
  • The EG didn’t have enough time

Well, there is a very compelling reason for Map to have been retrofitted to provide both an entrySet().stream() and to finally implement Iterable<Entry<K, V>>. And that reason is the fact that we now have Map.forEach():

default void forEach(
        BiConsumer<? super K, ? super V> action) {
    Objects.requireNonNull(action);
    for (Map.Entry<K, V> entry : entrySet()) {
        K k;
        V v;
        try {
            k = entry.getKey();
            v = entry.getValue();
        } catch(IllegalStateException ise) {
            // this usually means the entry is no longer in the map.
            throw new ConcurrentModificationException(ise);
        }
        action.accept(k, v);
    }
}

forEach() in this case accepts a BiConsumer that really consumes entries in the map. If you search through JDK source code, there are really very few references to the BiConsumer type outside of Map.forEach() and perhaps a couple of CompletableFuture methods and a couple of streams collection methods.

So, one could almost assume that BiConsumer was strongly driven by the needs of this forEach() method, which would be a strong case for making Map.Entry a more important type throughout the collections API (we would have preferred the type Tuple2, of course).

Let’s continue this line of thought. There is also Iterable.forEach():

public interface Iterable<T> {
    default void forEach(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        for (T t : this) {
            action.accept(t);
        }
    }
}

Both Map.forEach() and Iterable.forEach() intuitively iterate the “entries” of their respective collection model, although there is a subtle difference:

  • Iterable.forEach() expects a Consumer taking a single value
  • Map.forEach() expects a BiConsumer taking two values: the key and the value (NOT a Map.Entry!)

Think about it this way:

This makes the two methods incompatible in a “duck typing sense”, which makes the two types even more different

Bummer!

Improving Map with jOOλ

We find that quirky and counter-intuitive. forEach() is really not the only use-case of Map traversal and transformation. We’d love to have a Stream<Entry<K, V>>, or even better, a Stream<Tuple2<T1, T2>>. So we implemented that in jOOλ, a library which we’ve developed for our integration tests at jOOQ. With jOOλ, you can now wrap a Map in a Seq type (“Seq” for sequential stream, a stream with many more functional features):

Map<Integer, String> map = new LinkedHashMap<>();
map.put(1, "a");
map.put(2, "b");
map.put(3, "c");

assertEquals(
  Arrays.asList(
    tuple(1, "a"), 
    tuple(2, "b"), 
    tuple(3, "c")
  ),

  Seq.seq(map).toList()
);

What you can do with it? How about creating a new Map, swapping keys and values in one go:

System.out.println(
  Seq.seq(map)
     .map(Tuple2::swap)
     .toMap(Tuple2::v1, Tuple2::v2)
);

System.out.println(
  Seq.seq(map)
     .toMap(Tuple2::v2, Tuple2::v1)
);

Both of the above will yield:

{a=1, b=2, c=3}

Just for the record, here’s how to swap keys and values with standard JDK API:

System.out.println(
  map.entrySet()
     .stream()
     .collect(Collectors.toMap(
         Map.Entry::getValue, 
         Map.Entry::getKey
     ))
);

It can be done, but the every day verbosity of standard Java API makes things a bit hard to read / write

Don’t Miss out on Writing Java 8 SQL One-Liners with jOOλ or jOOQ

More and more people are catching up with the latest update to our platform by adopting functional programming also for their businesses.

At Data Geekery, we’re using Java 8 for our jOOQ integration tests, as using the new Streams API with lambda expressions makes generating ad-hoc test data so much easier.

However, we don’t feel that the JDK offers as much as it could, which is why we have also implemented and open-sourced jOOλ, a small utility library that patches those short-comings.

Note, we don’t aim to replace more sophisticated libraries like functionaljava. jOOλ is really just patching short-comings.

Putting lambdas to work with jOOλ or jOOQ

I’ve recently encountered this Stack Overflow question, which asked for streaming a result set with all columns into a single list. For example:

Input

+----+------------+------------+
| ID | FIRST_NAME | LAST_NAME  |
+----+------------+------------+
|  1 | Joslyn     | Vanderford |
|  2 | Rudolf     | Hux        |
+----+------------+------------+

Output

1
Joslyn
Vanderford
2
Rudolf
Hux

This is a typical school-book example for using functional programming rather than an iterative solution:

Iterative solution

ResultSet rs = ...;
ResultSetMetaData meta = rs.getMetaData();

List<Object> list = new ArrayList<>();

while (rs.next()) {
    for (int i = 0; i < meta.getColumnCount(); i++) {
        list.add(rs.getObject(i + 1));
    }
}

Truth is, the iterative solution isn’t all that bad, but let’s learn how this could be done with functional programming.

Using jOOλ

We’re using jOOλ for this example for a couple of reasons:

  • JDBC didn’t really adopt the new features. There is no simple ResultSet to Stream conversion, even if there should be.
  • Unfortunately, the new functional interfaces do not allow for throwing checked exceptions. The try .. catch blocks inside lambdas don’t exactly look nice
  • Interestingly, there is no way of generating a finite stream without also implementing an Iterator or Spliterator

So, here’s the plain code:

ResultSet rs = ...;
ResultSetMetaData meta = rs.getMetaData();

List<Object> list =
Seq.generate()
   .limitWhile(Unchecked.predicate(v -> rs.next()))
   .flatMap(Unchecked.function(v -> IntStream
       .range(0, meta.getColumnCount())
       .mapToObj(Unchecked.intFunction(i ->
           rs.getObject(i + 1)
       ))
   ))
   .toList()

So far, this looks about as verbose (or a bit more) than the iterative solution. As you can see, a couple of jOOλ extensions were needed here:

// This generate is a shortcut to generate an
// infinite stream with unspecified content
Seq.generate()

// This predicate-based stream termination
// unfortunately doesn't exist in the JDK
// Besides, the checked exception is wrapped in a
// RuntimeException by calling Unchecked.wrapper(...)
   .limitWhile(Unchecked.predicate(v -> rs.next()))

// Standard JDK flatmapping, producing a "nested"
// stream of column values for the "outer" stream
// of database rows
   .flatMap(Unchecked.function(v -> IntStream
       .range(0, meta.getColumnCount())
       .mapToObj(Unchecked.intFunction(i ->
           rs.getObject(i + 1)
       ))
   ))

// This is another convenience method that is more
// verbose to write with standard JDK code
   .toList()

Using jOOQ

jOOQ has even more convenience API to operate on result records of your SQL statement. Consider the following piece of logic:

ResultSet rs = ...;

List<Object> list =
DSL.using(connection)
   .fetch(rs)
   .stream()
   .flatMap(r -> Arrays.stream(r.intoArray()))
   .collect(Collectors.toList());

Note that the above example is using standard JDK API, without resorting to jOOλ for convenience. If you want to use jOOλ with jOOQ, you could even write:

ResultSet rs = ...;

List<Object> list = 
Seq.seq(DSL.using(connection).fetch(rs))
   .flatMap(r -> Arrays.stream(r.intoArray()))
   .toList();

Easy? I would say so! Let’s remember that this example:

  • Fetches a JDBC ResultSet into a Java Collection
  • Transforms each record in the result set into an array of column values
  • Transforms each array into a stream
  • Flattens that stream into a stream of streams
  • Collects all values into a single list

Whew!

Conclusion

We’re heading towards exciting times! It will take a while until all Java 8 idioms and functional thinking will feel “natural” to Java developers, also in the enterprise.

The idea of having a sort of data source that can be configured with pipelined data transformations expressed as lambda expressions to be evaluated lazily is very compelling, though. jOOQ is an API that encapsulates SQL data sources in a very fluent and intuitive way, but it doesn’t stop there. jOOQ produces regular JDK collections of records, which can be transformed out-of-the-box via the new streams API.

We believe that this will drastically change the way the Java ecosystem will think about data transformation. Stay tuned for more examples on this blog!

When the Java 8 Streams API is not Enough

Java 8 was – as always – a release of compromises and backwards-compatibility. A release where the JSR-335 expert group might not have agreed upon scope or feasibility of certain features with some of the audience. See some concrete explanations by Brian Goetz about why …

But today we’re going to focus on the Streams API’s “short-comings”, or as Brian Goetz would probably put it: things out of scope given the design goals.

Parallel Streams?

Parallel computing is hard, and it used to be a pain. People didn’t exactly love the new (now old) Fork / Join API, when it was first shipped with Java 7. Conversely, and clearly, the conciseness of calling Stream.parallel() is unbeatable.

But many people don’t actually need parallel computing (not to be confused with multi-threading!). In 95% of all cases, people would have probably preferred a more powerful Streams API, or perhaps a generally more powerful Collections API with lots of awesome methods on various Iterable subtypes.

Changing Iterable is dangerous, though. Even a no-brainer as transforming an Iterable into a Stream via a potential Iterable.stream() method seems to risk opening pandora’s box!.

Sequential Streams!

So if the JDK doesn’t ship it, we create it ourselves!

Streams are quite awesome per se. They’re potentially infinite, and that’s a cool feature. Mostly – and especially with functional programming – the size of a collection doesn’t really matter that much, as we transform element by element using functions.

If we admit Streams to be purely sequential, then we could have any of these pretty cool methods as well (some of which would also be possible with parallel Streams):

  • cycle() – a guaranteed way to make every stream infinite
  • duplicate() – duplicate a stream into two equivalent streams
  • foldLeft() – a sequential and non-associative alternative to reduce()
  • foldRight() – a sequential and non-associative alternative to reduce()
  • limitUntil() – limit the stream to those records before the first one to satisfy a predicate
  • limitWhile() – limit the stream to those records before the first one not to satisfy a predicate
  • maxBy() – reduce the stream to the maximum mapped value
  • minBy() – reduce the stream to the minimum mapped value
  • partition() – partition a stream into two streams, one satisfying a predicate and the other not satisfying the same predicate
  • reverse() – produce a new stream in inverse order
  • skipUntil() – skip records until a predicate is satisified
  • skipWhile() – skip records as long as a predicate is satisfied
  • slice() – take a slice of the stream, i.e. combine skip() and limit()
  • splitAt() – split a stream into two streams at a given position
  • unzip() – split a stream of pairs into two streams
  • zip() – merge two streams into a single stream of pairs
  • zipWithIndex() – merge a stream with its corresponding stream of indexes into a single stream of pairs

jOOλ’s new Seq type does all that

All of the above is part of jOOλ. jOOλ (pronounced “jewel”, or “dju-lambda”, also written jOOL in URLs and such) is an ASL 2.0 licensed library that emerged from our own development needs when implementing jOOQ integration tests with Java 8. Java 8 is exceptionally well-suited for writing tests that reason about sets, tuples, records, and all things SQL.

But the Streams API just slightly feels insufficient, so we have wrapped JDK’s Streams into our own Seq type (Seq for sequence / sequential Stream):

// Wrap a stream in a sequence
Seq<Integer> seq1 = seq(Stream.of(1, 2, 3));

// Or create a sequence directly from values
Seq<Integer> seq2 = Seq.of(1, 2, 3);

We’ve made Seq a new interface that extends the JDK Stream interface, so you can use Seq fully interoperably with other Java APIs – leaving the existing methods unchanged:

public interface Seq<T> extends Stream<T> {

    /**
     * The underlying {@link Stream} implementation.
     */
    Stream<T> stream();
	
	// [...]
}

Now, functional programming is only half the fun if you don’t have tuples. Unfortunately, Java doesn’t have built-in tuples and while it is easy to create a tuple library using generics, tuples are still second-class syntactic citizens when comparing Java to Scala, for instance, or C# and even VB.NET.

Nonetheless…

jOOλ also has tuples

We’ve run a code-generator to produce tuples of degree 1-8 (we might add more in the future, e.g. to match Scala’s and jOOQ’s “magical” degree 22).

And if a library has such tuples, the library also needs corresponding functions. The essence of these TupleN and FunctionN types is summarised as follows:

public class Tuple3<T1, T2, T3>
implements 
    Tuple, 
	Comparable<Tuple3<T1, T2, T3>>, 
	Serializable, Cloneable {
    
    public final T1 v1;
    public final T2 v2;
    public final T3 v3;
	
	// [...]
}

and

@FunctionalInterface
public interface Function3<T1, T2, T3, R> {

    default R apply(Tuple3<T1, T2, T3> args) {
        return apply(args.v1, args.v2, args.v3);
    }

    R apply(T1 v1, T2 v2, T3 v3);
}

There are many more features in Tuple types, but let’s leave them out for today.

On a side note, I’ve recently had an interesting discussion with Gavin King (the creator of Hibernate) on reddit. From an ORM perspective, Java classes seem like a suitable implementation for SQL / relational tuples, and they are indeed. From an ORM perspective.

But classes and tuples are fundamentally different, which is a very subtle issue with most ORMs – e.g. as explained here by Vlad Mihalcea.

Besides, SQL’s notion of row value expressions (i.e. tuples) is quite different from what can be modelled with Java classes. This topic will be covered in a subsequent blog post.

Some jOOλ examples

With the aforementioned goals in mind, let’s see how the above API can be put to work by example:

zipping

// (tuple(1, "a"), tuple(2, "b"), tuple(3, "c"))
Seq.of(1, 2, 3).zip(Seq.of("a", "b", "c"));

// ("1:a", "2:b", "3:c")
Seq.of(1, 2, 3).zip(
    Seq.of("a", "b", "c"), 
    (x, y) -> x + ":" + y
);

// (tuple("a", 0), tuple("b", 1), tuple("c", 2))
Seq.of("a", "b", "c").zipWithIndex();

// tuple((1, 2, 3), (a, b, c))
Seq.unzip(Seq.of(
    tuple(1, "a"),
    tuple(2, "b"),
    tuple(3, "c")
));

This is already a case where tuples have become very handy. When we “zip” two streams into one, we want a wrapper value type that combines both values. Classically, people might’ve used Object[] for quick-and-dirty solutions, but an array doesn’t indicate attribute types or degree.

Unfortunately, the Java compiler cannot reason about the effective bound of the <T> type in Seq<T>. This is why we can only have a static unzip() method (instead of an instance one), whose signature looks like this:

// This works
static <T1, T2> Tuple2<Seq<T1>, Seq<T2>> 
    unzip(Stream<Tuple2<T1, T2>> stream) { ... }
	
// This doesn't work:
interface Seq<T> extends Stream<T> {
    Tuple2<Seq<???>, Seq<???>> unzip();
}

Skipping and limiting

// (3, 4, 5)
Seq.of(1, 2, 3, 4, 5).skipWhile(i -> i < 3);

// (3, 4, 5)
Seq.of(1, 2, 3, 4, 5).skipUntil(i -> i == 3);

// (1, 2)
Seq.of(1, 2, 3, 4, 5).limitWhile(i -> i < 3);

// (1, 2)
Seq.of(1, 2, 3, 4, 5).limitUntil(i -> i == 3);

Other functional libraries probably use different terms than skip (e.g. drop) and limit (e.g. take). It doesn’t really matter in the end. We opted for the terms that are already present in the existing Stream API: Stream.skip() and Stream.limit()

Folding

// "abc"
Seq.of("a", "b", "c").foldLeft("", (u, t) -> t + u);

// "cba"
Seq.of("a", "b", "c").foldRight("", (t, u) -> t + u);

The Stream.reduce() operations are designed for parallelisation. This means that the functions passed to it must have these important attributes:

But sometimes, you really want to “reduce” a stream with functions that do not have the above attributes, and consequently, you probably don’t care about the reduction being parallelisable. This is where “folding” comes in.

A nice explanation about the various differences between reducing and folding (in Scala) can be seen here.

Splitting

// tuple((1, 2, 3), (1, 2, 3))
Seq.of(1, 2, 3).duplicate();

// tuple((1, 3, 5), (2, 4, 6))
Seq.of(1, 2, 3, 4, 5, 6).partition(i -> i % 2 != 0)

// tuple((1, 2), (3, 4, 5))
Seq.of(1, 2, 3, 4, 5).splitAt(2);

The above functions all have one thing in common: They operate on a single stream in order to produce two new streams, that can be consumed independently.

Obviously, this means that internally, some memory must be consumed to keep buffers of partially consumed streams. E.g.

  • duplication needs to keep track of all values that have been consumed in one stream, but not in the other
  • partitioning needs to fast forward to the next value that satisfies (or doesn’t satisfy) the predicate, without losing all the dropped values
  • splitting might need to fast forward to the split index

For some real functional fun, let’s have a look at a possible splitAt() implementation:

static <T> Tuple2<Seq<T>, Seq<T>> 
splitAt(Stream<T> stream, long position) {
    return seq(stream)
          .zipWithIndex()
          .partition(t -> t.v2 < position)
          .map((v1, v2) -> tuple(
              v1.map(t -> t.v1),
              v2.map(t -> t.v1)
          ));
}

… or with comments:

static <T> Tuple2<Seq<T>, Seq<T>> 
splitAt(Stream<T> stream, long position) {
    // Add jOOλ functionality to the stream
    // -> local Type: Seq<T>
    return seq(stream)
	
    // Keep track of stream positions
    // with each element in the stream
    // -> local Type: Seq<Tuple2<T, Long>>
          .zipWithIndex()
	  
    // Split the streams at position
    // -> local Type: Tuple2<Seq<Tuple2<T, Long>>,
    //                       Seq<Tuple2<T, Long>>>
          .partition(t -> t.v2 < position)
		  
    // Remove the indexes from zipWithIndex again
    // -> local Type: Tuple2<Seq<T>, Seq<T>>
          .map((v1, v2) -> tuple(
              v1.map(t -> t.v1),
              v2.map(t -> t.v1)
          ));
}

Nice, isn’t it? A possible implementation for partition(), on the other hand, is a bit more complex. Here trivially with Iterator instead of the new Spliterator:

static <T> Tuple2<Seq<T>, Seq<T>> partition(
        Stream<T> stream, 
        Predicate<? super T> predicate
) {
    final Iterator<T> it = stream.iterator();
    final LinkedList<T> buffer1 = new LinkedList<>();
    final LinkedList<T> buffer2 = new LinkedList<>();

    class Partition implements Iterator<T> {

        final boolean b;

        Partition(boolean b) {
            this.b = b;
        }

        void fetch() {
            while (buffer(b).isEmpty() && it.hasNext()) {
                T next = it.next();
                buffer(predicate.test(next)).offer(next);
            }
        }

        LinkedList<T> buffer(boolean test) {
            return test ? buffer1 : buffer2;
        }

        @Override
        public boolean hasNext() {
            fetch();
            return !buffer(b).isEmpty();
        }

        @Override
        public T next() {
            return buffer(b).poll();
        }
    }

    return tuple(
        seq(new Partition(true)), 
        seq(new Partition(false))
    );
}

I’ll let you do the exercise and verify the above code.

Get and contribute to jOOλ, now!

All of the above is part of jOOλ, available for free from GitHub. There is already a partially Java-8-ready, full-blown library called functionaljava, which goes much further than jOOλ.

Yet, we believe that all what’s missing from Java 8’s Streams API is really just a couple of methods that are very useful for sequential streams.

In a previous post, we’ve shown how we can bring lambdas to String-based SQL using a simple wrapper for JDBC (of course, we still believe that you should use jOOQ instead).

Today, we’ve shown how we can write awesome functional and sequential Stream processing very easily, with jOOλ.

Stay tuned for even more jOOλ goodness in the near future (and pull requests are very welcome, of course!)

Three-State Booleans in Java

Every now and then, I miss SQL’s three-valued BOOLEAN semantics in Java. In SQL, we have:

  • TRUE
  • FALSE
  • UNKNOWN (also known as NULL)

Every now and then, I find myself in a situation where I wish I could also express this UNKNOWN or UNINITIALISED semantics in Java, when plain true and false aren’t enough.

Implementing a ResultSetIterator

For instance, when implementing a ResultSetIterator for jOOλ, a simple library modelling SQL streams for Java 8:

SQL.stream(stmt, Unchecked.function(r ->
    new SQLGoodies.Schema(
        r.getString("FIELD_1"),
        r.getBoolean("FIELD_2")
    )
))
.forEach(System.out::println);

In order to implement a Java 8 Stream, we need to construct an Iterator, which we can then pass to the new Spliterators.spliteratorUnknownSize() method:

StreamSupport.stream(
  Spliterators.spliteratorUnknownSize(iterator, 0), 
  false
);

Another example for this can be seen here on Stack Overflow.

When implementing the Iterator interface, we must implement hasNext() and next(). Note that with Java 8, remove() now has a default implementation, so we don’t need to implement it any longer.

While most of the time, a call to next() is preceded by a call to hasNext() exactly once, nothing in the Iterator contract requires this. It is perfectly fine to write:

if (it.hasNext()) {
    // Some stuff

    // Double-check again to be sure
    if (it.hasNext() && it.hasNext()) {

        // Yes, we're paranoid
        if (it.hasNext())
            it.next();
    }
}

How to translate the Iterator calls to backing calls on the JDBC ResultSet? We need to call ResultSet.next().

We could make the following translation:

  • Iterator.hasNext() == !ResultSet.isLast()
  • Iterator.next() == ResultSet.next()

But that translation is:

  • Expensive
  • Not dealing correctly with empty ResultSets
  • Not implemented in all JDBC drivers (Support for the isLast method is optional for ResultSets with a result set type of TYPE_FORWARD_ONLY)

So, we’ll have to maintain a flag, internally, that tells us:

  • If we had already called ResultSet.next()
  • What the result of that call was

Instead of creating a second variable, why not just use a three-valued java.lang.Boolean. Here’s a possible implementation from jOOλ:

class ResultSetIterator<T> implements Iterator<T> {

    final Supplier<? extends ResultSet>  supplier;
    final Function<ResultSet, T>         rowFunction;
    final Consumer<? super SQLException> translator;

    /**
     * Whether the underlying {@link ResultSet} has
     * a next row. This boolean has three states:
     * <ul>
     * <li>null:  it's not known whether there 
     *            is a next row</li>
     * <li>true:  there is a next row, and it
     *            has been pre-fetched</li>
     * <li>false: there aren't any next rows</li>
     * </ul>
     */
    Boolean hasNext;
    ResultSet rs;

    ResultSetIterator(
        Supplier<? extends ResultSet> supplier, 
        Function<ResultSet, T> rowFunction, 
        Consumer<? super SQLException> translator
    ) {
        this.supplier = supplier;
        this.rowFunction = rowFunction;
        this.translator = translator;
    }

    private ResultSet rs() {
        return (rs == null) 
             ? (rs = supplier.get()) 
             :  rs;
    }

    @Override
    public boolean hasNext() {
        try {
            if (hasNext == null) {
                hasNext = rs().next();
            }

            return hasNext;
        }
        catch (SQLException e) {
            translator.accept(e);
            throw new IllegalStateException(e);
        }
    }

    @Override
    public T next() {
        try {
            if (hasNext == null) {
                rs().next();
            }

            return rowFunction.apply(rs());
        }
        catch (SQLException e) {
            translator.accept(e);
            throw new IllegalStateException(e);
        }
        finally {
            hasNext = null;
        }
    }
}

As you can see, the hasNext() method locally caches the hasNext three-valued boolean state only if it was null before. This means that calling hasNext() several times will have no effect until you call next(), which resets the hasNext cached state.

Both hasNext() and next() advance the ResultSet cursor if needed.

Readability?

Some of you may argue that this doesn’t help readability. They’d introduce a new variable like:

boolean hasNext;
boolean hasHasNextBeenCalled;

The trouble with this is the fact that you’re still implementing three-valued boolean state, but distributed to two variables, which are very hard to name in a way that is truly more readable than the actual java.lang.Boolean solution. Besides, there are actually four state values for two boolean variables, so there is a slight increase in the risk of bugs.

Every rule has its exception. Using null for the above semantics is a very good exception to the null-is-bad histeria that has been going on ever since the introduction of Option / Optional

In other words: Which approach is best? There’s no TRUE or FALSE answer, only UNKNOWN ;-)

Be careful with this

However, as we’ve discussed in a previous blog post, you should avoid returning null from API methods if possible. In this case, using null explicitly as a means to model state is fine because this model is encapsulated in our ResultSetIterator. But try to avoid leaking such state to the outside of your API.