Wir versuchen StateFlow zu verstehen

Hallo alle zusammen.

Vor einigen Tagen veröffentlichte JetBrains eine neue Version von Corutin - 1.3.6. Eine der Innovationen war die neue Unterart Flow - StateFlow, die den ConflatedBroadcastChannel ersetzt. Ich beschloss, StateFlow in Aktion zu testen und die interne Struktur zu untersuchen.

Ich denke, viele, die Kotlin bei der Entwicklung für Android oder in MPP verwenden, kennen diese Begriffe, die es nicht sind - diese Entitäten sind enge Analoga von BehaviorProcessor / BehaviorSubject von RxJava und LiveData / MutableLiveData von Jetpack.

StateFlow selbst ist eine einfache Erweiterung der Flow-Schnittstelle und gibt es in zwei Formen:

public interface StateFlow<out T> : Flow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T> {
    /**
     * The current value of this state flow.
     *
     * Setting a value that is [equal][Any.equals] to the previous one does nothing.
     */
    public override var value: T
}

Die Idee ist dieselbe wie in LiveData / MutableLiveData - über eine Schnittstelle kann nur der aktuelle Status gelesen und über die andere auch installiert werden.

Was bietet uns StateFlow im Vergleich zu ConflatedBroadcastChannel:

  • Eine einfachere und müllfreie interne Implementierung.
  • Die Notwendigkeit eines Standardelements. Null ist auch möglich.
  • Trennung in schreibgeschützte und schreibgeschützte Schnittstellen.
  • Vergleichen von Elementen über Gleichheit statt Vergleichen von Links.

Versuchen wir nun, eine einfache Verwendung von StateFlow zu implementieren. Zu diesem Zweck habe ich einen Elementar-Wrapper erstellt, mit dem jeder Typ standardmäßig mit einem Null-Element festgelegt werden kann:

class StateFlowRepository<T>(initialValue: T? = null) {
    private val stateFlow = MutableStateFlow(initialValue)

    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }

    val stream: Flow<T?> = stateFlow
}

Wir bekommen die Daten:

lifecycleScope.launch {
            simpleRepo.stream.collect {
                addData(it.toString())
            }
        }

Und wir zeigen auf dem Bildschirm die einfachste Oberfläche für Tests an, es verursacht keine Probleme und alles funktioniert wie eine Uhr:



Schauen wir uns jetzt an und sehen, wie es implementiert wird.

Zu meiner Überraschung ist die Implementierung wirklich sehr einfach und dauert derzeit nur 316 Zeilen, von denen 25% Javadoki sind.

Daher ist die Hauptimplementierungsklasse die StateFlowImpl-Klasse:

private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0 // number of allocated (!free) slots
    private var nextIndex = 0 // oracle for the next free slot index

. . .
}

_state - atomarer Link zum Speichern unseres Zustands.

Sequenz - ein Hilfsindikator, der abhängig von der Parität / Seltsamkeit über den aktuellen Prozess der Aktualisierung der Status-
Slots berichtet - Array / Pool StateFlowSlot. StateFlowSlot - Hilfsabstraktion jeder "Verbindung" zu StateFlow.
nSlots, nextIndex - Hilfsvariablen für die Arbeit mit den erweiterbaren Array-Slots

Betrachten wir StateFlowSlot im Voraus. Er vertritt nur:

private val _state = atomic<Any?>(null)

Plus-Methoden zum Ändern der Steckplatzzustände.

Jeder Schlitz kann in einem der Zustände sein:

null - erstellt, aber nicht verwendet
NONE - durch den verwendeten
PENDING Kollektor - während für einen neuen Wert warten an den Kollektor gesendet werden
CancellableContinuationImpl - Zustand ausgesetzt, nahe dem Ziel mit PENDING, Aussetzung der Sammler , bis ein neuer Zustand kommt in Stateflow.

Überlegen Sie, was passiert, wenn Sie einen neuen Wert festlegen:

public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) {
            var curSequence = 0
            var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
            val newState = value ?: NULL
            synchronized(this) {
                val oldState = _state.value
                if (oldState == newState) return // Don't do anything if value is not changing
                _state.value = newState
                curSequence = sequence
                if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                    curSequence++ // make it odd
                    sequence = curSequence
                } else {
                    // update is already in process, notify it, and return
                    sequence = curSequence + 2 // change sequence to notify, keep it odd
                    return
                }
                curSlots = slots // read current reference to collectors under lock
            }
            /*
               Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
               Loop until we're done firing all the changes. This is sort of simple flat combining that
               ensures sequential firing of concurrent updates and avoids the storm of collector resumes
               when updates happen concurrently from many threads.
             */
            while (true) {
                // Benign race on element read from array
                for (col in curSlots) {
                    col?.makePending()
                }
                // check if the value was updated again while we were updating the old one
                synchronized(this) {
                    if (sequence == curSequence) { // nothing changed, we are done
                        sequence = curSequence + 1 // make sequence even again
                        return // done
                    }
                    // reread everything for the next loop under the lock
                    curSequence = sequence
                    curSlots = slots
                }
            }
        }

Die Hauptaufgabe hier besteht darin, StateFlow-Statusänderungen aus verschiedenen Flows für aufeinanderfolgende Aufrufe von FlowCollector zu korrigieren.

Es können mehrere Schritte unterschieden werden:

  1. Neuen Wert setzen.
  2. Wenn Sie den Sequenzmarker auf einen ungeraden Wert setzen, werden wir bereits aktualisiert.
  3. makePending () - Setzen aller Steckplatzzustände (d. h. aller Verbindungen) auf PENDING - Wir werden bald einen neuen Wert senden.
  4. Die Schleife überprüft die Sequenz == curSequence, ob alle Aufgaben abgeschlossen sind, und setzt die Sequenz auf eine gerade Zahl.

Was passiert bei der Erfassungsmethode :

override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        try {
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // Conflate value emissions using equality
                if (prevState == null || newState != prevState) {
                    collector.emit(NULL.unbox(newState))
                    prevState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

Die Hauptaufgabe besteht darin, den anfänglichen Standardwert zu senden und auf neue Werte zu warten:

  1. Wir erstellen oder verwenden einen Steckplatz für eine neue Verbindung.
  2. Wir prüfen den Zustand auf Null oder auf Zustandsänderung. Emittim ist eine neue Bedeutung.
  3. Wir prüfen, ob Slots zur Aktualisierung bereit sind (Status PENDING), und wenn nicht, setzen wir den Slot in Erwartung neuer Werte aus.

Im Allgemeinen ist das alles. Wir haben nicht berücksichtigt, wie die Zuweisung von Slots und die Änderung ihrer Zustände erfolgen, aber ich habe festgestellt, dass dies für das Gesamtbild von StateFlow nicht wichtig ist.

Danke.

All Articles