تحية للجميع.قبل بضعة أيام ، أصدرت JetBrains إصدارًا جديدًا من Corutin - 1.3.6 وكان أحد الابتكارات هو النوع الفرعي الجديد من Flow - StateFlow ، الذي يحل محل ConflatedBroadcastChannel. قررت أن أجرب StateFlow في العمل ودراسة الهيكل الداخلي.أعتقد أن العديد ممن يستخدمون Kotlin عند التطوير لنظام Android أو في MPP على دراية بهذه المصطلحات ، وهم ليسوا كذلك - هذه الكيانات هي نظائر قريبة من BehaviorProcessor / BehaviorSubject من RxJava و LiveData / MutableLiveData من Jetpack.StateFlow نفسها عبارة عن امتداد بسيط لواجهة Flow ويأتي في شكلين:public interface StateFlow<out T> : Flow<T> {
    
    public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T> {
    
    public override var value: T
}
الفكرة هي نفسها كما في LiveData / MutableLiveData - من خلال واجهة واحدة يمكن فقط قراءة الحالة الحالية ، ومن خلال أخرى يمكن تثبيتها أيضًا.ماذا تقدم لنا StateFlow مقارنة ConflatedBroadcastChannel:- تنفيذ داخلي أبسط وخالي من القمامة.
 - الحاجة إلى عنصر افتراضي. Null ممكن أيضًا.
 - الفصل في واجهات القراءة فقط والكتابة.
 - مقارنة العناصر من خلال المساواة بدلاً من مقارنة الروابط.
 
الآن دعونا نحاول تنفيذ استخدام بسيط لـ StateFlow. للقيام بذلك ، قمت بعمل غلاف أولي مع القدرة على تعيين أي نوع بعنصر فارغ افتراضيًا:class StateFlowRepository<T>(initialValue: T? = null) {
    private val stateFlow = MutableStateFlow(initialValue)
    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }
    val stream: Flow<T?> = stateFlow
}
نحصل على البيانات:lifecycleScope.launch {
            simpleRepo.stream.collect {
                addData(it.toString())
            }
        }
ونعرض على الشاشة مع أبسط واجهة للاختبارات ، ولا يسبب أي مشاكل ويعمل كل شيء مثل الساعة:
الآن دعونا ننظر في الداخل ونرى كيف يتم تنفيذه.من المدهش أن التنفيذ بسيط للغاية ولا يستغرق حاليًا سوى 316 خطًا ، 25٪ منها هي javadoki.وهكذا ، فإن فئة التنفيذ الرئيسية هي فئة StateFlowImpl:private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue) 
    private var sequence = 0 
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0 
    private var nextIndex = 0 
. . .
}
_state - الرابط الذري لتخزين دولتنا.تسلسل - مؤشر مساعد ، والذي يعتمد على العملية الحالية لتحديثفتحات الحالة - الصفيف / التجمع StateFlowSlot ، اعتمادًا على التكافؤ / الغرابة . StateFlowSlot - التجريد الإضافي لكل "اتصال" بـ StateFlow.nSlots ، nextIndex - المتغيرات المساعدة للعمل مع فتحات المصفوفة القابلة للتوسيعدعونا ننظر في StateFlowSlot مقدمًا. يمثل فقط:private val _state = atomic<Any?>(null)
بالإضافة إلى طرق لتغيير حالات الفتحة.يمكن أن تكون كل فتحة في إحدى الحالات:فارغة - يتم إنشاؤها ، ولكن لم يتم استخدامNONE -حالة PENDING المجمعة المستخدمة - تحسبًا لإرسال قيمة جديدة إلى جامعCancellableContinuationImpl - حالة saspended بالقرب من حالة PENDING الوجهة ، يتم تعليق المجمّع لن يأتي حتى حالة جديدة في Stateflow.ضع في اعتبارك ما يحدث عند تعيين قيمة جديدة:public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) {
            var curSequence = 0
            var curSlots: Array<StateFlowSlot?> = this.slots 
            val newState = value ?: NULL
            synchronized(this) {
                val oldState = _state.value
                if (oldState == newState) return 
                _state.value = newState
                curSequence = sequence
                if (curSequence and 1 == 0) { 
                    curSequence++ 
                    sequence = curSequence
                } else {
                    
                    sequence = curSequence + 2 
                    return
                }
                curSlots = slots 
            }
            
            while (true) {
                
                for (col in curSlots) {
                    col?.makePending()
                }
                
                synchronized(this) {
                    if (sequence == curSequence) { 
                        sequence = curSequence + 1 
                        return 
                    }
                    
                    curSequence = sequence
                    curSlots = slots
                }
            }
        }
المهمة الرئيسية هنا هي إصلاح تغييرات حالة StateFlow من التدفقات المختلفة للمكالمات المتتالية إلى FlowCollector.يمكن تمييز عدة خطوات:- تحديد قيمة جديدة.
 - يعني تعيين علامة التسلسل على قيمة فردية أننا بالفعل في عملية التحديث.
 - makePending () - ضبط جميع حالات الفتحات (أي جميع الاتصالات) على تعليق - سنرسل قريبًا قيمة جديدة.
 - تقوم الحلقة بفحص التسلسل == curSequence بأن جميع المهام مكتملة وتضبط التسلسل على رقم زوجي.
 
ماذا يحدث في طريقة الجمع :override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        var prevState: Any? = null 
        try {
            
            while (true) {
                
                
                val newState = _state.value
                
                if (prevState == null || newState != prevState) {
                    collector.emit(NULL.unbox(newState))
                    prevState = newState
                }
                
                if (!slot.takePending()) { 
                    slot.awaitPending() 
                }
            }
        } finally {
            freeSlot(slot)
        }
    }
المهمة الرئيسية هي إرسال القيمة الافتراضية الأولية وانتظر القيم الجديدة:- نقوم بإنشاء أو إعادة استخدام فتحة لاتصال جديد.
 - نتحقق من الحالة بحثًا عن قيمة فارغة أو لتغيير الدولة. Emittim معنى جديد.
 - نتحقق مما إذا كانت هناك فتحات جاهزة للتحديث (حالة تعليق) وإذا لم تكن كذلك ، فإننا نعلق الفتحة تحسبًا لقيم جديدة.
 
بشكل عام ، هذا كل شيء. لم نأخذ في الاعتبار كيف يتم تخصيص الفتحات وتغيير حالاتها ، لكنني اعتقدت أن هذا ليس مهمًا للصورة العامة لـ StateFlow.شكرا.