Abordagem imperativa para dados reativos no exemplo de Jetbrains KTor e R2DBC

Um artigo sobre o uso do acesso reativo a bancos de dados da corutin. O Spring simplifica tudo, mas não afeta a compreensão dos processos reais do aplicativo. Para demonstração, foi escolhida a estrutura do KTor (simplesmente porque eu gosto de ver o que o JetBrains faz), que usa intensamente corotinas - para que a tarefa de combinar Fluxos Reativos e essas mesmas corotinas acrescente interesse. No processo, ficou claro que o que estava acontecendo era um exemplo claro da transformação de um jato incompreensível em uma programação imperativa compreensível, na qual comíamos um cachorro. Eu amo correntes de jato, mas por que não agradar aqueles que amam a ordem do exército?


Os aplicativos a jato conquistaram os corações e os nervos de muitos desenvolvedores, e esses conjuntos se cruzam acentuadamente. Eles teriam plantado ainda mais se não fossem os esforços das comunidades que adaptam o fluxo mental puro dos criadores de especificações em bibliotecas digeríveis. Isso aconteceu com a especificação R2DBC e a estrutura Spring (Boot) - o desenvolvedor já pode ver a API Spring Data familiar com os tipos de dados reativos familiares. No entanto, existem razões para não usar o Spring: você não quer o Spring e deseja algo novo. Bem, ainda existe um código legado, mas nesse caso é improvável que você tenha acesso a dados reativos.


Nesse caso, você deve observar o R2DBC sem envernizamento . E espera-se que seja muito diferente do que nos é oferecido na estrutura finalizada - assim como o JDBC é diferente do Spring Data JPA. Mais reatividade. E reatividade de acordo com a especificação de Reativos reativos. E ouvimos corotinas. Que tipo de como o futuro e ainda reescrevê-los.


Você também pode iniciar corotinas manualmente a partir do método principal , mas vamos tentar imaginar que realmente precisamos de paralelismo e competição - ou seja, carga alta (digamos, uma solicitação por hora!) E decidimos escrevê-la muito a sério sem o Spring. Mas aqui, ao que parece, temos um análogo leve inteiramente escrito em Kotlin, e até mesmo pelos criadores da própria linguagem, e até pelas corotinas com as quais sonhamos.


Estamos preparando um projeto


, - ( IntelliJ IDEA).


https://start.ktor.io :


  • Call Logging: ,
  • Routing: (endpoint URI)
  • Gson: .

, — OAuth, JWT, LDAP , .


. , IntelliJ IDEA Community Edition.



H2, build.gradle


    implementation "io.r2dbc:r2dbc-h2:0.8.1.RELEASE"


    implementation "io.r2dbc:r2dbc-pool:0.8.1.RELEASE"

Reactive Streams, R2DBC , Reactor Kotlin, :


    implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.0.2.RELEASE"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.5"

: JVM 1.8. , - native Java. .


build.gradle


tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).configureEach {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}


Application.kt, . , , :


fun Application.module(testing: Boolean = false) {
    //       
initDatabase@

    //   

applicationStarted@
    //       

boringStandardInitialization@
    //   ,   
    install(ContentNegotiation) {
        gson {
        }
    }

    install(CallLogging) {
        level = Level.INFO
        filter { call -> call.request.path().startsWith("/") }
    }

    routing {
        get("/") {
showTables@
            //       
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }

        get("/json/gson") {
showPool@            
            //      
            call.respond(mapOf("hello" to "world"))
        }
    }
}

, application.conf ktor.deployment port . , :


        watch = [ /home/votez/cc/ktor-reactive-db/build/classes/kotlin/main/ ]

— , devtools, . , .



initDatabase ( ) .


initDatabase@
    "" // ,        
    val connectionFactory = H2ConnectionFactory(
        H2ConnectionConfiguration.builder()
            .inMemory("users")
            .property(H2ConnectionOption.DB_CLOSE_DELAY, "-1")
            .build()
    )

    val poolConfig = ConnectionPoolConfiguration.builder(connectionFactory)
        .maxIdleTime(10.seconds.toJavaDuration()) //     @ExperimentalTime
        .maxSize(20)
        .build()

    val pool = ConnectionPool(poolConfig)

@ExperimentalTime — API.


, ( ) . , . :


