Olá a todos.Alguns dias atrás, o JetBrains lançou uma nova versão do Corutin - 1.3.6 e uma das inovações foi a nova subespécie do Flow - StateFlow, que substitui o ConflatedBroadcastChannel. Decidi experimentar o StateFlow em ação e estudar a estrutura interna.Eu acho que muitos que usam o Kotlin no desenvolvimento para Android ou MPP estão familiarizados com esses termos, que não são - essas entidades são análogas ao BehaviorProcessor / BehaviorSubject do RxJava e LiveData / MutableLiveData do Jetpack.O StateFlow em si é uma extensão simples da interface Flow e vem de duas formas:public interface StateFlow<out T> : Flow<T> {
public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T> {
public override var value: T
}
A idéia é a mesma do LiveData / MutableLiveData - por uma interface, ele pode apenas ler o estado atual e pela outra, também pode ser instalado.O que a StateFlow nos oferece em comparação com o ConflatedBroadcastChannel:- Uma implementação interna mais simples e sem lixo.
- A necessidade de um item padrão. Nulo também é possível.
- Separação em interfaces somente leitura e leitura / gravação.
- Comparar elementos via igualdade em vez de comparar links.
Agora vamos tentar implementar um uso simples do StateFlow. Para fazer isso, criei um invólucro elementar com a capacidade de definir qualquer tipo com um elemento nulo por padrão:class StateFlowRepository<T>(initialValue: T? = null) {
private val stateFlow = MutableStateFlow(initialValue)
var value: T?
get() = stateFlow.value
set(value) {
stateFlow.value = value
}
val stream: Flow<T?> = stateFlow
}
Nós obtemos os dados:lifecycleScope.launch {
simpleRepo.stream.collect {
addData(it.toString())
}
}
E exibimos na tela a interface mais simples para testes, ela não causa problemas e tudo funciona como um relógio:
agora vamos olhar para dentro e ver como ela é implementada.Para minha surpresa, a implementação é realmente muito simples e atualmente ocupa apenas 316 linhas, 25% das quais são javadoki.E assim, a principal classe de implementação é a classe StateFlowImpl:private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialValue)
private var sequence = 0
private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
private var nSlots = 0
private var nextIndex = 0
. . .
}
_state - link atômico para armazenar nosso estado.sequência - um indicador auxiliar que, dependendo da paridade / estranheza, informa sobre o processo atual de atualização dosslots de estado - array / pool StateFlowSlot. StateFlowSlot - abstração auxiliar de cada "conexão" ao StateFlow.nSlots, nextIndex - variáveis auxiliares para trabalhar com os slots de matriz extensíveisVamos considerar StateFlowSlot antecipadamente. Ele representa apenas:private val _state = atomic<Any?>(null)
Métodos positivos para alterar os estados dos slots.Cada slot pode estar em um dos estados:null - é criado, mas não é usadoNENHUM -estado PENDING do coletor usado - em antecipação ao envio de um novo valor ao coletorCancellableContinuationImpl - estado suspenso próximo ao estado PENDING de destino, está suspendendo o coletor até que um novo estado no fluxo de estado.Considere o que acontece quando você define um novo valor:public override var value: T
get() = NULL.unbox(_state.value)
set(value) {
var curSequence = 0
var curSlots: Array<StateFlowSlot?> = this.slots
val newState = value ?: NULL
synchronized(this) {
val oldState = _state.value
if (oldState == newState) return
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) {
curSequence++
sequence = curSequence
} else {
sequence = curSequence + 2
return
}
curSlots = slots
}
while (true) {
for (col in curSlots) {
col?.makePending()
}
synchronized(this) {
if (sequence == curSequence) {
sequence = curSequence + 1
return
}
curSequence = sequence
curSlots = slots
}
}
}
A principal tarefa aqui é corrigir as alterações do estado StateFlow de diferentes fluxos para chamadas sucessivas ao FlowCollector.Vários passos podem ser distinguidos:- Definindo um novo valor.
- Definir o marcador de sequência com um valor ímpar significa que já estamos no processo de atualização.
- makePending () - configurando todos os estados do slot (ou seja, todas as conexões) para PENDING - em breve enviaremos um novo valor.
- O loop verifica a sequência == curSequence que todas as tarefas estão concluídas e define a sequência como um número par.
O que acontece no método de coleta :override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
var prevState: Any? = null
try {
while (true) {
val newState = _state.value
if (prevState == null || newState != prevState) {
collector.emit(NULL.unbox(newState))
prevState = newState
}
if (!slot.takePending()) {
slot.awaitPending()
}
}
} finally {
freeSlot(slot)
}
}
A tarefa principal é enviar o valor padrão inicial e aguardar novos valores:- Criamos ou reutilizamos um slot para uma nova conexão.
- Verificamos o estado para nulo ou para alteração de estado. Emittim é um novo significado.
- Verificamos se há slots prontos para atualização (estado PENDENTE) e, se não, suspendemos o slot antecipando novos valores.
Em geral, é tudo. Não consideramos como ocorre a alocação de slots e a alteração de seus estados, mas achei que isso não é importante para o quadro geral do StateFlow.Obrigado.