Pendekatan imperatif terhadap data reaktif pada contoh Jetbrains KTor dan R2DBC

Artikel tentang menggunakan akses reaktif ke database dari corutin. Spring menyederhanakan segalanya, tetapi itu tidak mempengaruhi pemahaman tentang proses nyata aplikasi. Sebagai demonstrasi, kerangka kerja KTor dipilih (hanya karena saya suka melihat apa yang dilakukan JetBrains), yang secara intensif menggunakan coroutine - sehingga tugas menggabungkan Reactive Streams dan coroutine yang sama ini menambah minat. Dalam prosesnya, menjadi jelas bahwa apa yang terjadi adalah contoh yang jelas dari transformasi aliran jet yang tidak dapat dipahami menjadi pemrograman imperatif yang dapat dipahami, di mana kami memakan seekor anjing. Saya suka rantai jet, tapi mengapa tidak menyenangkan mereka yang menyukai perintah tentara?


Aplikasi Jet telah memenangkan hati dan kegelisahan banyak pengembang, dan perangkat ini saling bersilangan. Mereka akan menanam lebih banyak jika bukan karena upaya masyarakat mengadaptasi aliran pikiran murni dari pencipta spesifikasi ke dalam perpustakaan yang dapat dicerna. Ini terjadi dengan spesifikasi R2DBC dan kerangka Spring (Boot) - pengembang sudah dapat melihat API Data Spring yang dikenal dengan tipe data reaktif yang sudah dikenal. Namun, ada alasan untuk tidak menggunakan Spring: Anda tidak ingin Spring dan Anda menginginkan sesuatu yang baru. Ya, masih ada kode lawas, tetapi dalam hal ini Anda tidak mungkin memiliki akses data reaktif.


Dalam hal ini, Anda harus melihat R2DBC tanpa pernis . Dan itu akan diharapkan sangat berbeda dari apa yang kami tawarkan dalam kerangka kerja yang sudah jadi - seperti JDBC berbeda dari Spring Data JPA. Ditambah reaktivitas. Dan reaktivitas sesuai dengan spesifikasi Reactive Streams. Dan kami mendengar coroutine. Jenis yang seperti masa depan dan masih menulis ulang mereka.


Anda juga dapat memulai coroutine secara manual dari metode utama , tetapi mari kita coba bayangkan bahwa kita benar-benar membutuhkan paralelisme dan persaingan - yaitu, beban tinggi (katakanlah, satu permintaan per jam!) Dan kami serius memutuskan untuk menulis ini tanpa Spring. Tapi di sini, ternyata, kita memiliki analog ringan yang seluruhnya ditulis di Kotlin, dan bahkan dari pencipta bahasa itu sendiri, dan bahkan pada coroutine yang kita impikan.


Kami sedang mempersiapkan sebuah proyek


, - ( IntelliJ IDEA).


https://start.ktor.io :


  • Call Logging: ,
  • Routing: (endpoint URI)
  • Gson: .

, — OAuth, JWT, LDAP , .


. , IntelliJ IDEA Community Edition.



H2, build.gradle


    implementation "io.r2dbc:r2dbc-h2:0.8.1.RELEASE"


    implementation "io.r2dbc:r2dbc-pool:0.8.1.RELEASE"

Reactive Streams, R2DBC , Reactor Kotlin, :


    implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.0.2.RELEASE"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.5"

: JVM 1.8. , - native Java. .


build.gradle


tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).configureEach {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}


Application.kt, . , , :


fun Application.module(testing: Boolean = false) {
    //       
initDatabase@

    //   

applicationStarted@
    //       

boringStandardInitialization@
    //   ,   
    install(ContentNegotiation) {
        gson {
        }
    }

    install(CallLogging) {
        level = Level.INFO
        filter { call -> call.request.path().startsWith("/") }
    }

    routing {
        get("/") {
showTables@
            //       
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }

        get("/json/gson") {
showPool@            
            //      
            call.respond(mapOf("hello" to "world"))
        }
    }
}

, application.conf ktor.deployment port . , :


        watch = [ /home/votez/cc/ktor-reactive-db/build/classes/kotlin/main/ ]

— , devtools, . , .



initDatabase ( ) .


initDatabase@
    "" // ,        
    val connectionFactory = H2ConnectionFactory(
        H2ConnectionConfiguration.builder()
            .inMemory("users")
            .property(H2ConnectionOption.DB_CLOSE_DELAY, "-1")
            .build()
    )

    val poolConfig = ConnectionPoolConfiguration.builder(connectionFactory)
        .maxIdleTime(10.seconds.toJavaDuration()) //     @ExperimentalTime
        .maxSize(20)
        .build()

    val pool = ConnectionPool(poolConfig)

