Squashing git Commits In The Same Branch

I am pretty darn frustrated after having spent more than an hour trying to clean up my mess from a 5 second code change. At work we are now supposed to squash (rebase?) multiple commits in a single branch into a single commit. Before starting at this job, I had done exactly zero rebases in git. Ever. And much like any new action in git, it tends to make me overly paranoid that I might screw up the project.

An overwhelming amount of information I found online for using git rebase involved multiple branches, which is not what I wanted to accomplish. I would like to think that there is an easier way than what I’m about to describe here–and I may very well update/edit this article if that happens–but for now, this is what works for me:

  1. Ensure that you are on the right branch.
  2. Start an interactive rebase.
  3. Squash the commit(s).
  4. Update the commit message if you want to.
  5. Push the changes to remote.

Check out the branch with the commits that you want to rebase

We want to check out the branch containing the commits you want to squash, so that we are sitting on its most recent commit.

git checkout your_branch

Start interactive rebase

You need to know how many commits you want to squash together, because that number is referenced at the end of your git rebase command. So if we want to squash the last 2 commits together, we would use the following:

git rebase --interactive HEAD~2

This will open your text editor (most likely vim) with a list of commits & what ought to be some helpful information as to what you need to do.

Squash the commits

VERY, VERY IMPORTANT: git rebase works top-down, with the oldest commit listed first. So the topmost commit is the one you want to pick (aka “keep”), and the commits below it are the one(s) you want to squash into that topmost commit.

So you are presented with a list of (at least 2) commits. Each of those lines starts with the word “pick” followed by a (partial) hash of the commit & the first line of the commit message for that hash. Any lines here that start with ‘#’ are comments & can be ignored. (See the example after this list.)

  • DO NOT CHANGE OR REMOVE THE TOPMOST UNCOMMENTED LINE. Merely confirm that it is the (oldest) commit you wish to retain in this branch.
  • Change pick to squash for every commit below the first one that you want to squash into that first commit. (Leave the hash & commit message alone!)
  • Save your changes & exit.
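For illustration, here is roughly what that to-do list might look like after marking the second commit for squashing (the hashes & commit messages below are made up):

pick 1a2b3c4 Add ZIP code parsing to the loader
squash 5d6e7f8 Fix typo in ZIP code parsing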

Upon saving & exiting, git will (re)open your text editor so you can decide how you want to preserve the commit message(s).

Update the commit message

Again, any lines that start with ‘#’ are comments that will not be retained in the resultant commit message. By default (just like in the previous editor session), it will display commit messages from oldest at the top to newest at the bottom. You can remove the commented lines if you want, but they should be discarded automatically anyway. Odds are you probably won’t want/need to change your commit messages, but you can do so now if you wish.

Save your changes & exit the editor when you are satisfied with how your commit message looks. This will exit git rebase.

If you happen to look at your project commit tree, it will probably look like you made a terrible mess–like you just wrecked the entire project, not just your branch! But I can assure you, we’re going to fix that now. And it’s (thankfully!) easy.

Push changes to remote

First, DON’T PANIC. git is probably showing that the rebased commits & the remote commits are now on separate branches that happen to have the same name. One more time: DON’T PANIC.

Just push your changes to remote with:

git push origin +HEAD

This will display some information about counting, compressing, & writing objects, then a series of lines starting with remote: suggesting that you create a pull request for your branch. But if you look at the project tree again, you will see that order has been restored & that what was once multiple commits is now just a single commit.

Mission accomplished & now carry on with your day…

Update as of July 30, 2021

I have since learned that–at least on GitLab–much of the hassle here can largely be avoided by using Merge Requests (identical in all but name to GitHub’s Pull Requests). At the very bottom of the Merge Request page, there are 2 very handy options that I would encourage checking before submitting:

  1. Squash all commits.
  2. Delete branch. (This doesn’t delete your work, just the branch name. And while that sounds like a potentially Bad Thing™ because it makes it harder (but not impossible) to check out that branch again, keep in mind that it doesn’t take effect on the remote until the Merge Request is approved…which implies the code has passed any tests & been QA’ed successfully.)

Interviewing Data Engineer Candidates

At my current employer, as a part of the interviewing process we have a technical challenge & team interview (after an initial phone screen interview). One thing that I really appreciate about where I work now is that when it comes to considering data engineer candidates, we absolutely will bring in people that lack any experience with the databases and/or ETL tool that we use (Pentaho) as long as the candidate has quality previous data engineer experience.

Meanwhile I see countless DE job postings–for which I have almost never been contacted for a phone screen myself–that really are only interested in candidates with experience in the database(s)/tool(s) they are currently using. The biggest problem is that it’s very difficult, if not impossible, for an applicant to know at the time they submit their resume whether or not the prospective employer is willing to accept candidates without that specific experience. (Like I put in my last post, there is no penalty in applying for a DE job!)

That aside, there needs to be a way for a potential employer to determine whether a candidate has the skills to match all of those acronyms & claims on their resume–hence the technical challenge.

Data Engineering Technical Challenge

Now if I (gave up a piece of my soul &) worked for one of those companies that was only looking for DE candidates with specific experiences & skills, I’m sure we’d have a technical challenge where we’d ask a candidate to perform a task in that tool. Thankfully–oh so thankfully!–that’s not how we roll.

The purpose of a technical challenge is that we really want to see how a candidate thinks. We felt the best way for a candidate to show their DE skills is a challenge that is tool independent but speaks the universal language of SQL. So while little actual production work would ever be done purely in SQL, having an advanced grasp of SQL to carry out ETL actions should expose the candidates who really understand what’s going on underneath any tool vs. those who rely too much on/take for granted what ETL tools might actually be doing.

The premise of our technical challenge is pretty simple: we provide 2 tables. One table contains ~100 rows of “raw” source data; the other table contains a few rows of “production” data. The candidate is given a block of time to write a SQL query to transform & migrate as much of the source data into the production table as they can.

We do allow the candidate to ask clarifying questions, and they are free to google any technical/functional/syntax issues on their own…just like they would in a real-world situation.

The source table contains column names that are different from the target but are obviously named. There are a number of pitfalls & conversions in the source data requiring techniques such as the following (a rough SQL sketch appears after the list):

  • Gracefully handling NULLs & duplicate values
  • Concatenating & sanitizing first & last names to a single column (FIRST_NAME | LAST_NAME -> “Last_name, First_name”)
  • Converting discrete integer values to text labels (1 & 2 -> “Male” & “Female”)
  • Splitting US ZIP codes into ZIP code & Plus_4 columns–and this one is tricky because not all ZIP codes given have the +4 and/or may not contain the leading zeroes
  • Convert 8-digit integers to date
  • Use those dates from the previous item to calculate the age in years
  • Perform an aggregate calculation
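To be clear, the sketch below is not the actual challenge. The table & column names are invented here, but it gives a flavor of the kind of (PostgreSQL-style) SQL a candidate might reach for to handle a few of those items:

SELECT
    initcap(last_name) || ', ' || initcap(first_name)  AS full_name,
    CASE gender_code WHEN 1 THEN 'Male' WHEN 2 THEN 'Female' END  AS gender,
    lpad(split_part(raw_zip, '-', 1), 5, '0')  AS zip_code,
    nullif(split_part(raw_zip, '-', 2), '')  AS plus_4,
    to_date(birth_date_int::text, 'YYYYMMDD')  AS birth_date,
    date_part('year', age(to_date(birth_date_int::text, 'YYYYMMDD')))  AS age_years
FROM raw_source
WHERE last_name IS NOT NULL;  -- plus whatever NULL/duplicate handling the data demands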

It has been pretty surprising–not to mention extremely insightful–to see where & why candidates get tripped up. And how do we do that? Well–especially in this time of the COVID-19 pandemic–we do the technical challenges over Zoom. That allows us to watch the candidate work–and for us to comment amongst ourselves about the decisions they make–in real time. And while we give the candidates 45 minutes for the challenge, it often takes maybe half that long to tell whether a candidate has the SQL chops or not. (And to date, only one candidate has completed the entire challenge in the allotted time. We know we’re asking a lot from a candidate so finishing isn’t expected or necessary if they demonstrate sound SQL skills with a well-thought out approach.)

Technical Challenge Code Smells

So while we feel that our technical challenge is DE-centric, I have found that there are plenty of times where employers will ask a candidate to perform some not-so-DE-specific tasks. Broadly, I’d suggest you think twice about such an employer: if they can’t even be bothered to ask you to perform a directly related technical challenge, what do you think it’s going to be like (if you were to accept the job) when it comes to the actual work?

Examples of technical challenge code smells (for DE):

  • Being asked to perform a technical challenge with no advance warning (read: it should be made clear during a previous conversation/interview that there will be a technical challenge, if for no other reason than to allow the candidate to mentally prepare for it).
  • (Re)implement any sorting algorithm or algorithm that already exists in a language’s standard library
  • Perform any type of task purely from memory (read: without being able to search for help)
  • Perform any obscure or non-job related task
    • Once upon a time I was asked to perform a bunch of DDL queries (CREATE TABLE, ALTER INDEX, etc.) when applying for a data analyst position.
  • Perform a task using a technology that you expressly made known, in a previous conversation with the prospective employer, you have little to no previous experience with

So if it isn’t obvious, these are all red flags that the prospective employer probably doesn’t understand–or worse, doesn’t care–what a data engineer is actually supposed to do.

Expectations Beyond The Technical Challenge

With the technical challenge over, there’s no time for a candidate to relax, as they are then introduced to the team that they would be working with. While technical questions are still quite likely to be asked, this is also when potentially suspect items on the candidate’s resume have a light shone on them for clarity (What did you do for those 6 months between your last 2 jobs? Tell us more about your experience using [fill in the blank ETL tool or database]?). And it is a time for both sides to figure out if the candidate is going to be a good match, both technically & interpersonally, within the team. Does the candidate possess a skill set and/or business knowledge that may not already exist on the team? Could you see working with this person all day, five days a week? What is their approach to learning a new tool or technology?

And it’s respectful to manage a candidate’s expectations up front (remember: no surprise technical challenges!) & stick to a schedule–start on time & don’t run over schedule, at least not without asking if that is OK with the candidate. There’s nothing wrong with scheduling a second team interview for another day if you don’t feel comfortable with the candidate based on your last interview/conversation!

Getting Consensus

While the candidate has been vetted initially by HR and/or management, the team should have a considerable voice in deciding whether or not a candidate gets an offer, because they will be the ones working with that person & they understand the requirements of the job better than anyone else in the organization. We get the team together immediately once the candidate has disconnected from the Zoom meeting, which has the benefit of everyone’s thoughts being extremely fresh–not to mention everyone on the team is already together! Usually these post-interview discussions take 15-20 minutes at most. Doing it this way also keeps the hiring process moving quickly–whether we’re extending an offer or declining–so we can get to whatever the next step is & inform the candidate.

9 Rules To Interviewing For Data Engineer Jobs

This post is the first of two covering interviewing from 2 different perspectives: as the interviewee & the interviewer. It seems like there’s (at least!) one post a week on reddit’s /r/dataengineering asking for advice about getting a DE job, and many of the posts seem to hover around dealing with/getting past impostor syndrome. So I figured I’d share my experiences on both sides of the interview table. These are some simple rules that should help you find yourself in the candidate “hot seat”–& maybe even considering, if not accepting, a new position. There’s quite a bit that I don’t mention below that is just considered “normal” or “typical” (such as showing up for interviews in a suit) but still very much applies.

Rule #1: Always keep your resume up-to-date

Whatever you do up front to lessen the effort it takes to submit your resume directly translates into applying to more jobs. This also means keeping your profiles up to date on the job sites. Maintain copies of your resume in both Word & PDF formats, too. [Hint: the best time to update your resume is when you start a new job.]

Rule #2a: Always look for new opportunities

You never know when your next opportunity is going to fall into your lap. And it’s always easier (read: less pressure, less stressful) to find a job when you already have one than when you’re out of work. Apply to anything & everything that you might be interested in as there are no penalties for applying to jobs you think you may not be qualified for. (Read that last sentence a few more times.)

Rule #2b: Look for jobs that use technologies and/or industries that you already have experience with

All but one professional technical job I’ve ever had came about because someone at the company that hired me was familiar with somewhere I’d worked before and/or the technology used at a previous job. My current job, where I (asked for &) received a significant pay increase, was because I had both highly relevant pharmacy/healthcare experience and they use an ETL tool that I had significant experience with at a previous job.

Rule #3: Learn how to get past the “robot overlords” & get noticed by humans

This one is without a doubt the hardest part for most people. Be honest. Don’t put anything on your resume that you can’t comfortably discuss (more on that below). However, a human will never, ever see your resume if it doesn’t contain some sizable fraction of the keywords they are looking for. What are they looking for, you ask? The answer is shockingly obvious: the exact same keywords they have in their job description! While tailoring your resume to every job submission is a definite pain, the more you find easy but sensible ways to inject the keywords applicable to a potential employer, the more likely you’ll get past the “robot overlords” (aka the algorithms that pre-filter candidate submissions). Also, recall Rule #2a–“there are no penalties for applying”–so guess what? There are also no penalties for submitting different versions of your resume. Your goal here is to get your resume in front of a human.

Rule #4: Interviewing is a skill that takes practice

Assuming that you are already working, the easiest way to practice is to interview candidates yourself. (Being the interviewer is also a skill worth developing, mind you.) Put time into asking the kinds of questions that will help you determine whether someone is going to be capable of doing the job. And then spend the same–if not more–time actually answering your own questions. Find meetups of people working in your field & pick their brains to learn what kinds of questions you might be asked.

It doesn’t matter if it’s a phone screen or an in-person interview–give yourself plenty of time before & after it so that for that block of time you have only one thing on your mind: the interview. It can be going over some notes, or going over the company’s website, or just closing your eyes & meditating. Forget about the outside world. Preparation is the best way to handle a situation that you feel unsure of (more on that in rules #5a & #5b below). Be calm. Be focused.

And be prepared (post-phone screen) for some kind of technical practical. It may be on a whiteboard in person (terrible, but many companies still do this), an online quiz, or a “thought experiment” where you walk someone through how you would approach a particular scenario. Most, if not all, are designed to be difficult to finish in the allotted time, so you’re judged more on how far you get than on completing the assignment/questions/scenario. If they let you use other tools to search for help and/or ask questions for clarification, use them when you get stuck! They want to see your thought process & troubleshooting abilities. Also, many of these practicals do not need to be completed in any order–so if you get stuck in one place, set it aside & work on another item or element, and come back to the hard part(s) after you’ve regained your confidence. Better yet, seek out the easiest parts first & get them out of the way regardless of order…after all, that’s probably how you’d do it “in the real world”, right?! Lastly, SQL is the universal language of DE. Don’t get hung up on not knowing how to use some ETL tool. Think about how you would solve the problem with SQL, and that should guide you towards a potential solution.

Rule #5a: Be confident in what you know; be confident in what you don’t know

Again, be honest. Give good, thoughtful answers to all questions. If you know a concept or technology then don’t be afraid to say so. However, if you do not know something, there are 3 magic words that you should not be afraid to say: “I don’t know.” If you just can’t bring yourself to admit that to stranger(s), then find some phrasing that you are comfortable with. For instance, there are things that I have done on my own time that I’d never done professionally, like using Docker. Therefore, I admitted in an interview “I have not used Docker professionally but I understand it conceptually & have tinkered with it for personal projects.”

Rule #5b: Have your answers ready for the difficult questions–and the good questions too!

We all have (perceived) “skeletons in our closet”, whether it’s a job you were fired from, a sizable gap of time missing on your resume, etc. Items like that will almost certainly come up in an interview. Don’t try to hide from those incidents–it won’t work–but you can have answers prepared for those moments. Don’t take the question personally; instead, give them the answer that you’ve prepared. Once you have solid answers to those awkward questions, leveraging the same technique for “good” questions can provide the same benefits in an interview!

Rule #7: Know what you are worth & be ready to negotiate

Negotiating is hard, intimidating, awkward, & makes most people very uncomfortable. However, no one wants to get taken advantage of. Like anything else that’s hard & intimidating, this can be improved with some help & practice. A couple books about negotiating that I recommend are Chris Voss’s Never Split The Difference: Negotiating As If Your Life Depended On It and Start With No: The Negotiating Tools That The Pros Don’t Want You To Know by Jim Camp. And be ready for this situation at all times. I found myself negotiating my salary requirements at the tail end of the phone screen with my current employer! (That’s highly unusual as it’s more likely to happen near the end of a first or second in-person interview.)

But if I had to boil down negotiating to 4 simple tips, they would be:

  1. Figure out what the job you want is worth.
  2. Remain calm & don’t take anything personally.
  3. Use silence to your advantage. They might be more experienced, but it’s human nature for someone to always want to speak. Keep your mouth shut (& your facial expressions in check) for as long as you can.
  4. Do what you can to make the other side reveal their offer first. When pressed to (finally) share what you want, give them a range. And not just any range, but a range with odd numbers. (Voss’s book explains why giving a range with odd numbers works…and I can assure you that it indeed works!)

Example of an odd salary range

Let’s say you want an annual salary of $80,000. You don’t just say “I want to get paid 80 thousand!” because it comes off as demanding. Instead offer them this: “I am looking to make from $78,397 to $82,649.” Giving them a range presents your offer as a choice of some amount over a ~$5,000 range. Could they be cheap & give you the low end of your range? Sure. But most employers like to start a new employee at nice round numbers…and the rounder, the better. So what’s the “roundest” number in that range? $80,000. You got what you wanted! [Bonus: prepare/memorize that range ahead of time–like rule #5b–so that you can express it calmly & confidently.]

Rule #8: Show up prepared for interviews–and interview them too!

Do as many (legal) things as possible to reduce your anxiety. Give yourself plenty of time to get to the interview on time, silence your damn phone(!), and bring a notepad, pen, & a list of questions with you.

Especially the questions. You want to offload the burden of remembering a bunch of questions in your head. And while a fair number of your questions might get answered as they interview you, others might also be answered in casual conversation. Regardless, everything on that list of questions should get checked off by the end so that you understand the expectations & environment that come with the job.

And there is one question that you need to ask, & it must be the absolute final thing you ask:

“Based on my resume & meeting with me today, what concerns do you have that may be holding you back from offering me this position?”

This is a magic question that invites the interviewer to speak their honest opinions of you. Then you have 2 things to do:

  1. Shut up. Let them speak. And absolutely, positively resist the urge in every fiber of your body to defend yourself against what they might say.
  2. Write down their comments right there in the moment. You will probably forget if you try to do this even a few minutes later. Plus, it makes you look serious & organized. Then when you get home, iterate over their feedback so you know what to improve or not do in your next interviews.

Rule #9: Resumes & job interviews are like dating…iterate, iterate, iterate and practice, practice, practice!

Most everyone has taken a job where they got paid less than what they were worth. Most everyone has had terrible previous work experiences, whether with a co-worker, a manager, or the entire company culture. Fewer people have had a job that they hop out of bed every morning for. Well, just like dating to find that complementary significant other, job hunting requires noticing any red flags as early as possible, figuring out what your wants & needs are, and continually improving yourself to handle larger opportunities. It may take multiple tries, or there might be times that you feel like it’s never going to happen, but you cannot let that stop you. Figuring out what you need to improve upon, whether personal or technical, and then actually doing it better is key. That’s why it’s so important to ask that interview question in rule #8! Because once you leave the interview, you can forget about getting any quality feedback from the prospective employer.

Conclusion

So if it isn’t clear by now, let me say it one more time: preparation, education, & iteration. Prepare for interviews by having answers to as many questions about your career as possible so that you avoid becoming flustered or losing your confidence. Educate yourself on what you need to know for the job & be a lifelong student–especially in any IT-related field where things are continually changing. And iterate often by tweaking your resume, getting better at negotiating, and answering a possible employer’s questions.

What have I overlooked? What gets you rattled when it comes to applying for a job or interviewing?

Explaining My Recent Silence…

The reason for the lack of Airflow and/or PostgreSQL posts is simple: I took a new job. I left my old data engineering (DE) job a few days before Christmas & started the new one at the beginning of January. While I greatly enjoyed many aspects of the old job, company, & people that I worked with, this new opportunity was simply too good an offer to pass up, & I left in a way that would perhaps allow me to return one day in the future if things didn’t work out for whatever reason. I’m still doing DE, but–at least for now–there’s no python, much less Airflow, which does bum me out a little. The new job runs all of its ETL on Pentaho Data Integration (aka PDI or kettle).

I’ve used Pentaho in the past at a previous job, and I’ll get into the things that I do & do not like about Pentaho’s ETL tool. But for now, I wanted to explain myself & let you know that the general details of what I might write about here will be on slightly different but still hopefully relevant DE topics.

One of those topics–and one of the major reasons I accepted this new job opportunity–is being part of an actual data team that works in an Agile software development environment. I’ve always previously been a data engineering “team of one”, so getting to split up the workload in a way that should be leaner & more productive had great appeal to me.

Also, there’s a modest chance that I might just write a post or three about smaller (non-DE) python projects I’ve been working on at home in my personal time. The purpose of this blog has been & most likely always will be about me writing about interesting situations that I find & solve, so why limit myself to just professional problems & solutions?! And let’s face it, everyone has a lot more personal time due to COVID-19, social distancing, stay at home orders, & working from home (for those that are working from home).

Be safe.

Another Way to Dynamically Create New Tasks: Factory Methods

New project, new challenge. This time I wanted to create very similar tasks (in this case, to drop indexes); however, I didn’t want to be locked into doing those similar tasks at the same time (read: in parallel with the help of an iterable).

So while I will not even pretend to be the originator of this idea (I know I’ve seen this style of creating tasks in a couple Medium articles that I cannot seem to find links for now), I finally had a need where it made sense to give it a try…

Really there is very little to do. The one caveat is that the factory function must be inside the DAG definition–otherwise it throws up an “I was expecting an Operator class but got a function instead”-type error…(presumably because it would be missing all of the hidden kwargs that get assigned to a task Operator).

In this particular case of needing to drop some indexes, I simply pass the name of the indexed column when I want to instantiate a PostgresOperator task & assign it a unique variable name for each instance.

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator  # Airflow 1.x import path

# default_args, staging_connection, & send_slack_message_failed are defined elsewhere in the DAG file.
with DAG(
    dag_id="my_new_dag",
    default_args=default_args,
    schedule_interval="@daily",
) as dag:

    def drop_index_factory(column: str) -> PostgresOperator:
        """Instantiate drop index task(s) when needed.
        """
        return PostgresOperator(
            task_id=f"drop_{column}_index",
            postgres_conn_id=staging_connection,
            sql=f"DROP INDEX IF EXISTS staging_table__{column};",
            on_failure_callback=send_slack_message_failed,
        )

    drop_fizz_index = drop_index_factory("fizz")

    [more tasks...]

    drop_buzz_index = drop_index_factory("buzz")
    drop_fizz_index >> OTHER_TASKS >> drop_buzz_index

So while this is extremely effective in the right use cases, it does make for a “muddy” DAG definition–particularly if you wanted to do this for multiple actions, like adding another factory method to create an index! The big gain here is that it keeps your DAG code DRY (Don’t Repeat Yourself).
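For what it’s worth, that companion index factory might look like the sketch below. This is my own illustration, not code from the actual DAG; it assumes the indexes live on a table named staging_table & would sit inside the same with DAG block:

    def create_index_factory(column: str) -> PostgresOperator:
        """Instantiate create-index task(s) to rebuild an index dropped above."""
        return PostgresOperator(
            task_id=f"create_{column}_index",
            postgres_conn_id=staging_connection,
            sql=(
                f"CREATE INDEX IF NOT EXISTS staging_table__{column} "
                f"ON staging_table ({column});"
            ),
            on_failure_callback=send_slack_message_failed,
        )

    create_fizz_index = create_index_factory("fizz")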

PostgreSQL Foreign Data Wrappers: file_fdw

If there’s one thing that I love about PostgreSQL, it’s foreign data wrappers (FDW). Being able to create a connection to another source–usually another database–can make the seemingly impossible possible. However, it’s essential to understand what they do–and may not/don’t do–before you go creating them on your production database.

Before I get any deeper into FDWs, I need to take a moment to potentially take a swipe at the COPY command in PostgreSQL. It really simplifies the act of pulling data in from a file (usually CSV) into a table (& the occasional export to a CSV file too). But anyone who’s done it a few times knows that there are some limitations to using COPY (one of the most prominent being that the command requires the database role using it to be a SUPERUSER). That’s not always possible or convenient, but it’s often overlooked because once you get that one file loaded into the database, you dismiss the headache & move on to the next task…

But what if you need to load CSV files on a regular basis? Like for some daily ETL. How would you make that happen? I recently played around with a couple of methods in Airflow’s PostgresHook, copy_expert & bulk_load. It took a lot more wrangling to get it working the way I wanted with copy_expert than I’d originally anticipated. Perhaps there would be an easier way…?

At some point in my research, I happened upon a link to a Stack Overflow answer (which I’m unable to find again) that involved using file_fdw. I knew of its existence but never really thought about using it…mostly because it has this (supposed) limitation of only looking in the specified directory for a specific filename. Then it dawned on me: I found my use case!

This particular DAG involves downloading a log file from a remote server, then using a BashOperator() to strip out just the data I want (sidebar: because the log file has a varying number of fields per line, using COPY to load the entire file wouldn’t work). And with this stripped-down file, I’d already made the decision to give it the same file name on every DAG run so that it simply overwrites the contents.

With file_fdw (definitely the easiest FDW I’ve ever configured, not that really any of them are that hard anymore…), it just took this to get it working:

CREATE EXTENSION IF NOT EXISTS file_fdw;
CREATE SERVER hash_bytes FOREIGN DATA WRAPPER file_fdw;

CREATE FOREIGN TABLE IF NOT EXISTS wrk_hash_bytes (
    body_hash text
    , body_bytes int
) SERVER hash_bytes
OPTIONS (
    FILENAME '/mainstorage/etl/data/disk_usage/hash_bytes.csv'
    , FORMAT 'csv'
);

The one thing that I didn’t do (that is a normal practice for me) was to create a dedicated schema for the foreign table(s). But because I was only creating the one table, I went ahead & created it in the public schema.

Finally, with the file in place, it was trivial to simply query the foreign table however I needed to–namely, reading it into an indexed staging table! And that freed me up to use a more conventional PostgresOperator() task to do the actual work.
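That load step boils down to little more than this (the staging table name here is a hypothetical stand-in):

INSERT INTO stg_hash_bytes (body_hash, body_bytes)
SELECT body_hash, body_bytes
FROM wrk_hash_bytes;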

Update November 22, 2019

While I do stand by everything in this post, I do want to amend it with a rather important warning. I actually went back to using the copy_expert method of PostgresHook for this DAG because attempting to ingest a 100M+ line CSV file was not working well for me at all. If I tested with 1M rows, it would come back in ~12 seconds. So extrapolating out that it should take ~1200 seconds–20 minutes–to ingest that much data would be valid math but incorrect real-world experience. It wound up taking over 4 hours(!) to finish. I ended up using the split CLI command as the final step in my BashOperator() task/script to divide the data up into CSV files with up to 10M rows each & then using a for loop to create multiple PythonOperator() tasks to perform multiple smaller COPY-like actions. (That’s why we test!)
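For anyone curious, the fan-out looked roughly like the sketch below. The connection ID, chunk file names, chunk count, & staging table are all stand-ins here (not the real values), & the tasks get created inside the DAG’s with block:

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import paths


def copy_chunk(chunk_path):
    """COPY one CSV chunk into the staging table via copy_expert."""
    hook = PostgresHook(postgres_conn_id="my_staging_db")
    hook.copy_expert(
        sql="COPY stg_hash_bytes (body_hash, body_bytes) FROM STDIN WITH (FORMAT csv)",
        filename=chunk_path,
    )


copy_chunk_tasks = [
    PythonOperator(
        task_id=f"copy_chunk_{i:02d}",
        python_callable=copy_chunk,
        op_args=[f"/mainstorage/etl/data/disk_usage/hash_bytes_{i:02d}.csv"],
    )
    for i in range(10)  # however many chunk files `split` produced
]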

Testing Task Dependencies In Your DAG

While I had been using a simpler version in the past, it is only recently that I wrapped my head around how to simplify the creation–and really the maintenance–of tests for upstream & downstream task dependencies in a DAG. The way I had been doing it involved writing massive tuples with the task names over & over…

And it was even worse for DAGs that weren’t linear–in fact, from frustration I stopped maintaining that test at one point! Ultimately, the solution involved building a list for each path through the DAG & then slicing that list to create the upstream & downstream tasks. And to aid in simplicity & future maintenance of the test, I used namedtuple from the collections library to give the fields obvious names.

from collections import namedtuple
from pytest import mark, param

# NOTE: `dag` (used in test_task_dependencies below) is the DAG object under test;
# import it from your DAG module or load it via a DagBag in the real test file.

Task = namedtuple("Task", ["task", "upstream_task", "downstream_task", "name"])

task_paths = {  # Key = path name, Value = list of tasks in that path
    "first": [
        "EMPTY",  # This must be here for upstream task of "start" task!
        "start",
        "idempotency_check",
        "extract_first",
        "transform_first",
        "load_first",
        "send_slack_message_success",
        "EMPTY",  # This must be here for downstream_task of "send_slack_message_success" task!
    ],
    "second": [
        "EMPTY",
        "start",
        "extract_second_only",
        "send_slack_message_success",
        "EMPTY",
    ],  # Add any additional paths below here if necessary
}



def tasks_to_test(table_type):
    """This function creates a list of tuples with the task, its upstream task,
    downstream task, and name (for a pretty task identifier).
    """
    task_list = task_paths.get(table_type)
    tasks = task_list[1:-1]  # Creates a list that doesn't start/end with "EMPTY"
    upstream_tasks = task_list[:-1]  # Starts with "EMPTY" but doesn't end with it
    downstream_tasks = task_list[2:]  # End with "EMPTY" but doesn't start with it
    return [
        Task(t, u, d, f"{table_type}_{t}")
        for t, u, d in zip(tasks, upstream_tasks, downstream_tasks)
        if t != "EMPTY"
    ]


@mark.parametrize(
    "task_id, upstream_task_id, downstream_task_id",
    [
        param(task.task, task.upstream_task, task.downstream_task, id=task.name)
        for task_path in task_paths
        for task in tasks_to_test(task_path)
    ],
)
def test_task_dependencies(task_id, upstream_task_id, downstream_task_id):
    """Verify the upstream/downstream task_ids for tasks in DAG.

    If there is no upstream or downstream task_id, it will be "EMPTY".

    Where it is possible there may be multiple tasks that could run
    simultaneously, a task_id candidate is selected randomly.
    """
    dag_task_id = dag.get_task(task_id)
    upstream_task_ids = [task.task_id for task in dag_task_id.upstream_list] or "EMPTY"
    downstream_task_ids = [
        task.task_id for task in dag_task_id.downstream_list
    ] or "EMPTY"
    assert upstream_task_id in upstream_task_ids
    assert downstream_task_id in downstream_task_ids

Based on the non-“EMPTY” values in both paths, this will generate a total of 9 tests. A test will fail if the upstream or downstream tasks are not properly connected to a given task.

The real magic starts with tasks_to_test slicing up each list into a list of tasks, a list of upstream tasks, a list of downstream tasks, and then zipping them all together into Task named tuples. Then the list comprehension in the @mark.parametrize() decorator of test_task_dependencies finishes putting all the task data into the list of tuples needed for each test.

Marking Tests as “Happy” or “Sad” with pytest

I was watching the video of a talk on Advanced pytest from EuroPython 2019 where the presenter (also a pytest core developer) used some interesting decorators in his slides (actually his blog). Being rather familiar with @pytest.mark.parametrize(), it wasn’t entirely obvious to me at first what @pytest.mark.wip, @pytest.mark.happy, & @pytest.mark.sad were doing or what functionality they provided. However, a small amount of patience–& checking out the pytest documentation–revealed that they were custom marks.

Before I get into the technical details, it’s worth mentioning that I’d recently been adding “Happy path” or “Sad path” in the docstrings of my tests. (“Happy path” implies that the object under test should work/pass; “sad path” is for objects that should fail.) These marks move that label out of the docstring & place it at the top of the test. But what really sold me on these custom marks–and, oh by the way, @pytest.mark.wip is for any work-in-progress test(s)–is that you can run all tests with a particular custom mark like this:

# To run all "happy path" tests--regardless of what file(s), module(s),
# and/or classes they may be found in!

pytest -m happy

# To run all "sad path" tests

pytest -m sad

How awesome is that?!
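In case it helps, here’s a minimal sketch of the marks in action (the function & tests are throwaway examples; also note that custom marks should be registered under the markers setting in pytest.ini to avoid “unknown mark” warnings):

import pytest


def reciprocal(x):
    return 1 / x


@pytest.mark.happy
def test_reciprocal_of_two():
    """Happy path: a normal value just works."""
    assert reciprocal(2) == 0.5


@pytest.mark.sad
def test_reciprocal_of_zero_raises():
    """Sad path: the failure we expect."""
    with pytest.raises(ZeroDivisionError):
        reciprocal(0)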

(Also, the video later reveals that @pytest.mark.wip was used with a pytest plugin that, in his case, would run that particular test 10X for additional consistency checks. Cool use!)

Why Setting an SLA for Your DAGs Might Be a Very Good Idea

While I feel like I’d already taken plenty of steps to be notified if there’s a problem when a DAG fails, I recently discovered that I hadn’t done anything to catch those scenarios where a DAG takes longer–or in my case, a lot longer–to complete (or fail) than normal. Thus, the need to learn how to set an SLA (service level agreement in support parlance) to account for these (rare) occurrences…

Configuring the SLA

First things first: there is NOT a global SLA (within the default_args dictionary or DAG definition). The SLA must be set at the task level, like this:

...[imports, default_args dict, etc.]...

with DAG(
    dag_id="daily_email_report",
    catchup=False,
    default_args=default_args,
    schedule_interval="@daily",
) as dag:

    ...[other tasks]...

    send_email_report = PythonOperator(
        task_id="send_email_report",
        python_callable=send_email_report,
        sla=timedelta(hours=1),  # <=== SLA definition
        provide_context=True,
        on_failure_callback=send_slack_message_failed,
    )
  • Make sure that datetime.timedelta has been imported at the top of your DAG!

Yes, it really is that easy to define the SLA! You just assign the amount of time the task has to complete, measured from the start of the DAG run (so in the case above, the run starts at midnight–therefore the task has until 1:00am to complete).

However, assigning an SLA by itself will only generate an email to whoever is listed in the default_args/DAG definition. The body of those emails looks like this:

Here's a list of tasks that missed their SLAs:
send_email_report on 2019-09-09T00:00:00+00:00
Blocking tasks:
wait_for_legacy_etl_complete on 2019-09-09T00:00:00+00:00

      =,             .=
     =.|    ,---.    |.=
     =.| "-(:::::)-" |.=
      \\__/`-.|.-'\__//
       `-| .::| .::|-'      Pillendreher
        _|`-._|_.-'|_       (Scarabaeus sacer)
      /.-|    | .::|-.\
     // ,| .::|::::|. \\
    || //\::::|::' /\\ ||
    /'\|| `.__|__.' ||/'\
   ^    \\         //    ^
        /'\       /'\
       ^             ^

And that email (including the ASCII bug) tells you what task didn’t complete on time & the name of the task that was active at the time of the SLA miss. That’s nice to know, but wouldn’t it be better if you could get a little more information and/or be notified another way?

Configuring sla_miss_callback

Just like on_retry_callback, on_failure_callback, etc., you can configure an sla_miss_callback–though unlike those task-level callbacks, it gets attached to the DAG itself. (And an obvious idea could be to send a Slack message much like I showed in this post.)
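A bare-bones sketch of what that could look like is below. The Slack helper & surrounding DAG bits are placeholders; the callback’s argument list is what Airflow documents for sla_miss_callback:

...[imports, default_args dict, Slack helper, etc.]...

def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Send the SLA-miss details to Slack instead of (just) the default email."""
    send_slack_message(
        f"SLA missed in DAG {dag.dag_id}!\n"
        f"Late tasks:\n{task_list}\n"
        f"Blocking tasks:\n{blocking_task_list}"
    )

with DAG(
    dag_id="daily_email_report",
    catchup=False,
    default_args=default_args,
    schedule_interval="@daily",
    sla_miss_callback=sla_miss_alert,  # <=== attached at the DAG level
) as dag:
    ...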

Warning About Configuring an SLA and/or sla_miss_callback

Setting an SLA does NOT cause the DAG (or task) to fail. It’s just there to bring attention to the fact that a DAG is taking longer than intended or expected to complete.

Complex Formatting of Results in List or Dictionary Comprehensions (or Generators)

I love the power that comes with list (or dictionary) comprehensions or generators in Python.

I also despise the code pattern of iterating over an object and appending the results to another list.

Let’s keep an example simple with a list comprehension performing a single calculation on each value in the original list:

# Given a list like this:
my_list = [1, 6, 23, 67, 7]

# Good:
results = [x + 1 for x in my_list]

# Bad:
results = []
for x in my_list:
    results.append(x + 1)

With either bit of code above, results = [2, 7, 24, 68, 8], but you have to admit that the list comprehension is shorter, easier to write, & faster to understand.

Filtering a list comprehension for a single result

If you want to filter out elements in your comprehension, and there’s only one condition you need to apply, you can do something like this:

my_list = [3, 6, 10, 15, 22, 24]

results = [
    x ** 2  # apply this calculation to the value
    for x in my_list
    if x % 3 == 0  # but only when x divides evenly by 3
]

Which returns results = [9, 36, 225, 576] (& tosses 10 & 22 from the results). That can certainly be useful, but what if you want to produce results based on more complex–and multiple–conditions without removing any elements from the list?!

A solution like the one above didn’t cut it for a recent project I was working on. I needed to format the results of a list comprehension based on a set of multiple conditions. (While one approach I started down involved creating multiple list comprehensions–one for each set of output that I needed–and then merging the resulting lists together, the results at best were un-pythonic, & at worst, not only violated the DRY principle but were also terrible to maintain.)

I solved the issue by using a solution I’d done before but never made the connection in my head: Create a function outside the list comprehension & have it perform the necessary conditional formatting to return the results to the list comprehension!

Using functions to handle the complex formatting

def to_my_format(my_number: int) -> int:
    """Perform one of several particular calculations based on the input value given.
    """
    if my_number % 5 == 0:
        return my_number ** 3
    if my_number % 7 == 0:
        return my_number // 2
    return my_number - 1  # If my_number doesn't meet any of the criteria above, do this

my_list = [3, 6, 10, 15, 21, 24]

results = [to_my_format(x) for x in my_list]

Which, in case you’re curious, gives us results = [2, 5, 1000, 3375, 10, 23]

So I end up with a list that contains as many elements as the original list (my_list), but one of 3 different calculations is performed on each element depending on the element itself.
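And since the title promises dictionaries & generators too: the same helper function drops straight into those forms (a quick sketch reusing to_my_format & my_list from above):

# Dictionary comprehension: map each input to its formatted result
results_by_input = {x: to_my_format(x) for x in my_list}
# {3: 2, 6: 5, 10: 1000, 15: 3375, 21: 10, 24: 23}

# Generator expression: same logic, but evaluated lazily
results_gen = (to_my_format(x) for x in my_list)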