Approche impérative des données réactives sur l'exemple de Jetbrains KTor et R2DBC

Un article sur l'utilisation de l'accĂšs rĂ©actif aux bases de donnĂ©es Ă  partir de la corutine. Spring simplifie tout, mais il n'affecte pas la comprĂ©hension des processus rĂ©els de l'application. Pour dĂ©monstration, le framework KTor a Ă©tĂ© choisi (simplement parce que j'aime regarder ce que fait JetBrains), qui utilise intensivement les coroutines - de sorte que la tĂąche de combiner Reactive Streams et ces mĂȘmes coroutines ajoute de l'intĂ©rĂȘt. Dans le processus, il est devenu clair que ce qui se passait Ă©tait un exemple clair de la transformation d'un jet stream incomprĂ©hensible en une programmation impĂ©rative comprĂ©hensible, sur laquelle nous avons mangĂ© un chien. J'adore les chaĂźnes Ă  rĂ©action, mais pourquoi ne pas plaire Ă  ceux qui aiment l'ordre de l'armĂ©e?


Les applications Jet ont conquis le cƓur et les nerfs de nombreux dĂ©veloppeurs, et ces ensembles se croisent nettement. Ils planteraient encore plus sans les efforts des communautĂ©s pour adapter le pur courant d'esprit des crĂ©ateurs de spĂ©cifications aux bibliothĂšques digestes. Cela s'est produit avec la spĂ©cification R2DBC et le framework Spring (Boot) - le dĂ©veloppeur peut dĂ©jĂ  voir l'API Spring Data familiĂšre avec les types de donnĂ©es rĂ©actifs familiers. Cependant, il y a des raisons de ne pas utiliser Spring: vous ne voulez pas de Spring et vous voulez quelque chose de nouveau. Eh bien, il reste du code hĂ©ritĂ©, mais dans ce cas, il est peu probable que vous ayez un accĂšs rĂ©actif aux donnĂ©es.


Dans ce cas, vous devez regarder R2DBC sans vernis. Et il devrait ĂȘtre trĂšs diffĂ©rent de ce qui nous est proposĂ© dans le cadre fini - tout comme JDBC est diffĂ©rent de Spring Data JPA. Plus de rĂ©activitĂ©. Et la rĂ©activitĂ© selon la spĂ©cification Reactive Streams. Et nous entendons des coroutines. Quel genre de comme l'avenir et encore les rĂ©Ă©crire.


Vous pouvez Ă©galement dĂ©marrer les coroutines manuellement Ă  partir de la mĂ©thode principale , mais essayons d'imaginer que nous avons vraiment besoin de parallĂ©lisme et de concurrence - c'est-Ă -dire une charge Ă©levĂ©e (disons, une demande par heure!) Et nous avons sĂ©rieusement dĂ©cidĂ© d'Ă©crire ceci sans Spring. Mais ici, il s'avĂšre, nous avons un analogue lĂ©ger entiĂšrement Ă©crit en Kotlin, et mĂȘme des crĂ©ateurs de la langue elle-mĂȘme, et mĂȘme sur les coroutines dont nous rĂȘvons.


Nous préparons un projet


, - ( IntelliJ IDEA).


https://start.ktor.io :


  • Call Logging: ,
  • Routing: (endpoint URI)
  • Gson: .

, — OAuth, JWT, LDAP , .


. , IntelliJ IDEA Community Edition.



H2, build.gradle


    implementation "io.r2dbc:r2dbc-h2:0.8.1.RELEASE"


    implementation "io.r2dbc:r2dbc-pool:0.8.1.RELEASE"

Reactive Streams, R2DBC , Reactor Kotlin, :


    implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.0.2.RELEASE"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.5"

: JVM 1.8. , - native Java. .


build.gradle


tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).configureEach {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}


— Application.kt, . , , :


fun Application.module(testing: Boolean = false) {
    //       
initDatabase@

    //   

applicationStarted@
    //       

boringStandardInitialization@
    //   ,   
    install(ContentNegotiation) {
        gson {
        }
    }

    install(CallLogging) {
        level = Level.INFO
        filter { call -> call.request.path().startsWith("/") }
    }

    routing {
        get("/") {
showTables@
            //       
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }

        get("/json/gson") {
showPool@            
            //      
            call.respond(mapOf("hello" to "world"))
        }
    }
}

, application.conf ktor.deployment port . , :


        watch = [ /home/votez/cc/ktor-reactive-db/build/classes/kotlin/main/ ]

— , devtools, . , .



initDatabase ( ) .


