Implementing a generic REDUCE aggregate function with SQL

So, @rotnroll666 nerd sniped me again. Apparently, the Neo4j Cypher query language supports arbitrary reductions, just like any functional collection API, oh say, the JDK Stream API:

Stream.of(2, 4, 3, 1, 6, 5)
      .reduce((i, j) -> i * j)
      .ifPresent(System.out::println); // Prints 720

SQL doesn’t have this, yet it would be very useful to be able to occasionally do that. An arbitrary reduction can be implemented “easily” in SQL. Let’s look at the above multiplication reduction. In PostgreSQL, you’d write it like this:

with t(i) as (values (2), (4), (3), (1), (6), (5))
select 
  (
    with recursive
      u(i, o) as (
         select i, o
         from unnest(array_agg(t.i)) with ordinality as u(i, o)
      ),
      r(i, o) as (
        select u.i, u.o from u where o = 1
        union all 
        select r.i * u.i, u.o from u join r on u.o = r.o + 1
        --     ^^^^^^^^^ reduction
      )
    select i from r
    order by o desc
    limit 1
  )
from t;

Woah. That’s a bit of a syntactic beast. Let’s decompose it.

The aggregate function

First off, if we were summing the values, we’d use the built-in SUM function, like this:

with t(i) as (values (2), (4), (3), (1), (6), (5))
select sum(i)
from t;

That would produce 21. If you’re willing to lose precision, you could emulate PRODUCT() using logarithms. But we wrote REDUCE(), a hypothetical one, like this:

with t(i) as (values (2), (4), (3), (1), (6), (5))
select reduce(
  t1.i * t2.i referencing accumulated as t1, accumulating as t2
)
from t;

This is SQL, so the lambda expression would obviously use a ton of keywords, completely novel and unique to this particular function, and you’d need jOOQ to make it composable 😁. Essentially, we’d have some sort of reduction expression based on two pseudo tables:

  • The accumulated table containing the result
  • The accumulating table (or rather row)

A reduction is a generic aggregate function that operates on groups. So, we will have to re-use some SQL aggregate function mechanism to achieve the desired behaviour.

Using ARRAY_AGG() to get the aggregation effect

First off, let’s do some aggregation. PostgreSQL’s ARRAY_AGG() is perfect for this job, because it

  • Aggregates
  • Yet kinda leaves the data untouched, unlike e.g. SUM()

In a way, it’s a collection like Stream.collect(), not a reduction.

If we use ARRAY_AGG() in a correlated subquery, we’ll still get the aggregation effect, but we can unnest the array again to a table, in order to operate on it. You can see this in the following example:

with t(i) as (values (2), (4), (3), (1), (6), (5))
select 
  (
    select string_agg(i::text, ', ')
    from unnest(array_agg(t.i)) as u(i)
  )
from t;

This yields:

2, 4, 3, 1, 6, 5

Not a very useful thing to do, aggregate, unnest, and aggregate again, but it shows the power of nesting an aggregate function in a correlated subquery’s FROM clause. If your RDBMS doesn’t have arrays, maybe you can do the same thing using JSON_ARRAYAGG and JSON_TABLE, or XMLAGG and XMLTABLE.

Disclaimer: PostgreSQL often Does The Right Thing™. I think you’d be more hard pressed to juggle with SQL syntax as elegantly in most other RDBMS, so this approach isn’t portable. But as Lætitia Avrot so elegantly put it:

Next step, generate row numbers

There are mainly 2 ways how we can generate row numbers in our example:

Adapting our previous example for some visualisation:

with t(i) as (values (2), (4), (3), (1), (6), (5))
select 
  (
    select string_agg(row(i, o)::text, ', ')
    from unnest(array_agg(t.i)) with ordinality as u(i, o)
  )
from t;

(Awesome, that row constructor!)

This produces:

(2,1), (4,2), (3,3), (1,4), (6,5), (5,6)

