I configure all of my DAGs to slack me a message when they finish successfully, but what about when a task in a DAG fails? Sure, there’s the built in ability to send an email, but wouldn’t it be nice to slack a failed message instead?
Turns out, it’s not so hard to do… Simply create a function that creates a SlackAPIPostOperator
object & then add an on_failure_callback
reference in your DAG definition!
[All of the usual docstrings & imports for your DAG go here]
def send_slack_message_failed(context):
"""Send Slack message when a task fails.
"""
return SlackAPIPostOperator(
task_id="send_slack_message_task_failed",
token=Variable.get("slack_token"),
channel=Variable.get("slack_status_channel"),
username="BI-ETL-airflow",
text=f":red_circle: TASK {context.get('task')} IN {context.get('dag')} "
+ "FAILED! ATTENTION NEEDED!",
).execute(context=context)
default_args = { ... }
with DAG(
dag_id="my_awesome_dag_that_sometimes_fails",
default_args=default_args,
schedule_interval="@daily",
) as dag:
task1 = PostgresOperator(
task_id="truncate_mysterious_table",
sql="TRUNCATE TABLE mysterious;",
on_failure_callback=send_slack_message_failed,
)
I feel like I should also mention that I generally prefer in most DAGs for it to not contain any functions, as my preference is to separate the connection of the tasks from the work actually performed by the tasks. However, this is a worthy exception as potentially all tasks in the DAG could invoke send_slack_message_failed()
.
So when a task goes wrong, here’s a screenshot of the slack message:
I should also mention that because the message sent includes the name of the task, I found that the only way to get the correct value passed was to add on_failure_callback
statements to all the individual tasks! (If you wish to have a more generic message though, you could simply add on_failure_callback
in the DAG()
definition.)