Accumulators and Broadcast Variables
Sharing data in a cluster
Sharing data in a distributed environment, regardless of the use case, can be confusing.
Understanding the scope (where the variables “live”) and lifecycle (how their values change) of shared variables while executing code in a cluster is a challenging task.
Within the Spark ecosystem, variables can be passed down to objects that operate in a distributed fashion, but each of those objects receives its own copy, whose state evolves independently while execution takes place.
Furthermore, this is one-way communication: those copies, along with whatever updated values they hold, are never sent back to the driver program.
To address these limitations, the Spark API provides both accumulators and broadcast variables.
Accumulators
Accumulators are variables that expose only an addition operation: we can add values, but we cannot delete or modify existing ones. In other words, accumulators provide a simple way of aggregating values from the worker nodes back to the driver program.
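Before we get to the project, here is a minimal, self-contained sketch of a long accumulator counting slow pings on a local cluster. The class name, app name, and ping values are invented for this snippet; they are not part of the project that follows.

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class AccumulatorSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("accumulator-sketch")
                .master("local[*]")
                .getOrCreate();
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // The accumulator is created on the driver; tasks running on the workers may only add to it.
        LongAccumulator slowPings = jsc.sc().longAccumulator("slowPings");

        jsc.parallelize(Arrays.asList(12.0, 140.0, 75.0, 210.0))
           .foreach(ping -> {
               if (ping > 100) {
                   slowPings.add(1); // executed inside the tasks
               }
           });

        // The aggregated value is read back on the driver, once the action has completed.
        System.out.println("Pings over 100: " + slowPings.value());
        spark.stop();
    }
}
```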
Let’s illustrate with a code example by working through a brief project that demonstrates the usage of accumulators and broadcast variables. You can build and run it with:
mvn install exec:exec
The business domain of this application deals with network information, specifically transit data about peer-to-peer communication between low-level devices, such as a series of transmitters in a hardware device.
The data is produced frequently as a CSV file and analyzed to isolate and potentially replace slow network paths caused by a faulty transmitter (by inspecting the “ping” of each communication). The information is loaded into a DataFrame with the following schema:
root
|-- source: string (nullable = true)
|-- destination: string (nullable = true)
|-- ping: double (nullable = true)
|-- errors: integer (nullable = true)
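As a point of reference, loading the CSV into a DataFrame with that exact schema could look roughly like this; the file path and the header option are assumptions for this sketch, not details taken from the project.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class LoadTransitData {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("network-transit")
                .master("local[*]")
                .getOrCreate();

        // Explicit schema matching the tree above, so Spark does not have to infer the types.
        StructType schema = new StructType()
                .add("source", DataTypes.StringType, true)
                .add("destination", DataTypes.StringType, true)
                .add("ping", DataTypes.DoubleType, true)
                .add("errors", DataTypes.IntegerType, true);

        Dataset<Row> transit = spark.read()
                .option("header", "true")          // assumed: the CSV carries a header row
                .schema(schema)
                .csv("data/network_transit.csv");  // hypothetical path

        transit.printSchema();
        transit.show();
    }
}
```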
We can imagine a vast number of rows per file because communications within these devices run in the thousands per second. For demonstration purposes, though, we can work with a tiny dataset of only six rows.
So, we decide to implement the MapFunction<> interface from Spark to work on each row and produce a new DataFrame containing transmitter IDs, which are the first two letters of a source or destination.
Then, we tag each transmitter with a category based on its slowness tolerance (a standalone sketch of this classification follows the list):
- ping <= 50: ACPT (Acceptable)
- 50 < ping <= 100: CGTD (Congested)
- ping > 100: OVLD (Overloaded)
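For reference, the three thresholds above translate to a small helper along these lines; the class and method names here are ours, purely for illustration, and not taken from the project.

```java
public final class PingCategories {

    private PingCategories() {}

    /** Maps a ping value to its slowness category. */
    public static String categorize(double ping) {
        if (ping <= 50) {
            return "ACPT"; // Acceptable
        } else if (ping <= 100) {
            return "CGTD"; // Congested
        } else {
            return "OVLD"; // Overloaded
        }
    }
}
```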
Fundamentally, we want to produce rows with the following schema:
+--------------+--------+
|Transmitter_id|Category|
+--------------+--------+
Our Mapper class does the heavy lifting, and the ...