@ExperimentalTime — API.


, ( ) . , . :


applicationStarted@
    //       
    environment.monitor.subscribe(ApplicationStarted) {
        launch {
            val defer : Mono<Int> = pool.warmup()
            defer.awaitFirst()                             //   
            log.debug("Pool is hot, welcome!") //    
        }
    }

launch . , , — , . - ! "" (awaitFirst Kotlin), . : Reactor- Mono Flux ? . " -"? , — . — . , , Reactor RxJava . — "!" " !". , , - . — RxJava Reactor — !


, . , .



CRUD , , , H2 : TABLES. - :


data class Tables(    val catalog: String?,    val schema: String?,    val tableName: String?,    val tableType: String?,    val storageType: String?,    val id: String?,    val typeName: String?,    val tableClass: String?,    val rowCountEstimate: Long?) 

typealias , IDE


typealias R2DBCResult = io.r2dbc.spi.Result

. , R2DBC:


get("/tables") {
            showTables@
            //       
            ""
            val connection = pool.create().awaitSingle() //    -  
            val list = try { 
                val result: List<R2DBCResult> = connection.createStatement(
                        """
                            select 
                                TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, STORAGE_TYPE, SQL, ID, TYPE_NAME, TABLE_CLASS, ROW_COUNT_ESTIMATE
                            from 
                                INFORMATION_SCHEMA.TABLES
                                """.trimIndent()
                    )
                    .execute()  //    
                    .toFlux()   //      Reactor Flux
                    .collectList()  //             
                    .awaitFirst()   // ,    -    .
   convertData@
                ""
                TODO() //    ,    
            } finally {
                connection
                    .close()    //   
                    .awaitFirstOrNull() //   null -    .
            }

            call.respond(list)
        }

, Reactor map/flatMap . map/flatMap , , . finally — ( ) doFinally . , README conn.close(), . — , — . , map await, .


awaitFirstOrNull() Mono, onNext() .

, — , , .



R2DBC - (map) . companion object -:


 companion object DB{
        fun ofRow(r: Row, unused: RowMetadata) = Tables(
            r.get("TABLE_CATALOG", String::class.java),
            r.get("TABLE_SCHEMA", String::class.java),
            r.get("TABLE_NAME", String::class.java),
            r.get("TABLE_TYPE", String::class.java),
            r.get("STORAGE_TYPE", String::class.java),
            r.get("ID", Integer::class.java)?.toString(),
            r.get("TYPE_NAME", String::class.java),
            r.get("TABLE_CLASS", String::class.java),
            r.get("ROW_COUNT_ESTIMATE", java.lang.Long::class.java)?.toLong() ?: 0 //     Java ,     
        )
    }

, JDBC .


map Reactor, ReactiveStream Java Stream, R2DBC.


   convertData@
                result.flatMap {//      .  -   ,    
                    it
                        .map(Tables.DB::ofRow)  //     
                        .toFlux()               //      Reactor Flux 
                        .collectList()          //             
                        .awaitFirst()          // ,    -    .
                }

, :


curl http://localhost:8080/tables

Jika JSON jelek, maka dikonfigurasi di bagian gson aplikasi ( setPrettyPrinting ).


Kami memberikan statistik kumpulan


Baik dan untuk kecantikan (karena kami tidak menghubungkan modul standar metrik), kami akan menambahkan titik untuk melihat statistik kumpulan. Mesin template tidak berguna bagi kami, karena alat bahasa kami memungkinkan kami untuk:


        get("/pool") {
            showPool@
            //      
            call.respondText {
                (pool.metrics.map {
                    """
                Max allocated size:                 ${it.maxAllocatedSize}
                Max pending size  :                 ${it.maxPendingAcquireSize}
                Acquired size     :                 ${it.acquiredSize()}
                Pending acquire size:               ${it.pendingAcquireSize()}
                Allocated size    :                 ${it.allocatedSize()}
                Idle size         :                 ${it.idleSize()}\n
            """.trimIndent()
                }.orElse("NO METRICS"))
            }
        }

Tentu saja, jika mau, Anda dapat memberikan ini melalui HTML DSL.


temuan


Anda dapat membuat coroutine dengan aliran reaktif, tetapi Anda perlu beralih antara gaya reaktif dan imperatif - lebih disukai lebih jarang dan cobalah untuk mengikuti satu gaya.


Tidak hanya satu pegas!


Akses reaktif ke database tidak seindah setelah makeup Spring Data JPA, tetapi Anda bisa menggunakannya.


All Articles