I’ve been wanting to figure out a way to test tasks that reference the Airflow context (via `provide_context=True`), such as `BranchPythonOperator` or `PythonOperator` tasks. What I’ve come up with is a fixture that returns a dictionary. (In my example below, I perhaps went a little overboard: rather than using a statically defined date, DAG name, & task name, the fixture accepts them as parameters and fills in all of the various date-related values from them.)
There are 2 key components to creating a working context kwargs dictionary for your tests:
- Create a `context` fixture that dynamically populates many of the key values in the context dictionary.
- Use `@mark.parametrize` with `indirect=True` to allow dynamically assigned dates, DAG name, & task name. (It could easily be enhanced to include the task operator, too.) Then request the `context` fixture as an argument of the test function and pass it to the function under test as `**context`.
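Unpacking with `**` is what lets a plain dictionary stand in for the context Airflow would supply: the callable simply receives keyword arguments. A minimal sketch, using a hypothetical branch callable `pick_branch` (not part of the original code) that reads only `execution_date` and lets every other key fall into `**context`:

```python
from datetime import datetime


def pick_branch(**context):
    """Hypothetical branch callable: choose a task_id based on the run date."""
    run_date = context["execution_date"]
    # weekday() returns 5 for Saturday and 6 for Sunday.
    return "weekend_branch" if run_date.weekday() >= 5 else "weekday_branch"


# A hand-built stand-in for the context; extra keys such as "ds" and "params"
# are accepted by **context even though pick_branch never reads them.
fake_context = {
    "ds": "2019-07-06",
    "execution_date": datetime(2019, 7, 6),
    "params": {},
}

print(pick_branch(**fake_context))
```

Since 2019-07-06 fell on a Saturday, this prints `weekend_branch`. The fixture below does the same thing at a larger scale: it builds the dictionary, and the test unpacks it into the real callable.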
Create the `context` Fixture
Configuring the fixture to accept parameters is easy, but it requires using the very benign-sounding `request` object. (So don’t go thinking that you can call the parameter whatever you’d like: the fixture’s argument must be named `request`, and the value arrives as `request.param`. Also, each parametrized case passes exactly one object, so if you want more than one value, as you’ll see below, you’ll need to “wrap” them inside a tuple.)
```python
from datetime import datetime, timedelta

from pytest import fixture, mark


@fixture
def context(request):
    """Given a string in YYYY-MM-DD format, DAG name, & task name, fabricate an Airflow
    context.
    """
    test_date, dag_name, task_name = request.param  # <== Unpacking all parameters
    today = datetime.strptime(test_date, "%Y-%m-%d")
    yesterday = today - timedelta(days=1)
    tomorrow = today + timedelta(days=1)
    ds = today.strftime("%Y-%m-%d")
    ds_nodash = today.strftime("%Y%m%d")
    yesterday_ds = yesterday.strftime("%Y-%m-%d")
    yesterday_ds_nodash = yesterday.strftime("%Y%m%d")
    tomorrow_ds = tomorrow.strftime("%Y-%m-%d")
    tomorrow_ds_nodash = tomorrow.strftime("%Y%m%d")
    current = datetime.now()  # read the clock once so both timestamps agree
    now = current.strftime("%Y-%m-%d %H:%M:%S")
    now_t = current.strftime("%Y-%m-%dT%H:%M:%S")
    # Object-valued entries (dag, dag_run, task, ti, macros, conf) are string
    # stand-ins that mimic Airflow's reprs, so code that calls methods on them
    # (e.g. ti.xcom_pull) will fail.
    return {
        "dag": f"<DAG: {dag_name}>",
        "ds": ds,
        "ds_nodash": ds_nodash,
        "ts": f"{ds}T00:00:00",
        "ts_nodash": f"{ds_nodash}T000000",
        "yesterday_ds": yesterday_ds,
        "yesterday_ds_nodash": yesterday_ds_nodash,
        "tomorrow_ds": tomorrow_ds,
        "tomorrow_ds_nodash": tomorrow_ds_nodash,
        "END_DATE": ds,
        "end_date": ds,
        "dag_run": f"<DagRun {dag_name} @ {now}: scheduled__{now_t}, externally triggered: False>",
        "run_id": f"scheduled__{ds}T00:00:00",
        "execution_date": today,
        "prev_execution_date": yesterday,
        "next_execution_date": tomorrow,
        "latest_date": ds,
        "macros": "<module 'airflow.macros' from '/anaconda3/lib/python3.6/site-packages/airflow/macros/__init__.py'>",
        "params": {},
        "tables": None,
        "task": f"<Task(BranchPythonOperator): {task_name}>",
        "task_instance": f"<TaskInstance: {dag_name}.{task_name} {now} [running]>",
        "ti": f"<TaskInstance: {dag_name}.{task_name} {now} [running]>",
        "task_instance_key_str": f"{dag_name}__{task_name}__{ds_nodash}",
        "conf": "<module 'airflow.configuration' from '/anaconda3/lib/python3.6/site-packages/airflow/configuration.py'>",
        "test_mode": False,
        "var": {"value": None, "json": None},
        "templates_dict": None,
    }
```
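One possible refinement, not part of the original approach: move the date arithmetic into a plain function so it can be checked without pytest, and let the fixture delegate to it. A sketch covering only the date-derived keys (the name `build_context` is hypothetical):

```python
from datetime import datetime, timedelta


def build_context(test_date, dag_name, task_name):
    """Hypothetical helper: the date-derived part of the fake Airflow context."""
    today = datetime.strptime(test_date, "%Y-%m-%d")
    yesterday = today - timedelta(days=1)
    tomorrow = today + timedelta(days=1)
    ds = today.strftime("%Y-%m-%d")
    ds_nodash = today.strftime("%Y%m%d")
    return {
        "ds": ds,
        "ds_nodash": ds_nodash,
        "ts": f"{ds}T00:00:00",
        "ts_nodash": f"{ds_nodash}T000000",
        "yesterday_ds": yesterday.strftime("%Y-%m-%d"),
        "yesterday_ds_nodash": yesterday.strftime("%Y%m%d"),
        "tomorrow_ds": tomorrow.strftime("%Y-%m-%d"),
        "tomorrow_ds_nodash": tomorrow.strftime("%Y%m%d"),
        "execution_date": today,
        "prev_execution_date": yesterday,
        "next_execution_date": tomorrow,
        "task_instance_key_str": f"{dag_name}__{task_name}__{ds_nodash}",
    }


# Month and leap-year boundaries are where hand-typed dates usually go wrong.
ctx = build_context("2020-03-01", "legacy_ETL_verify", "verify_error_existence")
print(ctx["yesterday_ds"], ctx["tomorrow_ds_nodash"])
```

This prints `2020-02-29 20200302`. The fixture body could then shrink to something like `return {**build_context(*request.param), "params": {}, ...}`, merging in whichever string stand-ins you still need.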
And I might as well show the actual function under test. It is called by a `BranchPythonOperator`, which means it needs to return the task_id of the branch to follow (here, one of 2 different strings).
```python
def verify_error_existence(**context):
    """Get log file names from database and collect error details. If no errors are found,
    send Slack message. Otherwise format errors to send both email & Slack messages
    containing error details.
    """
    # get_log_file_names and collect_results are my own helpers (not shown here).
    task_date = context.get("ds")
    rows_found = get_log_file_names(task_date)
    error_details = collect_results(rows_found)
    # Return the task_id of the downstream task the branch should follow.
    if len(error_details) == 0:
        return "send_all_clear_slack_message"
    else:
        return "format_error_details"
```
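As written, `verify_error_existence` calls `get_log_file_names` and `collect_results`, so a test against it depends on what the database holds for the chosen date. One way to exercise both branches deterministically, swapped in here and not part of the original approach, is to stub those helpers with `unittest.mock.patch.dict`. The helper bodies below are hypothetical placeholders (the post does not show them), and the function is repeated so the sketch runs on its own:

```python
from unittest.mock import patch


# Hypothetical placeholders for the author's helpers, which query the
# log database; they must never run in these tests.
def get_log_file_names(task_date):
    raise NotImplementedError("would query the log database")


def collect_results(rows_found):
    raise NotImplementedError("would parse error details from the logs")


def verify_error_existence(**context):
    """Same branching logic as the function above."""
    task_date = context.get("ds")
    rows_found = get_log_file_names(task_date)
    error_details = collect_results(rows_found)
    if len(error_details) == 0:
        return "send_all_clear_slack_message"
    else:
        return "format_error_details"


def branch_for(error_details):
    """Run verify_error_existence with both helpers temporarily stubbed out."""
    stubs = {
        "get_log_file_names": lambda task_date: ["app.log"],
        "collect_results": lambda rows_found: error_details,
    }
    # patch.dict swaps the module-level names and restores them on exit.
    with patch.dict(globals(), stubs):
        return verify_error_existence(ds="2019-07-01")


print(branch_for([]))
print(branch_for(["ERROR in app.log"]))
```

In a real test module you would more likely write `patch("my_dag_module.collect_results", return_value=[])`, where `my_dag_module` is a placeholder for wherever the function lives; `patch.dict(globals(), ...)` is used here only so the sketch is a single self-contained file.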
Use `indirect=True` Within `@mark.parametrize` to Pass Values to the `context` Fixture, Then Call the Function Under Test
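The magic of `indirect=True` is that the parameter values are NOT passed to the test function directly. Instead, pytest passes each value upstream to the fixture of the same name (`context`), where it shows up as `request.param`, and the test function receives whatever the fixture returns.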
```python
@mark.parametrize(
    "context",
    [("2019-07-01", "legacy_ETL_verify", "verify_error_existence")],  # <== Tuple to pass to fixture
    ids=["date_without_any_errors"],
    indirect=True,  # <== Pass the tuple upstream
)
def test_verify_error_existence_all_clear(context):
    results = verify_error_existence(**context)  # <== call as kwargs
    assert results == "send_all_clear_slack_message"
```
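Because pytest hands the fixture exactly one object per case, the tuple’s positional order is the only thing tying each value to its meaning. A small variation, not used in the original: wrap the values in a `typing.NamedTuple`. It is still a tuple, so the fixture’s `test_date, dag_name, task_name = request.param` line keeps working unchanged, but each field also gets a name. `ContextParams` is a hypothetical name:

```python
from typing import NamedTuple


class ContextParams(NamedTuple):
    """Hypothetical named wrapper for the values sent to the context fixture."""

    test_date: str  # "YYYY-MM-DD"
    dag_name: str
    task_name: str


params = ContextParams("2019-07-01", "legacy_ETL_verify", "verify_error_existence")

# Unpacks exactly like the plain tuple used in the parametrize list...
test_date, dag_name, task_name = params
# ...while the same values are also readable by field name.
print(params.dag_name, isinstance(params, tuple))
```

In the test above, the list entry would become `ContextParams("2019-07-01", "legacy_ETL_verify", "verify_error_existence")`, and nothing in the fixture needs to change.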
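It’s worth noting that if I just wanted to pass any non-specific date, I could pass `datetime.today().strftime("%Y-%m-%d")` as the first value in the tuple. (With the `from datetime import datetime` import above, `datetime` refers to the class rather than the module, so `datetime.date.today()` would fail.)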
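A small sketch of that idea, with a hypothetical helper `todays_case` that builds the tuple. Arguments to `@mark.parametrize` are evaluated once, when the test module is imported during collection, so the date is fixed for the whole test session:

```python
from datetime import datetime


def todays_case(dag_name, task_name):
    """Hypothetical helper: build the (date, DAG name, task name) tuple for today."""
    # With `from datetime import datetime`, datetime.today() is a class method
    # returning the current local date and time.
    return (datetime.today().strftime("%Y-%m-%d"), dag_name, task_name)


case = todays_case("legacy_ETL_verify", "verify_error_existence")
print(case)
```

It would be used as `@mark.parametrize("context", [todays_case("legacy_ETL_verify", "verify_error_existence")], indirect=True)`. Keep in mind that a “today” case checks whatever the database holds for today, so the expected branch may change from run to run.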