
Add Concurrency With ConsumerSupervisor

Learn how GenStage uses ConsumerSupervisor to achieve concurrency when starting processes.

The properties of ConsumerSupervisor

Using multiple consumers to process events one at a time, concurrently, is a very useful technique. To make this even easier, GenStage comes with a special type of supervisor called ConsumerSupervisor. It works like a consumer and can subscribe to one or more producers. It can also monitor, start, and restart child processes, just like a supervisor.

What makes ConsumerSupervisor special is that when it receives a list of events from a producer, it automatically starts a process for each event and passes the event as an argument to that process. When a child process exits successfully, ConsumerSupervisor issues new demand, and the cycle repeats. This figure illustrates how child processes are started on demand when new events are received.
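To make the cycle concrete in code as well, here is a minimal standalone sketch of one process being started per event. It is not part of the scraper project: the names DemoProducer, DemoWorker, DemoSupervisor, Demo, and :demo_collector are hypothetical, and it assumes the file is run as a script so that Mix.install can fetch the gen_stage package.

```elixir
# Assumption: standalone script (for example `elixir demo.exs`), not the scraper app.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule DemoProducer do
  use GenStage

  def start_link(pages), do: GenStage.start_link(__MODULE__, pages, name: __MODULE__)

  def init(pages), do: {:producer, pages}

  # Hand out at most `demand` of the remaining pages as events.
  def handle_demand(demand, pages) do
    {events, rest} = Enum.split(pages, demand)
    {:noreply, events, rest}
  end
end

defmodule DemoWorker do
  # ConsumerSupervisor appends the event to the (empty) argument list in the
  # child spec, so start_link/1 receives a single event.
  def start_link(event) do
    Task.start_link(fn -> send(:demo_collector, {:done, event, self()}) end)
  end
end

defmodule DemoSupervisor do
  use ConsumerSupervisor

  def start_link(_args), do: ConsumerSupervisor.start_link(__MODULE__, :ok)

  def init(:ok) do
    children = [
      %{id: DemoWorker, start: {DemoWorker, :start_link, []}, restart: :transient}
    ]

    opts = [strategy: :one_for_one, subscribe_to: [{DemoProducer, max_demand: 2}]]
    ConsumerSupervisor.init(children, opts)
  end
end

defmodule Demo do
  # Starts the pipeline and collects one {page, pid} tuple per finished child.
  def run(pages) do
    Process.register(self(), :demo_collector)
    {:ok, _producer} = DemoProducer.start_link(pages)
    {:ok, _supervisor} = DemoSupervisor.start_link([])

    for _ <- pages do
      receive do
        {:done, page, pid} -> {page, pid}
      after
        5_000 -> raise "timed out waiting for a child process"
      end
    end
  end
end
```

With the script loaded, calling Demo.run(["a", "b", "c", "d"]) should return one {page, pid} tuple per event, each reported by a different process. The order depends on which child finishes first, and at most two children run at once because of max_demand: 2.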

Now, we’ll refactor some of our existing logic using ConsumerSupervisor to demonstrate how it works in practice.

Create a ConsumerSupervisor

We’ll create a new file, page_consumer_supervisor.ex, and place it in the lib directory. Here are the entire contents:

# file path -> scraper/lib/page_consumer_supervisor.ex
defmodule PageConsumerSupervisor do
  use ConsumerSupervisor
  require Logger

  def start_link(_args) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    Logger.info("PageConsumerSupervisor init")

    children = [
      %{
        id: PageConsumer,
        start: {PageConsumer, :start_link, []},
        restart: :transient
      }
    ]

    opts = [
      strategy: :one_for_one,
      subscribe_to: [
        {PageProducer, max_demand: 2}
      ]
    ]

    ConsumerSupervisor.init(children, opts)
  end
end

This is a lot of new code, so let’s break it down and explain how it works.

The start_link function

We name our module PageConsumerSupervisor, and immediately after the defmodule declaration, we bring in the ConsumerSupervisor behaviour with use ConsumerSupervisor. Since this is a process, we define the now-familiar start_link/1 function. ConsumerSupervisor does not need any initial state from us, so start_link/1 ignores its argument and passes the :ok atom through to init/1.
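One practical consequence of defining start_link/1 alongside use ConsumerSupervisor is that the module can be listed by name in a supervision tree. The following is a minimal standalone sketch of that wiring, not the scraper's actual application module: SketchProducer, SketchWorker, SketchConsumerSupervisor, and SketchTree are hypothetical names, and it assumes the file is run as a script so that Mix.install can fetch gen_stage.

```elixir
# Assumption: standalone script, separate from the scraper project.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule SketchProducer do
  use GenStage

  def start_link(_args), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  def init(:ok), do: {:producer, :ok}

  # Never emits events; it only needs to exist so the subscription succeeds.
  def handle_demand(_demand, state), do: {:noreply, [], state}
end

defmodule SketchWorker do
  def start_link(_event), do: Task.start_link(fn -> :ok end)
end

defmodule SketchConsumerSupervisor do
  use ConsumerSupervisor

  # The argument is ignored; :ok is only a placeholder handed to init/1.
  def start_link(_args), do: ConsumerSupervisor.start_link(__MODULE__, :ok)

  def init(:ok) do
    children = [
      %{id: SketchWorker, start: {SketchWorker, :start_link, []}, restart: :transient}
    ]

    ConsumerSupervisor.init(children, strategy: :one_for_one, subscribe_to: [SketchProducer])
  end
end

defmodule SketchTree do
  def start do
    # Bare module names rely on the child_spec/1 that `use` defines for each
    # module. The producer comes first so it is registered before the
    # consumer supervisor subscribes to it by name.
    Supervisor.start_link([SketchProducer, SketchConsumerSupervisor], strategy: :one_for_one)
  end
end
```

Calling SketchTree.start() should return {:ok, pid} with both children running. The order of the list matters here, since the subscription refers to the producer by its registered name.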

The init

...