Spark SQL Data Sources
Learn about the various sources and formats of data that can be read and written using Spark SQL.
Reading data into DataFrames
Once data has been ingested, processed, and loaded into Spark SQL databases and tables, it can be read as DataFrames. An example is shown below:
scala> val movies = spark.read.format("csv")
.option("header", "true")
.option("samplingRatio", 0.001)
.option("inferSchema", "true")
.load("/data/BollywoodMovieDetail.csv")
scala> movies.write.saveAsTable("movieData")
scala> val movieTitles = spark.sql("SELECT title FROM movieData")
scala> movieTitles.show(3, false)
+---------------------------------+
|title |
+---------------------------------+
|Albela |
|Lagaan: Once Upon a Time in India|
|Meri Biwi Ka Jawab Nahin |
+---------------------------------+
only showing top 3 rows
In the above example, we create the Spark SQL table movieData and then execute a Spark SQL query to return only the titles of the movies as a DataFrame.
DataFrameReader
We have touched upon DataFrameReader briefly in an earlier lesson. It is the core construct for reading data from a source into a DataFrame. The pattern of stringing methods together is common in Spark and is also recommended when using DataFrameReader. The usage template is as follows:
DataFrameReader.format(args).option("key", "value").schema(args).load()
We can’t instantiate DataFrameReader directly. Rather, it is available through a SparkSession instance, e.g., SparkSession.read or SparkSession.readStream. The first API reads static data sources, while the second reads streaming sources. In our previous examples, the spark variable represents the SparkSession. DataFrameReader can read various data formats such as CSV, JSON, and Parquet. In the case of static Parquet data files, the schema option can be skipped, since the schema comes embedded within the file itself.
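To make the template concrete, here is a minimal sketch that reads a JSON source with an explicit schema. The file path and the schema fields below are hypothetical placeholders rather than part of the dataset used in this lesson:

scala> import org.apache.spark.sql.types._

scala> val schema = StructType(Array(
         StructField("title", StringType, true),
         StructField("releaseYear", IntegerType, true)))

scala> val jsonDF = spark.read.format("json")
       .schema(schema)              // supplying a schema up front avoids the cost of inferring it
       .option("mode", "FAILFAST")  // fail immediately if a malformed record is encountered
       .load("/data/movies.json")   // hypothetical path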
DataFrameWriter
The counterpart to DataFrameReader is DataFrameWriter, which can be used to write the contents of a DataFrame to a built-in data source. The DataFrameWriter is accessible from an instance of DataFrame. The recommended usage pattern is:
DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table)
Or:
DataFrameWriter.format(args).option(args).bucketBy(args).partitionBy(args).save(path)
To get an instance, we can invoke DataFrame.write or DataFrame.writeStream for static and streaming outputs, respectively.
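As a rough sketch of the second template, we could write the movies DataFrame from earlier out as JSON, partitioned by a column. The output path and the releaseYear partition column are our own assumptions for illustration and would need to match the actual data:

scala> movies.write.format("json")
       .mode("overwrite")
       .partitionBy("releaseYear")  // assumes the DataFrame has a releaseYear column
       .save("/data/moviesJson")    // hypothetical output path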
Formats
Data can be read into and written out of DataFrames in the following formats:
- Parquet
- JSON
- CSV
- Avro
- ORC
- Images (used in machine learning and deep learning frameworks)
- Binary Files
We have already seen how to read data in CSV format into a DataFrame. Let’s see a few examples with the other formats.
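For instance, ORC follows the same read and write pattern as the other formats. Here is a minimal sketch; the output path moviesOrc is our own choice:

scala> movies.write.format("orc")
       .mode("overwrite")
       .save("/data/moviesOrc")

scala> val moviesOrc = spark.read.format("orc")
       .load("/data/moviesOrc")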
Parquet
For simplicity, we’ll assume that we already have a DataFrame with the data we want to write out as a Parquet file.
// We'll read our CSV file as a DataFrame
scala> val movies = spark.read.format("csv")
.option("header", "true")
.option("samplingRatio", 0.001)
.option("inferSchema", "true")
.load("/data/BollywoodMovieDetail.csv")
scala> movies.write.format("parquet")
.mode("overwrite")
.option("compression","snappy")
.save("/data/moviesParquet")
The output would be a directory moviesParquet, which contains the following files:
DataJek > ls -ltr /data/moviesParquet/
total 124
-rw-r--r-- 1 root root 126793 May 8 23:58 part-00000-96a8bd99-31f1-4fe2-a4f0-315235b73dfc-c000.snappy.parquet
-rw-r--r-- 1 root root 0 May 8 23:58 _SUCCESS
We can then read the Parquet files back into a DataFrame as follows:
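(A minimal sketch; the variable name moviesParquet is our own. No schema needs to be supplied because Parquet embeds the schema in the data files.)

scala> val moviesParquet = spark.read.format("parquet")
       .load("/data/moviesParquet")

scala> moviesParquet.show(3, false)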