์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Spark ์ค์ต
- SQL
- Salting
- backfill
- Spark Partitioning
- AQE
- KDT_TIL
- ๋น ๋ฐ์ดํฐ
- mysql
- Spark Caching
- Kafka
- Docker
- Kubernetes
- Airflow
- Spark SQL
- disk spill
- colab
- etl
- redshift
- DataFrame Hint
- Speculative Execution
- aws
- CI/CD
- Spark
- Dag
- k8s
- spark executor memory
- topic
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- off heap memory
- Today
- Total
JUST DO IT!
Airflow REST API ๊ฐ๋จํ ์ฌ์ฉํด๋ณด๊ธฐ(with Python) - TIL230620 ๋ณธ๋ฌธ
Airflow REST API ๊ฐ๋จํ ์ฌ์ฉํด๋ณด๊ธฐ(with Python) - TIL230620
sunhokimDev 2023. 6. 22. 01:42๐ KDT WEEK 12 DAY 2 TIL
- Airflow API ์ฌ์ฉํ๊ธฐ
์ฌ์ฉํ๊ธฐ ์ ์
๋จผ์ Airflow API๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด airflow.cfg ๋ณ๊ฒฝ์ด ์กฐ๊ธ ํ์ํ๋ค.
airflow.cfg์ api ์น์ ์์ auth_backend์ ๊ฐ์ ์๋์ ๊ฐ์ด ๋ณ๊ฒฝํด์ผํ๋ค.
[api]
auth_backend = airflow.api.auth.backend.basic_auth
basic_auth๋ airflow api๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด ์ฌ์ฉ์์ ์์ด๋์ ๋น๋ฐ๋ฒํธ๋ก ์ธ์ฆํ๋ ๋ฐฉ์์ด๋ค.
์ด ์ค์ ์ด ๋์ด์๋์ง ํ์ธํ๊ณ ์ถ๋ค๋ฉด ํฐ๋ฏธ๋์ ๋ค์์ ๋ช ๋ น์ด๋ฅผ ์ ๋ ฅํด์ ํ์ธํ ์ ์๋ค. (docker)
docker exec -it airflow-scheduler airflow config get-value api auth_backend
airflow-scheduler์๋ docker ps ๋ช ๋ น์ด๋ฅผ ์ฌ์ฉํ์ ๋ ๋ํ๋ airflow scheduler ์ด๋ฏธ์ง ์ด๋ฆ์ ๋ฃ์ผ๋ฉด ๋๋ค.
Airflow REST API
๋ค์์ ๋งํฌ์์ ์์ธํ ์ ๋ณด๋ฅผ ์ป์ ์ ์๋ค.
Airflow API Documents
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
Airflow API์์ ํต์ ํ ๋ ์ฌ์ฉํ๋ ๊ธฐ๋ณธ ํฌ๋งท์ json์ด๋ค.
ํ์ด์ฌ ์ฝ๋๋ฅผ ํตํด ๋ช ๊ฐ์ง ์์ฒญ์ ๋ ๋ ค๋ณด์.
1. ๋ชจ๋ dags ๋ฆฌ์คํธํ๊ธฐ
# List dags
import requests
from requests.auth import HTTPBasicAuth
import pprint
url = "http://localhost:8080/api/v1/dags"
dags = requests.get(url, auth=HTTPBasicAuth("airflow","airflow"))
pprint.pprint(dags.json())
dags ํด๋์์ dag๋ค์ด ๋ชจ๋ ๋ํ๋๊ฒ ๋๋ค.
is_active๋ dags ํด๋์ ์กด์ฌ ์ ๋ฌด์ ๋ฐ๋ผ True, False๋ก ๋ํ๋๊ณ ,
is_paused๋ dag ํ์ฑํ ์ฌ๋ถ์ ๋ฐ๋ผ True, False๋ก ๋ํ๋๋ค.
๋ง์ฝ is_paused = True์ธ dag๋ง ๋ํ๋ด๊ณ ์ถ๋ค๋ฉด, ๋ค์๊ณผ ๊ฐ์ ์กฐ๊ฑด์ ๋ฃ์ผ๋ฉด ๊ฐ๋ฅํ ๊ฒ์ด๋ค.
for d in dags.json()["dags"]:
if not d["is_paused"]:
print(d["dag_id"])
2. ๋ชจ๋ Variables ๋ฆฌ์คํธํ๊ธฐ
# List variables
url = "http://localhost:8080/api/v1/variables"
dags = requests.get(url, auth=HTTPBasicAuth("airflow","airflow"))
pprint.pprint(dags.json())
dags๋ฅผ ๋ฆฌ์คํธํ๋ ๊ณผ์ ๊ณผ ๋น์ทํ๋ค.
์ฐธ๊ณ ๋ก docker-compose์ ์์ฑ๋ ํ๊ฒฝ๋ณ์๋ ๋ฆฌํด๋์ง ์๋๋ค.
3. ๋ชจ๋ config ๋ฆฌ์คํธํ๊ธฐ
config์ ๊ฒฝ์ฐ airflow.cfg์์ ์ค์ ์ด ์กฐ๊ธ ํ์ํ๋ค.
airflow.cfg์ webserver > expose_config = True ๋ก ์ค์ ํด์ผํ๋ค.
docker-compose.yml์ environment์์ ์ค์ ํด์ค ์๋ ์๋ค.
environment:
AIRFLOW__WEBSERVER__EXPOSE_CONFIG : 'true'
๋๋ ํ์๋ฅผ ์ ํํ๊ณ , docker compose down > docker compose up ๋ช ๋ น์ด๋ก ์ฌ์คํํ๋ค.
# List config
url = "http://localhost:8080/api/v1/config"
dags = requests.get(url, auth=HTTPBasicAuth("airflow","airflow"))
pprint.pprint(dags.text)
4. ํน์ DAG RUN ํ๊ธฐ
# Run HelloWorld Dag
import json
url = "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
auth = HTTPBasicAuth("airflow", "airflow")
headers = {"Content-Type": "application/json"}
data = {"execution_date": "2023-05-24T00:00:00Z"}
response = requests.post(url, auth=auth, headers=headers, data=json.dumps(data))
response.json()
run์ ๊ฒฝ์ฐ execution_date ์ ๋ฌ์ด ํ์ํ๋ฏ๋ก post๋ก ์์ฒญํด์ผ ํ๋ค.
execution_date์๋ ์์ ๊ฐ์ ๋ ์ง ํ์์ด์ด์ผํ๋ค.
5. Airflow ์ํ(Health) ํ์ธํ๊ธฐ
# Check Airlfow Health
url = "http://localhost:8080/api/v1/health"
dags = requests.get(url, auth=HTTPBasicAuth("airflow","airflow"))
pprint.pprint(dags.text)
'TIL' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Airflow Trigger์ Sensor๋ก Dag Dependencies ์ค์ ํ๊ธฐ - TIL230621 (0) | 2023.06.22 |
---|---|
Airflow์์์ Jinja Template - TIL230621 (0) | 2023.06.22 |
Airflow ์๋ฌ ๋ฐ์ ์ Slack ์๋ฆผ๋ฐ์๋ณด๊ธฐ - TIL230619 (0) | 2023.06.21 |
Docker Container ๊ด๋ฆฌ์ ์ค์ํ K8s ์์๋ณด๊ธฐ - TIL230616 (0) | 2023.06.20 |
์ Docker-compose๋ฅผ ์ฌ์ฉํ ๊น? - TIL230615 (0) | 2023.06.15 |