تحية للجميع.قبل بضعة أيام ، أصدرت JetBrains إصدارًا جديدًا من Corutin - 1.3.6 وكان أحد الابتكارات هو النوع الفرعي الجديد من Flow - StateFlow ، الذي يحل محل ConflatedBroadcastChannel. قررت أن أجرب StateFlow في العمل ودراسة الهيكل الداخلي.أعتقد أن العديد ممن يستخدمون Kotlin عند التطوير لنظام Android أو في MPP على دراية بهذه المصطلحات ، وهم ليسوا كذلك - هذه الكيانات هي نظائر قريبة من BehaviorProcessor / BehaviorSubject من RxJava و LiveData / MutableLiveData من Jetpack.StateFlow نفسها عبارة عن امتداد بسيط لواجهة Flow ويأتي في شكلين:public interface StateFlow<out T> : Flow<T> {
public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T> {
public override var value: T
}
الفكرة هي نفسها كما في LiveData / MutableLiveData - من خلال واجهة واحدة يمكن فقط قراءة الحالة الحالية ، ومن خلال أخرى يمكن تثبيتها أيضًا.ماذا تقدم لنا StateFlow مقارنة ConflatedBroadcastChannel:- تنفيذ داخلي أبسط وخالي من القمامة.
- الحاجة إلى عنصر افتراضي. Null ممكن أيضًا.
- الفصل في واجهات القراءة فقط والكتابة.
- مقارنة العناصر من خلال المساواة بدلاً من مقارنة الروابط.
الآن دعونا نحاول تنفيذ استخدام بسيط لـ StateFlow. للقيام بذلك ، قمت بعمل غلاف أولي مع القدرة على تعيين أي نوع بعنصر فارغ افتراضيًا:class StateFlowRepository<T>(initialValue: T? = null) {
private val stateFlow = MutableStateFlow(initialValue)
var value: T?
get() = stateFlow.value
set(value) {
stateFlow.value = value
}
val stream: Flow<T?> = stateFlow
}
نحصل على البيانات:lifecycleScope.launch {
simpleRepo.stream.collect {
addData(it.toString())
}
}
ونعرض على الشاشة مع أبسط واجهة للاختبارات ، ولا يسبب أي مشاكل ويعمل كل شيء مثل الساعة:
الآن دعونا ننظر في الداخل ونرى كيف يتم تنفيذه.من المدهش أن التنفيذ بسيط للغاية ولا يستغرق حاليًا سوى 316 خطًا ، 25٪ منها هي javadoki.وهكذا ، فإن فئة التنفيذ الرئيسية هي فئة StateFlowImpl:private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialValue)
private var sequence = 0
private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
private var nSlots = 0
private var nextIndex = 0
. . .
}
_state - الرابط الذري لتخزين دولتنا.تسلسل - مؤشر مساعد ، والذي يعتمد على العملية الحالية لتحديثفتحات الحالة - الصفيف / التجمع StateFlowSlot ، اعتمادًا على التكافؤ / الغرابة . StateFlowSlot - التجريد الإضافي لكل "اتصال" بـ StateFlow.nSlots ، nextIndex - المتغيرات المساعدة للعمل مع فتحات المصفوفة القابلة للتوسيعدعونا ننظر في StateFlowSlot مقدمًا. يمثل فقط:private val _state = atomic<Any?>(null)
بالإضافة إلى طرق لتغيير حالات الفتحة.يمكن أن تكون كل فتحة في إحدى الحالات:فارغة - يتم إنشاؤها ، ولكن لم يتم استخدامNONE -حالة PENDING المجمعة المستخدمة - تحسبًا لإرسال قيمة جديدة إلى جامعCancellableContinuationImpl - حالة saspended بالقرب من حالة PENDING الوجهة ، يتم تعليق المجمّع لن يأتي حتى حالة جديدة في Stateflow.ضع في اعتبارك ما يحدث عند تعيين قيمة جديدة:public override var value: T
get() = NULL.unbox(_state.value)
set(value) {
var curSequence = 0
var curSlots: Array<StateFlowSlot?> = this.slots
val newState = value ?: NULL
synchronized(this) {
val oldState = _state.value
if (oldState == newState) return
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) {
curSequence++
sequence = curSequence
} else {
sequence = curSequence + 2
return
}
curSlots = slots
}
while (true) {
for (col in curSlots) {
col?.makePending()
}
synchronized(this) {
if (sequence == curSequence) {
sequence = curSequence + 1
return
}
curSequence = sequence
curSlots = slots
}
}
}
المهمة الرئيسية هنا هي إصلاح تغييرات حالة StateFlow من التدفقات المختلفة للمكالمات المتتالية إلى FlowCollector.يمكن تمييز عدة خطوات:- تحديد قيمة جديدة.
- يعني تعيين علامة التسلسل على قيمة فردية أننا بالفعل في عملية التحديث.
- makePending () - ضبط جميع حالات الفتحات (أي جميع الاتصالات) على تعليق - سنرسل قريبًا قيمة جديدة.
- تقوم الحلقة بفحص التسلسل == curSequence بأن جميع المهام مكتملة وتضبط التسلسل على رقم زوجي.
ماذا يحدث في طريقة الجمع :override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
var prevState: Any? = null
try {
while (true) {
val newState = _state.value
if (prevState == null || newState != prevState) {
collector.emit(NULL.unbox(newState))
prevState = newState
}
if (!slot.takePending()) {
slot.awaitPending()
}
}
} finally {
freeSlot(slot)
}
}
المهمة الرئيسية هي إرسال القيمة الافتراضية الأولية وانتظر القيم الجديدة:- نقوم بإنشاء أو إعادة استخدام فتحة لاتصال جديد.
- نتحقق من الحالة بحثًا عن قيمة فارغة أو لتغيير الدولة. Emittim معنى جديد.
- نتحقق مما إذا كانت هناك فتحات جاهزة للتحديث (حالة تعليق) وإذا لم تكن كذلك ، فإننا نعلق الفتحة تحسبًا لقيم جديدة.
بشكل عام ، هذا كل شيء. لم نأخذ في الاعتبار كيف يتم تخصيص الفتحات وتغيير حالاتها ، لكنني اعتقدت أن هذا ليس مهمًا للصورة العامة لـ StateFlow.شكرا.