Trying to understand StateFlow

Hello everyone.

A few days ago, JetBrains released a new version of kotlinx.coroutines, 1.3.6, and one of the additions was a new kind of Flow, StateFlow, which is meant to replace ConflatedBroadcastChannel. I decided to try StateFlow in action and study its internal structure.

I think many who use Kotlin for Android development or in MPP are familiar with these terms. For those who are not: these entities are close analogues of BehaviorProcessor / BehaviorSubject from RxJava and LiveData / MutableLiveData from Jetpack.

StateFlow itself is a simple extension of the Flow interface and comes in two forms:

public interface StateFlow<out T> : Flow<T> {
    /**
     * The current value of this state flow.
     */
    public val value: T
}

public interface MutableStateFlow<T> : StateFlow<T> {
    /**
     * The current value of this state flow.
     *
     * Setting a value that is [equal][Any.equals] to the previous one does nothing.
     */
    public override var value: T
}

The idea is the same as in LiveData / MutableLiveData: through one interface you can only read the current state, and through the other you can also set it.
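To make the split concrete, here is a minimal sketch of the usual pattern: keep the MutableStateFlow private and expose it only as a StateFlow. The CounterHolder class and its members are hypothetical names used purely for illustration.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow

// Hypothetical holder class, not part of the library.
class CounterHolder {
    // The read-write view stays private to the owner.
    private val _count = MutableStateFlow(0)

    // The read-only view: callers can read `value` and collect, but cannot assign.
    val count: StateFlow<Int> get() = _count

    fun increment() {
        // Read-then-write is not atomic; good enough for a single-threaded demo.
        _count.value = _count.value + 1
    }
}

fun main() {
    val holder = CounterHolder()
    holder.increment()
    println(holder.count.value) // prints 1
    // holder.count.value = 5   // would not compile: `value` is a val on StateFlow
}
```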

What StateFlow offers us compared to ConflatedBroadcastChannel:

  • A simpler and garbage-free internal implementation.
  • An initial value is always required. Null is also allowed.
  • Separation into read-only and read-write interfaces.
  • Values are compared via equality (equals) instead of by reference.
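The equality-based comparison can be observed without any collector at all. The sketch below assumes a hypothetical User data class: assigning an instance that is equal to the current one is ignored, so the originally stored instance stays in place.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow

// Hypothetical data class; equals() is generated from `name`.
data class User(val name: String)

fun main() {
    val first = User("Ann")
    val state = MutableStateFlow(first)

    // Equal by equals(), but a different instance: the setter does nothing.
    state.value = User("Ann")
    println(state.value === first) // prints true

    // A value that is not equal replaces the state as usual.
    state.value = User("Bob")
    println(state.value) // prints User(name=Bob)
}
```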

Now let's try a simple use of StateFlow. To do this, I made an elementary wrapper that can hold a value of any type, with null as the default:

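import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow

class StateFlowRepository<T>(initialValue: T? = null) {
    // Holds the current value; null means "nothing set yet"
    private val stateFlow = MutableStateFlow(initialValue)

    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }

    // Exposed as a plain Flow, so consumers can only collect
    val stream: Flow<T?> = stateFlow
}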

We collect the data:

lifecycleScope.launch {
    simpleRepo.stream.collect {
        addData(it.toString())
    }
}

We display it on the screen using the simplest possible test interface. This causes no problems and everything works like clockwork.



Now let's look inside and see how it is implemented.

To my surprise, the implementation is really very simple and currently takes only 316 lines, of which 25% are documentation comments (KDoc).

So, the main implementation class is StateFlowImpl:

private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue) // T | NULL
    private var sequence = 0 // serializes updates, value update is in process when sequence is odd
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0 // number of allocated (!free) slots
    private var nextIndex = 0 // oracle for the next free slot index

. . .
}

_state - an atomic reference that stores our state.
sequence - an auxiliary counter whose parity (even / odd) shows whether a state update is currently in progress.
slots - an array / pool of StateFlowSlot objects. StateFlowSlot is an auxiliary abstraction for each "connection" to the StateFlow.
nSlots, nextIndex - helper variables for working with the growable slots array.

Let's look at StateFlowSlot first. It consists only of:

private val _state = atomic<Any?>(null)

Plus methods for changing the slot's state.

Each slot can be in one of the following states:

null - the slot has been created but is not in use
NONE - the slot is used by a collector
PENDING - a new value is waiting to be sent to the collector
CancellableContinuationImpl - the suspended state, close in purpose to PENDING: the collector stays suspended until a new state arrives in the StateFlow.
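The transitions between these states can be modelled roughly as follows. This is a simplified, hypothetical sketch and not the library's code: SlotModel, None and Pending are made-up names, and a plain Runnable callback stands in for the suspended continuation.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Marker objects standing in for the NONE and PENDING states (hypothetical names).
private object None
private object Pending

// Simplified model of a slot; a Runnable replaces the suspended continuation.
class SlotModel {
    private val state = AtomicReference<Any?>(null) // null = free slot

    // Claim a free slot for a new collector.
    fun allocate(): Boolean = state.compareAndSet(null, None)

    // Writer side: mark the slot pending, or wake up a waiting collector.
    fun makePending() {
        while (true) {
            val s = state.get()
            when {
                s == null -> return       // free slot, nothing to notify
                s === Pending -> return   // already pending
                s === None -> {
                    if (state.compareAndSet(s, Pending)) return
                }
                else -> {                 // a waiter is registered
                    if (state.compareAndSet(s, None)) {
                        (s as Runnable).run()
                        return
                    }
                }
            }
        }
    }

    // Collector fast path: consume the pending flag without waiting.
    fun takePending(): Boolean = state.getAndSet(None) === Pending

    // Collector slow path: register a waiter; false means a value is already pending.
    fun awaitPending(onResume: Runnable): Boolean = state.compareAndSet(None, onResume)
}

fun main() {
    val slot = SlotModel()
    slot.allocate()
    slot.makePending()
    println(slot.takePending()) // prints true
    println(slot.takePending()) // prints false

    var resumed = false
    slot.awaitPending(Runnable { resumed = true })
    slot.makePending()          // finds the waiter and runs it
    println(resumed)            // prints true
}
```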

Consider what happens when you set a new value:

public override var value: T
        get() = NULL.unbox(_state.value)
        set(value) {
            var curSequence = 0
            var curSlots: Array<StateFlowSlot?> = this.slots // benign race, we will not use it
            val newState = value ?: NULL
            synchronized(this) {
                val oldState = _state.value
                if (oldState == newState) return // Don't do anything if value is not changing
                _state.value = newState
                curSequence = sequence
                if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update)
                    curSequence++ // make it odd
                    sequence = curSequence
                } else {
                    // update is already in process, notify it, and return
                    sequence = curSequence + 2 // change sequence to notify, keep it odd
                    return
                }
                curSlots = slots // read current reference to collectors under lock
            }
            /*
               Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines
               Loop until we're done firing all the changes. This is sort of simple flat combining that
               ensures sequential firing of concurrent updates and avoids the storm of collector resumes
               when updates happen concurrently from many threads.
             */
            while (true) {
                // Benign race on element read from array
                for (col in curSlots) {
                    col?.makePending()
                }
                // check if the value was updated again while we were updating the old one
                synchronized(this) {
                    if (sequence == curSequence) { // nothing changed, we are done
                        sequence = curSequence + 1 // make sequence even again
                        return // done
                    }
                    // reread everything for the next loop under the lock
                    curSequence = sequence
                    curSlots = slots
                }
            }
        }

The main task here is to serialize StateFlow state changes coming from different threads, so that collectors are notified sequentially.

Several steps can be distinguished:

  1. Setting a new value. If it is equal to the current one, the setter returns immediately.
  2. Setting the sequence marker to an odd value, which means that an update is in progress. If it is already odd, another update is running: we add 2 to the sequence to notify it and return.
  3. makePending() - switching all slots (i.e., all connections) to PENDING: a new value will be sent soon.
  4. The loop checks sequence == curSequence, i.e. that no further updates arrived in the meantime, and then makes the sequence even again.
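The sequence protocol is easiest to follow when an update arrives while a notification pass is still running. Below is a simplified single-threaded sketch under stated assumptions: SequencedCell, listeners and rounds are hypothetical names, and plain callbacks stand in for the slots. A listener that sets the value from inside the notification plays the role of the concurrent writer.

```kotlin
// Hypothetical simplified model of the setter; callbacks replace the slots.
class SequencedCell<T>(initial: T) {
    private val lock = Any()
    private var state: T = initial
    private var sequence = 0 // odd while a notification pass is running
    val listeners = mutableListOf<(T) -> Unit>()
    var rounds = 0           // number of notification passes performed
        private set

    var value: T
        get() = synchronized(lock) { state }
        set(newValue) {
            var cur = 0
            synchronized(lock) {
                if (state == newValue) return // equal value: nothing to do
                state = newValue
                cur = sequence
                if (cur and 1 == 0) {
                    cur++                     // make it odd: this call does the notifying
                    sequence = cur
                } else {
                    sequence = cur + 2        // tell the running notifier to loop again
                    return
                }
            }
            while (true) {
                rounds++
                val snapshot = synchronized(lock) { state }
                listeners.forEach { it(snapshot) } // notify outside of the lock
                synchronized(lock) {
                    if (sequence == cur) {
                        sequence = cur + 1    // even again: quiescent
                        return
                    }
                    cur = sequence            // someone updated meanwhile, repeat
                }
            }
        }
}

fun main() {
    val cell = SequencedCell(0)
    val seen = mutableListOf<Int>()
    cell.listeners.add { v ->
        seen += v
        if (v == 1) { cell.value = 2 } // update arrives during the notification pass
    }
    cell.value = 1
    println(seen)        // prints [1, 2]
    println(cell.rounds) // prints 2
}
```

The nested assignment does not notify anyone itself: it only bumps the sequence by 2, and the outer call notices the change and runs a second pass.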

What happens in the collect method:

override suspend fun collect(collector: FlowCollector<T>) {
        val slot = allocateSlot()
        var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet)
        try {
            // The loop is arranged so that it starts delivering current value without waiting first
            while (true) {
                // Here the coroutine could have waited for a while to be dispatched,
                // so we use the most recent state here to ensure the best possible conflation of stale values
                val newState = _state.value
                // Conflate value emissions using equality
                if (prevState == null || newState != prevState) {
                    collector.emit(NULL.unbox(newState))
                    prevState = newState
                }
                // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot
                if (!slot.takePending()) { // try fast-path without suspending first
                    slot.awaitPending() // only suspend for new values when needed
                }
            }
        } finally {
            freeSlot(slot)
        }
    }

The main task is to emit the current value right away and then wait for new values:

  1. We create or reuse a slot for the new connection.
  2. We check whether nothing has been emitted yet (prevState is null) or whether the state has changed. If so, we emit the new value.
  3. We check whether the slot is already in the PENDING state, and if not, we suspend the collector until a new value arrives.
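From the caller's side, the first iteration of this loop means that a collector which connects late still receives the current value immediately. A minimal sketch, using first() from kotlinx.coroutines.flow to stop after the first emission:

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val state = MutableStateFlow("initial")
    state.value = "updated" // changed before anyone collects

    // The collector starts only now, yet it gets the latest state without waiting.
    println(state.first()) // prints updated
}
```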

In general, that's all. We did not look at how slots are allocated and how their states change, but I decided that this is not important for the overall picture of StateFlow.

Thanks.
