نهج حتمي للبيانات التفاعلية على سبيل المثال من Jetbrains KTor و R2DBC

مقال حول استخدام الوصول التفاعلي إلى قواعد البيانات من corutin. يبسط الربيع كل شيء ، لكنه لا يؤثر على فهم العمليات الحقيقية للتطبيق. للتوضيح ، تم اختيار إطار KTor (ببساطة لأنني أحب النظر إلى ما يفعله JetBrains) ، والذي يستخدم المكثفات بشكل مكثف - بحيث تضيف مهمة الجمع بين التدفقات المتفاعلة وهذه الكورتيونات نفسها الاهتمام. في هذه العملية ، أصبح من الواضح أن ما كان يحدث هو مثال واضح على تحول تيار نفاث غير مفهوم إلى برمجة حتمية مفهومة ، حيث تناولنا كلبًا. أنا أحب سلاسل الطائرات ، ولكن لماذا لا ترضي أولئك الذين يحبون أمر الجيش؟


فازت تطبيقات Jet بقلوب وأعصاب العديد من المطورين ، وتتقاطع هذه المجموعات بشكل ملحوظ. كانوا سيزرعون أكثر إن لم يكن لجهود المجتمعات التي تتكيف مع التيار النقي للعقل من مبدعي المواصفات إلى مكتبات قابلة للهضم. حدث هذا مع مواصفات R2DBC وإطار الربيع (التمهيد) - يمكن للمطور رؤية واجهة برمجة تطبيقات Spring Data المألوفة مع أنواع البيانات التفاعلية المألوفة. ومع ذلك ، هناك أسباب لعدم استخدام Spring: لا تريد Spring وتريد شيئًا جديدًا. حسنًا ، لا يزال هناك رمز قديم ، ولكن في هذه الحالة من غير المحتمل أن يكون لديك وصول تفاعلي إلى البيانات.


في هذه الحالة ، يجب عليك أن تنظر إلى R2DBC غير مصقول . ومن المتوقع أن يكون مختلفًا تمامًا عما نقدمه في الإطار النهائي - تمامًا مثل JDBC يختلف عن Spring Data JPA. تفاعل زائد. والتفاعلية طبقا لمواصفات الجداول التفاعلية. ونسمع coroutines. أي نوع من المستقبل ومازال يعيد كتابته.


يمكنك أيضًا بدء Coroutines يدويًا من الطريقة الرئيسية ، ولكن دعونا نحاول أن نتخيل أننا نحتاج حقًا إلى التوازي والمنافسة - أي الحمل العالي (على سبيل المثال ، طلب واحد في الساعة!) وقررنا بجدية كتابة هذا بدون Spring. ولكن هنا ، اتضح أن لدينا نظيرًا خفيف الوزن مكتوبًا بالكامل في Kotlin ، وحتى من مبدعي اللغة نفسها ، وحتى على التماثيل التي نحلم بها.


نحن نستعد لمشروع


قياساً على جميع المشاريع المألوفة ، يمكن إنشاء تجميع فارغ من صفحة ويب (أو من خلال مكون إضافي لـ IntelliJ IDEA).


نذهب إلى https://start.ktor.io ونختار الوحدات:


  • تسجيل المكالمات : فقط لأنني أحب أن ألقي نظرة على السجلات الواردة
  • التوجيه : للتعريف الوظيفي لنقاط الدخول (نقطة النهاية URI)
  • Gson : لتسلسل استجابات الخدمة.

يمكن ملاحظة أن هناك الكثير من الوحدات الجاهزة للتحميل ومثيرة للاهتمام - جميع أنواع OAuth ، JWT ، LDAP لتقييد الوصول ، على سبيل المثال.


قم بتنزيل المشروع وفتح للتحرير. لا عجب أنني فتحته في IntelliJ IDEA Community Edition.


ربط التبعيات


H2, build.gradle


    implementation "io.r2dbc:r2dbc-h2:0.8.1.RELEASE"


    implementation "io.r2dbc:r2dbc-pool:0.8.1.RELEASE"

Reactive Streams, R2DBC , Reactor Kotlin, :


    implementation "io.projectreactor.kotlin:reactor-kotlin-extensions:1.0.2.RELEASE"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.5"

: JVM 1.8. , - native Java. .


build.gradle


tasks.withType(org.jetbrains.kotlin.gradle.tasks.KotlinCompile).configureEach {
    kotlinOptions {
        jvmTarget = "1.8"
    }
}


Application.kt, . , , :


fun Application.module(testing: Boolean = false) {
    //       
initDatabase@

    //   

applicationStarted@
    //       

boringStandardInitialization@
    //   ,   
    install(ContentNegotiation) {
        gson {
        }
    }

    install(CallLogging) {
        level = Level.INFO
        filter { call -> call.request.path().startsWith("/") }
    }

    routing {
        get("/") {
showTables@
            //       
            call.respondText("HELLO WORLD!", contentType = ContentType.Text.Plain)
        }

        get("/json/gson") {
showPool@            
            //      
            call.respond(mapOf("hello" to "world"))
        }
    }
}

, application.conf ktor.deployment port . , :


        watch = [ /home/votez/cc/ktor-reactive-db/build/classes/kotlin/main/ ]

