Add Concurrency With ConsumerSupervisor
Learn how GenStage uses ConsumerSupervisor to achieve concurrency when starting processes.
The properties of ConsumerSupervisor
Processing events concurrently with multiple consumers, each handling one event at a time, is a very useful technique. To make this even easier, GenStage comes with a special supervisor called ConsumerSupervisor. It behaves like a consumer and can subscribe to one or more producers. It can also start, monitor, and restart child processes, just like a supervisor.
What makes ConsumerSupervisor special is that when it receives a list of events from a producer, it automatically starts a child process for each event and passes the event to that process as an argument. When a child process exits successfully, ConsumerSupervisor issues new demand, and the cycle repeats. This figure illustrates how child processes are started on demand when new events are received:
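In code terms, the child side of this cycle can be sketched as a module whose start function receives a single event. The module name EventWorker and the process/1 helper are hypothetical and used only for illustration. The sketch assumes a child specification of the form start: {EventWorker, :start_link, []}, in which case ConsumerSupervisor appends the event to the argument list.

```elixir
defmodule EventWorker do
  # Hypothetical child module; the name is illustrative only.
  # With `start: {EventWorker, :start_link, []}` in the child spec,
  # ConsumerSupervisor appends the event to the (empty) argument list,
  # so this function is called as EventWorker.start_link(event).
  def start_link(event) do
    # A Task runs the work in its own linked process and exits normally
    # when the function returns.
    Task.start_link(fn -> process(event) end)
  end

  # Placeholder for the real work done on each event.
  defp process(event) do
    IO.puts("Processing #{inspect(event)}")
  end
end
```

Because the task exits normally as soon as the work is done, it fits the cycle described above: one short-lived process per event.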
Now, we’ll refactor some of our existing logic using ConsumerSupervisor to demonstrate how it works in practice.
Create a ConsumerSupervisor
We’ll create a new file, page_consumer_supervisor.ex, and place it in the lib directory. Here are the entire contents:
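A minimal sketch of what such a module might look like is given here; the actual listing may differ in its details. It assumes a producer registered under the name PageProducer and a worker module PageConsumer exposing start_link/1, both defined elsewhere in the project. The max_demand value of 2 and the log message are illustrative choices, not taken from the text.

```elixir
defmodule PageConsumerSupervisor do
  use ConsumerSupervisor
  require Logger

  def start_link(_args) do
    # No meaningful state is needed, so :ok is passed through to init/1.
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    Logger.info("PageConsumerSupervisor init")

    # One child spec acts as the template for every process started per
    # event. ConsumerSupervisor only accepts :temporary or :transient
    # restart values for its children.
    children = [
      %{
        id: PageConsumer,
        start: {PageConsumer, :start_link, []},
        restart: :transient
      }
    ]

    opts = [
      strategy: :one_for_one,
      # Assumption: PageProducer is a named producer; max_demand caps
      # the number of child processes running at the same time.
      subscribe_to: [{PageProducer, max_demand: 2}]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end
```

With these assumed options, at most two PageConsumer processes would run concurrently, since each event asked for from the producer becomes one child process.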
This is a lot of new code, so let’s break it down and explain how it works.
The start_link function
We name our module PageConsumerSupervisor, and immediately after the defmodule declaration, we bring in the ConsumerSupervisor behaviour with use ConsumerSupervisor. Since this is a process, we define the now-familiar start_link/1 function. ConsumerSupervisor does not need any meaningful state here, so we simply pass the :ok atom as the init argument.