Group and Sort

Learn to use the group_by() and take_sort() functions in the Flow module.

We'll cover the following...

Group

Like Enum.group_by/2, Flow.group_by/2 groups the data by the given criteria. They both are implemented using the reduce function behind the scenes. Therefore, we’ll keep our Flow.partition/2 function to continue routing events to the correct process. This ensures that the same process groups airports of the same country.

Let’s replace Flow.reduce/3 with Flow.group_by/2:

Press + to interact
#file path -> airports/lib/airports.ex
def open_airports() do
airports_csv()
|> File.stream!()
|> Flow.from_enumerable()
|> Flow.map(fn row ->
[row] = CSV.parse_string(row, skip_headers: false)
%{
id: Enum.at(row, 0),
type: Enum.at(row, 2),
name: Enum.at(row, 3),
country: Enum.at(row, 8)
}
end)
|> Flow.reject(&(&1.type == "closed"))
|> Flow.partition(key: {:key, :country})
|> Flow.group_by(& &1.country)
|> Flow.map(fn {country, data} -> {country, Enum.count(data)} end)
|> Enum.to_list()
end

The result of group_by/2 is a list of tuples, where the first element is the group key, and the second is the group items. We add Flow.map/2 to transform this output and replace the group items element with a counter.

When we rerun the function in IEx we get a result similar to what we’ve seen before:

Press + to interact
Airports.open_airports()

Using Flow.group_by/2 is more ...