Revisiting “Database Operators Have SQL” test

Back in my first post, https://learningtotest.com/2019/06/13/3-easy-airflow-tests/, the third test made sure that there was valid SQL attached to the task.

I’ve come to realize that a downside to that particular snippet is that the output doesn’t tell you the name of the task that failed; instead, it returns output like this:

tests/test_my_dag.py::test_database_operators_have_sql[db_operator0] FAILED [ 18%]
tests/test_my_dag.py::test_database_operators_have_sql[db_operator1] FAILED [ 27%]
tests/test_my_dag.py::test_database_operators_have_sql[db_operator2] FAILED [ 36%]
tests/test_my_dag.py::test_database_operators_have_sql[db_operator3] FAILED [ 45%]
tests/test_my_dag.py::test_database_operators_have_sql[db_operator4] FAILED [ 54%]
tests/test_my_dag.py::test_database_operators_have_sql[db_operator5] FAILED [ 63%]
tests/test_my_dag.py::test_database_operators_have_sql[db_operator6] FAILED [ 72%]
tests/test_my_dag.py::test_database_operators_have_sql[db_operator7] FAILED [ 81%]
tests/test_my_dag.py::test_database_operators_have_sql[db_operator8] FAILED [ 90%]
tests/test_my_dag.py::test_database_operators_have_sql[db_operator9] FAILED [100%]

That’s not very helpful! We can do better… Let’s generate a list of ids, which requires zipping up the two lists generated from dag.tasks & dag.task_ids.

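# Same setup as in the original post is assumed here: `dag` loaded via
# DagBag().get_dag(...), plus the pytest `mark` and PostgresOperator imports.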
@mark.parametrize(
    "db_operator",
    [x for x in dag.tasks if isinstance(x, PostgresOperator)],
    ids=[x for x, y in zip(dag.task_ids, dag.tasks) if isinstance(y, PostgresOperator)],
)
def test_database_operators_have_sql(db_operator):
    """For all PostgresOperator task(s), verify that sql attribute returns
    non-empty value.
    """
    assert db_operator.sql.startswith("INSERT") or db_operator.sql.startswith(
        "TRUNCATE"
    )

Which now returns much more helpful output, telling us exactly which task under test failed:

tests/test_my_dag.py::test_database_operators_have_sql[extract_table01] FAILED [ 64%]
tests/test_my_dag.py::test_database_operators_have_sql[extract_table02] FAILED [ 68%]
tests/test_my_dag.py::test_database_operators_have_sql[extract_table03] FAILED [ 72%]
tests/test_my_dag.py::test_database_operators_have_sql[extract_table04] FAILED [ 76%]
tests/test_my_dag.py::test_database_operators_have_sql[extract_table05] FAILED [ 80%]
tests/test_my_dag.py::test_database_operators_have_sql[extract_table06] FAILED [ 84%]
tests/test_my_dag.py::test_database_operators_have_sql[extract_table07] FAILED [ 88%]
tests/test_my_dag.py::test_database_operators_have_sql[extract_table08] FAILED [ 92%]
tests/test_my_dag.py::test_database_operators_have_sql[extract_table09] FAILED [ 96%]
tests/test_my_dag.py::test_database_operators_have_sql[extract_table10] FAILED [100%]

Ah, that’s much better… (Well, now I clearly need to go write some SQL!)

Send Slack Message When A Task Fails

I configure all of my DAGs to send me a Slack message when they finish successfully, but what about when a task in a DAG fails? Sure, there’s the built-in ability to send an email, but wouldn’t it be nice to get a Slack message for the failure instead?

Turns out, it’s not so hard to do… Simply create a function that creates a SlackAPIPostOperator object & then add an on_failure_callback reference in your DAG definition!

[All of the usual docstrings & imports for your DAG go here]
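# For example (hedged; these are Airflow 1.x-style import paths, so adjust to
# whatever your Airflow version expects):
# from airflow import DAG
# from airflow.models import Variable
# from airflow.operators.postgres_operator import PostgresOperator
# from airflow.operators.slack_operator import SlackAPIPostOperator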


def send_slack_message_failed(context):
    """Send Slack message when a task fails.
    """
    return SlackAPIPostOperator(
        task_id="send_slack_message_task_failed",
        token=Variable.get("slack_token"),
        channel=Variable.get("slack_status_channel"),
        username="BI-ETL-airflow",
        text=f":red_circle: TASK {context.get('task')} IN {context.get('dag')} "
        + "FAILED! ATTENTION NEEDED!",
    ).execute(context=context)


default_args = { ... }

with DAG(
    dag_id="my_awesome_dag_that_sometimes_fails",
    default_args=default_args,
    schedule_interval="@daily",
) as dag:
    task1 = PostgresOperator(
        task_id="truncate_mysterious_table",
        sql="TRUNCATE TABLE mysterious;",
        on_failure_callback=send_slack_message_failed,
    )
        

I feel like I should also mention that I generally prefer my DAG files to not contain any functions, as my preference is to separate the wiring of the tasks from the work actually performed by the tasks. However, this is a worthy exception, as potentially every task in the DAG could invoke send_slack_message_failed().

So when a task goes wrong, here’s a screenshot of the Slack message:
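In text form (given the f-string in the callback, with the task & dag values rendering via their reprs), the message comes out roughly as:

:red_circle: TASK <Task(PostgresOperator): truncate_mysterious_table> IN <DAG: my_awesome_dag_that_sometimes_fails> FAILED! ATTENTION NEEDED!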

I should also mention that because the message sent includes the name of the task, I found that the only way to get the correct value passed was to add on_failure_callback statements to all the individual tasks! (If you wish to have a more generic message though, you could simply add on_failure_callback in the DAG() definition.)
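A hedged sketch of that more generic route (send_slack_message_dag_failed is a hypothetical variant, and this assumes your Airflow version accepts a DAG-level on_failure_callback):

def send_slack_message_dag_failed(context):
    """Hypothetical generic callback for a failed DAG run (no task name needed)."""
    return SlackAPIPostOperator(
        task_id="send_slack_message_dag_failed",
        token=Variable.get("slack_token"),
        channel=Variable.get("slack_status_channel"),
        username="BI-ETL-airflow",
        text=f":red_circle: DAG {context.get('dag')} FAILED! ATTENTION NEEDED!",
    ).execute(context=context)


with DAG(
    dag_id="my_awesome_dag_that_sometimes_fails",
    default_args=default_args,
    schedule_interval="@daily",
    on_failure_callback=send_slack_message_dag_failed,
) as dag:
    ...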

Verifying Task Order Within The DAG

One of my favorite tests for a DAG is the one to confirm the connectivity between tasks. If there is a limitation to this particular implementation, it’s that it only works with rather linear DAGs. (So if your DAG contains a number of forks & merges, this probably isn’t going to work as shown.)

Linear DAG Example

The example below uses a simple DAG containing 6 tasks that run in the following order:

  1. start
  2. idempotency_check
  3. extract_data
  4. transform_data
  5. load_data
  6. send_slack_message_success
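
A minimal sketch of a linear DAG with those six tasks might look like this (DummyOperator, the dag_id, and the arguments here are stand-ins for illustration, not the actual DAG from the post):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Placeholder arguments/operators purely for illustration; the real DAG uses
# PostgresOperator tasks, Slack messaging, etc.
default_args = {"owner": "airflow", "start_date": datetime(2019, 1, 1)}

with DAG(
    dag_id="my_airflow_dag",  # assumed; the tests read it from my_airflow_dag.dag
    default_args=default_args,
    schedule_interval="@daily",
) as dag:
    start = DummyOperator(task_id="start")
    idempotency_check = DummyOperator(task_id="idempotency_check")
    extract_data = DummyOperator(task_id="extract_data")
    transform_data = DummyOperator(task_id="transform_data")
    load_data = DummyOperator(task_id="load_data")
    send_slack_message_success = DummyOperator(task_id="send_slack_message_success")

    (
        start
        >> idempotency_check
        >> extract_data
        >> transform_data
        >> load_data
        >> send_slack_message_success
    )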

Testing the task order

from airflow.models import DagBag
from pytest import mark, param

import my_airflow_dag

dagbag = DagBag()
dag_id = my_airflow_dag.dag.dag_id
dag = dagbag.get_dag(dag_id)
args = dag.default_args
all_tasks = [
    "EMPTY",
    "start",
    "idempotency_check",
    "extract_data",
    "transform_data",
    "load_data",
    "send_slack_message_success",
    "EMPTY",
]
tasks = all_tasks[1:-1]  # Only contains actual tasks
upstream_tasks = all_tasks[:-1]  # Starts @ EMPTY, ends on last task
downstream_tasks = all_tasks[2:]  # Starts on first task; ends @ EMPTY


@mark.parametrize(
    "task_id, upstream_task_id, downstream_task_id",
    [param(t, u, d, id=t) for t, u, d in zip(tasks, upstream_tasks, downstream_tasks)],
)
def test_task_dependencies(task_id, upstream_task_id, downstream_task_id):
    """Verify the upstream/downstream task_ids for tasks in DAG.

    If there is no upstream or downstream task_id, it will be "EMPTY".
    """
    dag_task_id = dag.get_task(task_id)
    upstream_task_ids = [task.task_id for task in dag_task_id.upstream_list] or "EMPTY"
    downstream_task_ids = [
        task.task_id for task in dag_task_id.downstream_list
    ] or "EMPTY"
    assert upstream_task_id in upstream_task_ids
    assert downstream_task_id in downstream_task_ids

The magic is in the list slicing & then zip()-ping up those slices so that it outputs the current task, the task that came before it (or “EMPTY”), and the task that comes after it (or “EMPTY”). Then if I ever add or remove a task from the DAG, I just need to add or remove its name in the original list (all_tasks).
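
To make that concrete, here is what the zip ends up feeding to parametrize for this DAG (current task, its upstream, its downstream):

("start", "EMPTY", "idempotency_check")
("idempotency_check", "start", "extract_data")
("extract_data", "idempotency_check", "transform_data")
("transform_data", "extract_data", "load_data")
("load_data", "transform_data", "send_slack_message_success")
("send_slack_message_success", "load_data", "EMPTY")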

3 Easy Airflow Tests

I haven’t exactly found a wealth of information on how to test with Airflow. Here, though, are 3 tests that I’ve used with every single DAG that I’ve written.

The first one does nothing more than verify that there are no syntax errors within the DAG file, the second one confirms the existence of (and access to) any Airflow Variables used in the DAG, and the third makes sure that any database operator (e.g., PostgresOperator) has a SQL statement/file assigned to it.

And in case it isn’t obvious, all testing is done with pytest.

No syntax errors test

This one is the ultimate in low-hanging fruit: a baseline that just makes sure your DAG has valid syntax.

from airflow.models import DagBag

import my_airflow_dag

dagbag = DagBag()
dag_id = my_airflow_dag.dag.dag_id
dag = dagbag.get_dag(dag_id)
args = dag.default_args


def test_dag_logic_and_syntax():
    """Verify that there are no logical or syntactical errors in DAG.

    Ideal response should return 0 errors (status == False).
    """
    assert not len(dagbag.import_errors)
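
One tweak worth considering (a sketch of a variant, not what’s in the original post): put the import errors into the assertion message, so a failing run shows exactly what broke rather than just a count.

def test_dag_logic_and_syntax():
    """Verify that there are no import errors in the DagBag, and show them when there are."""
    assert not dagbag.import_errors, f"DAG import errors: {dagbag.import_errors}"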

Airflow Variables test

I only have a few Airflow Variables configured, but I use them a lot. The most common usage is that all of my DAGs end by sending a message via Slack. So instead of managing the token & channel within every DAG, I can simply pull those Variables from the Airflow database when needed.

from airflow.models import Variable
from pytest import mark, param

variables = ["slack_status_channel", "slack_token"]


@mark.parametrize("variable", variables, id=variables)
def test_variables(variable):
    """Verify existence of any Variables used in DAG.

    Note: Variable is imported in code block above.
    """
    assert Variable.get(variable)

SQL for database operators test

This one is deceptively powerful. After trying to run a DAG where a task neither contained a SQL statement nor pointed to a (correct) SQL file, I wrote this simple check to make sure that each database operator task will at least be able to do something.

While most of my database interaction is done with only PostgreSQL, you can use a tuple of database operators with isinstance() if you need to interact with more than one in a DAG.

from airflow.operators.postgres_operator import PostgresOperator


@mark.parametrize(
    "db_operator",
    [param(x, id=x.task_id) for x in dag.tasks if isinstance(x, PostgresOperator)],
)
def test_database_operators_have_sql(db_operator):
    """For all PostgresOperator task(s), verify that sql attribute returns
    non-empty value.
    """
    assert db_operator.sql.startswith("INSERT") or db_operator.sql.startswith(
        "TRUNCATE"
    )
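
To expand on the tuple-of-operators point above, a hedged sketch of what the filter could look like if a DAG mixed Postgres and MySQL tasks (MySqlOperator is just an illustrative second operator; swap in whatever you actually use):

from airflow.operators.mysql_operator import MySqlOperator

DB_OPERATORS = (PostgresOperator, MySqlOperator)

db_tasks = [param(x, id=x.task_id) for x in dag.tasks if isinstance(x, DB_OPERATORS)]

Then pass db_tasks to the parametrize call in place of the original list comprehension.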