What is the ParDo transform in Apache Beam?

Note: The examples below are in Python, but you can achieve similar functionality using Java and Go.

The Apache Beam model defines a small set of core transforms, and ParDo is one of the most commonly used. ParDo works similarly to the Map phase of a Map/Shuffle/Reduce-style algorithm.

ParDo is the transform for parallel processing. It applies the processing function to every element in the PCollection input and returns zero or more elements to the output PCollection.
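To make the "zero or more elements" behavior concrete, here is a minimal pure-Python sketch of the idea. It is not the Beam API: pardo_like and split_non_empty are hypothetical names used only for illustration, and the sketch runs sequentially, whereas Beam distributes the work across workers and does not guarantee output order.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def pardo_like(process: Callable[[T], Iterable[U]], elements: Iterable[T]) -> List[U]:
    """Conceptual stand-in for ParDo (hypothetical helper, not part of Beam).

    Calls `process` on each element and flattens everything it emits
    into a single output list.
    """
    output: List[U] = []
    for element in elements:
        # Each call may emit zero, one, or many output elements.
        output.extend(process(element))
    return output


def split_non_empty(line: str) -> List[str]:
    # Emits nothing for a blank line and several words for a longer one.
    return line.split()


lines = ["hello beam", "", "pardo"]
words = pardo_like(split_non_empty, lines)
```

Here the blank line contributes no output, while "hello beam" contributes two elements, so the output collection does not need to have the same size as the input.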

Using ParDo requires a user-defined DoFn, which holds the transformation you plan to apply. DoFn is a Beam SDK class that describes a distributed processing function.

Using the ParDo function

To use ParDo, apply it to the input PCollection and pass an instance of your DoFn as the argument to ParDo. In the Python SDK, a transform is applied with the pipe operator (|) rather than an explicit apply method.

Let’s look at an example where we compute every word’s length in a PCollection.

# Apply a ParDo to the "words" PCollection to get lengths for every word.
word_lengths = words | beam.ParDo(WordLengthFn())

Here, WordLengthFn is a DoFn.

Defining the DoFn

As discussed previously, a DoFn holds the processing logic that is applied to every element in the input PCollection. Therefore, inside your DoFn subclass, you write that logic in a process method.

You don’t need to extract individual elements from the PCollection manually. The Beam SDK handles this, so you only need to accept element as an argument of the process method.

import apache_beam as beam

class WordLengthFn(beam.DoFn):
  def process(self, element):
    # Return an iterable of outputs; here, a single length for each word.
    return [len(element)]

There are two critical points you need to consider while building the DoFn:

  • You should not in any way alter the element argument given to the process method.

  • Once you output a value using return (or yield), you should not change that value.

Lightweight DoFn

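When the functionality is straightforward, you don’t need to create a new DoFn subclass. Instead, you can pass a lambda function to a transform such as FlatMap, which emits each item of the iterable that the function returns.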

word_lengths = words | beam.FlatMap(lambda word: [len(word)])