PySpark is the Python API for Apache Spark. It lets you write Spark applications in Python and provides the PySpark shell for interactively analyzing your data in a distributed environment.
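As a quick, hedged illustration (assuming a local installation where the pyspark launcher is on the PATH), the shell pre-creates a SparkContext named sc, so you can experiment with distributed collections immediately:

# Launched from a terminal with: pyspark
# Inside the shell, sc (SparkContext) is already defined
rdd = sc.parallelize(range(1, 101))              # distribute a small collection
print(rdd.filter(lambda x: x % 2 == 0).count())  # count the even numbers -> 50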
Spark SQL is the Spark module for structured data processing. It provides a programming abstraction called DataFrame and acts as a distributed SQL query engine.
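For example, here is a minimal sketch (using made-up rows, not the songs dataset used later in this article) of building a DataFrame and querying it through sqlContext:

# Build a small DataFrame from local, illustrative rows
demo = sqlContext.createDataFrame(
    [("Song A", 210), ("Song B", 185), ("Song C", 240)],
    ["title", "duration"])
demo.registerTempTable("demo_songs")                           # expose it to SQL
sqlContext.sql("select avg(duration) from demo_songs").show()  # run a distributed query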
Running on top of Spark, Spark Streaming enables powerful interactive and analytical applications over both streaming and historical data, while inheriting Spark's ease of use and fault-tolerance characteristics.
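As a rough sketch (the socket source on localhost:9999 is purely hypothetical), a streaming word count with the DStream API looks like this:

from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=10)        # 10-second micro-batches
lines = ssc.socketTextStream("localhost", 9999)     # hypothetical text source
counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()                                     # print each batch's word counts
ssc.start()
ssc.awaitTermination()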
Built on top of Spark, MLlib is a scalable machine learning library that provides a uniform set of high-level APIs to help users create and tune practical machine learning pipelines.
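A minimal pipeline sketch (with toy training rows, not part of the songs dataset) might tokenize text, hash it into features, and fit a logistic regression:

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

# Toy training data: (id, text, label) -- illustrative only
training = sqlContext.createDataFrame(
    [(0, "spark is fast", 1.0), (1, "slow batch job", 0.0)],
    ["id", "text", "label"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10)

# Chain the stages into one pipeline and fit it as a single unit
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)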
Spark Core is the general execution engine on which all other functionality is built. It provides the Resilient Distributed Dataset (RDD) abstraction and in-memory computing capabilities.
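For instance, a minimal RDD sketch that distributes a collection, caches it in memory, and reduces it back to the driver:

nums = sc.parallelize(range(1, 1001))      # distribute a local collection
squares = nums.map(lambda x: x * x)        # lazily defined transformation
squares.cache()                            # keep the results in memory for reuse
print(squares.reduce(lambda a, b: a + b))  # action: sum of the squares
print(squares.count())                     # second action reuses the cached data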
Databricks is a company and big data processing platform founded by the creators of Apache Spark. It was created to help data scientists, engineers, and analysts collaborate across data science, engineering, and the business behind them throughout the machine learning lifecycle.
In the first stage, we load some distributed data and read it in as an RDD.
Files can be listed on a distributed file system (DBFS, S3, or HDFS) using %fs
commands.
We are using data files stored in dbfs:/databricks-datasets/songs_pk/data_001
for this example.
DBFS is the Databricks File System, a layer that builds on AWS S3 and the SSD drives attached to the Spark clusters hosted in AWS.
When accessing a file, DBFS first checks whether the file is cached on the SSD drives before reading it from S3.

%fs ls /databricks-datasets/songs_pk/data_001/
We have several data files and one header file in the list. Let's use the PySpark textFile command to read the header file, then use collect to display its contents. After running this, you will see that each header line consists of a field name and a type, separated by a colon:
sc.textFile("databricks-datasets/songs_pk/data_001/header.txt").collect()
The textFile command is used to load the data files, and take is used to display the first three lines of the data.
After running this, you will see that each line consists of several fields separated by a tab (\t).
dataRDD = sc.textFile("/databricks-datasets/songs_pk/data_001/part-000*")
dataRDD.take(3)
Next, we parse the data using what we learned from the header. To do this, we write a function that takes a line of text and returns an array of parsed fields:
If the header says the type is int, we cast the token to an integer.
If the header says the type is double, we cast the token to a float.
Otherwise, we return the string as-is.
# Divide the header by its separator
header = sc.textFile("/databricks-datasets/songs_pk/data_001/header.txt").map(lambda line: line.split(":")).collect()

# Create the Python function that parses one tab-separated line
def parse_line(line):
    tokens = zip(line.split("\t"), header)
    parsed_tokens = []
    for token in tokens:
        token_type = token[1][1]
        if token_type == 'double':
            parsed_tokens.append(float(token[0]))
        elif token_type == 'int':
            parsed_tokens.append(-1 if '-' in token[0] else int(token[0]))
        else:
            parsed_tokens.append(token[0])
    return parsed_tokens
Before we can use the parsed header, we have to convert it to the types that Spark SQL expects. That means using the SQL types:
IntegerType
DoubleType
StringType
StructType (instead of a normal Python list)

from pyspark.sql.types import *

def str_to_type(type_str):
    if type_str == 'int':
        return IntegerType()
    elif type_str == 'double':
        return DoubleType()
    else:
        return StringType()

schema = StructType([StructField(t[0], str_to_type(t[1]), True) for t in header])
Spark's createDataFrame() method is used to combine the schema and the parsed data to construct a DataFrame. DataFrames are preferred because they are easier to manipulate and because Spark knows the types of the data and can process them more efficiently.
df = sqlContext.createDataFrame(dataRDD.map(parse_line), schema)
Now that we have a DataFrame, we can register it as a temporary table, which allows us to refer to it by name in SQL queries.
df.registerTempTable("songs_pkTable")
Since we will be accessing this data multiple times, let's cache it in memory for faster subsequent access.
%sql cache table songs_pkTable
We can now query our data using the temporary table we created and cached in memory. Because it is registered as a table, we can use SQL as well as the Spark API to access it (a Spark API equivalent is sketched after the SQL query below).
%sql select * from songs_pkTable limit 15
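The same result can be fetched through the DataFrame API instead of SQL (a small sketch, assuming the table registered above):

# DataFrame API equivalent of the SQL query above
display(sqlContext.table("songs_pkTable").limit(15))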
table("songs_pkTable").printSchema()
%sql select count(*) from songs_pkTable
A natural question is: how do various attributes of songs change over time? For instance, how did the average song duration change over the years?
We begin by importing ggplot, which makes plotting data easy in Python. Next, we put together the SQL query that pulls the required data from the table. We convert the result to a pandas DataFrame with toPandas(), then use the display method to render the graph.
from ggplot import *

baseQuery = sqlContext.sql("select avg(duration) as duration, year from songs_pkTable group by year")
df_filtered = baseQuery.filter(baseQuery.year > 0).filter(baseQuery.year < 2010).toPandas()
plot = ggplot(df_filtered, aes('year', 'duration')) + geom_point() + geom_line(color='blue')
display(plot)