Run Streams Concurrently and Understand Flows

Learn to run streams concurrently using Flow.map/2 and understand how flows execute.

Run streams concurrently

To use Flow, we first need to convert an existing data source into a flow. We use Flow.from_enumerable/2 to create a flow from a stream. We’ll discuss how from_enumerable/2 works in a moment.
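The following is a minimal, self-contained sketch of that conversion. It assumes a standalone .exs script that pulls in Flow 1.2 with Mix.install/1, which needs network access to Hex. The version requirement and the sample rows are illustrative and are not part of the airports project. A lazy Stream stands in for the parsed CSV rows, Flow.from_enumerable/1 turns it into a flow, and Enum.to_list/1 runs it.

```elixir
# Assumption: run as a standalone script (elixir example.exs); fetches Flow from Hex
Mix.install([{:flow, "~> 1.2"}])

# Hypothetical stand-in for the parsed CSV rows: every third row is marked "closed"
rows =
  Stream.map(1..10, fn i ->
    type = if rem(i, 3) == 0, do: "closed", else: "small_airport"
    ["id#{i}", type]
  end)

result =
  rows
  # Convert the stream into a flow
  |> Flow.from_enumerable()
  # These steps now run in Flow stages instead of the calling process
  |> Flow.map(fn [id, type] -> %{id: id, type: type} end)
  |> Flow.reject(&(&1.type == "closed"))
  # Flow implements Enumerable, so Enum.to_list/1 starts the flow and collects the results
  |> Enum.to_list()

# Flow does not guarantee output order, so sort before printing
result
|> Enum.sort_by(& &1.id)
|> IO.inspect()
```

Because the work is spread across stages, the order of the collected results can differ between runs. The rows themselves are the same each time.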

Replace Stream with Flow

Let’s change open_airports/0 once again. This time, we’ll convert the parsed stream into a flow with Flow.from_enumerable/1, then replace Stream.map/2 and Stream.reject/2 with Flow.map/2 and Flow.reject/2, respectively. Make sure to measure the performance afterward. Here is how the code looks after the changes:

```elixir
# File: airports/lib/airports.ex
def open_airports() do
  airports_csv()
  |> File.stream!()
  |> CSV.parse_stream()
  # Turn the lazily parsed rows into a flow
  |> Flow.from_enumerable()
  # Build one map per row; :binary.copy/1 gives each field its own copy of the data
  |> Flow.map(fn row ->
    %{
      id: :binary.copy(Enum.at(row, 0)),
      type: :binary.copy(Enum.at(row, 2)),
      name: :binary.copy(Enum.at(row, 3)),
      country: :binary.copy(Enum.at(row, 8))
    }
  end)
  |> Flow.reject(&(&1.type == "closed"))
  # Run the flow and collect the results into a list
  |> Enum.to_list()
end
```

To see how much time it takes, we use the following command:

```elixir
:timer.tc(&Airports.open_airports/0)
```
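:timer.tc/1 is an Erlang function. It calls the given zero-arity function and returns a {microseconds, result} tuple, so you need to divide the first element by 1,000 to read the time in milliseconds. The small self-contained sketch below uses a hypothetical stand-in workload in place of Airports.open_airports/0.

```elixir
# Hypothetical stand-in workload; in the project you would pass &Airports.open_airports/0
work = fn -> Enum.sum(1..1_000_000) end

# :timer.tc/1 returns {elapsed_time_in_microseconds, return_value_of_the_function}
{micros, value} = :timer.tc(work)

IO.puts("Took #{micros / 1000} ms and returned #{value}")
```

A first element of about 2_200_000 therefore corresponds to roughly 2,200 ms.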

This version takes about 2200 ms. It’s still faster than our original implementation, but it’s more than twice as slow as the previous one. Something has gone wrong.

Move the parsing logic into Flow.map

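When we look at the code closely, it seems we’ve created a bottleneck. The flow is based on a single data source, the stream returned by the parse_stream/1 function. Because that stream is lazy, each row is parsed only as this one source is consumed, so it might have trouble keeping up with the concurrent steps downstream. Let’s move the parsing logic ...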