Kami mencoba dan memahami StateFlow

Halo semuanya.

Beberapa hari yang lalu, JetBrains merilis versi baru Corutin - 1.3.6 dan salah satu inovasi adalah subspesies baru Flow - StateFlow, yang menggantikan ConflatedBroadcastChannel. Saya memutuskan untuk mencoba StateFlow dalam aksi dan mempelajari struktur internal.

Saya pikir banyak yang menggunakan Kotlin ketika mengembangkan untuk Android atau MPP yang akrab dengan istilah-istilah ini, siapa yang tidak - entitas ini adalah analog dekat dari BehaviorProcessor / BehaviorSubject dari RxJava dan LiveData / MutableLiveData dari Jetpack.

StateFlow sendiri adalah ekstensi sederhana dari antarmuka Flow dan hadir dalam dua bentuk:

public interface StateFlow<out T> : Flow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T> {
    /**
     * The current value of this state flow.
     *
     * Setting a value that is [equal][Any.equals] to the previous one does nothing.
     */
    public override var value: T
}

Idenya sama dengan di LiveData / MutableLiveData - melalui satu antarmuka hanya dapat membaca keadaan saat ini, dan melalui yang lain juga dapat diinstal.

Apa yang ditawarkan StateFlow kepada kami dibandingkan dengan ConflatedBroadcastChannel:

  • Implementasi internal yang lebih sederhana dan bebas sampah.
  • Kebutuhan akan barang bawaan. Null juga dimungkinkan.
  • Pemisahan menjadi antarmuka read-only dan read-write.
  • Membandingkan elemen melalui kesetaraan alih-alih membandingkan tautan.

Sekarang mari kita coba menerapkan penggunaan sederhana StateFlow. Untuk melakukan ini, saya membuat pembungkus dasar dengan kemampuan untuk mengatur semua jenis dengan elemen nol secara default:

class StateFlowRepository<T>(initialValue: T? = null) {
    private val stateFlow = MutableStateFlow(initialValue)

    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }

    val stream: Flow<T?> = stateFlow
}

Kami mendapatkan data:

lifecycleScope.launch {
            simpleRepo.stream.collect {
                addData(it.toString())
            }
        }

Dan kami menampilkan di layar dengan antarmuka paling sederhana untuk pengujian, itu tidak menyebabkan masalah dan semuanya berfungsi seperti jam:



Sekarang mari kita lihat ke dalam dan lihat bagaimana penerapannya.

Yang mengejutkan saya, implementasinya sangat sederhana dan saat ini hanya membutuhkan 316 baris, 25% di antaranya adalah javadoki.

Jadi, kelas implementasi utama adalah kelas StateFlowImpl:

private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0 // number of allocated (!free) slots
    private var nextIndex = 0 // oracle for the next free slot index

. . .
}

_state - tautan atom untuk menyimpan keadaan kita.

urutan - indikator tambahan, yang, tergantung pada paritas / keanehan, melaporkan proses saat ini memperbarui
slot negara - array / kolam StateFlowSlot. StateFlowSlot - abstraksi tambahan dari setiap "koneksi" ke StateFlow.
nSlots, nextIndex - variabel pembantu untuk bekerja dengan slot array yang dapat diperluas

Mari kita pertimbangkan StateFlowSlot sebelumnya. Dia hanya mewakili:

private val _state = atomic<Any?>(null)

Metode plus untuk mengubah status slot.

Setiap slot dapat berada dalam salah satu dari keadaan berikut:

null - dibuat, tetapi tidak digunakan
NONE - digunakan oleh kolektor
PENDING - sambil menunggu nilai baru untuk dikirim ke kolektor
CancellableContinuationImpl - status ditangguhkan, dekat dengan tujuan dengan PENDING, tunda kolektor sampai negara baru tiba di StateFlow.

Pertimbangkan apa yang terjadi ketika Anda menetapkan nilai baru:

public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) {
            var curSequence = 0
            var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
            val newState = value ?: NULL
            synchronized(this) {
                val oldState = _state.value
                if (oldState == newState) return // Don't do anything if value is not changing
                _state.value = newState
                curSequence = sequence
                if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                    curSequence++ // make it odd
                    sequence = curSequence
                } else {
                    // update is already in process, notify it, and return
                    sequence = curSequence + 2 // change sequence to notify, keep it odd
                    return
                }
                curSlots = slots // read current reference to collectors under lock
            }
            /*
               Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
               Loop until we're done firing all the changes. This is sort of simple flat combining that
               ensures sequential firing of concurrent updates and avoids the storm of collector resumes
               when updates happen concurrently from many threads.
             */
            while (true) {
                // Benign race on element read from array
                for (col in curSlots) {
                    col?.makePending()
                }
                // check if the value was updated again while we were updating the old one
                synchronized(this) {
                    if (sequence == curSequence) { // nothing changed, we are done
                        sequence = curSequence + 1 // make sequence even again
                        return // done
                    }
                    // reread everything for the next loop under the lock
                    curSequence = sequence
                    curSlots = slots
                }
            }
        }

Tugas utama di sini adalah untuk memperbaiki perubahan status StateFlow dari aliran berbeda untuk panggilan berturut-turut ke FlowCollector.

Beberapa langkah dapat dibedakan:

  1. Menetapkan nilai baru.
  2. Mengatur penanda urutan ke nilai ganjil berarti bahwa kami sudah dalam proses memperbarui.
  3. makePending () - mengatur semua status slot (mis., semua koneksi) ke PENDING - kami akan segera mengirimkan nilai baru.
  4. Loop memeriksa urutan == curSequence bahwa semua tugas diselesaikan dan menetapkan urutan ke nomor genap.

Apa yang terjadi dalam metode kumpulkan :

override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        try {
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // Conflate value emissions using equality
                if (prevState == null || newState != prevState) {
                    collector.emit(NULL.unbox(newState))
                    prevState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

Tugas utama adalah mengirim nilai default awal dan menunggu nilai-nilai baru:

  1. Kami membuat atau menggunakan kembali slot untuk koneksi baru.
  2. Kami memeriksa status null atau perubahan status. Emittim makna baru.
  3. Kami memeriksa apakah ada slot yang siap diperbarui (status PENDING) dan jika tidak, kami menangguhkan slot untuk mengantisipasi nilai baru.

Secara umum, itu saja. Kami tidak mempertimbangkan bagaimana alokasi slot dan perubahan status mereka terjadi, tetapi saya pikir ini tidak penting untuk gambaran keseluruhan StateFlow.

Terima kasih

All Articles