Hola a todos.Hace unos días, JetBrains lanzó una nueva versión de Corutin - 1.3.6 y una de las innovaciones fue la nueva subespecie de Flow - StateFlow, que reemplaza a ConflatedBroadcastChannel. Decidí probar StateFlow en acción y estudiar la estructura interna.Creo que muchos de los que usan Kotlin cuando desarrollan para Android o en MPP están familiarizados con estos términos, quienes no lo están: estas entidades son análogos cercanos de BehaviorProcessor / BehaviorSubject de RxJava y LiveData / MutableLiveData de Jetpack.StateFlow en sí es una extensión simple de la interfaz Flow y viene en dos formas:public interface StateFlow<out T> : Flow<T> {
public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T> {
public override var value: T
}
La idea es la misma que en LiveData / MutableLiveData: a través de una interfaz solo puede leer el estado actual, y a través de la otra también se puede instalar.¿Qué nos ofrece StateFlow en comparación con ConflatedBroadcastChannel:- Una implementación interna más simple y libre de basura.
- La necesidad de un elemento predeterminado. Nulo también es posible.
- Separación en interfaces de solo lectura y lectura-escritura.
- Comparar elementos a través de la igualdad en lugar de comparar enlaces.
Ahora intentemos implementar un uso simple de StateFlow. Para hacer esto, hice un contenedor básico con la capacidad de establecer cualquier tipo con un elemento nulo de forma predeterminada:class StateFlowRepository<T>(initialValue: T? = null) {
private val stateFlow = MutableStateFlow(initialValue)
var value: T?
get() = stateFlow.value
set(value) {
stateFlow.value = value
}
val stream: Flow<T?> = stateFlow
}
Obtenemos los datos:lifecycleScope.launch {
simpleRepo.stream.collect {
addData(it.toString())
}
}
Y mostramos en la pantalla con la interfaz más simple para las pruebas, no causa ningún problema y todo funciona como un reloj:
ahora veamos el interior y veamos cómo se implementa.Para mi sorpresa, la implementación es realmente muy simple y actualmente solo toma 316 líneas, el 25% de las cuales son javadoki.Y así, la clase de implementación principal es la clase StateFlowImpl:private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialValue)
private var sequence = 0
private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
private var nSlots = 0
private var nextIndex = 0
. . .
}
_state - enlace atómico para almacenar nuestro estado.secuencia : un indicador auxiliar que, según la paridad / rareza, informa sobre el proceso actual de actualización de lasranuras de estado : matriz / agrupación StateFlowSlot. StateFlowSlot: abstracción auxiliar de cada "conexión" a StateFlow.nSlots, nextIndex : variables auxiliares para trabajar con las ranuras de matriz extensiblesConsideremos de antemano StateFlowSlot. Él representa solo:private val _state = atomic<Any?>(null)
Más métodos para cambiar los estados de las ranuras.Cada ranura puede estar en uno de los estados:nulo - se crea, pero no se usaNINGUNO -estado PENDIENTE del colector usado - en previsión de enviar un nuevo valor al colectorCancelableContinuationImpl - el estado suspendido cerca del estado PENDIENTE de destino, están suspendiendo el colector no llegará hasta un nuevo estado en el flujo de estado.Considere lo que sucede cuando establece un nuevo valor:public override var value: T
get() = NULL.unbox(_state.value)
set(value) {
var curSequence = 0
var curSlots: Array<StateFlowSlot?> = this.slots
val newState = value ?: NULL
synchronized(this) {
val oldState = _state.value
if (oldState == newState) return
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) {
curSequence++
sequence = curSequence
} else {
sequence = curSequence + 2
return
}
curSlots = slots
}
while (true) {
for (col in curSlots) {
col?.makePending()
}
synchronized(this) {
if (sequence == curSequence) {
sequence = curSequence + 1
return
}
curSequence = sequence
curSlots = slots
}
}
}
La tarea principal aquí es corregir los cambios de estado de StateFlow de diferentes flujos para llamadas sucesivas a FlowCollector.Se pueden distinguir varios pasos:- Establecer un nuevo valor.
- Establecer el marcador de secuencia en un valor impar significa que ya estamos en el proceso de actualización.
- makePending (): establece todos los estados de ranura (es decir, todas las conexiones) en PENDIENTE: pronto enviaremos un nuevo valor.
- El ciclo verifica la secuencia == curSequence de que todas las tareas se completen y establece la secuencia en un número par.
Lo que sucede en el método de recopilación :override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
var prevState: Any? = null
try {
while (true) {
val newState = _state.value
if (prevState == null || newState != prevState) {
collector.emit(NULL.unbox(newState))
prevState = newState
}
if (!slot.takePending()) {
slot.awaitPending()
}
}
} finally {
freeSlot(slot)
}
}
La tarea principal es enviar el valor predeterminado inicial y esperar nuevos valores:- Creamos o reutilizamos una ranura para una nueva conexión.
- Verificamos el estado por nulo o por cambio de estado. Emittim es un nuevo significado.
- Verificamos si hay ranuras listas para actualizar (estado PENDIENTE) y si no, suspendemos la ranura en anticipación de nuevos valores.
En general, eso es todo. No consideramos cómo ocurre la asignación de espacios y el cambio de sus estados, pero supuse que esto no es importante para la imagen general de StateFlow.Gracias.