
Solution: Data Input and Output


Let's look at the solution to the data input/output challenge.


Task

Save the dataset as a distributed dataset with proper bucketing and sorting.

Solution

def create_spark_session():
    """Create a SparkSession"""
    _ = load_dotenv()
    spark = SparkSession.builder.appName("SparkApp").master("local[5]").getOrCreate()
    return spark


def read_sdf(spark, PATH_BIGDATA):
    """Read the raw JSON dataset into a Spark DataFrame"""
    raw_sdf = spark.read.json(PATH_BIGDATA)
    return raw_sdf

def rename_columns(df, column_map):
    """Rename columns according to an {old_name: new_name} mapping"""
    for old, new in column_map.items():
        df = df.withColumnRenamed(old, new)

    return df

def select_subset(df, columns):
    """Select only the required columns"""
    df = df.select(*columns)
    return df

def repartitioning_and_saving(df, PATH_SNAPSHOT):
    """Repartition by review year/month, sort within partitions, and save as Parquet"""
    df = df.repartition('reviewed_year', 'reviewed_month').sortWithinPartitions("asin")
    df.write.mode("overwrite").parquet(PATH_SNAPSHOT)
Solution to the data input and output challenge
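The last step is worth unpacking: `repartition` shuffles rows into partitions keyed on the given columns, and `sortWithinPartitions` orders rows inside each partition only, not globally across the dataset. A pure-Python analogue of that behavior (the rows below are hypothetical, just to illustrate the grouping and per-partition sort):

```python
from collections import defaultdict

# Hypothetical review rows standing in for the Spark DataFrame
rows = [
    {"asin": "B02", "reviewed_year": 2020, "reviewed_month": 1},
    {"asin": "B01", "reviewed_year": 2020, "reviewed_month": 1},
    {"asin": "B03", "reviewed_year": 2019, "reviewed_month": 6},
]

# Group rows by the repartition key (year, month), like repartition(...)
partitions = defaultdict(list)
for row in rows:
    partitions[(row["reviewed_year"], row["reviewed_month"])].append(row)

# Sort each partition independently by asin, like sortWithinPartitions("asin")
for part in partitions.values():
    part.sort(key=lambda r: r["asin"])

print([r["asin"] for r in partitions[(2020, 1)]])  # ['B01', 'B02']
```

Sorting within partitions avoids the full shuffle a global `orderBy` would trigger, while still keeping rows for the same key clustered and ordered inside each output file.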

Explanation

  • Line 3: We read the key-value pairs from the .env file ...
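What `load_dotenv` does can be pictured with a minimal stand-in that parses `KEY=VALUE` lines into environment variables. This is only a sketch of the idea, not python-dotenv's actual implementation, and the file contents and variable name below are hypothetical:

```python
import os
import tempfile

def load_env_file(path):
    """Minimal stand-in for dotenv.load_dotenv: put KEY=VALUE lines into os.environ."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            # Skip blanks and comments; split on the first '=' only
            if line and not line.startswith("#") and "=" in line:
                key, _, value = line.partition("=")
                os.environ.setdefault(key.strip(), value.strip())

# Write a throwaway .env-style file (hypothetical path variable)
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as f:
    f.write("PATH_BIGDATA=datasets/reviews.json\n")
    env_path = f.name

load_env_file(env_path)
print(os.environ["PATH_BIGDATA"])  # datasets/reviews.json
```

Keeping paths in a .env file lets the same code run against different datasets without editing the source.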