Imperative approach to reactive data on the example of Jetbrains KTor and R2DBC

An article about using reactive access to databases from corutin. Spring simplifies everything, but it does not affect the understanding of the real processes of the application. For demonstration, the KTor framework was chosen (simply because I like to look at what JetBrains does), which intensively uses coroutines - so that the task of combining Reactive Streams and these same coroutines adds interest. In the process, it became clear that what was happening was a clear example of the transformation of an incomprehensible jet stream to understandable imperative programming, on which we ate a dog. I love jet chains, but why not please those who love army order?


Jet applications have won the hearts and nerves of many developers, and these sets intersect markedly. They would have planted even more if not for the efforts of communities adapting the pure stream of mind from the creators of specifications into digestible libraries. This happened with the R2DBC specification and the Spring (Boot) framework - the developer can already see the familiar Spring Data API with the familiar reactive data types. However, there are reasons not to use Spring: you don't want Spring and you want something new. Well, there’s still legacy code, but in this case you’re unlikely to have reactive data access.


In this case, you have to look at R2DBC unvarnished. And it will be expected to be very different from what we are offered in the finished framework - just like JDBC is different from Spring Data JPA. Plus reactivity. And reactivity according to the Reactive Streams specification. And we hear coroutines. Which kind of like the future and still rewrite them.


You can also start coroutines manually from the main method, but let's try to imagine that we really need parallelism and competition - that is, high load (say, one request per hour!) And we seriously decided to write this without Spring. But here, it turns out, we have a lightweight analogue entirely written in Kotlin, and even from the creators of the language itself, and even on the coroutines that we dream of.


We are preparing a project


, - ( IntelliJ IDEA).


https://start.ktor.io :


  • Call Logging: ,
  • Routing: (endpoint URI)
  • Gson: .

, β€” OAuth, JWT, LDAP , .


. , IntelliJ IDEA Community Edition.



H2, build.gradle


    implementation "io.r2dbc:r2dbc-h2:0.8.1.RELEASE"


    implementation "io.r2dbc:r2dbc-pool:0.8.1.RELEASE"

Reactive Streams, R2DBC , Reactor Kotlin, :


    implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.0.2.RELEASE"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.5"

: JVM 1.8. , - native Java. .


build.gradle


tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).configureEach {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}


β€” Application.kt, . , , :


fun Application.module(testing: Boolean = false) {
    //       
initDatabase@

    //   

applicationStarted@
    //       

boringStandardInitialization@
    //   ,   
    install(ContentNegotiation) {
        gson {
        }
    }

    install(CallLogging) {
        level = Level.INFO
        filter { call -> call.request.path().startsWith("/") }
    }

    routing {
        get("/") {
showTables@
            //       
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }

        get("/json/gson") {
showPool@            
            //      
            call.respond(mapOf("hello" to "world"))
        }
    }
}

, application.conf ktor.deployment port . , :


        watch = [ /home/votez/cc/ktor-reactive-db/build/classes/kotlin/main/ ]

β€” , devtools, . , .



initDatabase ( ) .


initDatabase@
    "" // ,        
    val connectionFactory = H2ConnectionFactory(
        H2ConnectionConfiguration.builder()
            .inMemory("users")
            .property(H2ConnectionOption.DB_CLOSE_DELAY, "-1")
            .build()
    )

    val poolConfig = ConnectionPoolConfiguration.builder(connectionFactory)
        .maxIdleTime(10.seconds.toJavaDuration()) //     @ExperimentalTime
        .maxSize(20)
        .build()

    val pool = ConnectionPool(poolConfig)

@ExperimentalTime β€” API.


, ( ) . , . :


applicationStarted@
    //       
    environment.monitor.subscribe(ApplicationStarted) {
        launch {
            val defer : Mono<Int> = pool.warmup()
            defer.awaitFirst()                             //   
            log.debug("Pool is hot, welcome!") //    
        }
    }