Doesn’t look fancy, but imagine we group by even numbers:

with t(i) as (values (2), (4), (3), (1), (6), (5))
select 
  i % 2,
  (
    select string_agg(row(i, o)::text, ', ')
    from unnest(array_agg(t.i)) with ordinality as u(i, o)
  )
from t
group by i % 2;

The result is now:

i % 2string_agg
0(2,1), (4,2), (6,3)
1(3,1), (1,2), (5,3)

It’s a bit weird, right? We GROUP BY in the outer query, and the entire correlated subquery is the aggregate function based on the fact that its FROM clause contains ARRAY_AGG(). This isn’t so much different from this query:

with t(i) as (values (2), (4), (3), (1), (6), (5))
select 1 + sum(i) + 2
from t;

We’re used to building scalar expressions from aggregate functions all the time. This is nothing fancy. We can easily also just wrap the function in another subquery:

with t(i) as (values (2), (4), (3), (1), (6), (5))
select (select 1 + sum(i) + 2)
from t;

From here, it’s not far fetched to extend the aggregate-function-in-scalar-subquery approach to the FROM clause, and then unnesting the aggregation again. This may not “click” immediately. The GROUP BY clause in SQL is a bit weird, syntactically.

Remark: Regrettably, PostgreSQL doesn’t allow using aggregate functions in the FROM clause on the same query level like in a correlated subquery. I was going to show a fancy LATERAL version, but this doesn’t work (yet).

Now, recurse

The final bit is the recursion with the r table:

with t(i) as (values (2), (4), (3), (1), (6), (5))
select 
  (
    with recursive
      u(i, o) as (
         select i, o
         from unnest(array_agg(t.i)) with ordinality as u(i, o)
      ),
      r(i, o) as (
        select u.i, u.o from u where o = 1
        union all 
        select r.i * u.i, u.o from u join r on u.o = r.o + 1
        --     ^^^^^^^^^ reduction
      )
    select i from r
    order by o desc
    limit 1
  )
from t;

We simply recurse on the ordinality. The first subquery of UNION ALL produces the first row of our data, namely (1, 1). The next iterations just always multiply the result of r.i by the value of u.i from the next row by ordinality. This is probably best shown visually:

r.ir.ou.i
2 = u.i (first iteration)12
8 = prev r.i * u.i24
24 = prev r.i * u.i33
24 = prev r.i * u.i41
144 = prev r.i * u.i56
720 = prev r.i * u.i65

Finally, we don’t care about SQL’s set-based way of working. I.e. we don’t care about the whole set of multiplications that are shown in the table above. We only care about the last row, ordered by the ordinality, which contains our result in r.i

Done!

Using group by

Just as shown before, we can easily add a GROUP BY clause to the outer query. E.g. let’s multiply odd and even numbers separately:

with t(i) as (values (2), (4), (3), (1), (6), (5))
select 
  i % 2,
  (
    with recursive
      u(i, o) as (
         select i, o
         from unnest(array_agg(t.i)) with ordinality as u(i, o)
      ),
      r(i, o) as (
        select u.i, u.o from u where o = 1
        union all 
        select r.i * u.i, u.o from u join r on u.o = r.o + 1
      )
    select i from r
    order by o desc
    limit 1
  ),
  string_agg(i::text, ' * ')
from t
group by i % 2

I’ve added another aggregate function STRING_AGG() for good measure to get:

i % 2istring_agg
0482 * 4 * 6
1153 * 1 * 5

Wonderful, isn’t it? Now, I wasn’t able to just add an OVER() clause right there. That produced

SQL Error [42P20]: ERROR: window functions are not allowed in functions in FROM

Maybe that will work as well, in the near future? Or, I might come up with another hack to make it work, in case of which I’ll update this post.

jOOQ support

Obviously, this will be supported in jOOQ soon: https://github.com/jOOQ/jOOQ/issues/11385. The syntax will be again much more bearable:

