Solution: Build an FXRate Reporting Pipeline with Airflow
Discuss the solution to the Airflow challenge.
Solution
Let's check out the suggested solution to this challenge. Here are the tasks we need to perform:
1. Create a task to fetch the exchange rate from the endpoint every minute, Monday to Friday. Make sure that rerunning the DAG on a different date makes the same API request.
2. Create a task to insert data into the BigQuery exchange_rate table. We must ensure idempotency, meaning that running the same DAG run multiple times does not produce duplicate records. The MERGE statement in BigQuery is a good option for this.
3. Create a task to retrieve the day's data points from the exchange_rate table, calculate the daily average rate so far, and update the exchange_rate_report table. This step must be idempotent as well, so the same DAG run always retrieves the data points of the same day.
4. Build a dependency between the ingestion and analysis tasks. The analysis task runs only if the preceding tasks succeed.
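The idempotency requirements above can be illustrated with a small, stdlib-only sketch. It assumes a hypothetical endpoint that accepts a minute-precision timestamp query parameter named at, a hypothetical exchange_rate schema with ts (TIMESTAMP) and rate (FLOAT64) columns, and a hypothetical fully qualified table name. In the real DAG the SQL would live in a templated file rather than being built with str.format; the dictionary upsert simply mimics what the MERGE does so the "reruns add no duplicates" property is easy to check.

```python
from datetime import datetime, timedelta, timezone
from statistics import mean

# Hypothetical endpoint and query parameter; substitute the challenge's real API.
FX_ENDPOINT = "https://example.com/fx-rate"

# Hypothetical table schema: ts TIMESTAMP, rate FLOAT64.
MERGE_SQL = """
MERGE `{table}` AS target
USING (SELECT TIMESTAMP '{ts}' AS ts, {rate} AS rate) AS source
ON target.ts = source.ts
WHEN MATCHED THEN
  UPDATE SET rate = source.rate
WHEN NOT MATCHED THEN
  INSERT (ts, rate) VALUES (source.ts, source.rate)
"""


def build_request_url(logical_date: datetime) -> str:
    """Derive the request from the run's logical date, never from datetime.now()."""
    # Truncate to the minute so every retry of the same run asks for the same point.
    minute = logical_date.astimezone(timezone.utc).replace(second=0, microsecond=0)
    return f"{FX_ENDPOINT}?at={minute.strftime('%Y-%m-%dT%H:%M:%SZ')}"


def render_merge(table: str, ts: datetime, rate: float) -> str:
    """Render the MERGE for one data point; the ON clause keys the upsert on ts."""
    return MERGE_SQL.format(table=table, ts=ts.isoformat(sep=" "), rate=rate)


def merge_point(table: dict, ts: datetime, rate: float) -> None:
    """In-memory analogue of the MERGE: an upsert keyed on ts, so reruns never duplicate."""
    table[ts] = rate


def daily_average_so_far(table: dict, logical_date: datetime) -> float:
    """Average the logical day's points up to and including this run's own timestamp."""
    day_start = logical_date.replace(hour=0, minute=0, second=0, microsecond=0)
    points = [rate for ts, rate in table.items() if day_start <= ts <= logical_date]
    # Raises statistics.StatisticsError if no point exists yet for the day.
    return mean(points)
```

Because every value depends only on the logical date, clearing and rerunning a DAG run reproduces the same request, the same MERGE key, and the same daily average.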
Let's solve them one by one.
Create an ingestion DAG
We start by creating an ingestion DAG object. The DAG runs every minute from Monday to Friday to fetch the exchange rate from the endpoint and ingest it into the exchange_rate table. The template_searchpath argument points to the SQL folder used by the BigQuery operators, and user_defined_macros defines macros that can be shared among tasks.
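A minimal sketch of such a DAG object, assuming Airflow 2.4 or later (for the schedule argument). The DAG id, start date, SQL folder path, catchup=False, and the minute_ts macro are hypothetical, not taken from the course solution. The cron expression * * * * 1-5 encodes "every minute, Monday to Friday"; the Airflow import is guarded so the helper functions can be exercised without Airflow installed.

```python
from datetime import datetime, timezone


def minute_ts(dt: datetime) -> str:
    """Hypothetical macro: render a logical date as a UTC, minute-precision timestamp."""
    return dt.astimezone(timezone.utc).replace(second=0, microsecond=0).isoformat(sep=" ")


def is_weekday_run(dt: datetime) -> bool:
    """Mirror of the cron day-of-week field 1-5 (Monday=1 ... Friday=5)."""
    return 1 <= dt.isoweekday() <= 5


# Macros exposed to every templated field (e.g. SQL files) in the DAG.
USER_DEFINED_MACROS = {"minute_ts": minute_ts}

DAG_KWARGS = {
    "dag_id": "fx_rate_ingestion",                           # hypothetical DAG id
    "schedule": "* * * * 1-5",                               # every minute, Monday to Friday
    "start_date": datetime(2024, 1, 1, tzinfo=timezone.utc),  # hypothetical start date
    "catchup": False,                                        # assumption: skip backfilling old minutes
    "template_searchpath": ["/opt/airflow/sql"],             # hypothetical SQL folder
    "user_defined_macros": USER_DEFINED_MACROS,
}

try:
    from airflow import DAG  # the schedule argument requires Airflow 2.4+
except ImportError:  # lets this sketch run where Airflow is not installed
    DAG = None

dag = DAG(**DAG_KWARGS) if DAG is not None else None
```

Inside a SQL file found through template_searchpath, the macro could then be called as {{ minute_ts(data_interval_start) }}, which renders the same string no matter when the run is retried.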