Halo semuanya.Beberapa hari yang lalu, JetBrains merilis versi baru Corutin - 1.3.6 dan salah satu inovasi adalah subspesies baru Flow - StateFlow, yang menggantikan ConflatedBroadcastChannel. Saya memutuskan untuk mencoba StateFlow dalam aksi dan mempelajari struktur internal.Saya pikir banyak yang menggunakan Kotlin ketika mengembangkan untuk Android atau MPP yang akrab dengan istilah-istilah ini, siapa yang tidak - entitas ini adalah analog dekat dari BehaviorProcessor / BehaviorSubject dari RxJava dan LiveData / MutableLiveData dari Jetpack.StateFlow sendiri adalah ekstensi sederhana dari antarmuka Flow dan hadir dalam dua bentuk:public interface StateFlow<out T> : Flow<T> {
public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T> {
public override var value: T
}
Idenya sama dengan di LiveData / MutableLiveData - melalui satu antarmuka hanya dapat membaca keadaan saat ini, dan melalui yang lain juga dapat diinstal.Apa yang ditawarkan StateFlow kepada kami dibandingkan dengan ConflatedBroadcastChannel:- Implementasi internal yang lebih sederhana dan bebas sampah.
- Kebutuhan akan barang bawaan. Null juga dimungkinkan.
- Pemisahan menjadi antarmuka read-only dan read-write.
- Membandingkan elemen melalui kesetaraan alih-alih membandingkan tautan.
Sekarang mari kita coba menerapkan penggunaan sederhana StateFlow. Untuk melakukan ini, saya membuat pembungkus dasar dengan kemampuan untuk mengatur semua jenis dengan elemen nol secara default:class StateFlowRepository<T>(initialValue: T? = null) {
private val stateFlow = MutableStateFlow(initialValue)
var value: T?
get() = stateFlow.value
set(value) {
stateFlow.value = value
}
val stream: Flow<T?> = stateFlow
}
Kami mendapatkan data:lifecycleScope.launch {
simpleRepo.stream.collect {
addData(it.toString())
}
}
Dan kami menampilkan di layar dengan antarmuka paling sederhana untuk pengujian, itu tidak menyebabkan masalah dan semuanya berfungsi seperti jam:
Sekarang mari kita lihat ke dalam dan lihat bagaimana penerapannya.Yang mengejutkan saya, implementasinya sangat sederhana dan saat ini hanya membutuhkan 316 baris, 25% di antaranya adalah javadoki.Jadi, kelas implementasi utama adalah kelas StateFlowImpl:private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialValue)
private var sequence = 0
private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
private var nSlots = 0
private var nextIndex = 0
. . .
}
_state - tautan atom untuk menyimpan keadaan kita.urutan - indikator tambahan, yang, tergantung pada paritas / keanehan, melaporkan proses saat ini memperbaruislot negara - array / kolam StateFlowSlot. StateFlowSlot - abstraksi tambahan dari setiap "koneksi" ke StateFlow.nSlots, nextIndex - variabel pembantu untuk bekerja dengan slot array yang dapat diperluasMari kita pertimbangkan StateFlowSlot sebelumnya. Dia hanya mewakili:private val _state = atomic<Any?>(null)
Metode plus untuk mengubah status slot.Setiap slot dapat berada dalam salah satu dari keadaan berikut:null - dibuat, tetapi tidak digunakanNONE - digunakan oleh kolektorPENDING - sambil menunggu nilai baru untuk dikirim ke kolektorCancellableContinuationImpl - status ditangguhkan, dekat dengan tujuan dengan PENDING, tunda kolektor sampai negara baru tiba di StateFlow.Pertimbangkan apa yang terjadi ketika Anda menetapkan nilai baru:public override var value: T
get() = NULL.unbox(_state.value)
set(value) {
var curSequence = 0
var curSlots: Array<StateFlowSlot?> = this.slots
val newState = value ?: NULL
synchronized(this) {
val oldState = _state.value
if (oldState == newState) return
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) {
curSequence++
sequence = curSequence
} else {
sequence = curSequence + 2
return
}
curSlots = slots
}
while (true) {
for (col in curSlots) {
col?.makePending()
}
synchronized(this) {
if (sequence == curSequence) {
sequence = curSequence + 1
return
}
curSequence = sequence
curSlots = slots
}
}
}
Tugas utama di sini adalah untuk memperbaiki perubahan status StateFlow dari aliran berbeda untuk panggilan berturut-turut ke FlowCollector.Beberapa langkah dapat dibedakan:- Menetapkan nilai baru.
- Mengatur penanda urutan ke nilai ganjil berarti bahwa kami sudah dalam proses memperbarui.
- makePending () - mengatur semua status slot (mis., semua koneksi) ke PENDING - kami akan segera mengirimkan nilai baru.
- Loop memeriksa urutan == curSequence bahwa semua tugas diselesaikan dan menetapkan urutan ke nomor genap.
Apa yang terjadi dalam metode kumpulkan :override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
var prevState: Any? = null
try {
while (true) {
val newState = _state.value
if (prevState == null || newState != prevState) {
collector.emit(NULL.unbox(newState))
prevState = newState
}
if (!slot.takePending()) {
slot.awaitPending()
}
}
} finally {
freeSlot(slot)
}
}
Tugas utama adalah mengirim nilai default awal dan menunggu nilai-nilai baru:- Kami membuat atau menggunakan kembali slot untuk koneksi baru.
- Kami memeriksa status null atau perubahan status. Emittim makna baru.
- Kami memeriksa apakah ada slot yang siap diperbarui (status PENDING) dan jika tidak, kami menangguhkan slot untuk mengantisipasi nilai baru.
Secara umum, itu saja. Kami tidak mempertimbangkan bagaimana alokasi slot dan perubahan status mereka terjadi, tetapi saya pikir ini tidak penting untuk gambaran keseluruhan StateFlow.Terima kasih