Dan Vatterott

Data Scientist

Custom Email Alerts in Airflow

Apache Airflow is great for coordinating automated jobs, and it provides a simple interface for sending email alerts when these jobs fail. Typically, you request these emails by setting email_on_failure to True in your operators.
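As a minimal sketch of that default behavior, the built-in alert is usually switched on through operator arguments such as `email` and `email_on_failure`, often passed once through `default_args`. The address below is a placeholder, and the dictionary only illustrates the settings; it is not a full DAG.

```python
# Operator arguments that turn on Airflow's built-in failure email.
# 'alerts@example.com' is a placeholder address for illustration only.
default_args = {
    'owner': 'me',
    'email': ['alerts@example.com'],  # recipients of the alert
    'email_on_failure': True,         # send the stock email when a task fails
    'email_on_retry': False,          # stay quiet on retries
}
```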

These email alerts work well, but I wanted to include additional links in them (specifically, a link to my Spark cluster, which can be grabbed from the Databricks Operator). Here’s how I created a custom email alert on job failure.

First, I set email_on_failure to False and used the operator’s on_failure_callback. I gave on_failure_callback the function described below.

from airflow.utils.email import send_email

def notify_email(contextDict, **kwargs):
    """Send custom email alerts."""

    # id of the failed task, read from the Task Instance (key='ti')
    task_name = contextDict['ti'].task_id

    # email title.
    title = "Airflow alert: {task_name} Failed".format(task_name=task_name)

    # email contents
    body = """
    Hi Everyone, <br>
    <br>
    There's been an error in the {task_name} job.<br>
    <br>
    Forever yours,<br>
    Airflow bot <br>
    """.format(task_name=task_name)

    send_email('[email protected]', title, body)

send_email is a function imported from Airflow. contextDict is a dictionary passed to the callback function on error. Importantly, contextDict contains lots of relevant information, including the Task Instance (key=’ti’) and Operator Instance (key=’task’) associated with your error. I used the Operator Instance to grab the relevant cluster’s address and included this address in my email (this exact code is not present here).
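To make the role of the context dictionary more concrete, here is a rough sketch of how a title and body might be assembled from the ’ti’ and ’task’ entries. This is not the code I used: `build_alert` is a hypothetical helper, `cluster_url` is a hypothetical attribute standing in for whatever your operator actually exposes, and the stand-in objects exist only so the snippet runs without an Airflow installation.

```python
from types import SimpleNamespace


def build_alert(context):
    """Return (title, body) for a failure email built from the callback context."""
    ti = context['ti']      # Task Instance for the failed run
    task = context['task']  # Operator Instance for the failed run

    # 'cluster_url' is a hypothetical attribute used for illustration;
    # substitute whatever your operator actually exposes.
    cluster_url = getattr(task, 'cluster_url', None)

    title = "Airflow alert: {} Failed".format(ti.task_id)
    body = "There's been an error in the {} job.<br>".format(ti.task_id)
    if cluster_url:
        # Only add the link when the operator provides one.
        body += 'Cluster: <a href="{0}">{0}</a><br>'.format(cluster_url)
    return title, body


# Stand-in objects so the sketch runs without an Airflow installation.
fake_context = {
    'ti': SimpleNamespace(task_id='example'),
    'task': SimpleNamespace(cluster_url='https://example.com/cluster'),
}
title, body = build_alert(fake_context)
```

Inside a real callback, the returned title and body could then be handed to send_email in place of the hard-coded strings.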

To use notify_email, I set on_failure_callback equal to notify_email.

I write out a short example Airflow DAG below.

from airflow.models import DAG
# in Airflow 2+ this import lives at airflow.operators.python
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

# notify_email is the callback function defined above

args = {
  'owner': 'me',
  'description': 'my_example',
  'start_date': days_ago(1)
}

# run every day at 05:00 UTC
dag = DAG(dag_id='example_dag', default_args=args, schedule_interval='0 5 * * *')

def print_hello():
  return 'hello!'

py_task = PythonOperator(task_id='example',
                         python_callable=print_hello,
                         on_failure_callback=notify_email,
                         dag=dag)

py_task

Note where I set on_failure_callback equal to notify_email in the PythonOperator.
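If every task in a DAG should send the same alert, one option is to put the callback in the DAG's default arguments, which Airflow applies to each operator in the DAG. This is an assumption about how you might organize things, not something the example above does. The stub notify_email here only stands in for the real callback so the snippet runs on its own.

```python
def notify_email(contextDict, **kwargs):
    """Stand-in for the real callback; returns a string instead of emailing."""
    return "alert for {}".format(contextDict['ti'].task_id)


# Arguments shared by every operator created in the DAG.
args = {
    'owner': 'me',
    'email_on_failure': False,            # turn off the stock email
    'on_failure_callback': notify_email,  # send the custom one instead
}
```

An operator can still override the shared callback by passing its own on_failure_callback.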

Hope you find this helpful! Don’t hesitate to reach out if you have a question.
