Tentamos entender o StateFlow

Olá a todos.

Alguns dias atrás, o JetBrains lançou uma nova versão do Corutin - 1.3.6 e uma das inovações foi a nova subespécie do Flow - StateFlow, que substitui o ConflatedBroadcastChannel. Decidi experimentar o StateFlow em ação e estudar a estrutura interna.

Eu acho que muitos que usam o Kotlin no desenvolvimento para Android ou MPP estão familiarizados com esses termos, que não são - essas entidades são análogas ao BehaviorProcessor / BehaviorSubject do RxJava e LiveData / MutableLiveData do Jetpack.

O StateFlow em si é uma extensão simples da interface Flow e vem de duas formas:

public interface StateFlow<out T> : Flow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T> {
    /**
     * The current value of this state flow.
     *
     * Setting a value that is [equal][Any.equals] to the previous one does nothing.
     */
    public override var value: T
}

A idéia é a mesma do LiveData / MutableLiveData - por uma interface, ele pode apenas ler o estado atual e pela outra, também pode ser instalado.

O que a StateFlow nos oferece em comparação com o ConflatedBroadcastChannel:

  • Uma implementação interna mais simples e sem lixo.
  • A necessidade de um item padrão. Nulo também é possível.
  • Separação em interfaces somente leitura e leitura / gravação.
  • Comparar elementos via igualdade em vez de comparar links.

Agora vamos tentar implementar um uso simples do StateFlow. Para fazer isso, criei um invólucro elementar com a capacidade de definir qualquer tipo com um elemento nulo por padrão:

class StateFlowRepository<T>(initialValue: T? = null) {
    private val stateFlow = MutableStateFlow(initialValue)

    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }

    val stream: Flow<T?> = stateFlow
}

Nós obtemos os dados:

lifecycleScope.launch {
            simpleRepo.stream.collect {
                addData(it.toString())
            }
        }

E exibimos na tela a interface mais simples para testes, ela não causa problemas e tudo funciona como um relógio:



agora vamos olhar para dentro e ver como ela é implementada.

Para minha surpresa, a implementação é realmente muito simples e atualmente ocupa apenas 316 linhas, 25% das quais são javadoki.

E assim, a principal classe de implementação é a classe StateFlowImpl:

private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0 // number of allocated (!free) slots
    private var nextIndex = 0 // oracle for the next free slot index

. . .
}

_state - link atômico para armazenar nosso estado.

sequência - um indicador auxiliar que, dependendo da paridade / estranheza, informa sobre o processo atual de atualização dos
slots de estado - array / pool StateFlowSlot. StateFlowSlot - abstração auxiliar de cada "conexão" ao StateFlow.
nSlots, nextIndex - variáveis ​​auxiliares para trabalhar com os slots de matriz extensíveis

Vamos considerar StateFlowSlot antecipadamente. Ele representa apenas:

private val _state = atomic<Any?>(null)

Métodos positivos para alterar os estados dos slots.

Cada slot pode estar em um dos estados:

null - é criado, mas não é usado
NENHUM -
estado PENDING do coletor usado - em antecipação ao envio de um novo valor ao coletor
CancellableContinuationImpl - estado suspenso próximo ao estado PENDING de destino, está suspendendo o coletor até que um novo estado no fluxo de estado.

Considere o que acontece quando você define um novo valor:

public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) {
            var curSequence = 0
            var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
            val newState = value ?: NULL
            synchronized(this) {
                val oldState = _state.value
                if (oldState == newState) return // Don't do anything if value is not changing
                _state.value = newState
                curSequence = sequence
                if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                    curSequence++ // make it odd
                    sequence = curSequence
                } else {
                    // update is already in process, notify it, and return
                    sequence = curSequence + 2 // change sequence to notify, keep it odd
                    return
                }
                curSlots = slots // read current reference to collectors under lock
            }
            /*
               Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
               Loop until we're done firing all the changes. This is sort of simple flat combining that
               ensures sequential firing of concurrent updates and avoids the storm of collector resumes
               when updates happen concurrently from many threads.
             */
            while (true) {
                // Benign race on element read from array
                for (col in curSlots) {
                    col?.makePending()
                }
                // check if the value was updated again while we were updating the old one
                synchronized(this) {
                    if (sequence == curSequence) { // nothing changed, we are done
                        sequence = curSequence + 1 // make sequence even again
                        return // done
                    }
                    // reread everything for the next loop under the lock
                    curSequence = sequence
                    curSlots = slots
                }
            }
        }

A principal tarefa aqui é corrigir as alterações do estado StateFlow de diferentes fluxos para chamadas sucessivas ao FlowCollector.

Vários passos podem ser distinguidos:

  1. Definindo um novo valor.
  2. Definir o marcador de sequência com um valor ímpar significa que já estamos no processo de atualização.
  3. makePending () - configurando todos os estados do slot (ou seja, todas as conexões) para PENDING - em breve enviaremos um novo valor.
  4. O loop verifica a sequência == curSequence que todas as tarefas estão concluídas e define a sequência como um número par.

O que acontece no método de coleta :

override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        try {
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // Conflate value emissions using equality
                if (prevState == null || newState != prevState) {
                    collector.emit(NULL.unbox(newState))
                    prevState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

A tarefa principal é enviar o valor padrão inicial e aguardar novos valores:

  1. Criamos ou reutilizamos um slot para uma nova conexão.
  2. Verificamos o estado para nulo ou para alteração de estado. Emittim é um novo significado.
  3. Verificamos se há slots prontos para atualização (estado PENDENTE) e, se não, suspendemos o slot antecipando novos valores.

Em geral, é tudo. Não consideramos como ocorre a alocação de slots e a alteração de seus estados, mas achei que isso não é importante para o quadro geral do StateFlow.

Obrigado.

All Articles