- BigQuery Publish

Read data from BigQuery, apply a model, and write the results to BigQuery.

Example

We’ll start by building a Beam pipeline that reads in data from BigQuery, applies a model, and then writes the results to BigQuery. In the next lesson, we’ll add Cloud Datastore as an additional data sink for the pipeline.

This pipeline is more complex than the previous example because we need to use multiple Python modules in the process function, which requires additional setup. This time, we'll walk through the different parts of the pipeline to provide more detail about each step.

Importing libraries

The first task is to import the libraries needed to build and execute the pipeline. We also import the json module because we need it to create the schema object that specifies the structure of the output BigQuery table. As in the previous lesson, we sample the dataset to make sure our pipeline works before ramping up to the complete dataset. Once we're confident in our pipeline, we can remove the LIMIT clause and autoscale a cluster to complete the workload.

import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
import json
query = """
SELECT year, plurality, apgar_5min,
mother_age, father_age,
gestation_weeks, ever_born
,case when mother_married = true
then 1 else 0 end as mother_married
,weight_pounds as weight
,current_timestamp as time
,GENERATE_UUID() as guid
FROM `bigquery-public-data.samples.natality`
order by rand()
limit 100
"""
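To make the role of the json module concrete, here is a minimal sketch of how a BigQuery table schema can be assembled as a JSON string. The field list below is an assumption based on the columns in the SELECT statement above, not the lesson's exact schema; the resulting string is what parse_table_schema_from_json (imported above) expects.

```python
import json

# Hypothetical schema fields matching a few of the query's output columns.
schema_fields = [
    {"name": "year", "type": "INTEGER"},
    {"name": "weight", "type": "FLOAT"},
    {"name": "time", "type": "TIMESTAMP"},
    {"name": "guid", "type": "STRING"},
]

# Serialize the schema to the JSON layout BigQuery expects.
table_schema_json = json.dumps({"fields": schema_fields})

# In the pipeline, this string would be converted into a schema object:
#   schema = parse_table_schema_from_json(table_schema_json)
```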

Creating model object

Next, we’ll define a DoFn class that implements the process function and applies the sklearn model to the individual records in the Natality dataset. One of the changes from before is that we now have an init function, which we use to instantiate a set of fields. In order to have references to the modules that we need to use in the process function, we need to assign these ...