SharedFlow and StateFlow
Learn about shared flow and how to use it.
Since Flow is typically cold, its values are calculated on demand. However, there are cases in which we want multiple receivers to be subscribed to one source of changes. This is where we use SharedFlow, which is conceptually similar to a mailing list. We also have StateFlow, which is similar to an observable value. Let’s explain them both step by step.
MutableSharedFlow
Let’s start with MutableSharedFlow, which is like a broadcast channel: everyone can send (emit) messages, which will be received by every coroutine that is listening (collecting).
package kotlinx.coroutines.app

import kotlinx.coroutines.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>(replay = 0)
    // or MutableSharedFlow<String>()

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
    }
    launch {
        mutableSharedFlow.collect {
            println("#2 received $it")
        }
    }

    delay(1000)
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
}
Note: The program above never ends because coroutineScope waits for the coroutines that were started with launch, and these keep listening on the MutableSharedFlow. A MutableSharedFlow cannot be closed, so the way to fix this problem is to cancel the collecting coroutines, for example by cancelling the whole scope.
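The note above can be illustrated with a minimal sketch. It assumes that cancelling the two collector jobs is acceptable in place of cancelling the enclosing scope; either way the collectors stop, and coroutineScope can then complete. The function name collectThenCancel and the thread-safe list used to record results are hypothetical and only serve the illustration.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: starts two collectors, emits two messages,
// then cancels the collectors so that coroutineScope can finish.
suspend fun collectThenCancel(): List<String> = coroutineScope {
    // Collectors may run on different threads, so a thread-safe list is used.
    val received = CopyOnWriteArrayList<String>()
    val mutableSharedFlow = MutableSharedFlow<String>()

    val collectors = List(2) { index ->
        launch {
            mutableSharedFlow.collect {
                received.add("#${index + 1} received $it")
            }
        }
    }

    delay(1000) // give both collectors time to subscribe
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
    delay(100) // give the collectors time to handle the last message

    // Without this step the function would never return.
    collectors.forEach { it.cancel() }
    received.toList()
}

suspend fun main() {
    collectThenCancel().forEach(::println)
}
```

Each collector should record both messages, although the relative order of the "#1" and "#2" lines is not guaranteed. Keeping a reference to each Job makes it possible to stop only the listeners while the rest of the scope keeps running.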
A MutableSharedFlow can also keep the messages that were already sent. If we set the replay parameter (it defaults to 0), the defined number of last values will be kept. If a coroutine now starts observing, it will receive these values first. We can also reset this cache with resetReplayCache.
package kotlinx.coroutines.app

import kotlinx.coroutines.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>(
        replay = 2,
    )
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
    mutableSharedFlow.emit("Message3")

    println(mutableSharedFlow.replayCache)
    // [Message2, Message3]

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
        // #1 received Message2
        // #1 received Message3
    }

    delay(100)
    mutableSharedFlow.resetReplayCache()
    println(mutableSharedFlow.replayCache) // []
}
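To see how replayed values combine with new ones, here is a small sketch of a subscriber that joins late. It uses take(3) so that the collection completes by itself instead of waiting forever. The function name lateSubscriberValues is hypothetical, and the 100 ms delay is an assumption that gives the subscriber enough time to subscribe before the next emission.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns what a subscriber that joins late observes.
suspend fun lateSubscriberValues(): List<String> = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>(replay = 2)

    // There are no subscribers yet, so these calls return immediately
    // and only the last two values stay in the replay cache.
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
    mutableSharedFlow.emit("Message3")

    // take(3) completes the collection after three values.
    val lateSubscriber = async {
        mutableSharedFlow.take(3).toList()
    }

    delay(100) // assumed to be enough time for the subscriber to subscribe
    mutableSharedFlow.emit("Message4")

    lateSubscriber.await()
}

suspend fun main() {
    println(lateSubscriberValues())
    // [Message2, Message3, Message4]
}
```

The subscriber first receives the two cached values and then the value emitted after it subscribed. Message1 is never delivered because it had already dropped out of the replay cache.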
Note: MutableSharedFlow is conceptually similar to RxJava Subjects. When we set the replay parameter to 0, it is similar to a PublishSubject. When replay