Trying Out the Airflow REST API (with Python) - TIL230620

sunhokimDev 2023. 6. 22. 01:42

📚 KDT WEEK 12 DAY 2 TIL

  • Using the Airflow API

 


Before you start

First, using the Airflow API requires a small change to airflow.cfg.

In the [api] section of airflow.cfg, set auth_backend as shown below.

 

[api]
auth_backend = airflow.api.auth.backend.basic_auth

 

basic_auth is the backend that authenticates Airflow API requests with a user's ID and password. (In Airflow 2.3 and later the same option is named auth_backends; auth_backend is the older, deprecated name.)
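To make "authenticating with an ID and password" concrete, here is a minimal sketch of what HTTP Basic auth puts on the wire: the string user:password, base64-encoded, in the Authorization header. The helper name basic_auth_header is my own, and the URL and the airflow/airflow account are simply the ones used in this post. The request is only built, not sent.

```python
import base64
import urllib.request


def basic_auth_header(username: str, password: str) -> str:
    """Return the Authorization header value for HTTP Basic auth."""
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return f"Basic {token}"


# Build (but do not send) a request against the local webserver assumed in this post.
request = urllib.request.Request(
    "http://localhost:8080/api/v1/dags",
    headers={"Authorization": basic_auth_header("airflow", "airflow")},
)
print(request.get_header("Authorization"))
```

requests.auth.HTTPBasicAuth produces the same kind of header, so the credentials are only encoded, not encrypted. Outside of a local test setup it is safer to reach the API over HTTPS.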

 

To check whether this setting is applied, run the following command in a terminal (when Airflow runs in Docker).

 

docker exec -it airflow-scheduler airflow config get-value api auth_backend

Replace airflow-scheduler with the name of the Airflow scheduler container shown by the docker ps command (docker exec takes a container name or ID, not an image name).

 

 


 

Airflow REST API

 

More details are available at the link below.

 

Airflow API Documents

 

https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

 


 

 

The default format used when communicating with the Airflow API is JSON.

 

(Image: the passage in the Airflow REST API documentation that states this)

 

 

Let's send a few requests from Python.

 

1. Listing all DAGs

 

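# List DAGs

import pprint

import requests
from requests.auth import HTTPBasicAuth

# Local webserver and the airflow/airflow account used in this setup
url = "http://localhost:8080/api/v1/dags"

dags = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))

# The body is JSON, so parse it before pretty-printing
pprint.pprint(dags.json())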

 

(Image: result of listing all DAGs)

 

The DAGs in the dags folder are listed.

is_active is True or False depending on whether the DAG still exists in the dags folder,

and is_paused is True when the DAG is paused (turned off) and False when it is enabled.
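One caveat on "all DAGs": the list endpoints of the stable REST API are paginated with limit and offset query parameters, and the response includes total_entries, so a single request returns at most one page (100 entries by default, as far as I know). Below is a minimal sketch of walking through every page. The function iter_items and the fetch_page callback are names I made up for illustration, and fake_fetch stands in for a real HTTP call so the example runs without a server.

```python
from typing import Callable, Dict, Iterator


def iter_items(fetch_page: Callable[[int, int], Dict], key: str, limit: int = 100) -> Iterator[Dict]:
    """Yield every item of a paginated list endpoint.

    fetch_page(limit, offset) must return the parsed JSON body of one page.
    """
    offset = 0
    while True:
        page = fetch_page(limit, offset)
        items = page.get(key, [])
        yield from items
        offset += len(items)
        # Stop on an empty page or once everything reported by the server was seen.
        if not items or offset >= page.get("total_entries", 0):
            break


def fake_fetch(limit: int, offset: int) -> Dict:
    """Stand-in for the HTTP call: serves 5 hypothetical DAGs page by page."""
    all_dags = [{"dag_id": f"dag_{i}"} for i in range(5)]
    return {"dags": all_dags[offset:offset + limit], "total_entries": len(all_dags)}


print([d["dag_id"] for d in iter_items(fake_fetch, "dags", limit=2)])
```

Against a real server, fetch_page could be a small function that calls requests.get(url, params={"limit": limit, "offset": offset}, auth=...) and returns .json().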

 

To show only the DAGs that are not paused (is_paused = False), a condition like the following should work.

 

for d in dags.json()["dags"]:
    if not d["is_paused"]:
        print(d["dag_id"])
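The same filter can be wrapped in a small function so that it works for either state. This is only a sketch: dag_ids_by_pause_state is a name I chose, and the sample payload is hypothetical data shaped like the "dags" list in the response.

```python
from typing import Dict, List


def dag_ids_by_pause_state(payload: Dict, paused: bool) -> List[str]:
    """Return the dag_id of every DAG whose is_paused flag equals `paused`."""
    return [
        d["dag_id"]
        for d in payload.get("dags", [])
        if bool(d.get("is_paused")) == paused
    ]


# Hypothetical payload with the same shape as the /dags response body.
sample = {
    "dags": [
        {"dag_id": "HelloWorld", "is_paused": False},
        {"dag_id": "SomeOtherDag", "is_paused": True},
    ],
    "total_entries": 2,
}

print(dag_ids_by_pause_state(sample, paused=False))
print(dag_ids_by_pause_state(sample, paused=True))
```

With the real response, the call would be dag_ids_by_pause_state(dags.json(), paused=False).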

 

2. Listing all Variables

 

# List variables

url = "http://localhost:8080/api/v1/variables"

response = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))

pprint.pprint(response.json())

 

(Image: result of listing all variables)

 

The process is similar to listing DAGs.

Note that variables defined as environment variables in docker-compose are not returned; the endpoint appears to list only the variables stored in the metadata database.

 

3. Listing the config

Listing the config requires a small change to airflow.cfg.

 

In the [webserver] section of airflow.cfg, set expose_config = True.

 

It can also be set under environment in docker-compose.yml.

 

environment:
    AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'

 

๋‚˜๋Š” ํ›„์ž๋ฅผ ์„ ํƒํ•˜๊ณ , docker compose down > docker compose up ๋ช…๋ น์–ด๋กœ ์žฌ์‹คํ–‰ํ–ˆ๋‹ค.

 

# List config

url = "http://localhost:8080/api/v1/config"

response = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))

# print() keeps the line breaks of the raw body readable
print(response.text)

 

The config is listed without problems.

 

4. Triggering a run of a specific DAG

 

# Run the HelloWorld DAG

url = "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
auth = HTTPBasicAuth("airflow", "airflow")
# The run date is passed as an ISO 8601 date-time string
data = {"execution_date": "2023-05-24T00:00:00Z"}

# json= serializes the body and sets Content-Type: application/json
response = requests.post(url, auth=auth, json=data)

pprint.pprint(response.json())

 

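Triggering a run creates a new DAG run, so the request is sent with POST, with execution_date passed in the JSON body.

execution_date must be an ISO 8601 date-time string like the one above. (From Airflow 2.2 on, the field is also available as logical_date, and execution_date is the deprecated name.)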
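Instead of typing the date string by hand, it can be generated from a datetime. A minimal sketch, assuming the run date should be expressed in UTC with a trailing Z as in the request above; to_api_timestamp is a hypothetical helper name.

```python
import json
from datetime import datetime, timedelta, timezone


def to_api_timestamp(moment: datetime) -> str:
    """Format a timezone-aware datetime as an ISO 8601 UTC string ending in Z."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


# Same date as in the request above.
data = {"execution_date": to_api_timestamp(datetime(2023, 5, 24, tzinfo=timezone.utc))}
print(json.dumps(data))

# A time given in KST (UTC+9) is converted to UTC before formatting.
kst = timezone(timedelta(hours=9))
print(to_api_timestamp(datetime(2023, 5, 24, 9, 0, tzinfo=kst)))
```

Rejecting naive datetimes is a deliberate choice here: without a timezone it is unclear which instant the run date refers to.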

 

It ran without problems.

 

5. Checking Airflow's health

 

# Check Airflow health

url = "http://localhost:8080/api/v1/health"

response = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))

# The health endpoint answers with JSON, so parse it before pretty-printing
pprint.pprint(response.json())

 

When everything is fine, the status is shown as healthy.
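For a monitoring script, eyeballing the output is not enough, so here is a rough sketch that picks out the components not reporting healthy. It assumes the body maps component names (for example metadatabase and scheduler) to an object with a status field; the sample below is hypothetical data in that shape, and unhealthy_components is my own helper name. Components whose status is missing or null are skipped, on the assumption that they are simply not in use.

```python
from typing import Dict, List


def unhealthy_components(health: Dict) -> List[str]:
    """Return the names of components whose status is set but not 'healthy'."""
    bad = []
    for name, info in health.items():
        status = info.get("status") if isinstance(info, dict) else None
        if status is not None and status != "healthy":
            bad.append(name)
    return sorted(bad)


# Hypothetical response body, shaped like the health endpoint's output.
sample_health = {
    "metadatabase": {"status": "healthy"},
    "scheduler": {"status": "unhealthy", "latest_scheduler_heartbeat": None},
}

print(unhealthy_components(sample_health))
```

With the real response, unhealthy_components(response.json()) returning an empty list would mean every reported component is healthy.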