All SQL databases support the standard aggregate functions COUNT(), SUM(), AVG(), MIN(), MAX().
Some databases support other aggregate functions, like:
EVERY()
STDDEV_POP()
STDDEV_SAMP()
VAR_POP()
VAR_SAMP()
ARRAY_AGG()
STRING_AGG()
But what if you want to roll your own?
Java 8 Stream Collector
When using Java 8 streams, we can easily roll our own aggregate function (i.e. a Collector). Let’s assume we want to find the second highest value in a stream. The highest value can be obtained like this:
System.out.println(
    Stream.of(1, 2, 3, 4)
          .collect(Collectors.maxBy(Integer::compareTo))
);
Yielding:
Optional[4]
Now, what about the second highest value? We can write the following collector:
System.out.println(
    Stream.of(1, 6, 2, 3, 4, 4, 5).parallel()
          .collect(Collector.of(
              // Supplier: [0] holds the max, [1] the second max
              () -> new int[] { Integer.MIN_VALUE, Integer.MIN_VALUE },
              // Accumulator
              (a, i) -> {
                  if (a[0] < i) {
                      a[1] = a[0];
                      a[0] = i;
                  }
                  else if (a[1] < i)
                      a[1] = i;
              },
              // Combiner
              (a1, a2) -> {
                  if (a2[0] > a1[0]) {
                      a1[1] = a1[0];
                      a1[0] = a2[0];

                      if (a2[1] > a1[1])
                          a1[1] = a2[1];
                  }
                  else if (a2[0] > a1[1])
                      a1[1] = a2[0];

                  return a1;
              },
              // Finisher
              a -> a[1]
          ))
);
It doesn’t do anything fancy. It has these 4 functions:
- Supplier<int[]>: A supplier that provides an intermediary int[] of length 2, initialised with Integer.MIN_VALUE in both positions. This array remembers the MAX() value in the stream at position 0 and the SECOND_MAX() value in the stream at position 1.
- BiConsumer<int[], Integer>: An accumulator that accumulates new values from the stream into our intermediary data structure.
- BinaryOperator<int[]>: A combiner that combines two intermediary data structures. This is used for parallel streams only.
- Function<int[], Integer>: The finisher function that extracts the SECOND_MAX() value from the second position in our intermediary array.
The output is now:
5
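The four functions listed above can also be packaged behind a factory method, so that the collector reads like a named aggregate function at the call site. This is only a minimal sketch: the class name SecondMax and the method name secondMax() are hypothetical (nothing like them ships with the JDK), and the logic is copied from the inline collector shown earlier.

```java
import java.util.stream.Collector;
import java.util.stream.Stream;

class SecondMax {

    // Hypothetical helper: wraps the four functions in a reusable Collector
    static Collector<Integer, int[], Integer> secondMax() {
        return Collector.of(
            // Supplier: state[0] is the max, state[1] the second max
            () -> new int[] { Integer.MIN_VALUE, Integer.MIN_VALUE },
            // Accumulator: fold one stream value into the state
            (a, i) -> {
                if (a[0] < i) {
                    a[1] = a[0];
                    a[0] = i;
                }
                else if (a[1] < i)
                    a[1] = i;
            },
            // Combiner: merge two partial states (parallel streams)
            (a1, a2) -> {
                if (a2[0] > a1[0]) {
                    a1[1] = a1[0];
                    a1[0] = a2[0];
                    if (a2[1] > a1[1])
                        a1[1] = a2[1];
                }
                else if (a2[0] > a1[1])
                    a1[1] = a2[0];
                return a1;
            },
            // Finisher: extract the second max
            a -> a[1]
        );
    }

    public static void main(String[] args) {
        // Both the sequential and the parallel run print 5
        System.out.println(Stream.of(1, 6, 2, 3, 4, 4, 5).collect(secondMax()));
        System.out.println(Stream.of(1, 6, 2, 3, 4, 4, 5).parallel().collect(secondMax()));
    }
}
```

Note that with this state layout, a stream with fewer than two elements yields Integer.MIN_VALUE rather than an empty result.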
How to do the same thing with SQL?
Many SQL databases offer a very similar way of calculating custom aggregate functions. Here’s how to do the exact same thing with…
Oracle:
With the usual syntactic ceremony…
CREATE TYPE u_second_max AS OBJECT (

  -- Intermediary data structure
  MAX NUMBER,
  SECMAX NUMBER,

  -- Corresponds to the Collector.supplier() function
  STATIC FUNCTION ODCIAggregateInitialize(sctx IN OUT u_second_max) RETURN NUMBER,

  -- Corresponds to the Collector.accumulator() function
  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT u_second_max, value IN NUMBER) RETURN NUMBER,

  -- Corresponds to the Collector.combiner() function
  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT u_second_max, ctx2 IN u_second_max) RETURN NUMBER,

  -- Corresponds to the Collector.finisher() function
  MEMBER FUNCTION ODCIAggregateTerminate(self IN u_second_max, returnValue OUT NUMBER, flags IN NUMBER) RETURN NUMBER
)
/

-- This is our "collector" implementation
CREATE OR REPLACE TYPE BODY u_second_max
IS
  STATIC FUNCTION ODCIAggregateInitialize(sctx IN OUT u_second_max)
  RETURN NUMBER IS
  BEGIN
    -- The state starts at (0, 0), so this version only works for positive inputs
    SCTX := U_SECOND_MAX(0, 0);
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateIterate(self IN OUT u_second_max, value IN NUMBER)
  RETURN NUMBER IS
  BEGIN
    IF VALUE > SELF.MAX THEN
      SELF.SECMAX := SELF.MAX;
      SELF.MAX := VALUE;
    ELSIF VALUE > SELF.SECMAX THEN
      SELF.SECMAX := VALUE;
    END IF;
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateTerminate(self IN u_second_max, returnValue OUT NUMBER, flags IN NUMBER)
  RETURN NUMBER IS
  BEGIN
    RETURNVALUE := SELF.SECMAX;
    RETURN ODCIConst.Success;
  END;

  MEMBER FUNCTION ODCIAggregateMerge(self IN OUT u_second_max, ctx2 IN u_second_max)
  RETURN NUMBER IS
  BEGIN
    IF CTX2.MAX > SELF.MAX THEN
      SELF.SECMAX := SELF.MAX;
      SELF.MAX := CTX2.MAX;

      IF CTX2.SECMAX > SELF.SECMAX THEN
        SELF.SECMAX := CTX2.SECMAX;
      END IF;
    ELSIF CTX2.MAX > SELF.SECMAX THEN
      SELF.SECMAX := CTX2.MAX;
    END IF;

    RETURN ODCIConst.Success;
  END;
END;
/

-- Finally, we have to give this aggregate function a name
CREATE FUNCTION SECOND_MAX (input NUMBER)
RETURN NUMBER
PARALLEL_ENABLE AGGREGATE USING u_second_max;
/
We can now run the above on the Sakila database:
SELECT
  max(film_id),
  second_max(film_id)
FROM film;
To get:
MAX     SECOND_MAX
------------------
1000    999
And what’s even better, we can use the aggregate function as a window function for free!
SELECT
  film_id,
  length,
  max(film_id) OVER (PARTITION BY length),
  second_max(film_id) OVER (PARTITION BY length)
FROM film
ORDER BY length, film_id;
The above yields:
FILM_ID  LENGTH  MAX  SECOND_MAX
---------------------------------
     15      46  730         505
    469      46  730         505
    504      46  730         505
    505      46  730         505
    730      46  730         505
    237      47  869         784
    247      47  869         784
    393      47  869         784
    398      47  869         784
    407      47  869         784
    784      47  869         784
    869      47  869         784
      2      48  931         866
    410      48  931         866
    575      48  931         866
    630      48  931         866
    634      48  931         866
    657      48  931         866
    670      48  931         866
    753      48  931         866
    845      48  931         866
    866      48  931         866
    931      48  931         866
Beautiful, right?
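For comparison, a Java collector composes in a similar way: it can be passed as a downstream collector to Collectors.groupingBy(). That is closer to GROUP BY than to a window function, because it produces one result per group instead of one per row. The sketch below assumes a hypothetical class SecondMaxByLength and represents each film as a { film_id, length } pair; the sample rows are the length 46 and 47 groups copied from the query output above.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class SecondMaxByLength {

    // Same second-max logic as the collector shown earlier in this article
    static Collector<Integer, int[], Integer> secondMax() {
        return Collector.of(
            () -> new int[] { Integer.MIN_VALUE, Integer.MIN_VALUE },
            (a, i) -> {
                if (a[0] < i) { a[1] = a[0]; a[0] = i; }
                else if (a[1] < i) a[1] = i;
            },
            (a1, a2) -> {
                if (a2[0] > a1[0]) {
                    a1[1] = a1[0];
                    a1[0] = a2[0];
                    if (a2[1] > a1[1]) a1[1] = a2[1];
                }
                else if (a2[0] > a1[1]) a1[1] = a2[0];
                return a1;
            },
            a -> a[1]
        );
    }

    // Groups { film_id, length } pairs by length and aggregates the film_id
    static Map<Integer, Integer> secondMaxByLength(int[][] films) {
        return Arrays.stream(films).collect(Collectors.groupingBy(
            (int[] f) -> f[1],                                   // classifier: length
            TreeMap::new,                                        // sorted by length
            Collectors.mapping((int[] f) -> f[0], secondMax())   // aggregate film_id
        ));
    }

    public static void main(String[] args) {
        int[][] films = {
            { 15, 46 }, { 469, 46 }, { 504, 46 }, { 505, 46 }, { 730, 46 },
            { 237, 47 }, { 247, 47 }, { 393, 47 }, { 398, 47 }, { 407, 47 },
            { 784, 47 }, { 869, 47 }
        };
        System.out.println(secondMaxByLength(films)); // {46=505, 47=784}
    }
}
```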
PostgreSQL
PostgreSQL supports a slightly more concise syntax in the CREATE AGGREGATE statement. If we don’t allow for parallelism, we can write this minimal implementation:
CREATE FUNCTION second_max_sfunc (
  state INTEGER[], data INTEGER
) RETURNS INTEGER[] AS
$$
BEGIN
  IF state IS NULL THEN
    RETURN ARRAY[data, NULL];
  ELSE
    RETURN CASE
      WHEN state[1] > data
      THEN CASE
        WHEN state[2] > data
        THEN state
        ELSE ARRAY[state[1], data]
      END
      ELSE ARRAY[data, state[1]]
    END;
  END IF;
END;
$$ LANGUAGE plpgsql;

CREATE FUNCTION second_max_ffunc (
  state INTEGER[]
) RETURNS INTEGER AS
$$
BEGIN
  RETURN state[2];
END;
$$ LANGUAGE plpgsql;

CREATE AGGREGATE second_max (INTEGER) (
  SFUNC     = second_max_sfunc,
  STYPE     = INTEGER[],
  FINALFUNC = second_max_ffunc
);
Here, we use the STYPE (Collector.supplier()), the SFUNC (Collector.accumulator()), and the FINALFUNC (Collector.finisher()) specifications.
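One difference from the Java collector is that the PostgreSQL SFUNC returns a new state instead of mutating an existing one. On the Java side, that style is arguably closer to Stream.reduce() with an identity value than to a mutable Collector. The following sketch mirrors the transition and final functions under that assumption. The class name SecondMaxReduce and the method names sfunc, ffunc and secondMax are hypothetical, an Integer[] with null entries stands in for the SQL NULL state, and the combiner simply rejects parallel use, because the minimal PostgreSQL version does not define a combine function either.

```java
import java.util.stream.Stream;

class SecondMaxReduce {

    // Mirrors second_max_sfunc: returns a state array and never mutates its input
    static Integer[] sfunc(Integer[] state, Integer data) {
        if (state[0] == null)                        // empty state, like "state IS NULL"
            return new Integer[] { data, null };
        if (state[0] > data) {
            if (state[1] != null && state[1] > data)
                return state;                        // data is below both slots
            return new Integer[] { state[0], data }; // data becomes the second max
        }
        return new Integer[] { data, state[0] };     // data becomes the max
    }

    // Mirrors second_max_ffunc: the second slot is the result (null if absent)
    static Integer ffunc(Integer[] state) {
        return state[1];
    }

    static Integer secondMax(Stream<Integer> stream) {
        Integer[] state = stream.sequential().reduce(
            new Integer[] { null, null },
            SecondMaxReduce::sfunc,
            // Not called for sequential streams; parallelism is out of scope here
            (s1, s2) -> { throw new UnsupportedOperationException("no combiner in this sketch"); }
        );
        return ffunc(state);
    }

    public static void main(String[] args) {
        System.out.println(secondMax(Stream.of(1, 6, 2, 3, 4, 4, 5))); // 5
        System.out.println(secondMax(Stream.of(7)));                   // null
    }
}
```

As in SQL, an input with fewer than two values yields null here rather than a sentinel such as Integer.MIN_VALUE.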
Other databases
Many other databases allow for specifying user-defined aggregate functions. Look up the details in your database’s manual to learn more. They generally follow the same pattern as a Java 8 Collector.