- Natality Streaming

Creating a streaming pipeline with Dataflow using the Natality dataset.

PubSub can provide both data sources and data sinks within a Dataflow pipeline: a consumer that reads messages from a subscription acts as a data source, while a publisher that writes messages to a topic acts as a data sink.
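As a minimal sketch of these two roles, the snippet below reads from a PubSub subscription and writes back to a PubSub topic; the project, subscription, and topic names are placeholders rather than values from this lesson.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Streaming pipelines must enable the streaming option.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as pipeline:
    (pipeline
     # Consumer: reading from a subscription makes PubSub the data source.
     | 'Read' >> beam.io.ReadFromPubSub(
           subscription='projects/my-project/subscriptions/my-sub')
     # Publisher: writing to a topic makes PubSub the data sink.
     | 'Write' >> beam.io.WriteToPubSub(
           topic='projects/my-project/topics/my-topic'))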

Example

We’ll reuse the Natality dataset to create a pipeline with Dataflow, but for the streaming version, we’ll use a PubSub consumer as the input data source rather than a BigQuery result set.

Defining functions

For the output, we’ll publish predictions to Datastore and reuse the publish DoFn from the previous chapter.
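As a reminder, a rough sketch of what such a publish DoFn does is shown below; it assumes the google.cloud.datastore client and a 'natality-guid' entity kind, both of which are assumptions here rather than details confirmed in this lesson. The actual DoFn is defined in the previous chapter.

import apache_beam as beam

class PublishDoFn(beam.DoFn):

    def __init__(self):
        # Import the Datastore client on the worker, as with the other DoFns.
        from google.cloud import datastore
        self._ds = datastore

    def process(self, element):
        # Write each prediction as a Datastore entity keyed by the record's guid.
        # The 'natality-guid' kind name is an assumption for this sketch.
        client = self._ds.Client()
        key = client.key('natality-guid', element['guid'])
        entity = self._ds.Entity(key)
        entity['weight'] = element['weight']
        entity['time'] = element['time']
        client.put(entity)

Next, we define the DoFn that performs the model application in the streaming pipeline.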

import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
import json

class ApplyDoFn(beam.DoFn):

    def __init__(self):
        self._model = None
        from google.cloud import storage
        import pandas as pd
        import pickle as pkl
        import json as js
        self._storage = storage
        self._pkl = pkl
        self._pd = pd
        self._json = js

    def process(self, element):
        # Lazily load the model from Cloud Storage on the first call.
        if self._model is None:
            bucket = self._storage.Client().get_bucket('dsp_model_store')
            blob = bucket.get_blob('natality/sklearn-linear')
            self._model = self._pkl.loads(blob.download_as_string())

        # Decode the PubSub message payload and parse it into a dictionary.
        element = self._json.loads(element.decode('utf-8'))
        new_x = self._pd.DataFrame.from_dict(element,
                        orient="index").transpose().fillna(0)

        # Apply the model and emit the prediction with the record's guid and time.
        weight = self._model.predict(new_x.iloc[:, 1:8])[0]
        return [{'guid': element['guid'], 'weight': weight,
                 'time': str(element['time'])}]

The code snippet above shows the function we’ll use to perform the model application in the streaming pipeline. This function is the same as the one we defined in Cloud Dataflow and Batch Modeling, with one modification: the json.loads function is used to convert the passed-in message string into a dictionary object. ...
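To illustrate that modification: messages arrive from PubSub as UTF-8 encoded bytes, so the payload is first decoded and then parsed into a dictionary. The record below is a made-up example with only a few fields, used solely to demonstrate the parsing step.

import json

# A hypothetical message payload as it would arrive from PubSub (raw bytes).
payload = json.dumps({'guid': 'example-guid', 'weight': 7.5,
                      'time': '2020-01-01 12:00:00'}).encode('utf-8')

# The same conversion performed inside process(): decode the bytes, then parse the JSON.
element = json.loads(payload.decode('utf-8'))
print(element['guid'], element['weight'])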