applicationStarted@
    //       
    environment.monitor.subscribe(ApplicationStarted) {
        launch {
            val defer : Mono<Int> = pool.warmup()
            defer.awaitFirst()                             //   
            log.debug("Pool is hot, welcome!") //    
        }
    }

launch . , , — , . - ! "" (awaitFirst Kotlin), . : Reactor- Mono Flux ? . " -"? , — . — . , , Reactor RxJava . — "!" " !". , , - . — RxJava Reactor — !


, . , .



CRUD , , , H2 : TABLES. - :


data class Tables(    val catalog: String?,    val schema: String?,    val tableName: String?,    val tableType: String?,    val storageType: String?,    val id: String?,    val typeName: String?,    val tableClass: String?,    val rowCountEstimate: Long?) 

typealias , IDE


typealias R2DBCResult = io.r2dbc.spi.Result

. , R2DBC:


get("/tables") {
            showTables@
            //       
            ""
            val connection = pool.create().awaitSingle() //    -  
            val list = try { 
                val result: List<R2DBCResult> = connection.createStatement(
                        """
                            select 
                                TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, STORAGE_TYPE, SQL, ID, TYPE_NAME, TABLE_CLASS, ROW_COUNT_ESTIMATE
                            from 
                                INFORMATION_SCHEMA.TABLES
                                """.trimIndent()
                    )
                    .execute()  //    
                    .toFlux()   //      Reactor Flux
                    .collectList()  //             
                    .awaitFirst()   // ,    -    .
   convertData@
                ""
                TODO() //    ,    
            } finally {
                connection
                    .close()    //   
                    .awaitFirstOrNull() //   null -    .
            }

            call.respond(list)
        }

, Reactor map/flatMap . map/flatMap , , . finally — ( ) doFinally . , README conn.close(), . — , — . , map await, .


awaitFirstOrNull() Mono, onNext() .

, — , , .



R2DBC - (map) . companion object -:


 companion object DB{
        fun ofRow(r: Row, unused: RowMetadata) = Tables(
            r.get("TABLE_CATALOG", String::class.java),
            r.get("TABLE_SCHEMA", String::class.java),
            r.get("TABLE_NAME", String::class.java),
            r.get("TABLE_TYPE", String::class.java),
            r.get("STORAGE_TYPE", String::class.java),
            r.get("ID", Integer::class.java)?.toString(),
            r.get("TYPE_NAME", String::class.java),
            r.get("TABLE_CLASS", String::class.java),
            r.get("ROW_COUNT_ESTIMATE", java.lang.Long::class.java)?.toLong() ?: 0 //     Java ,     
        )
    }

, JDBC .


map Reactor, ReactiveStream Java Stream, R2DBC.


   convertData@
                result.flatMap {//      .  -   ,    
                    it
                        .map(Tables.DB::ofRow)  //     
                        .toFlux()               //      Reactor Flux 
                        .collectList()          //             
                        .awaitFirst()          // ,    -    .
                }

, :


curl http://localhost:8080/tables

Se o JSON for feio, ele será configurado na seção gson do aplicativo ( setPrettyPrinting ).


Damos estatísticas da piscina


Bem, e para a beleza (como não conectamos o módulo padrão de métricas), adicionaremos um ponto para visualizar as estatísticas do pool. Os mecanismos de modelo são supérfluos para nós, pois nossas ferramentas de linguagem nos permitem:


        get("/pool") {
            showPool@
            //      
            call.respondText {
                (pool.metrics.map {
                    """
                Max allocated size:                 ${it.maxAllocatedSize}
                Max pending size  :                 ${it.maxPendingAcquireSize}
                Acquired size     :                 ${it.acquiredSize()}
                Pending acquire size:               ${it.pendingAcquireSize()}
                Allocated size    :                 ${it.allocatedSize()}
                Idle size         :                 ${it.idleSize()}\n
            """.trimIndent()
                }.orElse("NO METRICS"))
            }
        }

Obviamente, se desejar, você pode fazer isso via DSL HTML.


achados


Você pode fazer corotinas com fluxos reativos, mas precisa alternar entre estilos reativo e imperativo - de preferência com menos frequência e tentar aderir a um estilo.


Não é apenas uma única primavera!


O acesso reativo ao banco de dados não é tão bonito quanto após a maquiagem do Spring Data JPA, mas você pode usá-lo.


All Articles