Using Oracle AQ via Java 8 Streams

One of the most awesome features of the Oracle database is Oracle AQ: Oracle Database Advanced Queuing. The AQ API implements a full-fledged, transactional messaging system directly in the database.

In a classic architecture where the database is at the center of your system, with multiple applications (some of which are written in Java, others in Perl or PL/SQL, etc.) accessing the same database, using AQ for inter-process communication is just great. If you’re more on the Java EE side, you might purchase a Java-based MQ solution, and put that message bus / middleware at the center of your system architecture. But why not use the database instead?

How to use the PL/SQL AQ API with jOOQ

The PL/SQL API for AQ message enqueuing and dequeuing is rather simple, and it can be accessed very easily from Java using jOOQ’s OracleDSL.DBMS_AQ API.

The queue configuration used here would look something like:

CREATE OR REPLACE TYPE message_t AS OBJECT (
  ID         NUMBER(7),
  title      VARCHAR2(100 CHAR)
)
/

BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table => 'message_aq_t',
    queue_payload_type => 'message_t'
  );

  DBMS_AQADM.CREATE_QUEUE(
    queue_name => 'message_q',
    queue_table => 'message_aq_t'
  );

  DBMS_AQADM.START_QUEUE(
    queue_name => 'message_q'
  );
  COMMIT;
END;
/

And the jOOQ code generator would generate the useful classes with all the type information directly associated with them (simplified example):

class Queues {
    static final Queue<MessageTRecord> MESSAGE_Q = 
        new QueueImpl<>("MESSAGE_Q", MESSAGE_T);
}

class MessageTRecord {
    void setId(Integer id) { ... }
    Integer getId() { ... }
    void setTitle(String title) { ... }
    String getTitle() { ... }
    MessageTRecord(
        Integer id, String title
    ) { ... }
}

These classes can then be used to enqueue and dequeue messages type safely directly on the generated queue references:

// The jOOQ configuration
Configuration c = ...

// Enqueue a message
DBMS_AQ.enqueue(c, MESSAGE_Q, 
    new MessageTRecord(1, "test"));

// Dequeue it again
MessageTRecord message = DBMS_AQ.dequeue(c, MESSAGE_Q);

Easy, isn’t it?

Now, let’s leverage Java 8 features

A message queue is nothing other than an infinite (blocking) stream of messages. Since Java 8, we have a formidable API for such message streams, the Stream API.

This is why we have added (for the upcoming jOOQ 3.8) a new API that combines the existing jOOQ AQ API with Java 8 Streams:

// The jOOQ configuration
Configuration c = ...

DBMS_AQ.dequeueStream(c, MESSAGE_Q)
       .filter(m -> "test".equals(m.getTitle()))
       .forEach(System.out::println);

The above stream pipeline will listen on the MESSAGE_Q queue, consume all messages, discard the messages whose title is not equal to "test", and print the remaining messages.
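If you don’t have an Oracle instance at hand, the idea of a queue as an infinite, blocking stream can be tried out with plain JDK types. The following is only a minimal sketch: the in-memory BlockingQueue stands in for the AQ queue, and the Message class, the take() helper and the QueueStreamSketch class are hypothetical names made up for this illustration. They are not part of jOOQ.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class QueueStreamSketch {

    // Hypothetical stand-in for the generated MessageTRecord
    static final class Message {
        final int id;
        final String title;

        Message(int id, String title) {
            this.id = id;
            this.title = title;
        }
    }

    // Blocks until a message is available, like a dequeue call without timeout
    static Message take(BlockingQueue<Message> queue) {
        try {
            return queue.take();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a message", e);
        }
    }

    // An infinite stream: every element is produced by a blocking take()
    static Stream<Message> stream(BlockingQueue<Message> queue) {
        return Stream.generate(() -> take(queue));
    }

    static List<Integer> demo() {
        // In-memory queue used instead of Oracle AQ (assumption for this sketch)
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        queue.add(new Message(1, "test"));
        queue.add(new Message(2, "other"));
        queue.add(new Message(3, "test"));

        // limit(2) terminates the otherwise infinite pipeline after two matches
        return stream(queue)
            .filter(m -> "test".equals(m.title))
            .limit(2)
            .map(m -> m.id)
            .collect(Collectors.toList());
    }
}
```

Without the limit(2) call, the pipeline would never terminate. Once the three messages are consumed, it would simply block on the empty queue, which is the behaviour you would expect from a queue listener.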

Blocking streams

The interesting thing is the fact that this is a blocking, infinite stream. As long as there is no new message in the queue, the stream pipeline processing will simply block on the queue, waiting for new messages. This is not an issue for sequential streams, but when calling Stream.parallel(), what happens then?

jOOQ will consume each message in a transaction. A jOOQ 3.8 transaction runs in a ForkJoinPool.ManagedBlocker:

static <T> Supplier<T> blocking(Supplier<T> supplier) {
    return new Supplier<T>() {
        volatile T result;

        @Override
        public T get() {
            try {
                ForkJoinPool.managedBlock(new ManagedBlocker() {
                    @Override
                    public boolean block() {
                        result = supplier.get();
                        return true;
                    }

                    @Override
                    public boolean isReleasable() {
                        return result != null;
                    }
                });
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return result;
        }
    };
}

This isn’t a lot of magic. A ManagedBlocker runs some special code when it is run by a ForkJoinWorkerThread, making sure the thread’s ForkJoinPool won’t suffer from thread exhaustion and thus from deadlocks. For more info, read this interesting article here:
http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health

Or this Stack Overflow answer:
http://stackoverflow.com/a/35272153/521799
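To get a feeling for what thread exhaustion means here, consider the following sketch. It uses plain JDK types only, with an in-memory BlockingQueue standing in for the AQ queue, and the names ManagedBlockerDemo, QueueTaker and managedTake are hypothetical. The pool has a parallelism of 1, the consumer task blocks on the empty queue, and the producer task is submitted to the same pool. Because the blocking call is announced through ForkJoinPool.managedBlock(), the pool may start a compensating thread to run the producer. The 10 second timeout is only a safety net for this demo.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ManagedBlockerDemo {

    // Hypothetical blocker that waits for a single queue element
    static final class QueueTaker implements ForkJoinPool.ManagedBlocker {
        private final BlockingQueue<String> queue;
        private volatile String item;

        QueueTaker(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (item == null)
                item = queue.take();

            // true means: no further blocking is necessary
            return true;
        }

        @Override
        public boolean isReleasable() {
            return item != null;
        }
    }

    // Takes one element, telling the pool that this thread might block
    static String managedTake(BlockingQueue<String> queue) throws InterruptedException {
        QueueTaker taker = new QueueTaker(queue);
        ForkJoinPool.managedBlock(taker);
        return taker.item;
    }

    static String run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Only one regular worker thread is available
        ForkJoinPool pool = new ForkJoinPool(1);

        try {
            // The consumer may occupy the only worker while waiting ...
            ForkJoinTask<String> consumer = pool.submit(() -> managedTake(queue));

            // ... so the producer relies on the pool compensating for it
            pool.execute(() -> queue.add("hello"));

            return consumer.get(10, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }
        finally {
            pool.shutdownNow();
        }
    }
}
```

If the consumer called queue.take() directly instead, and started waiting before the producer ran, the only worker thread would be stuck and the producer would never get a chance to run.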

So, if you want a super-fast parallel AQ dequeuing process, just run:

// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...

DBMS_AQ.dequeueStream(c, MESSAGE_Q)
       .parallel()
       .filter(m -> "test".equals(m.getTitle()))
       .forEach(System.out::println);

And you’ll have several threads that will dequeue messages in parallel.

Don’t want to wait for jOOQ 3.8?

No problem. Use the current version and wrap the dequeue operation in your own Stream:

Stream<MessageTRecord> stream = Stream.generate(() ->
    DSL.using(config).transactionResult(c ->
        dequeue(c, MESSAGE_Q)
    )
);

Done.

Bonus: Asynchronous dequeuing

While we were at it, another very nice feature of queuing systems is their asynchronicity. With Java 8, a very useful type to model (and compose) asynchronous algorithms is the CompletionStage, and its default implementation, the CompletableFuture, which executes tasks in the ForkJoinPool again.

Using jOOQ 3.8, you can again simply call

// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...

CompletionStage<MessageTRecord> stage =
DBMS_AQ.dequeueAsync(c, MESSAGE_Q)
       .thenCompose(m -> ...)
       ...;
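For readers who want to play with this composition style without a database, here is a small sketch using only the JDK. It is not how jOOQ implements dequeueAsync(). The in-memory BlockingQueue replaces the AQ queue, and AsyncDequeueSketch as well as its dequeueAsync() helper are hypothetical names for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;

class AsyncDequeueSketch {

    // Hypothetical helper: waits for the next message on another thread
    static CompletionStage<String> dequeueAsync(BlockingQueue<String> queue) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return queue.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    static String demo() {
        // In-memory queue used instead of Oracle AQ (assumption for this sketch)
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Describe what should happen once a message arrives
        CompletionStage<String> stage = dequeueAsync(queue)
            .thenApply(String::toUpperCase)
            .thenCompose(s -> CompletableFuture.completedFuture(s + "!"));

        // The message arrives only after the pipeline was set up
        queue.add("test");

        // Wait for the result, for demonstration purposes only
        return stage.toCompletableFuture().join();
    }
}
```

The calling thread is free to do other things between setting up the stage and calling join(). Only the asynchronous task is blocked while waiting for the message.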

Stay tuned for another article on the jOOQ blog soon, where we look into more sophisticated use-cases for asynchronous, blocking SQL statements with jOOQ 3.8 and Java 8.

jOOQ Newsletter: jOOLY 23, 2014 – Only 8 Days Left in jOOLY


Only 8 Days Left in jOOLY

Time is running so fast! The month of jOOLY is almost over. Have you taken advantage of the limited-time promotional discount of 20% that we’re offering on all of your purchases in July 2014? And that’s not all: you will also get a free copy of the popular e-book SQL Performance Explained by Markus Winand, a book that we believe belongs on the shelf of every SQL developer.

Act now to get 20% off your next jOOQ purchase!

Tweet of the Day

Our customers, users, and followers are sharing their love for jOOQ with the world and we can hardly catch up with them! Here are:

Álvaro Hernández Tortosa, who has the final word on frameworks that hide SQL, because SQL is really powerful

Calvin Thomas, who has come to the end of his search for the stack he direly needs. And that consists of AngularJS, Bootstrap, Play, Scala, jOOQ. Well done!

The famous Adam Bien, who explains how to properly use jOOQ in a Java EE context.

Thanks for the shouts, guys! You make the jOOQ experience rock!

New tiered pricing model

In recent months, we have had a lot of interesting discussions about our workstation-based pricing model, and how that fits in larger organisations with more fluctuation among team members.

We think of our workstation-based model as particularly fair because the price increases when more value is added – but we have heard the various concerns about simplifying the administration effort for large volumes. This is why we’re now officially offering a tiered pricing model on all subscriptions larger than 10 workstations.

If this is interesting for your organisation, please consider the updated license text containing prices (on page 17), or contact us directly.

Of course, if you act quickly, this offering can be combined with the “jOOLY” promotional discount to help you get even more value out of your next purchase!

jOOQ 3.5: Oracle AQ Support

The upcoming jOOQ 3.5 will ship with an extension to the code generator and the API that will make using Oracle AQ with jOOQ as easy as everything else!

Oracle AQ is a very powerful feature when you need to notify your database clients of data changes. Typical use-cases include triggers on updates needing to invalidate a UI cache for an “expensive” value.

If you’re using Oracle AQ with JDBC directly, however, you might be put off by the complexity of binding / loading OBJECT types from CallableStatements. Not with jOOQ.

This is what an enqueue call will look like:

DBMS_AQ.enqueue(conf, QUEUE_NAME, object);

And this is what a dequeue call will look like:

MyObjectType object = DBMS_AQ.dequeue(conf, QUEUE_NAME);

Both the MyObjectType and the QUEUE_NAME reference are generated objects with type information associated with them. This means you can enqueue / dequeue just as if Java were the same as PL/SQL. Excited? We are!

Community Zone – The jOOQ aficionados have been active!

The jOOQ community has been very active again in the last month. We’re happy to point out these editor’s picks from our radar:

Tired of building with Maven? We’re very happy to announce Etienne Studer’s publication of a fully-functional gradle-jooq-plugin. This is a great community effort for those of you working with jOOQ and Gradle – or even Groovy in general.

Bert van Langen is a passionate DB2 DBA who has given us this excellent introduction to jOOQ on his blog. An alternative tutorial that should get new users started very quickly.

Marco Behler has published a treatise about the Java persistence ghetto (and how jOOQ might change that). We’re very glad to see that Marco is also coming to the same conclusion that we try to repeat time and again: Nothing keeps you from using JPA and SQL (e.g. in the form of jOOQ) in the same project.

Feedback zone

You’ve read to the end of this newsletter, that’s great! Did you like it? What did we do great? What can we improve? What other subjects would you like us to cover?

We’d love to hear from you, so if you want to reach out to us, just drop a message to contact@datageekery.com. Looking forward to hearing from you!