Tag: Airflow

Another Way to Dynamically Create New Tasks: Factory Methods

New project, new challenge. This time I wanted to create very similar tasks (in this case to drop indexes), however, I didn’t want to be locked in with doing those similar tasks at the same time (read: in parallel with help from an iterable). So while I will not even pretend to be the originator […]

Why Setting an SLA for Your DAGs Might Be a Very Good Idea

While I feel like I’d already taken plenty of steps to be notified if there’s a problem when a DAG fails, I recently discovered that I hadn’t done anything to catch those scenarios where a DAG takes longer–or in my case, a lot longer–to complete (or fail) than normal. Thus, the need to learn how […]

Testing for logging Output from PythonOperator Functions

While generally my python functions called by a PythonOperator end with return True, sometimes I would like for them to emit some useful information (like a count of how many objects it finished working on, rows loaded into a table, etc.) to the airflow logs via logging. The problem is “How do you test that […]

Getting Credit Card BIN/IIN Information with Python

There are free (& paid) services that allow you to submit API calls with partial credit card numbers to return information about the credit card, like type (Visa, MC, Amex), the issuing bank, the country & currency where the card was issued, etc. The problem is that the free services either limit what data is […]

Custom Airflow Operators for Loading Data Into PostgreSQL

While the ETL I am responsible for takes advantage of PostgreSQL’s foreign data wrappers to simplify (avoid?) extracting from one database into another, I was recently tasked with an interesting project to track (changes in) the schemas of the remote databases proving the source data. This meant reaching out to the information_schema of those remote […]

Testing SqlSensor & dealing with str.startswith

This was one of those “Of course multiple values can be passed/checked!”-type situations. I’ve written a couple new DAGs which use SqlSensor, and I want to use my existing test, test_database_operators_have_sql(), to make sure that I am passing a SQL statement to the SqlSensor task. Here’s how the test originally looked testing for just the […]

Using pytest.fixture to Test Airflow Context

I’ve been wanting to figure out a way that will allow me to test tasks that reference Airflow context (via provide_context=True), such as BranchPythonOperator orPythonOperator. What I’ve come up with is a fixture that returns a dictionary. (In my example below, I perhaps went a little overboard with configuring it to return all of the […]