Imperative Loop or Functional Stream Pipeline? Beware of the Performance Impact!

I like weird, yet concise language constructs and API usages

Yes. I am guilty. Evil? Don’t know. But guilty. I heavily use and abuse the java.lang.Boolean type to implement three valued logic in Java:

  • Boolean.TRUE means true (duh)
  • Boolean.FALSE means false
  • null can mean anything like “unknown” or “uninitialised”, etc.

I know – a lot of enterprise developers will bikeshed and cargo cult the old saying:

Code is read more often than it is written

But as with everything, there is a tradeoff. For instance, in algorithm-heavy, micro optimised library code, it is usually more important to have code that really performs well, rather than code that apparently doesn’t need comments because the author has written it in such a clear and beautiful way.

I don’t think it matters much in the case of the boolean type (where I’m just too lazy to encode every three valued situation in an enum). But here’s a more interesting example from that same twitter thread. The code is simple:

woot:
if (something) {
  for (Object o : list) 
    if (something(o))
      break woot;

  throw new E();
}

Yes. You can break out of “labeled ifs”. Because in Java, any statement can be labeled, and if the statement is a compound statement (observe the curly braces following the if), then it may make sense to break out of it. Even if you’ve never seen that idiom, I think it’s quite immediately clear what it does.

Ghasp!

If Java were a bit more classic, it might have supported this syntax:

if (something) {
  for (Object o : list) 
    if (something(o))
      goto woot;

  throw new E();
}
woot:

Nicolai suggested that the main reason I hadn’t written the following, equivalent, and arguably more elegant logic, is because jOOQ still supports Java 6:

if (something && list.stream().noneMatch(this::something))
  throw new E();

It’s more concise! So, it’s better, right? Everything new is always better.

A third option would have been the less concise solution that essentially just replaces break by return:

