This was one of those “Of course multiple values can be passed/checked!”-type situations.
I’ve written a couple new DAGs which use SqlSensor
, and I want to use my existing test, test_database_operators_have_sql()
, to make sure that I am passing a SQL statement to the SqlSensor
task.
Here’s how the test originally looked testing for just the PostgresOperator
:
from airflow.operators.postgres_operator import PostgresOperator
@mark.parametrize(
"db_operator",
[x for x in dag.tasks if isinstance(x, PostgresOperator)],
ids=[x for x, y in zip(dag.task_ids, dag.tasks) if isinstance(y, PostgresOperator)],
)
def test_database_operators_have_sql(db_operator):
"""For all PostgresOperator task(s), verify that sql attribute returns
non-empty value.
"""
assert db_operator.sql.startswith("INSERT") or db_operator.sql.startswith(
"TRUNCATE"
)
You can check for more than one class with isinstance()
by passing a tuple of classes for the second argument. And not only do I want to check for SqlSensor
tasks, I need to make sure that the SQL query may start with SELECT
(as well as the other 2 strings), which means modifying the assert
statement. Some research turned up that you can also pass a tuple to str.startswith()
to check for multiple strings! Implementing those requirements now gives us this:
from airflow.operators.postgres_operator import PostgresOperator
from airflow.sensors.sql_sensor import SqlSensor
@mark.parametrize(
"db_operator",
[x for x in dag.tasks if isinstance(x, (PostgresOperator, SqlSensor))],
ids=[
x
for x, y in zip(dag.task_ids, dag.tasks)
if isinstance(y, (PostgresOperator, SqlSensor))
],
)
def test_database_operators_have_sql(db_operator):
"""For all tasks involving a SQL query, verify that sql attribute returns
non-empty value.
"""
assert db_operator.sql.startswith(("INSERT", "SELECT", "TRUNCATE"))
Ah, simplicity!
Bonus lesson learned:
The first time I ran the modified test, I had accidentally omitted SqlSensor
in the second argument (first list comprehension) of @mark.parametrize()
… When I ran the test (worth noting that the DAG does not have any PostgresOperator
tasks in it), it was being skipped! So if there are no tasks that meet the criteria, pytest automatically marks the test as skipped. Nice!