...

/

Channels as a Communication Primitive

Channels as a Communication Primitive

See when channels are useful with the help of a practical example.

We'll cover the following...

Channels are useful when different coroutines need to communicate with each other. They guarantee fairness and the absence of conflicts (i.e., no problems with shared state).

To see them in action, imagine that different baristas are making coffees. Each barista should be a separate coroutine working independently. Different coffee types take different amounts of time to prepare, but we want to handle orders in the order they appear. The easiest way to solve this problem is to send both the orders and the resulting coffees through channels. A barista can be defined using the produce builder.

/**
 * Starts a barista that takes orders from [orders], prepares each coffee and
 * sends the outcome to the returned channel.
 *
 * The function is deliberately not `suspend`: it only starts a `produce`
 * coroutine on the receiver scope and returns the channel right away.
 *
 * @param orders channel the barista receives orders from.
 * @param baristaName name recorded in every [CoffeeResult] this barista sends.
 * @return channel carrying one [CoffeeResult] per order this barista handled.
 */
fun CoroutineScope.serveOrders(
    orders: ReceiveChannel<Order>,
    baristaName: String
): ReceiveChannel<CoffeeResult> = produce {
    for (order in orders) {
        val coffee = prepareCoffee(order.type)
        send(
            CoffeeResult(
                coffee = coffee,
                customer = order.customer,
                baristaName = baristaName
            )
        )
    }
}

When we set up a pipeline, we can use the previously defined fanIn function to merge the results produced by the different baristas into a single channel.

package kotlinx.coroutines.app
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

/** An order placed by [customer] for a coffee of the given [type]. */
data class Order(
    val customer: String,
    val type: CoffeeType,
)

/** The kinds of coffee that can be ordered. */
enum class CoffeeType {
    ESPRESSO,
    LATTE,
}

/** Ingredient consumed when making a [Latte]; carries no data. */
class Milk

/** Ingredient consumed when making an [Espresso]; carries no data. */
class GroundCoffee

/** Closed set of drinks that can be prepared. */
sealed class Coffee

/** Espresso made from [ground] coffee. */
class Espresso(val ground: GroundCoffee) : Coffee() {
    override fun toString() = "Espresso"
}

/** Latte made by combining [milk] with an [espresso]. */
class Latte(val milk: Milk, val espresso: Espresso) : Coffee() {
    override fun toString() = "Latte"
}

/**
 * Demo entry point: generates 100 orders with random coffee types, lets three
 * baristas serve them from one shared orders channel, and prints every result
 * taken from the merged results channel.
 */
suspend fun main() = coroutineScope<Unit> {
    val pendingOrders = List(100) { index ->
        Order(customer = "Customer$index", type = CoffeeType.values().random())
    }
    val ordersChannel = produce<Order> {
        for (order in pendingOrders) {
            send(order)
        }
    }

    // All baristas receive from the same orders channel.
    val baristas = listOf("Alex", "Bob", "Celine")
        .map { name -> serveOrders(ordersChannel, name) }
    val served = fanIn(*baristas.toTypedArray())

    for (result in served) {
        println("Serving $result")
    }
}

/**
 * Merges several channels into one.
 *
 * A separate coroutine is launched inside the producer for every input channel;
 * each one forwards the elements it receives to the returned channel.
 *
 * @param channels input channels to merge.
 * @return channel that receives every element forwarded from [channels].
 */
fun <T> CoroutineScope.fanIn(
    vararg channels: ReceiveChannel<T>
): ReceiveChannel<T> = produce {
    channels.forEach { source ->
        launch {
            for (item in source) send(item)
        }
    }
}

/** The [coffee] prepared for [customer], together with the name of the barista who made it. */
data class CoffeeResult(
    val coffee: Coffee,
    val customer: String,
    val baristaName: String,
)

/**
 * Starts a barista that takes orders from [orders], prepares each coffee and
 * sends the outcome to the returned channel.
 *
 * @param orders channel the barista receives orders from.
 * @param baristaName name recorded in every [CoffeeResult] this barista sends.
 * @return channel carrying one [CoffeeResult] per order this barista handled.
 */
fun CoroutineScope.serveOrders(
    orders: ReceiveChannel<Order>,
    baristaName: String
): ReceiveChannel<CoffeeResult> = produce {
    for (incoming in orders) {
        val result = CoffeeResult(
            coffee = prepareCoffee(incoming.type),
            customer = incoming.customer,
            baristaName = baristaName,
        )
        send(result)
    }
}

/**
 * Prepares a coffee of the requested [type].
 *
 * An espresso is always made first; for a latte, milk is brewed afterwards
 * and combined with that espresso.
 */
private fun prepareCoffee(type: CoffeeType): Coffee {
    val espresso = makeEspresso(groundCoffee())
    return when (type) {
        CoffeeType.ESPRESSO -> espresso
        CoffeeType.LATTE -> Latte(brewMilk(), espresso)
    }
}

/** Runs [longOperation] to simulate the work, then returns fresh [GroundCoffee]. */
fun groundCoffee(): GroundCoffee = run {
    longOperation()
    GroundCoffee()
}

/** Runs [longOperation] to simulate the work, then returns fresh [Milk]. */
fun brewMilk(): Milk = run {
    longOperation()
    Milk()
}


/** Runs [longOperation] to simulate the work, then returns an [Espresso] made from [ground]. */
fun makeEspresso(ground: GroundCoffee): Espresso = run {
    longOperation()
    Espresso(ground)
}

/**
 * Keeps the calling thread busy with CPU work to simulate a slow operation.
 *
 * It builds a list of [size] lists of [size] lists of [size] integers and
 * hashes the outermost list, so the amount of work grows roughly with the
 * cube of [size].
 *
 * @param size edge length of the nested lists. Per the original author's note,
 *   350 took about 0.1 s and 820 about 1 s on their machine; timings on other
 *   hardware will differ.
 */
fun longOperation(size: Int = 350) {
    val list = List(size) { it }
    val listOfLists = List(size) { list }
    val listOfListsOfLists = List(size) { listOfLists }
    // The hash value itself is not needed; computing it is what takes the time.
    listOfListsOfLists.hashCode()
}
Barista example

We’ll find ...