if (something && noneMatchSomething(list)
  throw new E();

// And then:
private boolean noneMatchSomething(List<?> list) {
  for (Object o : list)
    if (something(o))
      return false;
  return true;
}

There’s an otherwise useless method that has been extracted. The main benefit is that people are not used to breaking out of labeled statements (other than loops, and even then it’s rare), so this is again about some subjective “readability”. I personally find this particular example less readable, because the extracted method is no longer local. I have to jump around in the class and interrupt my train of thoughts. But of course, YMMV with respect to the two imperative alternatives.

Back to objectivity: Performance

When I tweet about Java these days, I’m mostly tweeting about my experience writing jOOQ. A library. A library that has been tuned so much over the past years, that the big client side bottleneck (apart from the obvious database call) is the internal StringBuilder that is used to generate dynamic SQL. And compared to most database queries, you will not even notice that.

But sometimes you do. E.g. if you’re using an in-memory H2 database and run some rather trivial queries, then jOOQ’s overhead can become measurable again. Yes. There are some use-cases, which I do want to take seriously as well, where the difference between an imperative loop and a stream pipeline is measurable.

In the above examples, let’s remove the throw statement and replace it by something simpler (because exceptions have their own significant overhead).

I’ve created this JMH benchmark, which compares the 3 approaches:

  • Imperative with break
  • Imperative with return
  • Stream

Here’s the benchmark

package org.jooq.test.benchmark;

import java.util.ArrayList;
import java.util.List;

import org.openjdk.jmh.annotations.*;

@Fork(value = 3, jvmArgsAppend = "-Djmh.stack.lines=3")
@Warmup(iterations = 5, time = 3)
@Measurement(iterations = 7, time = 3)
public class ImperativeVsStream {

    @State(Scope.Benchmark)
    public static class BenchmarkState {

        boolean something = true;

        @Param({ "2", "8" })
        int listSize;

        List<Integer> list = new ArrayList<>();

        boolean something() {
            return something;
        }

        boolean something(Integer o) {
            return o > 2;
        }

        @Setup(Level.Trial)
        public void setup() throws Exception {
            for (int i = 0; i < listSize; i++)
                list.add(i);
        }

        @TearDown(Level.Trial)
        public void teardown() throws Exception {
            list = null;
        }
    }

    @Benchmark
    public Object testImperativeWithBreak(BenchmarkState state) {
        woot:
        if (state.something()) {
            for (Integer o : state.list)
                if (state.something(o))
                    break woot;

            return 1;
        }

        return 0;
    }

    @Benchmark
    public Object testImperativeWithReturn(BenchmarkState state) {
        if (state.something() && woot(state))
            return 1;

        return 0;
    }

    private boolean woot(BenchmarkState state) {
        for (Integer o : state.list)
            if (state.something(o))
                return false;

        return true;
    }

    @Benchmark
    public Object testStreamNoneMatch(BenchmarkState state) {
        if (state.something() && state.list.stream().noneMatch(state::something))
            return 1;

        return 0;
    }

    @Benchmark
    public Object testStreamAnyMatch(BenchmarkState state) {
        if (state.something() && !state.list.stream().anyMatch(state::something))
            return 1;

        return 0;
    }

    @Benchmark
    public Object testStreamAllMatch(BenchmarkState state) {
        if (state.something() && state.list.stream().allMatch(s -> !state.something(s)))
            return 1;

        return 0;
    }
}

The results are pretty clear:

Benchmark                                    (listSize)   Mode  Cnt         Score          Error  Units
ImperativeVsStream.testImperativeWithBreak            2  thrpt   14  86513288.062 ± 11950020.875  ops/s
ImperativeVsStream.testImperativeWithBreak            8  thrpt   14  74147172.906 ± 10089521.354  ops/s
ImperativeVsStream.testImperativeWithReturn           2  thrpt   14  97740974.281 ± 14593214.683  ops/s
ImperativeVsStream.testImperativeWithReturn           8  thrpt   14  81457864.875 ±  7376337.062  ops/s
ImperativeVsStream.testStreamAllMatch                 2  thrpt   14  14924513.929 ±  5446744.593  ops/s
ImperativeVsStream.testStreamAllMatch                 8  thrpt   14  12325486.891 ±  1365682.871  ops/s
ImperativeVsStream.testStreamAnyMatch                 2  thrpt   14  15729363.399 ±  2295020.470  ops/s
ImperativeVsStream.testStreamAnyMatch                 8  thrpt   14  13696297.091 ±   829121.255  ops/s
ImperativeVsStream.testStreamNoneMatch                2  thrpt   14  18991796.562 ±   147748.129  ops/s
ImperativeVsStream.testStreamNoneMatch                8  thrpt   14  15131005.381 ±   389830.419  ops/s

With this simple example, break or return don’t matter. At some point, adding additional methods might start getting in the way of inlining (because of stacks getting too deep), but not creating additional methods might be getting in the way of inlining as well (because of method bodies getting too large). I don’t want to bet on either approach here at this level, nor is jOOQ tuned that much. Like most similar libraries, the traversal of the jOOQ expression tree generates stack that are too deep to completely inline anyway.

But the very obvious loser here is the Stream approach, which is roughly 6.5x slower in this benchmark than the imperative approaches. This isn’t surprising. The stream pipeline has to be set up every single time to represent something as trivial as the above imperative loop. I’ve already blogged about this in the past, where I compared replacing simple for loops by Stream.forEach()

Meh, does it matter?

In your business logic? Probably not. Your business logic is I/O bound, mostly because of the database. Wasting a few CPU cycles on a client side loop is not the main issue. Even if it is, the waste probably happens because your loop shouldn’t even be at the client side in the first place, but moved into the database as well. I’m currently touring conferences with a call about that topic:

In your infrastructure logic? Maybe! If you’re writing a library, or if you’re using a library like jOOQ, then yes. Chances are that a lot of your logic is CPU bound. You should occasionally profile your application and spot such bottlenecks, both in your code and in third party libraries. E.g. in most of jOOQ’s internals, using a stream pipeline might be a very bad choice, because ultimately, jOOQ is something that might be invoked from within your loops, thus adding significant overhead to your application, if your queries are not heavy (e.g. again when run against an H2 in-memory database).

So, given that you’re clearly “micro-losing” on the performance side by using the Stream API, you may need to evaluate the readability tradeoff more carefully. When business logic is complex, readability is very important compared to micro optimisations. With infrastructure logic, it is much less likely so, in my opinion. And I’m not alone:

Note: there’s that other cargo cult of premature optimisation going around. Yes, you shouldn’t worry about these details too early in your application implementation. But you should still know when to worry about them, and be aware of the tradeoffs.

And while you’re still debating what name to give to that extracted method, I’ve written 5 new labeled if statements! ;-)

