์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Spark
- Spark SQL
- k8s
- DataFrame Hint
- Salting
- KDT_TIL
- Airflow
- Kafka
- SQL
- colab
- etl
- Dag
- aws
- AQE
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- Spark Partitioning
- disk spill
- backfill
- CI/CD
- ๋น ๋ฐ์ดํฐ
- redshift
- Spark ์ค์ต
- mysql
- Spark Caching
- off heap memory
- Speculative Execution
- spark executor memory
- Kubernetes
- Docker
- topic
- Today
- Total
JUST DO IT!
Airflow Task Grouping๊ณผ Dynamic Dags ์์๋ณด๊ธฐ - TIL230621 ๋ณธ๋ฌธ
๐ KDT WEEK 12 DAY 3 TIL
- Task Grouping
- Dynamic Dags
๐ฆ Task Grouping
Task ์๊ฐ ๋ง์ DAG๋ผ๋ฉด Task๋ค์ ์ฑ๊ฒฉ์ ๋ฐ๋ผ ๊ด๋ฆฌํ ์ ์์
Airflow 2.0์์๋ถํฐ๋ SubDAG ๋์ Task Grouping ์ฌ์ฉ
ex) start > ํ์ผ 1, 2, 3 ๋ค์ด๋ก๋ Task > ํ์ผ 1, 2, 3 Process Task > end ๊ตฌ์กฐ๋ก DAG๋ฅผ ์์ฑํ๋ค.
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
import pendulum
with DAG(dag_id="Learn_Task_Group", start_date=pendulum.today('UTC').add(days=-2), tags=["example"]) as dag:
start = EmptyOperator(task_id="start")
# Task Group #1
with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
task_1 = EmptyOperator(task_id="task_1")
task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
task_3 = EmptyOperator(task_id="task_3")
task_1 >> [task_2, task_3]
# Task Group #2
# Task Group ์์ Task Group ๋ฃ๊ธฐ
with TaskGroup("Process", tooltip="Tasks for processing data") as section_2:
task_1 = EmptyOperator(task_id="task_1")
with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
task_3 = EmptyOperator(task_id="task_3")
task_4 = EmptyOperator(task_id="task_4")
[task_2, task_3] >> task_4
end = EmptyOperator(task_id='end')
# Task Group์ผ๋ก ์์ ์์ฑ ๊ฐ๋ฅ
start >> section_1 >> section_2 >> end
์ฝ๋๋ฅผ ๋ณด๋ฉด, Task Group์์์ Task๋ค์ ์์๋ฅผ ์์ฑํ ์๋ ์๋ค.
์ด DAG์ ๊ตฌ์กฐ๋ฅผ Airflow ์น UI์์ ํ์ธํด๋ณด์.
ํ๋ ๋ฐ์ค๋ฅผ ํด๋ฆญํด๋ณด๋ฉด ์์ ๋ค์ด์๋ Task๋ค์ ํ์ธํ ์ ์๋ค.
โธ๏ธ Dynamic Dags
Template ํํ(๋น์ทํ ํํ)๋ก Dag๋ฅผ ๋ง์ด ๋ง๋ค์ด๋ด๋ ๊ฒฝ์ฐ, ์ฝ๋ ์ฌ์ฌ์ฉ์ ์ต๋ํํ ์ ์๋ ๋ฐฉ๋ฒ
Jinja ํ ํ๋ฆฟ๊ณผ YAML ๊ธฐ๋ฐ์ผ๋ก DAG๋ฅผ ๋์ ์ผ๋ก ๋ง๋ค๊ธฐ
Jinja๊ฐ DAG ์์ฒด ํ ํ๋ฆฟ์ ๋์์ธํ๊ณ , YAML์ ํตํด ํ ํ๋ฆฟ์ ํ๋ผ๋ฏธํฐ๋ฅผ ์ ๊ณตํ๋ค.
ex) ์ฃผ์ ์ฌ๋ณผ์ ๋ฐ๋ผ ๊ฐ๋จํ๊ฒ DAG ๋ง๋ค๊ธฐ
1. templated_dag.jinja2
jinja2 ํฌ๋งท์ผ๋ก DAG ํํ์ ํ ํ๋ฆฟ์ ๋ง๋ค์๋ค.
{{ }}์ ํ์์ผ๋ก YAML ํ์ผ์์ ๊ฐ์ ๋ถ๋ฌ์ค๊ฒ ๋๋ค.
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
with DAG(dag_id="get_price_{{ dag_id }}",
start_date=datetime(2023, 6, 15),
schedule='{{ schedule }}',
catchup={{ catchup or True }}) as dag: # yml์ catchup ๊ฐ์ ๊ฐ์ ธ์ค๋ ์์ผ๋ฉด True๋ก ๋ฃ๊ธฐ
@task
def extract(symbol):
return symbol
@task
def process(symbol):
return symbol
@task
def store(symbol):
return symbol
store(process(extract("{{ symbol }}")))
2. config_appl.yml
jinja2 ํ ํ๋ฆฟ์์ ๋ค์ด๊ฐ ๊ฐ์ yml ํ์ผ์ ๊ฐ๊ฐ ์ ์ํ๋ค.
์ฌ๊ธฐ์ ์ ์๋ key : value ํํ๋ฅผ ํตํด key์ ๊ฐ์ jinja2 ํ ํ๋ฆฟ์์ ๋ถ๋ฌ์ค๊ฒ ๋๋ค.
๊ฑฐ๊ธฐ์ ํด๋นํ๋ ๊ฐ์ด ์ฑ์์ง๋ ํ์์ผ๋ก, ์๋์ generator.py๋ฅผ ํตํด DAG ํ์ด์ฌ ํ์ผ๋ก ๋ง๋ค์ด์ง๋ค.
dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
3. generator.py
jinja2 ํ ํ๋ฆฟ์ yml ํ์ผ์ ๊ฐ์ ์ฑ์๋ฃ๊ณ , ํ์ด์ฌ ํ์ผ(get_price_APPL.py)๋ก ๋ง๋ค์ด์ง๋ ํ์ด์ฌ ํ์ผ์ด๋ค.
from jinja2 import Environment, FileSystemLoader
import yaml
import os
file_dir = os.path.dirname(os.path.abspath(__file__)) # ํ์ฌ ํ์ผ์ ์ ๋๊ฒฝ๋ก
# ์์ฑํ๋ jinja2 ํ
ํ๋ฆฟ ๊ฐ์ ธ์ค๊ธฐ
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templated_dag.jinja2')
# ํ์ฌ ํ์ผ๊ณผ ๋์ผํ ์์น์์ yml ํ์ผ์ ์ฐพ์ ํน์ ์์น์ DAG ํ์ด์ฌ ํ์ผ ์์ฑ
for f in os.listdir(file_dir):
if f.endswith(".yml"):
with open(f"{file_dir}/{f}", "r") as cf:
config = yaml.safe_load(cf)
with open(f"dags/get_price_{config['dag_id']}.py", "w") as f:
f.write(template.render(config))
generator.py๋ airflow ํด๋์ ๋ฃจํธ ์์น์์ ์คํ๋๋ ๊ฒ์ ๊ธฐ์ค์ผ๋ก ์์ฑ๋์๋ค.
๊ทธ๋ฆฌ๊ณ generator.py์ ymlํ์ผ, jinja2 ํ ํ๋ฆฟ์ ๊ฐ์ ํด๋์ ์์ด์ผ ํ๋ค.
generator.py๋ฅผ ๋ฃจํธ์์น์์ ์คํ์ํค๋ฉด, get_price_APPL.py ํ์ผ์ด ์ ์์ ์ผ๋ก ์์ฑ๋๋ค.
๊ฒฐ๊ณผ ) get_price_APPL.py
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
with DAG(dag_id="get_price_APPL",
start_date=datetime(2023, 6, 15),
schedule='@daily',
catchup=True) as dag:
@task
def extract(symbol):
return symbol
@task
def process(symbol):
return symbol
@task
def store(symbol):
return symbol
store(process(extract("APPL")))