JUST DO IT!

Getting Slack notifications when an Airflow error occurs - TIL230619


sunhokimDev 2023. 6. 21. 17:36

📚 KDT WEEK 12 DAY 1 TIL

  • Handling Slack notifications in Airflow

🟥 Handling Slack notifications

 

1. Setting up the notification channel on the Slack API page

 

First, go to the Slack API page.

https://api.slack.com/messaging/webhooks

 


 

Click Your Apps in the upper-right corner and create a new app.

 

[Screenshot: click 1 > 2 in order.]

 

We'll create the app with the first option (From scratch).

 

[Screenshot: click From scratch.]

 

Select the workspace where you want to receive notifications.

For App Name, enter any name you like.

 

[Screenshot: I created a test workspace and selected it.]

 

Click Incoming Webhooks inside the red box.

 

[Screenshot: this is the option for sending notifications to Slack.]

 

Turn Activate Incoming Webhooks On; only then do the options for choosing a channel appear below it.

Once it's On, click Add New Webhook to Workspace at the bottom of the expanded panel to add a channel.

 

[Screenshot: click 1 > 2 in order.]

 

You can pick one of the channels in the workspace you selected earlier.

Choose the channel that should receive notifications, then click the Allow button to add it.

 

์•Œ๋ฆผ๋ฐ›์„ ์ฑ„๋„์„ ์ƒˆ๋กœ ์ƒ์„ฑํ•ด์„œ ์„ค์ •ํ–ˆ๋‹ค.

 

์ฑ„๋„ ์„ค์ •์ด ์™„๋ฃŒ๋˜๋ฉด, ๊ทธ ์ฑ„๋„์˜ Webhook URL์ด ์ƒ๊ธด๋‹ค.

์ด URL์— json ํ˜•ํƒœ์˜ POST๋ฅผ ๋ณด๋‚ด๋ฉด, ๊ทธ ์ฑ„๋„์— ์•Œ๋ฆผ์„ ๋ณด๋‚ผ ์ˆ˜ ์žˆ๊ฒŒ๋œ๋‹ค.

Mac ์œ ์ €๋ผ๋ฉด Sample curl request to post to a channel ๋ถ€๋ถ„์„ ๋ณต์‚ฌํ•ด์„œ ํ„ฐ๋ฏธ๋„์—์„œ ์‹คํ—˜ํ•ด๋ณผ ์ˆ˜ ์žˆ๋‹ค.
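The sample curl request is a single JSON POST. Here is the same request as a minimal sketch that uses only Python's standard library, with urllib in place of the requests module used later in this post. The function names are hypothetical, and the URL is a placeholder rather than a real webhook.

```python
import json
import urllib.request

def build_webhook_request(webhook_url, text):
    """Build (but do not send) a JSON POST like Slack's sample curl request."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def send_webhook(webhook_url, text, timeout=10):
    """Send the message and return (HTTP status, response body).

    Slack's incoming webhooks answer HTTP 200 with the body "ok" on success.
    """
    req = build_webhook_request(webhook_url, text)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status, resp.read().decode("utf-8")

if __name__ == "__main__":
    # Placeholder URL: substitute the Webhook URL copied from the Slack app page
    req = build_webhook_request("https://hooks.slack.com/services/XXX/YYY/ZZZ", "Hello, World!")
    print(req.get_method(), req.full_url)
```

Only `send_webhook` touches the network. Calling it with your real Webhook URL should post "Hello, World!" to the channel, just like the curl sample.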

 

[Screenshot: copy this Webhook URL.]

 

๋‚˜๋Š” ์ด Webhook URL์„ Airflow ์›น UI์— ๋‹ด์•„๋‘์—ˆ๋‹ค.

Admin > Variables ์—์„œ slack_url ์ด๋ž€ ์ด๋ฆ„์œผ๋กœ ๋‹ด์•„๋‘์—ˆ๋‹ค.

Webhook URL์˜ ์ „์ฒด๋ฅผ ๋‹ด์ง€ ์•Š๊ณ , ๊ทธ ์ฑ„๋„์˜ ์œ ๋‹ˆํฌ ID๋กœ ๋ณด์ด๋Š” ๋ถ€๋ถ„๋งŒ ์ €์žฅํ–ˆ๋‹ค.

 

 

2. Writing the Python code that sends Slack notifications

 

Now let's write some Python code for sending Slack notifications.

I created a plugins folder inside the dags folder and wrote the code there as slack.py.

 

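from airflow.models import Variable
import logging
import requests

# Called by Airflow when a task fails (registered via default_args in the DAG)
def on_failure_callback(context):
    # String form of the failed TaskInstance (includes the DAG and task IDs)
    text = str(context['task_instance'])
    # Append the exception message wrapped in a Slack code block
    text += "```" + str(context.get('exception')) + "```"
    send_message_to_a_slack_channel(text, ":scream:")

# Send a message to the Slack channel behind the incoming webhook
def send_message_to_a_slack_channel(message, emoji):
    # The slack_url Variable holds only the part of the Webhook URL after /services/
    url = "https://hooks.slack.com/services/" + Variable.get("slack_url")
    headers = {
        'content-type': 'application/json',
    }
    data = {"username": "Data GOD", "text": message, "icon_emoji": emoji}
    # timeout keeps a slow Slack response from hanging the callback
    r = requests.post(url, json=data, headers=headers, timeout=10)
    if r.status_code != 200:
        logging.error("Slack notification failed: %s %s", r.status_code, r.text)
    return r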

 

This defines two functions: on_failure_callback, which is called when a task in the DAG fails, and send_message_to_a_slack_channel, which sends the notification to Slack.

 

When Airflow calls on_failure_callback, it passes a context dictionary as the argument.

The context holds various information about the DAG run.

Here, 'task_instance' supplies the failed task instance, whose string form includes the DAG and task IDs, and 'exception' supplies the error.
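To make the message a bit more readable, you could pull individual fields out of the context instead of printing the whole TaskInstance. Here is a sketch. format_failure_message is a hypothetical helper; it assumes only the 'task_instance' and 'exception' keys used above, plus the dag_id and task_id attributes of a TaskInstance. A SimpleNamespace stands in for the real TaskInstance so the example runs without Airflow.

```python
from types import SimpleNamespace

def format_failure_message(context):
    """Build a Slack message from the dict Airflow passes to on_failure_callback."""
    ti = context["task_instance"]
    # getattr with defaults keeps the sketch working with stand-in objects
    dag_id = getattr(ti, "dag_id", "?")
    task_id = getattr(ti, "task_id", "?")
    exception = context.get("exception")
    lines = [f"Task failed: {dag_id}.{task_id}"]
    if exception is not None:
        # Triple backticks render the error as a code block in Slack
        lines.append(f"```{exception}```")
    return "\n".join(lines)

if __name__ == "__main__":
    fake_ti = SimpleNamespace(dag_id="my_dag", task_id="load")
    print(format_failure_message({"task_instance": fake_ti, "exception": ValueError("boom")}))
```

Inside on_failure_callback, the result could replace the `text` built from `str(context['task_instance'])`.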

 

url is built from the fixed prefix plus the part of the Webhook URL saved earlier.

Set the JSON content type in the header, send the data as a POST with the requests module, and that's it.

 

 

3. Handling Slack notifications in the DAG

 

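First, deliberately cause an error to check that the Slack notification works.

The important point is that the error must actually be raised. If you catch the exception, re-raise it with raise; if it is swallowed, the task ends successfully and on_failure_callback is never called.

I added an extra I to the SQL INSERT to trigger an error on purpose.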

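Here is a sketch of the re-raise point. It assumes the task runs its SQL through a DB-API style cursor inside an explicit transaction. load_rows and the SQL statements are hypothetical, not the author's DAG code. The error is logged, the transaction is rolled back, and then the error is re-raised so Airflow marks the task as failed.

```python
import logging

def load_rows(cur, insert_sql):
    """Run insert_sql in a transaction; roll back and re-raise on any error."""
    try:
        cur.execute("BEGIN;")
        cur.execute(insert_sql)
        cur.execute("COMMIT;")
    except Exception:
        cur.execute("ROLLBACK;")
        logging.exception("Insert failed: %s", insert_sql)
        # Without this raise the task would succeed and no Slack alert would be sent
        raise
```

With the deliberately broken `IINSERT ...` statement, the second execute fails, the rollback runs, and the exception reaches Airflow.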

 

Then add the following to the same DAG Python file.

 

from plugins import slack

...

dag = DAG(
    ...

    default_args = {
        'on_failure_callback': slack.on_failure_callback,  # add this line
    }

    ...
)

...

 

This imports slack.py, written earlier for Slack notifications,

and adds on_failure_callback to the default_args of the DAG.

It points to on_failure_callback, the function in slack.py that should run on failure. Because it is set in default_args, every task in the DAG gets this callback.

 

Then, when I ran the DAG...

 

After I ran it from the CLI, the error occurred,

 

and the notification arrived in Slack as expected!

 

 

++ Issue

 

Originally, my DAG's default_args looked like this.

 

from datetime import timedelta
from plugins import slack

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=3),
    'on_failure_callback': slack.on_failure_callback,
}

 

๊ทผ๋ฐ default_args์— retries๊ฐ€ ์žˆ์œผ๋ฉด ์Šฌ๋ž™์— ์•Œ๋ฆผ์ด ๊ฐ€์ง€์•Š์•˜๋‹คใ… 

GPT์—๊ฒŒ ๋ฌผ์–ด๋ณด๋‹ˆ ์žฌ์‹œ๋„ ํšŸ์ˆ˜๋ฅผ ๋ถ™์—ฌ์ฃผ๋ฉด on_failure_callback์ด ํ˜ธ์ถœ๋˜์ง€ ์•Š๋Š”๋‹ค๊ณ  ํ•œ๋‹ค..

 

 

I don't trust ChatGPT 100%, but... was there really no way to keep a retry count as well? 🥲
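One way to keep retries and still get alerts is to register both callbacks. Airflow fires on_retry_callback on each attempt that will be retried and on_failure_callback once the task fails for good. The sketch below is a version of that idea. make_slack_callbacks and build_default_args are hypothetical helpers, and `send(message, emoji)` stands for a sender such as send_message_to_a_slack_channel from slack.py. It only builds the dict and functions, so it runs without Airflow.

```python
from datetime import timedelta

def make_slack_callbacks(send):
    """Return (on_retry, on_failure) callbacks that report through send(message, emoji)."""
    def describe(context):
        ti = context["task_instance"]
        return f"{ti.dag_id}.{ti.task_id}: {context.get('exception')}"

    def on_retry(context):
        # Runs after a failed attempt that Airflow will retry
        send("Retrying " + describe(context), ":warning:")

    def on_failure(context):
        # Runs once the task has failed on its final attempt
        send("Failed " + describe(context), ":scream:")

    return on_retry, on_failure

def build_default_args(send):
    """default_args with a retry plus Slack alerts for both retries and final failure."""
    on_retry, on_failure = make_slack_callbacks(send)
    return {
        "retries": 1,
        "retry_delay": timedelta(minutes=3),
        "on_retry_callback": on_retry,
        "on_failure_callback": on_failure,
    }
```

In the DAG file this would presumably become `default_args = build_default_args(slack.send_message_to_a_slack_channel)`. If the alert never shows up, it is worth waiting past retry_delay before concluding the callback was skipped.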