ctx.select(T.I.mod(inline(2)), reduce(T.I, (i1, i2) -> i1.times(i2)))
   .from(T.I)
   .groupBy(T.I.mod(inline(2)))
   .fetch();

Other emulations using actual CREATE AGGREGATE FUNCTION will be investigated as well, in the near future.

Writing Custom Aggregate Functions in SQL Just Like a Java 8 Stream Collector

All SQL databases support the standard aggregate functions COUNT(), SUM(), AVG(), MIN(), MAX().

Some databases support other aggregate functions, like:

  • EVERY()
  • STDDEV_POP()
  • STDDEV_SAMP()
  • VAR_POP()
  • VAR_SAMP()
  • ARRAY_AGG()
  • STRING_AGG()

But what if you want to roll your own?

Java 8 Stream Collector

When using Java 8 streams, we can easily roll our own aggregate function (i.e. a Collector). Let’s assume we want to find the second highest value in a stream. The highest value can be obtained like this:

System.out.println(
    Stream.of(1, 2, 3, 4)
          .collect(Collectors.maxBy(Integer::compareTo))
) ;

Yielding:

Optional[4]

Now, what about the second highest value? We can write the following collector:

System.out.println(
    Stream.of(1, 6, 2, 3, 4, 4, 5).parallel()
          .collect(Collector.of(
              () -> new int[] { 
                  Integer.MIN_VALUE, 
                  Integer.MIN_VALUE 
              },
              (a, i) -> {
                  if (a[0] < i) {
                      a[1] = a[0];
                      a[0] = i;
                  }
                  else if (a[1] < i)
                      a[1] = i;
              },
              (a1, a2) -> {
                  if (a2[0] > a1[0]) {
                      a1[1] = a1[0];
                      a1[0] = a2[0];

                      if (a2[1] > a1[1])
                          a1[1] = a2[1];
                  }
                  else if (a2[0] > a1[1])
                      a1[1] = a2[0];

                  return a1;
              },
              a -> a[1]
          ))
) ;

It doesn’t do anything fancy. It has these 4 functions:

  • Supplier<int[]>: A supplier that provides an intermediary int[] of length 2, initialised with Integer.MIN_VALUE, each. This array will remember the MAX() value in the stream at position 0 and the SECOND_MAX() value in the stream at position 1
  • BiConsumer<int[], Integer>: A accumulator that accumulates new values from the stream into our intermediary data structure.
  • BinaryOperator<int[]>: A combiner that combines two intermediary data structures. This is used for parallel streams only.
  • Function<int[], Integer>: The finisher function that extracts the SECOND_MAX() function from the second position in our intermediary array.

The output is now:

5

How to do the same thing with SQL?

Many SQL databases offer a very similar way of calculating custom aggregate functions. Here’s how to do the exact same thing with…

Oracle:

With the usual syntactic ceremony…

CREATE TYPE u_second_max AS OBJECT (

  -- Intermediary data structure
  MAX NUMBER,
  SECMAX NUMBER,

  -- Corresponds to the Collector.supplier() function
  STATIC FUNCTION ODCIAggregateInitialize(sctx IN OUT u_second_max) RETURN NUMBER,

  -- Corresponds to the Collector.accumulate() function
  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT u_second_max, value IN NUMBER) RETURN NUMBER,

  -- Corresponds to the Collector.combineer() function
  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT u_second_max, ctx2 IN u_second_max) RETURN NUMBER,

  -- Correspodns to the Collector.finisher() function
  MEMBER FUNCTION ODCIAggregateTerminate(self IN u_second_max, returnValue OUT NUMBER, flags IN NUMBER) RETURN NUMBER
)
/

