Ingestion Job: Part II

In this lesson, we continue inspecting the code of the Ingestion Batch Job example.

Processing the input

The project with the codebase for this lesson is the same as in the previous lesson:

mvn install -DskipTests
java -jar /usercode/target/batch-app-0.0.1-SNAPSHOT.jar jobName=ingesterJob clientId=client1 readFormat=json fileName=sales
Project with the IngesterJob code implementation
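
For reference, the sales input is a JSON file whose shape matches the schema we inspect later in this lesson. A hypothetical record could look like the following (the exact contents of the course's data file are not shown here):

{
  "Seller_Id": "seller1",
  "Sales": [
    {
      "Date": "2023-01-15",
      "Items": [
        { "Product": "laptop", "Quantity": 2 },
        { "Product": "mouse", "Quantity": 5 }
      ]
    }
  ]
}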

Once Spark is instructed to read the input (the JSON file contents) into our favorite logical abstraction, the DataFrame, the resulting object is passed to the job’s process method in the following manner:

@Override
protected Dataset<Row> process(Dataset<Row> preProcessOutput) {
    return (Dataset<Row>) ingesterProcessor.process(preProcessOutput);
}
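
For context, the framework produces this DataFrame before process is called. A minimal sketch of what that read step could look like, assuming a multi-line JSON source file (the actual pre-processing code and file path are not shown in this excerpt):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("ingesterJob")
        .getOrCreate();

// readFormat=json and fileName=sales come from the command-line arguments;
// multiLine is assumed because each record spans several lines in the file.
Dataset<Row> preProcessOutput = spark.read()
        .option("multiLine", true)
        .json("/usercode/data/sales.json"); // hypothetical path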

The ingesterProcessor object receives the DataFrame as the argument to its process method. What goes on inside it? Let’s inspect the following code snippet:

@Component
public class IngesterProcessor implements Processor<Dataset<Row>> {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(IngesterProcessor.class);

    @Override
    public Dataset<Row> process(Dataset<Row> inputDf) {
        LOGGER.info("Flattening JSON records...");
        // Apply a Spark flatMap transformation that flattens each nested
        // JSON record into plain rows matching the sales schema
        Dataset<Row> parsedResults = inputDf.flatMap(
                new IngesterJsonFlatMapper(),
                RowEncoder.apply(SalesSchema.getSparkSchema()));
        return parsedResults;
    }
}

The line applying the flatMap transformation packs a lot of interesting things:

  • On the inputDf, a flatMap transformation is applied to parse each nested JSON record into flat rows, each represented by a Row object (a sketch of such a flattening function follows the schema below). This “translation” step is needed because when Spark reads the JSON file, the contents are internally structured in the following way:
root
 |-- Sales: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Date: string (nullable = true)
 |    |    |-- Items: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Product: string (nullable = true)
 |    |    |    |    |-- Quantity: long (nullable = true)
 |-- Seller_Id: string (nullable = true)
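
The IngesterJsonFlatMapper implementation is not shown in this excerpt, but a minimal sketch of such a flattening FlatMapFunction could look like the one below. Field names follow the schema above; the flat output layout (Seller_Id, Date, Product, Quantity) is an assumption about what SalesSchema.getSparkSchema() returns:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// Hypothetical sketch; the course's actual IngesterJsonFlatMapper may differ.
public class IngesterJsonFlatMapper implements FlatMapFunction<Row, Row> {

    @Override
    public Iterator<Row> call(Row input) {
        List<Row> flatRows = new ArrayList<>();
        String sellerId = input.getAs("Seller_Id");
        // Each element of the Sales array is a struct holding Date and Items.
        List<Row> sales = input.getList(input.fieldIndex("Sales"));
        for (Row sale : sales) {
            String date = sale.getAs("Date");
            List<Row> items = sale.getList(sale.fieldIndex("Items"));
            // Emit one flat row per (sale, item) combination.
            for (Row item : items) {
                flatRows.add(RowFactory.create(
                        sellerId, date,
                        item.getAs("Product"),
                        item.getAs("Quantity")));
            }
        }
        return flatRows.iterator();
    }
}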

Interestingly enough, Spark’s internal representation of a JSON record it reads (the schema shown above) closely mirrors the JSON structure.

It contains a Sales array and a Seller_Id string field as root properties. The Sales array is a collection of elements, each named element, of type struct.

Struct refers here to an object that is composed of a ...