Ingestion Job: Part II
Continue inspecting the code of the Ingestion Batch Job example.
Processing the input
The project with the codebase for this lesson is the same as in the previous lesson:

```
mvn install -DskipTests
java -jar /usercode/target/batch-app-0.0.1-SNAPSHOT.jar jobName=ingesterJob clientId=client1 readFormat=json fileName=sales
```
Once Spark is instructed to read the input (the JSON file contents) into our favorite logical abstraction, the DataFrame, the resulting object is then "passed" to the job's process method in the following manner:
```java
@Override
protected Dataset<Row> process(Dataset<Row> preProcessOutput) {
    return (Dataset<Row>) ingesterProcessor.process(preProcessOutput);
}
```
The `ingesterProcessor` object receives it as the argument of its `process(preProcessOutput)` method. What goes on inside it? Let's inspect the following code snippet:
```java
@Component
public class IngesterProcessor implements Processor<Dataset<Row>> {

    private static Logger LOGGER = LoggerFactory.getLogger(IngesterProcessor.class);

    @Override
    public Dataset<Row> process(Dataset<Row> inputDf) {
        LOGGER.info("Flattening JSON records...");
        // Get the appropriate Spark based class doing a Transformation
        Dataset<Row> parsedResults = inputDf.flatMap(new IngesterJsonFlatMapper(),
                RowEncoder.apply(SalesSchema.getSparkSchema()));
        return parsedResults;
    }
}
```
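The `Processor` interface itself is not shown in this lesson. Given that the job's `process` method has to cast the result back to `Dataset<Row>`, it is presumably a small generic contract along these lines (a sketch based on that cast, not the actual source):

```java
// Hypothetical sketch of the Processor contract implemented above.
// The cast in the job's process method suggests the return type is
// more general than Dataset<Row>; Object is an assumption here.
public interface Processor<T> {
    Object process(T input);
}
```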
The `flatMap` call in this snippet packs a lot of interesting things:
- On the `inputDf`, a `flatMap` transformation is applied to parse each of the JSON records into individual lines, each represented by a `Row` object. This "translation" step is needed because when Spark reads the JSON file, the contents are internally structured in the following way:
```
root
 |-- Sales: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Date: string (nullable = true)
 |    |    |-- Items: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Product: string (nullable = true)
 |    |    |    |    |-- Quantity: long (nullable = true)
 |-- Seller_Id: string (nullable = true)
```
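For reference, an input file that produces this schema could look like the following (the values are hypothetical; only the field names come from the schema above):

```json
{
  "Sales": [
    {
      "Date": "2023-01-01",
      "Items": [
        { "Product": "pen", "Quantity": 2 },
        { "Product": "notepad", "Quantity": 1 }
      ]
    }
  ],
  "Seller_Id": "client1"
}
```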
Interestingly enough, Spark's internal representation of a JSON record it has read (the schema) is quite similar to the JSON structure itself.
It contains a `Sales` array and a `Seller_Id` string field as root properties. The `Sales` array is a collection of elements, named `element`, of type `struct`.
`struct` refers here to an object that is composed of a ...