Performance Fundamentals and Recipes
Learn the fundamentals of performance for a Spark application.
Many factors and constraints affect an application’s execution performance, such as its architecture, the resources available, and non-functional requirements like data encryption. No single magic recipe can account for the wide variety of applications and their nature when it comes to performance.
Ultimately, applying a systematic approach (testing, gathering metrics and results, making changes, testing again, and repeating the process) might shed some light on the bottlenecks, overheads, or simply poor design of an application. On the other hand, specific third-party libraries and frameworks, like Spark, are designed in a way that imposes constraints on the applications that use their APIs. We have seen an example of this with immutability, specifically when we discussed how a DataFrame cannot be changed when a transformation is applied to it. Instead, a new DataFrame reflecting the changes is always returned.
Constraints like this don’t have to be a foe; they can be a friend when it comes to using Spark in a performant way. With this in mind, this lesson provides general guidelines and explains the fundamentals for laying the foundation of a robust Spark application, and it also describes some recipes commonly used in Spark development.
Note: Application performance optimization tends to be a very complex topic that unfortunately cannot be covered in one or even several lessons, so this lesson packs in a lot of concepts. At the end of the day, the best recipe might simply be getting our hands dirty developing and testing a Spark application that deals with huge volumes of information, which goes beyond the scope of this course.
Let’s start with the basics.
Resource utilization
It is common to observe a Spark application intensively using most, if not all, of the resources of the cluster where it runs.
This can be a good thing; after all, we don’t want resources we might be paying for by the hour, minute, or second (especially in cloud environments) to sit idle, wasting precious processing time. On the other hand, if the volumes of data the application processes are not considerable, such heavy usage can point to incorrect resource utilization. It also signals a concerning processing throughput: within a given amount of time, the number of records effectively processed is low. In classical terms, throughput is the rate of items processed per defined unit of time.
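To make the definition concrete, throughput can be computed by dividing the number of records processed by the elapsed time. The sketch below is plain Java with no Spark APIs involved, and the record count and duration are hypothetical values chosen only for illustration.

```java
// Minimal sketch: throughput as records processed per unit of time (here, per second).
// The numbers used in main are hypothetical, illustrative values.
public class ThroughputExample {

    // Returns records processed per second for a run that took elapsedMillis milliseconds.
    static double recordsPerSecond(long recordsProcessed, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsedMillis must be positive");
        }
        return recordsProcessed / (elapsedMillis / 1000.0);
    }

    public static void main(String[] args) {
        // Hypothetical run: 1,200,000 records processed in 4 minutes (240,000 ms).
        double rate = recordsPerSecond(1_200_000L, 240_000L);
        System.out.printf("Throughput: %.1f records/second%n", rate); // Throughput: 5000.0 records/second
    }
}
```

Tracking this figure across test runs, rather than looking at CPU or memory usage alone, shows whether heavy resource usage actually translates into work done.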
So let’s look at some recipes and recommendations that lead to higher throughput and better performance.
Avoid shuffles when possible
In Spark, as in distributed applications in general, shuffles tend to consume a lot of resources. This happens because sending data across the network between the cluster’s nodes is likely to involve network I/O, data serialization, and even disk I/O.
Some Spark transformations, like join(...), cause shuffles. When possible, the following guidelines point to more performant alternatives:
- If one of the DataFrames in a join operation is relatively small, broadcast the smaller DataFrame as an object (a list, for example). This way, every node on which Spark processes the transformation receives its own copy, and the larger DataFrame does not need to be shuffled. Also, try to broadcast lighter Java objects; for example, it is overkill to send a list of objects with many properties when a list of primitives serves the same purpose.
- Use narrow transformations instead of wide ones, if possible. Narrow transformations such as map or filter are considerably faster because the data they process resides in the same partition, whereas in wide transformations, such as groupByKey or join, the data needs to be sent around (shuffled). It also makes sense to filter and transform the information first (possibly discarding records in the process), which reduces the amount of data before a wide transformation, such as a join, is needed.
- Favor reduceByKey over groupByKey for reductive operations. We touched on this briefly in a previous lesson. If we need to group records by key to later perform some calculations on them, each partition across which the data is distributed may already hold several records with the same key. So we can pre-aggregate the data within each partition with reduceByKey, and later join or group a reduced (smaller) dataset.
- If we need to sort, it is generally more performant to do the rest of the work on the input or current DataFrame first and sort it at a later stage by calling the sort operation. Also, avoid calling this operation too often, because it shuffles rows across partitions.
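The broadcast guideline can be illustrated with a plain-Java model, which is not Spark’s API: each inner list stands for one partition of a large dataset, and a small lookup map plays the role of the broadcast copy that every node receives, so each partition is joined locally without moving any of its rows. The customer/order data is hypothetical. In Spark’s own Java API, a comparable DataFrame join would typically be written as largeDf.join(functions.broadcast(smallDf), "customerId") (the column name here is hypothetical), while JavaSparkContext.broadcast(...) ships a plain object such as a map or list; neither call is used in the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of a broadcast (map-side) join; this is NOT Spark's API.
// Inner lists model partitions of the large side; the small map models the broadcast copy.
public class BroadcastJoinModel {

    // Joins each (customerId, amount) row with the customer name from the local copy.
    // Rows whose key is missing from the lookup are dropped, as in an inner join.
    static List<String> joinPartition(List<long[]> partition, Map<Long, String> broadcastCopy) {
        List<String> out = new ArrayList<>();
        for (long[] row : partition) {
            String name = broadcastCopy.get(row[0]);
            if (name != null) {
                out.add(name + ":" + row[1]);
            }
        }
        return out;
    }

    static List<String> broadcastJoin(List<List<long[]>> largePartitions, Map<Long, String> small) {
        List<String> result = new ArrayList<>();
        for (List<long[]> partition : largePartitions) {
            // Each partition is joined locally: no row of the large side leaves its partition.
            result.addAll(joinPartition(partition, small));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical small side: customer id -> name (a lightweight map, not heavy objects).
        Map<Long, String> customers = Map.of(1L, "ana", 2L, "bo");
        // Hypothetical large side, already split into two partitions of (customerId, amount).
        List<List<long[]>> orders = List.of(
                List.of(new long[]{1L, 10L}, new long[]{2L, 20L}),
                List.of(new long[]{1L, 30L}, new long[]{3L, 40L}));
        System.out.println(broadcastJoin(orders, customers)); // [ana:10, bo:20, ana:30]
    }
}
```

The design choice mirrors the guideline: the small side is copied everywhere, so only it travels over the network, and keeping it as a compact map of simple values keeps that copy cheap.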
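The reduceByKey recommendation rests on pre-aggregating inside each partition before anything crosses the network. The plain-Java model below (not Spark’s API) counts how many records would be shuffled in a groupByKey-style approach, where every record is sent, versus a reduceByKey-style approach, where only one partial sum per key per partition is sent. The word-count data is hypothetical. In Spark’s Java API, the real calls would be JavaPairRDD.reduceByKey(...) and JavaPairRDD.groupByKey().

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT Spark's API) of why reduceByKey usually shuffles less data than groupByKey.
// Each inner list stands for one partition of (key, value) pairs.
public class MapSideCombineModel {

    // groupByKey-style: every record is sent across the network unchanged.
    static int recordsShuffledByGroup(List<List<Map.Entry<String, Integer>>> partitions) {
        int shuffled = 0;
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            shuffled += partition.size();
        }
        return shuffled;
    }

    // Pre-aggregation inside one partition: sums the values of each key locally.
    static Map<String, Integer> partialSums(List<Map.Entry<String, Integer>> partition) {
        Map<String, Integer> partial = new HashMap<>();
        for (Map.Entry<String, Integer> e : partition) {
            partial.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return partial;
    }

    // reduceByKey-style: only one partial sum per distinct key per partition is shuffled.
    static int recordsShuffledByReduce(List<List<Map.Entry<String, Integer>>> partitions) {
        int shuffled = 0;
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            shuffled += partialSums(partition).size();
        }
        return shuffled;
    }

    // Final step after the modeled shuffle: merges the partial sums of all partitions.
    static Map<String, Integer> reduceByKey(List<List<Map.Entry<String, Integer>>> partitions) {
        Map<String, Integer> totals = new HashMap<>();
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            partialSums(partition).forEach((k, v) -> totals.merge(k, v, Integer::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical word counts spread over two partitions.
        List<List<Map.Entry<String, Integer>>> partitions = List.of(
                List.of(Map.entry("a", 1), Map.entry("a", 1), Map.entry("a", 1), Map.entry("b", 1)),
                List.of(Map.entry("a", 1), Map.entry("b", 1), Map.entry("b", 1), Map.entry("b", 1)));
        System.out.println("groupByKey-style records shuffled:  " + recordsShuffledByGroup(partitions));  // 8
        System.out.println("reduceByKey-style records shuffled: " + recordsShuffledByReduce(partitions)); // 4
        System.out.println("totals: " + reduceByKey(partitions));
    }
}
```

Both approaches produce the same totals; the difference is that the reduce-style path halves the shuffled records here, and the saving grows as keys repeat more often within each partition.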
Partitioning and degree of parallelism
Achieving a balanced degree of parallelism in an application can almost require the skills of a Zen master. On the one hand, a high degree of parallelism during processing can undoubtedly speed up computing times. On the other hand, if that degree cannot be accommodated by the available resources (that is, we ask for too much parallel processing with too few resources at hand), the resulting overhead can slow down overall computation, as every running process must be given a fair share of time on the resources.
When it comes to computing in Spark, partitions are the chunks of data that executors work on when fulfilling tasks, and tasks are the concurrent units of processing, each working on one partition. If the data is split into a small number of partitions, possibly with many rows each, then only a few executors can work on them, which decreases the degree of parallelism (and more closely resembles sequential processing). However, too many partitions can cause too much overhead, and some partitions, depending on how the data is distributed between them, might even end up with very few records or none at all. This, in turn, could lead to many tasks competing for resources at once, each obtaining little processing time, much like excessive context switching between threads.
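The empty-partition effect is easy to reproduce. The plain-Java model below (not Spark’s API) assigns records to partitions by the hash of their key, similar in spirit to hash partitioning, and counts how many partitions receive nothing. The data is hypothetical: 1,000 records that share only 10 distinct keys.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (NOT Spark's API): assigns records to partitions by key hash,
// then reports how many records each partition receives.
public class PartitionSpreadModel {

    // Returns the number of records per partition for the given keys.
    static int[] recordsPerPartition(List<Integer> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (Integer key : keys) {
            // floorMod keeps the index non-negative even for negative hash codes.
            counts[Math.floorMod(key.hashCode(), numPartitions)]++;
        }
        return counts;
    }

    // Counts partitions that received no records at all.
    static int emptyPartitions(int[] counts) {
        int empty = 0;
        for (int c : counts) {
            if (c == 0) {
                empty++;
            }
        }
        return empty;
    }

    public static void main(String[] args) {
        // Hypothetical data: 1,000 records but only 10 distinct keys (0..9).
        List<Integer> keys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            keys.add(i % 10);
        }
        for (int p : new int[]{4, 10, 200}) {
            int[] counts = recordsPerPartition(keys, p);
            System.out.println(p + " partitions -> " + emptyPartitions(counts) + " empty");
        }
    }
}
```

It reports 0 empty partitions for 4 and 10 partitions, but 190 empty out of 200: at most 10 partitions can ever hold data, so the remaining tasks only add scheduling overhead. With 4 partitions, two receive 300 records and two receive 200, which also shows how uneven sizes make some tasks run longer than others.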
So what is a good number of partitions to have? Definitely not too few and not too many, but as a general approach to this predicament, the following recommendations help:
- First, Spark can be left to decide the number of partitions on its own. It tends to estimate it reasonably well, and we can later compare execution times against our runtime goals or SLAs (service level agreements). If needed, the current number of partitions can be inspected in code by calling the following method on a DataFrame: df.rdd.getNumPartitions() (in the Java API, df.rdd().getNumPartitions()).
- If this option does not meet our performance goals, a baseline for the initial number of partitions can be calculated as the number of cores in the cluster * 2 (or 3). So if the processing cores across all nodes add up to 50, then 100 or 150 can be good starting points for the number of partitions.
- If we want to change the number of partitions of a DataFrame, either increasing or reducing it, we can use the following methods:
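The baseline rule of thumb for the initial number of partitions is simple arithmetic. The sketch below spells it out in plain Java for a hypothetical cluster of 5 nodes with 10 cores each (50 cores in total), assuming every node has the same number of cores.

```java
// Minimal sketch of the baseline rule: initial partitions = total cluster cores * factor (2 or 3).
// The cluster size used in main is hypothetical.
public class PartitionBaseline {

    // Total cores across all nodes, assuming every node has the same core count.
    static int totalCores(int nodes, int coresPerNode) {
        return nodes * coresPerNode;
    }

    static int baselinePartitions(int totalCores, int factor) {
        if (totalCores <= 0 || factor <= 0) {
            throw new IllegalArgumentException("cores and factor must be positive");
        }
        return totalCores * factor;
    }

    public static void main(String[] args) {
        int cores = totalCores(5, 10); // hypothetical: 5 nodes x 10 cores = 50 cores
        System.out.println("factor 2: " + baselinePartitions(cores, 2)); // factor 2: 100
        System.out.println("factor 3: " + baselinePartitions(cores, 3)); // factor 3: 150
    }
}
```

In a Spark job, the resulting number could then be passed to a repartitioning call or used to set spark.sql.shuffle.partitions, the setting that controls how many partitions DataFrame shuffles produce (200 by default); the result is only a starting point to refine against measured execution times.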
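Spark’s Dataset API offers repartition(int), which performs a full shuffle and can either increase or decrease the partition count, and coalesce(int), which reduces the count by merging existing partitions and avoids a full shuffle. The plain-Java model below contrasts the two; it is not Spark’s implementation, and the way it groups neighbouring partitions is an assumption made for illustration. The skewed input data is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (NOT Spark's implementation) contrasting two ways to change partition counts.
public class RepartitionModel {

    // coalesce-like: concatenates neighbouring partitions into `target` groups.
    // No record is redistributed individually (hence no full shuffle),
    // but the resulting sizes simply add up and may stay uneven.
    static List<List<Integer>> coalesce(List<List<Integer>> parts, int target) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < target; i++) {
            out.add(new ArrayList<>());
        }
        for (int i = 0; i < parts.size(); i++) {
            // Partition i joins group i * target / parts.size(), keeping neighbours together.
            out.get(i * target / parts.size()).addAll(parts.get(i));
        }
        return out;
    }

    // repartition-like: deals every record out round-robin, i.e. a full shuffle
    // in which every record moves, but the resulting sizes differ by at most one.
    static List<List<Integer>> repartition(List<List<Integer>> parts, int target) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < target; i++) {
            out.add(new ArrayList<>());
        }
        int next = 0;
        for (List<Integer> part : parts) {
            for (Integer record : part) {
                out.get(next).add(record);
                next = (next + 1) % target;
            }
        }
        return out;
    }

    // Returns the number of records in each partition.
    static List<Integer> sizes(List<List<Integer>> parts) {
        List<Integer> result = new ArrayList<>();
        for (List<Integer> p : parts) {
            result.add(p.size());
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical skewed input: 4 partitions holding 7, 1, 1 and 1 records.
        List<List<Integer>> input = List.of(
                List.of(1, 2, 3, 4, 5, 6, 7), List.of(8), List.of(9), List.of(10));
        System.out.println("coalesce(2)    -> sizes " + sizes(coalesce(input, 2)));    // [8, 2]
        System.out.println("repartition(2) -> sizes " + sizes(repartition(input, 2))); // [5, 5]
    }
}
```

The trade-off shown here is the usual reason for choosing between them: coalesce is cheaper when only reducing the count, while repartition costs a shuffle but can rebalance skewed partitions or increase their number.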