Hello everyone.
A few days ago, JetBrains released a new version of kotlinx.coroutines, 1.3.6, and one of its additions is a new kind of Flow, StateFlow, which is intended to replace ConflatedBroadcastChannel. I decided to try StateFlow in action and study how it works internally.
I think many who use Kotlin for Android development or in multiplatform projects (MPP) are familiar with these terms. For those who are not: these entities are close analogues of BehaviorProcessor / BehaviorSubject from RxJava and LiveData / MutableLiveData from Jetpack.
StateFlow itself is a simple extension of the Flow interface and comes in two forms:
public interface StateFlow<out T> : Flow<T> {
    public val value: T
}
public interface MutableStateFlow<T> : StateFlow<T> {
    public override var value: T
}
The idea is the same as in LiveData / MutableLiveData: one interface only lets you read the current state, while the other also lets you set it.
What does StateFlow offer us compared to ConflatedBroadcastChannel?
- A simpler, garbage-free internal implementation.
- An initial value is always required; null is allowed.
- Separation into read-only and read-write interfaces.
- Values are compared by equality (equals) rather than by reference.
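As a small illustration of the read-only / read-write split, here is a minimal sketch. The Counter class and its member names are hypothetical and are not part of the library. It assumes kotlinx.coroutines 1.3.6 or later on the classpath (StateFlow was still marked experimental in 1.3.6).

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow

// Hypothetical holder: this class writes through MutableStateFlow,
// while callers only see the read-only StateFlow type.
class Counter {
    private val _count = MutableStateFlow(0)   // an initial value is mandatory
    val count: StateFlow<Int> get() = _count   // read-only view of the same object

    fun increment() {
        _count.value = _count.value + 1
    }
}

fun main() {
    val counter = Counter()
    counter.increment()
    counter.increment()
    println(counter.count.value) // prints 2
    // counter.count.value = 5   // would not compile: StateFlow.value is a val
}
```

This is the same pattern that is commonly used with MutableLiveData / LiveData: the mutable type stays private and the read-only type is exposed.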
Now let's try a simple use of StateFlow. To do this, I made an elementary wrapper that can hold any type, with null as the default value:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
class StateFlowRepository<T>(initialValue: T? = null) {
    private val stateFlow = MutableStateFlow(initialValue)
    var value: T?
        get() = stateFlow.value
        set(value) {
            stateFlow.value = value
        }
    val stream: Flow<T?> = stateFlow
}
We collect the data:
lifecycleScope.launch {
    simpleRepo.stream.collect {
        addData(it.toString())
    }
}
Displaying the values on screen in a bare-bones test UI causes no problems, and everything works like clockwork.
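To try the same collection outside Android, a rough sketch with runBlocking in place of lifecycleScope could look like the following. The Screen class and the collectDemo function are hypothetical names introduced only for this illustration; kotlinx.coroutines 1.3.6 or later is assumed. The sketch shows that a new collector first receives the current value, that rapid updates may be conflated, and that assigning a value equal to the current one is ignored.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical state type; a data class, so equals compares content.
data class Screen(val title: String)

fun collectDemo(): List<Screen> = runBlocking {
    val state = MutableStateFlow(Screen("loading"))
    val seen = mutableListOf<Screen>()

    val job = launch { state.collect { seen += it } }
    yield()                       // let the collector receive the initial value

    state.value = Screen("a")     // two updates in a row with no suspension between:
    state.value = Screen("b")     // the collector may skip "a" and see only "b"
    yield()

    state.value = Screen("b")     // equal to the current value: the setter does nothing
    yield()

    job.cancel()                  // collect never completes on its own
    seen
}

fun main() {
    // The list starts with the initial value and ends with a single Screen("b").
    println(collectDemo())
}
```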
Now let's look inside and see how it is implemented.
To my surprise, the implementation really is very simple: it currently takes only 316 lines, of which 25% are KDoc comments.
The main implementation class is StateFlowImpl:
private class StateFlowImpl<T>(initialValue: Any) : SynchronizedObject(), MutableStateFlow<T>, FusibleFlow<T> {
    private val _state = atomic(initialValue)
    private var sequence = 0
    private var slots = arrayOfNulls<StateFlowSlot?>(INITIAL_SIZE)
    private var nSlots = 0
    private var nextIndex = 0
    . . .
}
- _state: an atomic reference that stores the current state.
- sequence: an auxiliary counter whose parity shows whether a state update is currently in progress (odd) or not (even).
- slots: an array / pool of StateFlowSlot objects. A StateFlowSlot is an auxiliary abstraction for each "connection" (collector) to the StateFlow.
- nSlots, nextIndex: helper variables for working with the growable slots array.
Let's look at StateFlowSlot in advance. It consists only of:
    private val _state = atomic<Any?>(null)
plus methods for changing the slot state.
Each slot can be in one of the following states:
- null: the slot has been created but is not in use.
- NONE: the slot is used by a collector.
- PENDING: a new value is waiting to be sent to the collector.
- CancellableContinuationImpl: the collector is suspended and will not resume until a new state arrives in the StateFlow.
Consider what happens when you set a new value:
public override var value: T
    get() = NULL.unbox(_state.value)
    set(value) {
        var curSequence = 0
        var curSlots: Array<StateFlowSlot?> = this.slots
        val newState = value ?: NULL
        synchronized(this) {
            val oldState = _state.value
            if (oldState == newState) return // value is not changing: nothing to do
            _state.value = newState
            curSequence = sequence
            if (curSequence and 1 == 0) { // even: no update in progress
                curSequence++ // make it odd
                sequence = curSequence
            } else { // odd: another update is already in progress
                sequence = curSequence + 2 // signal it, keeping sequence odd
                return
            }
            curSlots = slots
        }
        while (true) { // notify collectors outside the lock
            for (col in curSlots) {
                col?.makePending()
            }
            synchronized(this) {
                if (sequence == curSequence) { // no new updates arrived
                    sequence = curSequence + 1 // make sequence even again
                    return
                }
                curSequence = sequence
                curSlots = slots
            }
        }
    }
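The main task here is to register StateFlow state changes coming from different threads so that the collectors (FlowCollector) are notified sequentially. Several steps can be distinguished:
- Setting the new value (nothing happens if it equals the current one).
- Setting the sequence marker to an odd value, which means that an update is now in progress. If it is already odd, another thread is updating: sequence is increased by 2 to signal it, and the setter returns.
- makePending(): moving all slots (i.e., all connections) to the PENDING state, meaning a new value will be sent soon.
- The loop checks sequence == curSequence, i.e. that no further updates arrived in the meantime, and then makes sequence even again; otherwise it repeats the notification pass.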
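The steps above can be modeled without coroutines. The following is only a simplified sketch of the sequence-parity idea, not the library's code: SequencedCell, update and onChange are hypothetical names, a plain callback stands in for the makePending() pass over the slots, and only the Kotlin standard library is used. The callback updates the cell again while a notification pass is running, which exercises the "sequence is already odd" branch in a single thread.

```kotlin
// Hypothetical model of the setter: one caller at a time runs the notification
// loop, and concurrent (here: nested) updates only bump the sequence.
class SequencedCell<T>(initial: T, private val onChange: (SequencedCell<T>) -> Unit) {
    private val lock = Any()
    private var state: T = initial
    private var sequence = 0 // odd while a notification pass is in progress

    val value: T get() = synchronized(lock) { state }

    fun update(newValue: T) {
        var curSequence = 0
        synchronized(lock) {
            if (state == newValue) return      // equal value: nothing to do
            state = newValue
            curSequence = sequence
            if (curSequence and 1 == 0) {      // even: we become the notifier
                curSequence++
                sequence = curSequence
            } else {                           // odd: someone else is notifying
                sequence = curSequence + 2     // tell them to run one more pass
                return
            }
        }
        while (true) {
            onChange(this)                     // stand-in for makePending() on every slot
            synchronized(lock) {
                if (sequence == curSequence) { // no update arrived during the pass
                    sequence = curSequence + 1 // back to even
                    return
                }
                curSequence = sequence         // an update arrived: repeat the pass
            }
        }
    }
}

fun main() {
    val seen = mutableListOf<Int>()
    val cell = SequencedCell(0) { c ->
        seen += c.value
        if (c.value == 1) c.update(2)          // nested update during a pass
    }
    cell.update(1)
    println(seen) // prints [1, 2]
}
```

The nested update(2) call returns immediately after bumping the sequence, and the outer loop notices the change and runs a second pass, so the notifications stay sequential.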
What happens in the collect method:
override suspend fun collect(collector: FlowCollector<T>) {
    val slot = allocateSlot()
    var prevState: Any? = null
    try {
        while (true) {
            val newState = _state.value
            if (prevState == null || newState != prevState) {
                collector.emit(NULL.unbox(newState))
                prevState = newState
            }
            if (!slot.takePending()) {
                slot.awaitPending()
            }
        }
    } finally {
        freeSlot(slot)
    }
}
The main task is to send the initial value and then wait for new values:
- We create or reuse a slot for the new connection.
- We check whether nothing has been emitted yet (prevState is null) or the state has changed, and if so emit the new value.
- We check whether the slot is already marked for an update (PENDING state); if not, the collector suspends on the slot until a new value arrives.
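To make the slot states listed earlier and the takePending / awaitPending calls in collect more tangible, here is a rough model of a slot. It is an assumption-laden sketch and not the library's StateFlowSlot: SlotModel is a hypothetical name, AtomicReference from the JDK replaces the library's atomic field, and a plain Runnable stands in for the suspended continuation.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical model of a slot. States: null (free), NONE (used by a collector),
// PENDING (a new value is waiting), or a Runnable (the collector is parked).
class SlotModel {
    private object NONE
    private object PENDING

    private val state = AtomicReference<Any?>(null)

    fun allocate(): Boolean = state.compareAndSet(null, NONE)

    fun free() {
        state.set(null)
    }

    // Writer side: mark the slot, or wake the collector if it is parked.
    fun makePending() {
        while (true) {
            val s = state.get()
            if (s == null || s === PENDING) return // free slot, or already marked
            if (s === NONE) {
                if (state.compareAndSet(s, PENDING)) return
            } else if (state.compareAndSet(s, NONE)) {
                (s as Runnable).run()              // wake the parked collector
                return
            }
        }
    }

    // Collector side: consume the mark if there is one.
    fun takePending(): Boolean = state.getAndSet(NONE) === PENDING

    // Collector side: park until the next makePending(); if a mark arrived
    // in the meantime, wake up immediately instead.
    fun awaitPending(onWake: Runnable) {
        if (!state.compareAndSet(NONE, onWake)) onWake.run()
    }
}

fun main() {
    val slot = SlotModel()
    slot.allocate()
    println(slot.takePending())   // prints false: nothing pending yet
    slot.makePending()
    println(slot.takePending())   // prints true: the mark is consumed
    slot.awaitPending(Runnable { println("woken") })
    slot.makePending()            // prints woken
}
```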
In general, that's all. We did not go through the actual code that allocates slots and changes their states, but I decided that this is not important for the overall picture of StateFlow.
Thanks.