Using Oracle AQ via Java 8 Streams

One of the most awesome features of the Oracle database is Oracle AQ: Oracle Database Advanced Queuing. The AQ API implements a full fledged, transactional messaging system directly in the database.

In a classic architecture where the database is at the center of your system, with multiple applications (some of which written in Java, others written in Perl or PL/SQL, etc.) accessing the same database, using AQ for inter-process communication is just great. If you’re more on the Java EE side, you might purchase a Java-based MQ solution, and put that message bus / middleware at the center of your system architecture. But why not use the database instead?

How to use the PL/SQL AQ API with jOOQ

The PL/SQL API for AQ message enqueuing and dequeuing is rather simple, and it can be accessed very easily from Java using jOOQ’s OracleDSL.DBMS_AQ API.

The queue configuration used here would look something like:

  ID         NUMBER(7),
  title      VARCHAR2(100 CHAR)

    queue_table => 'message_aq_t',
    queue_payload_type => 'message_t'

    queue_name => 'message_q',
    queue_table => 'message_aq_t'

    queue_name => 'message_q'

And the jOOQ code generator would generate the useful classes with all the type information directly associated with them (simplified example):

class Queues {
    static final Queue<MessageTRecord> MESSAGE_Q = 
        new QueueImpl<>("NEW_AUTHOR_AQ", MESSAGE_T);

class MessageTRecord {
    void setId(Integer id) { ... }
    Integer getId() { ... }
    void setTitle(String title) { ... }
    String getTitle() { ... }
        Integer id, String title
    ) { ... }

These classes can then be used to enqueue and dequeue messages type safely directly on the generated queue references:

// The jOOQ configuration
Configuration c = ...

// Enqueue a message
DBMS_AQ.enqueue(c, MESSAGE_Q, 
    new MessageTRecord(1, "test"));

// Dequeue it again
MessageTRecord message = DBMS_AQ.dequeue(c, MESSAGE_Q);

Easy, isn’t it?

Now, let’s leverage Java 8 features

A message queue is nothing other than an infinite (blocking) stream of messages. Since Java 8, we have a formidable API for such message streams, the Stream API.

This is why we have added (for the upcoming jOOQ 3.8) a new API that combines the existing jOOQ AQ API with Java 8 Streams:

// The jOOQ configuration
Configuration c = ...

DBMS_AQ.dequeueStream(c, MESSAGE_Q)
       .filter(m -> "test".equals(m.getTitle()))

The above stream pipeline will listen on the MESSAGE_Q queue, consume all messages, filter out messages that do not contain the "test", and print the remaining messages.

Blocking streams

The interesting thing is the fact that this is a blocking, infinite stream. As long as there is no new message in the queue, the stream pipeline processing will simply block on the queue, waiting for new messages. This is not an issue for sequential streams, but when calling Stream.parallel(), what happens then?

jOOQ will consume each message in a transaction. A jOOQ 3.8 transaction runs in a ForkJoinPool.ManagedBlocker:

static <T> Supplier<T> blocking(Supplier<T> supplier) {
    return new Supplier<T>() {
        volatile T result;

        public T get() {
            try {
                ForkJoinPool.managedBlock(new ManagedBlocker() {
                    public boolean block() {
                        result = supplier.get();
                        return true;

                    public boolean isReleasable() {
                        return result != null;
            catch (InterruptedException e) {
                throw new RuntimeException(e);

            return asyncResult;

This isn’t a lot of magic. A ManagedBlocker runs some special code when it is run by a ForkJoinWorkerThread, making sure the thread’s ForkJoinPool won’t suffer from thread exhaustion and thus from deadlocks. For more info, read this interesting article here:

Or this Stack Overflow answer:

So, if you want a super-fast parallel AQ dequeuing process, just run:

// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...

DBMS_AQ.dequeueStream(c, MESSAGE_Q)
       .filter(m -> "test".equals(m.getTitle()))

And you’ll have several threads that will dequeue messages in parallel.

Don’t want to wait for jOOQ 3.8?

No problem. Use the current version and wrap the dequeue operation in your own Stream:

Stream<MessageTRecord> stream = Stream.generate(() ->
    DSL.using(config).transactionResult(c ->
        dequeue(c, MESSAGE_Q)


Bonus: Asynchronous dequeuing

While we were at it, another very nice feature of queuing systems is their asynchronicity. With Java 8, a very useful type to model (and compose) asynchronous algorithms is the CompletionStage, and it’s default implementation the CompletableFuture, which executes tasks in the ForkJoinPool again.

Using jOOQ 3.8, you can again simply call

// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...

CompletionStage<MessageTRecord> stage =
DBMS_AQ.dequeueAsync(c, MESSAGE_Q)
       .thenCompose(m -> ...)

Stay tuned for another article on the jOOQ blog soon, where we look into more sophisticated use-cases for asynchronous, blocking SQL statements with jOOQ 3.8 and Java 8

Using Oracle AQ in Java Won’t Get Any Easier Than This

As recently announced in our newsletter, the upcoming jOOQ 3.5 will include an awesome new feature for those of you using the Oracle database: Native support for Oracle AQ! And your client code will be so easy to write, you’ll be putting those AQs all over your database immediately.

How does it work?

jOOQ rationale

The biggest reason why many of our users love jOOQ is our code generator. It generates a Java representation of your database schema, with all the relevant objects that you need when writing SQL. So far, this has included tables, sequences, user-defined-types, packages, procedures.

What’s new is that AQ objects are now also generated and associated with the generated object type.

A simple schema

Let’s consider writing this simple schema (all sources available on GitHub)

  ID         NUMBER(7),
  title      VARCHAR2(100 CHAR),
  language   VARCHAR2(2 CHAR)

  AS VARRAY(32) OF book_t

  ID         NUMBER(7),
  first_name VARCHAR2(100 CHAR),
  last_name  VARCHAR2(100 CHAR),
  books      books_t

  AS VARRAY(32) OF author_t

    queue_table => 'new_author_aq_t',
    queue_payload_type => 'author_t'

    queue_name => 'new_author_aq',
    queue_table => 'new_author_aq_t'

    queue_name => 'new_author_aq'

So, essentially, we have both OBJECT and VARRAY types for books and authors. You might prefer using TABLE types rather than VARRAY types, but for the sake of simplicity, we stick with VARRAY (as it isn’t so easy to use nested TABLE types with AQs in Oracle).

We have also created a queue that notifies listeners every time a new author is added to the database – along with their books. Imagine enqueue operations being done in a trigger on either the author or the book table.

jOOQ-generated code

When you run the jOOQ codegenerator (version 3.5 upwards) against the above schema, you’ll get a new file, which contains:

public class Queues {
    public static final Queue<AuthorT> NEW_AUTHOR_AQ 
      = new QueueImpl<AuthorT>(
         "NEW_AUTHOR_AQ", SP, AUTHOR_T);

Obviously, also the previously shown OBJECT and VARRAY types are also generated by jOOQ, just like lables.

(of course, the actual naming patterns for generated Java code are completely configurable)

Using the generated artefacts

The above code is not really nicely formatted on this blog, but you don’t see any of this in your every day work. Because when you want to enqueue a message to this queue, you can simply write:

// Create a new OBJECT type with nested
// VARRAY type
AuthorT author = new AuthorT(
    new BooksT(
        new BookT(1, "1984", "en"),
        new BookT(2, "Animal Farm", "en")

// ... and simply enqueue that on NEW_AUTHOR_AQ
DBMS_AQ.enqueue(configuration, NEW_AUTHOR_AQ, author);

Seriously? That easy? Yes!

Compare the above to anything you’ve written before through JDBC, or using Oracle’s native APIs. You’ll find a couple of examples about how to serialise / deserialise RAW types, but frankly, queues are awesome because you can send OBJECT types through the database, and we don’t see those examples from Oracle. In fact, trust us, you don’t want to serialise OBJECT, VARRAY, or TABLE types through JDBC. You don’t. That’s our job. We’re hacking JDBC so you don’t have to.

Of course, you can also pass MESSAGE_PROPERTIES_T, ENQUEUE_OPTIONS_T, and DEQUEUE_OPTIONS_T types as arguments to the enqueue() and dequeue() methods.

Dequeuing is just as easy. The following will generate a blocking call and wait for the next AUTHOR_T message to arrive:

AuthorT author =
  DBMS_AQ.dequeue(configuration, NEW_AUTHOR_AQ);

That’s it. Can’t be that hard, can it?

jOOQ: The best way to use Oracle AQ in Java

Goodie: Java 8 and Oracle AQ

With the above simple API and Java 8, we can do what Oracle must’ve known long ago, when they renamed Oracle AQ’s marketing name to Oracle Streams. Let’s create a Java 8 Stream of AQ-produced OBJECT types with jOOQ. Easy as pie. Just write:

static <R extends UDTRecord<R>> Stream<R> stream(
    Configuration c, 
    Queue<R> queue
) {
    return Stream.generate(() -> 
        DBMS_AQ.dequeue(c, queue)

And now, use this beauty like so:

stream(configuration, NEW_AUTHOR_AQ)
    .forEach(author -> {
            author.getFirstName() + " " +

The above statement takes the next 10 messages dequeued this way and prints them to the console.

jOOQ Newsletter: jOOLY 23, 2014 – Only 8 Days Left in jOOLY

subscribe to this newsletter here

Only 8 Days Left in jOOLY

Time is running so fast! The month of jOOLY is almost over – have you taken advantage of our limited-time promotional discount of 20% that we’re offering to all of your purchases in July 2014? And that’s not it, you will also get a free copy of the popular e-book SQL Performance Explained by Markus Winand, a book that we believe belongs on the shelf of every SQL developer.

Act now to get 20% off your next jOOQ purchase!

Tweet of the Day

Our customers, users, and followers are sharing their love for jOOQ with the world and we can hardly catch up with them! Here are:

Álvaro Hernández Tortosa, who has the final word on frameworks that hide SQL, because SQL is really powerful

Calvin Thomas, who Has come to an end of his search for the stack he direly needs. And that consists of AngularJS, Bootstrap, Play, Scala, jOOQ. Well done!

The famous Adam Bien, who explains how to properly use jOOQ in a Java EE context.

Thanks for the shouts, guys! You make the jOOQ experience rock!

New tiered pricing model

In the recent months, we have been having a lot of interesting discussions about our workstation-based pricing model, and how that fits in larger organisations with more fluctuation among team members.

We think of our workstation-based model as particularly fair because the price increases when more value is added – but we have heard the various concerns about simplifying the administration effort for large volumes. This is why we’re now officially offering a tiered pricing model on all subscriptions larger than 10 workstations.

If this is interesting for your organisation, please consider the updated license textcontaining prices (on page 17), or contact us directly.

Of course, if you act quickly, this offering can be combined with the “jOOLY” promotional discount to help you get even more value out of your next purchase!

jOOQ 3.5: Oracle AQ Support

The upcoming jOOQ 3.5 will ship with an extension to the code generator and the API that will make using Oracle AQ with jOOQ as easy as everything else!

Oracle AQ is a very powerful feature when you need to notify your database clients of data changes. Typical use-cases include triggers on updates needing to invalidate a UI cache for an “expensive” value.

If you’re using Oracle AQ with JDBC directly, however, you might be put off by the complexity of binding / loading OBJECT types from CallableStatements. Not with jOOQ.

This is what an enqueue call will look like:

DBMS_AQ.enqueue(conf, QUEUE_NAME, object);

And this is what a dequeue call will look like:

MyObjectType object = DBMS_AQ.dequeue(conf, QUEUE_NAME);

Both the MyObjectType and the QUEUE_NAME reference are generated objects with type information associated with them. This means, you can enqueue / dequeue just as if Java were the same as PL/SQL. Excited? We are!

Community Zone – The jOOQ aficionados have been active!

The jOOQ community has been very active again in the last month. We’re happy to point out these editor’s picks from our radar:

Tired of building with Maven? We’re very happy to announce Etienne Studer’s publication of a fully-functional gradle-jooq-plugin. This is a great community effort for those of you working with jOOQ and Gradle – or even Groovy in general.

Bert van Langen is a passionate DB2 DBA who has given us this excellent introduction to jOOQ on his blog. An alternative tutorial that should get new users started very quickly.

Marco Behler has published a treaties about the Java persistence ghetto (and how jOOQ might change that). We’re very glad to see that Marco is also coming to the same conclusion that we try to repeat time and again: Nothing keeps you from using JPA and SQL (e.g. in the form of jOOQ) in the same project.

Feedback zone

You’ve read to the end of this newsletter, that’s great! Did you like it? What did we do great? What can we improve? What other subjects would you like us to cover?

We’d love to hear from you, so if you want to reach out to us, just drop a message Looking forward to hearing from you!