Category: Uncategorized

Testing for logging Output from PythonOperator Functions

While the Python functions called by my PythonOperator tasks generally end with return True, sometimes I would like them to emit some useful information (like a count of how many objects they finished working on, rows loaded into a table, etc.) to the Airflow logs via logging. The problem is “How do you test that […]
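As a rough illustration of the idea in this excerpt (not the post's actual code, which is truncated here), one way to check what a callable logs is to attach a temporary handler and inspect the captured messages. The names load_rows, ListHandler, run_and_capture, and the logger name "etl_tasks" are all hypothetical, and the sketch uses only the standard library rather than any Airflow API.

```python
import logging

LOGGER_NAME = "etl_tasks"  # hypothetical logger name used by the callable


def load_rows(**context):
    """Hypothetical callable of the kind passed to a PythonOperator."""
    rows_loaded = 42  # stand-in for real work
    logging.getLogger(LOGGER_NAME).info("Loaded %d rows", rows_loaded)
    return True


class ListHandler(logging.Handler):
    """Collects formatted log messages in a list so a test can inspect them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def run_and_capture(func, logger_name):
    """Run func and return (result, messages logged to logger_name)."""
    logger = logging.getLogger(logger_name)
    handler = ListHandler()
    previous_level = logger.level
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    try:
        result = func()
    finally:
        # Restore the logger so other tests are not affected.
        logger.removeHandler(handler)
        logger.setLevel(previous_level)
    return result, handler.messages
```

In a pytest suite, the built-in caplog fixture serves the same purpose with less code; the hand-rolled handler above only keeps the sketch free of dependencies.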

Using PostgresHook.copy_expert() method

Another day, another interesting data source to perform ETL on. This time, it’s a CSV file that’s downloaded from a remote server. I’m not going to focus on the part about using requests to download the file; instead, I’m jumping right into using PostgresHook.copy_expert() to dump the CSV file into a staging table. Even the actual copy_expert […]

Getting Credit Card BIN/IIN Information with Python

There are free (& paid) services that allow you to submit API calls with partial credit card numbers to return information about the credit card, like type (Visa, MC, Amex), the issuing bank, the country & currency where the card was issued, etc. The problem is that the free services either limit what data is […]

Navigating Past Seemingly Unrecoverable Tables with pg_restore

We happen to run our ETL processes only once per day. So that allows us to simply use pg_dump for our database backups. But there’s the old adage “Your backups are only as good as your last restore.” So I do a restore of the production server backup on the development server at least every […]

Custom Airflow Operators for Loading Data Into PostgreSQL

While the ETL I am responsible for takes advantage of PostgreSQL’s foreign data wrappers to simplify (avoid?) extracting from one database into another, I was recently tasked with an interesting project to track (changes in) the schemas of the remote databases providing the source data. This meant reaching out to the information_schema of those remote […]

Testing SqlSensor & dealing with str.startswith

This was one of those “Of course multiple values can be passed/checked!”-type situations. I’ve written a couple new DAGs which use SqlSensor, and I want to use my existing test, test_database_operators_have_sql(), to make sure that I am passing a SQL statement to the SqlSensor task. Here’s how the test originally looked testing for just the […]
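The "multiple values" point refers to the fact that str.startswith accepts a tuple of prefixes as well as a single string. The excerpt is cut off before the post's own test, so the following is only a minimal sketch: the SQL_PREFIXES tuple and the looks_like_sql helper are hypothetical and are not the body of test_database_operators_have_sql().

```python
# Hypothetical set of keywords a SQL statement might begin with.
SQL_PREFIXES = ("SELECT", "INSERT", "UPDATE", "DELETE", "WITH")


def looks_like_sql(statement):
    """Return True if the statement starts with any of the known SQL keywords.

    str.startswith accepts a tuple, so one call checks every prefix.
    """
    return statement.lstrip().upper().startswith(SQL_PREFIXES)
```

A test could then assert looks_like_sql(task.sql) for each database task, regardless of which operator or sensor class the task uses.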

Using pytest.fixture to Test Airflow Context

I’ve been wanting to figure out a way that will allow me to test tasks that reference Airflow context (via provide_context=True), such as BranchPythonOperator or PythonOperator. What I’ve come up with is a fixture that returns a dictionary. (In my example below, I perhaps went a little overboard with configuring it to return all of the […]

The Mess That Is PostgreSQL Foreign Data Wrappers, tds_fdw, & diacritic characters

This post may seem off-topic. However, after spending considerable time over the past week grappling with getting data containing text with diacritical marks (in my case, German names & email addresses that contain ß, ä, ö, and/or ü), capturing some notes about the issue would be worth it in case I ever need to deal with this […]