We’re excited to announce another very interesting guest post on the
jOOQ Blog by
John Mcclean from
AOL.
AOL is a global digital media and technology company. Founded in 1985 and once known as America Online, AOL is now part of the Verizon Group. AOL focuses on four areas – video, mobile, ad technology and platforms, and open ecosystems. AOL connects publishers with advertisers across their global, programmatic platforms, tapping into Microsoft inventory and original content brands like TechCrunch, The Huffington Post and MAKERS.
John is an Architect at AOL. He works in the ad tech and platforms group, where he leads the advertising demand side forecasting team, which builds and runs a system that processes billions of RTB, impression and viewability records in realtime to generate price volume curves and other forecasts for advertising campaigns in milliseconds. John is also the lead developer for AOL open source projects cyclops-react and Microserver. Extracted from AOL’s forecasting system, these projects allow AOL to rapidly deploy new features that work at scale, by guiding Java developers along the path of functional, reactive microservices.
What is Cyclops-react?
The arrival of Lambda expressions and default methods in Java 8 heralded the biggest structural changes to the Java language in a decade. Building on top of this were some new cool APIs, such as
Stream, Optional, CompletableFuture
– finally Java developers could code in a more functional style. While this was very welcome, for many the enhancements did not quite go far enough.
Stream, Optional, CompletableFuture
all share the same abstract structure and obey the same rules. Yet the APIs don’t agree on common method names, never mind provide a common interface. For example
Stream#map
/
Optional#map
becomes
CompletableFuture#thenApply
. Also, the functionality added to
Stream & Optional
is missing from collections generally. Where is
List#map
?
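As a small JDK-only illustration of the naming mismatch described above (the class name `MapNamingDemo` is made up for this sketch), the same "transform the contents" operation is called `map` on `Stream` and `Optional`, `thenApply` on `CompletableFuture`, and needs a round trip through `stream()` on a `List`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: one transformation, four different spellings.
final class MapNamingDemo {

    // Stream: the transformation is called map.
    static List<Integer> viaStream() {
        return Stream.of(1, 2, 3)
                     .map(i -> i * 10)
                     .collect(Collectors.toList());
    }

    // Optional: also called map.
    static Optional<Integer> viaOptional() {
        return Optional.of(1).map(i -> i * 10);
    }

    // CompletableFuture: the same operation is named thenApply.
    static CompletableFuture<Integer> viaFuture() {
        return CompletableFuture.completedFuture(1).thenApply(i -> i * 10);
    }

    // List has no map method, so we convert to a Stream and back.
    static List<Integer> viaList(List<Integer> input) {
        return input.stream()
                    .map(i -> i * 10)
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(viaStream());
        System.out.println(viaOptional());
        System.out.println(viaFuture().join());
        System.out.println(viaList(Arrays.asList(1, 2, 3)));
    }
}
```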
The JDK Stream implementation performs well, is totally lazy and well designed for extension, but provides only a limited subset of potential operators (constrained, perhaps, by a focus on data parallelism). Into the void stepped libraries such as
jOOλ with its sequential Stream extension (called
Seq
).
Seq
adds many additional Streaming operators. jOOλ generally adds many missing functional features such as Tuples.
A core goal of
cyclops-react, as well as adding original features such as
FutureStreams, is to provide a mechanism for joining up both the JDK APIs and the third party functional libraries. A Cambrian explosion of cool libraries, such as
vavr &
Project Reactor, emerged after the launch of Java 8. cyclops-react joins these up in the first instance by extending the JDK, and by leveraging other libraries such as
jOOλ,
pCollections &
Agrona. These libraries in turn also extend JDK interfaces where possible to add features such as Persistent Collections and wait free Many Producer Single Consumer Queues.
Beyond reusing and extending JDK interfaces our aims were to make it easy for developers to integrate with external libraries by making use of third party standards such as the reactive-streams API and by building our own abstractions where no set standard existed. The libraries we currently focus on integrating with are Google’s Guava, RxJava, Functional Java, Project Reactor and
vavr. We’ve created abstractions for wrapping types like
Stream, Optional & CompletableFuture
– where no interface existed or was possible before. We chose these goals, because we are using cyclops-react in production across a Microservices architecture and being able to leverage the right technology for a problem and have it integrate smoothly with the rest of our code base is critical.
cyclops-react is quite a large feature rich project, and in addition has a number of
integration modules. In the article below I’ll cover some of the available features with a particular goal of showing how cyclops-react helps join up the dots across the JDK and into the brave new world of the pace setting Java 8 open source community.
Extending the JDK
cyclops-react extends JDK APIs where possible. For example
ReactiveSeq
adds functionality for handling errors, asynchronous processing and much more, and extends both JDK Stream and jOOλ’s Seq. cyclops-react Collection extensions, rather than creating new collection implementations, implement and extend the appropriate JDK interfaces. cyclops-react
LazyFutureStream
in turn extends
ReactiveSeq
, and allows aggregate operations over Streams of Futures as if it were a simple Stream (this proves to be very useful for handling a large number of typical Java I/O operations asynchronously and performantly).
ListX
extends
List
, but adds operators that execute eagerly
ListX<Integer> tenTimes = ListX.of(1,2,3,4)
.map(i->i*10);
cyclops-react adds lots of operators for users to explore. We can, for example, apply functions across multiple collections at the same time.
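One way to read "applying a function across multiple collections" is nested iteration over two lists, combining every pair of elements. The plain-JDK sketch below shows only that idea, under that assumption; `CrossCollections` and `combineAll` are hypothetical names and not cyclops-react operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper: applies a two-argument function to every pairing
// of an element from the first list with an element from the second.
final class CrossCollections {

    static <A, B, R> List<R> combineAll(List<A> first,
                                        List<B> second,
                                        BiFunction<? super A, ? super B, ? extends R> fn) {
        List<R> result = new ArrayList<>();
        for (A a : first) {
            for (B b : second) {
                result.add(fn.apply(a, b)); // evaluated eagerly, in iteration order
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> sums = combineAll(Arrays.asList(1, 2),
                                        Arrays.asList(10, 20),
                                        Integer::sum);
        System.out.println(sums); // [11, 21, 12, 22]
    }
}
```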
The reactive-streams API acts as a natural bridge between producers (publishers) of data and consumers (subscribers). All cyclops-react data types implement the
Publisher
interface from reactive-streams, and
Subscriber
implementations that can convert to any cyclops-react type are also provided. This makes direct integration with other reactive-streams based libraries, such as Project Reactor, straightforward.
For example we can lazily populate a Reactor Flux from any cyclops publisher, such as
SortedSetX
, or populate a cyclops-react type from a Reactor type.
Flux<Integer> stream = Flux.from(
SortedSetX.of(1,2,3,4,5,6,7,8));
//Flux[1,2,3,4,5,6,7,8]
ListX<String> list = ListX.fromPublisher(
Flux.just("a","b","c"));
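To make the publisher/subscriber bridge concrete without depending on any third-party API, here is a rough sketch using `java.util.concurrent.Flow` (Java 9+), whose interfaces mirror the reactive-streams ones. `IterablePublisher` and `CollectingSubscriber` are hypothetical names. The publisher is deliberately simplified (synchronous, no re-entrancy protection, no validation of the requested amount), so it should not be taken as a spec-compliant implementation or as how cyclops-react does it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Flow;

// Hypothetical publisher: exposes any Iterable (e.g. a sorted set) as a Flow.Publisher.
final class IterablePublisher<T> implements Flow.Publisher<T> {
    private final Iterable<T> source;

    IterablePublisher(Iterable<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Iterator<T> it = source.iterator();
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                // Emit at most n elements: data only flows when the subscriber asks.
                long remaining = n;
                while (!done && remaining > 0 && it.hasNext()) {
                    subscriber.onNext(it.next());
                    remaining--;
                }
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    public static void main(String[] args) {
        Flow.Publisher<Integer> publisher =
            new IterablePublisher<>(new TreeSet<>(Arrays.asList(3, 1, 2)));
        System.out.println(CollectingSubscriber.toList(publisher)); // [1, 2, 3]
    }
}

// Hypothetical subscriber: converts whatever a publisher emits into a List.
final class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> items = new ArrayList<>();
    boolean completed;

    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onComplete() { completed = true; }

    // Works here only because IterablePublisher emits synchronously inside subscribe().
    static <T> List<T> toList(Flow.Publisher<T> publisher) {
        CollectingSubscriber<T> subscriber = new CollectingSubscriber<>();
        publisher.subscribe(subscriber);
        return subscriber.items;
    }
}
```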
Reactor Flux and Mono types can work directly with cyclops-react
For
comprehensions (each supported library also has its own set of native
For
comprehension classes in their integration module).
// import static com.aol.cyclops.control.For.*;
Publishers.each2(
Flux.just(1,2,3),
i -> ReactiveSeq.range(i,5),Tuple::tuple).printOut();
/*
(1, 1)
(1, 2)
(1, 3)
(1, 4)
(2, 2)
(2, 3)
(2, 4)
(3, 3)
(3, 4)
*/
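For comparison, the nested iteration shown above can be approximated with JDK streams alone, using `flatMap` for the outer step and `mapToObj` for the inner one. This is only a sketch of the equivalent cascade (the class name `NestedIteration` is hypothetical, and plain strings stand in for jOOλ tuples):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo: the cascade of flatMap / map calls that a
// two-level comprehension stands for.
final class NestedIteration {

    static List<String> pairs() {
        return Stream.of(1, 2, 3)
                     // the inner range depends on the outer element i
                     .flatMap(i -> IntStream.range(i, 5)
                                            .mapToObj(j -> "(" + i + ", " + j + ")"))
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        pairs().forEach(System.out::println);
    }
}
```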
A
For
comprehension is a way of managing nested iteration over types with flatMap and map methods, by cascading calls to the appropriate methods. In cyclops-react, nested statements can access the elements of the previous statements, so
For
comprehensions can be a very useful way of managing the behavior of existing code. For example, existing methods findId and loadData may return null values and will throw NPEs if provided with a null parameter. To call them safely, we can make use of a
For
comprehension that will safely execute loadData only when an Optional with a value is returned from findId()
List<Data> data =
For.optional(findId())
.optional(this::loadData);
//loadData is only called if findId() returns a value
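The same null-safe chaining can be sketched with the JDK's `Optional` alone, which is roughly the cascade such a comprehension expresses. The `findId` and `loadData` bodies below are hypothetical stand-ins (the real methods are not shown in this article), and `findId` takes a parameter here purely so that both outcomes can be exercised:

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical demo of calling null-returning legacy methods safely.
final class SafeLookup {

    // Stand-in: returns null when nothing is found.
    static Long findId(String name) {
        return "known".equals(name) ? Long.valueOf(42L) : null;
    }

    // Stand-in: throws an NPE for a null id and may itself return null.
    static String loadData(Long id) {
        Objects.requireNonNull(id, "id");
        return id == 42L ? "data-42" : null;
    }

    // loadData is only invoked when findId produced a value.
    static Optional<String> load(String name) {
        return Optional.ofNullable(findId(name))
                       .flatMap(id -> Optional.ofNullable(loadData(id)));
    }

    public static void main(String[] args) {
        System.out.println(load("known"));   // Optional[data-42]
        System.out.println(load("unknown")); // Optional.empty
    }
}
```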
Similarly, a type such as Try could be used to handle exceptional results from either findId or loadData, Futures could be used to execute chained methods asynchronously, and so on.
Building cross-library abstractions
Java 8 introduced Monads to Java (
Stream, Optional, CompletableFuture
), but didn’t provide a common interface that would help reuse. In fact, the method names used in
CompletableFuture
differ significantly from those used in
Optional & Stream
for the same function. So
map
became
thenApply
and
flatMap became thenCompose
. Across the Java 8 world monads are becoming an increasingly common pattern, but there is often no way to abstract across them. In cyclops-react, rather than attempt to define an interface to represent monads, we built a set of wrapper interfaces and a number of custom adapters to adapt different instances from across the main functional-style libraries for Java 8 to those wrappers. The wrappers extend
AnyM
(short for Any Monad) and there are two sub-interfaces –
AnyMValue
which represents any monadic type that resolves to a single value (like
Optional
or
CompletableFuture
) or
AnyMSeq
that ultimately resolves to a sequence of values (like a Stream or List). The cyclops extension wrappers provide a mechanism to wrap the types from RxJava, Guava, Reactor, FunctionalJava and vavr.
//We can wrap any type from Reactor, RxJava,
//FunctionalJava, vavr, Guava
AnyMSeq<Integer> wrapped =
FJ.list(List.list(1,2,3,4,5));
//And manipulate it
AnyMSeq<Integer> timesTen = wrapped.map(i->i*10);
cyclops-react provides a common set of interfaces that these wrappers (and other cyclops-react types) inherit from, allowing developers to write more generic reusable code.
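As a rough illustration of the wrapper-plus-adapter idea (not the actual cyclops-react implementation), the sketch below defines a tiny hypothetical `MiniAnyM` interface and two adapters over JDK types. All names here are invented for the example; the point is only that code written against the wrapper, such as `timesTen`, no longer cares whether the underlying type calls the operation `map` or `thenApply`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Generic code written once against the hypothetical wrapper interface.
final class AnyMSketch {

    static MiniAnyM<Integer> timesTen(MiniAnyM<Integer> wrapped) {
        return wrapped.map(i -> i * 10);
    }

    public static void main(String[] args) {
        System.out.println(timesTen(new OptionalAdapter<>(Optional.of(4))).unwrap());
        CompletableFuture<?> future = (CompletableFuture<?>)
            timesTen(new FutureAdapter<>(CompletableFuture.completedFuture(4))).unwrap();
        System.out.println(future.join());
    }
}

// Hypothetical, heavily reduced stand-in for a monad wrapper.
interface MiniAnyM<T> {
    <R> MiniAnyM<R> map(Function<? super T, ? extends R> fn);

    Object unwrap(); // hands back the underlying JDK type
}

// Adapter for Optional: delegates to Optional#map.
final class OptionalAdapter<T> implements MiniAnyM<T> {
    private final Optional<T> value;

    OptionalAdapter(Optional<T> value) {
        this.value = value;
    }

    @Override
    public <R> MiniAnyM<R> map(Function<? super T, ? extends R> fn) {
        return new OptionalAdapter<R>(value.<R>map(fn));
    }

    @Override
    public Object unwrap() {
        return value;
    }
}

// Adapter for CompletableFuture: the same operation is spelled thenApply.
final class FutureAdapter<T> implements MiniAnyM<T> {
    private final CompletableFuture<T> value;

    FutureAdapter(CompletableFuture<T> value) {
        this.value = value;
    }

    @Override
    public <R> MiniAnyM<R> map(Function<? super T, ? extends R> fn) {
        return new FutureAdapter<R>(value.<R>thenApply(fn));
    }

    @Override
    public Object unwrap() {
        return value;
    }
}
```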
AnyM
extends reactive-streams publishers, meaning you can make any vavr, Guava, FunctionalJava or RxJava type a reactive-streams publisher with cyclops-react.
AnyMSeq<Integer> wrapped =
Javaslang.traversable(List.of(1,2,3,4,5));
//The wrapped type is a reactive-streams publisher
Flux<Integer> fromVavr = Flux.from(wrapped);
wrapped.forEachWithError(
System.out::println,
System.err::println);
Furthermore the reactive functionality from cyclops-react is provided directly on the AnyM types. This means we can, for example, schedule data emission from a vavr or FunctionalJava Stream – or execute a reduce operation lazily, or asynchronously.
AnyMSeq<Integer> wrapped =
Javaslang.traversable(Stream.of(1,2,3,4,5));
CompletableFuture<Integer> asyncResult =
wrapped.futureOperations(Executors.newFixedThreadPool(1))
.reduce(50, (acc, next) -> acc + next);
//CompletableFuture[65]
AnyMSeq<Integer> wrapped =
FJ.list(List.list(1,2,3,4,5));
Eval<Integer> lazyResult =
wrapped.map(i -> i * 10)
.lazyOperations()
.reduce(50, (acc,next) -> acc + next);
//Eval[200]
HotStream<Integer> emitting = wrapped.schedule(
"0 * * * * ?",
Executors.newScheduledThreadPool(1));
emitting.connect()
.debounce(1,TimeUnit.DAYS)
.forEachWithError(
this::logSuccess,
this::logFailure);
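To show what the asynchronous and lazy reduce variants above amount to, here is a JDK-only sketch under simplifying assumptions: `CompletableFuture.supplyAsync` stands in for the asynchronous operation and a plain `Supplier` stands in for the lazy `Eval`. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical demo: the same fold, run asynchronously or deferred.
final class ReduceModes {

    // Asynchronous: the reduction runs on the supplied executor.
    static CompletableFuture<Integer> asyncSum(List<Integer> values, Executor executor) {
        return CompletableFuture.supplyAsync(
            () -> values.stream().reduce(50, Integer::sum), executor);
    }

    // Lazy: nothing is computed until get() is called on the Supplier.
    static Supplier<Integer> lazySum(List<Integer> values) {
        return () -> values.stream()
                           .map(i -> i * 10)
                           .reduce(50, Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5);
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            System.out.println(asyncSum(values, pool).join()); // 50 + 15 = 65
        } finally {
            pool.shutdown();
        }
        System.out.println(lazySum(values).get()); // 50 + 150 = 200
    }
}
```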
There’s a lot to explore both in cyclops-react and in the new, broader Java 8 ecosystem. Hopefully you’ll have a fun adventure playing with, learning from and extending the Java 8 boundaries yourself!