以Jetbrains KTor和R2DBC为例对反应性数据进行命令式处理

有关使用corutin对数据库进行反应式访问的文章。 Spring简化了所有事情,但是并没有影响对应用程序实际过程的理解。为了进行演示,选择了KTor框架(这是因为我想看看JetBrains所做的事情),该框架大量使用了协程,因此组合反应流和这些协程的任务增加了兴趣。在此过程中,很明显发生的事情是将难以理解的喷射流转变为可以吃的命令式程序的清晰示例,我们在该程序上吃了一条狗。我爱喷气式飞机链,但是为什么不喜欢那些喜欢军队命令的人呢?


Jet应用程序赢得了许多开发人员的关注和神经,这些集合之间有着明显的交集。如果不是为了社区的努力,他们将种植更多的种子,而不是为了将规范创建者的纯思想转变为可消化的库。这是在R2DBC规范和Spring(Boot)框架中发生的-开发人员已经可以看到熟悉的Spring Data API和熟悉的反应数据类型。但是,有一些原因不使用Spring:您不希望使用Spring,而想要一些新东西。好的,仍然有遗留代码,但是在这种情况下,您不太可能具有响应数据访问权限。


在这种情况下,您必须查看未涂漆的R2DBC它将与最终框架中提供的内容完全不同-就像JDBC与Spring Data JPA不同。加上反应性。根据反应流规范进行反应。我们听到协程。什么样的未来仍会重写它们。


您也可以从main方法手动启动协程,但是让我们试想一下,我们确实需要并行性和竞争-也就是说,高负载(例如,每小时一个请求!),我们认真决定不使用Spring编写它。但事实证明,在这里,我们有一个完全由Kotlin编写的轻量级模拟,甚至是语言本身的创建者,甚至是我们梦dream以求的协程。


我们正在准备一个项目


与所有熟悉的项目类似,可以从网页(或通过IntelliJ IDEA的插件)生成程序集空白。


我们转到https://start.ktor.io并选择以下模块:


  • 呼叫记录:仅因为我喜欢查看传入请求的日志
  • 路由:用于入口点的功能定义(端点URI)
  • Gson:序列化服务响应。

可以看出,有许多现成的模块可供加载,它们很有趣-例如,各种OAuth,JWT,LDAP来限制访问。


下载项目并打开进行编辑。难怪我在IntelliJ IDEA社区版中打开了它。


连接依赖


H2, build.gradle


    implementation "io.r2dbc:r2dbc-h2:0.8.1.RELEASE"


    implementation "io.r2dbc:r2dbc-pool:0.8.1.RELEASE"

Reactive Streams, R2DBC , Reactor Kotlin, :


    implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.0.2.RELEASE"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.5"

: JVM 1.8. , - native Java. .


build.gradle


tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).configureEach {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}


Application.kt, . , , :


fun Application.module(testing: Boolean = false) {
    //       
initDatabase@

    //   

applicationStarted@
    //       

boringStandardInitialization@
    //   ,   
    install(ContentNegotiation) {
        gson {
        }
    }

    install(CallLogging) {
        level = Level.INFO
        filter { call -> call.request.path().startsWith("/") }
    }

    routing {
        get("/") {
showTables@
            //       
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }

        get("/json/gson") {
showPool@            
            //      
            call.respond(mapOf("hello" to "world"))
        }
    }
}

, application.conf ktor.deployment port . , :


        watch = [ /home/votez/cc/ktor-reactive-db/build/classes/kotlin/main/ ]

— , devtools, . , .



initDatabase ( ) .


initDatabase@
    "" // ,        
    val connectionFactory = H2ConnectionFactory(
        H2ConnectionConfiguration.builder()
            .inMemory("users")
            .property(H2ConnectionOption.DB_CLOSE_DELAY, "-1")
            .build()
    )

    val poolConfig = ConnectionPoolConfiguration.builder(connectionFactory)
        .maxIdleTime(10.seconds.toJavaDuration()) //     @ExperimentalTime
        .maxSize(20)
        .build()

    val pool = ConnectionPool(poolConfig)

@ExperimentalTime — API.


, ( ) . , . :


