Testing SqlSensor & dealing with str.startswith

This was one of those “Of course multiple values can be passed/checked!”-type situations.

I’ve written a couple new DAGs which use SqlSensor, and I want to use my existing test, test_database_operators_have_sql(), to make sure that I am passing a SQL statement to the SqlSensor task.

Here’s how the test originally looked testing for just the PostgresOperator:

from airflow.operators.postgres_operator import PostgresOperator

@mark.parametrize(
    "db_operator",
    [x for x in dag.tasks if isinstance(x, PostgresOperator)],
    ids=[x for x, y in zip(dag.task_ids, dag.tasks) if isinstance(y, PostgresOperator)],
)
def test_database_operators_have_sql(db_operator):
    """For all PostgresOperator task(s), verify that sql attribute returns
    non-empty value.
    """
    assert db_operator.sql.startswith("INSERT") or db_operator.sql.startswith(
        "TRUNCATE"
    )

You can check for more than one class with isinstance() by passing a tuple of classes for the second argument. And not only do I want to check for SqlSensor tasks, I need to make sure that the SQL query may start with SELECT (as well as the other 2 strings), which means modifying the assert statement. Some research turned up that you can also pass a tuple to str.startswith() to check for multiple strings! Implementing those requirements now gives us this:

from airflow.operators.postgres_operator import PostgresOperator
from airflow.sensors.sql_sensor import SqlSensor

@mark.parametrize(
    "db_operator",
    [x for x in dag.tasks if isinstance(x, (PostgresOperator, SqlSensor))],
    ids=[
        x
        for x, y in zip(dag.task_ids, dag.tasks)
        if isinstance(y, (PostgresOperator, SqlSensor))
    ],
)
def test_database_operators_have_sql(db_operator):
    """For all tasks involving a SQL query, verify that sql attribute returns
    non-empty value.
    """
    assert db_operator.sql.startswith(("INSERT", "SELECT", "TRUNCATE"))

Ah, simplicity!

Bonus lesson learned:

The first time I ran the modified test, I had accidentally omitted SqlSensor in the second argument (first list comprehension) of @mark.parametrize()… When I ran the test (worth noting that the DAG does not have any PostgresOperator tasks in it), it was being skipped! So if there are no tasks that meet the criteria, pytest automatically marks the test as skipped. Nice!

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.