...

Collection Processing

Learn how collection processing works and how to combine two flows into one.

How does collection processing work?

We’ve seen quite a few flow processing and lifecycle functions. Their implementations are quite simple, so there is no magic going on. Most of them can be implemented with the flow builder and a collect call with a lambda. Here is a simple example of flow processing, followed by simplified implementations of map and flowOf.

package kotlinx.coroutines.app
import kotlinx.coroutines.flow.*

suspend fun main() {
   flowOf('a', 'b')
       .map { it.uppercase() }
       .collect { print(it) } // AB
}

fun <T, R> Flow<T>.map(
   transform: suspend (value: T) -> R
): Flow<R> = flow {
   collect { value ->
       emit(transform(value))
   }
}

fun <T> flowOf(vararg elements: T): Flow<T> = flow {
   for (element in elements) {
       emit(element)
   }
}
Simplified implementation of map and flowOf
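
The same builder-plus-collect pattern extends to other operators. The sketch below is an illustration rather than the library's actual code: simpleFilter and simpleOnEach are hypothetical names, chosen so they do not clash with the real filter and onEach from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical simplified filter: re-emits only the values accepted by the predicate.
fun <T> Flow<T>.simpleFilter(
    predicate: suspend (value: T) -> Boolean
): Flow<T> = flow {
    collect { value ->
        if (predicate(value)) emit(value)
    }
}

// Hypothetical simplified onEach: runs the action, then passes the value on unchanged.
fun <T> Flow<T>.simpleOnEach(
    action: suspend (value: T) -> Unit
): Flow<T> = flow {
    collect { value ->
        action(value)
        emit(value)
    }
}

suspend fun main() {
    flowOf(1, 2, 3, 4)
        .simpleOnEach { print("[$it]") }
        .simpleFilter { it % 2 == 0 }
        .collect { print(it) } // [1][2]2[3][4]4
}
```

Each element travels through the whole chain before the next one is emitted, which is why the bracketed values from simpleOnEach are interleaved with the values that pass simpleFilter.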

If we inline the flowOf and map calls from the first example, we end up with the following code (labels on lambdas and numbered comments are added).

package kotlinx.coroutines.app
import kotlinx.coroutines.flow.*

suspend fun main() {
   flow map@{ // 1
       flow flowOf@{ // 2
           for (element in arrayOf('a', 'b')) { // 3
               this@flowOf.emit(element) // 4
           }
       }.collect { value -> // 5
           this@map.emit(value.uppercase()) // 6
       }
   }.collect { // 7
       print(it) // 8
   }
}
The same flow processing with map and flowOf inlined

Let’s analyze this step by step:

  • We start a flow at 1 and collect it at 7.
  • When we start collecting, we invoke the lambda @map (which starts at 1), which calls
...