While the Python functions I call from a PythonOperator generally end with return True, sometimes I would like them to emit some useful information (like a count of how many objects they finished working on, rows loaded into a table, etc.) to the Airflow logs via logging.
The problem is “How do you test that a message was emitted to the airflow log?” Well, wonder no more!
pytest Built-In Fixture, caplog
(I’m going to save another pytest built-in fixture, tmp_path, for a future post, and context is my own fixture, discussed in a previous post, for simulating Airflow’s context.)
import logging
from datetime import datetime
from pathlib import Path

import requests


def download_csv_file(target_dir: Path, **context) -> bool:
    """Download CSV file from remote server."""
    # Airflow supplies the run date as the "ds" string (YYYY-MM-DD).
    task_date = datetime.strptime(context.get("ds"), "%Y-%m-%d")
    file_name = csv_file_name(task_date)  # csv_file_name() is not shown in this post
    local_file = Path(target_dir) / file_name
    url = f"https://website.com/extracts/{file_name}"
    csv_file = requests.get(url, stream=True)
    csv_file.raise_for_status()  # Fail the task on an HTTP error status
    with open(local_file, "wb") as lf:
        lf.write(csv_file.content)
    logging.info(f"{local_file} downloaded successfully.")  # LOOK FOR THIS LINE!
    return True

import os
from datetime import date, timedelta


def test_download_csv_file(caplog, tmp_path, context):
    """Make sure that a valid CSV file is downloaded from remote server."""
    test_date = date.today() - timedelta(1)
    file_date = test_date.strftime("%d-%m-%Y")
    file_name = f"remote_source_{file_date}.csv"
    test_file = tmp_path / file_name
    results = download_csv_file(tmp_path, **context)
    assert results
    assert f"{test_file}" in caplog.text  # Make sure logging message is emitted
    assert test_file.is_file()
    assert os.path.getsize(test_file) > 7_000_000  # File should be 7+MB
Definitely look over the documentation on the pytest website, because there are all sorts of things you can check with caplog.
But here I simply look for a string in caplog.text, which in this case happens to hold just a single message. If you want to check multiple logged events, you may have to iterate through a list such as caplog.records to get to the message you want.
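As a rough sketch of what iterating over multiple events could look like, the example below uses a made-up function, load_rows, which is not from my DAGs and only exists to log two messages. It assumes messages go through the root logger, as in the function above. Note that outside of Airflow's own logging setup, INFO messages may not be captured at all unless the level is lowered, which is why the test calls caplog.set_level first.

```python
import logging


def load_rows(rows):
    """Hypothetical task callable (for illustration only) that logs twice."""
    logging.info("Starting load")
    logging.info("%d rows loaded", len(rows))
    return True


def test_load_rows_logs_row_count(caplog):
    # The root logger defaults to WARNING, so lower the level for this test
    # to make sure INFO messages are captured.
    caplog.set_level(logging.INFO)

    assert load_rows([1, 2, 3])

    # caplog.records is a list of logging.LogRecord objects, one per event.
    messages = [record.getMessage() for record in caplog.records]
    assert "3 rows loaded" in messages

    # caplog.record_tuples holds (logger_name, level, message) tuples,
    # which is handy when the level matters as well as the text.
    assert ("root", logging.INFO, "3 rows loaded") in caplog.record_tuples
```

Checking caplog.records or caplog.record_tuples is a bit stricter than searching caplog.text, since each event is matched on its own rather than as part of one long string.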