While my Python functions called by a PythonOperator generally end with return True, sometimes I would like them to emit some useful information (like a count of how many objects they finished working on, rows loaded into a table, etc.) to the Airflow logs via logging. The problem is “How do you test that […]
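A minimal sketch of the idea for context (the callable, its row count, and the caplog-based test are illustrative, not the post’s actual code):

```python
import logging

log = logging.getLogger(__name__)

def load_rows(**context):
    # Illustrative PythonOperator callable: do some work, log a useful count.
    rows_loaded = 42  # stand-in for the real work
    log.info("Loaded %d rows into the staging table", rows_loaded)
    return True

def test_load_rows_logs_row_count(caplog):
    # pytest's caplog fixture captures log records emitted during the call.
    with caplog.at_level(logging.INFO):
        load_rows()
    assert "Loaded 42 rows" in caplog.text
```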
Tag: Python
Using PostgresHook.copy_expert() method
Another day, another interesting data source to perform ETL on. This time, it’s a CSV file that’s downloaded from a remote server. I’m not going to focus on the part about using requests to download the file, but will jump right into using PostgresHook.copy_expert() to dump the CSV file into a staging table. Even the actual code_export […]
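For context, a minimal sketch of how copy_expert() can be used (the connection id, staging table, and file path are placeholders, not the post’s actual values):

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

def load_csv_to_staging(**context):
    # Assumes a Postgres connection named "my_postgres" and an existing
    # staging table; both names are illustrative.
    hook = PostgresHook(postgres_conn_id="my_postgres")
    hook.copy_expert(
        sql="COPY staging.my_table FROM STDIN WITH CSV HEADER",
        filename="/tmp/downloaded_file.csv",
    )
```

copy_expert() hands the COPY statement and the local file straight to the database driver, so the CSV never has to be parsed row by row in Python.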
Getting Credit Card BIN/IIN Information with Python
There are free (& paid) services that allow you to submit API calls with partial credit card numbers and get back information about the card, like the card type (Visa, MC, Amex), the issuing bank, the country & currency where the card was issued, etc. The problem is that the free services either limit what data is […]
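A rough sketch of what such a lookup can look like (the endpoint and response fields shown are assumptions based on one free service; limits and detail vary by provider):

```python
import requests

def lookup_bin(card_number: str) -> dict:
    # Only the first 6 digits (the BIN/IIN) are sent -- never the full number.
    bin_prefix = card_number[:6]
    # Free services typically rate-limit requests and return less detail
    # than their paid tiers; the endpoint below is illustrative.
    response = requests.get(f"https://lookup.binlist.net/{bin_prefix}", timeout=10)
    response.raise_for_status()
    return response.json()

info = lookup_bin("4571736012345678")
print(info.get("scheme"), info.get("bank", {}).get("name"))
```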
Custom Airflow Operators for Loading Data Into PostgreSQL
While the ETL I am responsible for takes advantage of PostgreSQL’s foreign data wrappers to simplify (avoid?) extracting from one database into another, I was recently tasked with an interesting project to track (changes in) the schemas of the remote databases providing the source data. This meant reaching out to the information_schema of those remote […]
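A minimal sketch of what a custom operator along these lines might look like (the class name, connection ids, and information_schema query are all illustrative, not the post’s implementation):

```python
from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook


class InformationSchemaToPostgresOperator(BaseOperator):
    """Copy column metadata from a remote database's information_schema
    into a local tracking table."""

    def __init__(self, *, source_conn_id, target_conn_id, target_table, **kwargs):
        super().__init__(**kwargs)
        self.source_conn_id = source_conn_id
        self.target_conn_id = target_conn_id
        self.target_table = target_table

    def execute(self, context):
        # Pull the schema description from the remote side...
        source = PostgresHook(postgres_conn_id=self.source_conn_id)
        rows = source.get_records(
            "SELECT table_name, column_name, data_type "
            "FROM information_schema.columns WHERE table_schema = 'public'"
        )
        # ...and load it into the local tracking table.
        target = PostgresHook(postgres_conn_id=self.target_conn_id)
        target.insert_rows(
            table=self.target_table,
            rows=rows,
            target_fields=["table_name", "column_name", "data_type"],
        )
```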
Testing SqlSensor & dealing with str.startswith
This was one of those “Of course multiple values can be passed/checked!”-type situations. I’ve written a couple of new DAGs which use SqlSensor, and I want to use my existing test, test_database_operators_have_sql(), to make sure that I am passing a SQL statement to the SqlSensor task. Here’s how the test originally looked, testing for just the […]
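The relevant trick is that str.startswith() accepts a tuple of prefixes, so one assertion can cover both the loading operators and the SqlSensor tasks. A sketch (the dag_bag fixture and the prefix list are assumptions):

```python
def test_tasks_with_sql_start_as_expected(dag_bag):
    # dag_bag is assumed to be a fixture that returns a loaded DagBag.
    for dag in dag_bag.dags.values():
        for task in dag.tasks:
            if not hasattr(task, "sql"):
                continue
            # str.startswith() accepts a tuple, so SELECTs from SqlSensor
            # tasks and DML from the loading operators share one check.
            assert task.sql.strip().upper().startswith(
                ("SELECT", "INSERT", "UPDATE", "DELETE", "WITH")
            )
```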
Revisiting “Database Operators Have SQL” test
Back in my first post, https://learningtotest.com/2019/06/13/3-easy-airflow-tests/, the third test made sure that there was valid SQL attached to the task. I’ve come to realize that a downside to that particular snippet is that the output doesn’t tell you the name of the task that failed, but instead returns output like this: That’s not very helpful! We […]
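One way to get the failing task’s name into the test output is to parametrize over the tasks and use “dag_id.task_id” as the test id. This is a sketch of that approach, not necessarily the post’s exact revision:

```python
import pytest
from airflow.models import DagBag

dag_bag = DagBag(include_examples=False)

# Collect every task that carries SQL, keyed by "dag_id.task_id" so a
# failure names the offending task directly in the pytest output.
sql_tasks = [
    pytest.param(task, id=f"{dag.dag_id}.{task.task_id}")
    for dag in dag_bag.dags.values()
    for task in dag.tasks
    if hasattr(task, "sql")
]

@pytest.mark.parametrize("task", sql_tasks)
def test_database_operators_have_sql(task):
    assert task.sql, f"{task.task_id} has no SQL attached"
```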
Send Slack Message When A Task Fails
I configure all of my DAGs to slack me a message when they finish successfully, but what about when a task in a DAG fails? Sure, there’s the built-in ability to send an email, but wouldn’t it be nice to slack a failure message instead? Turns out, it’s not so hard to do… Simply […]
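The usual hook for this is on_failure_callback. A sketch using the Slack provider’s webhook hook (the connection id is a placeholder, and the hook’s constructor/send signature varies across provider versions):

```python
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

def slack_failed_task(context):
    # on_failure_callback receives the task's context dict.
    ti = context["task_instance"]
    message = (
        f":red_circle: Task failed: {ti.dag_id}.{ti.task_id} "
        f"(run {context['run_id']})"
    )
    # Assumes an Airflow connection named "slack_webhook".
    SlackWebhookHook(slack_webhook_conn_id="slack_webhook").send(text=message)

# Attach it via default_args (or per task) so every task reports failures.
default_args = {
    "on_failure_callback": slack_failed_task,
}
```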
Verifying Task Order Within The DAG
One of my favorite tests for a DAG is the one that confirms the connectivity between tasks. If there is a limitation to this particular implementation, it’s that it only works with rather linear DAGs. (So if your DAG contains a number of forks & merges, this probably isn’t going to work as shown.) Linear […]
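A sketch of the kind of check this describes, for a strictly linear DAG (the dag_id and task ids are made up):

```python
from airflow.models import DagBag

EXPECTED_ORDER = ["extract", "transform", "load", "notify_slack"]  # illustrative

def test_linear_task_order():
    dag = DagBag(include_examples=False).get_dag("my_linear_dag")
    for upstream, downstream in zip(EXPECTED_ORDER, EXPECTED_ORDER[1:]):
        # In a strictly linear DAG each task points only at the next one.
        assert dag.get_task(upstream).downstream_task_ids == {downstream}
```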