Enfoque imperativo para datos reactivos en el ejemplo de Jetbrains KTor y R2DBC

Un artículo sobre el uso del acceso reactivo a bases de datos desde corutina. Spring simplifica todo, pero no afecta la comprensión de los procesos reales de la aplicación. Para la demostración, se eligió el marco KTor (simplemente porque me gusta ver lo que hace JetBrains), que usa intensivamente las rutinas, de modo que la tarea de combinar las Corrientes reactivas y estas mismas rutinas agrega interés. En el proceso, quedó claro que lo que estaba sucediendo era un claro ejemplo de la transformación de una corriente en chorro incomprensible a una programación imperativa comprensible, en la que nos comimos un perro. Me encantan las cadenas de jet, pero ¿por qué no complacer a los que aman el orden del ejército?


Las aplicaciones de Jet se han ganado los corazones y los nervios de muchos desarrolladores, y estos conjuntos se cruzan notablemente. Plantarían aún más si no fuera por los esfuerzos de las comunidades que adaptan la corriente mental pura de los creadores de especificaciones en bibliotecas digeribles. Esto sucedió con la especificación R2DBC y el marco Spring (Boot): el desarrollador ya puede ver la API Spring Data familiar con los tipos de datos reactivos familiares. Sin embargo, hay razones para no usar Spring: no quieres Spring y quieres algo nuevo. Bueno, todavía hay código heredado, pero en este caso es poco probable que tenga acceso reactivo a los datos.


En este caso, debe mirar R2DBC sin barnizar. Y se espera que sea muy diferente de lo que se nos ofrece en el marco terminado, al igual que JDBC es diferente de Spring Data JPA. Más reactividad. Y reactividad según la especificación de Reactive Streams. Y escuchamos corutinas. A los que les gusta el futuro y aún los reescriben.


También puede iniciar las corutinas manualmente desde el método principal , pero tratemos de imaginar que realmente necesitamos paralelismo y competencia, es decir, alta carga (¡digamos, una solicitud por hora!) Y decidimos seriamente escribir esto sin Spring. Pero aquí, resulta que tenemos un análogo liviano completamente escrito en Kotlin, e incluso de los creadores del lenguaje en sí, e incluso en las rutinas con las que soñamos.


Estamos preparando un proyecto


, - ( IntelliJ IDEA).


https://start.ktor.io :


  • Call Logging: ,
  • Routing: (endpoint URI)
  • Gson: .

, — OAuth, JWT, LDAP , .


. , IntelliJ IDEA Community Edition.



H2, build.gradle


    implementation "io.r2dbc:r2dbc-h2:0.8.1.RELEASE"


    implementation "io.r2dbc:r2dbc-pool:0.8.1.RELEASE"

Reactive Streams, R2DBC , Reactor Kotlin, :


    implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.0.2.RELEASE"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.5"

: JVM 1.8. , - native Java. .


build.gradle


tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).configureEach {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}


Application.kt, . , , :


fun Application.module(testing: Boolean = false) {
    //       
initDatabase@

    //   

applicationStarted@
    //       

boringStandardInitialization@
    //   ,   
    install(ContentNegotiation) {
        gson {
        }
    }

    install(CallLogging) {
        level = Level.INFO
        filter { call -> call.request.path().startsWith("/") }
    }

    routing {
        get("/") {
showTables@
            //       
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }

        get("/json/gson") {
showPool@            
            //      
            call.respond(mapOf("hello" to "world"))
        }
    }
}

, application.conf ktor.deployment port . , :


        watch = [ /home/votez/cc/ktor-reactive-db/build/classes/kotlin/main/ ]

— , devtools, . , .



initDatabase ( ) .


initDatabase@
    "" // ,        
    val connectionFactory = H2ConnectionFactory(
        H2ConnectionConfiguration.builder()
            .inMemory("users")
            .property(H2ConnectionOption.DB_CLOSE_DELAY, "-1")
            .build()
    )

    val poolConfig = ConnectionPoolConfiguration.builder(connectionFactory)
        .maxIdleTime(10.seconds.toJavaDuration()) //     @ExperimentalTime
        .maxSize(20)
        .build()

    val pool = ConnectionPool(poolConfig)

@ExperimentalTime — API.


, ( ) . , . :