-- This is our "colletor" implementation
CREATE OR REPLACE TYPE BODY u_second_max IS
  STATIC FUNCTION ODCIAggregateInitialize(sctx IN OUT u_second_max)
  RETURN NUMBER IS
  BEGIN
    SCTX := U_SECOND_MAX(0, 0);
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT u_second_max, value IN NUMBER) RETURN NUMBER IS
  BEGIN
    IF VALUE > SELF.MAX THEN
      SELF.SECMAX := SELF.MAX;
      SELF.MAX := VALUE;
    ELSIF VALUE > SELF.SECMAX THEN
      SELF.SECMAX := VALUE;
    END IF;
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateTerminate(self IN u_second_max, returnValue OUT NUMBER, flags IN NUMBER) RETURN NUMBER IS
  BEGIN
    RETURNVALUE := SELF.SECMAX;
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT u_second_max, ctx2 IN u_second_max) RETURN NUMBER IS
  BEGIN
    IF CTX2.MAX > SELF.MAX THEN
      SELF.SECMAX := SELF.MAX;
      SELF.MAX := CTX2.MAX;
    
      IF CTX2.SECMAX > SELF.SECMAX THEN
        SELF.SECMAX := CTX2.SECMAX;
      END IF;
    ELSIF CTX2.MAX > SELF.SECMAX THEN
      SELF.SECMAX := CTX2.MAX;
    END IF;
  
    RETURN ODCIConst.Success;
  END;
END;
/

-- Finally, we have to give this aggregate function a name
CREATE FUNCTION SECOND_MAX (input NUMBER) RETURN NUMBER
PARALLEL_ENABLE AGGREGATE USING u_second_max;
/

We can now run the above on the Sakila database:

SELECT 
  max(film_id), 
  second_max(film_id) 
FROM film;

To get:

MAX     SECOND_MAX
------------------
1000    999

And what’s even better, we can use the aggregate function as a window function for free!

SELECT 
  film_id,
  length,
  max(film_id) OVER (PARTITION BY length), 
  second_max(film_id) OVER (PARTITION BY length)
FROM film
ORDER BY length, film_id;

The above yields:

FILM_ID  LENGTH  MAX   SECOND_MAX
---------------------------------
15       46      730   505
469      46      730   505
504      46      730   505
505      46      730   505
730      46      730   505
237      47      869   784
247      47      869   784
393      47      869   784
398      47      869   784
407      47      869   784
784      47      869   784
869      47      869   784
2        48      931   866
410      48      931   866
575      48      931   866
630      48      931   866
634      48      931   866
657      48      931   866
670      48      931   866
753      48      931   866
845      48      931   866
866      48      931   866
931      48      931   866

Beautiful, right?

PostgreSQL

PostgreSQL supports a slightly more concise syntax in the CREATE AGGREGATE statement. If we don’t allow for parallelism, we can write this minimal implementation:

CREATE FUNCTION second_max_sfunc (
  state INTEGER[], data INTEGER
) RETURNS INTEGER[] AS
$$
BEGIN
  IF state IS NULL THEN
    RETURN ARRAY[data, NULL];
  ELSE
    RETURN CASE 
      WHEN state[1] > data
      THEN CASE 
        WHEN state[2] > data
        THEN state
        ELSE ARRAY[state[1], data]
      END
      ELSE ARRAY[data, state[1]]
    END;
  END IF;
END;
$$ LANGUAGE plpgsql;
/

CREATE FUNCTION second_max_ffunc (
  state INTEGER[]
) RETURNS INTEGER AS
$$
BEGIN
  RETURN state[2];
END;
$$ LANGUAGE plpgsql;

CREATE AGGREGATE second_max (INTEGER) (
  SFUNC     = second_max_sfunc,
  STYPE     = INTEGER[],
  FINALFUNC = second_max_ffunc
);

Here, we use the STYPE (Collector.supplier()), the SFUNC (Collector.accumulator()), and the FINALFUNC (Collector.finisher()) specifications.

Other databases

Many other databases allow for specifying user defined aggregate functions. Look up your database manual’s details to learn more. They always work in the same way as a Java 8 Collector.