Airflow REST API ๊ฐ๋จํ ์ฌ์ฉํด๋ณด๊ธฐ(with Python) - TIL230620
๐ KDT WEEK 12 DAY 2 TIL
- Airflow API ์ฌ์ฉํ๊ธฐ
์ฌ์ฉํ๊ธฐ ์ ์
๋จผ์ Airflow API๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด airflow.cfg ๋ณ๊ฒฝ์ด ์กฐ๊ธ ํ์ํ๋ค.
airflow.cfg์ api ์น์ ์์ auth_backend์ ๊ฐ์ ์๋์ ๊ฐ์ด ๋ณ๊ฒฝํด์ผํ๋ค.
[api]
auth_backend = airflow.api.auth.backend.basic_auth
basic_auth๋ airflow api๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด ์ฌ์ฉ์์ ์์ด๋์ ๋น๋ฐ๋ฒํธ๋ก ์ธ์ฆํ๋ ๋ฐฉ์์ด๋ค.
์ด ์ค์ ์ด ๋์ด์๋์ง ํ์ธํ๊ณ ์ถ๋ค๋ฉด ํฐ๋ฏธ๋์ ๋ค์์ ๋ช ๋ น์ด๋ฅผ ์ ๋ ฅํด์ ํ์ธํ ์ ์๋ค. (docker)
docker exec -it airflow-scheduler airflow config get-value api auth_backend
airflow-scheduler์๋ docker ps ๋ช ๋ น์ด๋ฅผ ์ฌ์ฉํ์ ๋ ๋ํ๋ airflow scheduler ์ด๋ฏธ์ง ์ด๋ฆ์ ๋ฃ์ผ๋ฉด ๋๋ค.
Airflow REST API
๋ค์์ ๋งํฌ์์ ์์ธํ ์ ๋ณด๋ฅผ ์ป์ ์ ์๋ค.
Airflow API Documents
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
Airflow REST API
airflow.apache.org
Airflow API์์ ํต์ ํ ๋ ์ฌ์ฉํ๋ ๊ธฐ๋ณธ ํฌ๋งท์ json์ด๋ค.
ํ์ด์ฌ ์ฝ๋๋ฅผ ํตํด ๋ช ๊ฐ์ง ์์ฒญ์ ๋ ๋ ค๋ณด์.
1. ๋ชจ๋ dags ๋ฆฌ์คํธํ๊ธฐ
# List dags
import requests
from requests.auth import HTTPBasicAuth
import pprint
url = "http://localhost:8080/api/v1/dags"
dags = requests.get(url, auth=HTTPBasicAuth("airflow","airflow"))
pprint.pprint(dags.json())
dags ํด๋์์ dag๋ค์ด ๋ชจ๋ ๋ํ๋๊ฒ ๋๋ค.
is_active๋ dags ํด๋์ ์กด์ฌ ์ ๋ฌด์ ๋ฐ๋ผ True, False๋ก ๋ํ๋๊ณ ,
is_paused๋ dag ํ์ฑํ ์ฌ๋ถ์ ๋ฐ๋ผ True, False๋ก ๋ํ๋๋ค.
๋ง์ฝ is_paused = True์ธ dag๋ง ๋ํ๋ด๊ณ ์ถ๋ค๋ฉด, ๋ค์๊ณผ ๊ฐ์ ์กฐ๊ฑด์ ๋ฃ์ผ๋ฉด ๊ฐ๋ฅํ ๊ฒ์ด๋ค.
for d in dags.json()["dags"]:
if not d["is_paused"]:
print(d["dag_id"])
2. ๋ชจ๋ Variables ๋ฆฌ์คํธํ๊ธฐ
# List variables
url = "http://localhost:8080/api/v1/variables"
dags = requests.get(url, auth=HTTPBasicAuth("airflow","airflow"))
pprint.pprint(dags.json())
dags๋ฅผ ๋ฆฌ์คํธํ๋ ๊ณผ์ ๊ณผ ๋น์ทํ๋ค.
์ฐธ๊ณ ๋ก docker-compose์ ์์ฑ๋ ํ๊ฒฝ๋ณ์๋ ๋ฆฌํด๋์ง ์๋๋ค.
3. ๋ชจ๋ config ๋ฆฌ์คํธํ๊ธฐ
config์ ๊ฒฝ์ฐ airflow.cfg์์ ์ค์ ์ด ์กฐ๊ธ ํ์ํ๋ค.
airflow.cfg์ webserver > expose_config = True ๋ก ์ค์ ํด์ผํ๋ค.
docker-compose.yml์ environment์์ ์ค์ ํด์ค ์๋ ์๋ค.
environment:
AIRFLOW__WEBSERVER__EXPOSE_CONFIG : 'true'
๋๋ ํ์๋ฅผ ์ ํํ๊ณ , docker compose down > docker compose up ๋ช ๋ น์ด๋ก ์ฌ์คํํ๋ค.
# List config
url = "http://localhost:8080/api/v1/config"
dags = requests.get(url, auth=HTTPBasicAuth("airflow","airflow"))
pprint.pprint(dags.text)
4. ํน์ DAG RUN ํ๊ธฐ
# Run HelloWorld Dag
import json
url = "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
auth = HTTPBasicAuth("airflow", "airflow")
headers = {"Content-Type": "application/json"}
data = {"execution_date": "2023-05-24T00:00:00Z"}
response = requests.post(url, auth=auth, headers=headers, data=json.dumps(data))
response.json()
run์ ๊ฒฝ์ฐ execution_date ์ ๋ฌ์ด ํ์ํ๋ฏ๋ก post๋ก ์์ฒญํด์ผ ํ๋ค.
execution_date์๋ ์์ ๊ฐ์ ๋ ์ง ํ์์ด์ด์ผํ๋ค.
5. Airflow ์ํ(Health) ํ์ธํ๊ธฐ
# Check Airlfow Health
url = "http://localhost:8080/api/v1/health"
dags = requests.get(url, auth=HTTPBasicAuth("airflow","airflow"))
pprint.pprint(dags.text)