Imperativer Ansatz für reaktive Daten am Beispiel von Jetbrains KTor und R2DBC

Ein Artikel über die Verwendung des reaktiven Zugriffs auf Datenbanken von Corutin. Spring vereinfacht alles, hat jedoch keinen Einfluss auf das Verständnis der tatsächlichen Prozesse der Anwendung. Zur Demonstration wurde das KTor-Framework ausgewählt (einfach, weil ich mir gerne anschaue, was JetBrains tut), das intensiv Coroutinen verwendet - so dass die Aufgabe, reaktive Streams und dieselben Coroutinen zu kombinieren, das Interesse erhöht. Dabei wurde klar, dass das, was geschah, ein klares Beispiel für die Umwandlung eines unverständlichen Jetstreams in eine verständliche imperative Programmierung war, auf der wir einen Hund aßen. Ich liebe Düsenketten, aber warum nicht denen gefallen, die die Ordnung der Armee lieben?


Jet-Anwendungen haben die Herzen und Nerven vieler Entwickler erobert, und diese Sets überschneiden sich deutlich. Sie würden noch mehr pflanzen, wenn nicht die Bemühungen der Gemeinschaften den reinen Geistesstrom der Ersteller von Spezifikationen in verdauliche Bibliotheken umwandeln würden. Dies geschah mit der R2DBC-Spezifikation und dem Spring (Boot) -Framework - der Entwickler kann bereits die bekannte Spring Data-API mit den bekannten reaktiven Datentypen sehen. Es gibt jedoch Gründe, Spring nicht zu verwenden: Sie möchten Spring nicht und Sie möchten etwas Neues. Nun, es gibt noch Legacy-Code, aber in diesem Fall ist es unwahrscheinlich, dass Sie reaktiven Datenzugriff haben.


In diesem Fall müssen Sie R2DBC ungeschminkt betrachten. Und es wird erwartet, dass es sich sehr von dem unterscheidet, was wir im fertigen Framework anbieten - genau wie JDBC sich von Spring Data JPA unterscheidet. Plus Reaktivität. Und Reaktivität gemäß der Reactive Streams-Spezifikation. Und wir hören Coroutinen. Welche Art von mögen die Zukunft und schreiben sie immer noch neu.


Sie können Coroutinen auch manuell von der Hauptmethode aus starten , aber stellen wir uns vor, wir brauchen wirklich Parallelität und Konkurrenz - das heißt hohe Last (z. B. eine Anfrage pro Stunde!). Und wir haben uns ernsthaft entschlossen, dies ohne Spring zu schreiben. Aber hier, so stellt sich heraus, haben wir ein leichtes Analogon, das vollständig in Kotlin geschrieben ist, und sogar von den Schöpfern der Sprache selbst und sogar von den Coroutinen, von denen wir träumen.


Wir bereiten ein Projekt vor


, - ( IntelliJ IDEA).


https://start.ktor.io :


  • Call Logging: ,
  • Routing: (endpoint URI)
  • Gson: .

, — OAuth, JWT, LDAP , .


. , IntelliJ IDEA Community Edition.



H2, build.gradle


    implementation "io.r2dbc:r2dbc-h2:0.8.1.RELEASE"


    implementation "io.r2dbc:r2dbc-pool:0.8.1.RELEASE"

Reactive Streams, R2DBC , Reactor Kotlin, :


    implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.0.2.RELEASE"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.5"

: JVM 1.8. , - native Java. .


build.gradle


tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).configureEach {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}


Application.kt, . , , :


fun Application.module(testing: Boolean = false) {
    //       
initDatabase@

    //   

applicationStarted@
    //       

boringStandardInitialization@
    //   ,   
    install(ContentNegotiation) {
        gson {
        }
    }

    install(CallLogging) {
        level = Level.INFO
        filter { call -> call.request.path().startsWith("/") }
    }

    routing {
        get("/") {
showTables@
            //       
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }

        get("/json/gson") {
showPool@            
            //      
            call.respond(mapOf("hello" to "world"))
        }
    }
}

, application.conf ktor.deployment port . , :


        watch = [ /home/votez/cc/ktor-reactive-db/build/classes/kotlin/main/ ]

— , devtools, . , .



initDatabase ( ) .


initDatabase@
    "" // ,        
    val connectionFactory = H2ConnectionFactory(
        H2ConnectionConfiguration.builder()
            .inMemory("users")
            .property(H2ConnectionOption.DB_CLOSE_DELAY, "-1")
            .build()
    )

    val poolConfig = ConnectionPoolConfiguration.builder(connectionFactory)
        .maxIdleTime(10.seconds.toJavaDuration()) //     @ExperimentalTime
        .maxSize(20)
        .build()

    val pool = ConnectionPool(poolConfig)

