Dan Vatterott

Data Scientist

Custom Email Alerts in Airflow

Apache Airflow is great for coordinating automated jobs, and it provides a simple interface for sending email alerts when these jobs fail. Typically, you request these emails by setting email_on_failure to True in your operators.
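As a minimal sketch of the built-in alert, the settings can be written as a default_args dictionary that a DAG hands to each of its operators. Note that the built-in alert also needs a recipient in the operator's email argument. The address below is a placeholder, not one from this post.

```python
# Shared settings that a DAG passes to every operator through default_args.
default_args = {
    'owner': 'me',
    'email': ['alerts@example.com'],  # placeholder recipient, replace with your own
    'email_on_failure': True,         # send Airflow's built-in alert when a task fails
}
```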

These email alerts work great, but I wanted to include additional links in them (specifically, a link to my Spark cluster, which can be grabbed from the Databricks Operator). Here's how I created a custom email alert on job failure.

First, I set email_on_failure to False and use the operator's on_failure_callback instead. I give on_failure_callback the function described below.

 from airflow.utils.email import send_email

 def notify_email(contextDict, **kwargs):
     """Send custom email alerts."""

     # id of the failed task, read from the Task Instance in the context
     task_name = contextDict['ti'].task_id

     # email title.
     title = "Airflow alert: {task_name} Failed".format(task_name=task_name)

     # email contents
     body = """
     Hi Everyone, <br>
     <br>
     There's been an error in the {task_name} job.<br>
     <br>
     Forever yours,<br>
     Airflow bot <br>
     """.format(task_name=task_name)

     send_email('[email protected]', title, body)

send_email is a function imported from Airflow. contextDict is a dictionary given to the callback function on error. Importantly, contextDict contains lots of relevant information, including the Task Instance (key='ti') and Operator Instance (key='task') associated with your error. I used the Operator Instance to grab the relevant cluster's address and included this address in my email (that exact code is not shown here).
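A rough sketch of pulling values out of contextDict might look like the following. It is not the code used for the original alert. The build_alert helper and the cluster_url attribute are hypothetical names chosen for illustration, and the SimpleNamespace objects are stand-ins so the example runs without an Airflow installation. In a real callback, 'ti' would be a Task Instance (which exposes task_id and log_url) and 'task' would be the operator.

```python
from types import SimpleNamespace


def build_alert(contextDict):
    """Return (title, body) for a failure email built from the callback context."""
    ti = contextDict['ti']      # Task Instance
    task = contextDict['task']  # Operator Instance

    title = "Airflow alert: {} Failed".format(ti.task_id)

    # cluster_url is a hypothetical attribute used only for illustration;
    # fall back to a short note when the operator does not have it.
    cluster_url = getattr(task, 'cluster_url', None)
    if cluster_url:
        cluster_line = 'Cluster: <a href="{0}">{0}</a><br>'.format(cluster_url)
    else:
        cluster_line = 'Cluster: not available<br>'

    body = (
        "There's been an error in the {task_id} job.<br>"
        '<a href="{log_url}">Task log</a><br>'
        "{cluster_line}"
    ).format(task_id=ti.task_id, log_url=ti.log_url, cluster_line=cluster_line)
    return title, body


# Stand-in context so the sketch can be run and inspected locally.
fake_context = {
    'ti': SimpleNamespace(task_id='example',
                          log_url='http://localhost:8080/log?task_id=example'),
    'task': SimpleNamespace(cluster_url='https://example.com/cluster/123'),
}
title, body = build_alert(fake_context)
print(title)
print(body)
```

Keeping the message building separate from the send_email call makes it possible to check the email text with a fake context before wiring the function into a DAG.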

To use notify_email, I set on_failure_callback equal to notify_email.

I write out a short example Airflow DAG below.

 from airflow.models import DAG
 from airflow.operators.python_operator import PythonOperator
 from airflow.utils.dates import days_ago

 args = {
   'owner': 'me',
   'description': 'my_example',
   'start_date': days_ago(1)
 }

 # run every day at 05:00 UTC
 dag = DAG(dag_id='example_dag', default_args=args, schedule_interval='0 5 * * *')

 def print_hello():
   return 'hello!'

 py_task = PythonOperator(task_id='example',
                          python_callable=print_hello,
                          email_on_failure=False,
                          on_failure_callback=notify_email,
                          dag=dag)

 py_task

Note where I set on_failure_callback equal to notify_email in the PythonOperator.
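If every task in a DAG should send the same alert, one option is to put the callback in the DAG's default_args so that each operator picks it up without repeating the argument. The sketch below only builds the dictionary. The notify_email function in it is a placeholder standing in for the callback from this post, and the start date is an arbitrary value chosen for illustration.

```python
from datetime import datetime


def notify_email(contextDict, **kwargs):
    """Placeholder for the notify_email callback defined earlier in this post."""
    pass


# Operators attached to a DAG inherit these values unless they override them.
args = {
    'owner': 'me',
    'start_date': datetime(2018, 1, 1),   # arbitrary fixed date for illustration
    'email_on_failure': False,            # turn off the built-in alert
    'on_failure_callback': notify_email,  # use the custom alert instead
}
```

Passing args as default_args when creating the DAG would then apply the callback to each task, and an individual operator can still set its own on_failure_callback to override it.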

Hope you find this helpful! Don't hesitate to reach out if you have a question.
