Verifying Task Order Within The DAG

One of my favorite tests for a DAG is the one that confirms the connectivity between tasks. The limitation of this particular implementation is that it only works with fairly linear DAGs. (So if your DAG contains a number of forks & merges, this probably isn’t going to work as shown.)
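For DAGs that do fork & merge, one possible alternative is to compare whole sets of edges instead of a single neighbor on each side. The snippet below is only a rough sketch of that idea, not something taken from my test suite: the helper name `missing_and_unexpected_edges` and the task ids `transform_a` / `transform_b` are made up for illustration, and plain dicts stand in for the DAG so it runs without Airflow.

```python
def missing_and_unexpected_edges(expected, actual):
    """Compare two {task_id: set_of_downstream_task_ids} mappings.

    Returns a (missing, unexpected) pair of sets of (upstream, downstream) edges.
    """

    def edges(mapping):
        return {(up, down) for up, downs in mapping.items() for down in downs}

    expected_edges = edges(expected)
    actual_edges = edges(actual)
    return expected_edges - actual_edges, actual_edges - expected_edges


# Hypothetical forked DAG: extract_data fans out to two transforms,
# which merge again at load_data.
expected = {
    "extract_data": {"transform_a", "transform_b"},
    "transform_a": {"load_data"},
    "transform_b": {"load_data"},
    "load_data": set(),
}

# With a real DAG, this mapping could be built along the lines of
#   {t.task_id: set(t.downstream_task_ids) for t in dag.tasks}
# A plain dict is used here (with one merge edge deliberately missing).
actual = {
    "extract_data": {"transform_a", "transform_b"},
    "transform_a": {"load_data"},
    "transform_b": set(),
    "load_data": set(),
}

missing, unexpected = missing_and_unexpected_edges(expected, actual)
print("missing:", missing)
print("unexpected:", unexpected)
```

A test would then simply assert that both returned sets are empty.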

Linear DAG Example

The test below assumes a simple DAG containing 6 tasks that run in the following order:

  1. start
  2. idempotency_check
  3. extract_data
  4. transform_data
  5. load_data
  6. send_slack_message_success

Testing the task order

from airflow.models import DagBag
from pytest import mark, param

import my_airflow_dag

dagbag = DagBag()
dag_id = my_airflow_dag.dag.dag_id
dag = dagbag.get_dag(dag_id)
args = dag.default_args
all_tasks = [
    "EMPTY",
    "start",
    "idempotency_check",
    "extract_data",
    "transform_data",
    "load_data",
    "send_slack_message_success",
    "EMPTY",
]
tasks = all_tasks[1:-1]  # Only contains actual tasks
upstream_tasks = all_tasks[:-2]  # Starts @ EMPTY, ends on second-to-last task
downstream_tasks = all_tasks[2:]  # Starts on second task; ends @ EMPTY


@mark.parametrize(
    "task_id, upstream_task_id, downstream_task_id",
    [param(t, u, d, id=t) for t, u, d in zip(tasks, upstream_tasks, downstream_tasks)],
)
def test_task_dependencies(task_id, upstream_task_id, downstream_task_id):
    """Verify the upstream/downstream task_ids for tasks in DAG.

    If there is no upstream or downstream task_id, it will be "EMPTY".
    """
    task = dag.get_task(task_id)
    # Fall back to ["EMPTY"] (a list, not a bare string) so that `in` checks
    # list membership rather than substring matching.
    upstream_task_ids = [t.task_id for t in task.upstream_list] or ["EMPTY"]
    downstream_task_ids = [t.task_id for t in task.downstream_list] or ["EMPTY"]
    assert upstream_task_id in upstream_task_ids
    assert downstream_task_id in downstream_task_ids

The magic is in the list slicing & then zip()-ping up those slices so that each row contains the current task, the task that comes before it (or “EMPTY”), and the task that comes after it (or “EMPTY”). Then if I ever add or remove a task from the DAG, I just need to add or remove the name of that task in the original list (all_tasks).
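To see what the slicing produces without running Airflow at all, here is a small standalone sketch. It reuses the padded list from the test; the helper name `neighbor_triples` is just something I made up for this illustration.

```python
all_tasks = [
    "EMPTY",
    "start",
    "idempotency_check",
    "extract_data",
    "transform_data",
    "load_data",
    "send_slack_message_success",
    "EMPTY",
]


def neighbor_triples(padded):
    """Return (task, upstream, downstream) for each real task in a padded list."""
    tasks = padded[1:-1]  # drop the "EMPTY" padding on both ends
    upstream = padded[:-2]  # shifted one to the left of tasks
    downstream = padded[2:]  # shifted one to the right of tasks
    # zip() stops at the shortest input, so a longer upstream slice
    # would yield the same triples.
    return list(zip(tasks, upstream, downstream))


triples = neighbor_triples(all_tasks)
for task, up, down in triples:
    print(f"{up} >> {task} >> {down}")
```

The first line printed is `EMPTY >> start >> idempotency_check` and the last is `load_data >> send_slack_message_success >> EMPTY`, which are exactly the rows that get fed to the parametrized test.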
