- Sklearn Streaming 2

In this second part on streaming with sklearn, we build a streaming pipeline that applies a sklearn model to incoming data.

To implement the streaming model pipeline, we’ll use PySpark with a Python UDF to apply model predictions as new elements arrive.

Example

A Python UDF operates on a single row, while a Pandas UDF operates on a partition of rows. The code for this pipeline is shown in the PySpark snippet below. It first trains a model on the driver node, sets up a Kafka stream as the data source, defines a UDF for applying the ML model, and then publishes the scores to a new topic as the pipeline output.

Replace {external_ip} with the public IP of your machine or EC2 instance.

import json
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from sklearn.linear_model import LogisticRegression
# build a logistic regression model on the driver node
gamesDF = pd.read_csv("https://github.com/bgweber/Twitch/raw/master/Recommendations/games-expand.csv")
model = LogisticRegression()
model.fit(gamesDF.iloc[:,0:10], gamesDF['label'])
# define the UDF for scoring users; each row is one JSON-encoded message
def score(row):
    d = json.loads(row)
    p = pd.DataFrame.from_dict(d, orient = "index").transpose()
    # [0][0] is the first row's probability for model.classes_[0]
    pred = model.predict_proba(p.iloc[:,0:10])[0][0]
    result = {'User_ID': d['User_ID'], 'pred': pred }
    return str(json.dumps(result))
# read from Kafka (assumes an active SparkSession named spark)
df = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "{external_ip}:9092")
    .option("subscribe", "dsp").load())
# select the value field and apply the UDF
df = df.selectExpr("CAST(value AS STRING)")
score_udf = udf(score, StringType())
df = df.select(score_udf("value").alias("value"))
# write results to Kafka
query = (df.writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "{external_ip}:9092")
    .option("topic", "preds")
    .option("checkpointLocation", "/temp").start())

The script first trains a logistic regression model using data fetched from GitHub. The model object is created on the driver node, but is copied to the worker nodes when used by the UDF.
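
To get a feel for what "copied to the worker nodes" means, the scoring logic can be tried locally without Spark or Kafka. The sketch below is only an illustration under a few assumptions: the training data is synthetic rather than the games data set, the column names G1 to G10 and the user ID u-1 are made up, the score function takes the model as an explicit second argument, and a plain pickle round trip stands in for the serialization Spark performs when it ships the UDF to a worker.

```python
import json
import pickle

import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

# Synthetic stand-in for the games data: ten binary features (hypothetical names).
rng = np.random.default_rng(0)
features = pd.DataFrame(rng.integers(0, 2, size=(200, 10)),
                        columns=[f"G{i}" for i in range(1, 11)])
labels = (features["G1"] + features["G2"] > 0).astype(int)

# Train the model, as the driver node would.
model = LogisticRegression()
model.fit(features, labels)

# Serialize and restore the model; a rough stand-in for Spark copying
# the fitted model to a worker along with the UDF.
worker_model = pickle.loads(pickle.dumps(model))

def score(row, fitted_model):
    """Score one JSON-encoded message, following the UDF in the pipeline."""
    d = json.loads(row)
    p = pd.DataFrame.from_dict(d, orient="index").transpose()
    # The transposed frame has object dtype, so cast the features to float.
    x = p.iloc[:, 0:10].astype(float)
    pred = fitted_model.predict_proba(x)[0][0]
    return json.dumps({"User_ID": d["User_ID"], "pred": float(pred)})

# One message shaped like the Kafka payload: ten features, then the user ID.
message = json.dumps({**{f"G{i}": 1 for i in range(1, 11)}, "User_ID": "u-1"})
driver_result = json.loads(score(message, model))
worker_result = json.loads(score(message, worker_model))
print(driver_result, worker_result)
```

The restored copy carries the same fitted coefficients, so both calls return the same prediction for the same message. That is the property the pipeline relies on when each worker scores rows with its own copy of the model.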

Defining UDF

The next step is to ...