More Operations with DataFrames
Get hands-on practice exploring various operations that can be performed on DataFrames.
We can also rename, drop, or change the data type of DataFrame columns. Let's see examples of each.
Changing column names
Our data has a rather awkward name for the column that represents movie rating: hitFlop. We can rename the column to the more appropriate name "Rating" using the withColumnRenamed method.
scala> val moviesNewColDF = movies.withColumnRenamed("hitFlop","Rating")
moviesNewColDF: org.apache.spark.sql.DataFrame = [imdbId: string, title: string ... 8 more fields]
scala> moviesNewColDF.printSchema
root
|-- imdbId: string (nullable = true)
|-- title: string (nullable = true)
|-- releaseYear: string (nullable = true)
|-- releaseDate: string (nullable = true)
|-- genre: string (nullable = true)
|-- writers: string (nullable = true)
|-- actors: string (nullable = true)
|-- directors: string (nullable = true)
|-- sequel: integer (nullable = true)
|-- Rating: integer (nullable = true)
The original DataFrame movies isn't changed; rather, a new DataFrame is created with the renamed column.
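DataFrames are immutable, so we can confirm this in the shell by inspecting the column lists of both DataFrames. A minimal sketch, assuming the movies and moviesNewColDF DataFrames defined above:

scala> movies.columns.contains("hitFlop")         // still true: the source DataFrame is untouched
scala> moviesNewColDF.columns.contains("Rating")  // true only for the new DataFrame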
Changing column types
In our original movies DataFrame, the releaseDate column is inferred as a string type instead of a date type if we don't use the samplingRatio option. To fix this, we can create a new column from the releaseDate column and interpret it as a date type using the withColumn method, and then drop the original string column.
scala> val withDateDF = movies.withColumn("launchDate", to_date($"releaseDate", "d MMM yyyy"))
withDateDF: org.apache.spark.sql.DataFrame = [imdbId: string, title: string ... 9 more fields]
scala> val newDF = withDateDF.drop("releaseDate")
newDF: org.apache.spark.sql.DataFrame = [imdbId: string, title: string ... 8 more fields]
scala> newDF.printSchema
root
|-- imdbId: string (nullable = true)
|-- title: string (nullable = true)
|-- releaseYear: string (nullable = true)
|-- genre: string (nullable = true)
|-- writers: string (nullable = true)
|-- actors: string (nullable = true)
|-- directors: string (nullable = true)
|-- sequel: integer (nullable = true)
|-- hitFlop: integer (nullable = true)
|-- launchDate: date (nullable = true)
Spark offers to-and-from methods for date and timestamp types. In the snippet above, we first parse releaseDate into a new launchDate column and then drop the original releaseDate column, keeping the intermediate withDateDF around so we can compare the raw and parsed values below. The to_date method takes the column we want to read from and the format of the date to parse, which here is d MMM yyyy (for example, "20 Apr 2010"). The date patterns available for formatting and parsing are listed in Spark's datetime pattern documentation.
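Going the other way, date_format() can turn a date or timestamp column back into a formatted string. As a minimal sketch (output omitted), using the newDF created above:

scala> newDF.select($"launchDate", date_format($"launchDate", "yyyy-MM-dd").as("launchDateStr")).show(3, false)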
There may have been failures for some rows when converting from string to date. We can check for those failures as follows:
scala> withDateDF.select("releaseDate","launchDate").where($"launchDate".isNull).show(5,false)
+-----------+----------+
|releaseDate|launchDate|
+-----------+----------+
|N/A |null |
|N/A |null |
|N/A |null |
|28 Feb,2002|null |
|N/A |null |
+-----------+----------+
only showing top 5 rows
We can see that the conversion failed for rows that either didn't have a valid value for releaseDate or weren't in the format we passed in. We can find the total number of failures as follows:
scala> withDateDF.select("releaseDate","launchDate")
.where($"launchDate".isNull)
.count()
res80: Long = 54
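Some of these failures, such as "28 Feb,2002", use a slightly different pattern rather than being missing data. One way to recover such rows is to try multiple formats and keep the first one that parses, for example with coalesce; the second pattern below, d MMM,yyyy, is an assumption based on the sample row above:

scala> val repairedDF = withDateDF.withColumn("launchDate",
         coalesce(to_date($"releaseDate", "d MMM yyyy"), to_date($"releaseDate", "d MMM,yyyy")))
scala> repairedDF.where($"launchDate".isNull).count()   // should now count only the rows with genuinely missing dates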
We can now use methods such as year(), month(), and dayofmonth() on the launchDate column. For instance, we can rewrite the query from the previous lesson that lists all the distinct release years in our data, but have it use the launchDate column instead of the releaseYear column. The query and its output are shown below:
scala> newDF.select(year($"launchDate"))
.distinct()
.orderBy(year($"launchDate"))
.show()
+----------------+
|year(launchDate)|
+----------------+
| null|
| 2001|
| 2002|
| 2003|
| 2004|
| 2005|
| 2006|
| 2007|
| 2008|
| 2009|
| 2010|
| 2011|
| 2012|
| 2013|
| 2014|
+----------------+
Note that the result also includes null as an entry, since the rows that were missing data or didn't have the correct date format returned null from the to_date method.
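If we don't want that null entry in the report, we can filter out the rows where the conversion failed before aggregating, for example:

scala> newDF.where($"launchDate".isNotNull)
        .select(year($"launchDate"))
        .distinct()
        .orderBy(year($"launchDate"))
        .show()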
Aggregations
A lot of data analysis questions require aggregations to be performed on the data. For example, consider the query to calculate the number of movies released per year. We can do so using the groupBy method to group the rows by releaseYear and then ask for a count of the rows in each group. The query is shown below:
scala> movies.select("releaseYear")
.groupBy("releaseYear")
.count()
.orderBy("releaseYear")
.show
+-----------+-----+
|releaseYear|count|
+-----------+-----+
| 2001| 62|
| 2002| 79|
| 2003| 95|
| 2004| 88|
| 2005| 106|
| 2006| 60|
| 2007| 66|
| 2008| 98|
| 2009| 91|
| 2010| 116|
| 2011| 112|
| 2012| 99|
| 2013| 102|
| 2014| 110|
+-----------+-----+
We also orderBy the results by releaseYear so that the output is more readable. Spark also offers methods such as max(), min(), avg(), and sum() that can be used for mathematical operations. Some examples that use these methods are as follows:
Finding the maximum value in the hitFlop column:
scala> movies.select(max($"hitFlop"))
.show
+------------+
|max(hitFlop)|
+------------+
| 9|
+------------+
Finding the minimum value in the hitFlop column:
scala> movies.select(min($"hitFlop"))
.show
+------------+
|min(hitFlop)|
+------------+
| 1|
+------------+
Finding the sum of all the values in the hitFlop column:
scala> movies.select(sum($"hitFlop"))
.show
+------------+
|sum(hitFlop)|
+------------+
| 2753|
+------------+
Finding the average rating across all the movies:
scala> movies.select(avg($"hitFlop"))
.show
+------------------+
| avg(hitFlop)|
+------------------+
|2.1440809968847354|
+------------------+
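Instead of issuing a separate query for each statistic, the same numbers can be computed in a single pass with agg(); a minimal sketch (output omitted):

scala> movies.agg(min($"hitFlop"), max($"hitFlop"), sum($"hitFlop"), avg($"hitFlop")).show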
Other methods for advanced analysis also exist, such as describe() and the statistics functions exposed through stat, including corr(), cov(), sampleBy(), approxQuantile(), and freqItems().
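As a quick illustration of two of these, describe() summarizes a numeric column and stat.approxQuantile() estimates quantiles; the relative error of 0.05 below is just an illustrative choice:

scala> movies.describe("hitFlop").show
scala> movies.stat.approxQuantile("hitFlop", Array(0.25, 0.5, 0.75), 0.05)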
A more interesting query would be to find the average rating for the movies released in each year. We'll need to group the rows by releaseYear and then find the average rating for the movies in each year.
scala> movies.select("releaseYear","hitFlop")
.groupBy("releaseYear")
.avg("hitFlop")
.orderBy("releaseYear")
.show
+-----------+------------------+
|releaseYear| avg(hitFlop)|
+-----------+------------------+
| 2001| 2.306451612903226|
| 2002|1.9620253164556962|
| 2003|2.0105263157894737|
| 2004|1.9545454545454546|
| 2005| 2.009433962264151|
| 2006|2.9833333333333334|
| 2007| 2.621212121212121|
| 2008| 2.13265306122449|
| 2009| 1.835164835164835|
| 2010|1.8620689655172413|
| 2011|2.0535714285714284|
| 2012| 2.393939393939394|
| 2013| 2.343137254901961|
| 2014| 2.081818181818182|
+-----------+------------------+
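If we want both the number of movies and the average rating per year in a single result, groupBy() can be combined with agg(); a sketch (output omitted):

scala> movies.groupBy("releaseYear")
         .agg(count("*").as("numMovies"), avg("hitFlop").as("avgRating"))
         .orderBy("releaseYear")
         .show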
Take vs collect
We'll end the discussion of the DataFrame API with two methods: take() and collect(). When we invoke collect() on a DataFrame, all the rows that make up the DataFrame are returned to the driver. This can be a time-consuming and memory-intensive operation, potentially resulting in out-of-memory (OOM) errors if the DataFrame is large enough. In such situations, if the intent is just to peek at a few records, it is better to use the take(n) method, which returns the first n Row objects of the DataFrame.
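A minimal sketch of the difference, assuming the movies DataFrame from above:

scala> val firstFive = movies.take(5)    // Array[Row] containing only the first 5 rows; cheap even on large data
scala> val allRows = movies.collect()    // Array[Row] containing every row; can exhaust driver memory on large data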