...

SharedFlow and StateFlow

Learn about shared flow and how to use it.

Since a Flow is typically cold, its values are calculated on demand, separately for each collector. However, there are cases in which we want multiple receivers to be subscribed to one source of changes. This is where we use SharedFlow, which is conceptually similar to a mailing list. We also have StateFlow, which is similar to an observable value. Let’s explain them both step by step.

MutableSharedFlow

Let’s start with MutableSharedFlow, which is like a broadcast channel: everyone can send (emit) messages, which will be received by every coroutine that is listening (collecting).

package kotlinx.coroutines.app
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow =
        MutableSharedFlow<String>(replay = 0)
    // or MutableSharedFlow<String>()

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
    }
    launch {
        mutableSharedFlow.collect {
            println("#2 received $it")
        }
    }

    delay(1000)
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
}
Using MutableSharedFlow

Note: The program above never ends because coroutineScope waits for the coroutines that were started with launch, and these keep listening on the MutableSharedFlow. A MutableSharedFlow cannot be closed, so a collector only stops when its coroutine is cancelled (for example, by cancelling the whole scope) or when an operator such as take ends the collection early.
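
The two ways of stopping a collector can be sketched as follows. This is a minimal illustration, not part of the original lesson: the helper name collectThenStop is hypothetical, and waiting on subscriptionCount instead of using delay is an assumption made here so that both collectors are guaranteed to be subscribed before the first emit.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper for this sketch: returns what the bounded collector saw.
suspend fun collectThenStop(): List<String> = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>()

    // Option 1: bound the collector; take(2) completes after two values.
    val bounded = async { mutableSharedFlow.take(2).toList() }

    // Option 2: keep the Job so that this endless collector can be cancelled.
    val endless = launch {
        mutableSharedFlow.collect { println("#1 received $it") }
    }

    // Wait until both collectors are subscribed. With replay = 0,
    // values emitted before that point would be lost.
    mutableSharedFlow.subscriptionCount.first { it == 2 }
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")

    val result = bounded.await()
    endless.cancel() // without this, coroutineScope would never return
    result
}

suspend fun main() {
    println(collectThenStop()) // [Message1, Message2]
}
```

Unlike the earlier program, this one terminates: the bounded collector finishes on its own, and the endless one is cancelled explicitly.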

A MutableSharedFlow can also keep recent messages and replay them. If we set the replay parameter (it defaults to 0), that number of most recent values will be kept. A coroutine that starts observing now will receive these values first. We can also clear this cache with resetReplayCache.

package kotlinx.coroutines.app
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>(
        replay = 2,
    )
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
    mutableSharedFlow.emit("Message3")

    println(mutableSharedFlow.replayCache)
    // [Message2, Message3]

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
        // #1 received Message2
        // #1 received Message3
    }

    delay(100)
    mutableSharedFlow.resetReplayCache()
    println(mutableSharedFlow.replayCache) // []
}
Resetting the cache using resetReplayCache
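
To see what the reset means for a collector that subscribes afterwards, here is a small sketch under stated assumptions: the helper name replayDemo and the 200 ms timeout are illustrative choices, not part of the original example. Before the reset, a new collector is first handed the cached values. After it, nothing is replayed, so the collector waits for the next emission.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper for this sketch: returns what a new collector
// receives before the reset and after it.
suspend fun replayDemo(): Pair<List<String>, String?> {
    val mutableSharedFlow = MutableSharedFlow<String>(replay = 2)
    // With no subscribers, emit returns immediately and only fills the cache.
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
    mutableSharedFlow.emit("Message3")

    // A new collector is first handed the two cached values.
    val beforeReset = mutableSharedFlow.take(2).toList()

    mutableSharedFlow.resetReplayCache()

    // Nothing is replayed now, so first() waits for a new emission;
    // the arbitrary 200 ms timeout turns that wait into null.
    val afterReset = withTimeoutOrNull(200) { mutableSharedFlow.first() }

    return beforeReset to afterReset
}

suspend fun main() {
    println(replayDemo()) // ([Message2, Message3], null)
}
```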

Note:

  • MutableSharedFlow is conceptually similar to RxJava Subjects. When we set the replay parameter to 0, it is similar to a PublishSubject. When replay
...