Java 8 Friday Goodies: Lean Concurrency

At Data Geekery, we love Java. And as we’re really into jOOQ’s fluent API and query DSL, we’re absolutely thrilled about what Java 8 will bring to our ecosystem. We have blogged a couple of times about some nice Java 8 goodies, and now we feel it’s time to start a new blog series, the…

Java 8 Friday

Every Friday, we’re showing you a couple of nice new tutorial-style Java 8 features, which take advantage of lambda expressions, extension methods, and other great stuff. You’ll find the source code on GitHub.

Java 8 Goodie: Lean Concurrency

Someone once said that (unfortunately, we don’t have the source anymore):

Junior programmers think concurrency is hard.
Experienced programmers think concurrency is easy.
Senior programmers think concurrency is hard.

That is quite true. But on the bright side, Java 8 will at least improve things by making it easier to write concurrent code with lambdas and the many improved APIs. Let’s have a closer look:

Java 8 improving on JDK 1.0 API

java.lang.Thread has been around from the very beginning in JDK 1.0. So has java.lang.Runnable, which is going to be annotated with @FunctionalInterface in Java 8.

It is almost a no-brainer how we can finally submit Runnables to a Thread from now on. Let’s assume we have a long-running operation:

public static int longOperation() {
    System.out.println("Running on thread #"
       + Thread.currentThread().getId());

    // [...]
    return 42;
}

We can then pass this operation to Threads in various ways, e.g.

Thread[] threads = {

    // Pass a lambda to a thread
    new Thread(() -> {
        longOperation();
    }),

    // Pass a method reference to a thread
    new Thread(ThreadGoodies::longOperation)
};

// Start all threads
Arrays.stream(threads).forEach(Thread::start);

// Join all threads
Arrays.stream(threads).forEach(t -> {
    try { t.join(); }
    catch (InterruptedException ignore) {}
});

As we’ve mentioned in our previous blog post, it’s a shame that lambda expressions did not find a lean way to work around checked exceptions. None of the newly added functional interfaces in the java.util.function package allow for throwing checked exceptions, leaving the work up to the call-site.

In our last post, we’ve thus published jOOλ (also jOOL, jOO-Lambda), which wraps each one of the JDK’s functional interfaces in an equivalent functional interface that allows for throwing checked exceptions. This is particularly useful with old JDK APIs, such as JDBC, or the above Thread API. With jOOλ, we can then write:

// Join all threads
Arrays.stream(threads).forEach(Unchecked.consumer(
    t -> t.join()
));
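To illustrate the idea behind such a wrapper, here is a minimal hand-rolled sketch. It is not jOOλ's actual implementation: the names UncheckedSketch, CheckedConsumer, and unchecked() are made up for this example, and the only assumption is that wrapping the checked exception in a RuntimeException is acceptable at the call site.

```java
import java.util.Arrays;
import java.util.function.Consumer;

class UncheckedSketch {

    // Hypothetical counterpart to java.util.function.Consumer
    // whose accept() method may throw a checked exception
    @FunctionalInterface
    interface CheckedConsumer<T> {
        void accept(T t) throws Exception;
    }

    // Adapts a CheckedConsumer to a plain Consumer. Checked exceptions
    // are rethrown wrapped in a RuntimeException.
    static <T> Consumer<T> unchecked(CheckedConsumer<T> consumer) {
        return t -> {
            try {
                consumer.accept(t);
            }
            catch (RuntimeException e) {
                // Already unchecked, pass it through as is
                throw e;
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    public static void main(String[] args) {
        Thread[] threads = {
            new Thread(() -> System.out.println("first")),
            new Thread(() -> System.out.println("second"))
        };

        Arrays.stream(threads).forEach(Thread::start);

        // join() throws InterruptedException, which the wrapper hides
        Arrays.stream(threads).forEach(unchecked(t -> t.join()));
    }
}
```

The trade-off is that callers can no longer catch the specific checked exception type directly. They have to inspect getCause() of the RuntimeException instead.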

Java 8 improving on Java 5 API

Java’s multi-threading APIs had been pretty dormant up until the release of Java 5’s awesome ExecutorService. Managing threads had been a burden, and people needed external libraries or a J2EE / JEE container to manage thread pools. This has gotten a lot easier with Java 5. We can now submit a Runnable or a Callable to an ExecutorService, which manages its own thread-pool.

Here’s an example of how we can leverage these Java 5 concurrency APIs in Java 8:

ExecutorService service = Executors
    .newFixedThreadPool(5);

Future<?>[] answers = {
    service.submit(() -> longOperation()),
    service.submit(ThreadGoodies::longOperation)
};

Arrays.stream(answers).forEach(Unchecked.consumer(
    f -> System.out.println(f.get())
));

Note how we again use jOOλ’s Unchecked.consumer() to wrap the checked exceptions thrown from the get() call in a RuntimeException.
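For comparison, here is a sketch of the same idea using only the JDK, with typed futures and an explicit shutdown of the pool. The class name ExecutorSketch and the helper sumOfAnswers() are made up for this example, and longOperation() is reduced to a stand-in that simply returns 42.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorSketch {

    // Stand-in for the long-running operation from above
    static int longOperation() {
        return 42;
    }

    // Runs all tasks on a fixed thread pool and sums up their results
    static int sumOfAnswers(List<Callable<Integer>> tasks) {
        ExecutorService service = Executors.newFixedThreadPool(5);

        try {
            int sum = 0;

            // invokeAll() blocks until every task has completed
            for (Future<Integer> f : service.invokeAll(tasks)) {
                sum += f.get();
            }

            return sum;
        }
        catch (InterruptedException | ExecutionException e) {
            // Without a wrapper, the call site handles the checked exceptions
            throw new IllegalStateException(e);
        }
        finally {
            // The pool's threads are not daemon threads, so an
            // un-shutdown pool keeps the JVM alive
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<Integer>> tasks = Arrays.asList(
            () -> longOperation(),
            ExecutorSketch::longOperation
        );

        System.out.println(sumOfAnswers(tasks));
    }
}
```

Typing the tasks as Callable<Integer> makes it explicit which submit-style overload is meant, and the futures then return an Integer rather than an Object.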

Parallelism and ForkJoinPool in Java 8

Now, the Java 8 Streams API changes a lot of things in terms of concurrency and parallelism. In Java 8, you can write the following, for instance:

Arrays.stream(new int[]{ 1, 2, 3, 4, 5, 6 })
      .parallel()
      .max()
      .ifPresent(System.out::println);

While it isn’t necessary in this particular case, it’s still interesting to see that merely calling parallel() will run the IntStream.max() reduce operation on the worker threads of the JDK’s internal (common) ForkJoinPool, without you having to worry about the involved ForkJoinTasks. This can be really useful, as not everybody welcomed the JDK 7 ForkJoin API and the complexity it has introduced.
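To see which threads actually take part, we can record the current thread's name from within the pipeline. This is only a sketch: the class name ParallelSketch and the helper parallelMax() are made up for this example, and which threads show up (and how many) depends on the machine and the JDK.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class ParallelSketch {

    // Returns the maximum of 1..n (n must be at least 1) and records
    // the name of every thread that processed an element
    static int parallelMax(int n, Set<String> threadNames) {
        return IntStream.rangeClosed(1, n)
            .parallel()
            .peek(i -> threadNames.add(Thread.currentThread().getName()))
            .max()
            .getAsInt();
    }

    public static void main(String[] args) {
        // A concurrent set, as several threads add to it at once
        Set<String> threadNames = ConcurrentHashMap.newKeySet();

        System.out.println(parallelMax(1_000, threadNames));
        threadNames.forEach(System.out::println);
    }
}
```

On a multi-core machine, the output will typically list the calling thread alongside a few ForkJoinPool worker threads, as the calling thread takes part in the work too.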

Read more about Java 8’s parallel streams in this interesting InfoQ article.

More on Java 8

Parallelism was one of the main driving forces behind the new Streams API. Being able to just set a flag called parallel() on a Stream is marvellous in many situations.

In the last example, we’ve seen the OptionalInt.ifPresent() method that takes an IntConsumer argument to be executed only if the previous reduce operation produced a value. Calling max() on an empty stream, for instance, yields an empty OptionalInt.
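A short sketch of both cases, where the class name OptionalIntSketch and the helper max() are made up for this example:

```java
import java.util.OptionalInt;
import java.util.stream.IntStream;

class OptionalIntSketch {

    // Wraps IntStream.max(), which yields an empty OptionalInt
    // when there are no values
    static OptionalInt max(int... values) {
        return IntStream.of(values).max();
    }

    public static void main(String[] args) {
        // A value is present, so the consumer runs and prints 3
        max(1, 2, 3).ifPresent(System.out::println);

        // No value is present, so the consumer never runs
        max().ifPresent(System.out::println);

        // A fallback can be supplied instead, this prints -1
        System.out.println(max().orElse(-1));
    }
}
```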

Other languages such as Scala have long had an “Option” type to improve NULL handling. We’ve blogged about Optional before, and we’ll revisit the Java 8 Optional type in the context of Java 8 Streams, so stay tuned!

In the meantime, have a look at Eugen Paraschiv’s awesome Java 8 resources page.

Fast File System Operations with Xtend, Lambdas, and ThreadPools

Recently, I’ve blogged about 10 Subtle Best Practices when Coding Java, and I have mentioned that you should start writing SAMs (Single Abstract Method) now, in order to be prepared for Java 8. But there’s another language gem out there, which comes in handy every once in a while, and that’s Eclipse Xtend. Xtend is a “dialect” of the Java language, compiling into Java source code, which then compiles into byte code.

Here’s a quickie showing how easily recursive file system operations can be done with Xtend, Lambdas, and ThreadPools.

class Transform {

  // This is the thread pool performing
  // all the "hard" work
  static ExecutorService ex;

  def static void main(String[] args) {
    // Initialise the thread pool with
    // something meaningful
    ex = Executors::newFixedThreadPool(4);

    // Pass the root directory to the
    // transform method
    val in = new File(...);

    // Recurse into the file transformation
    transform(in);
  }

  def static void transform(File in) {

    // Calculate the target file name
    val out = new File(...);

    // Recurse into directories
    if (in.directory) {

      // Pass a FileFilter in the form of an
      // Xtend lambda expression
      for (file : in.listFiles[path |
             !path.name.endsWith(".class")
          && !path.name.endsWith(".zip")
          && !path.name.endsWith(".jar")
        ]) {
        transform(file);
      }
    }
    else {
      // Pass an Xtend lambda expression to
      // the ExecutorService
      ex.submit[ |
        // Read and write could be implemented
        // in Apache Commons IO
        write(out, transform(read(in)));
      ];
    }
  }

  def static transform(String content) {
    // Do the actual string transformation
  }
}

Granted, with Java 8, we’ll get lambdas as well, and that’s awesome. But Xtend has a couple of other nice features that can be seen above:

  • Passing lambdas to a couple of JDK methods, such as File.listFiles() or ExecutorService.submit()
  • Local variable type inference using val, var, or for
  • Method return type inference using def
  • Ability to omit parentheses when passing a lambda to a method
  • Calling getters and setters by convention, e.g. path.name, instead of path.getName(), or in.directory, instead of in.isDirectory()
  • You could also omit semi-colons, although I don’t personally think that’s a good idea.

Xtend already offers today much of what Java 8 will bring, which can be very useful for scripts like the one above.
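For comparison, here is a rough sketch of how the recursive part of the script might look in Java 8. The class name TransformSketch is made up, and since the source and target paths as well as the actual transformation are left open above, the per-file work is passed in as a Consumer<File> rather than spelled out.

```java
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class TransformSketch {

    // Recurses into directories and submits every remaining
    // file to the thread pool
    static void transform(File in, ExecutorService ex, Consumer<File> work) {
        if (in.isDirectory()) {

            // Pass a FileFilter in the form of a Java 8 lambda expression
            File[] files = in.listFiles(path ->
                   !path.getName().endsWith(".class")
                && !path.getName().endsWith(".zip")
                && !path.getName().endsWith(".jar"));

            // listFiles() returns null if the directory cannot be read
            if (files != null) {
                for (File file : files) {
                    transform(file, ex, work);
                }
            }
        }
        else {
            // Pass a Runnable lambda to the ExecutorService
            ex.submit(() -> work.accept(in));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService ex = Executors.newFixedThreadPool(4);

        // Placeholder root directory and placeholder work:
        // here, we only print each file's path
        File root = new File(args.length > 0 ? args[0] : ".");
        transform(root, ex, f -> System.out.println(f.getPath()));

        // Let the submitted tasks finish before exiting
        ex.shutdown();
        ex.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

The lambdas carry over almost one to one. What Java 8 does not offer is the getter shorthand (path.name) and the type inference for local variables and method return types.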