JUST DO IT!

Exploring Airflow Task Grouping and Dynamic Dags - TIL230621


sunhokimDev 2023. 6. 22. 19:31

📚 KDT WEEK 12 DAY 3 TIL

  • Task Grouping
  • Dynamic Dags

 


 

📦 Task Grouping

 

For a DAG with many tasks, the tasks can be grouped and managed according to their role.

From Airflow 2.0 on, Task Grouping is used instead of SubDAGs (which are deprecated).

 

ex) I wrote a DAG with the structure start > download tasks for files 1, 2, 3 > process tasks for files 1, 2, 3 > end (the tasks themselves are placeholders).

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
import pendulum

with DAG(dag_id="Learn_Task_Group", start_date=pendulum.today('UTC').add(days=-2), tags=["example"]) as dag:
    start = EmptyOperator(task_id="start")

    # Task Group #1
    with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
        task_1 = EmptyOperator(task_id="task_1")
        task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
        task_3 = EmptyOperator(task_id="task_3")

        task_1 >> [task_2, task_3]

    # Task Group #2
    # Nest a Task Group inside another Task Group
    with TaskGroup("Process", tooltip="Tasks for processing data") as section_2:
        task_1 = EmptyOperator(task_id="task_1")

        with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
            task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
            task_3 = EmptyOperator(task_id="task_3")
            task_4 = EmptyOperator(task_id="task_4")

            [task_2, task_3] >> task_4

    end = EmptyOperator(task_id='end')

    # Dependencies can be declared between whole Task Groups
    start >> section_1 >> section_2 >> end

 

As the code shows, the order of tasks can also be defined inside a Task Group, and whole groups can be chained just like tasks (start >> section_1 >> section_2 >> end).
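When a whole group appears in `>>`, Airflow wires the edge to the group's boundary tasks: `start >> section_1` points `start` at the group's root tasks (no upstream task inside the group), and `section_1 >> section_2` connects section_1's leaf tasks (no downstream task inside the group) to section_2's roots. The sketch below is a simplified stdlib model of that rule applied to the DAG above, not Airflow's code; `roots` and `leaves` are hypothetical helpers.

```python
# Simplified model (not Airflow code): a group is a set of full task ids,
# and `edges` holds the dependencies declared inside the groups.
Edge = tuple[str, str]

def roots(tasks: set[str], edges: set[Edge]) -> set[str]:
    """Tasks with no upstream task inside the same group."""
    has_upstream = {down for up, down in edges if up in tasks and down in tasks}
    return tasks - has_upstream

def leaves(tasks: set[str], edges: set[Edge]) -> set[str]:
    """Tasks with no downstream task inside the same group."""
    has_downstream = {up for up, down in edges if up in tasks and down in tasks}
    return tasks - has_downstream

# Intra-group edges from the example DAG.
edges: set[Edge] = {
    ("Download.task_1", "Download.task_2"),
    ("Download.task_1", "Download.task_3"),
    ("Process.inner_section_2.task_2", "Process.inner_section_2.task_4"),
    ("Process.inner_section_2.task_3", "Process.inner_section_2.task_4"),
}
download = {"Download.task_1", "Download.task_2", "Download.task_3"}
process = {"Process.task_1", "Process.inner_section_2.task_2",
           "Process.inner_section_2.task_3", "Process.inner_section_2.task_4"}

# start >> section_1 >> section_2 >> end, expanded to task-level edges.
group_edges = (
    {("start", t) for t in roots(download, edges)}
    | {(a, b) for a in leaves(download, edges) for b in roots(process, edges)}
    | {(t, "end") for t in leaves(process, edges)}
)
for up, down in sorted(group_edges):
    print(up, ">>", down)
```

Because `Process.task_1` has no dependency inside its group, it is both a root and a leaf of `Process`: it runs after the Download leaves and directly before `end`.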

 

์ด DAG์˜ ๊ตฌ์กฐ๋ฅผ Airflow ์›น UI์—์„œ ํ™•์ธํ•ด๋ณด์ž.

 

 

Task Grouping 1

 

Clicking a blue box (a Task Group) expands it to show the tasks it contains.

 

Task Grouping 2

 


 

โ˜ธ๏ธ Dynamic Dags

A way to maximize code reuse when you need many DAGs that follow the same template (i.e., DAGs with a similar shape).

 

Generating DAGs dynamically from a Jinja template and YAML

Jinja defines the template for the DAG itself, and YAML supplies the parameters that fill in the template.
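To make this split concrete without extra dependencies, the sketch below swaps Jinja for Python's standard-library `string.Template` (`$name` placeholders instead of `{{ name }}`) and uses plain dicts where the YAML files would be. One template plus N parameter sets yields N DAG sources; the template text and the second symbol (`MSFT`) are hypothetical.

```python
from string import Template

# Stand-in for the Jinja template: $name placeholders instead of {{ name }}.
DAG_TEMPLATE = Template(
    'with DAG(dag_id="get_price_$dag_id", schedule="$schedule") as dag:\n'
    '    store(process(extract("$symbol")))\n'
)

# Stand-ins for the per-DAG YAML files (hypothetical values).
configs = [
    {"dag_id": "APPL", "schedule": "@daily", "symbol": "APPL"},
    {"dag_id": "MSFT", "schedule": "@daily", "symbol": "MSFT"},
]

# substitute() raises KeyError if a placeholder has no value.
sources = {c["dag_id"]: DAG_TEMPLATE.substitute(c) for c in configs}
print(sources["MSFT"])
```

Adding another DAG then means adding another parameter set, not copying DAG code.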

 


 

ex) Creating a simple DAG per stock symbol

 

1. templated_dag.jinja2

 

jinja2 ํฌ๋งท์œผ๋กœ DAG ํ˜•ํƒœ์˜ ํ…œํ”Œ๋ฆฟ์„ ๋งŒ๋“ค์—ˆ๋‹ค.

{{ }}์˜ ํ˜•์‹์œผ๋กœ YAML ํŒŒ์ผ์•ˆ์˜ ๊ฐ’์„ ๋ถˆ๋Ÿฌ์˜ค๊ฒŒ ๋œ๋‹ค.

 

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG(dag_id="get_price_{{ dag_id }}",
    start_date=datetime(2023, 6, 15),
    schedule='{{ schedule }}',
    # Falls back to True when catchup is missing, but also when it is False (False or True == True)
    catchup={{ catchup or True }}) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def store(symbol):
        return symbol

    store(process(extract("{{ symbol }}")))
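One thing to watch in this template: Jinja's `or` follows the same truthiness rule as Python's, so `{{ catchup or True }}` renders `True` even when the YAML explicitly sets `catchup: False` (the generated file at the end of this post indeed has `catchup=True`). Jinja's `default` filter, e.g. `{{ catchup | default(True) }}`, falls back only when the key is missing. The plain-Python sketch below mirrors both expressions; the function names are hypothetical.

```python
def render_with_or(config: dict) -> str:
    """Mirrors {{ catchup or True }}: any falsy value is replaced by True."""
    return str(config.get("catchup") or True)

def render_with_default(config: dict) -> str:
    """Mirrors {{ catchup | default(True) }}: only a missing key falls back."""
    return str(config.get("catchup", True))

yml_config = {"dag_id": "APPL", "catchup": False}  # as in config_appl.yml
print(render_with_or(yml_config))       # True  (the explicit False is lost)
print(render_with_default(yml_config))  # False
print(render_with_default({}))          # True  (key missing -> fallback)
```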

 

2. config_appl.yml

 

Define each value that goes into the jinja2 template in a yml file.

Each key in the key: value pairs defined here is looked up by the same name in the jinja2 template.

generator.py (below) fills in the matching values and writes the result out as a DAG Python file.

 

dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
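A missing key is easy to overlook: with Jinja's default settings an undefined variable renders as an empty string, so a yml file without `symbol` would silently produce `extract("")` (Jinja's `StrictUndefined` turns this into an error instead). A simple, hypothetical pre-check is to collect the `{{ name }}` placeholders with a regular expression and compare them with the keys of the loaded config. The sketch uses a plain dict in place of the `yaml.safe_load` result, and its regex only looks at the first identifier in each expression.

```python
import re

# Captures the first identifier inside each {{ ... }} expression.
PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_]\w*)")

def missing_keys(template_text: str, config: dict) -> set[str]:
    """Return placeholder names that the config does not define."""
    return set(PLACEHOLDER.findall(template_text)) - set(config)

template_text = (
    'with DAG(dag_id="get_price_{{ dag_id }}",\n'
    "    schedule='{{ schedule }}',\n"
    "    catchup={{ catchup or True }}) as dag:\n"
    '    store(process(extract("{{ symbol }}")))\n'
)
config = {"dag_id": "APPL", "schedule": "@daily", "catchup": False, "symbol": "APPL"}
print(missing_keys(template_text, config))  # set()
print(sorted(missing_keys(template_text, {"dag_id": "APPL", "schedule": "@daily"})))
# ['catchup', 'symbol']
```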

 

3. generator.py

 

generator.py fills the jinja2 template with the values from the yml file and writes the result out as a Python file (get_price_APPL.py).

 

from jinja2 import Environment, FileSystemLoader
import yaml
import os

file_dir = os.path.dirname(os.path.abspath(__file__))  # directory containing this script

# Load the jinja2 template created above (it lives next to this script)
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templated_dag.jinja2')

# For every yml file next to this script, render the template and write the
# DAG file to dags/ (relative to the current working directory)
for f in os.listdir(file_dir):
    if f.endswith(".yml"):
        with open(os.path.join(file_dir, f), "r") as cf:
            config = yaml.safe_load(cf)
        with open(f"dags/get_price_{config['dag_id']}.py", "w") as out:
            out.write(template.render(config))

 

generator.py assumes it is run from the root of the airflow folder, because the output path dags/... is resolved relative to the current working directory.

Also, generator.py, the yml files, and the jinja2 template must be in the same folder.
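One way to remove the dependency on the working directory, sketched here under assumptions, is to pass the target directory explicitly and build paths with `pathlib`. `render_all` and `dags_dir` are hypothetical names, and `string.Template` again stands in for the Jinja template so the sketch runs with the standard library only.

```python
import tempfile
from pathlib import Path
from string import Template

def render_all(template: Template, configs: list[dict], dags_dir: Path) -> list[Path]:
    """Render one DAG file per config into dags_dir, regardless of the cwd."""
    dags_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for config in configs:
        out_path = dags_dir / f"get_price_{config['dag_id']}.py"
        out_path.write_text(template.substitute(config), encoding="utf-8")
        written.append(out_path)
    return written

# Usage: write into a throwaway directory instead of a real dags/ folder.
template = Template('dag_id = "get_price_$dag_id"\nsymbol = "$symbol"\n')
with tempfile.TemporaryDirectory() as tmp:
    paths = render_all(template, [{"dag_id": "APPL", "symbol": "APPL"}], Path(tmp) / "dags")
    print([p.name for p in paths])  # ['get_price_APPL.py']
```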

 

Running generator.py from the root generates get_price_APPL.py as expected.

 

Result) get_price_APPL.py

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG(dag_id="get_price_APPL",
    start_date=datetime(2023, 6, 15),
    schedule='@daily',
    catchup=True) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def store(symbol):
        return symbol

    store(process(extract("APPL")))
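Since generated files go straight into the DAG folder, a cheap safeguard (an addition, not part of the original workflow) is to check that each one is at least valid Python syntax before Airflow parses it. The sketch uses the standard-library `ast.parse`; it does not import Airflow, so it catches template mistakes such as an empty placeholder (`catchup=`) but not runtime errors. `syntax_errors` is a hypothetical helper.

```python
import ast
import tempfile
from pathlib import Path

def syntax_errors(dags_dir: Path, pattern: str = "get_price_*.py") -> dict[str, str]:
    """Map each generated file that fails to parse to a short error description."""
    errors = {}
    for path in sorted(dags_dir.glob(pattern)):
        try:
            ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        except SyntaxError as exc:
            errors[path.name] = f"line {exc.lineno}: {exc.msg}"
    return errors

# Usage: one valid file and one with an empty value left by the template.
with tempfile.TemporaryDirectory() as tmp:
    d = Path(tmp)
    (d / "get_price_APPL.py").write_text('dag_id = "get_price_APPL"\n', encoding="utf-8")
    (d / "get_price_BAD.py").write_text("catchup=\n", encoding="utf-8")
    print(syntax_errors(d))
```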