Natality Streaming
Creating a streaming pipeline with Dataflow using the Natality dataset.
PubSub can provide both data sources and data sinks within a Dataflow pipeline: a consumer (a subscription we read from) acts as a data source, and a publisher (a topic we write to) acts as a data sink.
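As a rough sketch of how this looks in Beam's Python SDK, a streaming pipeline can read raw messages from a subscription and publish results back to a topic. The project, subscription, and topic names below are placeholders, not values from this lesson:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# streaming must be enabled for unbounded PubSub sources
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as pipeline:
    (pipeline
        # consumer: PubSub acts as the data source
        | 'read' >> beam.io.ReadFromPubSub(
            subscription='projects/my-project/subscriptions/natality-sub')
        # ... model application and other transforms would go here ...
        # publisher: PubSub acts as the data sink
        | 'write' >> beam.io.WriteToPubSub(
            topic='projects/my-project/topics/natality-predictions'))
```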
Example
We’ll reuse the Natality dataset to create a pipeline with Dataflow, but for the streaming version, we’ll use a PubSub consumer as the input data source rather than a BigQuery result set.
Defining functions
For the output, we’ll publish predictions to Datastore and reuse the publish DoFn from the previous chapter.
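The snippet below is a minimal sketch of what such a Datastore-publishing DoFn can look like, assuming the google.cloud.datastore client library; the entity kind natality-guid is a placeholder rather than the exact name used in the previous chapter:

```python
import apache_beam as beam

class PublishDoFn(beam.DoFn):

    def __init__(self):
        # import lazily so the DoFn can be pickled and shipped to Dataflow workers
        from google.cloud import datastore
        self._datastore = datastore

    def process(self, element):
        client = self._datastore.Client()
        # key each prediction by its GUID ('natality-guid' is a placeholder kind)
        key = client.key('natality-guid', element['guid'])
        entity = self._datastore.Entity(key)
        entity['weight'] = element['weight']
        entity['time'] = element['time']
        client.put(entity)
```

Deferring the client import to the constructor keeps the DoFn serializable when Dataflow distributes it to worker machines.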
```python
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
import json

class ApplyDoFn(beam.DoFn):

    def __init__(self):
        self._model = None
        # import lazily so the DoFn can be serialized and shipped to workers
        from google.cloud import storage
        import pandas as pd
        import pickle as pkl
        import json as js
        self._storage = storage
        self._pkl = pkl
        self._pd = pd
        self._json = js

    def process(self, element):
        # lazily load the model from Cloud Storage on the first element
        if self._model is None:
            bucket = self._storage.Client().get_bucket('dsp_model_store')
            blob = bucket.get_blob('natality/sklearn-linear')
            self._model = self._pkl.loads(blob.download_as_string())

        # the PubSub message is a JSON string; decode it into a dictionary
        element = self._json.loads(element.decode('utf-8'))
        new_x = self._pd.DataFrame.from_dict(element,
                            orient="index").transpose().fillna(0)
        weight = self._model.predict(new_x.iloc[:, 1:8])[0]
        return [{'guid': element['guid'], 'weight': weight,
                 'time': str(element['time'])}]
```
The code snippet above shows the function we’ll use to apply the model in the streaming pipeline. It is the same as the function we defined in Cloud Dataflow and Batch Modeling, with one modification: the json.loads function is used to convert the passed-in message string into a dictionary object. ...
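To illustrate the change, a PubSub message body arrives as UTF-8 encoded bytes, so it is decoded and parsed into a dictionary before being converted to a DataFrame. The field values in this standalone example are made up:

```python
import json

# a hypothetical PubSub message payload with made-up, truncated values
message = b'{"guid": "123e4567-e89b-12d3-a456-426614174000", "time": "2021-01-01 12:00:00"}'

record = json.loads(message.decode('utf-8'))
print(record['guid'])  # the parsed record is now a Python dictionary
```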