Infra/MLops

[airflow] slack 연결하기-많은 dag에 적용

뚜둔뚜둔 2022. 1. 10. 11:31

SlackAPIPostOperator 를 사용하여 slack을 연결하여 실패 메세지 보내기

많은 DAG에 적용하는 방법

 

-> 모듈 설치하기

pip install apache-airflow-providers-slack

pip install apache-airflow-providers-slack

 

-> 모듈 import 하기

from airflow.operators.slack_operator import SlackAPIPostOperator

1. slack에서 작업

slack 가입 후 Token 추가

(OAuth & Permissions 에 있는 토큰을 추가) 

-> https://jisun-rea.tistory.com/entry/Slack-API-Slack-Bot-%EB%A7%8C%EB%93%A4%EA%B3%A0-Slack-%EB%A9%94%EC%84%B8%EC%A7%80-%EB%B3%B4%EB%82%B4%EA%B8%B0-Scopes-slacker

 

[Slack API] Slack Bot 만들고 Slack 메세지 보내기 / Scopes / slacker

기존에 Bot user을 추가하는 방식은 deprecated되었고, token scope을 지정하는 방식으로 바뀌었다. 그런데 공식 api tutorial도 업데이트가 잘 안되있어서 직접 정리해보았다. Slack 회원가입 후 새로운 worksp

jisun-rea.tistory.com

 

2. airflow에서 작업

sourcecode 안에서 바로, api 정보를 넣어줘도 되지만,

airflow  Connections 을 작성하여 여러번, 다양한 코드에서 호출이 가능한 구조로 만듬.(재사용성을 높임)

 

 

admin > connections > +(add) click

 

작성 후 save click

 

3. dag script 작성

많은 dag에 적용하기 위해 file 분리.

-> alert.py파일을 생성하고 아래의 코드를 작성

  • alert.py
from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator

class SlackAlert:
    def __init__(self, channel):
        self.slack_channel = channel
        self.slack_token = BaseHook.get_connection('slack').password

    def slack_fail_alert(self, context):
        alert = SlackAPIPostOperator(
            task_id='slack_failed',
            channel=self.slack_channel,
            token=self.slack_token,
            
            text="""
            :red_circle: Task Failed. :red_circle:
            *⏰ {dag} ⏰*
            *Task*: {task}
            *Execution Time*: {exec_date}  
            *Error log: {task_instance}
              
                """.format(
                    dag=context.get('task_instance').dag_id,
                    task=context.get('task_instance').task_id,
                    exec_date=context.get('execution_date'),
                    log_url=context.get('task_instance').log_url,
                    )
                  )
        return alert.execute(context=context)

 ** call back 함수의 context ->get_template_context 에서 확인 가능

main_slack.py

from airflow.contrib.hooks.ssh_hook import SSHHook

# ## ⏰ slack alarm ⏰

from alert import SlackAlert

## 로컬 타임존 생성
local_tz = pendulum.timezone("Asia/Seoul")
# ## ⏰ slack alarm ⏰
alert = SlackAlert('#notification') 

default_args = {
    'owner': 'airflow_slack',
    'depends_on_past': False,
    'start_date': datetime(2021, 12, 28 ,tzinfo=local_tz), #dag생성일자
    'email_on_failure': False,
    'email_on_retry': False,
    'catchup': False,
    'wait_for_downstream': False,
    # ⏰ slack alarm ⏰
    'on_failure_callback': alert.slack_fail_alert,
}

dag main_slack.py 안에서 에러가 난다면, slack_fail_alert이 작동.

 

slack 알람

 

 

 

반응형