What is PySpark and how do you use it?

What is PySpark?

PySpark is the Python interface for Apache Spark. It lets you write Spark applications using Python APIs (Application Programming Interfaces).

It also provides the PySpark shell for interactively analyzing your data in a distributed environment.

Spark’s Features

1. Spark SQL & DataFrame

This Spark module is for structured data processing. It provides a programming abstraction called DataFrame and can also act as a distributed SQL query engine.
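As a quick, hedged illustration of this API (separate from the Databricks walkthrough below), the sketch assumes the spark SparkSession that current PySpark and Databricks notebooks provide; the sample rows and view name are invented for the example.

from pyspark.sql import SparkSession

# Get (or reuse) a SparkSession, the entry point to the DataFrame and SQL APIs
spark = SparkSession.builder.appName("sql_dataframe_demo").getOrCreate()

# Build a small DataFrame from in-memory rows (illustrative data only)
songs = spark.createDataFrame([("Song A", 210), ("Song B", 185)], ["title", "duration"])

# Register it as a temporary view and query it with SQL
songs.createOrReplaceTempView("songs")
spark.sql("SELECT title FROM songs WHERE duration > 200").show()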

2. Streaming

Running on top of Spark, the streaming feature in Apache Spark lets you build interactive and analytical applications over both streaming and historical data, while inheriting Spark’s ease of use and fault-tolerance characteristics.
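As a minimal sketch (assuming the spark session from the previous example; the socket source on localhost:9999 is a placeholder), a Structured Streaming job that counts incoming lines could look like this:

# Read a stream of text lines from a socket source (host and port are placeholders)
lines = (spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load())

# Keep a running count of the lines received and print it to the console
query = (lines.groupBy().count()
    .writeStream
    .outputMode("complete")
    .format("console")
    .start())

# Block until the stream is stopped
query.awaitTermination()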

3. MLlib

Built on top of Spark, MLlib is a scalable machine learning library that provides a uniform set of high-level APIs to help users create and tune practical machine learning pipelines.
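For instance, a minimal MLlib pipeline sketch (again assuming the spark session from the earlier example; the toy data and column names are invented for illustration) might look like this:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# Toy training data: one feature column x and a label column y
train = spark.createDataFrame([(1.0, 2.0), (2.0, 4.1), (3.0, 5.9)], ["x", "y"])

# Assemble the raw column into a feature vector, then fit a linear regression
assembler = VectorAssembler(inputCols=["x"], outputCol="features")
lr = LinearRegression(featuresCol="features", labelCol="y")
pipeline = Pipeline(stages=[assembler, lr])

model = pipeline.fit(train)
model.transform(train).select("x", "y", "prediction").show()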

4. Spark Core

Spark Core is the general execution engine on which all other functionality is built. It provides the Resilient Distributed Dataset (RDD) abstraction and in-memory computing capabilities.
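A minimal RDD sketch (sc is the SparkContext, which Databricks notebooks predefine):

# Distribute a small in-memory collection as an RDD and run a map and a reduce over it
numbers = sc.parallelize([1, 2, 3, 4, 5])
squares = numbers.map(lambda n: n * n)
print(squares.collect())                   # [1, 4, 9, 16, 25]
print(squares.reduce(lambda a, b: a + b))  # 55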

5. Databricks

Databricks is a company and big data processing platform founded by the creators of Apache Spark.

It was created to help data scientists, engineers, and analysts work together, integrating data science, engineering, and the business behind them across the machine learning lifecycle.

PySpark example on Databricks

Part 1: Load & Transform Data

In the first stage, we load distributed data and read it as an RDD (Resilient Distributed Dataset), perform some transformations on that RDD, construct a Spark DataFrame from it, and register the DataFrame as a table.

1.1. List files

Files can be listed on a distributed file system (DBFS, S3, or HDFS) using %fs commands.

We are using data files stored in DBFS (Databricks File System) at dbfs:/databricks-datasets/songs_pk/data_001 for this example.

DBFS is a layer that sits on top of AWS S3 and the SSD drives attached to Spark clusters hosted in AWS.

When accessing a file, DBFS first checks whether the file is cached on the SSD (Solid State Drive). If it is not available there, it goes out to the backing S3 bucket to fetch the file(s):

%fs ls /databricks-datasets/songs_pk/data_001/
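If you prefer to list the files from Python rather than with the %fs magic, Databricks notebooks also expose dbutils; a small sketch using the same path:

# Programmatic equivalent of %fs ls in a Databricks notebook
for file_info in dbutils.fs.ls("/databricks-datasets/songs_pk/data_001/"):
    print(file_info.name, file_info.size)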

1.2. Display contents of the header

We have data files and one header file in the list. Let’s use the PySpark textFile command to read the header file, then use collect to display its contents.

After executing this, you will see that each header entry consists of a field name and a type, separated by a colon:

sc.textFile("databricks-datasets/songs_pk/data_001/header.txt").collect()

1.3. Examine a data file

The textFile command is used to load the data files, and the take command is used to view the first three lines of the data.

After running this, you will see that each line consists of several fields separated by a tab (\t).


dataRDD = sc.textFile("/databricks-datasets/songs_pk/data_001/part-000*")
dataRDD.take(3)

1.4. Create a Python function to parse fields

Now we apply what we know about the data to parse it. To do this, we write a function that takes a line of text and returns an array of parsed fields.

  • If the header shows the type is int, we cast the token to an integer.

  • If the header shows the type is double, we cast the token to a float.

  • Otherwise, we return the string.

# Split each header line on its separator to get (name, type) pairs
header = sc.textFile("/databricks-datasets/songs_pk/data_001/header.txt") \
    .map(lambda line: line.split(":")).collect()

# Create the Python function that parses one line of data
def parse_line(line):
    tokens = zip(line.split("\t"), header)
    parsed_tokens = []
    for token in tokens:
        token_type = token[1][1]
        if token_type == 'double':
            parsed_tokens.append(float(token[0]))
        elif token_type == 'int':
            parsed_tokens.append(-1 if '-' in token[0] else int(token[0]))
        else:
            parsed_tokens.append(token[0])
    return parsed_tokens

1.5. Convert header structure

Before using the parsed header, we have to convert it to the types that Spark SQL expects. That means using SQL types:

  • IntegerType
  • DoubleType
  • StringType
  • StructType (instead of a normal Python list)

from pyspark.sql.types import *

# Map each header type string to the corresponding Spark SQL type
def stringToType(typeStr):
    if typeStr == 'int':
        return IntegerType()
    elif typeStr == 'double':
        return DoubleType()
    else:
        return StringType()

schema = StructType([StructField(t[0], stringToType(t[1]), True) for t in header])

1.6. Create a DataFrame

Spark’s createDataFrame() method is used to combine the schema and the parsed data to construct a DataFrame.

DataFrames are preferred because they are easier to manipulate and because Spark knows the types of the data, so it can do a better job processing them.

df = sqlContext.createDataFrame(dataRDD.map(parse_line), schema)

1.7. Create a temp table

We now have a DataFrame, and we can register it as a temporary table. This lets us refer to it by name in SQL queries.

df.registerTempTable("songs_pkTable")

1.8. Cache the table

As we will be accessing this data multiple times, let’s cache it in memory for faster subsequent access.

%sql cache table songs_pkTable

1.9. Query the data

We can now query our data using the temporary table that we created and cached in memory. Because it is registered as a table, we can use SQL as well as the Spark API to access it.

%sql select * from songs_pkTable limit 15

Part 2: Explore & Visualize the Data

2.1. Display table schema

table("songs_pkTable").printSchema()

2.2. Get table rows count

%sql select count(*) from songs_pkTable

2.3. Visualize a data point: Song duration changes over time

An interesting question is: how do various properties of songs change over time? For instance, how did the average song duration change over the years?

We begin by importing ggplot, which makes plotting data easy in Python. Next, we put together the SQL query that will pull the required data from the table.

We then convert the result to a pandas DataFrame with toPandas() and use the display method to render the graph.

from ggplot import *

# Average song duration per year, pulled from the cached table
baseQuery = sqlContext.sql("select avg(duration) as duration, year from songs_pkTable group by year")

# Drop rows with missing years (stored as 0), keep years before 2010, and convert to pandas
df_filtered = baseQuery.filter(baseQuery.year > 0).filter(baseQuery.year < 2010).toPandas()

# Plot duration against year and render the result
plot = ggplot(df_filtered, aes('year', 'duration')) + geom_point() + geom_line(color='blue')
display(plot)
