大家好。几天前,JetBrains发布了新版本的Corutin-1.3.6,其中一项创新是Flow的新亚种-StateFlow,它取代了ConflatedBroadcastChannel。我决定尝试使用StateFlow并研究其内部结构。我认为许多为Android或MPP开发时使用Kotlin的人都熟悉这些术语,而并非如此-这些实体与RxJava的BehaviorProcessor / BehaviorSubject和Jetpack的LiveData / MutableLiveData相似。StateFlow本身是Flow接口的简单扩展,有两种形式:public interface StateFlow<out T> : Flow<T> {
public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T> {
public override var value: T
}
这个想法与LiveData / MutableLiveData中的想法相同-通过一个接口它只能读取当前状态,而通过另一个接口也可以安装。与ConflatedBroadcastChannel相比,StateFlow为我们提供了什么:- 一个更简单且无垃圾的内部实现。
- 需要默认项。空也是可能的。
- 分为只读和读写接口。
- 通过相等性比较元素,而不是比较链接。
现在,让我们尝试实现StateFlow的简单用法。为此,我制作了一个基本包装器,该包装器能够默认设置具有null元素的任何类型:class StateFlowRepository<T>(initialValue: T? = null) {
private val stateFlow = MutableStateFlow(initialValue)
var value: T?
get() = stateFlow.value
set(value) {
stateFlow.value = value
}
val stream: Flow<T?> = stateFlow
}
我们得到数据:lifecycleScope.launch {
simpleRepo.stream.collect {
addData(it.toString())
}
}
并且我们在屏幕上显示了用于测试的最简单界面,它不会引起任何问题,并且一切都像时钟一样工作:
现在,让我们看一下内部,看看它是如何实现的。令我惊讶的是,该实现实际上非常简单,目前仅占用316行,其中25%是javadoki。因此,主要的实现类是StateFlowImpl类:private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
private val _state = atomic(initialValue)
private var sequence = 0
private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
private var nSlots = 0
private var nextIndex = 0
. . .
}
_state-存储我们状态的原子链接。序列 -辅助指示符,根据奇偶校验/奇数,报告有关更新状态时隙的当前过程-数组/池StateFlowSlot。 StateFlowSlot-与StateFlow的每个“连接”的辅助抽象。nSlots,nextIndex-用于可扩展数组插槽的辅助变量让我们提前考虑StateFlowSlot。他仅代表:private val _state = atomic<Any?>(null)
加上更改插槽状态的方法。每个插槽可以处于以下状态之一:null-已创建,但未使用NONE-已使用的收集器PENDING状态 -预期将新值发送到收集器CancellableContinuationImpl-挂起状态接近目标PENDING状态,正在挂起收集器直到状态流中的新状态才会出现。考虑设置新值时会发生什么:public override var value: T
get() = NULL.unbox(_state.value)
set(value) {
var curSequence = 0
var curSlots: Array<StateFlowSlot?> = this.slots
val newState = value ?: NULL
synchronized(this) {
val oldState = _state.value
if (oldState == newState) return
_state.value = newState
curSequence = sequence
if (curSequence and 1 == 0) {
curSequence++
sequence = curSequence
} else {
sequence = curSequence + 2
return
}
curSlots = slots
}
while (true) {
for (col in curSlots) {
col?.makePending()
}
synchronized(this) {
if (sequence == curSequence) {
sequence = curSequence + 1
return
}
curSequence = sequence
curSlots = slots
}
}
}
这里的主要任务是修复来自Flowflow的连续调用的不同流的StateFlow状态更改。可以区分几个步骤:- 设置一个新值。
- 将序列标记设置为奇数值意味着我们已经在进行更新。
- makePending()-将所有插槽状态(即所有连接)设置为PENDING-我们将很快发送一个新值。
- 循环检查序列== curSequence所有任务均已完成,并将序列设置为偶数。
在collect方法中会发生什么:override suspend fun collect(collector: FlowCollector<T>) {
val slot = allocateSlot()
var prevState: Any? = null
try {
while (true) {
val newState = _state.value
if (prevState == null || newState != prevState) {
collector.emit(NULL.unbox(newState))
prevState = newState
}
if (!slot.takePending()) {
slot.awaitPending()
}
}
} finally {
freeSlot(slot)
}
}
主要任务是发送初始默认值并等待新值:- 我们创建或将插槽重新用于新连接。
- 我们检查状态是否为空或状态是否更改。Emittim是一个新的含义。
- 我们检查是否有准备更新的插槽(PENDING状态),如果没有,我们会在等待新值的情况下暂停该插槽。
一般来说,仅此而已。我们没有考虑插槽的分配及其状态的更改是如何发生的,但是我认为这对于StateFlow的总体情况并不重要。谢谢。