Solution: Customer Churn Analysis Using PySpark
This lesson presents the solution to the customer churn analysis and prediction exercise using PySpark.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, corr, avg
spark = SparkSession.builder.getOrCreate()

# Task 1: Loading customer data into a PySpark DataFrame
print("Reading 'churn.csv' into telco_df:")
telco_df = spark.read.csv("./churn.csv", header=True, inferSchema=True)
print("First 2 rows of telco_df:")
telco_df.show(2, truncate=False, vertical=True)
print("Schema of telco_df:")
telco_df.printSchema()

# Task 2: Preprocessing and transformation of data
churn_count = telco_df.filter(col("Churn Value") == 1).count()
print("Counting the number of churned customers:", churn_count)
print("Computing the average monthly charges by gender:")
telco_df.groupBy("Gender").avg("Monthly Charges").show()
print("Creating a new column 'Total Charges' by multiplying Monthly Charges and Tenure Months:")
telco_df = telco_df.withColumn("Total Charges", col("Monthly Charges") * col("Tenure Months"))
telco_df.show(2, vertical=True)
print("Computing the correlation between Monthly Charges and Total Charges:")
telco_df.select(corr(col("Monthly Charges"), col("Total Charges"))).show()

# Task 3: Exploratory data analysis (EDA)
print("Calculating the churn rates by contract type:")
telco_df.groupBy("Contract").agg((sum("Churn Value") / count("Churn Value")).alias("Churn Rate")).show()
print("Calculating the average tenure by churn value:")
telco_df.groupBy("Churn Value").agg(avg("Tenure Months").alias("Average Tenure")).show()
print("Calculating the churn rates by payment method:")
telco_df.groupBy("Payment Method").agg((sum("Churn Value") / count("Churn Value")).alias("Churn Rate")).show()
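To see what the Task 3 aggregation is doing, here is a small self-contained sketch. The toy DataFrame below is made up for illustration (it is not the churn.csv data); it shows how summing a 0/1 churn flag and dividing by the group size yields a per-group churn rate:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, count

spark = SparkSession.builder.getOrCreate()

# Toy data (hypothetical, not from churn.csv): contract type and a 0/1 churn flag
toy_df = spark.createDataFrame(
    [("Month-to-month", 1), ("Month-to-month", 0), ("Two year", 0), ("Two year", 0)],
    ["Contract", "Churn Value"],
)

# Same pattern as Task 3: summing a 0/1 flag and dividing by the group count
# gives the fraction of churned customers within each contract type
toy_df.groupBy("Contract").agg(
    (sum("Churn Value") / count("Churn Value")).alias("Churn Rate")
).show()
# Month-to-month -> 0.5, Two year -> 0.0

Because the churn flag is 0 or 1, the sum counts churned customers, so dividing by the group's row count gives the rate within that group.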
Here’s a breakdown of what’s happening:
Task 1: Loading customer data into a PySpark DataFrame
- Lines 1–3: Import the necessary libraries and create a SparkSession using the builder pattern.
- Line 7: Read the CSV file "churn.csv" into a DataFrame named telco_df using the read.csv() method of the spark object. The header parameter is set to True to treat the first row of the file as column names, and inferSchema is set to True so that Spark infers each column's data type.
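As a side note, inferSchema=True makes Spark scan the data to guess column types, which costs an extra pass over the file. When the types are known up front, an explicit schema can be passed to read.csv() instead. A minimal sketch, assuming (for illustration only) a hypothetical file containing just four of the columns used in the solution; the real churn.csv has more columns, and a CSV schema is matched positionally, so in practice every column must be listed:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spark = SparkSession.builder.getOrCreate()

# Assumed column types for a hypothetical four-column file
schema = StructType([
    StructField("Gender", StringType(), True),
    StructField("Tenure Months", IntegerType(), True),
    StructField("Monthly Charges", DoubleType(), True),
    StructField("Churn Value", IntegerType(), True),
])

# header=True still skips the first row; no schema-inference pass is needed
df = spark.read.csv("./four_columns.csv", header=True, schema=schema)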