Spark SQL Data Source

Learn about the various sources and formats of data that can be read and written using Spark SQL.

Reading data into DataFrames

Once data has been ingested, processed, and loaded into Spark SQL databases and tables, it can be read as DataFrames. An example is shown below:

scala> val movies = spark.read.format("csv")
                              .option("header", "true")
                              .option("samplingRatio", 0.001)
                              .option("inferSchema", "true")
                              .load("/data/BollywoodMovieDetail.csv")

scala> movies.write.saveAsTable("movieData")

scala> val movieTitles = spark.sql("SELECT title FROM movieData")

scala> movieTitles.show(3, false)
+---------------------------------+
|title                            |
+---------------------------------+
|Albela                           |
|Lagaan: Once Upon a Time in India|
|Meri Biwi Ka Jawab Nahin         |
+---------------------------------+
only showing top 3 rows

In the above example, we create the Spark SQL table movieData and then execute a Spark SQL query to return only the titles of the movies as a DataFrame.
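A saved table can also be loaded without writing SQL. The following is a minimal sketch, assuming a local session and a small hypothetical DataFrame (the table name movieDataSketch and the id column are made up for illustration). It shows that spark.sql and spark.table both return a DataFrame over the same table:

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; in spark-shell the `spark` variable already exists.
val spark = SparkSession.builder()
  .appName("TableReadSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample rows standing in for the movie CSV file.
val sample = Seq(
  ("Albela", 1),
  ("Lagaan: Once Upon a Time in India", 2)
).toDF("title", "id")

// mode("overwrite") replaces the table if it already exists in the catalog.
sample.write.mode("overwrite").saveAsTable("movieDataSketch")

// Both calls return a DataFrame backed by the same table.
val viaSql = spark.sql("SELECT title FROM movieDataSketch")
val viaTable = spark.table("movieDataSketch").select("title")

viaTable.show(false)
```

Without an explicit mode, saveAsTable fails if the table already exists, which is worth keeping in mind when rerunning the earlier example.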

DataFrameReader

We touched on DataFrameReader briefly in an earlier lesson. It is the core construct for reading data from a source into a DataFrame. Chaining method calls is a common pattern in Spark and is the recommended way to use DataFrameReader. The usage template is as follows:

DataFrameReader.format(args).option("key", "value").schema(args).load()

We can’t instantiate a DataFrameReader directly. Rather, we obtain one from a SparkSession instance through SparkSession.read, which reads static data sources. For streaming sources, SparkSession.readStream returns the analogous DataStreamReader. In our previous examples, the spark variable represents the SparkSession. DataFrameReader can read various data formats such as CSV, JSON, and Parquet. For static Parquet data files, the schema option can be skipped because the schema is embedded in the file.
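The usage template can be made concrete with an explicit schema. This is a minimal sketch, assuming a local session and a tiny hypothetical CSV file written to a temporary location (the file contents, the id column, and the name movieSchema are illustrative only):

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Local session for illustration; in spark-shell the `spark` variable already exists.
val spark = SparkSession.builder()
  .appName("ReaderSketch")
  .master("local[*]")
  .getOrCreate()

// Write a tiny hypothetical CSV file so the sketch does not depend on the course data.
val csvPath = Files.createTempFile("movies-sketch", ".csv")
Files.write(csvPath, "title,id\nAlbela,1\nLagaan,2\n".getBytes("UTF-8"))

// Explicit schema: Spark does not need to infer column types from the data.
val movieSchema = StructType(Seq(
  StructField("title", StringType, nullable = true),
  StructField("id", IntegerType, nullable = true)
))

// format -> option -> schema -> load, following the template.
val movies = spark.read
  .format("csv")
  .option("header", "true")
  .schema(movieSchema)
  .load(csvPath.toString)

movies.printSchema()
```

Supplying the schema up front replaces the inferSchema and samplingRatio options used earlier, at the cost of having to know the column types in advance.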

DataFrameWriter

The counterpart to DataFrameReader is DataFrameWriter, which writes the contents of a DataFrame to a built-in data source. A DataFrameWriter is obtained from a DataFrame instance. The recommended usage patterns are:

DataFrameWriter.format(args).option(args).bucketBy(args).sortBy(args).saveAsTable(table)

Or:

DataFrameWriter.format(args).option(args).partitionBy(args).save(path)

To get an instance, we invoke DataFrame.write for static output. For streaming output, DataFrame.writeStream returns the analogous DataStreamWriter.
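Both kinds of write can be sketched as follows, assuming a local session, a small hypothetical DataFrame, and made-up names (outDir, moviesBucketedSketch, the group column). Note that in current Spark versions bucketBy and sortBy are accepted only together with saveAsTable, while partitionBy also works with save(path):

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Local session for illustration; in spark-shell the `spark` variable already exists.
val spark = SparkSession.builder()
  .appName("WriterSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample rows; `group` is an illustrative column to partition and bucket on.
val movies = Seq(
  ("Albela", 1),
  ("Lagaan", 1),
  ("Sample Title", 2)
).toDF("title", "group")

// Path-based write: partitionBy creates one subdirectory per distinct `group` value.
val outDir = Files.createTempDirectory("movies-sketch").resolve("byGroup").toString
movies.write
  .format("parquet")
  .mode("overwrite")
  .partitionBy("group")
  .save(outDir)

// Table-based write: rows are hashed into 2 buckets by `group` and sorted by title within each bucket.
movies.write
  .format("parquet")
  .mode("overwrite")
  .bucketBy(2, "group")
  .sortBy("title")
  .saveAsTable("moviesBucketedSketch")
```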

Formats

DataFrames can work with data in the following built-in formats (the image and binary file sources support reading only):

  1. Parquet
  2. JSON
  3. CSV
  4. Avro
  5. ORC
  6. Images (machine learning and deep learning frameworks)
  7. Binary Files

We have already seen how to read CSV data into a DataFrame. Let’s look at a few examples with the other formats.
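For most built-in formats, switching comes down to changing the string passed to format(). As a minimal sketch, assuming a local session and a small hypothetical DataFrame written to a temporary directory, a JSON round trip looks like this:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Local session for illustration; in spark-shell the `spark` variable already exists.
val spark = SparkSession.builder()
  .appName("JsonSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample rows; the id column is made up for illustration.
val sample = Seq(
  ("Albela", 1),
  ("Lagaan: Once Upon a Time in India", 2)
).toDF("title", "id")

// Write the DataFrame as JSON (one JSON object per line) into a temporary directory.
val jsonDir = Files.createTempDirectory("json-sketch").resolve("movies").toString
sample.write.format("json").mode("overwrite").save(jsonDir)

// JSON files carry no schema, so Spark infers one when reading them back.
val fromJson = spark.read.format("json").load(jsonDir)
fromJson.show(false)
```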

Parquet

For simplicity, we’ll start from a DataFrame that already holds the data we want to write out as a Parquet file. Here we build it by reading the same CSV file as before.

// Read our CSV file into a DataFrame
scala> val movies = spark.read.format("csv")
                              .option("header", "true")
                              .option("samplingRatio", 0.001)
                              .option("inferSchema", "true")
                              .load("/data/BollywoodMovieDetail.csv")

scala> movies.write.format("parquet")
                   .mode("overwrite")
                   .option("compression","snappy")
                   .save("/data/moviesParquet")

The output is a directory, moviesParquet, which contains the following files:

DataJek > ls -ltr /data/moviesParquet/
total 124
-rw-r--r-- 1 root root 126793 May  8 23:58 part-00000-96a8bd99-31f1-4fe2-a4f0-315235b73dfc-c000.snappy.parquet
-rw-r--r-- 1 root root      0 May  8 23:58 _SUCCESS

We can read the Parquet files back into a DataFrame as follows:
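A minimal sketch of the read, assuming a local session and a small hypothetical DataFrame written to a temporary directory in place of /data/moviesParquet:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Local session for illustration; in spark-shell the `spark` variable already exists.
val spark = SparkSession.builder()
  .appName("ParquetReadSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample rows; the id column is made up for illustration.
val sample = Seq(("Albela", 1), ("Lagaan", 2)).toDF("title", "id")

// Write a small Parquet directory to read back.
val parquetDir = Files.createTempDirectory("parquet-sketch").resolve("movies").toString
sample.write.format("parquet").mode("overwrite").save(parquetDir)

// No schema() call is needed: Parquet files store their schema alongside the data.
// For the directory written earlier, the equivalent call would be
// spark.read.format("parquet").load("/data/moviesParquet").
val readBack = spark.read.format("parquet").load(parquetDir)

readBack.printSchema()
readBack.show(false)
```

The path passed to load is the output directory rather than an individual part file, so Spark picks up every part file inside it.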
