I haven’t found much information on how to test with Airflow. Here are three tests, though, that I’ve used with every single DAG that I’ve written.
The first does nothing more than verify that there are no syntax errors in the DAG file, the second confirms the existence of, and access to, any Airflow Variables used in the DAG, and the third makes sure that every database operator (e.g., PostgresOperator) has a SQL statement or file assigned to it.
And in case it isn’t obvious, all testing is done with pytest.
No syntax errors test
This one is the ultimate in low-hanging fruit: a baseline check that your DAG has valid syntax.
from airflow.models import DagBag

import my_airflow_dag

dagbag = DagBag()
dag_id = my_airflow_dag.dag.dag_id
dag = dagbag.get_dag(dag_id)
args = dag.default_args


def test_dag_logic_and_syntax():
    """Verify that there are no logical or syntactical errors in DAG.

    Ideal response should return 0 errors (status == False).
    """
    assert not len(dagbag.import_errors)
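When this test fails, the bare assert only tells you that something broke, not which file or why. Here is a minimal sketch of one way to surface the details, assuming dagbag.import_errors is a dict that maps each DAG file path to its error text. The helper name format_import_errors and the sample data are hypothetical, not part of Airflow.

```python
def format_import_errors(import_errors):
    """Render a {file_path: error_text} mapping as a readable message.

    Hypothetical helper (not an Airflow API); pass it dagbag.import_errors.
    """
    if not import_errors:
        return "no import errors"
    lines = [f"{len(import_errors)} DAG file(s) failed to import:"]
    # Sort by path so the message is stable between runs.
    for path, error in sorted(import_errors.items()):
        lines.append(f"  {path}: {error}")
    return "\n".join(lines)


# Made-up data standing in for dagbag.import_errors.
example_errors = {
    "dags/my_airflow_dag.py": "invalid syntax (my_airflow_dag.py, line 12)",
}
print(format_import_errors(example_errors))
```

In the test itself, this could be used as the assertion message: assert not dagbag.import_errors, format_import_errors(dagbag.import_errors).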
Airflow Variables test
I only have a few Airflow Variables configured, but I use them a lot. The most common use: all of my DAGs end by sending a message via Slack. So instead of managing the token & channel within every DAG, I can simply call those Variables from the Airflow database when needed.
from airflow.models import Variable
from pytest import mark

variables = ["slack_status_channel", "slack_token"]


# pytest's keyword for naming parametrized cases is `ids`, not `id`.
@mark.parametrize("variable", variables, ids=variables)
def test_variables(variable):
    """Verify existence of any Variables used in DAG.

    Note: Variable is imported in code block above.
    """
    assert Variable.get(variable)
SQL for database operators test
This one is deceptively powerful. After trying to run a DAG in which a task neither contained a SQL statement nor pointed to a (correct) SQL file, I wrote this simple check to make sure that each database operator task will at least be able to do something.
While most of my database interaction is done with only PostgreSQL, you can use a tuple of database operators with isinstance() if you need to interact with more than one in a DAG.
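# On Airflow 2.x with the Postgres provider installed, the import path is
# airflow.providers.postgres.operators.postgres instead.
from airflow.operators.postgres_operator import PostgresOperator
from pytest import mark, param


# `dag` is the DAG object loaded from the DagBag in the first test.
@mark.parametrize(
    "db_operator",
    [param(x, id=x.task_id) for x in dag.tasks if isinstance(x, PostgresOperator)],
)
def test_database_operators_have_sql(db_operator):
    """For all PostgresOperator task(s), verify that sql attribute returns
    non-empty value (here, a statement starting with INSERT or TRUNCATE).
    """
    assert db_operator.sql.startswith(("INSERT", "TRUNCATE"))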
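To illustrate the tuple form of isinstance() mentioned above, here is a minimal sketch. The operator classes below are empty stand-ins so that the snippet runs without Airflow installed; in a real test you would import the actual operator classes instead. The helper name select_db_tasks and the task ids are hypothetical.

```python
class BaseOperator:
    """Stand-in for an Airflow operator (assumption, for illustration only)."""

    def __init__(self, task_id, sql=None):
        self.task_id = task_id
        self.sql = sql


class PostgresOperator(BaseOperator):
    """Stand-in for the real PostgresOperator."""


class MySqlOperator(BaseOperator):
    """Stand-in for a second database operator."""


class BashOperator(BaseOperator):
    """Stand-in for a non-database operator."""


# isinstance() accepts a tuple and returns True if the object matches any entry.
DB_OPERATORS = (PostgresOperator, MySqlOperator)


def select_db_tasks(tasks, operator_types=DB_OPERATORS):
    """Return only the tasks that are instances of one of the operator types."""
    return [task for task in tasks if isinstance(task, operator_types)]


tasks = [
    PostgresOperator("load_pg", sql="INSERT INTO t VALUES (1)"),
    BashOperator("notify"),
    MySqlOperator("load_mysql", sql="TRUNCATE t"),
]
print([task.task_id for task in select_db_tasks(tasks)])
# prints ['load_pg', 'load_mysql']
```

In the parametrized test, the same tuple would replace the single class in the isinstance() call, so one test covers every database operator in the DAG.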
Hey,
Thanks for the post. It’s really helpful! We are using the Variables test but not the database operators test.
Google is suggesting that your site is not safe.
There will be more attention if you can change it to https.
Best,
Thank you for the feedback & I’m glad that you found some of the tests to be helpful.
(I also got around to finally setting up SSL so you shouldn’t see any more warnings.)