Terriermon - Digimon

23. dag 작업 결과 slack 알림

2024. 12. 5. 10:41클라우드/GCP

gcp에서 dag 작업 결과를 slack으로 받고 싶다면?

 

필요한 dag 내용은 다음과 같다.

from airflow import DAG
import requests  # Slack 알림에 사용

# Slack 알림 함수
def send_slack_alert(context, status):
    webhook_url = "https://hooks.slack.com/xxxx
    task_instance = context.get('task_instance')
    dag_id = context.get('dag').dag_id
    task_id = context.get('task').task_id
    execution_date = context.get('execution_date')
    log_url = context.get('task_instance').log_url

    if status == "success":
        message = (
            f":white_check_mark: *Task Succeeded in Airflow!*\n"
            f"*DAG*: `{dag_id}`\n"
            f"*Task*: `{task_id}`\n"
            f"*Execution Time*: `{execution_date}`"
        )
    elif status == "failure":
        message = (
            f":red_circle: *Task Failed in Airflow!*\n"
            f"*DAG*: `{dag_id}`\n"
            f"*Task*: `{task_id}`\n"
            f"*Execution Time*: `{execution_date}`\n"
            f"*Log URL*: <{log_url}|Click here for details>"
        )
    
    response = requests.post(
        webhook_url,
        json={"text": message},
        headers={"Content-Type": "application/json"}
    )
    if response.status_code != 200:
        raise ValueError(
            f"Request to Slack returned an error {response.status_code}, response: {response.text}"
        )

# 성공 및 실패 시 Slack 알림을 보낼 콜백 함수 정의
def on_failure_callback(context):
    send_slack_alert(context, status="failure")

def on_success_callback(context):
    send_slack_alert(context, status="success")

# DAG 기본 설정
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": on_failure_callback,  # 실패 시 알림
    "on_success_callback": on_success_callback,  # 성공 시 알림
}

 

 

 Slack 알림 함수 중에
    webhook_url = "https://hooks.slack.com/xxxx"  항목이 있는데,

webhook 을 생성해야 한다.

 

https://api.slack.com/ 로 접속

 

로그인 후 your apps 클릭

app name 설정 및 workspace 선택

 

webhooks 사용 on 으로 변경

 

add new webhook to worksapce 클리

 

url이 생성된다. copy

 

slack 에 이렇게 알림이 온다.

클릭하면 이렇게 나옴

앱으로 이동하기 클릭

 

 

작업 성공 시 알림

 

실패시 알림

반응형