This article is about reactive database access from coroutines. Spring simplifies everything, but that does nothing for our understanding of what actually happens inside the application. For the demonstration I chose the Ktor framework (simply because I like to watch what JetBrains is doing), which uses coroutines heavily, so the task of combining Reactive Streams with those same coroutines adds interest. Along the way it became clear that the result is a good example of turning an obscure reactive stream into understandable imperative code, something we are all old hands at. I love reactive chains, but why not please those who prefer military order?
Reactive applications have won the hearts and frayed the nerves of many developers, and those two sets overlap noticeably. They would have frayed even more nerves if not for the communities that adapt the specification authors' pure stream of consciousness into digestible libraries. This is what happened with the R2DBC specification and the Spring (Boot) framework: the developer sees the familiar Spring Data API with familiar reactive data types. However, there are reasons not to use Spring: you don't want Spring, and you want something new. Well, there is also legacy code, but in that case you are unlikely to have reactive data access.
In that case you have to look at R2DBC unvarnished. As expected, it is very different from what a finished framework offers, just as JDBC differs from Spring Data JPA. Plus reactivity. And reactivity according to the Reactive Streams specification. And we have coroutines, which are supposedly the future, so everything will have to be rewritten for them anyway.
You could start coroutines by hand from the main method, but let's imagine that we really need parallelism and concurrency, that is, high load (say, one request per hour!), and that we have seriously decided to write this without Spring. It turns out there is a lightweight analogue written entirely in Kotlin, by the creators of the language itself, and built on the very coroutines we dream of.
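Preparing the project
The project can be created from the IDE (IntelliJ IDEA).
Alternatively, generate it at https://start.ktor.io with these features:
- Call Logging: logs incoming requests,
- Routing: maps requests to handlers by endpoint URI,
- Gson: serializes responses to JSON.
Other features, such as OAuth, JWT, or LDAP authentication, are available, but we will not need them.
Open the generated project in the IDE; IntelliJ IDEA Community Edition is enough.
For the database we will use H2, so add the following to build.gradle: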
implementation "io.r2dbc:r2dbc-h2:0.8.1.RELEASE"
implementation "io.r2dbc:r2dbc-pool:0.8.1.RELEASE"
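R2DBC is built on Reactive Streams, and the R2DBC pool is built on Reactor, so we also add the Reactor Kotlin extensions and the coroutines bridge for Reactor: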
implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.0.2.RELEASE"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.5"
Important: the Kotlin compiler must target JVM 1.8. Set it in build.gradle:
tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).configureEach {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}
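The entry point is Application.kt. Below is its skeleton; the labels (name@) mark the places where code will be added in the following steps: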
fun Application.module(testing: Boolean = false) {
    initDatabase@
    applicationStarted@
    boringStandardInitialization@
    install(ContentNegotiation) {
        gson {
        }
    }
    install(CallLogging) {
        level = Level.INFO
        filter { call -> call.request.path().startsWith("/") }
    }
    routing {
        get("/") {
            showTables@
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }
        get("/json/gson") {
            showPool@
            call.respond(mapOf("hello" to "world"))
        }
    }
}
The port is set in application.conf, in the ktor.deployment section. For development it is convenient to add auto-reload there as well:
watch = [ /home/votez/cc/ktor-reactive-db/build/classes/kotlin/main/ ]
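This is an analogue of Spring's devtools: when the classes in the watched directory are recompiled, the application reloads them without a restart.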
At the initDatabase label (at the start of the module) we create the connection pool.
initDatabase@
""
val connectionFactory = H2ConnectionFactory(
    H2ConnectionConfiguration.builder()
        .inMemory("users")
        .property(H2ConnectionOption.DB_CLOSE_DELAY, "-1")
        .build()
)
val poolConfig = ConnectionPoolConfiguration.builder(connectionFactory)
    .maxIdleTime(10.seconds.toJavaDuration())
    .maxSize(20)
    .build()
val pool = ConnectionPool(poolConfig)
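The @ExperimentalTime annotation is required because the Kotlin duration extensions used here are still an experimental API.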
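After the application has started, the pool needs to be warmed up (its initial connections opened). We do this by subscribing to the ApplicationStarted event: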
applicationStarted@
environment.monitor.subscribe(ApplicationStarted) {
    launch {
        val defer: Mono<Int> = pool.warmup()
        defer.awaitFirst()
        log.debug("Pool is hot, welcome!")
    }
}
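The event handler is not a suspending function, so launch starts a coroutine in which we can wait for the warm-up. The waiting is done by awaitFirst, an extension from the Kotlin coroutines bridge: it suspends the coroutine until the publisher emits a value instead of blocking a thread. The same extensions work for both Reactor types, Mono and Flux, and a similar bridge module exists for RxJava, so the approach is the same whether you prefer RxJava or Reactor.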
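To make the bridge concrete, here is a minimal sketch of awaiting a Mono from a coroutine. The function fakeWarmup is a hypothetical stand-in for pool.warmup() (it is not part of R2DBC); it only assumes the Reactor and kotlinx-coroutines-reactor dependencies listed earlier.

```kotlin
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical stand-in for pool.warmup(): emits a number after a short delay.
fun fakeWarmup(): Mono<Int> = Mono.just(10).delayElement(Duration.ofMillis(100))

fun main() = runBlocking {
    // Suspends this coroutine until the Mono emits its value;
    // other coroutines may run on the same thread in the meantime.
    val warmed: Int = fakeWarmup().awaitFirst()
    println("Pool is hot, $warmed connections")
}
```

Inside Ktor the runBlocking wrapper is not needed, because handlers and launch blocks already run in a coroutine.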
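Rather than building a full CRUD example, we will simply read one of the system tables that H2 always has: TABLES. A data class for its rows: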
data class Tables(
    val catalog: String?,
    val schema: String?,
    val tableName: String?,
    val tableType: String?,
    val storageType: String?,
    val id: String?,
    val typeName: String?,
    val tableClass: String?,
    val rowCountEstimate: Long?
)
A typealias keeps the IDE from confusing the R2DBC Result with other types of the same name:
typealias R2DBCResult = io.r2dbc.spi.Result
Now the request itself. This is what a query looks like in plain R2DBC:
get("/tables") {
    showTables@
    ""
    val connection = pool.create().awaitSingle()
    val list = try {
        val result: List<R2DBCResult> = connection.createStatement(
            """
                select
                    TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, STORAGE_TYPE, SQL, ID, TYPE_NAME, TABLE_CLASS, ROW_COUNT_ESTIMATE
                from
                    INFORMATION_SCHEMA.TABLES
            """.trimIndent()
        )
            .execute()
            .toFlux()
            .collectList()
            .awaitFirst()
        convertData@
        ""
        TODO()
    } finally {
        connection
            .close()
            .awaitFirstOrNull()
    }
    call.respond(list)
}
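In pure Reactor style this would be a chain of map/flatMap calls. Here the map/flatMap steps become ordinary sequential statements, which are easier to read. The finally block does the job that doFinally does in a reactive chain. The README also shows conn.close(); keep in mind that it only returns a publisher, so nothing is closed until it is awaited. And since an ordinary map lambda is not a suspending function, await cannot be called inside it.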
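The acquire/try/finally pattern above can be wrapped once and reused. The sketch below assumes the same r2dbc-pool and coroutine bridge dependencies; withConnection is a hypothetical helper name, not something R2DBC or Ktor provides.

```kotlin
import io.r2dbc.pool.ConnectionPool
import io.r2dbc.spi.Connection
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.reactive.awaitSingle

// Hypothetical helper (not part of R2DBC or Ktor): borrows a connection,
// runs the block, and always hands the connection back to the pool.
suspend fun <T> ConnectionPool.withConnection(block: suspend (Connection) -> T): T {
    val connection = create().awaitSingle() // Mono<Connection> -> Connection
    return try {
        block(connection)
    } finally {
        // close() is lazy: nothing happens until the publisher is awaited.
        connection.close().awaitFirstOrNull()
    }
}
```

A handler could then call pool.withConnection { connection -> ... } and keep only the query logic. One caveat: if the coroutine is cancelled, a suspending call inside finally is cancelled as well, so production code would probably wrap the close in withContext(NonCancellable).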
awaitFirstOrNull() is needed for a Mono that completes without ever calling onNext().
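The difference between the two await flavours is easy to see on an empty publisher. In this sketch fakeClose is a hypothetical stand-in for connection.close(), which completes without emitting anything.

```kotlin
import kotlinx.coroutines.reactive.awaitFirst
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

// Hypothetical stand-in for connection.close(): completes without any element.
fun fakeClose(): Mono<Void> = Mono.empty()

fun main() = runBlocking {
    // Completion without onNext() is reported as null.
    val nothing: Void? = fakeClose().awaitFirstOrNull()
    println("awaitFirstOrNull returned $nothing")

    // awaitFirst insists on an element and fails when none arrives.
    val failure = runCatching { fakeClose().awaitFirst() }.exceptionOrNull()
    println("awaitFirst failed with ${failure?.javaClass?.simpleName}")
}
```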
R2DBC passes each row to a conversion function (map). We put that function into a companion object of the data class:
companion object DB {
    fun ofRow(r: Row, unused: RowMetadata) = Tables(
        r.get("TABLE_CATALOG", String::class.java),
        r.get("TABLE_SCHEMA", String::class.java),
        r.get("TABLE_NAME", String::class.java),
        r.get("TABLE_TYPE", String::class.java),
        r.get("STORAGE_TYPE", String::class.java),
        r.get("ID", Integer::class.java)?.toString(),
        r.get("TYPE_NAME", String::class.java),
        r.get("TABLE_CLASS", String::class.java),
        r.get("ROW_COUNT_ESTIMATE", java.lang.Long::class.java)?.toLong() ?: 0
    )
}
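Overall, this is much like reading a JDBC ResultSet.
Note that the map used below is neither Reactor's, nor Reactive Streams', nor Java Stream's: it belongs to R2DBC.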
convertData@
result.flatMap {
    it
        .map(Tables.DB::ofRow)
        .toFlux()
        .collectList()
        .awaitFirst()
}
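For comparison, the same conversion can stay reactive all the way and cross into coroutine land only once. This is a sketch under the same dependencies; queryList is a hypothetical helper, not an R2DBC API.

```kotlin
import io.r2dbc.spi.Connection
import io.r2dbc.spi.Row
import io.r2dbc.spi.RowMetadata
import kotlinx.coroutines.reactive.awaitSingle
import reactor.core.publisher.Flux

// Hypothetical helper: runs a query and maps every row, staying reactive
// until the single awaitSingle() at the end.
suspend fun <T : Any> Connection.queryList(
    sql: String,
    mapper: (Row, RowMetadata) -> T
): List<T> =
    Flux.from(createStatement(sql).execute())
        // R2DBC's own Result.map turns each row into a T
        .flatMap { result -> result.map { row, meta -> mapper(row, meta) } }
        .collectList()
        .awaitSingle()
```

With it, the handler body would shrink to something like connection.queryList(sql, Tables.DB::ofRow). Whether this reads better than the step-by-step version is a matter of taste; the point is that there is only one switch between styles.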
Start the application and check:
curl http://localhost:8080/tables
If the JSON looks ugly, enable pretty printing in the gson section of the application (setPrettyPrinting).
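A minimal sketch of that setting, assuming the Ktor 1.x package names used by the generated project. The function name configureJson is hypothetical; in the article the install call lives inside Application.module.

```kotlin
import io.ktor.application.Application
import io.ktor.application.install
import io.ktor.features.ContentNegotiation
import io.ktor.gson.gson

// Hypothetical function name; the body is what goes into Application.module.
fun Application.configureJson() {
    install(ContentNegotiation) {
        gson {
            setPrettyPrinting() // GsonBuilder option: indent the JSON output
        }
    }
}
```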
Serving pool statistics
And for good measure (since we did not plug in the standard metrics module), let's add an endpoint for viewing pool statistics. We don't need a template engine, since the language's own tools are enough:
get("/pool") {
    showPool@
    call.respondText {
        (pool.metrics.map {
            """
                Max allocated size: ${it.maxAllocatedSize}
                Max pending size : ${it.maxPendingAcquireSize}
                Acquired size : ${it.acquiredSize()}
                Pending acquire size: ${it.pendingAcquireSize()}
                Allocated size : ${it.allocatedSize()}
                Idle size : ${it.idleSize()}
            """.trimIndent()
        }.orElse("NO METRICS"))
    }
}
Of course, if you wish, you can serve this through the HTML DSL.
Conclusions
Coroutines can be made friends with reactive streams, but you have to switch between the reactive and imperative styles. It is better to switch as rarely as possible and to stick to one style.
There is life beyond Spring!
Reactive database access is not as pretty as it looks after Spring Data JPA's makeup, but it is usable.