One of the most awesome features of the Oracle database is Oracle AQ: Oracle Database Advanced Queuing. The AQ API implements a full-fledged, transactional messaging system directly in the database.
In a classic architecture where the database is at the center of your system, with multiple applications (some written in Java, others in Perl or PL/SQL, etc.) accessing the same database, using AQ for inter-process communication is a natural fit. If you’re more on the Java EE side, you might instead purchase a Java-based MQ solution and put that message bus / middleware at the center of your system architecture.
But why not use the database instead?
How to use the PL/SQL AQ API with jOOQ
The PL/SQL API for AQ message enqueuing and dequeuing is rather simple, and it can be accessed very easily from Java using jOOQ’s OracleDSL.DBMS_AQ API.
The queue configuration used here would look something like:
CREATE OR REPLACE TYPE message_t AS OBJECT (
  ID    NUMBER(7),
  title VARCHAR2(100 CHAR)
)
/
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table => 'message_aq_t',
    queue_payload_type => 'message_t'
  );
  DBMS_AQADM.CREATE_QUEUE(
    queue_name => 'message_q',
    queue_table => 'message_aq_t'
  );
  DBMS_AQADM.START_QUEUE(
    queue_name => 'message_q'
  );
  COMMIT;
END;
/
And the jOOQ code generator would generate the useful classes with all the type information directly associated with them (simplified example):
class Queues {
    static final Queue<MessageTRecord> MESSAGE_Q =
        new QueueImpl<>("MESSAGE_Q", MESSAGE_T);
}
class MessageTRecord {
    void setId(Integer id) { ... }
    Integer getId() { ... }
    void setTitle(String title) { ... }
    String getTitle() { ... }
    MessageTRecord(
        Integer id, String title
    ) { ... }
}
These classes can then be used to enqueue and dequeue messages type safely directly on the generated queue references:
// The jOOQ configuration
Configuration c = ...
// Enqueue a message
DBMS_AQ.enqueue(c, MESSAGE_Q,
    new MessageTRecord(1, "test"));
// Dequeue it again
MessageTRecord message = DBMS_AQ.dequeue(c, MESSAGE_Q);
Easy, isn’t it?
Now, let’s leverage Java 8 features
A message queue is nothing other than an infinite (blocking) stream of messages. Since Java 8, we have a formidable API for such message streams, the Stream API.
This is why we have added (for the upcoming jOOQ 3.8) a new API that combines the existing jOOQ AQ API with Java 8 Streams:
// The jOOQ configuration
Configuration c = ...
DBMS_AQ.dequeueStream(c, MESSAGE_Q)
       .filter(m -> "test".equals(m.getTitle()))
       .forEach(System.out::println);
The above stream pipeline will listen on the MESSAGE_Q queue, consume all messages, filter out messages whose title is not "test", and print the remaining messages.
Blocking streams
The interesting thing is the fact that this is a blocking, infinite stream. As long as there is no new message in the queue, the stream pipeline processing will simply block on the queue, waiting for new messages. This is not an issue for sequential streams, but what happens when calling Stream.parallel()?
jOOQ will consume each message in a transaction. A jOOQ 3.8 transaction runs in a ForkJoinPool.ManagedBlocker:
static <T> Supplier<T> blocking(Supplier<T> supplier) {
    return new Supplier<T>() {
        volatile T result;
        @Override
        public T get() {
            try {
                ForkJoinPool.managedBlock(new ManagedBlocker() {
                    @Override
                    public boolean block() {
                        // Run the (possibly blocking) supplier and keep its result
                        result = supplier.get();
                        return true;
                    }
                    @Override
                    public boolean isReleasable() {
                        // No further blocking is needed once a result is available
                        return result != null;
                    }
                });
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return result;
        }
    };
}
This isn’t a lot of magic. A ManagedBlocker runs some special code when it is run by a ForkJoinWorkerThread, making sure the thread’s ForkJoinPool won’t suffer from thread exhaustion and thus from deadlocks. For more info, read this interesting article here:
http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health
Or this Stack Overflow answer:
https://stackoverflow.com/a/35272153/521799
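If you want to try the ManagedBlocker idea without an Oracle database at hand, here is a minimal sketch that uses only the JDK. It is not jOOQ's implementation: a LinkedBlockingQueue stands in for the AQ queue, and the names ManagedTakeSketch, QueueTaker, managedTake and demo are made up for this illustration. The blocker itself follows the pattern shown in the ManagedBlocker Javadoc.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

class ManagedTakeSketch {

    // Hypothetical helper: waits for one element of a BlockingQueue
    // while telling the ForkJoinPool that the thread may block
    static final class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        private final BlockingQueue<E> queue;
        private volatile E item;

        QueueTaker(BlockingQueue<E> queue) {
            this.queue = queue;
        }

        @Override
        public boolean block() throws InterruptedException {
            // The actual blocking call
            if (item == null)
                item = queue.take();
            return true;
        }

        @Override
        public boolean isReleasable() {
            // Try a non-blocking poll first, so blocking can be skipped
            return item != null || (item = queue.poll()) != null;
        }
    }

    static <E> E managedTake(BlockingQueue<E> queue) throws InterruptedException {
        QueueTaker<E> taker = new QueueTaker<>(queue);
        ForkJoinPool.managedBlock(taker);
        return taker.item;
    }

    // The in-memory queue is an assumption standing in for an AQ queue
    static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            // The pool's worker blocks in managedTake() until a message arrives
            Callable<String> task = () -> managedTake(queue);
            Future<String> result = pool.submit(task);
            queue.put("test");
            return result.get();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

When managedBlock() is called from a ForkJoinWorkerThread, the pool may activate a spare thread while the worker is blocked, which is what keeps the pool from running out of threads.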
So, if you want a super-fast parallel AQ dequeuing process, just run:
// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...
DBMS_AQ.dequeueStream(c, MESSAGE_Q)
       .parallel()
       .filter(m -> "test".equals(m.getTitle()))
       .forEach(System.out::println);
And you’ll have several threads that will dequeue messages in parallel.
Don’t want to wait for jOOQ 3.8?
No problem. Use the current version and wrap the dequeue operation in your own Stream:
Stream<MessageTRecord> stream = Stream.generate(() ->
    DSL.using(config).transactionResult(c ->
        dequeue(c, MESSAGE_Q)
    )
);
Done.
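To get a feeling for how Stream.generate() behaves with a blocking supplier, the following sketch replaces the database with an in-memory LinkedBlockingQueue. That queue, and the names BlockingStreamSketch, streamOf and demo, are assumptions made for this illustration only. The limit(3) call is there just to make the demo terminate, since a real message stream would be infinite.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BlockingStreamSketch {

    // Hypothetical stand-in for the dequeue call: each element of the
    // stream is produced by a take() that blocks until a message exists
    static <E> Stream<E> streamOf(BlockingQueue<E> queue) {
        return Stream.generate(() -> {
            try {
                return queue.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    static List<String> demo() {
        BlockingQueue<String> queue =
            new LinkedBlockingQueue<>(Arrays.asList("test", "other", "test"));

        return streamOf(queue)
            .limit(3)                 // Consume three messages, then stop
            .filter("test"::equals)   // Keep only the "test" messages
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without the limit, the sequential pipeline would simply keep waiting on take() once the queue is empty, which is the blocking behaviour described earlier.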
Bonus: Asynchronous dequeuing
While we were at it, another very nice feature of queuing systems is their asynchronicity. With Java 8, a very useful type to model (and compose) asynchronous algorithms is the CompletionStage, and its default implementation, the CompletableFuture, which by default executes asynchronous tasks in the common ForkJoinPool again.
Using jOOQ 3.8, you can again simply call:
// The jOOQ configuration. Make sure its referenced
// ConnectionPool has enough connections
Configuration c = ...
CompletionStage<MessageTRecord> stage =
DBMS_AQ.dequeueAsync(c, MESSAGE_Q)
       .thenCompose(m -> ...)
       ...;
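The same composition style can be tried with plain JDK types. The sketch below is not the jOOQ API: takeAsync is a hypothetical helper that wraps a blocking take() on an in-memory queue in CompletableFuture.supplyAsync(), and the class name AsyncTakeSketch is equally made up.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;

class AsyncTakeSketch {

    // Hypothetical helper: runs the blocking take() asynchronously.
    // For simplicity it does not wrap the call in a ManagedBlocker.
    static <E> CompletionStage<E> takeAsync(BlockingQueue<E> queue) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return queue.take();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        });
    }

    static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Nothing is in the queue yet, so the stage is still pending here
        CompletionStage<String> stage = takeAsync(queue)
            .thenCompose(title ->
                CompletableFuture.completedFuture(title.toUpperCase()));

        // The message arrives later and completes the whole chain
        queue.add("test");
        return stage.toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The calling thread is free to do other work between creating the stage and calling join(), which is the point of modelling the dequeue as a CompletionStage.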
Stay tuned for another article on the jOOQ blog soon, where we look into more sophisticated use-cases for asynchronous, blocking SQL statements with jOOQ 3.8 and Java 8.
Any plans to add support for PostgreSQL’s LISTEN/NOTIFY functionality which works in a similar fashion?
I’ve investigated this. Unfortunately, JDBC support for LISTEN / NOTIFY is rather unsatisfactory. Polling of the database is required, and the polling does not block on the channel, so I’m out of ideas how to implement this nicely…
Is that how the pgjdbc-ng driver works for this feature? By polling? My understanding was that the pgjdbc-ng driver could make this work in a reactive manner.
I think that pgjdbc-ng gets it right, but jOOQ cannot rely on this driver. In any case, the PostgreSQL wire protocol allows for true asynchronicity (not only for LISTEN / NOTIFY). Unfortunately, the official JDBC driver lacks a lot of functionality (and I mean a lot)
Yeah, JDBC is pretty good for the 80% mean, but not so great for the long tail… Thanks for the quick response!
Don’t tell me. I’m wrestling the ojdbc extensions almost every day :)