Run Streams Concurrently and Understand Flows
Learn to run streams concurrently using Flow.map/2 and understand how flows execute.
Run streams concurrently
We need to convert an existing data source to use Flow. The result of the conversion is called a flow. We use Flow.from_enumerable/2 to create a flow from a stream. We’ll discuss how from_enumerable/2 works in a moment.
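As a minimal sketch of that conversion, the script below turns a small lazy stream into a flow and maps over it. It assumes it is run as a standalone script (outside a Mix project) with network access so that Mix.install/1 can fetch the flow package; the version requirement and the stand-in number stream are illustrative assumptions, not part of the airports project.

```elixir
# Assumption: run as a standalone .exs script; Mix.install/1 fetches :flow from Hex.
Mix.install([{:flow, "~> 1.2"}])

# A lazy stream standing in for the File.stream!/CSV pipeline used in the lesson.
numbers =
  Stream.iterate(1, &(&1 + 1))
  |> Stream.take(10)

squares =
  numbers
  # Wrap the stream in a flow; the options argument defaults to [].
  |> Flow.from_enumerable()
  # The mapping function may run in several stages concurrently.
  |> Flow.map(&(&1 * &1))
  # A flow is enumerable, but emission order is not guaranteed, so sort the result.
  |> Enum.sort()

IO.inspect(squares)
```

Sorting at the end is only there to make the result deterministic. A flow makes no promise about the order in which items come out.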
Replace Stream with Flow
Let’s change open_airports/0 once again. This time we’ll replace Stream.map/2 and Stream.reject/2 with Flow.map/2 and Flow.reject/2 respectively. Make sure to measure the performance afterward. Here is how the code looks after the changes:
# file path -> airports/lib/airports.ex
def open_airports() do
  airports_csv()
  |> File.stream!()
  |> CSV.parse_stream()
  |> Flow.from_enumerable()
  |> Flow.map(fn row ->
    %{
      id: :binary.copy(Enum.at(row, 0)),
      type: :binary.copy(Enum.at(row, 2)),
      name: :binary.copy(Enum.at(row, 3)),
      country: :binary.copy(Enum.at(row, 8))
    }
  end)
  |> Flow.reject(&(&1.type == "closed"))
  |> Enum.to_list()
end
To see how much time it takes, we use the following command:
:timer.tc(&Airports.open_airports/0)
This version takes about 2200ms. It’s still faster than our original implementation but is more than twice as slow as the previous one. Something has gone wrong.
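For reference, :timer.tc/1 returns a tuple of the elapsed time in microseconds and the function's result, so a millisecond figure like the one above comes from dividing by 1,000. The sketch below shows that conversion with a stand-in function (summing a range) in place of Airports.open_airports/0, which is assumed to live in the lesson's project.

```elixir
# Stand-in workload; in the lesson this would be &Airports.open_airports/0.
work = fn -> Enum.sum(1..1_000_000) end

# :timer.tc/1 returns {elapsed_microseconds, return_value_of_the_function}.
{micros, result} = :timer.tc(work)

# Convert microseconds to whole milliseconds for easier reading.
millis = div(micros, 1000)

IO.puts("result: #{result}, took about #{millis} ms")
```

A single run can be noisy, so it may be worth timing the function a few times before comparing versions.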
Move the parsing logic into Flow.map
When we look at the code closely, it seems like we’ve created a bottleneck. The flow is based on a single data source—the result of the parse_stream/1 function—which might have trouble keeping up. Let’s move the parsing logic ...