applicationStarted@
    //       
    environment.monitor.subscribe(ApplicationStarted) {
        launch {
            val defer : Mono<Int> = pool.warmup()
            defer.awaitFirst()                             //   
            log.debug("Pool is hot, welcome!") //    
        }
    }

launch . , , — , . - ! "" (awaitFirst Kotlin), . : Reactor- Mono Flux ? . " -"? , — . — . , , Reactor RxJava . — "!" " !". , , - . — RxJava Reactor — !


, . , .



CRUD , , , H2 : TABLES. - :


data class Tables(    val catalog: String?,    val schema: String?,    val tableName: String?,    val tableType: String?,    val storageType: String?,    val id: String?,    val typeName: String?,    val tableClass: String?,    val rowCountEstimate: Long?) 

typealias , IDE


typealias R2DBCResult = io.r2dbc.spi.Result

. , R2DBC:


get("/tables") {
            showTables@
            //       
            ""
            val connection = pool.create().awaitSingle() //    -  
            val list = try { 
                val result: List<R2DBCResult> = connection.createStatement(
                        """
                            select 
                                TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, STORAGE_TYPE, SQL, ID, TYPE_NAME, TABLE_CLASS, ROW_COUNT_ESTIMATE
                            from 
                                INFORMATION_SCHEMA.TABLES
                                """.trimIndent()
                    )
                    .execute()  //    
                    .toFlux()   //      Reactor Flux
                    .collectList()  //             
                    .awaitFirst()   // ,    -    .
   convertData@
                ""
                TODO() //    ,    
            } finally {
                connection
                    .close()    //   
                    .awaitFirstOrNull() //   null -    .
            }

            call.respond(list)
        }

, Reactor map/flatMap . map/flatMap , , . finally — ( ) doFinally . , README conn.close(), . — , — . , map await, .


awaitFirstOrNull() Mono, onNext() .

, — , , .



R2DBC - (map) . companion object -:


 companion object DB{
        fun ofRow(r: Row, unused: RowMetadata) = Tables(
            r.get("TABLE_CATALOG", String::class.java),
            r.get("TABLE_SCHEMA", String::class.java),
            r.get("TABLE_NAME", String::class.java),
            r.get("TABLE_TYPE", String::class.java),
            r.get("STORAGE_TYPE", String::class.java),
            r.get("ID", Integer::class.java)?.toString(),
            r.get("TYPE_NAME", String::class.java),
            r.get("TABLE_CLASS", String::class.java),
            r.get("ROW_COUNT_ESTIMATE", java.lang.Long::class.java)?.toLong() ?: 0 //     Java ,     
        )
    }

, JDBC .


map Reactor, ReactiveStream Java Stream, R2DBC.


   convertData@
                result.flatMap {//      .  -   ,    
                    it
                        .map(Tables.DB::ofRow)  //     
                        .toFlux()               //      Reactor Flux 
                        .collectList()          //             
                        .awaitFirst()          // ,    -    .
                }

, :


curl http://localhost:8080/tables

Si JSON es feo, entonces está configurado en la sección gson de la aplicación ( setPrettyPrinting ).


Damos estadísticas de la piscina


Bueno y por belleza (ya que no conectamos el módulo estándar de métricas) agregaremos un punto para ver las estadísticas del grupo. Los motores de plantillas son superfluos para nosotros, ya que nuestras herramientas de lenguaje nos permiten:


        get("/pool") {
            showPool@
            //      
            call.respondText {
                (pool.metrics.map {
                    """
                Max allocated size:                 ${it.maxAllocatedSize}
                Max pending size  :                 ${it.maxPendingAcquireSize}
                Acquired size     :                 ${it.acquiredSize()}
                Pending acquire size:               ${it.pendingAcquireSize()}
                Allocated size    :                 ${it.allocatedSize()}
                Idle size         :                 ${it.idleSize()}\n
            """.trimIndent()
                }.orElse("NO METRICS"))
            }
        }

Por supuesto, si lo desea, puede hacerlo a través de HTML DSL.


recomendaciones


Es posible hacer amigos de Coroutines con flujos reactivos, pero es necesario cambiar entre estilos reactivos e imperativos, preferiblemente con menos frecuencia y tratar de adherirse a un estilo.


¡No solo una sola primavera!


El acceso reactivo a la base de datos no es tan hermoso como después del maquillaje Spring Data JPA, pero puede usarlo.


All Articles