일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Kafka
- Spark SQL
- DataFrame Hint
- Spark
- Docker
- 데이터 파이프라인
- colab
- redshift
- Airflow
- KDT_TIL
- disk spill
- Speculative Execution
- aws
- backfill
- 빅데이터
- k8s
- etl
- mysql
- Salting
- Dag
- SQL
- CI/CD
- Spark 실습
- topic
- Spark Partitioning
- off heap memory
- Kubernetes
- Spark Caching
- AQE
- spark executor memory
- Today
- Total
JUST DO IT!
Airflow Trigger와 Sensor로 Dag Dependencies 설정하기 - TIL230621 본문
📚 KDT WEEK 12 DAY 3 TIL
- Dag Dependencies
- Explicit Trigger
- Reactive Trigger(Sensor)
- 다른 Operator
- Trigger rule
Dag Dependencies는 DAG끼리 의존성을 가져 task 실행 순서 등을 정의하는 것이다.
Airflow에서는 Dag Dependencies를 설정하는 방법에 크게 두 가지로 나뉜다.
예를 들어 DAG A > DAG B의 순서를 가질 경우,
1. Explicit trigger
TriggerDagOperator를 DAG A에 두어, DAG A가 명시적으로 DAG B를 트리거
2. Reactive trigger
ExternalTaskSensor를 DAG B를 두어, DAG B가 DAG A의 테스크가 끝나기를 대기(DAG A는 이 사실을 모름)
🟥 Explicit Trigger
TriggerDagRunOperator 에서는 trigger_dag_id, conf, execution_date에서 템플릿을 사용할 수 있다.
DAG A에서 DAG B로 정보를 넘기고 싶을 때, conf에 사전 형태로 데이터를 넘길 수 있다.
Jinja Template에 대한 내용은 링크 참고 : https://sunhokimdev.tistory.com/52
ex)
TriggerDag.py (DAG A)
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
dag = DAG(
dag_id='SourceDag',
start_date=datetime(2023, 6, 19),
schedule='@daily' # 매일 실행
)
trigger_task = TriggerDagRunOperator(
task_id="trigger_task",
trigger_dag_id="TargetDag", # Taget DAG 명시
conf={ 'path': 'value1' }, # DAG B에 넘기고 싶은 정보
execution_date="{{ ds }}", # Jinja 템플릿을 통해 DAG A의 execution_date을 패스
reset_dag_run=True, # True일 경우 해당 날짜가 이미 실행되었더라도 다시 재실행
wait_for_completion=True # DAG B가 끝날 때까지 기다릴지 여부를 결정. 디폴트값은 False
dag = dag
)
TargetDag.py (DAG B)
dag = DAG(
'TargetDag',
schedule='@once', # 트리거 Dag에 의해 매일 실행됨
start_date=datetime(2023, 6, 1),
)
task1 = BashOperator(
task_id='task1',
# Python에서 conf값을 가져올 때는 dag_run.conf.get 의 형태를 활용한다.
# conf의 path 값이 있으면 리턴, 없으면 none 리턴
bash_command="""echo '{{ ds }}, {{ dag_run.conf.get("path", "none") }}' """,
dag=dag
)
TriggerDag.py (DAG A)를 실행시키면, 이에 따라 순서대로 TagetDag.py (DAG B)가 실행된다.
🟦 Reactive Trigger (Sensor)
특정 조건이 충족될 때까지 대기하는 Operator
Airflow에서는 몇 가지 내장 Sensor를 제공한다.
- FileSensor: 지정한 위치에 파일이 생길 때까지 대기
- HttpSensor : HTTP 요청을 수행하고 지정된 응답이 올 때까지 대기
- SqlSensor : SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
- ExternalTaskSensor : 다른 Airflow DAG의 특정 작업 완료를 대기
worker 하나를 가지고 지속적으로 체크하는 poke mode(default)와,
worker를 가지고 있다가 체크 후 릴리스해주는 reschedule mode 가 있다.
✴️ ExternalTaskSensor
DAG B의 ExternalTaskSensor 테스크가 DAG A의 특정 테스크가 끝났는지 체크하는 Sensor
사용하려면 DAG A와 DAG B 간에 조건이 필요하다.
- 동일한 schedule_interval
- 동일한 Execution Date
ex)
from airflow.sensors.external_task import ExternalTaskSensor
waiting_for_end_of_dag_a = ExternalTaskSensor(
task_id='waiting_for_end_of_dag_a',
external_dag_id='DAG이름',
external_task_id='end',
timeout=5*60,
mode='reschedule' # 5분마다 체크하고 DAG A가 안끝났으면 릴리스하고 5분 뒤 다시 Worker 할당
)
🟩 다른 Operator
🌲 BranchPythonOperator
상황에 따라 실행되어야할 Task를 동적으로 결정해주는 Operator
Variable의 mode 값으로 현재 개발상태 값을 저장해두어 trigger_b가 실행되기를 바라는지 작성하는 코드
ex)
from airflow.operators.python import BranchPythonOperator
# mode 값이 dev (개발모드) 일때는 아무것도 하지않고 끝내기(trigger_b 스킵)
# 개발모드가 아닐 때는 trigger_b Task 실행
def skip_or_cont_trigger():
if Variable.get("mode", "dev") == "dev":
return []
else:
return ["trigger_b"]
# BranchPythonOperator
branching = BranchPythonOperator(
task_id='branching',
python_callable=skip_or_cont_trigger,
)
현재 시간에 따라 task를 선택하여 실행하는 코드
( decide_branch는 BranchPythonOperator의 python_callable 함수, moring_task와 afternoon_task는 생략 )
ex)
def decide_branch(**context):
current_hour = datetime.now().hour
print(f"current_hour: {current_hour}")
if current_hour < 12:
return 'morning_task'
else:
return 'afternoon_task'
# 이렇게 순서를 미리 정의해두면, BranchPythonOperator가 선택함
branching_operator >> morning_task
branching_operator >> afternoon_task
⌛ LatestOnlyOperator
Time-sensitive한 Task들이 과거 데이터의 backfill시 실행되는 것을 막기 위한 Operator
다시말해, backfill시 모든 task가 실행되지 않게 하도록 제어하는 Operator이다.
ex)
with DAG(
dag_id='Learn_LatestOnlyOperator',
schedule=timedelta(hours=48), # 매 48시간마다 실행되는 DAG로 설정
start_date=datetime(2023, 6, 14),
catchup=True) as dag:
t1 = EmptyOperator(task_id='task1')
t2 = LatestOnlyOperator(task_id='latest_only')
t3 = EmptyOperator(task_id='task3')
t4 = EmptyOperator(task_id='task4')
t1 >> t2 >> [t3, t4]
6월 18일에 execution_date가 6월 14일인 데이터를 가져올 때 t1, t2만 실행될 것이다.
catchup = True여도 상관없이 동작한다.
🔖 Trigger Rules
앞단의 Task의 성공 여부에 따라 뒷단의 Task 실행 여부를 결정할 수 있다.
Operater에 trigger_rule이란 파라미터로 결정 가능하다.
- all_success : 모든 Task가 성공으로 넘어왔을 때 (default)
- all_failed : 모든 Task가 실패했을 때
- none_failed : 모든 Task가 실패가 아닌 상태로 넘어왔을 때 ( success, skipped )
- one_success : 하나라도 Task가 성공하는 순간 실행
- none_failed_min_one_succcess : 실패하지 않은 상황에서 하나라도 성공하면 실행
이외에도 좀 더 있다..
ex)
with DAG("Learn_TriggerRule", default_args=default_args, schedule=timedelta(1)) as dag:
t1 = BashOperator(task_id="print_date", bash_command="date")
t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
t3 = BashOperator(task_id="exit", bash_command="exit 1") # 항상 실패하는 task
t4 = BashOperator(
task_id='final_task',
bash_command='echo DONE!',
trigger_rule=TriggerRule.ALL_DONE
)
[t1, t2, t3] >> t4
t4가 ALL_DONE trigger_rule을 가졌으므로 t1, t2, t3(실패)가 모두 실행완료되면 t4가 동작한다.
'TIL' 카테고리의 다른 글
Airflow 운영에 주의사항과 Airflow 대안 서비스 알아보기 - TIL230622 (0) | 2023.06.22 |
---|---|
Airflow Task Grouping과 Dynamic Dags 알아보기 - TIL230621 (0) | 2023.06.22 |
Airflow에서의 Jinja Template - TIL230621 (0) | 2023.06.22 |
Airflow REST API 간단히 사용해보기(with Python) - TIL230620 (0) | 2023.06.22 |
Airflow 에러 발생 시 Slack 알림받아보기 - TIL230619 (0) | 2023.06.21 |