If there’s one thing that I love about PostgreSQL, it’s foreign data wrappers (FDWs). Being able to create a connection to another source–usually another database–can make the seemingly impossible possible. However, it’s essential to understand what they do–and what they don’t–before you go creating them on your production database.
Before I get any deeper into FDWs, I need to take a moment to take a swipe at the COPY command in PostgreSQL. It really simplifies the act of pulling data from a file (usually a CSV) into a table (& the occasional export to a CSV file too). But anyone who’s done it a few times knows that there are some limitations to using COPY; one of the most prominent is that copying to or from a file on the database server requires the role running it to be a SUPERUSER (or, on PostgreSQL 11+, a member of the pg_read_server_files role). That’s not always possible or convenient, but it’s often overlooked because once you get that one file loaded into the database, you dismiss the headache & move on to the next task…
But what if you need to load CSV files on a regular basis? Like for some daily ETL. How would you make that happen? I recently played around with a couple of methods on Airflow’s PostgresHook, copy_expert & bulk_load. It took a lot more wrangling than I’d originally anticipated to get copy_expert working the way I wanted. Perhaps there would be an easier way…?
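For reference, the copy_expert route boils down to something like the sketch below–the connection id, table name, & file path are all hypothetical placeholders of mine, and the function would be handed to a PythonOperator():

from airflow.hooks.postgres_hook import PostgresHook  # Airflow 1.x import path

def load_csv_to_table():
    # copy_expert streams a client-side file through COPY ... FROM STDIN,
    # so the role running it doesn't need to be a SUPERUSER.
    hook = PostgresHook(postgres_conn_id="my_postgres")  # hypothetical connection id
    hook.copy_expert(
        sql="COPY my_staging_table FROM STDIN WITH (FORMAT csv)",  # hypothetical table
        filename="/path/to/daily_extract.csv",                     # hypothetical file
    )

bulk_load is even terser, but as far as I can tell it just wraps a bare COPY table FROM STDIN, so you get no say over format options like CSV quoting.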
At some point in my research, I happened upon a link to a Stack Overflow answer (one I’ve been unable to find again) that involved using file_fdw. I knew of its existence but never really thought about using it…mostly because of its (supposed) limitation of only ever reading the one specific filename given when the foreign table is defined. Then it dawned on me: I had found my use case!
This particular DAG involves downloading a log file from a remote server, then using a BashOperator() to strip out just the data I want (sidebar: because the log file has a varying number of fields per line, using COPY to load the entire file wouldn’t work). I’d already decided to give this stripped-down file the same name on every DAG run so that each run simply overwrites the previous contents.
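That stripping step is really just some awk inside the BashOperator(). A rough sketch follows–the input log path & its layout are made up here (the real log isn’t shown); only the output path matches what the foreign table below points at:

from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

# Hypothetical example: keep only the last two whitespace-separated fields of
# each line (awk doesn't mind that the field count varies) & overwrite the
# same CSV file on every run. Defined inside the DAG as usual; dag argument omitted.
strip_log = BashOperator(
    task_id="strip_log",
    bash_command=(
        "awk '{print $(NF-1) \",\" $NF}' /tmp/downloaded.log "
        "> /mainstorage/etl/data/disk_usage/hash_bytes.csv"
    ),
)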
With file_fdw (easily the simplest FDW I’ve ever configured, not that any of them are really that hard anymore…), this is all it took to get it working:
CREATE EXTENSION IF NOT EXISTS file_fdw;
CREATE SERVER hash_bytes FOREIGN DATA WRAPPER file_fdw;
CREATE FOREIGN TABLE IF NOT EXISTS wrk_hash_bytes (
body_hash text
, body_bytes int
) SERVER hash_bytes
OPTIONS (
FILENAME '/mainstorage/etl/data/disk_usage/hash_bytes.csv'
, FORMAT 'csv'
);
The one thing that I didn’t do (though it’s a normal practice for me) was create a dedicated schema to hold the foreign table(s). But because I was only creating the one table, I went ahead & created it in the PUBLIC schema.
Finally, with the file in place, it was trivial to query the foreign table however I needed to–namely, to read it into an indexed staging table! And that freed me up to use a more conventional PostgresOperator() task to do the actual work.
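That follow-on task isn’t anything fancy–something along these lines, where the staging table name & connection id are placeholders of mine (the real ones aren’t shown in this post):

from airflow.operators.postgres_operator import PostgresOperator  # Airflow 1.x import path

# Re-read the CSV through the foreign table & land it in an indexed staging
# table; stg_hash_bytes & my_postgres are hypothetical names.
load_staging = PostgresOperator(
    task_id="load_staging",
    postgres_conn_id="my_postgres",
    sql="""
        TRUNCATE stg_hash_bytes;
        INSERT INTO stg_hash_bytes (body_hash, body_bytes)
        SELECT body_hash, body_bytes
          FROM wrk_hash_bytes;
    """,
)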
Update November 22, 2019
While I stand by everything in this post, I do want to amend it with a rather important warning. I actually went back to using the copy_expert method of PostgresHook for this DAG, because attempting to ingest a 100M+ line CSV file through the foreign table was not working well for me at all. When I tested with 1M rows, the query would come back in ~12 seconds. Extrapolating that it should take ~1200 seconds–20 minutes–to ingest the full data set would be valid math, but it didn’t match real-world experience: it ended up taking over 4 hours(!) to finish. I wound up using the split CLI command as the final step in my BashOperator() task/script to divide the data into CSV files of up to 10M rows each, & then using a for loop to create multiple PythonOperator() tasks, each performing a smaller COPY-like action. (That’s why we test!)
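For the curious, the chunked approach works out to roughly the sketch below; the chunk naming, chunk count, connection id, & staging table are all assumptions of mine rather than the actual DAG:

import os

from airflow.hooks.postgres_hook import PostgresHook            # Airflow 1.x import paths
from airflow.operators.python_operator import PythonOperator

CHUNK_DIR = "/mainstorage/etl/data/disk_usage"  # assumed location of the split output

def copy_chunk(chunk_path):
    # Skip chunk numbers that split didn't produce on this run.
    if not os.path.exists(chunk_path):
        return
    # Stream one <=10M-row chunk through COPY ... FROM STDIN.
    hook = PostgresHook(postgres_conn_id="my_postgres")  # hypothetical connection id
    hook.copy_expert(
        sql="COPY stg_hash_bytes (body_hash, body_bytes) FROM STDIN WITH (FORMAT csv)",
        filename=chunk_path,
    )

# Assumes the BashOperator() step finished with something like
#   split -d -l 10000000 hash_bytes.csv hash_bytes_
# so the chunks come out numbered hash_bytes_00, hash_bytes_01, ...
copy_tasks = []
for i in range(15):  # generous upper bound on the number of chunks
    copy_tasks.append(
        PythonOperator(
            task_id=f"copy_chunk_{i:02d}",
            python_callable=copy_chunk,
            op_args=[f"{CHUNK_DIR}/hash_bytes_{i:02d}"],
        )
    )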