applicationStarted@
    //       
    environment.monitor.subscribe(ApplicationStarted) {
        launch {
            val defer : Mono<Int> = pool.warmup()
            defer.awaitFirst()                             //   
            log.debug("Pool is hot, welcome!") //    
        }
    }

launch . , , — , . - ! "" (awaitFirst Kotlin), . : Reactor- Mono Flux ? . " -"? , — . — . , , Reactor RxJava . — "!" " !". , , - . — RxJava Reactor — !


, . , .



CRUD , , , H2 : TABLES. - :


data class Tables(    val catalog: String?,    val schema: String?,    val tableName: String?,    val tableType: String?,    val storageType: String?,    val id: String?,    val typeName: String?,    val tableClass: String?,    val rowCountEstimate: Long?) 

typealias , IDE


typealias R2DBCResult = io.r2dbc.spi.Result

. , R2DBC:


get("/tables") {
            showTables@
            //       
            ""
            val connection = pool.create().awaitSingle() //    -  
            val list = try { 
                val result: List<R2DBCResult> = connection.createStatement(
                        """
                            select 
                                TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, STORAGE_TYPE, SQL, ID, TYPE_NAME, TABLE_CLASS, ROW_COUNT_ESTIMATE
                            from 
                                INFORMATION_SCHEMA.TABLES
                                """.trimIndent()
                    )
                    .execute()  //    
                    .toFlux()   //      Reactor Flux
                    .collectList()  //             
                    .awaitFirst()   // ,    -    .
   convertData@
                ""
                TODO() //    ,    
            } finally {
                connection
                    .close()    //   
                    .awaitFirstOrNull() //   null -    .
            }

            call.respond(list)
        }

, Reactor map/flatMap . map/flatMap , , . finally — ( ) doFinally . , README conn.close(), . — , — . , map await, .


awaitFirstOrNull() Mono, onNext() .

, — , , .



R2DBC - (map) . companion object -:


 companion object DB{
        fun ofRow(r: Row, unused: RowMetadata) = Tables(
            r.get("TABLE_CATALOG", String::class.java),
            r.get("TABLE_SCHEMA", String::class.java),
            r.get("TABLE_NAME", String::class.java),
            r.get("TABLE_TYPE", String::class.java),
            r.get("STORAGE_TYPE", String::class.java),
            r.get("ID", Integer::class.java)?.toString(),
            r.get("TYPE_NAME", String::class.java),
            r.get("TABLE_CLASS", String::class.java),
            r.get("ROW_COUNT_ESTIMATE", java.lang.Long::class.java)?.toLong() ?: 0 //     Java ,     
        )
    }

, JDBC .


map Reactor, ReactiveStream Java Stream, R2DBC.


   convertData@
                result.flatMap {//      .  -   ,    
                    it
                        .map(Tables.DB::ofRow)  //     
                        .toFlux()               //      Reactor Flux 
                        .collectList()          //             
                        .awaitFirst()          // ,    -    .
                }

, :


curl http://localhost:8080/tables

如果JSON难看,则在应用程序gson部分中对其进行配置setPrettyPrinting)。


我们给游泳池统计


好吧,为了美观(因为我们没有连接指标的标准模块),我们将添加一个用于查看池统计信息的点。模板引擎对我们来说是多余的,因为我们的语言工具使我们能够:


        get("/pool") {
            showPool@
            //      
            call.respondText {
                (pool.metrics.map {
                    """
                Max allocated size:                 ${it.maxAllocatedSize}
                Max pending size  :                 ${it.maxPendingAcquireSize}
                Acquired size     :                 ${it.acquiredSize()}
                Pending acquire size:               ${it.pendingAcquireSize()}
                Allocated size    :                 ${it.allocatedSize()}
                Idle size         :                 ${it.idleSize()}\n
            """.trimIndent()
                }.orElse("NO METRICS"))
            }
        }

当然,如果您愿意,可以通过HTML DSL来提供此功能。


发现


您可以使用反应流制作协程,但是您需要在反应式和命令式之间切换-最好减少频率,并尝试遵循一种样式。


不仅是一个春天!


反应性数据库访问没有Spring Data JPA补充后那么漂亮,但是您可以使用它。


All Articles