— , devtools, . , .



initDatabase ( ) .


initDatabase@
    "" // ,        
    val connectionFactory = H2ConnectionFactory(
        H2ConnectionConfiguration.builder()
            .inMemory("users")
            .property(H2ConnectionOption.DB_CLOSE_DELAY, "-1")
            .build()
    )

    val poolConfig = ConnectionPoolConfiguration.builder(connectionFactory)
        .maxIdleTime(10.seconds.toJavaDuration()) //     @ExperimentalTime
        .maxSize(20)
        .build()

    val pool = ConnectionPool(poolConfig)

@ExperimentalTime — API.


, ( ) . , . :


applicationStarted@
    //       
    environment.monitor.subscribe(ApplicationStarted) {
        launch {
            val defer : Mono<Int> = pool.warmup()
            defer.awaitFirst()                             //   
            log.debug("Pool is hot, welcome!") //    
        }
    }

launch . , , — , . - ! "" (awaitFirst Kotlin), . : Reactor- Mono Flux ? . " -"? , — . — . , , Reactor RxJava . — "!" " !". , , - . — RxJava Reactor — !


, . , .



CRUD , , , H2 : TABLES. - :


data class Tables(    val catalog: String?,    val schema: String?,    val tableName: String?,    val tableType: String?,    val storageType: String?,    val id: String?,    val typeName: String?,    val tableClass: String?,    val rowCountEstimate: Long?) 

typealias , IDE


typealias R2DBCResult = io.r2dbc.spi.Result

. , R2DBC:


get("/tables") {
            showTables@
            //       
            ""
            val connection = pool.create().awaitSingle() //    -  
            val list = try { 
                val result: List<R2DBCResult> = connection.createStatement(
                        """
                            select 
                                TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, STORAGE_TYPE, SQL, ID, TYPE_NAME, TABLE_CLASS, ROW_COUNT_ESTIMATE
                            from 
                                INFORMATION_SCHEMA.TABLES
                                """.trimIndent()
                    )
                    .execute()  //    
                    .toFlux()   //      Reactor Flux
                    .collectList()  //             
                    .awaitFirst()   // ,    -    .
   convertData@
                ""
                TODO() //    ,    
            } finally {
                connection
                    .close()    //   
                    .awaitFirstOrNull() //   null -    .
            }

            call.respond(list)
        }

, Reactor map/flatMap . map/flatMap , , . finally — ( ) doFinally . , README conn.close(), . — , — . , map await, .


awaitFirstOrNull() Mono, onNext() .

, — , , .



R2DBC - (map) . companion object -:


 companion object DB{
        fun ofRow(r: Row, unused: RowMetadata) = Tables(
            r.get("TABLE_CATALOG", String::class.java),
            r.get("TABLE_SCHEMA", String::class.java),
            r.get("TABLE_NAME", String::class.java),
            r.get("TABLE_TYPE", String::class.java),
            r.get("STORAGE_TYPE", String::class.java),
            r.get("ID", Integer::class.java)?.toString(),
            r.get("TYPE_NAME", String::class.java),
            r.get("TABLE_CLASS", String::class.java),
            r.get("ROW_COUNT_ESTIMATE", java.lang.Long::class.java)?.toLong() ?: 0 //     Java ,     
        )
    }

, JDBC .


map Reactor, ReactiveStream Java Stream, R2DBC.


   convertData@
                result.flatMap {//      .  -   ,    
                    it
                        .map(Tables.DB::ofRow)  //     
                        .toFlux()               //      Reactor Flux 
                        .collectList()          //             
                        .awaitFirst()          // ,    -    .
                }

, :


curl http://localhost:8080/tables

إذا كان JSON قبيحًا ، فسيتم تكوينه في قسم gson من التطبيق ( setPrettyPrinting ).


نعطي إحصاءات تجمع


حسنًا وللجمال (نظرًا لأننا لم نربط الوحدة القياسية للمقاييس) سنضيف نقطة لعرض إحصاءات المجموعة. محركات القوالب غير ضرورية بالنسبة لنا ، لأن أدواتنا اللغوية تتيح لنا:


        get("/pool") {
            showPool@
            //      
            call.respondText {
                (pool.metrics.map {
                    """
                Max allocated size:                 ${it.maxAllocatedSize}
                Max pending size  :                 ${it.maxPendingAcquireSize}
                Acquired size     :                 ${it.acquiredSize()}
                Pending acquire size:               ${it.pendingAcquireSize()}
                Allocated size    :                 ${it.allocatedSize()}
                Idle size         :                 ${it.idleSize()}\n
            """.trimIndent()
                }.orElse("NO METRICS"))
            }
        }

بالطبع ، إذا كنت ترغب في ذلك ، يمكنك تقديم ذلك عبر HTML DSL.


الموجودات


يمكنك إنشاء coroutines بتدفقات تفاعلية ، ولكنك تحتاج إلى التبديل بين الأنماط التفاعلية والضرورية - ويفضل أقل في كثير من الأحيان ومحاولة الالتزام بنمط واحد.


ليس فقط ربيع واحد!


الوصول إلى قاعدة البيانات التفاعلية ليس جميلًا مثل ماكياج Spring Data JPA ، ولكن يمكنك استخدامه.


All Articles