Add Concurrency With ConsumerSupervisor
Learn how GenStage uses ConsumerSupervisor to achieve concurrency when starting processes.
The properties of ConsumerSupervisor
Using multiple consumers to process events one by one concurrently is a very useful technique. To make this task even easier, GenStage comes with a special supervisor called ConsumerSupervisor. It works like a consumer and can subscribe to one or more producers. It can also monitor, start, and restart child processes, just like a supervisor.
What makes ConsumerSupervisor special is that when it receives a list of events from a producer, it automatically starts a process for each event and passes the event as an argument to the process. When a child process exits successfully, a new demand will be issued by ConsumerSupervisor, and the cycle repeats. The accompanying figure illustrates how child processes are started on demand when new events are received.
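In code, the same cycle can be sketched as a small standalone script that is separate from the scraper project. The names DemoProducer, DemoWorker, and DemoSupervisor are hypothetical, and the script assumes it is run with the elixir command on a machine that can fetch the gen_stage package through Mix.install. The producer holds a fixed list of events, and each worker is a short-lived Task that prints the event it was started with.

```elixir
# Standalone sketch: save as demo.exs and run with `elixir demo.exs`.
# Assumes network access so Mix.install can fetch gen_stage.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule DemoProducer do
  use GenStage

  def start_link(events) do
    GenStage.start_link(__MODULE__, events, name: __MODULE__)
  end

  # The state is simply the list of events that have not been sent yet.
  def init(events), do: {:producer, events}

  # Hand out at most `demand` events and keep the rest for later demand.
  def handle_demand(demand, events) when demand > 0 do
    {to_send, rest} = Enum.split(events, demand)
    {:noreply, to_send, rest}
  end
end

defmodule DemoWorker do
  # ConsumerSupervisor appends the event to the argument list in the
  # child spec, so `start: {DemoWorker, :start_link, []}` ends up
  # calling start_link/1 with a single event.
  def start_link(event) do
    Task.start_link(fn ->
      IO.puts("#{inspect(self())} handling #{inspect(event)}")
    end)
  end
end

defmodule DemoSupervisor do
  use ConsumerSupervisor

  def start_link(_args) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    children = [
      %{id: DemoWorker, start: {DemoWorker, :start_link, []}, restart: :transient}
    ]

    opts = [strategy: :one_for_one, subscribe_to: [{DemoProducer, max_demand: 2}]]
    ConsumerSupervisor.init(children, opts)
  end
end

{:ok, producer} = DemoProducer.start_link(["a", "b", "c", "d"])
{:ok, supervisor} = DemoSupervisor.start_link([])

# Give the short-lived workers time to run before the script exits.
Process.sleep(500)
```

Running the script should print one line per event. Because max_demand is 2 in this sketch, no more than two workers are alive at the same time, and the supervisor asks the producer for more events only as workers finish.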
Now, we’ll refactor some of our existing logic using ConsumerSupervisor to demonstrate how it works in practice.
Create a ConsumerSupervisor
We’ll create a new file, page_consumer_supervisor.ex, and place it in the lib directory. Here are the entire contents:
    # file path -> scraper/lib/page_consumer_supervisor.ex
    defmodule PageConsumerSupervisor do
      use ConsumerSupervisor
      require Logger

      def start_link(_args) do
        ConsumerSupervisor.start_link(__MODULE__, :ok)
      end

      def init(:ok) do
        Logger.info("PageConsumerSupervisor init")

        children = [
          %{
            id: PageConsumer,
            start: {PageConsumer, :start_link, []},
            restart: :transient
          }
        ]

        opts = [
          strategy: :one_for_one,
          subscribe_to: [
            {PageProducer, max_demand: 2}
          ]
        ]

        ConsumerSupervisor.init(children, opts)
      end
    end
This is a lot of new code, so let’s break it down and explain how it works.
The start_link function
We name our module PageConsumerSupervisor, and immediately after the defmodule declaration, we bring in the ConsumerSupervisor module logic with use ConsumerSupervisor. Since this is a process, we define the now-familiar start_link/1 function. The state is not relevant for ConsumerSupervisor, so we ignore the incoming argument and pass an :ok atom, which is what init/1 receives.
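One reason start_link/1 takes an argument even though it is ignored is that a parent supervisor starts the module through its child specification. The sketch below is a standalone script that inspects that specification. SpecDemoSupervisor and Worker are hypothetical names, and the script assumes gen_stage can be fetched through Mix.install. Nothing is actually started here; the goal is only to show which function a parent supervisor would call.

```elixir
# Standalone sketch; assumes network access so Mix.install can fetch gen_stage.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule SpecDemoSupervisor do
  # Hypothetical module, used only to look at the generated child spec.
  use ConsumerSupervisor

  # The incoming argument is ignored; :ok is what reaches init/1.
  def start_link(_args) do
    ConsumerSupervisor.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # Worker is a placeholder name; it is never started in this sketch.
    children = [%{id: Worker, start: {Worker, :start_link, []}, restart: :transient}]
    ConsumerSupervisor.init(children, strategy: :one_for_one, subscribe_to: [])
  end
end

# `use ConsumerSupervisor` defines child_spec/1. Its :start entry is the
# {module, function, args} tuple a parent supervisor would invoke.
spec = SpecDemoSupervisor.child_spec([])
IO.inspect(spec.start)
```

The inspected tuple names start_link as the function to call, with the value given to child_spec/1 as its single argument, which is why the function is defined with arity 1.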