两个非阻塞SQL数据库访问库的示例是Vert.X SQL和R2DBC。示例将基于PostgreSQL和Java反应性包装器。

让我们从最主要的东西开始:JDBC是一个很好的标准。服务,并忠实地服务。
但是新趋势要求新解决方案。有时推送JDBC甚至很有意义。
我建议看看两种从JVM应用程序访问SQL数据库的替代实现及其(实现)适用性。
, . — , .
. , (PostgreSQL 12) . , , , , , .
, JDK — OpenJDK 11, , Oracle JDK 8, .
pg_sleep(seconds) PostgreSQL.
— (netty). patch-, . , netty , , HTTP-.
, . . , JDBC, . , , JDBC. — , ThreadLocal , .
, , GitHub.
, . , -, , "" . , - - , "" .
. .
— , . , . , .
, "", — . - . , , . , — . , — ? , API . API JDBC, , — JPA, JOOQ, Hibernate ORM .
- , - " " — , . , . — ( ). , , . " " , — , , "" .
1: , - ( — , ) . , ( ) , (, ).
2: , . , . - , , REST , . "" , .
, "" "", - . "" - . — . -, "" , . , — -, .
, — . , , , — " 2". , .
. .
: , / ( Spring Data R2DBC ). JDBC — JPA, JOOQ, Hibernate ORM, . , , , . , — , .
: . :
Vert.X SQL (PG) Client.
Eclipse, GitHub Postgres.
:
- PostgreSQL, MySQL, MSSQL, DB2
- callback, RxJava2. Kotlin, .
- vert.x , Quarkus.
- .
→
H2, . , , , .
, (rxjava2) :
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-pg-client</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.1.15.Final</version>
<classifier>linux-x86_64</classifier>
</dependency>
API:
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.reactivex.pgclient.PgPool;
import io.vertx.reactivex.sqlclient.Pool;
import io.vertx.reactivex.sqlclient.Tuple;
import io.vertx.sqlclient.PoolOptions;
. , prepared statement — .
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost(host)
.setDatabase("postgres")
.setCachePreparedStatements(true)
.setPreparedStatementCacheMaxSize(1000)
.setSsl(false)
.setUser(user)
.setPassword(password);
pool (, 50).
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(pool);
Pool client = PgPool.pool(connectOptions, poolOptions);
. :
- ,
- , Tuple
- RowSet, Iterable
- .
- , "" .
.
$1 $2 $3 . , rx — RxJava2, vert.x. callback hell. RxJava2 , :
Flowable<String> titles = client.rxGetConnection()
.flatMapPublisher(connection ->
connection.rxPrepare("SELECT title FROM nicer_but_slower_film_list WHERE FID = $1")
.flatMap(statement -> statement.query().rxExecute(Tuple.of(Math.abs(random.nextInt(990)))))
.flattenAsFlowable(Functions.identity())
.map(row -> row.getString("title"))
.doOnError(Throwable::printStackTrace)
.subscribeOn(Schedulers.computation())
.doFinally(connection::close));
client.close();
R2DBC
→
:
- — MariaDB, MySQL, PostgreSQL, MSSQL, H2
- Spring Data R2DBC, Spring Boot
- Reactive Streams, Project Reactor
:
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-pool</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
Project Reactor.
:
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
. Prepared Statement , .
ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(DRIVER, "postgresql")
.option(HOST, host)
.option(PORT, 5432)
.option(USER, user)
.option(PASSWORD, password)
.option(DATABASE, "postgres")
.option(SSL, false)
.build());
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
.maxIdleTime(Duration.ofMillis(1000))
.maxSize(poolSize)
.build();
ConnectionPool pool = new ConnectionPool(configuration);
. :
- ,
- Result
- mapper- .
- , "" .
— close, . $1 $2 $3 . Project Reactor :
Flux<String> titles = Flux.usingWhen(pool.create(), connection ->
Flux.from(
connection.createStatement("SELECT title FROM nicer_but_slower_film_list WHERE FID = $1")
.bind("$1", Math.abs(random.nextInt(990)))
.execute()
).flatMap(result -> result.map(RdbcTest::getTitle))
, Connection::close);
private static String getTitle(Row row, RowMetadata meta) {
return row.get("title", String.class);
}
.
pool.close();
, "" , - - . R2DBC, VertX JDBC. Executions (, 50 000) , concurrency (, 1000) . backpressure.
, . . , (, ) .
VertX Flowable.range(1, executions)
.doOnNext(i -> { if (i % chunk == 0) LOGGER.info("Process {}", i);})
.flatMap(i -> client.preparedQuery(
"SELECT title FROM nicer_but_slower_film_list WHERE FID = $1")
.rxExecute(Tuple.of(Math.abs(random.nextInt(990))))
.doOnError(Throwable::printStackTrace)
.flattenAsFlowable(Functions.identity())
.map(row -> row.getString("title").length())
.doOnError(Throwable::printStackTrace)
.subscribeOn(Schedulers.computation()),
false, concurrency)
.doOnComplete(() -> LOGGER.info("Done with VertX"))
.blockingSubscribe(unused -> { }, Throwable::printStackTrace);
R2DBC Flux.range(1, executions)
.doOnNext(i -> { if (i % chunk == 0) LOGGER.info("Processing {}", i);})
.flatMap(i -> Flux.usingWhen(pool.create(),
connection -> Flux.from(
connection.createStatement("SELECT title FROM nicer_but_slower_film_list WHERE FID = $1")
.bind("$1", Math.abs(random.nextInt(990)))
.execute()
)
.flatMap(result -> Flux.from(result.map(RdbcTest::getTitle))),
Connection::close)
.subscribeOn(Schedulers.parallel())
.doOnError(Throwable::printStackTrace)
, concurrency)
.doOnError(Throwable::printStackTrace)
.doOnComplete(() -> LOGGER.info("Done with R2DBC"))
.blockLast();
JDBC + Hikari Pool + Reactor HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:postgresql://" + host + "/postgres");
config.setMaximumPoolSize(poolSize);
config.setIsolateInternalQueries(false);
HikariDataSource ds = new HikariDataSource(config);
Flux.range(1, executions)
.flatMap(Mono::just)
.flatMap(index -> Mono.fromCallable(ds::getConnection)
.doOnNext(i -> { if (index % chunk == 0) LOGGER.info("Process {}", index);})
.map(this::request).subscribeOn(Schedulers.elastic())
, concurrency
)
.subscribeOn(Schedulers.elastic())
.doOnError(Throwable::printStackTrace)
.doOnComplete( ()->LOGGER.info("Done with JDBC"))
.blockLast();
private Integer request(Connection connection) {
try {
var s = connection.prepareStatement(
"SELECT title FROM nicer_but_slower_film_list WHERE FID = ?"
);
s.setInt(1, Math.abs(random.nextInt(990)));
var results = s.executeQuery();
int counter = 0;
while (results.next()) {
counter += results.getString("title").length();
}
results.close();
s.close();
connection.close();
return counter;
} catch (RuntimeException e) {
e.printStackTrace();
throw e;
} catch (Exception e) {
e.printStackTrace();
throw new IllegalStateException(e);
}
}
c time:
/usr/bin/time --verbose java ...
Java Mission Control — Java Flight Recorder.
java -XX:StartFlightRecording=disk=true,dumponexit=true,filename=/tmp/r2dbc.jfr,settings=profile,path-to-gc-roots=false,delay=1s,name=R2DBC ...
time userspace system, . VertX , R2DBC JDBC , . "" ( ). , R2DBC system , userspace. , vertx .
time R2DBC Command being timed: "java -jar r2dbc-1.0-SNAPSHOT-jar-with-dependencies.jar -iterations 50000 -concurrent 1000 -pool 50 -user anonymous -password 12345678 -host pg12.local"
User time (seconds): 34.28
System time (seconds): 5.55
Percent of CPU this job got: 10%
Maximum resident set size (kbytes): 307004
Minor (reclaiming a frame) page faults: 76835
Voluntary context switches: 121789
Involuntary context switches: 9670
time JDBC Command being timed: "java -jar jdbc-1.0-SNAPSHOT-jar-with-dependencies.jar -iterations 50000 -concurrent 1000 -pool 50 -user anonymous -password 12345678 -host pg12.local"
User time (seconds): 38.72
System time (seconds): 5.80
Percent of CPU this job got: 76%
Maximum resident set size (kbytes): 459688
Minor (reclaiming a frame) page faults: 125453
Voluntary context switches: 187752
Involuntary context switches: 14221
time VertX Command being timed: "java -jar vertx-1.0-SNAPSHOT-jar-with-dependencies.jar -iterations 50000 -concurrent 1000 -pool 50-user anonymous -password 12345678 -host pg12.local"
User time (seconds): 19.06
System time (seconds): 2.02
Percent of CPU this job got: 20%
Maximum resident set size (kbytes): 178516
Minor (reclaiming a frame) page faults: 43054
Voluntary context switches: 109914
Involuntary context switches: 4691
Java Mission Control显示什么?JDBC需要更多的内存,GC需要更多的内存。但是在“线程”部分中有许多令人印象深刻的图片可供演示。
精细无阻塞
R2DBC:

VertX:

和令人毛骨悚然的JDBC:

一切都是红色,一切都被封锁。恐怖似乎是。好吧,你可以问“什么?”。红线本身并不表示任何含义,并且本身也不是问题。问题是当它们干扰其他应用程序活动时。这很有可能,但是必须分别证明每种情况。
希望本文对您有所帮助!