initDatabase@
    "" // ,        
    val connectionFactory = H2ConnectionFactory(
        H2ConnectionConfiguration.builder()
            .inMemory("users")
            .property(H2ConnectionOption.DB_CLOSE_DELAY, "-1")
            .build()
    )

    val poolConfig = ConnectionPoolConfiguration.builder(connectionFactory)
        .maxIdleTime(10.seconds.toJavaDuration()) //     @ExperimentalTime
        .maxSize(20)
        .build()

    val pool = ConnectionPool(poolConfig)

@ExperimentalTime — API.


, ( ) . , . :


applicationStarted@
    //       
    environment.monitor.subscribe(ApplicationStarted) {
        launch {
            val defer : Mono<Int> = pool.warmup()
            defer.awaitFirst()                             //   
            log.debug("Pool is hot, welcome!") //    
        }
    }

— launch . , , — , . - ! "" (awaitFirst Kotlin), . : Reactor- Mono Flux ? . " -"? , — . — . , , Reactor RxJava . — "!" " !". , , - . — RxJava Reactor — !


, . , .



CRUD , , , H2 : TABLES. - :


data class Tables(    val catalog: String?,    val schema: String?,    val tableName: String?,    val tableType: String?,    val storageType: String?,    val id: String?,    val typeName: String?,    val tableClass: String?,    val rowCountEstimate: Long?) 

typealias , IDE


typealias R2DBCResult = io.r2dbc.spi.Result

. , R2DBC:


get("/tables") {
            showTables@
            //       
            ""
            val connection = pool.create().awaitSingle() //    -  
            val list = try { 
                val result: List<R2DBCResult> = connection.createStatement(
                        """
                            select 
                                TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, STORAGE_TYPE, SQL, ID, TYPE_NAME, TABLE_CLASS, ROW_COUNT_ESTIMATE
                            from 
                                INFORMATION_SCHEMA.TABLES
                                """.trimIndent()
                    )
                    .execute()  //    
                    .toFlux()   //      Reactor Flux
                    .collectList()  //             
                    .awaitFirst()   // ,    -    .
   convertData@
                ""
                TODO() //    ,    
            } finally {
                connection
                    .close()    //   
                    .awaitFirstOrNull() //   null -    .
            }

            call.respond(list)
        }

, Reactor map/flatMap . map/flatMap , , . finally — ( ) doFinally . , README conn.close(), . — , — . , map await, .


awaitFirstOrNull() Mono, onNext() .

, — , , .



R2DBC - (map) . companion object -:


 companion object DB{
        fun ofRow(r: Row, unused: RowMetadata) = Tables(
            r.get("TABLE_CATALOG", String::class.java),
            r.get("TABLE_SCHEMA", String::class.java),
            r.get("TABLE_NAME", String::class.java),
            r.get("TABLE_TYPE", String::class.java),
            r.get("STORAGE_TYPE", String::class.java),
            r.get("ID", Integer::class.java)?.toString(),
            r.get("TYPE_NAME", String::class.java),
            r.get("TABLE_CLASS", String::class.java),
            r.get("ROW_COUNT_ESTIMATE", java.lang.Long::class.java)?.toLong() ?: 0 //     Java ,     
        )
    }

, JDBC .


map Reactor, ReactiveStream Java Stream, R2DBC.


   convertData@
                result.flatMap {//      .  -   ,    
                    it
                        .map(Tables.DB::ofRow)  //     
                        .toFlux()               //      Reactor Flux 
                        .collectList()          //             
                        .awaitFirst()          // ,    -    .
                }

, :


curl http://localhost:8080/tables

Si JSON est moche, il est configuré dans la section gson de l' application ( setPrettyPrinting ).


Nous donnons des statistiques de pool


Eh bien et pour la beauté (puisque nous n'avons pas connecté le module standard de métriques), nous ajouterons un point pour afficher les statistiques du pool. Les moteurs de modÚle nous sont superflus, car nos outils de langage nous permettent de:


        get("/pool") {
            showPool@
            //      
            call.respondText {
                (pool.metrics.map {
                    """
                Max allocated size:                 ${it.maxAllocatedSize}
                Max pending size  :                 ${it.maxPendingAcquireSize}
                Acquired size     :                 ${it.acquiredSize()}
                Pending acquire size:               ${it.pendingAcquireSize()}
                Allocated size    :                 ${it.allocatedSize()}
                Idle size         :                 ${it.idleSize()}\n
            """.trimIndent()
                }.orElse("NO METRICS"))
            }
        }

Bien sûr, si vous le souhaitez, vous pouvez le faire via HTML DSL.


résultats


Il est possible de se faire des amis de Coroutines avec des flux réactifs, mais il est nécessaire de basculer entre les styles réactifs et impératifs - de préférence moins souvent et d'essayer d'adhérer à un style.


Pas seulement un seul ressort!


L'accÚs réactif à la base de données n'est pas aussi beau qu'aprÚs le maquillage Spring Data JPA, mais vous pouvez l'utiliser.


All Articles