β€” launch . , , β€” , . - ! "" (awaitFirst Kotlin), . : Reactor- Mono Flux ? . " -"? , β€” . β€” . , , Reactor RxJava . β€” "!" " !". , , - . β€” RxJava Reactor β€” !


, . , .



CRUD , , , H2 : TABLES. - :


data class Tables(    val catalog: String?,    val schema: String?,    val tableName: String?,    val tableType: String?,    val storageType: String?,    val id: String?,    val typeName: String?,    val tableClass: String?,    val rowCountEstimate: Long?) 

typealias , IDE


typealias R2DBCResult = io.r2dbc.spi.Result

. , R2DBC:


get("/tables") {
            showTables@
            //       
            ""
            val connection = pool.create().awaitSingle() //    -  
            val list = try { 
                val result: List<R2DBCResult> = connection.createStatement(
                        """
                            select 
                                TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, STORAGE_TYPE, SQL, ID, TYPE_NAME, TABLE_CLASS, ROW_COUNT_ESTIMATE
                            from 
                                INFORMATION_SCHEMA.TABLES
                                """.trimIndent()
                    )
                    .execute()  //    
                    .toFlux()   //      Reactor Flux
                    .collectList()  //             
                    .awaitFirst()   // ,    -    .
   convertData@
                ""
                TODO() //    ,    
            } finally {
                connection
                    .close()    //   
                    .awaitFirstOrNull() //   null -    .
            }

            call.respond(list)
        }

, Reactor map/flatMap . map/flatMap , , . finally β€” ( ) doFinally . , README conn.close(), . β€” , β€” . , map await, .


awaitFirstOrNull() Mono, onNext() .

, β€” , , .



R2DBC - (map) . companion object -:


 companion object DB{
        fun ofRow(r: Row, unused: RowMetadata) = Tables(
            r.get("TABLE_CATALOG", String::class.java),
            r.get("TABLE_SCHEMA", String::class.java),
            r.get("TABLE_NAME", String::class.java),
            r.get("TABLE_TYPE", String::class.java),
            r.get("STORAGE_TYPE", String::class.java),
            r.get("ID", Integer::class.java)?.toString(),
            r.get("TYPE_NAME", String::class.java),
            r.get("TABLE_CLASS", String::class.java),
            r.get("ROW_COUNT_ESTIMATE", java.lang.Long::class.java)?.toLong() ?: 0 //     Java ,     
        )
    }

, JDBC .


map Reactor, ReactiveStream Java Stream, R2DBC.


   convertData@
                result.flatMap {//      .  -   ,    
                    it
                        .map(Tables.DB::ofRow)  //     
                        .toFlux()               //      Reactor Flux 
                        .collectList()          //             
                        .awaitFirst()          // ,    -    .
                }

, :


curl http://localhost:8080/tables

If JSON is ugly, then it is configured in the gson section of the application ( setPrettyPrinting ).


We give pool statistics


Well and for beauty (since we did not connect the standard module of metrics) we will add a point for viewing pool statistics. Template engines are superfluous to us, since our language tools allow us to:


        get("/pool") {
            showPool@
            //      
            call.respondText {
                (pool.metrics.map {
                    """
                Max allocated size:                 ${it.maxAllocatedSize}
                Max pending size  :                 ${it.maxPendingAcquireSize}
                Acquired size     :                 ${it.acquiredSize()}
                Pending acquire size:               ${it.pendingAcquireSize()}
                Allocated size    :                 ${it.allocatedSize()}
                Idle size         :                 ${it.idleSize()}\n
            """.trimIndent()
                }.orElse("NO METRICS"))
            }
        }

Of course, if you wish, you can give this via HTML DSL.


findings


It is possible to make Coroutines friends with reactive flows, but it is necessary to switch between reactive and imperative styles - preferably less often and try to adhere to one style.


Not only a single spring!


Reactive database access is not as beautiful as after Spring Data JPA makeup, but you can use it.


All Articles