When the Java 8 Streams API is not Enough

Java 8 was – as always – a release of compromises and backwards-compatibility. A release where the JSR-335 expert group might not have agreed upon scope or feasibility of certain features with some of the audience. See some concrete explanations by Brian Goetz about why … But today we’re going to focus on the Streams API’s “short-comings”, or as Brian Goetz would probably put it: things out of scope given the design goals.

Parallel Streams?

Parallel computing is hard, and it used to be a pain. People didn’t exactly love the new (now old) Fork / Join API when it first shipped with Java 7. By contrast, the conciseness of calling Stream.parallel() is unbeatable. But many people don’t actually need parallel computing (not to be confused with multi-threading!). In 95% of all cases, people would probably have preferred a more powerful Streams API, or perhaps a generally more powerful Collections API with lots of awesome methods on various Iterable subtypes. Changing Iterable is dangerous, though. Even a no-brainer such as transforming an Iterable into a Stream via a potential Iterable.stream() method seems to risk opening Pandora’s box!
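
For what it’s worth, the bridge from Iterable to Stream can already be built by hand with StreamSupport. The following is a minimal sketch. The class name IterableStreams and the helper stream() are made up for illustration; they are neither part of the JDK nor of jOOλ:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class IterableStreams {

    // Hypothetical helper: wraps any Iterable in a sequential Stream
    static <T> Stream<T> stream(Iterable<T> iterable) {
        // The 'false' flag asks for a sequential (non-parallel) stream
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    public static void main(String[] args) {
        Iterable<String> iterable = Arrays.asList("a", "b", "c");

        List<String> upper = stream(iterable)
            .map(String::toUpperCase)
            .collect(Collectors.toList());

        System.out.println(upper); // [A, B, C]
    }
}
```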

Sequential Streams!

So if the JDK doesn’t ship it, we create it ourselves! Streams are quite awesome per se. They’re potentially infinite, and that’s a cool feature. Mostly – and especially with functional programming – the size of a collection doesn’t really matter that much, as we transform element by element using functions. If we admit Streams to be purely sequential, then we could have any of these pretty cool methods as well (some of which would also be possible with parallel Streams):
  • cycle() – a guaranteed way to make every stream infinite
  • duplicate() – duplicate a stream into two equivalent streams
  • foldLeft() – a sequential and non-associative alternative to reduce()
  • foldRight() – a sequential and non-associative alternative to reduce()
  • limitUntil() – limit the stream to those records before the first one to satisfy a predicate
  • limitWhile() – limit the stream to those records before the first one not to satisfy a predicate
  • maxBy() – reduce the stream to the maximum mapped value
  • minBy() – reduce the stream to the minimum mapped value
  • partition() – partition a stream into two streams, one satisfying a predicate and the other not satisfying the same predicate
  • reverse() – produce a new stream in inverse order
  • skipUntil() – skip records until a predicate is satisfied
  • skipWhile() – skip records as long as a predicate is satisfied
  • slice() – take a slice of the stream, i.e. combine skip() and limit()
  • splitAt() – split a stream into two streams at a given position
  • unzip() – split a stream of pairs into two streams
  • zip() – merge two streams into a single stream of pairs
  • zipWithIndex() – merge a stream with its corresponding stream of indexes into a single stream of pairs
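
To give an idea of how thin some of these methods are, slice() can be approximated on a plain JDK Stream. This is only a sketch, assuming that "from" is inclusive and "to" is exclusive. SliceSketch and its slice() helper are hypothetical names, not jOOλ’s actual implementation:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class SliceSketch {

    // Assumption: 'from' is inclusive, 'to' is exclusive, both zero-based
    static <T> Stream<T> slice(Stream<T> stream, long from, long to) {
        long start = Math.max(from, 0);
        long size = Math.max(to - start, 0);

        // slice() is nothing but skip() followed by limit()
        return stream.skip(start).limit(size);
    }

    public static void main(String[] args) {
        List<Integer> result = slice(Stream.of(1, 2, 3, 4, 5), 1, 4)
            .collect(Collectors.toList());

        System.out.println(result); // [2, 3, 4]
    }
}
```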

jOOλ’s new Seq type does all that

All of the above is part of jOOλ. jOOλ (pronounced “jewel”, or “dju-lambda”, also written jOOL in URLs and such) is an ASL 2.0 licensed library that emerged from our own development needs when implementing jOOQ integration tests with Java 8. Java 8 is exceptionally well-suited for writing tests that reason about sets, tuples, records, and all things SQL. But the Streams API feels just slightly insufficient, so we have wrapped the JDK’s Streams into our own Seq type (Seq for sequence / sequential Stream):

// Wrap a stream in a sequence
Seq<Integer> seq1 = seq(Stream.of(1, 2, 3));

// Or create a sequence directly from values
Seq<Integer> seq2 = Seq.of(1, 2, 3);

We’ve made Seq a new interface that extends the JDK Stream interface, so you can use Seq fully interoperably with other Java APIs – leaving the existing methods unchanged:

public interface Seq<T> extends Stream<T> {

    /**
     * The underlying {@link Stream} implementation.
     */
    Stream<T> stream();
	
    // [...]
}

Now, functional programming is only half the fun if you don’t have tuples. Unfortunately, Java doesn’t have built-in tuples and while it is easy to create a tuple library using generics, tuples are still second-class syntactic citizens when comparing Java to Scala, for instance, or C# and even VB.NET. Nonetheless…

jOOλ also has tuples

We’ve run a code-generator to produce tuples of degree 1-8 (we might add more in the future, e.g. to match Scala’s and jOOQ’s “magical” degree 22). And if a library has such tuples, the library also needs corresponding functions. The essence of these TupleN and FunctionN types is summarised as follows:

public class Tuple3<T1, T2, T3>
implements
    Tuple,
    Comparable<Tuple3<T1, T2, T3>>,
    Serializable, Cloneable {

    public final T1 v1;
    public final T2 v2;
    public final T3 v3;

    // [...]
}

and

@FunctionalInterface
public interface Function3<T1, T2, T3, R> {

    default R apply(Tuple3<T1, T2, T3> args) {
        return apply(args.v1, args.v2, args.v3);
    }

    R apply(T1 v1, T2 v2, T3 v3);
}
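
To see how such a tuple and its matching function type play together, here is a self-contained sketch. Triple and TriFunction are hypothetical, stripped-down stand-ins for Tuple3 and Function3, written only for this illustration; the real jOOλ types carry a lot more functionality:

```java
final class TupleSketch {

    // Hypothetical stand-in for a degree 3 tuple with public final fields
    static final class Triple<T1, T2, T3> {
        final T1 v1;
        final T2 v2;
        final T3 v3;

        Triple(T1 v1, T2 v2, T3 v3) {
            this.v1 = v1;
            this.v2 = v2;
            this.v3 = v3;
        }
    }

    // Hypothetical stand-in for a function of degree 3
    @FunctionalInterface
    interface TriFunction<T1, T2, T3, R> {

        // The single abstract method takes three separate arguments
        R apply(T1 v1, T2 v2, T3 v3);

        // Convenience overload: "unpack" a tuple into the three arguments
        default R apply(Triple<T1, T2, T3> args) {
            return apply(args.v1, args.v2, args.v3);
        }
    }

    static String describe() {
        TriFunction<String, Integer, Boolean, String> f =
            (s, i, b) -> s + ":" + i + ":" + b;

        // Call the function with a tuple instead of three arguments
        return f.apply(new Triple<>("a", 1, true));
    }

    public static void main(String[] args) {
        System.out.println(describe()); // a:1:true
    }
}
```

The point of the default method is that any lambda of three arguments can be applied to a tuple of the same degree without any extra glue code.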

There are many more features in Tuple types, but let’s leave them out for today.
On a side note, I’ve recently had an interesting discussion with Gavin King (the creator of Hibernate) on reddit. From an ORM perspective, Java classes seem like a suitable implementation for SQL / relational tuples, and they are indeed. From an ORM perspective. But classes and tuples are fundamentally different, which is a very subtle issue with most ORMs – e.g. as explained here by Vlad Mihalcea. Besides, SQL’s notion of row value expressions (i.e. tuples) is quite different from what can be modelled with Java classes. This topic will be covered in a subsequent blog post.

Some jOOλ examples

With the aforementioned goals in mind, let’s see how the above API can be put to work by example:

Zipping

// (tuple(1, "a"), tuple(2, "b"), tuple(3, "c"))
Seq.of(1, 2, 3).zip(Seq.of("a", "b", "c"));

// ("1:a", "2:b", "3:c")
Seq.of(1, 2, 3).zip(
    Seq.of("a", "b", "c"), 
    (x, y) -> x + ":" + y
);

// (tuple("a", 0), tuple("b", 1), tuple("c", 2))
Seq.of("a", "b", "c").zipWithIndex();

// tuple((1, 2, 3), (a, b, c))
Seq.unzip(Seq.of(
    tuple(1, "a"),
    tuple(2, "b"),
    tuple(3, "c")
));

This is already a case where tuples have become very handy. When we “zip” two streams into one, we want a wrapper value type that combines both values. Classically, people might’ve used Object[] for quick-and-dirty solutions, but an array doesn’t indicate attribute types or degree. Unfortunately, the Java compiler cannot reason about the effective bound of the <T> type in Seq<T>. This is why we can only have a static unzip() method (instead of an instance one), whose signature looks like this:

// This works
static <T1, T2> Tuple2<Seq<T1>, Seq<T2>> 
    unzip(Stream<Tuple2<T1, T2>> stream) { ... }
	
// This doesn't work:
interface Seq<T> extends Stream<T> {
    Tuple2<Seq<???>, Seq<???>> unzip();
}
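
For readers who want to see the mechanics, zipping can be sketched with nothing but JDK types. The following is an assumption about how one could do it, not jOOλ’s actual code. ZipSketch is a made-up name, and the sketch stops as soon as the shorter of the two streams is exhausted:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class ZipSketch {

    static <A, B, R> Stream<R> zip(
            Stream<A> left,
            Stream<B> right,
            BiFunction<? super A, ? super B, ? extends R> zipper
    ) {
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();

        Iterator<R> zipped = new Iterator<R>() {
            @Override
            public boolean hasNext() {
                // Stop when either side runs out of elements
                return l.hasNext() && r.hasNext();
            }

            @Override
            public R next() {
                // Throws NoSuchElementException if a side is exhausted
                return zipper.apply(l.next(), r.next());
            }
        };

        // Sequential, ordered stream backed by the zipping iterator
        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(zipped, Spliterator.ORDERED),
            false
        );
    }

    public static void main(String[] args) {
        List<String> result = ZipSketch.<Integer, String, String>zip(
            Stream.of(1, 2, 3),
            Stream.of("a", "b", "c"),
            (x, y) -> x + ":" + y
        ).collect(Collectors.toList());

        System.out.println(result); // [1:a, 2:b, 3:c]
    }
}
```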

Skipping and limiting

// (3, 4, 5)
Seq.of(1, 2, 3, 4, 5).skipWhile(i -> i < 3);

// (3, 4, 5)
Seq.of(1, 2, 3, 4, 5).skipUntil(i -> i == 3);

// (1, 2)
Seq.of(1, 2, 3, 4, 5).limitWhile(i -> i < 3);

// (1, 2)
Seq.of(1, 2, 3, 4, 5).limitUntil(i -> i == 3);
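
The semantics of these four methods can be reproduced on plain JDK streams. The sketch below is one possible way to do it for sequential streams only, and it is not taken from jOOλ. SkipLimitSketch is a hypothetical class, and skipWhile() deliberately uses a stateful filter, which would be wrong on a parallel stream:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class SkipLimitSketch {

    static <T> Stream<T> limitWhile(Stream<T> stream, Predicate<? super T> predicate) {
        Iterator<T> source = stream.iterator();

        Iterator<T> limited = new Iterator<T>() {
            T lookahead;       // element fetched by hasNext(), not yet returned
            boolean buffered;  // true if 'lookahead' holds an accepted element
            boolean done;      // true once the predicate failed or the source ended

            @Override
            public boolean hasNext() {
                if (buffered) return true;
                if (done || !source.hasNext()) {
                    done = true;
                    return false;
                }
                T candidate = source.next();
                if (predicate.test(candidate)) {
                    lookahead = candidate;
                    buffered = true;
                } else {
                    done = true;
                }
                return buffered;
            }

            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                buffered = false;
                return lookahead;
            }
        };

        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(limited, Spliterator.ORDERED), false);
    }

    static <T> Stream<T> skipWhile(Stream<T> stream, Predicate<? super T> predicate) {
        // Stateful flag: only safe because the stream is forced to be sequential
        boolean[] skipping = { true };
        return stream.sequential().filter(t -> {
            if (skipping[0] && predicate.test(t)) return false;
            skipping[0] = false;
            return true;
        });
    }

    // The "until" variants are the "while" variants with a negated predicate
    static <T> Stream<T> limitUntil(Stream<T> stream, Predicate<? super T> predicate) {
        return limitWhile(stream, predicate.negate());
    }

    static <T> Stream<T> skipUntil(Stream<T> stream, Predicate<? super T> predicate) {
        return skipWhile(stream, predicate.negate());
    }

    public static void main(String[] args) {
        System.out.println(limitWhile(Stream.of(1, 2, 3, 4, 5), i -> i < 3)
            .collect(Collectors.toList())); // [1, 2]
        System.out.println(skipUntil(Stream.of(1, 2, 3, 4, 5), i -> i == 3)
            .collect(Collectors.toList())); // [3, 4, 5]
    }
}
```

Later JDK versions (Java 9 and up) ship Stream.takeWhile() and Stream.dropWhile() with comparable semantics.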

Other functional libraries probably use different terms than skip (e.g. drop) and limit (e.g. take). It doesn’t really matter in the end. We opted for the terms that are already present in the existing Stream API: Stream.skip() and Stream.limit().

Folding

// "abc"
Seq.of("a", "b", "c").foldLeft("", (u, t) -> u + t);

// "cba"
Seq.of("a", "b", "c").foldRight("", (t, u) -> u + t);
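
What a fold does under the hood is easy to sketch with plain JDK code. The following is an illustration only, with FoldSketch being a hypothetical class and not jOOλ’s implementation. It uses subtraction, which is not associative, to show that the evaluation order matters:

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FoldSketch {

    // Combines elements strictly from left to right: f(f(f(seed, a), b), c)
    static <T, U> U foldLeft(
            Stream<T> stream, U seed,
            BiFunction<? super U, ? super T, ? extends U> function
    ) {
        U result = seed;
        Iterator<T> it = stream.iterator();
        while (it.hasNext())
            result = function.apply(result, it.next());
        return result;
    }

    // Combines elements strictly from right to left: f(a, f(b, f(c, seed)))
    static <T, U> U foldRight(
            Stream<T> stream, U seed,
            BiFunction<? super T, ? super U, ? extends U> function
    ) {
        // The whole stream must be buffered before the last element is known
        List<T> list = stream.collect(Collectors.toList());
        U result = seed;
        for (int i = list.size() - 1; i >= 0; i--)
            result = function.apply(list.get(i), result);
        return result;
    }

    public static void main(String[] args) {
        // ((10 - 1) - 2) - 3
        Integer left = foldLeft(Stream.of(1, 2, 3), 10, (u, t) -> u - t);
        System.out.println(left); // 4

        // 1 - (2 - (3 - 10))
        Integer right = foldRight(Stream.of(1, 2, 3), 10, (t, u) -> t - u);
        System.out.println(right); // -8
    }
}
```

Note that a right fold cannot work on an infinite stream, because it has to reach the last element first.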

The Stream.reduce() operations are designed for parallelisation. This means that the functions passed to it must have these important attributes:
  • Associativity
  • Non-interference
  • Statelessness
But sometimes, you really want to “reduce” a stream with functions that do not have the above attributes, and consequently, you probably don’t care about the reduction being parallelisable. This is where “folding” comes in. A nice explanation about the various differences between reducing and folding (in Scala) can be seen here.

Splitting

// tuple((1, 2, 3), (1, 2, 3))
Seq.of(1, 2, 3).duplicate();

// tuple((1, 3, 5), (2, 4, 6))
Seq.of(1, 2, 3, 4, 5, 6).partition(i -> i % 2 != 0);

// tuple((1, 2), (3, 4, 5))
Seq.of(1, 2, 3, 4, 5).splitAt(2);

The above functions all have one thing in common: They operate on a single stream in order to produce two new streams, that can be consumed independently. Obviously, this means that internally, some memory must be consumed to keep buffers of partially consumed streams. E.g.
  • duplication needs to keep track of all values that have been consumed in one stream, but not in the other
  • partitioning needs to fast forward to the next value that satisfies (or doesn’t satisfy) the predicate, without losing all the dropped values
  • splitting might need to fast forward to the split index
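
The buffering needed for duplication can be sketched with two queues, one per copy. This is an illustrative assumption about how it could be done with JDK types only. DuplicateSketch is a hypothetical class, and a List of two streams stands in for the tuple:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class DuplicateSketch {

    static <T> List<Stream<T>> duplicate(Stream<T> stream) {
        Iterator<T> source = stream.iterator();

        // Each queue holds what the other copy has consumed ahead of this one
        Queue<T> pendingForFirst = new LinkedList<>();
        Queue<T> pendingForSecond = new LinkedList<>();

        List<Stream<T>> copies = new ArrayList<>();
        copies.add(copy(source, pendingForFirst, pendingForSecond));
        copies.add(copy(source, pendingForSecond, pendingForFirst));
        return copies;
    }

    private static <T> Stream<T> copy(Iterator<T> source, Queue<T> mine, Queue<T> theirs) {
        Iterator<T> it = new Iterator<T>() {
            @Override
            public boolean hasNext() {
                return !mine.isEmpty() || source.hasNext();
            }

            @Override
            public T next() {
                // First replay what the other copy has already pulled
                if (!mine.isEmpty())
                    return mine.poll();

                // Otherwise pull a fresh value and remember it for the other copy
                T value = source.next();
                theirs.offer(value);
                return value;
            }
        };

        return StreamSupport.stream(
            Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        List<Stream<Integer>> copies = duplicate(Stream.of(1, 2, 3));
        System.out.println(copies.get(0).collect(Collectors.toList())); // [1, 2, 3]
        System.out.println(copies.get(1).collect(Collectors.toList())); // [1, 2, 3]
    }
}
```

If one copy is consumed completely before the other is touched, the other copy’s queue grows to the size of the whole stream, which is exactly the memory cost mentioned above.
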
For some real functional fun, let’s have a look at a possible splitAt() implementation:

static <T> Tuple2<Seq<T>, Seq<T>> 
splitAt(Stream<T> stream, long position) {
    return seq(stream)
          .zipWithIndex()
          .partition(t -> t.v2 < position)
          .map((v1, v2) -> tuple(
              v1.map(t -> t.v1),
              v2.map(t -> t.v1)
          ));
}

… or with comments:

static <T> Tuple2<Seq<T>, Seq<T>> 
splitAt(Stream<T> stream, long position) {
    // Add jOOλ functionality to the stream
    // -> local Type: Seq<T>
    return seq(stream)
	
    // Keep track of stream positions
    // with each element in the stream
    // -> local Type: Seq<Tuple2<T, Long>>
          .zipWithIndex()
	  
    // Split the streams at position
    // -> local Type: Tuple2<Seq<Tuple2<T, Long>>,
    //                       Seq<Tuple2<T, Long>>>
          .partition(t -> t.v2 < position)
		  
    // Remove the indexes from zipWithIndex again
    // -> local Type: Tuple2<Seq<T>, Seq<T>>
          .map((v1, v2) -> tuple(
              v1.map(t -> t.v1),
              v2.map(t -> t.v1)
          ));
}

Nice, isn’t it? A possible implementation for partition(), on the other hand, is a bit more complex. Here trivially with Iterator instead of the new Spliterator:

static <T> Tuple2<Seq<T>, Seq<T>> partition(
        Stream<T> stream, 
        Predicate<? super T> predicate
) {
    final Iterator<T> it = stream.iterator();
    final LinkedList<T> buffer1 = new LinkedList<>();
    final LinkedList<T> buffer2 = new LinkedList<>();

    class Partition implements Iterator<T> {

        final boolean b;

        Partition(boolean b) {
            this.b = b;
        }

        void fetch() {
            while (buffer(b).isEmpty() && it.hasNext()) {
                T next = it.next();
                buffer(predicate.test(next)).offer(next);
            }
        }

        LinkedList<T> buffer(boolean test) {
            return test ? buffer1 : buffer2;
        }

        @Override
        public boolean hasNext() {
            fetch();
            return !buffer(b).isEmpty();
        }

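        @Override
        public T next() {
            // Refill the buffer first, and fail like any other exhausted
            // Iterator (java.util.NoSuchElementException) instead of
            // silently returning null
            if (!hasNext())
                throw new NoSuchElementException();

            return buffer(b).poll();
        }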
    }

    return tuple(
        seq(new Partition(true)), 
        seq(new Partition(false))
    );
}

I’ll let you do the exercise and verify the above code.

Get and contribute to jOOλ, now!

All of the above is part of jOOλ, available for free from GitHub. There is already a partially Java-8-ready, full-blown library called functionaljava, which goes much further than jOOλ. Yet, we believe that all that’s missing from Java 8’s Streams API is really just a couple of methods that are very useful for sequential streams. In a previous post, we’ve shown how we can bring lambdas to String-based SQL using a simple wrapper for JDBC (of course, we still believe that you should use jOOQ instead). Today, we’ve shown how we can write awesome functional and sequential Stream processing very easily, with jOOλ. Stay tuned for even more jOOλ goodness in the near future (and pull requests are very welcome, of course!)

12 thoughts on “When the Java 8 Streams API is not Enough”

  1. Nice work! I have to play with this for a few hours this weekend ;-)

    You probably remember that, in the Project Lambda, the initial implementation of the Stream API created by the EG was entirely based on Iterators. In those days I experimented myself with high order functions entirely based on them following the pattern they were following in the reference implementation.

    However, later they decided to go in an entirely different direction. It looked like Iterators were not good enough to deal with parallelism. Brian had mentioned that the problem was something about where to put the state: stateful lambdas passed as arguments to methods like filter() will make the stream pipeline give wrong results if someone ever tries to use it in parallel.

    I know your implementation is about sequential streams only, but I wonder what would happen if I do this?

    AtomicInteger nums = new AtomicInteger(0);
    Seq<Integer> ints = Seq.generate(nums::incrementAndGet);
    Seq<Integer> underMillion = ints.limitWhile(n -> n < 1000000);
    underMillion.stream().parallel().findFirst();

    I wonder what would happen even if I create a Stream out of an Iterator in Java 8.

    Based on your experience with Streams do you know what can be expected?

    1. You probably remember that, in the Project Lambda, the initial implementation of the Stream API created by the EG was entirely based on Iterators

      … I actually don’t! Thanks for sharing, that’s very interesting! See? I really think you should submit a couple of talks to conferences. E.g. “Subtle historic facts about Java 8, that you probably didn’t know about” ;-)

      underMillion.stream().parallel().findFirst();

      Good point. The current version is quite “drafty” – nowhere near sound. I suspect that Seq.stream() probably shouldn’t expose the underlying stream, or maybe, it should wrap it in yet another Seq

      I wonder what would happen even if I create a Stream out of an Iterator in Java 8.

      Based on your experience with Streams do you know what can be expected?

      In fact, I don’t have too much experience with parallel Streams. A “sequential” (Seq) stream can be created easily from an Iterator in Java 8. Either you simply call:

      Seq seq = Seq.seq(iterator);
      

      … or, you wrap the iterator explicitly in a Stream, using something like:

      StreamSupport.stream(spliteratorUnknownSize(iterator, ORDERED), false)
      

      This is more or less what’s suggested by Brian Goetz.

      Looking forward to your suggestions / feedback / pull requests (?) on jOOλ!

      1. You always have good ideas about articles and talks, Lukas. I should definitely listen to you more :-)

        This is quite interesting. I should definitely investigate more about this subject. And I will see about preparing one of those talks for the future.

  2. Nice work!

    By the way: the links are almost invisible, because the color is almost the same as the rest of the text. I discovered them accidentally. I suggest to use something more noticeable.

  3. Thank you for this wonderful library!
    I’m a bit experienced in Scala, so I was disappointed by Java 8’s streams and their lack of non-associative operations (mainly fold and scan, which are mandatory for some mathematical algorithms): Now I cannot consider using Java 8 without jOOλ!

    However I must say that most of Seq’s static methods are surprising me because they duplicate the way to do the same thing: Seq.scanLeft(stream, …) does exactly the same thing as Seq.seq(stream).scanLeft(…), but having the possibility to use both in the same codebase may lead to lack of consistency.
    But I reckon that this comment is far too late and that removing the duplicating static methods would only break existing code…

    1. Thanks for the feedback. Glad we did that for you! :)

      I agree with consistency. In early days of jOOλ, we weren’t sure about whether we wanted to wrap Stream in Seq (Seq.seq(stream).scanLeft(...)), or just simply provide standalone functionality (Seq.scanLeft(stream, ...))… This may change in the future, of course.

  4. I was looking for Streams of Tuples. But this library doesn’t solve the problem.
    I would like to use BiConsumer on a sequence of tuples:

    seq(list).map(x->tuple(x,x)).map( (a,b) -> computeSomething(a,b) );
    

    This would require stream implementations for each tuple size (e.g. Seq1, Seq2, Seq3 ..) and a “mapToTuple2” method which returns a different implementation of the stream, which can handle BiConsumer in the map method.

    And there seems to be a performance issue. Eclipse used 100% CPU when using JOOL. After removing JOOL the CPU usage dropped to 5%.

    1. Thanks for the feedback. It’s true, if we could overload the map method in the way you suggested, we’d get closer to what looks like true tuple syntax. The tuple itself does have the map() method that you’d like to use. For instance, you could write:

      seq(list).map(x -> tuple(x,x)).map(t -> t.map(C::computeSomething));
      

      I’d say the difference is merely stylistic – at least in the Java language. In Scala with first-class tuple support built into the language, a lot more is possible, of course.

      And there seems to be a performance issue. Eclipse used 100% CPU when using JOOL. After removing JOOL the CPU usage dropped to 5%.

      Yes, there’s a known, very nasty issue in the Eclipse compiler that leads to exponential memory consumption for caching all the possible type permutations of tuple types:
      https://bugs.eclipse.org/bugs/show_bug.cgi?id=434326

      I’m very curious about your particular case. Do you have a reproducible test case showing how jOOL slows down your code’s compilation?

      1. Not sure if it is reproducible. Just added JOOL to pom.xml and wrote some code:

        List<String> list = Arrays.asList("test", "test2", "test3");
        org.jooq.lambda.Seq.seq(list);
        

        CPU-consumption jumped to 50% and stayed there even after I deleted the code.

        After I removed JOOL from pom.xml and rebuilt the workspace, CPU-consumption stayed at 50%.

        Restarted eclipse and CPU-consumption was down to 5%.

        1. Really, for something as trivial as this? That would be very bad news, although I suspect you might be using an older version of Eclipse… What Eclipse version/build are you using? (Help > About Eclipse)

            1. Hmm, I’m using the same version, but I’m not running into that issue…

              But I can imagine that there are still such issues in Eclipse which haven’t been resolved yet – especially when m2e is involved. I’ve just always resorted to using Java Mission Control / Flight Recorder to profile Eclipse and report the issue…
