Reactive SQL with jOOQ 3.15 and R2DBC

One of the biggest new features of the recently released jOOQ 3.15 is its new support for reactive querying via R2DBC. This has been a highly popular feature request, and we finally delivered on it.

You can continue using jOOQ the way you were used to, providing you with type safe, embedded SQL in Java, kotlin, or scala, but your query executions are no longer blocking. Instead, your jOOQ ResultQuery or Query can be used as a Publisher<R> or Publisher<Integer> in the reactive-streams implementation of your choice.

Instead of (or in addition to) configuring your jOOQ DSLContext with a JDBC java.sql.Connection or javax.sql.DataSource, just configure it with an R2DBC io.r2dbc.spi.Connection or io.r2dbc.spi.ConnectionFactory:

ConnectionFactory connectionFactory = ConnectionFactories.get(
    ConnectionFactoryOptions
        .parse("r2dbc:h2:file://localhost/~/r2dbc-test")
        .mutate()
        .option(ConnectionFactoryOptions.USER, "sa")
        .option(ConnectionFactoryOptions.PASSWORD, "")
        .build()
);

DSLContext ctx = DSL.using(connectionFactory);

Alternatively, use Spring Boot to auto-configure jOOQ like this:

Starting from this DSLContext, you can build your queries as always, but instead of calling the usual blocking execute() or fetch() methods, you’ll just wrap the query in a Flux, for example. Assuming you ran the jOOQ code generator on your H2 INFORMATION_SCHEMA, you can now write:

record Table(String schema, String table) {}

Flux.from(ctx
        .select(
            INFORMATION_SCHEMA.TABLES.TABLE_SCHEMA,
            INFORMATION_SCHEMA.TABLES.TABLE_NAME)
        .from(INFORMATION_SCHEMA.TABLES))

    // Type safe mapping from Record2<String, String> to Table::new
    .map(Records.mapping(Table::new))
    .doOnNext(System.out::println)
    .subscribe();

jOOQ will acquire an R2DBC Connection from your ConnectionFactory, and release it again after the query has been executed, allowing for optimised resource management, something that is otherwise a bit tricky with R2DBC and reactor. In other words, the above execution corresponds to this manually written query:

Flux.usingWhen(
        connectionFactory.create(),
        c -> c.createStatement(
                """
                SELECT table_schema, table_name
                FROM information_schema.tables
                """
             ).execute(),
        c -> c.close()
    )
    .flatMap(it -> it.map((r, m) -> 
         new Table(r.get(0, String.class), r.get(1, String.class))
    ))
    .doOnNext(System.out::println)
    .subscribe();

Both will print something like the following:

Table[schema=INFORMATION_SCHEMA, table=TABLE_PRIVILEGES]
Table[schema=INFORMATION_SCHEMA, table=REFERENTIAL_CONSTRAINTS]
Table[schema=INFORMATION_SCHEMA, table=TABLE_TYPES]
Table[schema=INFORMATION_SCHEMA, table=QUERY_STATISTICS]
Table[schema=INFORMATION_SCHEMA, table=TABLES]
Table[schema=INFORMATION_SCHEMA, table=SESSION_STATE]
Table[schema=INFORMATION_SCHEMA, table=HELP]
Table[schema=INFORMATION_SCHEMA, table=COLUMN_PRIVILEGES]
Table[schema=INFORMATION_SCHEMA, table=SYNONYMS]
Table[schema=INFORMATION_SCHEMA, table=SESSIONS]
Table[schema=INFORMATION_SCHEMA, table=IN_DOUBT]
Table[schema=INFORMATION_SCHEMA, table=USERS]
Table[schema=INFORMATION_SCHEMA, table=COLLATIONS]
Table[schema=INFORMATION_SCHEMA, table=SCHEMATA]
Table[schema=INFORMATION_SCHEMA, table=TABLE_CONSTRAINTS]
Table[schema=INFORMATION_SCHEMA, table=INDEXES]
Table[schema=INFORMATION_SCHEMA, table=ROLES]
Table[schema=INFORMATION_SCHEMA, table=FUNCTION_COLUMNS]
Table[schema=INFORMATION_SCHEMA, table=CONSTANTS]
Table[schema=INFORMATION_SCHEMA, table=SEQUENCES]
Table[schema=INFORMATION_SCHEMA, table=RIGHTS]
Table[schema=INFORMATION_SCHEMA, table=FUNCTION_ALIASES]
Table[schema=INFORMATION_SCHEMA, table=CATALOGS]
Table[schema=INFORMATION_SCHEMA, table=CROSS_REFERENCES]
Table[schema=INFORMATION_SCHEMA, table=SETTINGS]
Table[schema=INFORMATION_SCHEMA, table=DOMAINS]
Table[schema=INFORMATION_SCHEMA, table=KEY_COLUMN_USAGE]
Table[schema=INFORMATION_SCHEMA, table=LOCKS]
Table[schema=INFORMATION_SCHEMA, table=COLUMNS]
Table[schema=INFORMATION_SCHEMA, table=TRIGGERS]
Table[schema=INFORMATION_SCHEMA, table=VIEWS]
Table[schema=INFORMATION_SCHEMA, table=TYPE_INFO]
Table[schema=INFORMATION_SCHEMA, table=CONSTRAINTS]

Note that if you’re using JDBC and not R2DBC, you can continue to use the jOOQ API with your reactive streams libraries in a blocking manner in exactly the same way as above, e.g. if your favourite RDBMS does not yet support a reactive R2DBC driver. Currently supported drivers according to r2dbc.io include:

All of which we integration test with jOOQ 3.15+.

A runnable example

Go play with the example here: https://github.com/jOOQ/jOOQ/tree/main/jOOQ-examples/jOOQ-r2dbc-example

It uses the following schema:

CREATE TABLE r2dbc_example.author (
  id INT NOT NULL AUTO_INCREMENT,
  first_name VARCHAR(100) NOT NULL,
  last_name VARCHAR(100) NOT NULL,
  
  CONSTRAINT pk_author PRIMARY KEY (id)
);

CREATE TABLE r2dbc_example.book (
  id INT NOT NULL AUTO_INCREMENT,
  author_id INT NOT NULL,
  title VARCHAR(100) NOT NULL,
  
  CONSTRAINT pk_book PRIMARY KEY (id),
  CONSTRAINT fk_book_author FOREIGN KEY (id) 
    REFERENCES r2dbc_example.author
);

And runs this code

Flux.from(ctx
        .insertInto(AUTHOR)
        .columns(AUTHOR.FIRST_NAME, AUTHOR.LAST_NAME)
        .values("John", "Doe")
        .returningResult(AUTHOR.ID))
    .flatMap(id -> ctx
        .insertInto(BOOK)
        .columns(BOOK.AUTHOR_ID, BOOK.TITLE)
        .values(id.value1(), "Fancy Book"))
    .thenMany(ctx
        .select(
             BOOK.author().FIRST_NAME, 
             BOOK.author().LAST_NAME, 
             BOOK.TITLE)
        .from(BOOK))
    .doOnNext(System.out::println)
    .subscribe();

To insert two records and fetch the joined result as follows:

+----------+---------+----------+
|FIRST_NAME|LAST_NAME|TITLE     |
+----------+---------+----------+
|John      |Doe      |Fancy Book|
+----------+---------+----------+