Solution: Data Input and Output
Let's walk through the solution to the data input/output challenge.
Task
Save the dataset as a distributed dataset with proper bucketing and sorting.
Solution
from dotenv import load_dotenv
from pyspark.sql import SparkSession

def create_spark_session():
    """Create a Spark session."""
    _ = load_dotenv()  # load environment variables from the .env file
    spark = SparkSession.builder.appName("SparkApp").master("local[5]").getOrCreate()
    return spark

def read_sdf(spark, PATH_BIGDATA):
    """Read the dataset."""
    raw_sdf = spark.read.json(PATH_BIGDATA)
    return raw_sdf

def rename_columns(df, column_map):
    """Rename the columns."""
    for old, new in column_map.items():
        df = df.withColumnRenamed(old, new)
    return df

def select_subset(df, columns):
    """Select the required columns."""
    df = df.select(*columns)
    return df

def repartitioning_and_saving(df, PATH_SNAPSHOT):
    """Repartition the data and save the snapshot."""
    df = df.repartition('reviewed_year', 'reviewed_month').sortWithinPartitions("asin")
    df.write.mode("overwrite").parquet(PATH_SNAPSHOT)
Solution of data input and output challenge
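Conceptually, repartition('reviewed_year', 'reviewed_month') hash-partitions rows on those two columns, and sortWithinPartitions("asin") then sorts each partition independently, with no global sort. Below is a minimal pure-Python simulation of that behavior; the sample rows and the partition count are hypothetical, chosen only to illustrate the mechanics:

```python
# Conceptual simulation of repartition(...).sortWithinPartitions(...):
# rows are assigned to partitions by hashing the partition-key columns,
# then each partition is sorted on its own.

NUM_PARTITIONS = 4  # arbitrary choice for this sketch

rows = [  # hypothetical review records
    {"asin": "B02", "reviewed_year": 2019, "reviewed_month": 1},
    {"asin": "B01", "reviewed_year": 2019, "reviewed_month": 1},
    {"asin": "B03", "reviewed_year": 2020, "reviewed_month": 6},
]

def partition_id(row):
    # Hash the partition-key columns, mirroring
    # repartition('reviewed_year', 'reviewed_month').
    return hash((row["reviewed_year"], row["reviewed_month"])) % NUM_PARTITIONS

partitions = {}
for row in rows:
    partitions.setdefault(partition_id(row), []).append(row)

# sortWithinPartitions("asin"): each partition is sorted independently.
for pid in partitions:
    partitions[pid].sort(key=lambda r: r["asin"])
```

Rows that share the same (reviewed_year, reviewed_month) pair always land in the same partition, which is what makes the later Parquet snapshot efficient to filter by those columns.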
Explanation
- load_dotenv() (called in create_spark_session) reads the key-value pairs from
.env...