Solution: Build an FXRate Reporting Pipeline with Airflow

Solution

Let's check out the suggested solution to this challenge. Here are the tasks we need to perform:

  • Create a task to fetch the exchange rate from the endpoint every minute, Monday through Friday. Make sure that rerunning a DAG run on a later date issues the same API request, i.e., the request is derived from the run's logical date rather than the wall-clock time.

  • Create a task to insert data into the BigQuery exchange_rate table. We must ensure idempotency: running the same DAG run multiple times must not produce duplicate records. BigQuery's MERGE statement is a good option here.

  • Create a task to retrieve the day's data points from the exchange_rate table, calculate the daily average rate so far, and update the exchange_rate_report table. This step must also be idempotent, so the same DAG run always reads the data points of the same day.

  • Build a dependency between the ingestion tasks and the analysis task. The latter runs only if the former succeed.
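The first requirement above hinges on deriving the API request from the DAG run's logical date (available in task context as `logical_date` / the `{{ logical_date }}` template) rather than "now". A minimal sketch of that idea, with a hypothetical endpoint URL and query parameters:

```python
from datetime import datetime

# Hypothetical endpoint; the real URL comes from the challenge setup.
ENDPOINT = "https://api.example.com/fx"


def build_request_url(logical_date: datetime,
                      base: str = "USD", quote: str = "EUR") -> str:
    """Build the request from the DAG run's logical date so that
    re-running the same DAG run, even days later, issues the exact
    same request -- the key to an idempotent fetch task."""
    ts = logical_date.strftime("%Y-%m-%dT%H:%M:00")
    return f"{ENDPOINT}?base={base}&quote={quote}&timestamp={ts}"


# Same logical date in, same URL out, regardless of when it runs.
url = build_request_url(datetime(2024, 1, 8, 9, 30))
```

In a real task, Airflow injects the logical date for you, so a backfill or a manual rerun of an old run reproduces the original request exactly.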

Let's solve them one by one.

Create an ingestion DAG

We start by creating an ingestion DAG object. The DAG runs every minute to fetch the exchange rate from the endpoint and ingest it into the exchange_rate table. template_searchpath points to the SQL folder used by the BigQuery operators, and user_defined_macros defines macros that can be shared among tasks.
