One of my favorite tests for a DAG is the one that confirms the connectivity between tasks. If this particular implementation has a limitation, it's that it only works with fairly linear DAGs. (So if your DAG contains a number of forks & merges, this probably isn't going to work as shown.)
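A flat, ordered list gives each task exactly one neighbor on each side, so it can't describe a task with two upstream or two downstream tasks. One possible workaround, sketched here under assumptions, is to list the edges explicitly and derive each task's expected neighbor sets from them. The forked task names (`transform_a`, `transform_b`) and the `expected_neighbors` helper are hypothetical and are not part of the DAG used in this post. In an Airflow test you could then compare each pair of sets against `task.upstream_task_ids` and `task.downstream_task_ids`, using an empty set where the linear version uses "EMPTY".

```python
# Hypothetical forked DAG: extract_data fans out to two transforms that merge
# again at load_data. These task names are illustrative only.
EDGES = [
    ("start", "extract_data"),
    ("extract_data", "transform_a"),
    ("extract_data", "transform_b"),
    ("transform_a", "load_data"),
    ("transform_b", "load_data"),
]


def expected_neighbors(edges):
    """Map each task_id to a tuple of (upstream_ids, downstream_ids) sets."""
    upstream = {}
    downstream = {}
    for up, down in edges:
        downstream.setdefault(up, set()).add(down)
        upstream.setdefault(down, set()).add(up)
        # Register both ends in both maps so tasks with no neighbors on one
        # side still get an empty set instead of a missing key.
        upstream.setdefault(up, set())
        downstream.setdefault(down, set())
    return {task_id: (upstream[task_id], downstream[task_id]) for task_id in upstream}


if __name__ == "__main__":
    for task_id, (ups, downs) in expected_neighbors(EDGES).items():
        print(task_id, sorted(ups), sorted(downs))
```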
Linear DAG Example
The test below targets a simple DAG containing 6 tasks that run in the following order:
- start
- idempotency_check
- extract_data
- transform_data
- load_data
- send_slack_message_success
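The DAG file itself isn't shown here. In Airflow, a linear order like this is typically declared with the `>>` operator, which records each task's upstream and downstream neighbors. To show what the test below reads, here is a sketch that runs without Airflow: `ToyTask` is a hypothetical stand-in, not Airflow's operator class, and it mirrors only the `upstream_list` and `downstream_list` attributes plus the fact that `a >> b` returns `b`, so chains read left to right.

```python
class ToyTask:
    """Minimal stand-in for an Airflow operator (NOT Airflow's real API).

    It only tracks the two lists the test reads: upstream_list and downstream_list.
    """

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_list = []
        self.downstream_list = []

    def __rshift__(self, other):
        # `self >> other` means `other` runs after `self`.
        self.downstream_list.append(other)
        other.upstream_list.append(self)
        # Returning the right-hand task lets `a >> b >> c` chain left to right.
        return other


TASK_IDS = [
    "start",
    "idempotency_check",
    "extract_data",
    "transform_data",
    "load_data",
    "send_slack_message_success",
]

toy_tasks = {task_id: ToyTask(task_id) for task_id in TASK_IDS}
(
    toy_tasks["start"]
    >> toy_tasks["idempotency_check"]
    >> toy_tasks["extract_data"]
    >> toy_tasks["transform_data"]
    >> toy_tasks["load_data"]
    >> toy_tasks["send_slack_message_success"]
)
```

Only the first task has an empty upstream list and only the last has an empty downstream list, which is exactly what the "EMPTY" sentinels in the test stand for.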
Testing the task order
from airflow.models import DagBag
from pytest import mark, param

import my_airflow_dag  # The module that defines the DAG under test

dagbag = DagBag()
dag_id = my_airflow_dag.dag.dag_id
dag = dagbag.get_dag(dag_id)

# Tasks in run order, padded with "EMPTY" sentinels so the first task has
# no upstream task and the last task has no downstream task.
all_tasks = [
    "EMPTY",
    "start",
    "idempotency_check",
    "extract_data",
    "transform_data",
    "load_data",
    "send_slack_message_success",
    "EMPTY",
]
tasks = all_tasks[1:-1]  # Only contains actual tasks
upstream_tasks = all_tasks[:-2]  # Starts @ EMPTY, ends on the second-to-last task
downstream_tasks = all_tasks[2:]  # Starts on the second task, ends @ EMPTY


@mark.parametrize(
    "task_id, upstream_task_id, downstream_task_id",
    [param(t, u, d, id=t) for t, u, d in zip(tasks, upstream_tasks, downstream_tasks)],
)
def test_task_dependencies(task_id, upstream_task_id, downstream_task_id):
    """Verify the upstream/downstream task_ids for tasks in DAG.

    If there is no upstream or downstream task_id, it will be "EMPTY".
    """
    dag_task = dag.get_task(task_id)
    # Wrap "EMPTY" in a list so `in` checks list membership, not substrings.
    upstream_task_ids = [task.task_id for task in dag_task.upstream_list] or ["EMPTY"]
    downstream_task_ids = [
        task.task_id for task in dag_task.downstream_list
    ] or ["EMPTY"]
    assert upstream_task_id in upstream_task_ids
    assert downstream_task_id in downstream_task_ids
The magic is in the list slicing & then zip()-ping those slices together, so that each test case gets the current task, the task that comes before it (or “EMPTY”), and the task that comes after it (or “EMPTY”). Then if I ever add or remove a task from the DAG, I just need to add or remove that task's name in the original list (all_tasks).
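To see exactly what the slices produce, here is the same slicing & zip() step run on its own, outside of pytest and Airflow. It uses `all_tasks[:-2]` for the upstream slice so all three slices have the same length; `all_tasks[:-1]` also works, because zip() stops at its shortest input and silently drops the extra trailing item.

```python
all_tasks = [
    "EMPTY",
    "start",
    "idempotency_check",
    "extract_data",
    "transform_data",
    "load_data",
    "send_slack_message_success",
    "EMPTY",
]

tasks = all_tasks[1:-1]           # The real tasks, without the sentinels
upstream_tasks = all_tasks[:-2]   # Each real task's predecessor (shifted one left)
downstream_tasks = all_tasks[2:]  # Each real task's successor (shifted one right)

# One (task, upstream, downstream) triple per real task.
triples = list(zip(tasks, upstream_tasks, downstream_tasks))
for task_id, upstream_task_id, downstream_task_id in triples:
    print(upstream_task_id, "->", task_id, "->", downstream_task_id)
```

The first line printed is `EMPTY -> start -> idempotency_check` and the last is `load_data -> send_slack_message_success -> EMPTY`; each triple becomes one parametrized test case.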