What is Cyclops-react?The arrival of Lambda expressions and default methods in Java 8 heralded the biggest structural changes to the Java language in a decade. Building on top of this were some new cool APIs, such as
Stream, Optional, CompletableFuture– finally Java developers could code in a more functional style. While this was very welcome, for many the enhancements did not quite go far enough.
Stream, Optional, CompletableFutureall share the same abstract structure and obey the same rules. Yet the APIs don’t agree on common method names, never mind provide a common interface. For example
CompletableFuture#thenApply. Also, the functionality added to
Stream & Optionalis missing from collections generally. Where is
List#map? The JDK Stream implementation performs well, is totally lazy and well designed for extension, but provides only a limited subset of potential operators (constrained, perhaps, by a focus on data parallelism). Into the void stepped libraries such as jOOλ with its sequential Stream extension (called
Seqadds many additional Streaming operators. jOOλ generally adds many missing functional features such as Tuples. A core goal of cyclops-react, as well as adding original features such as FutureStreams, is to provide a mechanism for joining up both the JDK APIs and the third party functional libraries. There was a Cambrian explosion of cool libraries that emerged after the launch of Java 8. Libraries like vavr & Project Reactor. cyclops-react does this in the first instance by extending the JDK, and by leveraging other libraries such as jOOλ, pCollections & Agrona. These libraries in turn also extend JDK interfaces where possible to add features such as Persistent Collections and wait free Many Producer Single Consumer Queues. Beyond reusing and extending JDK interfaces our aims were to make it easy for developers to integrate with external libraries by making use of third party standards such as the reactive-streams API and by building our own abstractions where no set standard existed. The libraries we currently focus on integrating with are Google’s Guava, RxJava, Functional Java, Project Reactor and vavr. We’ve created abstractions for wrapping types like
Stream, Optional & CompletableFuture– where no interface existed or was possible before. We chose these goals, because we are using cyclops-react in production across a Microservices architecture and being able to leverage the right technology for a problem and have it integrate smoothly with the rest of our code base is critical. cyclops-react is quite a large feature rich project, and in addition has a number of integration modules. In the article below I’ll cover some of the available features with a particular goal of showing how cyclops-react helps join up the dots across the JDK and into the brave new world of the pace setting Java 8 open source community.
Extending the JDKcyclops-react extends JDK APIs where possible. For example
ReactiveSeqadds functionality for handling errors, asynchronous processing and much more extends extends both JDK Stream and jOOλ’s Seq. cyclops-react Collection extensions, rather than creating new collection implementations, implement and extend the appropriate JDK interfaces. cyclops-react
LazyFutureStreamin turn extends
ReactiveSeq, and allows aggregate operations over Streams of Futures as if it were a simple Stream (this proves to be very useful for handling a large number typical Java I/O operations asynchronously and performantly).
List, but adds operators that execute eagerly
cyclops-react adds lots of operators for users to explore. We can, for example, apply functions across multiple collections at the same time The reactive-streams API acts as a natural bridge between producers (publishers) of data and consumers (subscribers). All cyclops-react data types implement the
ListX<Integer> tenTimes = ListX.of(1,2,3,4) .map(i->i*10);
Publisherinterface from reactive-streams, and
Subscriberimplementations that can convert to any cyclops-react type are provided also. This makes direct integration with other reactive-streams based libraries, such as Project Reactor straightforward. For example we can lazily populate a Reactor Flux from any cyclops publisher, such as
SortedSetX, or populate a cyclops-react type from a Reactor type.
Reactor Flux and Mono types can work directly with cyclops-react
Flux<Integer> stream = Flux.from( SortedSetX.of(1,2,3,4,5,6,7,8)); //Flux[1,2,3,4,5,6,7,8] ListX<Character> list = ListX.fromPublisher( Flux.just("a","b","c"));
Forcomprehensions (each supported library also has their own set of native
Forcomprehension classes in their integration module).
// import static com.aol.cyclops.control.For.*; Publishers.each2( Flux.just(1,2,3), i -> ReactiveSeq.range(i,5),Tuple::tuple).printOut(); /* (1, 1) (1, 2) (1, 3) (1, 4) (2, 2) (2, 3) (2, 4) (3, 3) (3, 4) */
Forcomprehension is a way of managing nested iteration over types with flatMap and map methods, by cascading calls to the appropriate methods. In cyclops-react, nested statements can access the elements of the previous statements, so
Forcomprehensions can be a very useful way of managing the behavior of existing. For example to ensure that calls to existing methods findId and loadData which may return null values, and will throw NPEs if provided with a null parameter we can make use of a
Forcomprehension that will safely execute loadData only when an Optional with a value is returned from findId()
Similarly, a type such as Try could be used to handle exceptional results from either findId or loadData, Futures can be used to execute chained methods asynchronously and so on.
List<Data> data = For.optional(findId()) .optional(this::loadData); //loadData is only called if findId() returns a value
Building cross-library abstractionsJava 8 introduced Monads to Java (
Stream, Optional, CompletableFuture), but didn’t provide a common interface that would help reuse, in fact the method names used in
CompletableFuturediffer significantly from those used in
Optional & Streamfor the same function. So
flatMap thenCompose. Across the Java 8 world monads are becoming an increasingly common pattern, but there is often no way to abstract across them. In cyclops-react, rather than attempt to define an interface to represent monads, we built a set of wrapper interfaces and a number of custom adapters to adapt different instances from across the main functional-style libraries for Java 8 to those wrappers. The wrappers extend
AnyM(short for Any Monad) and there are two sub-interfaces –
AnyMValuewhich represents any monadic type that resolves to a single value (like
AnyMSeqthat ultimately resolves to a sequence of values (like a Stream or List). The cyclops extension wrappers provide a mechanism to wrap the types from RxJava, Guava, Reactor, FunctionalJava and vavr.
cyclops-react provides a common set of interfaces that these wrappers (and other cyclops-react types) inherit from, allowing developers to write more generic reusable code.
//We can wrap any type from Reactor, RxJava, //FunctionalJava, vavr, Guava AnyMSeq<Integer> wrapped = Fj.list(List.list(1,2,3,4,5)); //And manipulate it AnyMSeq<Integer> timesTen = wrapped.map(i->i*10);
AnyMextends reactive-streams publishers, meaning you can make any vavr, Guava, FunctionalJava or RxJava type a reactive-streams publisher with cyclops-react.
Furthermore the reactive functionality from cyclops-react is provided directly on the AnyM types. This means we can, for example, schedule data emission from a vavr or FunctionalJava Stream – or execute a reduce operation lazily, or asynchronously.
AnyMSeq<Integer> wrapped = Javaslang.traversable(List.of(1,2,3,4,5)); //The wrapped type is a reactive-streams publisher Flux<Integer> fromVavr = Flux.from(wrapped); wrapped.forEachWithError( System.out::println, System.out::err);
Theres a lot to explore both in cyclops-react and in the new broader Java 8 eco-system, hopefully you’ll have a fun adventure playing with, learning from and extending the Java 8 boundaries yourself!
AnyMSeq<Integer> wrapped = Javaslang.traversable(Stream.of(1,2,3,4,5)); CompletableFuture<Integer> asyncResult = wrapped.futureOperations(Executors.newFixedThreadPool(1)) .reduce(50, (acc, next) -> acc + next); //CompletableFuture AnyMSeq<Integer> wrapped = FJ.list(list.list(1,2,3,4,5)); Eval<Integer> lazyResult = wrapped.map(i -> i * 10) .lazyOperations() .reduce(50, (acc,next) -> acc + next); //Eval HotStream<Integer> emitting = wrapped.schedule( "0 * * * * ?", Executors.newScheduledThreadPool(1)); emitting.connect() .debounce(1,TimeUnit.DAYS) .forEachWithError( this::logSuccess, this::logFailure);