Writing Custom Aggregate Functions in SQL Just Like a Java 8 Stream Collector

All SQL databases support the standard aggregate functions COUNT(), SUM(), AVG(), MIN(), MAX().

Some databases support other aggregate functions, like:

  • EVERY()
  • STDDEV_POP()
  • STDDEV_SAMP()
  • VAR_POP()
  • VAR_SAMP()
  • ARRAY_AGG()
  • STRING_AGG()

But what if you want to roll your own?

Java 8 Stream Collector

When using Java 8 streams, we can easily roll our own aggregate function (i.e. a Collector). Let’s assume we want to find the second highest value in a stream. The highest value can be obtained like this:

System.out.println(
    Stream.of(1, 2, 3, 4)
          .collect(Collectors.maxBy(Integer::compareTo))
) ;

Yielding:

Optional[4]

Now, what about the second highest value? We can write the following collector:

System.out.println(
    Stream.of(1, 6, 2, 3, 4, 4, 5).parallel()
          .collect(Collector.of(
              () -> new int[] { 
                  Integer.MIN_VALUE, 
                  Integer.MIN_VALUE 
              },
              (a, i) -> {
                  if (a[0] < i) {
                      a[1] = a[0];
                      a[0] = i;
                  }
                  else if (a[1] < i)
                      a[1] = i;
              },
              (a1, a2) -> {
                  if (a2[0] > a1[0]) {
                      a1[1] = a1[0];
                      a1[0] = a2[0];

                      if (a2[1] > a1[1])
                          a1[1] = a2[1];
                  }
                  else if (a2[0] > a1[1])
                      a1[1] = a2[0];

                  return a1;
              },
              a -> a[1]
          ))
) ;

It doesn’t do anything fancy. It has these 4 functions:

  • Supplier<int[]>: A supplier that provides an intermediary int[] of length 2, initialised with Integer.MIN_VALUE, each. This array will remember the MAX() value in the stream at position 0 and the SECOND_MAX() value in the stream at position 1
  • BiConsumer<int[], Integer>: A accumulator that accumulates new values from the stream into our intermediary data structure.
  • BinaryOperator<int[]>: A combiner that combines two intermediary data structures. This is used for parallel streams only.
  • Function<int[], Integer>: The finisher function that extracts the SECOND_MAX() function from the second position in our intermediary array.

The output is now:

5

How to do the same thing with SQL?

Many SQL databases offer a very similar way of calculating custom aggregate functions. Here’s how to do the exact same thing with…

Oracle:

With the usual syntactic ceremony…

CREATE TYPE u_second_max AS OBJECT (

  -- Intermediary data structure
  MAX NUMBER,
  SECMAX NUMBER,

  -- Corresponds to the Collector.supplier() function
  STATIC FUNCTION ODCIAggregateInitialize(sctx IN OUT u_second_max) RETURN NUMBER,

  -- Corresponds to the Collector.accumulate() function
  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT u_second_max, value IN NUMBER) RETURN NUMBER,

  -- Corresponds to the Collector.combineer() function
  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT u_second_max, ctx2 IN u_second_max) RETURN NUMBER,

  -- Correspodns to the Collector.finisher() function
  MEMBER FUNCTION ODCIAggregateTerminate(self IN u_second_max, returnValue OUT NUMBER, flags IN NUMBER) RETURN NUMBER
)
/

