Add More Consumers and Buffer Events

Learn how adding more consumers and buffering events will affect the output of our scrape_pages function.

Add more consumers

Rather than working in batches within a single process, we can easily scale our data processing pipeline by running more than one consumer, each responsible for scraping one page at a time. First, let's add a second PageConsumer to the children list in our application module:

#scraper/lib/scraper/application.ex
children = [
  PageProducer,
  Supervisor.child_spec(PageConsumer, id: :consumer_a),
  Supervisor.child_spec(PageConsumer, id: :consumer_b)
]
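
For context, here's a minimal sketch of how this children list might sit inside the application's start/2 callback. The Scraper.Application module name, the supervisor name, and the :one_for_one strategy are assumptions based on a typical Mix project layout:

#scraper/lib/scraper/application.ex (sketch)
defmodule Scraper.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      PageProducer,
      # Two copies of the same consumer module, so each one
      # needs its own unique :id in the supervision tree
      Supervisor.child_spec(PageConsumer, id: :consumer_a),
      Supervisor.child_spec(PageConsumer, id: :consumer_b)
    ]

    opts = [strategy: :one_for_one, name: Scraper.Supervisor]
    Supervisor.start_link(children, opts)
  end
end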

Adjust the consumer's demand

Next, let's adjust the demand of PageConsumer by updating the subscription options in its init function:

#scraper/lib/page_consumer.ex
def init(initial_state) do
  Logger.info("PageConsumer init")
  sub_opts = [{PageProducer, min_demand: 0, max_demand: 1}]
  {:consumer, initial_state, subscribe_to: sub_opts}
end

Now each consumer takes only one event at a time, but we have two consumer processes running concurrently. As soon as one is free, it issues demand for another page to scrape.
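
To see the effect, we can kick off some work from IEx. This is a hypothetical session; scrape_pages/1 is assumed to hand the list of pages over to PageProducer, as set up earlier in the course, and the page names are placeholders:

iex> pages = ["google.com", "facebook.com", "apple.com", "netflix.com"]
iex> PageProducer.scrape_pages(pages)
:ok
# With two consumers and max_demand: 1, two pages are scraped
# concurrently; each consumer demands a new page as soon as it
# finishes its current one.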

Notice that when we add another PageConsumer, we use Supervisor.child_spec/2. This is because each process must have a unique ID in the supervision tree. In the example above, the processes are named :consumer_a and :consumer_b.
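
As a quick sanity check, we can inspect the child specification that Supervisor.child_spec/2 builds. This sketch assumes PageConsumer relies on the default child_spec/1 generated by use GenStage:

iex> Supervisor.child_spec(PageConsumer, id: :consumer_a)
%{id: :consumer_a, start: {PageConsumer, :start_link, [[]]}}

Without the overridden IDs, both entries would default to the same ID, the PageConsumer module name, and the supervisor would reject the second child spec as a duplicate.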