Gửi Slack Alerts trên Airflow
MarkdownView HTML version
Slack là một công cụ khá phổ biến trong các Team, slack giúp tập hợp mọi thông tin về Slack (như Jira alert, ETL pipelines, CI/CD status, deployments, ...) một cách thống nhất và dễ dàng theo dõi. Bài viết này mình hướng dẫn gửi mọi báo lỗi của Airflow đến Slack.
# 1. Slack Incoming Webhooks và Airflow Connection
Truy cập Slack App Directory tìm Incoming Webhooks: `https://<workspace>.slack.com/apps/A0F7XDUAZ-incoming-webhooks`

Ở mục **Post to Channel** chọn Channel, sau đó bấm **Add Incoming Webhooks integration**

Sau đó bạn sẽ nhận được 1 URL có dạng:
https://hooks.slack.com/services/T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2

Vào **Airflow** > **Admin** > **Connections** để thêm một connection mới
- Conn Id: `Slack`
- Conn Type: `HTTP`
- Host: `https://hooks.slack.com/services`
- Password: `/T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2`

# 2. Slack alert Utils
Tạo file utils chứa function alert, ví dụ: `/dags/utils/slack_alert.py`
```python
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
SLACK_CONN_ID = 'slack'
def task_fail_slack_alert(context):
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
Task Failed.
*DAG*: {dag_id}
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
dag_id=context.get('dag').dag_id,
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
ti=context.get('task_instance'),
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
)
failed_alert = SlackWebhookOperator(
task_id='slack_alert',
http_conn_id=SLACK_CONN_ID,
webhook_token=slack_webhook_token,
message=slack_msg,
username='airflow')
return failed_alert.execute(context=context)
```
# 3. Config Slack alert cho từng DAG
Với mỗi DAG muốn alert, ta thêm thuộc tính `on_failure_callback` cho mỗi DAG. Ví dụ như dưới dây:
`example_dag.py`
```py
from airflow import DAG
...
from utils.slack_alert import task_fail_slack_alert
default_args = {
**params['default_args'],
'owner': DAG_OWNER,
'on_failure_callback': task_fail_slack_alert,
...
}
dag = DAG('dag_id', default_args=default_args)
...
```
Kết quả:

# Tham khảo
- https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105
- airflow.operators.slack_operator: [https://airflow.apache.org/\_modules/airflow/operators/slack_operator.html](https://airflow.apache.org/_modules/airflow/operators/slack_operator.html)
Chúc các bạn thành công.