While the Python functions I call via a PythonOperator generally end with return True, sometimes I would like for them to emit some useful information (like a count of how many objects they finished working on, rows loaded into a table, etc.) to the Airflow logs via logging. The problem is “How do you test that […]
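A minimal sketch of the kind of function the excerpt describes (the logger name and function are hypothetical); in a pytest suite the caplog fixture would capture these records, but the same thing can be shown with a plain stdlib handler:

```python
import io
import logging

logger = logging.getLogger("my_etl")  # hypothetical logger name

def load_objects(objects):
    """Toy stand-in for a PythonOperator callable that logs a count."""
    count = len(objects)
    logger.info("Loaded %d objects", count)
    return True

# Capture the log output without pytest, using only the stdlib:
stream = io.StringIO()
logger.addHandler(logging.StreamHandler(stream))
logger.setLevel(logging.INFO)

load_objects(["a", "b", "c"])
captured = stream.getvalue()  # contains "Loaded 3 objects"
```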
Using PostgresHook.copy_expert() method
Another day, another interesting data source to perform ETL on. This time, it’s a CSV file that’s downloaded from a remote server. I’m not going to focus on the part about using requests to download the file, but will jump right into using PostgresHook.copy_expert() to dump the CSV file into a staging table. Even the actual copy_expert […]
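A sketch of the pattern the excerpt points at, with a hypothetical connection id, staging table, and file path (the Airflow import is deferred into the function so the snippet reads without Airflow installed):

```python
# COPY ... FROM STDIN lets copy_expert stream a local file into the table.
COPY_SQL = (
    "COPY staging.my_csv_import FROM STDIN "
    "WITH (FORMAT csv, HEADER true)"
)

def csv_to_staging(csv_path):
    # Deferred import: only needed when the task actually runs.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="my_postgres")  # hypothetical conn id
    hook.copy_expert(sql=COPY_SQL, filename=csv_path)
```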
Getting Credit Card BIN/IIN Information with Python
There are free (& paid) services that allow you to submit API calls with partial credit card numbers to return information about the credit card, like type (Visa, MC, Amex), the issuing bank, the country & currency where the card was issued, etc. The problem is that the free services either limit what data is […]
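As a rough illustration of what such a lookup returns, here is a simplified offline brand check from the BIN/IIN prefix (a real BIN service also reports issuer, country, and currency; the function name is hypothetical):

```python
def card_brand(card_number):
    """Very simplified brand detection from the leading digits of the
    BIN/IIN. Real BIN databases cover far more networks and ranges."""
    digits = card_number.replace(" ", "")
    if digits.startswith("4"):
        return "Visa"
    if digits[:2] in {"34", "37"}:
        return "Amex"
    if digits[:2] in {str(n) for n in range(51, 56)}:  # 51-55
        return "Mastercard"
    return "Unknown"
```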
A Tale of Printing Two Euros
There are two different Airflow DAGs that I maintain which generate reports that are delivered via email. And in both reports, the financial information needs to be reported in both USD & EUR. One of the reports runs a SQL query, loads the results into a pandas dataframe, then performs a few […]
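A minimal sketch of rendering a figure in both currencies (the function name is hypothetical, and the exchange rate is taken as an input rather than fetched):

```python
def format_both_currencies(usd_amount, usd_to_eur_rate):
    """Render a USD amount alongside its EUR equivalent, with thousands
    separators and two decimal places."""
    eur_amount = usd_amount * usd_to_eur_rate
    return f"${usd_amount:,.2f} / €{eur_amount:,.2f}"
```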
Navigating Past Seemingly Unrecoverable Tables with pg_restore
We happen to run our ETL processes only once per day, which allows us to simply use pg_dump for our database backups. But there’s the old adage: “Your backups are only as good as your last restore.” So I do a restore of the production server backup on the development server at least every […]
Custom Airflow Operators for Loading Data Into PostgreSQL
While the ETL I am responsible for takes advantage of PostgreSQL’s foreign data wrappers to simplify (avoid?) extracting from one database into another, I was recently tasked with an interesting project to track (changes in) the schemas of the remote databases providing the source data. This meant reaching out to the information_schema of those remote […]
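The kind of information_schema query this implies might look something like the following sketch (column selection and schema filter are assumptions, not the post's actual query):

```sql
-- Snapshot column definitions from a remote database's information_schema
SELECT table_schema,
       table_name,
       column_name,
       data_type,
       is_nullable
FROM information_schema.columns
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY table_schema, table_name, ordinal_position;
```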
Testing SqlSensor & dealing with str.startswith
This was one of those “Of course multiple values can be passed/checked!” situations. I’ve written a couple of new DAGs which use SqlSensor, and I want to use my existing test, test_database_operators_have_sql(), to make sure that I am passing a SQL statement to the SqlSensor task. Here’s how the test originally looked, testing for just the […]
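The str.startswith wrinkle the title hints at: it accepts a tuple of prefixes, so one assertion can allow several SQL statement openers. A small sketch (the helper name is hypothetical):

```python
def looks_like_sql(statement):
    """True if the statement opens with one of the allowed SQL keywords.
    str.startswith takes a tuple, so no loop is needed."""
    return statement.lstrip().upper().startswith(("SELECT", "WITH"))
```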
Creating Dynamic Number of Tasks at DAG Runtime
There is one DAG that I wrote (one of the first DAGs that I ever put into production) that has tasks that are dynamically allocated. What I mean is that there are a number of very similar tasks that I need to perform as quickly as possible, so they need to run in parallel. The […]
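The usual shape of this pattern is a loop over a list known at DAG-parse time, creating one operator per item. A sketch with hypothetical source names (the Airflow imports are deferred so the snippet reads without Airflow installed):

```python
SOURCES = ["alpha", "beta", "gamma"]  # hypothetical work items

def make_task_ids(sources):
    """Task ids generated from the source list, one per parallel task."""
    return [f"process_{s}" for s in sources]

def build_dag():
    # Deferred imports: only needed when Airflow parses the DAG file.
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(dag_id="parallel_example", schedule=None) as dag:
        for source in SOURCES:
            # No dependencies between them, so they can run in parallel.
            PythonOperator(
                task_id=f"process_{source}",
                python_callable=lambda s=source: print(s),
            )
    return dag
```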
Using pytest.fixture to Test Airflow Context
I’ve been wanting to figure out a way that will allow me to test tasks that reference Airflow context (via provide_context=True), such as BranchPythonOperator or PythonOperator. What I’ve come up with is a fixture that returns a dictionary. (In my example below, I perhaps went a little overboard with configuring it to return all of the […]
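A stripped-down sketch of the idea: a builder that returns a dict mimicking part of Airflow's task context (the keys shown are a hypothetical subset; the real context carries many more), which a pytest fixture can then expose:

```python
import datetime

def make_airflow_context(execution_date=None):
    """Build a small dict mimicking part of Airflow's task context."""
    execution_date = execution_date or datetime.datetime(2021, 1, 1)
    return {
        "execution_date": execution_date,
        "ds": execution_date.strftime("%Y-%m-%d"),
        "ds_nodash": execution_date.strftime("%Y%m%d"),
    }

# In a pytest suite this would be wrapped as a fixture:
# @pytest.fixture
# def airflow_context():
#     return make_airflow_context()
```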
The Mess That Is PostgreSQL Foreign Data Wrappers, tds_fdw, & diacritic characters
This post may seem off-topic; however, after spending considerable time over the past week grappling with getting data containing text with diacritic marks (in my case, German names & email addresses that contain ß, ä, ö, and/or ü), I decided capturing some notes about the issue would be worth it in case I ever need to deal with this […]
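The classic symptom of this class of problem, shown in two lines: UTF-8 bytes decoded as Latin-1 turn one character into two. Mismatched client/server encodings between the foreign data wrapper and the source database produce exactly this kind of garbling:

```python
def mojibake(text):
    """Simulate a UTF-8 / Latin-1 encoding mismatch: encode as UTF-8,
    then (wrongly) decode the bytes as Latin-1."""
    return text.encode("utf-8").decode("latin-1")

# 'ä' becomes 'Ã¤', 'ö' becomes 'Ã¶', and so on.
```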