Using pytest.fixture to Test Airflow Context

I’ve been wanting to figure out a way to test the callables behind tasks that use the Airflow context (via provide_context=True), such as those run by a BranchPythonOperator or PythonOperator. What I’ve come up with is a fixture that returns a dictionary mimicking that context. (In my example below, I perhaps went a little overboard by configuring it to return all of the various date-related values instead of just a statically defined date, DAG name, & task name.)

There are 2 key components to creating a working context kwargs dictionary for your tests:

  1. Create a context fixture that dynamically populates many of the keys in the context dictionary.
  2. Use @mark.parametrize with indirect=True so the date, DAG name, & task name can be assigned per test. (It could easily be enhanced to include the task operator, too.) Then, inside the test function, pass the fixture's dictionary to the function under test as **context.
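Point 2 leans on ordinary Python keyword-argument unpacking, the same mechanism Airflow 1.x's PythonOperator uses to hand the context to a callable when provide_context=True. A minimal sketch with two hypothetical callables (read_ds and read_ds_explicit are made up for illustration) and a trimmed-down context dict:

```python
def read_ds(**context):
    """Hypothetical task callable that collects the whole context."""
    # Every key of the unpacked dict lands in the `context` dict.
    return context.get("ds")


def read_ds_explicit(ds, **kwargs):
    """Hypothetical callable that names the one key it needs."""
    # `ds` is matched by name; every other key is gathered into `kwargs`.
    return ds, sorted(kwargs)


fake_context = {"ds": "2019-07-01", "ds_nodash": "20190701", "params": {}}

print(read_ds(**fake_context))           # → 2019-07-01
print(read_ds_explicit(**fake_context))  # → ('2019-07-01', ['ds_nodash', 'params'])
```

Both signatures work with a full context dictionary: keys the callable names explicitly are matched by name, and the rest land in the catch-all **kwargs.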

Create context Fixture

Configuring the fixture to accept parameters is easy, but it requires using pytest’s benign-sounding request object. (So don’t go thinking that you can call the parameter whatever you’d like: the fixture argument must be named request, and the value arrives as request.param. And that is only one object, so if you want to pass more than one value, as you’ll see below, you’ll need to “wrap” them in a tuple and unpack them inside the fixture.)
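A stripped-down sketch of that mechanism, separate from the full fixture (the fixture name dag_and_task and the values passed are hypothetical): the parametrized tuple reaches the fixture as request.param, gets unpacked there, and the test only ever sees the fixture's return value.

```python
from pytest import fixture, mark


@fixture
def dag_and_task(request):
    # With indirect=True, pytest hands the parametrized value to the fixture as
    # `request.param`. It is a single object, so several values travel as a tuple.
    dag_name, task_name = request.param
    return {"dag_id": dag_name, "task_id": task_name}


@mark.parametrize("dag_and_task", [("example_dag", "example_task")], indirect=True)
def test_fixture_received_tuple(dag_and_task):
    # The test receives the fixture's dict, not the raw tuple.
    assert dag_and_task == {"dag_id": "example_dag", "task_id": "example_task"}
```

Dropping indirect=True would not raise an error here: a direct parametrization overrides a fixture with the same name, so the test would receive the raw tuple instead and its assertion would fail.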

from datetime import datetime, timedelta
from pytest import fixture, mark

@fixture
def context(request):
    """Given a (YYYY-MM-DD date string, DAG name, task name) tuple in request.param,
    fabricate an Airflow context dictionary.
    """
    test_date, dag_name, task_name = request.param  # <== Unpacking all parameters
    today = datetime.strptime(test_date, "%Y-%m-%d")
    yesterday = today - timedelta(days=1)
    tomorrow = today + timedelta(days=1)
    ds = today.strftime("%Y-%m-%d")
    ds_nodash = today.strftime("%Y%m%d")
    yesterday_ds = yesterday.strftime("%Y-%m-%d")
    yesterday_ds_nodash = yesterday.strftime("%Y%m%d")
    tomorrow_ds = tomorrow.strftime("%Y-%m-%d")
    tomorrow_ds_nodash = tomorrow.strftime("%Y%m%d")
    right_now = datetime.now()  # read the clock once so both timestamps agree
    now = right_now.strftime("%Y-%m-%d %H:%M:%S")
    now_t = right_now.strftime("%Y-%m-%dT%H:%M:%S")
    return {
        "dag": f"<DAG: {dag_name}>",
        "ds": ds,
        "ds_nodash": ds_nodash,
        "ts": f"{ds}T00:00:00",
        "ts_nodash": f"{ds_nodash}T000000",
        "yesterday_ds": yesterday_ds,
        "yesterday_ds_nodash": yesterday_ds_nodash,
        "tomorrow_ds": tomorrow_ds,
        "tomorrow_ds_nodash": tomorrow_ds_nodash,
        "END_DATE": ds,
        "end_date": ds,
        "dag_run": f"<DagRun {dag_name} @ {now}: scheduled__{now_t}, externally triggered: False>",
        "run_id": f"scheduled__{ds}T00:00:00",
        "execution_date": today,
        "prev_execution_date": yesterday,
        "next_execution_date": tomorrow,
        "latest_date": ds,
        "macros": "<module 'airflow.macros' from '/anaconda3/lib/python3.6/site-packages/airflow/macros/__init__.py'>",
        "params": {},
        "tables": None,
        "task": f"<Task(BranchPythonOperator): {task_name}>",
        "task_instance": f"<TaskInstance: {dag_name}.{task_name} {now} [running]>",
        "ti": f"<TaskInstance: {dag_name}.{task_name} {now} [running]>",
        "task_instance_key_str": f"{dag_name}__{task_name}__{ds_nodash}",
        "conf": "<module 'airflow.configuration' from '/anaconda3/lib/python3.6/site-packages/airflow/configuration.py'>",
        "test_mode": False,
        "var": {"value": None, "json": None},
        "templates_dict": None,
    }
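If the date arithmetic is wanted outside a fixture too (for example, to test the fixture's values directly), it could be pulled into a plain function. A sketch, assuming a hypothetical helper named date_fields that is not part of the original code; it produces the same date-derived keys as the fixture above:

```python
from datetime import datetime, timedelta


def date_fields(test_date):
    """Hypothetical helper: derive Airflow-style date keys from a YYYY-MM-DD string."""
    today = datetime.strptime(test_date, "%Y-%m-%d")
    yesterday = today - timedelta(days=1)
    tomorrow = today + timedelta(days=1)
    fields = {
        "execution_date": today,
        "prev_execution_date": yesterday,
        "next_execution_date": tomorrow,
    }
    # Same two string formats as the fixture, applied to each of the three days.
    for prefix, day in (("", today), ("yesterday_", yesterday), ("tomorrow_", tomorrow)):
        fields[f"{prefix}ds"] = day.strftime("%Y-%m-%d")
        fields[f"{prefix}ds_nodash"] = day.strftime("%Y%m%d")
    fields["ts"] = f"{fields['ds']}T00:00:00"
    fields["ts_nodash"] = f"{fields['ds_nodash']}T000000"
    return fields


print(date_fields("2019-03-01")["yesterday_ds"])  # → 2019-02-28
```

The fixture body could then merge this dictionary with the DAG- and task-specific placeholder strings, e.g. `return {**date_fields(test_date), "dag": f"<DAG: {dag_name}>", ...}`.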

And I might as well show the actual function under test. It is called by a BranchPythonOperator, which means it must return the task_id of the branch to follow, so it has (at least) two possible return values.

def verify_error_existence(**context):
    """Get log file names from database and collect error details. If no errors are found,
    send Slack message. Otherwise format errors to send both email & Slack messages
    containing error details.
    """
    task_date = context.get("ds")
    rows_found = get_log_file_names(task_date)
    error_details = collect_results(rows_found)
    if len(error_details) == 0:
        return "send_all_clear_slack_message"
    else:
        return "format_error_details"
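Because get_log_file_names reads from a database (per the docstring), the parametrized test in the next section depends on what that database holds for the chosen date. One way to exercise both branches without it, swapped in here as an alternative rather than the approach used in this post, is to replace the two helpers with unittest.mock objects. In this self-contained sketch the helper bodies are hypothetical stand-ins, and mock.patch.dict targets the function's own global namespace only so the example runs as a single file; in a real project you would more likely patch the module that holds the DAG code, e.g. with pytest's monkeypatch.setattr or mock.patch("your_dag_module.collect_results") (module name hypothetical).

```python
from unittest import mock


# Hypothetical stand-ins for the database helpers, which are not shown in the post.
def get_log_file_names(task_date):
    raise RuntimeError("would query the database")


def collect_results(rows_found):
    raise RuntimeError("would parse log files")


def verify_error_existence(**context):
    # Same logic as the function above.
    task_date = context.get("ds")
    rows_found = get_log_file_names(task_date)
    error_details = collect_results(rows_found)
    if len(error_details) == 0:
        return "send_all_clear_slack_message"
    else:
        return "format_error_details"


def run_with_fake_results(error_details, **context):
    # Swap both helpers in the function's global namespace for the duration of the call;
    # patch.dict restores the original names when the `with` block exits.
    fakes = {
        "get_log_file_names": mock.Mock(return_value=["app.log"]),
        "collect_results": mock.Mock(return_value=error_details),
    }
    with mock.patch.dict(verify_error_existence.__globals__, fakes):
        return verify_error_existence(**context)


print(run_with_fake_results([], ds="2019-07-01"))                 # → send_all_clear_slack_message
print(run_with_fake_results(["Traceback ..."], ds="2019-07-01"))  # → format_error_details
```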

Use indirect=True Within @mark.parametrize to Pass Values to the context Fixture, Then Call with **context

The magic of indirect=True is that the parameter values are NOT handed to the test function directly: pytest passes each one upstream to the fixture of the same name as request.param, and the test receives whatever that fixture returns!

@mark.parametrize(
    "context",
    [("2019-07-01", "legacy_ETL_verify", "verify_error_existence")], # <== Tuple to pass to fixture
    ids=["date_without_any_errors"],
    indirect=True,  # <== Pass the tuple upstream
)
def test_verify_error_existence_all_clear(context):
    results = verify_error_existence(**context)  # <== call as kwargs
    assert results == "send_all_clear_slack_message"
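The same pattern scales to several cases. A sketch, assuming a hypothetical pick_branch callable and a trimmed-down context fixture (neither is the code from this post): passing a list to indirect routes only the named arguments through fixtures, so the expected task_id can be parametrized alongside the context and go straight to the test.

```python
from datetime import datetime

from pytest import fixture, mark


def pick_branch(**context):
    """Hypothetical branch callable: weekend runs take a different path."""
    return "weekend_task" if context["execution_date"].weekday() >= 5 else "weekday_task"


@fixture
def context(request):
    # Trimmed-down stand-in for the full context fixture above.
    test_date, dag_name, task_name = request.param
    return {
        "ds": test_date,
        "execution_date": datetime.strptime(test_date, "%Y-%m-%d"),
        "task_instance_key_str": f"{dag_name}__{task_name}__{test_date.replace('-', '')}",
    }


@mark.parametrize(
    "context, expected",
    [
        (("2019-07-01", "example_dag", "pick_branch"), "weekday_task"),  # a Monday
        (("2019-07-06", "example_dag", "pick_branch"), "weekend_task"),  # a Saturday
    ],
    ids=["monday", "saturday"],
    indirect=["context"],  # only `context` goes to the fixture; `expected` goes to the test
)
def test_pick_branch(context, expected):
    assert pick_branch(**context) == expected
```

With two ids, pytest reports test_pick_branch[monday] and test_pick_branch[saturday] as separate tests.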

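It’s worth noting that if I just wanted to use today’s date, I could pass datetime.today().strftime("%Y-%m-%d") (given the imports above) as the first value in the tuple. Keep in mind that the expression is evaluated once, when pytest imports the test module, not each time the test runs.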
