Bonjour à tous.Il y a quelques jours, JetBrains a publié une nouvelle version de Corutin - 1.3.6 et l'une des innovations a été la nouvelle sous-espèce de Flow - StateFlow, qui remplace ConflatedBroadcastChannel. J'ai décidé d'essayer StateFlow en action et d'étudier la structure interne.Je pense que beaucoup de ceux qui utilisent Kotlin lors du développement pour Android ou MPP connaissent ces termes, qui ne le sont pas - ces entités sont des analogues proches de BehaviorProcessor / BehaviorSubject de RxJava et LiveData / MutableLiveData de Jetpack.StateFlow lui-même est une simple extension de l'interface Flow et se présente sous deux formes:public interface StateFlow<out T> : Flow<T> {
public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T> {
public override var value: T
}
L'idée est la même que dans LiveData / MutableLiveData - via une interface, elle ne peut lire que l'état actuel, et via l'autre, elle peut également être installée.Qu'est-ce que StateFlow nous offre par rapport à ConflatedBroadcastChannel:- Une implémentation interne plus simple et sans déchets.
- La nécessité d'un élément par défaut. Null est également possible.
- Séparation en interfaces en lecture seule et en lecture-écriture.
- Comparer des éléments via l'égalité au lieu de comparer des liens.
Essayons maintenant d'implémenter une utilisation simple de StateFlow. Pour ce faire, j'ai créé un wrapper élémentaire avec la possibilité de définir n'importe quel type avec un élément null par défaut:class StateFlowRepository<T>(initialValue: T? = null) {
private val stateFlow = MutableStateFlow(initialValue)
var value: T?
get() = stateFlow.value
set(value) {
stateFlow.value = value
}
val stream: Flow<T?> = stateFlow
}
Nous obtenons les données:lifecycleScope.launch {
simpleRepo.stream.collect {
addData(it.toString())
}
}
Et nous affichons sur l'écran avec l'interface la plus simple pour les tests, cela ne pose aucun problème et tout fonctionne comme une horloge:
maintenant regardons à l'intérieur et voyons comment il est implémenté.À ma grande surprise, la mise en œuvre est vraiment très simple et ne prend actuellement que 316 lignes, dont 25% de javadoki.Et donc, la classe d'implémentation principale est la classe StateFlowImpl:private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialValue)
private var sequence = 0
private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
private var nSlots = 0
private var nextIndex = 0
. . .
}
_state - lien atomique pour stocker notre état.séquence - un indicateur auxiliaire qui, en fonction de la parité / bizarrerie, rend compte du processus actuel de mise à jour desemplacements d' état - tableau / pool StateFlowSlot. StateFlowSlot - abstraction auxiliaire de chaque "connexion" à StateFlow.nSlots, nextIndex - variables d'assistance pour travailler avec les emplacements de tableau extensiblesConsidérons à l'avance StateFlowSlot. Il représente uniquement:private val _state = atomic<Any?>(null)
Plus de méthodes pour changer les états des emplacements.Chaque emplacement peut être dans l'un des états suivants:null - créé, mais non utiliséAUCUN - utilisé par le collecteurPENDING - en attendant qu'une nouvelle valeur soit envoyée au collecteurCancellableContinuationImpl - état suspendu, proche de la destination avec PENDING, suspendez le collecteur jusqu'à ce qu'un nouvel état arrive dans StateFlow.Considérez ce qui se passe lorsque vous définissez une nouvelle valeur:public override var value: T
get() = NULL.unbox(_state.value)
set(value) {
var curSequence = 0
var curSlots: Array<StateFlowSlot?> = this.slots
val newState = value ?: NULL
synchronized(this) {
val oldState = _state.value
if (oldState == newState) return
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) {
curSequence++
sequence = curSequence
} else {
sequence = curSequence + 2
return
}
curSlots = slots
}
while (true) {
for (col in curSlots) {
col?.makePending()
}
synchronized(this) {
if (sequence == curSequence) {
sequence = curSequence + 1
return
}
curSequence = sequence
curSlots = slots
}
}
}
La tâche principale ici est de corriger les changements d'état de StateFlow à partir de différents flux pour des appels successifs à FlowCollector.On distingue plusieurs étapes:- Définition d'une nouvelle valeur.
- Mettre le marqueur de séquence sur une valeur impaire signifie que nous sommes déjà en train de mettre à jour.
- makePending () - définissant tous les états des emplacements (c'est-à-dire toutes les connexions) sur PENDING - nous enverrons bientôt une nouvelle valeur.
- La boucle vérifie la séquence == curSequence que toutes les tâches sont terminées et définit la séquence sur un nombre pair.
Que se passe-t-il dans la méthode de collecte :override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
var prevState: Any? = null
try {
while (true) {
val newState = _state.value
if (prevState == null || newState != prevState) {
collector.emit(NULL.unbox(newState))
prevState = newState
}
if (!slot.takePending()) {
slot.awaitPending()
}
}
} finally {
freeSlot(slot)
}
}
La tâche principale consiste à envoyer la valeur par défaut initiale et à attendre de nouvelles valeurs:- Nous créons ou réutilisons un emplacement pour une nouvelle connexion.
- Nous vérifions l'état pour null ou pour un changement d'état. Emittim est un nouveau sens.
- Nous vérifions s'il y a des emplacements prêts pour la mise à jour (état PENDING) et sinon, nous suspendons l'emplacement en prévision de nouvelles valeurs.
En général, c'est tout. Nous n'avons pas considéré comment l'allocation des créneaux horaires et le changement de leurs états se produisent, mais je me suis dit que ce n'était pas important pour l'image globale de StateFlow.Remercier.