New project, new challenge. This time I wanted to create very similar tasks (in this case, to drop indexes); however, I didn’t want to be locked into running those similar tasks at the same time (read: in parallel, with help from an iterable). So while I will not even pretend to be the originator […]
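One way to keep generating similar tasks from an iterable while having them run one after another, rather than in parallel, is to link each generated task to the next. The sketch below is only an illustration of that idea. It assumes PostgreSQL's DROP INDEX IF EXISTS syntax, and the index names, task ids and SomeSqlOperator are hypothetical. It builds the SQL and the ordered (upstream, downstream) pairs in plain Python, and the comments show where Airflow's chain() helper could fit.

```python
# Hypothetical index names; in a real DAG these would come from your own config.
INDEXES = ["idx_orders_created_at", "idx_orders_customer_id", "idx_orders_status"]


def drop_index_sql(index_name: str) -> str:
    """Build a DROP INDEX statement (PostgreSQL syntax assumed).

    The name is interpolated directly, so only use trusted, hard-coded names.
    """
    return f"DROP INDEX IF EXISTS {index_name};"


def sequential_pairs(items):
    """Return (upstream, downstream) pairs so each item runs after the previous one."""
    return list(zip(items, items[1:]))


task_ids = [f"drop_{name}" for name in INDEXES]
statements = {task_id: drop_index_sql(name) for task_id, name in zip(task_ids, INDEXES)}

# With Airflow installed (2.x import path shown), the operators built from the
# same iterable could be linked in order instead of fanning out in parallel:
#
#   from airflow.models.baseoperator import chain
#   # SomeSqlOperator is a hypothetical stand-in for whichever SQL operator you use.
#   tasks = [SomeSqlOperator(task_id=t, sql=statements[t]) for t in task_ids]
#   chain(*tasks)  # first >> second >> third

if __name__ == "__main__":
    for upstream, downstream in sequential_pairs(task_ids):
        print(f"{upstream} >> {downstream}")
```

The only difference from the parallel version is the linking step: the list of tasks is identical, but chaining it turns a fan-out into a single line of dependencies.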
Tag: Airflow
Testing Task Dependencies In Your DAG
Although I had used a simpler version in the past, only recently did I wrap my head around how to simplify the creation (and, really, the maintenance) of upstream & downstream task dependencies in a DAG. The way I had been doing it involved writing these massive tuples with the task names over & […]
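As a rough illustration of a more maintainable alternative to long tuples of task names, the expected dependencies can live in a single dict and be compared against what the DAG actually contains. This is a sketch under assumptions: EXPECTED_DOWNSTREAM, the task ids and both helper functions are hypothetical. It relies only on the task_id and downstream_task_ids attributes that Airflow operators expose, and uses SimpleNamespace stand-ins so it runs without Airflow.

```python
from types import SimpleNamespace

# Hypothetical expected structure: each task_id maps to the ids of its direct
# downstream tasks.
EXPECTED_DOWNSTREAM = {
    "extract": {"transform"},
    "transform": {"load", "notify"},
    "load": set(),
    "notify": set(),
}


def downstream_map(tasks):
    """Map each task's task_id to the set of its downstream task ids.

    Airflow operators expose task_id and downstream_task_ids, so this could be
    called as downstream_map(dag.tasks) on a real DAG.
    """
    return {task.task_id: set(task.downstream_task_ids) for task in tasks}


def dependency_differences(expected, actual):
    """Return {task_id: (missing, unexpected)} for tasks whose edges differ."""
    differences = {}
    for task_id in expected.keys() | actual.keys():
        want = expected.get(task_id, set())
        have = actual.get(task_id, set())
        if want != have:
            differences[task_id] = (want - have, have - want)
    return differences


# Stand-ins for operators so the sketch runs without Airflow installed.
fake_tasks = [
    SimpleNamespace(task_id="extract", downstream_task_ids={"transform"}),
    SimpleNamespace(task_id="transform", downstream_task_ids={"load", "notify"}),
    SimpleNamespace(task_id="load", downstream_task_ids=set()),
    SimpleNamespace(task_id="notify", downstream_task_ids=set()),
]
```

In a test, asserting that dependency_differences(EXPECTED_DOWNSTREAM, downstream_map(dag.tasks)) == {} means a failure reports exactly which edges are missing or unexpected, rather than just which tuple failed to match.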
Why Setting an SLA for Your DAGs Might Be a Very Good Idea
While I felt I’d already taken plenty of steps to be notified when a DAG fails, I recently discovered that I hadn’t done anything to catch the scenarios where a DAG takes longer (or, in my case, a lot longer) than normal to complete (or fail). Thus, the need to learn how […]
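For reference, here is a minimal sketch of one common way to configure an SLA in Airflow 1.10/2.x. An sla timedelta goes in default_args so every task inherits it, and an optional sla_miss_callback on the DAG is called with five positional arguments. The callback name, message format, dag_id and one-hour threshold are hypothetical. The DAG construction is left as a comment so the snippet runs without Airflow installed.

```python
import logging
from datetime import timedelta

log = logging.getLogger(__name__)


def notify_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Hypothetical sla_miss_callback; Airflow passes these five arguments.

    task_list and blocking_task_list are assumed here to be newline-separated
    strings describing the tasks; they are simply folded into a log message.
    """
    message = (
        f"SLA missed in DAG {dag.dag_id}.\n"
        f"Tasks:\n{task_list}\n"
        f"Blocking tasks:\n{blocking_task_list}"
    )
    log.warning(message)
    return message


default_args = {
    "owner": "airflow",
    "sla": timedelta(hours=1),  # hypothetical threshold; tune to your normal runtime
    "email_on_failure": True,
}

# With Airflow installed, both pieces are passed to the DAG, e.g.:
#
#   dag = DAG(
#       "nightly_etl",                      # hypothetical dag_id
#       default_args=default_args,
#       schedule_interval="@daily",
#       sla_miss_callback=notify_sla_miss,
#   )
```

Putting sla in default_args is convenient, but it can also be set on individual tasks when only a few long-running steps need watching.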
Testing for logging Output from PythonOperator Functions
While my Python functions called by a PythonOperator generally end with return True, sometimes I would like them to emit some useful information (like a count of how many objects they finished working on, rows loaded into a table, etc.) to the Airflow logs via logging. The problem is “How do you test that […]
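One way to test that a callable logged what it should is the standard library's unittest.TestCase.assertLogs, which captures records from a logger for the duration of a with block. This is a sketch rather than the method from the post. load_rows() is a hypothetical stand-in for a PythonOperator callable that does some work, logs a count and returns True.

```python
import logging
import unittest

log = logging.getLogger(__name__)


def load_rows(rows):
    """Hypothetical PythonOperator callable: does its work, logs a count, returns True."""
    count = 0
    for _row in rows:
        count += 1  # stand-in for the real per-row work
    log.info("Loaded %d rows", count)
    return True


class LoadRowsLoggingTest(unittest.TestCase):
    def test_logs_row_count(self):
        # assertLogs captures records from this logger at INFO and above and
        # fails the test if nothing at that level was logged inside the block.
        with self.assertLogs(log, level="INFO") as captured:
            result = load_rows(["a", "b", "c"])
        self.assertTrue(result)
        # captured.output holds "LEVEL:logger_name:message" strings.
        self.assertIn("Loaded 3 rows", captured.output[0])
```

Run it with python -m unittest or pytest. If the test suite is pytest-based, pytest's built-in caplog fixture (caplog.text, caplog.records) serves the same purpose without a TestCase class.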
Getting Credit Card BIN/IIN Information with Python
There are free (& paid) services that accept a partial credit card number via an API call and return information about the card, such as its type (Visa, MC, Amex), the issuing bank, and the country & currency where it was issued. The problem is that the free services either limit what data is […]
A Tale of Printing Two Euros
I maintain 2 different Airflow DAGs that generate reports delivered via email, and in both reports the financial information needs to be shown in both USD & EUR. One of the reports involves running a SQL query, loading the results into a pandas dataframe, then performing a few […]
Custom Airflow Operators for Loading Data Into PostgreSQL
While the ETL I am responsible for takes advantage of PostgreSQL’s foreign data wrappers to simplify (avoid?) extracting from one database into another, I was recently tasked with an interesting project to track (changes in) the schemas of the remote databases providing the source data. This meant reaching out to the information_schema of those remote […]
Testing SqlSensor & Dealing with str.startswith
This was one of those “Of course multiple values can be passed/checked!”-type situations. I’ve written a couple of new DAGs that use SqlSensor, and I want to use my existing test, test_database_operators_have_sql(), to make sure that I am passing a SQL statement to the SqlSensor task. Here’s how the test originally looked, testing for just the […]
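The “multiple values” point applies directly to str.startswith, which accepts a tuple of prefixes and returns True if the string starts with any of them. One gotcha is that a list is not accepted and raises TypeError. The sketch below is hypothetical: SQL_PREFIXES and looks_like_sql() are illustrative names, not the post's test. Inside a test it could be applied to a task's sql attribute, which SqlSensor stores.

```python
# Hypothetical set of leading keywords that count as "a SQL statement".
# str.startswith accepts a tuple and returns True if the string starts with
# any of the prefixes in it (a list would raise TypeError).
SQL_PREFIXES = ("SELECT", "WITH", "INSERT", "UPDATE", "DELETE")


def looks_like_sql(statement: str) -> bool:
    """Return True if the statement begins with one of SQL_PREFIXES.

    Leading whitespace is stripped and case is normalised, since templated SQL
    often starts with a newline and may be written in lowercase.
    """
    return statement.lstrip().upper().startswith(SQL_PREFIXES)


def looks_like_sql_verbose(statement: str) -> bool:
    """Equivalent check without the tuple form, for comparison."""
    normalised = statement.lstrip().upper()
    return any(normalised.startswith(prefix) for prefix in SQL_PREFIXES)
```

isinstance() accepts a tuple of classes in the same way, which can help when one test needs to cover several operator types at once.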
Creating Dynamic Number of Tasks at DAG Runtime
One DAG I wrote (one of the first I ever put into production) has tasks that are dynamically allocated. What I mean is that there are a number of very similar tasks that I need to perform as quickly as possible, so they need to run in parallel. The […]
Using pytest.fixture to Test Airflow Context
I’ve been wanting to figure out a way to test tasks that reference the Airflow context (via provide_context=True), such as those built with BranchPythonOperator or PythonOperator. What I’ve come up with is a fixture that returns a dictionary. (In my example below, I perhaps went a little overboard with configuring it to return all of the […]
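To illustrate the idea, here is a small context dictionary passed as keyword arguments to a branch callable. This is a sketch under assumptions. make_context(), choose_branch() and the task ids are hypothetical, and only a few real context keys (execution_date, ds, ds_nodash, params) are included. The pytest fixture wrapper appears as a comment so the snippet runs without pytest installed.

```python
from datetime import datetime


def make_context(execution_date=datetime(2020, 1, 6)):
    """Build a small, hypothetical subset of the Airflow template context.

    Real Airflow passes many more keys (ti, dag, dag_run, ...); only the ones
    the callable under test reads need to be present.
    """
    return {
        "execution_date": execution_date,
        "ds": execution_date.strftime("%Y-%m-%d"),
        "ds_nodash": execution_date.strftime("%Y%m%d"),
        "params": {},
    }


# In a pytest conftest.py the builder could be exposed as a fixture, e.g.:
#
#   @pytest.fixture
#   def context():
#       return make_context()


def choose_branch(**context):
    """Hypothetical BranchPythonOperator callable: weekly report on Mondays."""
    if context["execution_date"].weekday() == 0:  # Monday
        return "weekly_report"
    return "daily_report"
```

Keeping the builder as a plain function, with the fixture as a thin wrapper, lets a test request a context for a specific date (for example make_context(datetime(2020, 1, 7))) without a separate fixture per scenario.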