Beyond the basics, Spark offers a rich set of built-in functions for aggregation, collections, datetimes, math, strings, sorting, and windowing. We’ll see examples of joins, unions, and windowing in this lesson.
Join
Joins can be performed between DataFrames or tables in Spark. By default, Spark executes an inner join, but it also supports cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti joins.
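The join type can be passed as the third argument to join. Here’s a minimal sketch, assuming two hypothetical DataFrames df1 and df2 that share an id column:
// A left outer join between two hypothetical DataFrames df1 and df2
scala> val joined = df1.join(df2, df1("id") === df2("id"), "left_outer")
Let’s work through an example of executing an inner join on our Bollywood movies data set. We’ll read the following two files and then join their respective DataFrames.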
- BollywoodMovieDetail.csv (the file we have already been reading in the previous lessons). The column names appear below:
imdbId | title | releaseYear | releaseDate | genre | writers | actors | directors | sequel | hitFlop |
---|---|---|---|---|---|---|---|---|---|
- BollywoodActorRanking.csv. The column names appear below:
actorId | actorName | movieCount | ratingSum | normalizedMovieRank | googleHits | normalizedGoogleRank | normalizedRating |
---|---|---|---|---|---|---|---|
We’ll want to join the two data sets on the columns containing actor names. However, recall that the actors column in BollywoodMovieDetail.csv holds a pipe-delimited string of names, so we’ll first need to massage this column into a form amenable to a join. We can then join it against the actorName column of BollywoodActorRanking.csv.
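To see the kind of massaging required, consider what Spark’s split function does to a sample pipe-delimited string (the literal below is a made-up illustration):
// split breaks a pipe-delimited string into an array of tokens; note that
// the tokens retain their surrounding spaces, which is why we trim them later
scala> spark.sql("""SELECT split("Actor One | Actor Two", "[|]") AS actors""").show(false)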
Additionally, we’ll switch between the DataFrame API and Spark SQL to demonstrate Spark’s flexibility; a pure-SQL version of the final join appears right after the listing. Read the listing below to see how the join is performed.
// Read the first file as a DataFrame
scala> val movies = spark.read.format("csv")
.option("header", "true")
.option("samplingRatio", 0.001)
.option("inferSchema", "true")
.load("/data/BollywoodMovieDetail.csv")
// Read the second file as a DataFrame
scala> val actors = spark.read.format("csv")
.option("header", "true")
.option("samplingRatio", 0.001)
.option("inferSchema", "true")
.load("/data/BollywoodActorRanking.csv")
// Register the movies DataFrame as a view so Spark SQL queries can reference it
scala> movies.createOrReplaceTempView("movies")
// We create a temporary view tempTbl1 that converts the pipe-delimited string of actor names into an array of string tokens
scala> spark.sql("""CREATE OR REPLACE TEMPORARY VIEW tempTbl1 AS SELECT title, releaseYear, hitFlop, split(actors, "[|]") AS actors FROM movies""")
// Next, we create another temporary view tempTbl2 that explodes the actors array in tempTbl1 so that we get one actor name per row. We'll later join this column with the actorName column of the other DataFrame. Notice that we also trim each actor name of leading and trailing spaces and convert it to upper case.
scala> spark.sql("""CREATE OR REPLACE TEMPORARY VIEW tempTbl2 AS SELECT title, releaseYear, hitFlop, upper(trim(actor)) AS actor FROM (SELECT title, releaseYear, hitFlop, explode(actors) AS actor FROM tempTbl1)""")
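// Optional sanity check: peek at a few exploded rows to confirm one upper-cased actor name per row
scala> spark.sql("""SELECT * FROM tempTbl2""").show(3, false)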
// Now we create a new DataFrame by trimming and upper-casing the actorName column of the second DataFrame. The functions import brings trim and upper into scope (spark-shell already provides the $ syntax).
scala> import org.apache.spark.sql.functions._
scala> val actorsDF = actors.withColumn("actorName", trim(upper($"actorName")))
// Create a DataFrame from tempTbl2
scala> val moviesDF = spark.sql("""SELECT * FROM tempTbl2""")
// Finally, we join on the columns containing the actor names and display 5 rows
scala> moviesDF.join(actorsDF, $"actor" === $"actorName").show(5, false)
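As mentioned earlier, the same join can be expressed entirely in Spark SQL. Here’s a sketch: tempTbl2 is already visible to SQL, and we register actorsDF under the view name actorsTbl (a name of our choosing, not from the original data):
// Register the cleaned-up actors DataFrame as a SQL view; actorsTbl is an assumed name
scala> actorsDF.createOrReplaceTempView("actorsTbl")
// The equivalent inner join expressed in Spark SQL
scala> spark.sql("""SELECT * FROM tempTbl2 JOIN actorsTbl ON actor = actorName""").show(5, false)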
Running the above statements joins each exploded movie row with the matching actor’s ranking data and displays the first five matches.