Solution: Data Input and Output
Let's look at the solution to the data input/output challenge.
Task
Save the dataset as a distributed dataset with proper partitioning and sorting.
Solution
def create_spark_session():
    """Create a Spark session"""
    _ = load_dotenv()
    spark = SparkSession.builder.appName("SparkApp").master("local[5]").getOrCreate()
    return spark

def read_sdf(spark, PATH_BIGDATA):
    """Read the raw dataset from JSON"""
    raw_sdf = spark.read.json(PATH_BIGDATA)
    return raw_sdf

def rename_columns(df, column_map):
    """Rename the columns according to column_map"""
    for old, new in column_map.items():
        df = df.withColumnRenamed(old, new)
    return df

def select_subset(df, columns):
    """Select the required columns"""
    df = df.select(*columns)
    return df

def repartitioning_and_saving(df, PATH_SNAPSHOT):
    """Repartition by year and month, sort within partitions, and save the snapshot"""
    df = df.repartition("reviewed_year", "reviewed_month").sortWithinPartitions("asin")
    df.write.mode("overwrite").parquet(PATH_SNAPSHOT)
Solution of data input and output challenge
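The key step is repartition("reviewed_year", "reviewed_month").sortWithinPartitions("asin"): rows are shuffled into partitions by year and month, and each partition is then sorted by asin independently; there is no global sort across partitions. The following pure-Python sketch (no Spark required, with made-up sample rows) mimics that semantics to make it concrete:

```python
from collections import defaultdict

def repartition_and_sort(records, partition_keys, sort_key):
    """Group records by a partition key, then sort each group independently,
    mimicking Spark's repartition(...).sortWithinPartitions(...)."""
    partitions = defaultdict(list)
    for rec in records:
        # Every record with the same (year, month) lands in the same partition
        key = tuple(rec[k] for k in partition_keys)
        partitions[key].append(rec)
    for group in partitions.values():
        # Sorting happens per partition, not globally
        group.sort(key=lambda r: r[sort_key])
    return dict(partitions)

rows = [
    {"reviewed_year": 2019, "reviewed_month": 5, "asin": "B02"},
    {"reviewed_year": 2019, "reviewed_month": 5, "asin": "B01"},
    {"reviewed_year": 2020, "reviewed_month": 1, "asin": "B03"},
]
parts = repartition_and_sort(rows, ["reviewed_year", "reviewed_month"], "asin")
```

Sorting within partitions is cheaper than a global orderBy because it avoids a second full shuffle, and it still keeps related rows (same product, same month) physically close together in the saved Parquet files.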
Explanation
- Line 3: We read the key-value pairs from the .env file.
...
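The load_dotenv() call on line 3 loads KEY=VALUE pairs from a .env file into the process environment, so paths such as the dataset location need not be hard-coded. As a rough illustration of what that involves (a simplified sketch, not the actual python-dotenv implementation, using hypothetical keys), a minimal parser might look like:

```python
def parse_env(text):
    """Parse KEY=VALUE lines from .env-style text, skipping blank lines
    and comments. A simplified sketch of what load_dotenv() does."""
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip().strip('"')
    return env

sample = """
# Paths used by the Spark job (example values)
PATH_BIGDATA=/data/reviews.json
PATH_SNAPSHOT=/data/snapshot
"""
config = parse_env(sample)
```

Keeping such paths in a .env file means the same code runs unchanged across local and cluster environments; only the environment file differs.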