SlackAPIPostOperator 를 사용하여 slack을 연결하여 실패 메세지 보내기
많은 DAG에 적용하는 방법
-> 모듈 설치하기
pip install apache-airflow-providers-slack
pip install apache-airflow-providers-slack
-> 모듈 import 하기
from airflow.operators.slack_operator import SlackAPIPostOperator
1. slack에서 작업
slack 가입 후 Token 추가
(OAuth & Permissions 에 있는 토큰을 추가)
2. airflow에서 작업
sourcecode 안에서 바로, api 정보를 넣어줘도 되지만,
airflow Connections 을 작성하여 여러번, 다양한 코드에서 호출이 가능한 구조로 만듬.(재사용성을 높임)
3. dag script 작성
많은 dag에 적용하기 위해 file 분리.
-> alert.py파일을 생성하고 아래의 코드를 작성
- alert.py
from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator
class SlackAlert:
def __init__(self, channel):
self.slack_channel = channel
self.slack_token = BaseHook.get_connection('slack').password
def slack_fail_alert(self, context):
alert = SlackAPIPostOperator(
task_id='slack_failed',
channel=self.slack_channel,
token=self.slack_token,
text="""
:red_circle: Task Failed. :red_circle:
*⏰ {dag} ⏰*
*Task*: {task}
*Execution Time*: {exec_date}
*Error log: {task_instance}
""".format(
dag=context.get('task_instance').dag_id,
task=context.get('task_instance').task_id,
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
)
)
return alert.execute(context=context)
** call back 함수의 context ->get_template_context 에서 확인 가능
main_slack.py
from airflow.contrib.hooks.ssh_hook import SSHHook
# ## ⏰ slack alarm ⏰
from alert import SlackAlert
## 로컬 타임존 생성
local_tz = pendulum.timezone("Asia/Seoul")
# ## ⏰ slack alarm ⏰
alert = SlackAlert('#notification')
default_args = {
'owner': 'airflow_slack',
'depends_on_past': False,
'start_date': datetime(2021, 12, 28 ,tzinfo=local_tz), #dag생성일자
'email_on_failure': False,
'email_on_retry': False,
'catchup': False,
'wait_for_downstream': False,
# ⏰ slack alarm ⏰
'on_failure_callback': alert.slack_fail_alert,
}
dag main_slack.py 안에서 에러가 난다면, slack_fail_alert이 작동.
반응형
'Infra > MLops' 카테고리의 다른 글
[MLOps][Infra]MLOps에 jenkins CI/CD 적용 가능한 구조 고르기 (0) | 2022.01.18 |
---|---|
[airflow][crontab] 시간 설정 (0) | 2022.01.18 |
[airflow] Dag간 연결 하기 (Connecting between Dags) ⏰ (0) | 2022.01.10 |
[airflow] mysql연결하기 (0) | 2022.01.10 |
[airflow]task Group in Airflow 2.0 ⏰ (0) | 2022.01.10 |