Intentamos entender StateFlow

Hola a todos.

Hace unos días, JetBrains lanzó una nueva versión de Corutin - 1.3.6 y una de las innovaciones fue la nueva subespecie de Flow - StateFlow, que reemplaza a ConflatedBroadcastChannel. Decidí probar StateFlow en acción y estudiar la estructura interna.

Creo que muchos de los que usan Kotlin cuando desarrollan para Android o en MPP están familiarizados con estos términos, quienes no lo están: estas entidades son análogos cercanos de BehaviorProcessor / BehaviorSubject de RxJava y LiveData / MutableLiveData de Jetpack.

StateFlow en sí es una extensión simple de la interfaz Flow y viene en dos formas:

public interface StateFlow<out T> : Flow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T> {
    /**
     * The current value of this state flow.
     *
     * Setting a value that is [equal][Any.equals] to the previous one does nothing.
     */
    public override var value: T
}

La idea es la misma que en LiveData / MutableLiveData: a través de una interfaz solo puede leer el estado actual, y a través de la otra también se puede instalar.

¿Qué nos ofrece StateFlow en comparación con ConflatedBroadcastChannel:

  • Una implementación interna más simple y libre de basura.
  • La necesidad de un elemento predeterminado. Nulo también es posible.
  • Separación en interfaces de solo lectura y lectura-escritura.
  • Comparar elementos a través de la igualdad en lugar de comparar enlaces.

Ahora intentemos implementar un uso simple de StateFlow. Para hacer esto, hice un contenedor básico con la capacidad de establecer cualquier tipo con un elemento nulo de forma predeterminada:

class StateFlowRepository<T>(initialValue: T? = null) {
    private val stateFlow = MutableStateFlow(initialValue)

    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }

    val stream: Flow<T?> = stateFlow
}

Obtenemos los datos:

lifecycleScope.launch {
            simpleRepo.stream.collect {
                addData(it.toString())
            }
        }

Y mostramos en la pantalla con la interfaz más simple para las pruebas, no causa ningún problema y todo funciona como un reloj:



ahora veamos el interior y veamos cómo se implementa.

Para mi sorpresa, la implementación es realmente muy simple y actualmente solo toma 316 líneas, el 25% de las cuales son javadoki.

Y así, la clase de implementación principal es la clase StateFlowImpl:

private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0 // number of allocated (!free) slots
    private var nextIndex = 0 // oracle for the next free slot index

. . .
}

_state - enlace atómico para almacenar nuestro estado.

secuencia : un indicador auxiliar que, según la paridad / rareza, informa sobre el proceso actual de actualización de las
ranuras de estado : matriz / agrupación StateFlowSlot. StateFlowSlot: abstracción auxiliar de cada "conexión" a StateFlow.
nSlots, nextIndex : variables auxiliares para trabajar con las ranuras de matriz extensibles

Consideremos de antemano StateFlowSlot. Él representa solo:

private val _state = atomic<Any?>(null)

Más métodos para cambiar los estados de las ranuras.

Cada ranura puede estar en uno de los estados:

nulo - se crea, pero no se usa
NINGUNO -
estado PENDIENTE del colector usado - en previsión de enviar un nuevo valor al colector
CancelableContinuationImpl - el estado suspendido cerca del estado PENDIENTE de destino, están suspendiendo el colector no llegará hasta un nuevo estado en el flujo de estado.

Considere lo que sucede cuando establece un nuevo valor:

public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) {
            var curSequence = 0
            var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
            val newState = value ?: NULL
            synchronized(this) {
                val oldState = _state.value
                if (oldState == newState) return // Don't do anything if value is not changing
                _state.value = newState
                curSequence = sequence
                if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                    curSequence++ // make it odd
                    sequence = curSequence
                } else {
                    // update is already in process, notify it, and return
                    sequence = curSequence + 2 // change sequence to notify, keep it odd
                    return
                }
                curSlots = slots // read current reference to collectors under lock
            }
            /*
               Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
               Loop until we're done firing all the changes. This is sort of simple flat combining that
               ensures sequential firing of concurrent updates and avoids the storm of collector resumes
               when updates happen concurrently from many threads.
             */
            while (true) {
                // Benign race on element read from array
                for (col in curSlots) {
                    col?.makePending()
                }
                // check if the value was updated again while we were updating the old one
                synchronized(this) {
                    if (sequence == curSequence) { // nothing changed, we are done
                        sequence = curSequence + 1 // make sequence even again
                        return // done
                    }
                    // reread everything for the next loop under the lock
                    curSequence = sequence
                    curSlots = slots
                }
            }
        }

La tarea principal aquí es corregir los cambios de estado de StateFlow de diferentes flujos para llamadas sucesivas a FlowCollector.

Se pueden distinguir varios pasos:

  1. Establecer un nuevo valor.
  2. Establecer el marcador de secuencia en un valor impar significa que ya estamos en el proceso de actualización.
  3. makePending (): establece todos los estados de ranura (es decir, todas las conexiones) en PENDIENTE: pronto enviaremos un nuevo valor.
  4. El ciclo verifica la secuencia == curSequence de que todas las tareas se completen y establece la secuencia en un número par.

Lo que sucede en el método de recopilación :

override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        try {
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // Conflate value emissions using equality
                if (prevState == null || newState != prevState) {
                    collector.emit(NULL.unbox(newState))
                    prevState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

La tarea principal es enviar el valor predeterminado inicial y esperar nuevos valores:

  1. Creamos o reutilizamos una ranura para una nueva conexión.
  2. Verificamos el estado por nulo o por cambio de estado. Emittim es un nuevo significado.
  3. Verificamos si hay ranuras listas para actualizar (estado PENDIENTE) y si no, suspendemos la ranura en anticipación de nuevos valores.

En general, eso es todo. No consideramos cómo ocurre la asignación de espacios y el cambio de sus estados, pero supuse que esto no es importante para la imagen general de StateFlow.

Gracias.

All Articles