์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- AQE
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- off heap memory
- Spark
- DataFrame Hint
- KDT_TIL
- ๋น ๋ฐ์ดํฐ
- Salting
- Spark Caching
- Kubernetes
- SQL
- Dag
- Speculative Execution
- colab
- Spark ์ค์ต
- spark executor memory
- topic
- backfill
- Kafka
- mysql
- Docker
- aws
- disk spill
- redshift
- Spark SQL
- Spark Partitioning
- Airflow
- etl
- CI/CD
- k8s
- Today
- Total
JUST DO IT!
Airflow ์๋ฌ ๋ฐ์ ์ Slack ์๋ฆผ๋ฐ์๋ณด๊ธฐ - TIL230619 ๋ณธ๋ฌธ
๐ KDT WEEK 12 DAY 1 TIL
- Airflow์์ Slack ์๋ฆผ ์ฒ๋ฆฌํ๊ธฐ
๐ฅ Slack ์๋ฆผ ์ฒ๋ฆฌํ๊ธฐ
1. Slack API ํ์ด์ง์์ ์๋ฆผ๋ฐ์ ์ฑ๋ ์ค์ ํ๊ธฐ
๋จผ์ , Slack API ํ์ด์ง์ ์ ์ํ๋ค.
https://api.slack.com/messaging/webhooks
Sending messages using Incoming Webhooks
Creating an Incoming Webhook gives you a unique URL to which you send a JSON payload with the message text and some options.
api.slack.com
์ค๋ฅธ์ชฝ ์๋จ์ Your Apps๋ฅผ ๋๋ฅด๊ณ , ์๋ก์ด ์ฑ์ ๋ง๋ ๋ค.
์ฒซ ๋ฒ์งธ ์ต์ (From scratch)์ ์ฌ์ฉํด์ ๋ง๋ค์ด๋ณธ๋ค.
์๋ฆผ ๋ฐ๊ณ ์ํ๋ ์ํฌ์คํ์ด์ค๋ฅผ ์ค์ ํ๋ค.
App Name์ ์ํ๋ ์ด๋ฆ์ ๋ฃ์ด์ฃผ๋ฉด ๋๋ค.
๋นจ๊ฐ ์์์์ Incoming Webhooks๋ฅผ ๋๋ฅธ๋ค.
Activate Incoming Webhooks๋ฅผ On ํด์ค์ผ ๋ฐ์ผ๋ก ์ฑ๋์ ์ค์ ํ ์ ์๋ ์ต์ ์ด ์๊ธด๋ค.
On์ ๋๋ ๋ค๋ฉด, ๋์ด๋ ์ฐฝ ์๋๋ก Add New Webhook to Workspace ๋ฅผ ๋๋ฌ ์ฑ๋์ ์ถ๊ฐํด๋ณด์.
์๊น ์ค์ ํ๋ ์ํฌ์คํ์ด์ค์์ ์ฑ๋๋ค ์ค ํ๋๋ฅผ ๊ณ ๋ฅผ ์ ์๋ค.
์๋ฆผ๋ฐ์ ์ฑ๋์ ๊ณจ๋ผ ์ค์ ํ๋ค, ํ์ฉ ๋ฒํผ์ ๋๋ฌ ์ถ๊ฐํ๋ค.
์ฑ๋ ์ค์ ์ด ์๋ฃ๋๋ฉด, ๊ทธ ์ฑ๋์ Webhook URL์ด ์๊ธด๋ค.
์ด URL์ json ํํ์ POST๋ฅผ ๋ณด๋ด๋ฉด, ๊ทธ ์ฑ๋์ ์๋ฆผ์ ๋ณด๋ผ ์ ์๊ฒ๋๋ค.
Mac ์ ์ ๋ผ๋ฉด Sample curl request to post to a channel ๋ถ๋ถ์ ๋ณต์ฌํด์ ํฐ๋ฏธ๋์์ ์คํํด๋ณผ ์ ์๋ค.
๋๋ ์ด Webhook URL์ Airflow ์น UI์ ๋ด์๋์๋ค.
Admin > Variables ์์ slack_url ์ด๋ ์ด๋ฆ์ผ๋ก ๋ด์๋์๋ค.
Webhook URL์ ์ ์ฒด๋ฅผ ๋ด์ง ์๊ณ , ๊ทธ ์ฑ๋์ ์ ๋ํฌ ID๋ก ๋ณด์ด๋ ๋ถ๋ถ๋ง ์ ์ฅํ๋ค.
2. Slack ์๋ฆผ์ ๋ณด๋ผ ํ์ด์ฌ ์ฝ๋ ์์ฑํ๊ธฐ
์ด์ ๋ slack ์๋ฆผ์ ๋ณด๋ด๊ธฐ ์ํ ํ์ด์ฌ ์ฝ๋๋ฅผ ํ๋ ์์ฑํด๋ณด์.
๋๋ dags ํด๋์์ plugins ํด๋๋ฅผ ๋ง๋ค์ด์, slack.py ์ด๋ฆ์ผ๋ก ์์ฑํ๋ค.
from airflow.models import Variable
import logging
import requests
# dag ์คํจ์ ํธ์ถ
def on_failure_callback(context):
text = str(context['task_instance'])
text += "```" + str(context.get('exception')) +"```"
send_message_to_a_slack_channel(text, ":scream:")
# slack์ ์๋ฆผ ๋ณด๋ด๊ธฐ
def send_message_to_a_slack_channel(message, emoji):
url = "https://hooks.slack.com/services/"+Variable.get("slack_url")
headers = {
'content-type': 'application/json',
}
data = { "username": "Data GOD", "text": message, "icon_emoji": emoji }
r = requests.post(url, json=data, headers=headers)
return r
dag ์คํจ์ ์์์ ํธ์ถ๋ on_failure_callback ํจ์์, slack์ ์๋ฆผ์ ๋ณด๋ผ send_message_to_a_slack_channel ํจ์๋ฅผ ๋ง๋ค์๋ค.
on_failure_callbackํจ์๋ ํธ์ถ๋๋ฉด์ airflow๊ฐ context ํํ์ ๋ฐ์ดํฐ๋ฅผ ๋ด์์ค๋ค.
context์์๋ dag์ ์ฌ๋ฌ ์ ๋ณด๊ฐ ํฌํจ๋์ด์๋๋ฐ,
'task_instance' ๋ก dag์ ์ด๋ฆ์ ๊ฐ์ ธ์ค๊ณ , 'exception'์ ํตํด ์๋ฌ ๋ด์ฉ์ ๊ฐ์ ธ์จ๋ค.
url์ ์๊น ๋ถ์ฌ๋ฐ์ Webhook URL์ ๋ฃ์ด์ค๋ค.
ํค๋์ json์ด๋ผ๋ ์ ๋ณด๋ฅผ ๋ด๊ณ , request ๋ชจ๋์ ์ด์ฉํด post๋ก ๋ณด๋ด์ฃผ๋ฉด ๋์ด๋ค.
3. ์คํ์ํฌ dag์์ Slack ์๋ฆผ ์ฒ๋ฆฌํ๊ธฐ
๋จผ์ Slack ์๋ฆผ์ ๋์ํ์ธ์ ์ํด ์ผ๋ถ๋ฌ ์๋ฌ๋ฅผ ๋ง๋ ๋ค.
์ค์ํ ์ ์, raise๋ฅผ ํตํด ์๋ฌ๋ฅผ ๊ผญ ์ฌ๋ ค์ผ ํ๋ค๋ ์ ์ด๋ค.
๋๋ sql์ INSERT์ I๋ฅผ ํ๋ ๋๋ถ์ฌ ์ผ๋ถ๋ฌ ์๋ฌ๋ฅผ ๋๋ค.
๊ทธ๋ฆฌ๊ณ ๊ฐ์ dag ํ์ด์ฌ ์ฝ๋์ ๋ค์์ ์ถ๊ฐํ๋ค.
from plugins import slack
...
dag = DAG(
...
default_args = {
'on_failure_callback': slack.on_failure_callback, # ์ด ๋ถ๋ถ์ ์ถ๊ฐ
}
...
)
...
์๊น slack ์๋ฆผ์ ์ํด ์์ฑํ๋ slack.py๋ฅผ importํ๊ณ ,
์์ฑํ DAG ๋ถ๋ถ์ default_args์ on_failure_callback ์ ์ถ๊ฐํ๋ค.
slack ํ์ผ์์ ์คํจ์ ํธ์ถ๋ ํจ์์ธ on_failure_callback์ ์ง์ ํ๋ค.
๊ทธ๋ฆฌ๊ณ dag๋ฅผ ์คํํด๋ณด๋ฉด..
++ ์ค๋ฅ
์๋ ๋ด dag์ ํํ๋ ์๋์ ๊ฐ์๋ค.
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
'on_failure_callback': slack.on_failure_callback,
}
๊ทผ๋ฐ default_args์ retries๊ฐ ์์ผ๋ฉด ์ฌ๋์ ์๋ฆผ์ด ๊ฐ์ง์์๋คใ
GPT์๊ฒ ๋ฌผ์ด๋ณด๋ ์ฌ์๋ ํ์๋ฅผ ๋ถ์ฌ์ฃผ๋ฉด on_failure_callback์ด ํธ์ถ๋์ง ์๋๋ค๊ณ ํ๋ค..
์ฑGPT๋ฅผ ๋ฐฑํ๋ก๋ฏฟ๋๊ฑด ์๋์ง๋ง.. ์ฌ์๋ ํ์๋ฅผ ํจ๊ป ๋ฃ์ ๋ฐฉ๋ฒ์ ์์์๊น? ๐ฅฒ
'TIL' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Airflow์์์ Jinja Template - TIL230621 (0) | 2023.06.22 |
---|---|
Airflow REST API ๊ฐ๋จํ ์ฌ์ฉํด๋ณด๊ธฐ(with Python) - TIL230620 (0) | 2023.06.22 |
Docker Container ๊ด๋ฆฌ์ ์ค์ํ K8s ์์๋ณด๊ธฐ - TIL230616 (0) | 2023.06.20 |
์ Docker-compose๋ฅผ ์ฌ์ฉํ ๊น? - TIL230615 (0) | 2023.06.15 |
Docker Volume ์์๋ณด๊ณ ๊ฐ๋จํ ์ค์ตํด๋ณด๊ธฐ - TIL230614 (0) | 2023.06.15 |