Apache Airflow is great for coordinating automated jobs, and it provides a simple interface for sending email alerts when these jobs fail. Typically, you can request these emails by setting email_on_failure to True in your operators.
These email alerts work great, but I wanted to include additional links in them (specifically, a link to my Spark cluster, which can be grabbed from the Databricks Operator). Here's how I created a custom email alert on job failure.
First, I set email_on_failure to False and use the operator's on_failure_callback. I give on_failure_callback the function described below.
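from airflow.utils.email import send_email


def notify_email(contextDict, **kwargs):
    """Send custom email alerts."""
    # The context has no 'task_name' key, so take the name from the task instance.
    task_name = contextDict['task_instance'].task_id

    # email title.
    title = "Airflow alert: {task_name} Failed".format(task_name=task_name)

    # email contents (sent as HTML, hence the <br> tags)
    body = """
    Hi Everyone, <br>
    <br>
    There's been an error in the {task_name} job.<br>
    <br>
    Forever yours,<br>
    Airflow bot <br>
    """.format(task_name=task_name)

    # replace with the address(es) that should receive alerts
    send_email('[email protected]', title, body)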
send_email is a function imported from Airflow. contextDict is the dictionary Airflow passes to the callback function when the task fails. Importantly, contextDict contains lots of relevant information, including the Task Instance (key='ti') and the Operator Instance (key='task') associated with your error. I was able to use the Operator Instance to grab the relevant cluster's address, and I included this address in my email (this exact code is not present here).
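Since that exact code isn't shown, here is a minimal sketch of pulling extra details out of the context to build the alert. It assumes only the 'ti' and 'task' keys described above. task_id, dag_id and log_url (a link to the task's log in the web UI) are standard TaskInstance attributes, but cluster_url is a hypothetical attribute name used for illustration; substitute whatever your operator actually exposes. The fake objects at the bottom stand in for Airflow's, so the function can be tried without an Airflow installation.

```python
from types import SimpleNamespace


def build_alert(context):
    """Return (title, html_body) for a failed task, built from the callback context."""
    ti = context['ti']      # the TaskInstance that failed
    task = context['task']  # the operator instance

    # 'cluster_url' is a hypothetical attribute; replace it with whatever your
    # operator exposes. getattr() keeps the alert working if it is missing.
    cluster_url = getattr(task, 'cluster_url', None)

    title = "Airflow alert: {} Failed".format(ti.task_id)
    lines = [
        "Hi Everyone, <br>",
        "<br>",
        "There's been an error in the {} job of the {} DAG.<br>".format(ti.task_id, ti.dag_id),
        'Logs: <a href="{0}">{0}</a><br>'.format(ti.log_url),
    ]
    if cluster_url:
        lines.append('Cluster: <a href="{0}">{0}</a><br>'.format(cluster_url))
    lines += ["<br>", "Forever yours,<br>", "Airflow bot <br>"]
    return title, "\n".join(lines)


# Fake stand-ins for Airflow's TaskInstance and operator (illustrative values).
fake_ti = SimpleNamespace(task_id='example', dag_id='example_dag',
                          log_url='http://localhost:8080/log?task_id=example')
fake_task = SimpleNamespace(cluster_url='https://my-cluster.example.com')

title, body = build_alert({'ti': fake_ti, 'task': fake_task})
print(title)  # Airflow alert: example Failed
```

Inside the callback, `title, body = build_alert(contextDict)` followed by the same send_email call would replace the hand-written title and body.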
To use notify_email, I set on_failure_callback equal to notify_email.
Below is a short example Airflow DAG.
from airflow.models import DAG
# Airflow 2.x import path; on Airflow 1.10 use
# from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'me',
    'start_date': days_ago(1),
}

# run every day at 05:00 UTC
dag = DAG(dag_id='example_dag',
          description='my_example',  # description belongs to the DAG, not default_args
          default_args=args,
          schedule_interval='0 5 * * *')


def print_hello():
    return 'hello!'


# notify_email is the callback defined earlier in this file
py_task = PythonOperator(task_id='example',
                         python_callable=print_hello,
                         on_failure_callback=notify_email,
                         dag=dag)
Note where I set on_failure_callback equal to notify_email in the PythonOperator.
Hope you find this helpful! Don't hesitate to reach out if you have a question.