@ExperimentalTime — API.


, ( ) . , . :


applicationStarted@
    //       
    environment.monitor.subscribe(ApplicationStarted) {
        launch {
            val defer : Mono<Int> = pool.warmup()
            defer.awaitFirst()                             //   
            log.debug("Pool is hot, welcome!") //    
        }
    }

launch . , , — , . - ! "" (awaitFirst Kotlin), . : Reactor- Mono Flux ? . " -"? , — . — . , , Reactor RxJava . — "!" " !". , , - . — RxJava Reactor — !


, . , .



CRUD , , , H2 : TABLES. - :


data class Tables(    val catalog: String?,    val schema: String?,    val tableName: String?,    val tableType: String?,    val storageType: String?,    val id: String?,    val typeName: String?,    val tableClass: String?,    val rowCountEstimate: Long?) 

typealias , IDE


typealias R2DBCResult = io.r2dbc.spi.Result

. , R2DBC:


get("/tables") {
            showTables@
            //       
            ""
            val connection = pool.create().awaitSingle() //    -  
            val list = try { 
                val result: List<R2DBCResult> = connection.createStatement(
                        """
                            select 
                                TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, STORAGE_TYPE, SQL, ID, TYPE_NAME, TABLE_CLASS, ROW_COUNT_ESTIMATE
                            from 
                                INFORMATION_SCHEMA.TABLES
                                """.trimIndent()
                    )
                    .execute()  //    
                    .toFlux()   //      Reactor Flux
                    .collectList()  //             
                    .awaitFirst()   // ,    -    .
   convertData@
                ""
                TODO() //    ,    
            } finally {
                connection
                    .close()    //   
                    .awaitFirstOrNull() //   null -    .
            }

            call.respond(list)
        }

, Reactor map/flatMap . map/flatMap , , . finally — ( ) doFinally . , README conn.close(), . — , — . , map await, .


awaitFirstOrNull() Mono, onNext() .

, — , , .



R2DBC - (map) . companion object -:


 companion object DB{
        fun ofRow(r: Row, unused: RowMetadata) = Tables(
            r.get("TABLE_CATALOG", String::class.java),
            r.get("TABLE_SCHEMA", String::class.java),
            r.get("TABLE_NAME", String::class.java),
            r.get("TABLE_TYPE", String::class.java),
            r.get("STORAGE_TYPE", String::class.java),
            r.get("ID", Integer::class.java)?.toString(),
            r.get("TYPE_NAME", String::class.java),
            r.get("TABLE_CLASS", String::class.java),
            r.get("ROW_COUNT_ESTIMATE", java.lang.Long::class.java)?.toLong() ?: 0 //     Java ,     
        )
    }

, JDBC .


map Reactor, ReactiveStream Java Stream, R2DBC.


   convertData@
                result.flatMap {//      .  -   ,    
                    it
                        .map(Tables.DB::ofRow)  //     
                        .toFlux()               //      Reactor Flux 
                        .collectList()          //             
                        .awaitFirst()          // ,    -    .
                }

, :


curl http://localhost:8080/tables

Wenn JSON hässlich ist, wird es im Abschnitt gson der Anwendung ( setPrettyPrinting ) konfiguriert .


Wir geben Poolstatistiken


Nun und aus Gründen der Schönheit (da wir das Standardmodul für Metriken nicht verbunden haben) werden wir einen Punkt zum Anzeigen von Poolstatistiken hinzufügen. Template-Engines sind für uns überflüssig, da unsere Sprachwerkzeuge Folgendes ermöglichen:


        get("/pool") {
            showPool@
            //      
            call.respondText {
                (pool.metrics.map {
                    """
                Max allocated size:                 ${it.maxAllocatedSize}
                Max pending size  :                 ${it.maxPendingAcquireSize}
                Acquired size     :                 ${it.acquiredSize()}
                Pending acquire size:               ${it.pendingAcquireSize()}
                Allocated size    :                 ${it.allocatedSize()}
                Idle size         :                 ${it.idleSize()}\n
            """.trimIndent()
                }.orElse("NO METRICS"))
            }
        }

Wenn Sie möchten, können Sie dies natürlich auch über HTML DSL angeben.


Ergebnisse


Es ist möglich, Coroutines mit reaktiven Flüssen anzufreunden, aber es ist notwendig, zwischen reaktiven und imperativen Stilen zu wechseln - vorzugsweise seltener - und zu versuchen, sich an einen Stil zu halten.


Nicht nur eine einzige Feder!


Der reaktive Datenbankzugriff ist nicht so schön wie nach dem JPA-Make-up von Spring Data, aber Sie können ihn verwenden.


All Articles