Nous essayons de comprendre StateFlow

Bonjour à tous.

Il y a quelques jours, JetBrains a publié une nouvelle version de Corutin - 1.3.6 et l'une des innovations a été la nouvelle sous-espèce de Flow - StateFlow, qui remplace ConflatedBroadcastChannel. J'ai décidé d'essayer StateFlow en action et d'étudier la structure interne.

Je pense que beaucoup de ceux qui utilisent Kotlin lors du développement pour Android ou MPP connaissent ces termes, qui ne le sont pas - ces entités sont des analogues proches de BehaviorProcessor / BehaviorSubject de RxJava et LiveData / MutableLiveData de Jetpack.

StateFlow lui-même est une simple extension de l'interface Flow et se présente sous deux formes:

public interface StateFlow<out T> : Flow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T> {
    /**
     * The current value of this state flow.
     *
     * Setting a value that is [equal][Any.equals] to the previous one does nothing.
     */
    public override var value: T
}

L'idée est la même que dans LiveData / MutableLiveData - via une interface, elle ne peut lire que l'état actuel, et via l'autre, elle peut également être installée.

Qu'est-ce que StateFlow nous offre par rapport à ConflatedBroadcastChannel:

  • Une implémentation interne plus simple et sans déchets.
  • La nécessité d'un élément par défaut. Null est également possible.
  • Séparation en interfaces en lecture seule et en lecture-écriture.
  • Comparer des éléments via l'égalité au lieu de comparer des liens.

Essayons maintenant d'implémenter une utilisation simple de StateFlow. Pour ce faire, j'ai créé un wrapper élémentaire avec la possibilité de définir n'importe quel type avec un élément null par défaut:

class StateFlowRepository<T>(initialValue: T? = null) {
    private val stateFlow = MutableStateFlow(initialValue)

    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }

    val stream: Flow<T?> = stateFlow
}

Nous obtenons les données:

lifecycleScope.launch {
            simpleRepo.stream.collect {
                addData(it.toString())
            }
        }

Et nous affichons sur l'écran avec l'interface la plus simple pour les tests, cela ne pose aucun problème et tout fonctionne comme une horloge:



maintenant regardons à l'intérieur et voyons comment il est implémenté.

À ma grande surprise, la mise en œuvre est vraiment très simple et ne prend actuellement que 316 lignes, dont 25% de javadoki.

Et donc, la classe d'implémentation principale est la classe StateFlowImpl:

private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0 // number of allocated (!free) slots
    private var nextIndex = 0 // oracle for the next free slot index

. . .
}

_state - lien atomique pour stocker notre état.

séquence - un indicateur auxiliaire qui, en fonction de la parité / bizarrerie, rend compte du processus actuel de mise à jour des
emplacements d' état - tableau / pool StateFlowSlot. StateFlowSlot - abstraction auxiliaire de chaque "connexion" à StateFlow.
nSlots, nextIndex - variables d'assistance pour travailler avec les emplacements de tableau extensibles

Considérons à l'avance StateFlowSlot. Il représente uniquement:

private val _state = atomic<Any?>(null)

Plus de méthodes pour changer les états des emplacements.

Chaque emplacement peut être dans l'un des états suivants:

null - créé, mais non utilisé
AUCUN - utilisé par le collecteur
PENDING - en attendant qu'une nouvelle valeur soit envoyée au collecteur
CancellableContinuationImpl - état suspendu, proche de la destination avec PENDING, suspendez le collecteur jusqu'à ce qu'un nouvel état arrive dans StateFlow.

Considérez ce qui se passe lorsque vous définissez une nouvelle valeur:

public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) {
            var curSequence = 0
            var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
            val newState = value ?: NULL
            synchronized(this) {
                val oldState = _state.value
                if (oldState == newState) return // Don't do anything if value is not changing
                _state.value = newState
                curSequence = sequence
                if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                    curSequence++ // make it odd
                    sequence = curSequence
                } else {
                    // update is already in process, notify it, and return
                    sequence = curSequence + 2 // change sequence to notify, keep it odd
                    return
                }
                curSlots = slots // read current reference to collectors under lock
            }
            /*
               Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
               Loop until we're done firing all the changes. This is sort of simple flat combining that
               ensures sequential firing of concurrent updates and avoids the storm of collector resumes
               when updates happen concurrently from many threads.
             */
            while (true) {
                // Benign race on element read from array
                for (col in curSlots) {
                    col?.makePending()
                }
                // check if the value was updated again while we were updating the old one
                synchronized(this) {
                    if (sequence == curSequence) { // nothing changed, we are done
                        sequence = curSequence + 1 // make sequence even again
                        return // done
                    }
                    // reread everything for the next loop under the lock
                    curSequence = sequence
                    curSlots = slots
                }
            }
        }

La tâche principale ici est de corriger les changements d'état de StateFlow à partir de différents flux pour des appels successifs à FlowCollector.

On distingue plusieurs étapes:

  1. Définition d'une nouvelle valeur.
  2. Mettre le marqueur de séquence sur une valeur impaire signifie que nous sommes déjà en train de mettre à jour.
  3. makePending () - définissant tous les états des emplacements (c'est-à-dire toutes les connexions) sur PENDING - nous enverrons bientôt une nouvelle valeur.
  4. La boucle vérifie la séquence == curSequence que toutes les tâches sont terminées et définit la séquence sur un nombre pair.

Que se passe-t-il dans la méthode de collecte :

override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        try {
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // Conflate value emissions using equality
                if (prevState == null || newState != prevState) {
                    collector.emit(NULL.unbox(newState))
                    prevState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

La tâche principale consiste à envoyer la valeur par défaut initiale et à attendre de nouvelles valeurs:

  1. Nous créons ou réutilisons un emplacement pour une nouvelle connexion.
  2. Nous vérifions l'état pour null ou pour un changement d'état. Emittim est un nouveau sens.
  3. Nous vérifions s'il y a des emplacements prêts pour la mise à jour (état PENDING) et sinon, nous suspendons l'emplacement en prévision de nouvelles valeurs.

En général, c'est tout. Nous n'avons pas considéré comment l'allocation des créneaux horaires et le changement de leurs états se produisent, mais je me suis dit que ce n'était pas important pour l'image globale de StateFlow.

Remercier.

All Articles