-- This is our "colletor" implementation
CREATE OR REPLACE TYPE BODY u_second_max IS
  STATIC FUNCTION ODCIAggregateInitialize(sctx IN OUT u_second_max)
  RETURN NUMBER IS
  BEGIN
    SCTX := U_SECOND_MAX(0, 0);
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT u_second_max, value IN NUMBER) RETURN NUMBER IS
  BEGIN
    IF VALUE > SELF.MAX THEN
      SELF.SECMAX := SELF.MAX;
      SELF.MAX := VALUE;
    ELSIF VALUE > SELF.SECMAX THEN
      SELF.SECMAX := VALUE;
    END IF;
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateTerminate(self IN u_second_max, returnValue OUT NUMBER, flags IN NUMBER) RETURN NUMBER IS
  BEGIN
    RETURNVALUE := SELF.SECMAX;
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT u_second_max, ctx2 IN u_second_max) RETURN NUMBER IS
  BEGIN
    IF CTX2.MAX > SELF.MAX THEN
      SELF.SECMAX := SELF.MAX;
      SELF.MAX := CTX2.MAX;
    
      IF CTX2.SECMAX > SELF.SECMAX THEN
        SELF.SECMAX := CTX2.SECMAX;
      END IF;
    ELSIF CTX2.MAX > SELF.SECMAX THEN
      SELF.SECMAX := CTX2.MAX;
    END IF;
  
    RETURN ODCIConst.Success;
  END;
END;
/

-- Finally, we have to give this aggregate function a name
CREATE FUNCTION SECOND_MAX (input NUMBER) RETURN NUMBER
PARALLEL_ENABLE AGGREGATE USING u_second_max;
/

We can now run the above on the Sakila database:

SELECT 
  max(film_id), 
  second_max(film_id) 
FROM film;

To get:

MAX     SECOND_MAX
------------------
1000    999

And what’s even better, we can use the aggregate function as a window function for free!

SELECT 
  film_id,
  length,
  max(film_id) OVER (PARTITION BY length), 
  second_max(film_id) OVER (PARTITION BY length)
FROM film
ORDER BY length, film_id;

The above yields:

FILM_ID  LENGTH  MAX   SECOND_MAX
---------------------------------
15       46      730   505
469      46      730   505
504      46      730   505
505      46      730   505
730      46      730   505
237      47      869   784
247      47      869   784
393      47      869   784
398      47      869   784
407      47      869   784
784      47      869   784
869      47      869   784
2        48      931   866
410      48      931   866
575      48      931   866
630      48      931   866
634      48      931   866
657      48      931   866
670      48      931   866
753      48      931   866
845      48      931   866
866      48      931   866
931      48      931   866

Beautiful, right?

PostgreSQL

PostgreSQL supports a slightly more concise syntax in the CREATE AGGREGATE statement. If we don’t allow for parallelism, we can write this minimal implementation:

CREATE FUNCTION second_max_sfunc (
  state INTEGER[], data INTEGER
) RETURNS INTEGER[] AS
$$
BEGIN
  IF state IS NULL THEN
    RETURN ARRAY[data, NULL];
  ELSE
    RETURN CASE 
      WHEN state[1] > data
      THEN CASE 
        WHEN state[2] > data
        THEN state
        ELSE ARRAY[state[1], data]
      END
      ELSE ARRAY[data, state[1]]
    END;
  END IF;
END;
$$ LANGUAGE plpgsql;
/

CREATE FUNCTION second_max_ffunc (
  state INTEGER[]
) RETURNS INTEGER AS
$$
BEGIN
  RETURN state[2];
END;
$$ LANGUAGE plpgsql;

CREATE AGGREGATE second_max (INTEGER) (
  SFUNC     = second_max_sfunc,
  STYPE     = INTEGER[],
  FINALFUNC = second_max_ffunc
);

Here, we use the STYPE (Collector.supplier()), the SFUNC (Collector.accumulator()), and the FINALFUNC (Collector.finisher()) specifications.

Other databases

Many other databases allow for specifying user defined aggregate functions. Look up your database manual’s details to learn more. They always work in the same way as a Java 8 Collector.