我们尝试了解StateFlow

大家好。

几天前,JetBrains发布了新版本的Corutin-1.3.6,其中一项创新是Flow的新亚种-StateFlow,它取代了ConflatedBroadcastChannel。我决定尝试使用StateFlow并研究其内部结构。

我认为许多为Android或MPP开发时使用Kotlin的人都熟悉这些术语,而并非如此-这些实体与RxJava的BehaviorProcessor / BehaviorSubject和Jetpack的LiveData / MutableLiveData相似。

StateFlow本身是Flow接口的简单扩展,有两种形式:

public interface StateFlow<out T> : Flow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T> {
    /**
     * The current value of this state flow.
     *
     * Setting a value that is [equal][Any.equals] to the previous one does nothing.
     */
    public override var value: T
}

这个想法与LiveData / MutableLiveData中的想法相同-通过一个接口它只能读取当前状态,而通过另一个接口也可以安装。

与ConflatedBroadcastChannel相比,StateFlow为我们提供了什么:

  • 一个更简单且无垃圾的内部实现。
  • 需要默认项。空也是可能的。
  • 分为只读和读写接口。
  • 通过相等性比较元素,而不是比较链接。

现在,让我们尝试实现StateFlow的简单用法。为此,我制作了一个基本包装器,该包装器能够默认设置具有null元素的任何类型:

class StateFlowRepository<T>(initialValue: T? = null) {
    private val stateFlow = MutableStateFlow(initialValue)

    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }

    val stream: Flow<T?> = stateFlow
}

我们得到数据:

lifecycleScope.launch {
            simpleRepo.stream.collect {
                addData(it.toString())
            }
        }

并且我们在屏幕上显示了用于测试的最简单界面,它不会引起任何问题,并且一切都像时钟一样工作:



现在,让我们看一下内部,看看它是如何实现的。

令我惊讶的是,该实现实际上非常简单,目前仅占用316行,其中25%是javadoki。

因此,主要的实现类是StateFlowImpl类:

private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0 // number of allocated (!free) slots
    private var nextIndex = 0 // oracle for the next free slot index

. . .
}

_state-存储我们状态的原子链接。

序列 -辅助指示符,根据奇偶校验/奇数,报告有关更新状态
时隙的当前过程-数组/池StateFlowSlot。 StateFlowSlot-与StateFlow的每个“连接”的辅助抽象。
nSlots,nextIndex-用于可扩展数组插槽的辅助变量

让我们提前考虑StateFlowSlot。他仅代表:

private val _state = atomic<Any?>(null)

加上更改插槽状态的方法。

每个插槽可以处于以下状态之一:

null-已创建,但未使用
NONE-已使用的收集器
PENDING状态 -预期将新值发送到收集器
CancellableContinuationImpl-挂起状态接近目标PENDING状态,正在挂起收集器直到状态流中的新状态才会出现。

考虑设置新值时会发生什么:

public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) {
            var curSequence = 0
            var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
            val newState = value ?: NULL
            synchronized(this) {
                val oldState = _state.value
                if (oldState == newState) return // Don't do anything if value is not changing
                _state.value = newState
                curSequence = sequence
                if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                    curSequence++ // make it odd
                    sequence = curSequence
                } else {
                    // update is already in process, notify it, and return
                    sequence = curSequence + 2 // change sequence to notify, keep it odd
                    return
                }
                curSlots = slots // read current reference to collectors under lock
            }
            /*
               Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
               Loop until we're done firing all the changes. This is sort of simple flat combining that
               ensures sequential firing of concurrent updates and avoids the storm of collector resumes
               when updates happen concurrently from many threads.
             */
            while (true) {
                // Benign race on element read from array
                for (col in curSlots) {
                    col?.makePending()
                }
                // check if the value was updated again while we were updating the old one
                synchronized(this) {
                    if (sequence == curSequence) { // nothing changed, we are done
                        sequence = curSequence + 1 // make sequence even again
                        return // done
                    }
                    // reread everything for the next loop under the lock
                    curSequence = sequence
                    curSlots = slots
                }
            }
        }

这里的主要任务是修复来自Flowflow的连续调用的不同流的StateFlow状态更改。

可以区分几个步骤:

  1. 设置一个新值。
  2. 将序列标记设置为奇数值意味着我们已经在进行更新。
  3. makePending()-将所有插槽状态(即所有连接)设置为PENDING-我们将很快发送一个新值。
  4. 循环检查序列== curSequence所有任务均已完成,并将序列设置为偶数。

collect方法中会发生什么

override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        try {
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // Conflate value emissions using equality
                if (prevState == null || newState != prevState) {
                    collector.emit(NULL.unbox(newState))
                    prevState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

主要任务是发送初始默认值并等待新值:

  1. 我们创建或将插槽重新用于新连接。
  2. 我们检查状态是否为空或状态是否更改。Emittim是一个新的含义。
  3. 我们检查是否有准备更新的插槽(PENDING状态),如果没有,我们会在等待新值的情况下暂停该插槽。

一般来说,仅此而已。我们没有考虑插槽的分配及其状态的更改是如何发生的,但是我认为这对于StateFlow的总体情况并不重要。

谢谢。

All Articles