Group and Sort
Learn to use the group_by() and take_sort() functions in the Flow module.
Group
Like Enum.group_by/2, Flow.group_by/2 groups the data by the given criteria. Both are implemented using a reduce function behind the scenes. Therefore, we'll keep our Flow.partition/2 step so that events continue to be routed to the correct process. This ensures that the same process groups all airports from the same country.
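To make the comparison concrete, here is a minimal sketch that uses only the Enum module on a tiny, made-up list of airports. The sample data and the module name GroupSketch are illustrative assumptions and are not part of the airports project. It groups by country with Enum.group_by/2 and then builds the same map by hand with Enum.reduce/3, which is roughly the idea behind grouping through a reduce.

```elixir
defmodule GroupSketch do
  # Hypothetical sample data standing in for parsed CSV rows.
  def sample do
    [
      %{name: "Heathrow", country: "GB"},
      %{name: "Gatwick", country: "GB"},
      %{name: "Sofia", country: "BG"}
    ]
  end

  # Grouping with the standard library helper.
  def with_group_by(airports) do
    Enum.group_by(airports, & &1.country)
  end

  # The same grouping expressed as a reduce over a map accumulator.
  def with_reduce(airports) do
    airports
    |> Enum.reduce(%{}, fn airport, acc ->
      Map.update(acc, airport.country, [airport], &[airport | &1])
    end)
    # Prepending reverses each group, so restore the input order.
    |> Map.new(fn {country, items} -> {country, Enum.reverse(items)} end)
  end
end
```

Both functions return a map keyed by country. Because the accumulator lives inside a single process, all rows for a given country have to reach the same process for its group to be complete, which is why the partitioning step stays in the pipeline.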
Let’s replace Flow.reduce/3 with Flow.group_by/2:
# file path -> airports/lib/airports.ex
def open_airports() do
  airports_csv()
  |> File.stream!()
  |> Flow.from_enumerable()
  |> Flow.map(fn row ->
    [row] = CSV.parse_string(row, skip_headers: false)

    %{
      id: Enum.at(row, 0),
      type: Enum.at(row, 2),
      name: Enum.at(row, 3),
      country: Enum.at(row, 8)
    }
  end)
  |> Flow.reject(&(&1.type == "closed"))
  |> Flow.partition(key: {:key, :country})
  |> Flow.group_by(& &1.country)
  |> Flow.map(fn {country, data} -> {country, Enum.count(data)} end)
  |> Enum.to_list()
end
The result of group_by/2 is a list of tuples, where the first element is the group key and the second is the list of items in that group. We add Flow.map/2 to transform this output, replacing each group's items with a count.
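As a rough illustration of that last step, the sketch below applies the same anonymous function to a hand-written list of {country, items} tuples using Enum.map/2. The sample tuples are invented for illustration. In the real pipeline, the items are the airport maps built earlier.

```elixir
# Hypothetical grouped output: {group key, group items}.
grouped = [
  {"GB", [%{name: "Heathrow"}, %{name: "Gatwick"}]},
  {"BG", [%{name: "Sofia"}]}
]

# Replace each group's items with the number of items in the group.
counts = Enum.map(grouped, fn {country, data} -> {country, Enum.count(data)} end)
```

Here counts holds one {country, count} tuple per group, in the same order as the input list.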
When we rerun the function in IEx, we get a result similar to what we’ve seen before:
Airports.open_airports()
Using Flow.group_by/2 is more ...