์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Speculative Execution
- aws
- topic
- k8s
- backfill
- Spark SQL
- redshift
- CI/CD
- Spark Caching
- Salting
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- mysql
- Docker
- ๋น ๋ฐ์ดํฐ
- Spark
- Spark Partitioning
- Kafka
- off heap memory
- Dag
- SQL
- KDT_TIL
- AQE
- Kubernetes
- disk spill
- Spark ์ค์ต
- colab
- etl
- spark executor memory
- DataFrame Hint
- Airflow
- Today
- Total
JUST DO IT!
Airflow ์๋ฌ ๋ฐ์ ์ Slack ์๋ฆผ๋ฐ์๋ณด๊ธฐ - TIL230619 ๋ณธ๋ฌธ
๐ KDT WEEK 12 DAY 1 TIL
- Airflow์์ Slack ์๋ฆผ ์ฒ๋ฆฌํ๊ธฐ
๐ฅ Slack ์๋ฆผ ์ฒ๋ฆฌํ๊ธฐ
1. Slack API ํ์ด์ง์์ ์๋ฆผ๋ฐ์ ์ฑ๋ ์ค์ ํ๊ธฐ
๋จผ์ , Slack API ํ์ด์ง์ ์ ์ํ๋ค.
https://api.slack.com/messaging/webhooks
์ค๋ฅธ์ชฝ ์๋จ์ Your Apps๋ฅผ ๋๋ฅด๊ณ , ์๋ก์ด ์ฑ์ ๋ง๋ ๋ค.
์ฒซ ๋ฒ์งธ ์ต์ (From scratch)์ ์ฌ์ฉํด์ ๋ง๋ค์ด๋ณธ๋ค.
์๋ฆผ ๋ฐ๊ณ ์ํ๋ ์ํฌ์คํ์ด์ค๋ฅผ ์ค์ ํ๋ค.
App Name์ ์ํ๋ ์ด๋ฆ์ ๋ฃ์ด์ฃผ๋ฉด ๋๋ค.
๋นจ๊ฐ ์์์์ Incoming Webhooks๋ฅผ ๋๋ฅธ๋ค.
Activate Incoming Webhooks๋ฅผ On ํด์ค์ผ ๋ฐ์ผ๋ก ์ฑ๋์ ์ค์ ํ ์ ์๋ ์ต์ ์ด ์๊ธด๋ค.
On์ ๋๋ ๋ค๋ฉด, ๋์ด๋ ์ฐฝ ์๋๋ก Add New Webhook to Workspace ๋ฅผ ๋๋ฌ ์ฑ๋์ ์ถ๊ฐํด๋ณด์.
์๊น ์ค์ ํ๋ ์ํฌ์คํ์ด์ค์์ ์ฑ๋๋ค ์ค ํ๋๋ฅผ ๊ณ ๋ฅผ ์ ์๋ค.
์๋ฆผ๋ฐ์ ์ฑ๋์ ๊ณจ๋ผ ์ค์ ํ๋ค, ํ์ฉ ๋ฒํผ์ ๋๋ฌ ์ถ๊ฐํ๋ค.
์ฑ๋ ์ค์ ์ด ์๋ฃ๋๋ฉด, ๊ทธ ์ฑ๋์ Webhook URL์ด ์๊ธด๋ค.
์ด URL์ json ํํ์ POST๋ฅผ ๋ณด๋ด๋ฉด, ๊ทธ ์ฑ๋์ ์๋ฆผ์ ๋ณด๋ผ ์ ์๊ฒ๋๋ค.
Mac ์ ์ ๋ผ๋ฉด Sample curl request to post to a channel ๋ถ๋ถ์ ๋ณต์ฌํด์ ํฐ๋ฏธ๋์์ ์คํํด๋ณผ ์ ์๋ค.
๋๋ ์ด Webhook URL์ Airflow ์น UI์ ๋ด์๋์๋ค.
Admin > Variables ์์ slack_url ์ด๋ ์ด๋ฆ์ผ๋ก ๋ด์๋์๋ค.
Webhook URL์ ์ ์ฒด๋ฅผ ๋ด์ง ์๊ณ , ๊ทธ ์ฑ๋์ ์ ๋ํฌ ID๋ก ๋ณด์ด๋ ๋ถ๋ถ๋ง ์ ์ฅํ๋ค.
2. Slack ์๋ฆผ์ ๋ณด๋ผ ํ์ด์ฌ ์ฝ๋ ์์ฑํ๊ธฐ
์ด์ ๋ slack ์๋ฆผ์ ๋ณด๋ด๊ธฐ ์ํ ํ์ด์ฌ ์ฝ๋๋ฅผ ํ๋ ์์ฑํด๋ณด์.
๋๋ dags ํด๋์์ plugins ํด๋๋ฅผ ๋ง๋ค์ด์, slack.py ์ด๋ฆ์ผ๋ก ์์ฑํ๋ค.
from airflow.models import Variable
import logging
import requests
# dag ์คํจ์ ํธ์ถ
def on_failure_callback(context):
text = str(context['task_instance'])
text += "```" + str(context.get('exception')) +"```"
send_message_to_a_slack_channel(text, ":scream:")
# slack์ ์๋ฆผ ๋ณด๋ด๊ธฐ
def send_message_to_a_slack_channel(message, emoji):
url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
headers = {
'content-type': 'application/json',
}
data = { "username": "Data GOD", "text": message, "icon_emoji": emoji }
r = requests.post(url, json=data, headers=headers)
return r
dag ์คํจ์ ์์์ ํธ์ถ๋ on_failure_callback ํจ์์, slack์ ์๋ฆผ์ ๋ณด๋ผ send_message_to_a_slack_channel ํจ์๋ฅผ ๋ง๋ค์๋ค.
on_failure_callbackํจ์๋ ํธ์ถ๋๋ฉด์ airflow๊ฐ context ํํ์ ๋ฐ์ดํฐ๋ฅผ ๋ด์์ค๋ค.
context์์๋ dag์ ์ฌ๋ฌ ์ ๋ณด๊ฐ ํฌํจ๋์ด์๋๋ฐ,
'task_instance' ๋ก dag์ ์ด๋ฆ์ ๊ฐ์ ธ์ค๊ณ , 'exception'์ ํตํด ์๋ฌ ๋ด์ฉ์ ๊ฐ์ ธ์จ๋ค.
url์ ์๊น ๋ถ์ฌ๋ฐ์ Webhook URL์ ๋ฃ์ด์ค๋ค.
ํค๋์ json์ด๋ผ๋ ์ ๋ณด๋ฅผ ๋ด๊ณ , request ๋ชจ๋์ ์ด์ฉํด post๋ก ๋ณด๋ด์ฃผ๋ฉด ๋์ด๋ค.
3. ์คํ์ํฌ dag์์ Slack ์๋ฆผ ์ฒ๋ฆฌํ๊ธฐ
๋จผ์ Slack ์๋ฆผ์ ๋์ํ์ธ์ ์ํด ์ผ๋ถ๋ฌ ์๋ฌ๋ฅผ ๋ง๋ ๋ค.
์ค์ํ ์ ์, raise๋ฅผ ํตํด ์๋ฌ๋ฅผ ๊ผญ ์ฌ๋ ค์ผ ํ๋ค๋ ์ ์ด๋ค.
๋๋ sql์ INSERT์ I๋ฅผ ํ๋ ๋๋ถ์ฌ ์ผ๋ถ๋ฌ ์๋ฌ๋ฅผ ๋๋ค.
๊ทธ๋ฆฌ๊ณ ๊ฐ์ dag ํ์ด์ฌ ์ฝ๋์ ๋ค์์ ์ถ๊ฐํ๋ค.
from plugins import slack
...
dag = DAG(
...
default_args = {
'on_failure_callback': slack.on_failure_callback, # ์ด ๋ถ๋ถ์ ์ถ๊ฐ
}
...
)
...
์๊น slack ์๋ฆผ์ ์ํด ์์ฑํ๋ slack.py๋ฅผ importํ๊ณ ,
์์ฑํ DAG ๋ถ๋ถ์ default_args์ on_failure_callback ์ ์ถ๊ฐํ๋ค.
slack ํ์ผ์์ ์คํจ์ ํธ์ถ๋ ํจ์์ธ on_failure_callback์ ์ง์ ํ๋ค.
๊ทธ๋ฆฌ๊ณ dag๋ฅผ ์คํํด๋ณด๋ฉด..
++ ์ค๋ฅ
์๋ ๋ด dag์ ํํ๋ ์๋์ ๊ฐ์๋ค.
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
'on_failure_callback': slack.on_failure_callback,
}
๊ทผ๋ฐ default_args์ retries๊ฐ ์์ผ๋ฉด ์ฌ๋์ ์๋ฆผ์ด ๊ฐ์ง์์๋คใ
GPT์๊ฒ ๋ฌผ์ด๋ณด๋ ์ฌ์๋ ํ์๋ฅผ ๋ถ์ฌ์ฃผ๋ฉด on_failure_callback์ด ํธ์ถ๋์ง ์๋๋ค๊ณ ํ๋ค..
์ฑGPT๋ฅผ ๋ฐฑํ๋ก๋ฏฟ๋๊ฑด ์๋์ง๋ง.. ์ฌ์๋ ํ์๋ฅผ ํจ๊ป ๋ฃ์ ๋ฐฉ๋ฒ์ ์์์๊น? ๐ฅฒ
'TIL' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Airflow์์์ Jinja Template - TIL230621 (0) | 2023.06.22 |
---|---|
Airflow REST API ๊ฐ๋จํ ์ฌ์ฉํด๋ณด๊ธฐ(with Python) - TIL230620 (0) | 2023.06.22 |
Docker Container ๊ด๋ฆฌ์ ์ค์ํ K8s ์์๋ณด๊ธฐ - TIL230616 (0) | 2023.06.20 |
์ Docker-compose๋ฅผ ์ฌ์ฉํ ๊น? - TIL230615 (0) | 2023.06.15 |
Docker Volume ์์๋ณด๊ณ ๊ฐ๋จํ ์ค์ตํด๋ณด๊ธฐ - TIL230614 (0) | 2023.06.15 |