نحن نحاول أن نفهم StateFlow

تحية للجميع.

قبل بضعة أيام ، أصدرت JetBrains إصدارًا جديدًا من Corutin - 1.3.6 وكان أحد الابتكارات هو النوع الفرعي الجديد من Flow - StateFlow ، الذي يحل محل ConflatedBroadcastChannel. قررت أن أجرب StateFlow في العمل ودراسة الهيكل الداخلي.

أعتقد أن العديد ممن يستخدمون Kotlin عند التطوير لنظام Android أو في MPP على دراية بهذه المصطلحات ، وهم ليسوا كذلك - هذه الكيانات هي نظائر قريبة من BehaviorProcessor / BehaviorSubject من RxJava و LiveData / MutableLiveData من Jetpack.

StateFlow نفسها عبارة عن امتداد بسيط لواجهة Flow ويأتي في شكلين:

public interface StateFlow<out T> : Flow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T> {
    /**
     * The current value of this state flow.
     *
     * Setting a value that is [equal][Any.equals] to the previous one does nothing.
     */
    public override var value: T
}

الفكرة هي نفسها كما في LiveData / MutableLiveData - من خلال واجهة واحدة يمكن فقط قراءة الحالة الحالية ، ومن خلال أخرى يمكن تثبيتها أيضًا.

ماذا تقدم لنا StateFlow مقارنة ConflatedBroadcastChannel:

  • تنفيذ داخلي أبسط وخالي من القمامة.
  • الحاجة إلى عنصر افتراضي. Null ممكن أيضًا.
  • الفصل في واجهات القراءة فقط والكتابة.
  • مقارنة العناصر من خلال المساواة بدلاً من مقارنة الروابط.

الآن دعونا نحاول تنفيذ استخدام بسيط لـ StateFlow. للقيام بذلك ، قمت بعمل غلاف أولي مع القدرة على تعيين أي نوع بعنصر فارغ افتراضيًا:

class StateFlowRepository<T>(initialValue: T? = null) {
    private val stateFlow = MutableStateFlow(initialValue)

    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }

    val stream: Flow<T?> = stateFlow
}

نحصل على البيانات:

lifecycleScope.launch {
            simpleRepo.stream.collect {
                addData(it.toString())
            }
        }

ونعرض على الشاشة مع أبسط واجهة للاختبارات ، ولا يسبب أي مشاكل ويعمل كل شيء مثل الساعة:



الآن دعونا ننظر في الداخل ونرى كيف يتم تنفيذه.

من المدهش أن التنفيذ بسيط للغاية ولا يستغرق حاليًا سوى 316 خطًا ، 25٪ منها هي javadoki.

وهكذا ، فإن فئة التنفيذ الرئيسية هي فئة StateFlowImpl:

private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0 // number of allocated (!free) slots
    private var nextIndex = 0 // oracle for the next free slot index

. . .
}

_state - الرابط الذري لتخزين دولتنا.

تسلسل - مؤشر مساعد ، والذي يعتمد على العملية الحالية لتحديث
فتحات الحالة - الصفيف / التجمع StateFlowSlot ، اعتمادًا على التكافؤ / الغرابة . StateFlowSlot - التجريد الإضافي لكل "اتصال" بـ StateFlow.
nSlots ، nextIndex - المتغيرات المساعدة للعمل مع فتحات المصفوفة القابلة للتوسيع

دعونا ننظر في StateFlowSlot مقدمًا. يمثل فقط:

private val _state = atomic<Any?>(null)

بالإضافة إلى طرق لتغيير حالات الفتحة.

يمكن أن تكون كل فتحة في إحدى الحالات:

فارغة - يتم إنشاؤها ، ولكن لم يتم استخدام
NONE -
حالة PENDING المجمعة المستخدمة - تحسبًا لإرسال قيمة جديدة إلى جامع
CancellableContinuationImpl - حالة saspended بالقرب من حالة PENDING الوجهة ، يتم تعليق المجمّع لن يأتي حتى حالة جديدة في Stateflow.

ضع في اعتبارك ما يحدث عند تعيين قيمة جديدة:

public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) {
            var curSequence = 0
            var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
            val newState = value ?: NULL
            synchronized(this) {
                val oldState = _state.value
                if (oldState == newState) return // Don't do anything if value is not changing
                _state.value = newState
                curSequence = sequence
                if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                    curSequence++ // make it odd
                    sequence = curSequence
                } else {
                    // update is already in process, notify it, and return
                    sequence = curSequence + 2 // change sequence to notify, keep it odd
                    return
                }
                curSlots = slots // read current reference to collectors under lock
            }
            /*
               Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
               Loop until we're done firing all the changes. This is sort of simple flat combining that
               ensures sequential firing of concurrent updates and avoids the storm of collector resumes
               when updates happen concurrently from many threads.
             */
            while (true) {
                // Benign race on element read from array
                for (col in curSlots) {
                    col?.makePending()
                }
                // check if the value was updated again while we were updating the old one
                synchronized(this) {
                    if (sequence == curSequence) { // nothing changed, we are done
                        sequence = curSequence + 1 // make sequence even again
                        return // done
                    }
                    // reread everything for the next loop under the lock
                    curSequence = sequence
                    curSlots = slots
                }
            }
        }

المهمة الرئيسية هنا هي إصلاح تغييرات حالة StateFlow من التدفقات المختلفة للمكالمات المتتالية إلى FlowCollector.

يمكن تمييز عدة خطوات:

  1. تحديد قيمة جديدة.
  2. يعني تعيين علامة التسلسل على قيمة فردية أننا بالفعل في عملية التحديث.
  3. makePending () - ضبط جميع حالات الفتحات (أي جميع الاتصالات) على تعليق - سنرسل قريبًا قيمة جديدة.
  4. تقوم الحلقة بفحص التسلسل == curSequence بأن جميع المهام مكتملة وتضبط التسلسل على رقم زوجي.

ماذا يحدث في طريقة الجمع :

override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        try {
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // Conflate value emissions using equality
                if (prevState == null || newState != prevState) {
                    collector.emit(NULL.unbox(newState))
                    prevState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

المهمة الرئيسية هي إرسال القيمة الافتراضية الأولية وانتظر القيم الجديدة:

  1. نقوم بإنشاء أو إعادة استخدام فتحة لاتصال جديد.
  2. نتحقق من الحالة بحثًا عن قيمة فارغة أو لتغيير الدولة. Emittim معنى جديد.
  3. نتحقق مما إذا كانت هناك فتحات جاهزة للتحديث (حالة تعليق) وإذا لم تكن كذلك ، فإننا نعلق الفتحة تحسبًا لقيم جديدة.

بشكل عام ، هذا كل شيء. لم نأخذ في الاعتبار كيف يتم تخصيص الفتحات وتغيير حالاتها ، لكنني اعتقدت أن هذا ليس مهمًا للصورة العامة لـ StateFlow.

شكرا.

All Articles