- Sklearn Streaming 2
In this second part on streaming with sklearn, we build a streaming pipeline that applies a sklearn model to records as they arrive.
To implement the streaming model pipeline, we’ll use PySpark with a Python UDF to apply model predictions as new elements arrive.
Example
A Python UDF operates on a single row, while a Pandas UDF operates on a partition of rows. The code for this pipeline is shown in the PySpark snippet below, which first trains a model on the driver node, sets up a Kafka stream as a data source, defines a UDF for applying the ML model, and then publishes the scores to a new topic as the pipeline output.
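The row-versus-partition distinction can be illustrated without Spark. In the sketch below, `score_row` and `score_batch` are hypothetical stand-ins: the first mirrors how a Python UDF is invoked (one Python call per record), while the second mirrors a Pandas UDF, which receives a whole batch of rows as a pandas Series and returns a Series.

```python
import numpy as np
import pandas as pd

# Row-at-a-time scoring: called once per record,
# analogous to a Python UDF (hypothetical example).
def score_row(x):
    return 1.0 / (1.0 + np.exp(-x))

# Vectorized scoring: called once per batch of rows,
# analogous to a Pandas UDF (hypothetical example).
def score_batch(xs: pd.Series) -> pd.Series:
    return 1.0 / (1.0 + np.exp(-xs))

values = pd.Series([-2.0, 0.0, 2.0])
row_results = values.apply(score_row)   # N Python-level calls
batch_results = score_batch(values)     # 1 vectorized call
```

Both produce identical scores; the Pandas-style version simply amortizes the Python call overhead across the whole partition, which is why Pandas UDFs tend to scale better.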
Replace `{external_ip}` with the public IP of your machine or EC2 instance.
```python
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
import json
import pandas as pd
from sklearn.linear_model import LogisticRegression

# build a logistic regression model
gamesDF = pd.read_csv("https://github.com/bgweber/Twitch/raw/master/Recommendations/games-expand.csv")
model = LogisticRegression()
model.fit(gamesDF.iloc[:, 0:10], gamesDF['label'])

# define the UDF for scoring users
def score(row):
    d = json.loads(row)
    p = pd.DataFrame.from_dict(d, orient="index").transpose()
    pred = model.predict_proba(p.iloc[:, 0:10])[0][0]
    result = {'User_ID': d['User_ID'], 'pred': pred}
    return str(json.dumps(result))

# read from Kafka
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "{external_ip}:9092") \
    .option("subscribe", "dsp").load()

# select the value field and apply the UDF
df = df.selectExpr("CAST(value AS STRING)")
score_udf = udf(score, StringType())
df = df.select(score_udf("value").alias("value"))

# write the results to Kafka
query = df.writeStream.format("kafka") \
    .option("kafka.bootstrap.servers", "{external_ip}:9092") \
    .option("topic", "preds") \
    .option("checkpointLocation", "/temp").start()
```
The script first trains a logistic regression model using data fetched from GitHub. The model object is created on the driver node, but is copied to the worker nodes when used by the UDF.
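The scoring logic inside the UDF can be checked locally before wiring it into Spark and Kafka. The sketch below trains the model on synthetic data (standing in for the games CSV, so it runs offline) and applies the same parse-then-score steps to a single JSON record; the feature names `G1`–`G10` are assumed placeholders, while `User_ID` matches the field used in the pipeline.

```python
import json
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression

# Synthetic stand-in for the games dataset: 10 binary feature
# columns and a label that depends on the first two features.
rng = np.random.default_rng(0)
X = rng.integers(0, 2, size=(200, 10))
y = X[:, 0] | X[:, 1]
model = LogisticRegression()
model.fit(X, y)

# Same shape of logic as the streaming UDF: parse a JSON row,
# build a one-row DataFrame, and score the first 10 columns.
def score(row):
    d = json.loads(row)
    p = pd.DataFrame.from_dict(d, orient="index").transpose()
    pred = model.predict_proba(p.iloc[:, 0:10])[0][0]  # P(class 0)
    result = {'User_ID': d['User_ID'], 'pred': float(pred)}
    return json.dumps(result)

# A sample record with all-zero features (assumed column names).
sample_dict = {f"G{i}": 0 for i in range(1, 11)}
sample_dict["User_ID"] = 123
sample = json.dumps(sample_dict)
```

Because the score function closes over `model`, Spark serializes the fitted model with the UDF and ships it to each worker, which is exactly how the driver-trained model ends up available during streaming.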
Defining UDF
The next step is to ...