Hallo alle zusammen.Vor einigen Tagen veröffentlichte JetBrains eine neue Version von Corutin - 1.3.6. Eine der Innovationen war die neue Unterart Flow - StateFlow, die den ConflatedBroadcastChannel ersetzt. Ich beschloss, StateFlow in Aktion zu testen und die interne Struktur zu untersuchen.Ich denke, viele, die Kotlin bei der Entwicklung für Android oder in MPP verwenden, kennen diese Begriffe, die es nicht sind - diese Entitäten sind enge Analoga von BehaviorProcessor / BehaviorSubject von RxJava und LiveData / MutableLiveData von Jetpack.StateFlow selbst ist eine einfache Erweiterung der Flow-Schnittstelle und gibt es in zwei Formen:public interface StateFlow<out T> : Flow<T> {
public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T> {
public override var value: T
}
Die Idee ist dieselbe wie in LiveData / MutableLiveData - über eine Schnittstelle kann nur der aktuelle Status gelesen und über die andere auch installiert werden.Was bietet uns StateFlow im Vergleich zu ConflatedBroadcastChannel:- Eine einfachere und müllfreie interne Implementierung.
- Die Notwendigkeit eines Standardelements. Null ist auch möglich.
- Trennung in schreibgeschützte und schreibgeschützte Schnittstellen.
- Vergleichen von Elementen über Gleichheit statt Vergleichen von Links.
Versuchen wir nun, eine einfache Verwendung von StateFlow zu implementieren. Zu diesem Zweck habe ich einen Elementar-Wrapper erstellt, mit dem jeder Typ standardmäßig mit einem Null-Element festgelegt werden kann:class StateFlowRepository<T>(initialValue: T? = null) {
private val stateFlow = MutableStateFlow(initialValue)
var value: T?
get() = stateFlow.value
set(value) {
stateFlow.value = value
}
val stream: Flow<T?> = stateFlow
}
Wir bekommen die Daten:lifecycleScope.launch {
simpleRepo.stream.collect {
addData(it.toString())
}
}
Und wir zeigen auf dem Bildschirm die einfachste Oberfläche für Tests an, es verursacht keine Probleme und alles funktioniert wie eine Uhr:
Schauen wir uns jetzt an und sehen, wie es implementiert wird.Zu meiner Überraschung ist die Implementierung wirklich sehr einfach und dauert derzeit nur 316 Zeilen, von denen 25% Javadoki sind.Daher ist die Hauptimplementierungsklasse die StateFlowImpl-Klasse:private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialValue)
private var sequence = 0
private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
private var nSlots = 0
private var nextIndex = 0
. . .
}
_state - atomarer Link zum Speichern unseres Zustands.Sequenz - ein Hilfsindikator, der abhängig von der Parität / Seltsamkeit über den aktuellen Prozess der Aktualisierung der Status-Slots berichtet - Array / Pool StateFlowSlot. StateFlowSlot - Hilfsabstraktion jeder "Verbindung" zu StateFlow.nSlots, nextIndex - Hilfsvariablen für die Arbeit mit den erweiterbaren Array-SlotsBetrachten wir StateFlowSlot im Voraus. Er vertritt nur:private val _state = atomic<Any?>(null)
Plus-Methoden zum Ändern der Steckplatzzustände.Jeder Schlitz kann in einem der Zustände sein:null - erstellt, aber nicht verwendetNONE - durch den verwendetenPENDING Kollektor - während für einen neuen Wert warten an den Kollektor gesendet werdenCancellableContinuationImpl - Zustand ausgesetzt, nahe dem Ziel mit PENDING, Aussetzung der Sammler , bis ein neuer Zustand kommt in Stateflow.Überlegen Sie, was passiert, wenn Sie einen neuen Wert festlegen:public override var value: T
get() = NULL.unbox(_state.value)
set(value) {
var curSequence = 0
var curSlots: Array<StateFlowSlot?> = this.slots
val newState = value ?: NULL
synchronized(this) {
val oldState = _state.value
if (oldState == newState) return
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) {
curSequence++
sequence = curSequence
} else {
sequence = curSequence + 2
return
}
curSlots = slots
}
while (true) {
for (col in curSlots) {
col?.makePending()
}
synchronized(this) {
if (sequence == curSequence) {
sequence = curSequence + 1
return
}
curSequence = sequence
curSlots = slots
}
}
}
Die Hauptaufgabe hier besteht darin, StateFlow-Statusänderungen aus verschiedenen Flows für aufeinanderfolgende Aufrufe von FlowCollector zu korrigieren.Es können mehrere Schritte unterschieden werden:- Neuen Wert setzen.
- Wenn Sie den Sequenzmarker auf einen ungeraden Wert setzen, werden wir bereits aktualisiert.
- makePending () - Setzen aller Steckplatzzustände (d. h. aller Verbindungen) auf PENDING - Wir werden bald einen neuen Wert senden.
- Die Schleife überprüft die Sequenz == curSequence, ob alle Aufgaben abgeschlossen sind, und setzt die Sequenz auf eine gerade Zahl.
Was passiert bei der Erfassungsmethode :override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
var prevState: Any? = null
try {
while (true) {
val newState = _state.value
if (prevState == null || newState != prevState) {
collector.emit(NULL.unbox(newState))
prevState = newState
}
if (!slot.takePending()) {
slot.awaitPending()
}
}
} finally {
freeSlot(slot)
}
}
Die Hauptaufgabe besteht darin, den anfänglichen Standardwert zu senden und auf neue Werte zu warten:- Wir erstellen oder verwenden einen Steckplatz für eine neue Verbindung.
- Wir prüfen den Zustand auf Null oder auf Zustandsänderung. Emittim ist eine neue Bedeutung.
- Wir prüfen, ob Slots zur Aktualisierung bereit sind (Status PENDING), und wenn nicht, setzen wir den Slot in Erwartung neuer Werte aus.
Im Allgemeinen ist das alles. Wir haben nicht berücksichtigt, wie die Zuweisung von Slots und die Änderung ihrer Zustände erfolgen, aber ich habe festgestellt, dass dies für das Gesamtbild von StateFlow nicht wichtig ist.Danke.