Airflow DAG - TIL230607

sunhokimDev · 2023. 6. 7. 17:35

๐Ÿ“š KDT WEEK 10 DAY 3 TIL

  • Porting the Airflow example programs
  • Writing an API DAG

๐ŸŸฅ Hello World DAG ์˜ˆ์ œ ํ”„๋กœ๊ทธ๋žจ

 

DAG ๊ตฌ์กฐ ์•Œ์•„๋ณด๊ธฐ

 

dag = DAG(
    dag_id = 'HelloWorld', # DAG ID
    start_date = datetime(2022,5,5), # start date
    catchup=False, # see the explanation below
    tags=['example'], # DAG tags
    schedule = '0 2 * * *') # cron schedule: minute, hour, day, month, day of week / runs every day at 02:00

 

  • max_active_runs : the number of runs of this DAG that may execute concurrently (important for backfills; effectively capped by the total CPUs allocated to the workers)
  • max_active_tasks : the number of tasks belonging to this DAG that may execute concurrently
  • catchup
    1. Whether a run that overruns the next scheduled time is still finished before moving on
    2. Whether, when start_date is in the past, all the runs that were missed are executed as well (see the sketch after this list)
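As a quick illustration, here is a minimal sketch of a DAG that sets all three options together; the dag_id and dates are made up for this example.

from airflow import DAG
from datetime import datetime

# Hypothetical DAG illustrating the three options above.
dag = DAG(
    dag_id='ConfigExample',
    start_date=datetime(2023, 1, 1),  # a past date
    schedule='0 2 * * *',
    catchup=True,          # backfill every missed daily run since 2023-01-01
    max_active_runs=2,     # at most 2 runs of this DAG at the same time
    max_active_tasks=4,    # at most 4 tasks of this DAG at the same time
    tags=['example'],
)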


PythonOperator๋ฅผ ์‚ฌ์šฉํ•ด๋ณด์ž.

 

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# Define the DAG
dag = DAG(
    dag_id = 'HelloWorld',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *'
    )

# The function the PythonOperator will call
# (defined before the operator so the name exists when it is referenced)
def python_func(**cxt):
    # Parameters passed via params are read as cxt["params"]["key"]
    table = cxt["params"]["table"]
    schema = cxt["params"]["schema"]
    ex_date = cxt["execution_date"]
    # do what you need to do

# Define the PythonOperator
load_nps = PythonOperator(
    dag=dag,
    task_id='task_id', # the name to use as this task's id
    python_callable=python_func, # the function called when the task runs

    # Parameters can also be passed to the function,
    # as a dictionary.
    params={
        'table': 'delighted_nps',
        'schema': 'raw_data'
    },
)

 

PythonOperator๋ฅผ ํ†ตํ•ด ์ •์˜๋œ dag์™€ ํ•จ์ˆ˜๋ฅผ ๋ฐ›์•„ ๋ถˆ๋Ÿฌ์˜ค๋Š” ๊ตฌ์กฐ์ด๋‹ค.

PythonOperator์—์„œ params๋ฅผ ์‚ฌ์ „ ํ˜•ํƒœ๋กœ ์ง€์ •ํ•˜๋ฉด, ํ•จ์ˆ˜์—์„œ ํ•ด๋‹น ๊ฐ’์„ ๋ถˆ๋Ÿฌ์˜ฌ ์ˆ˜ ์žˆ๋‹ค.

ํ˜•ํƒœ๋Š” cxt["params"]["ํ‚ค"] ํ˜•ํƒœ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค. (๋ฌผ๋ก  cxt๋Š” ํ•จ์ˆ˜์—์„œ ๋ฐ›์€ ์ธ์ž์˜ ์ด๋ฆ„์„ ์‚ฌ์šฉํ•œ๋‹ค)

 

์˜ˆ์ œ

๋”๋ณด๊ธฐ

 

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# DAG
dag = DAG(
    dag_id = 'HelloWorld',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *')

# hello์™€ goodbye ๋‘ ๊ฐœ์˜ ํ•จ์ˆ˜๋ฅผ ์ •์˜ํ•œ๋‹ค.
def print_hello():
    print("hello!")
    return "hello!"

def print_goodbye():
    print("goodbye!")
    return "goodbye!"

# ๋‘ ๊ฐœ์˜ ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•  PythonOperator๋ฅผ ์ •์˜ํ•œ๋‹ค.
print_hello = PythonOperator(
    task_id = 'print_hello',
    #python_callable param points to the function you want to run 
    python_callable = print_hello,
    #dag param points to the DAG that this task is a part of
    dag = dag)

print_goodbye = PythonOperator(
    task_id = 'print_goodbye',
    python_callable = print_goodbye,
    dag = dag)

# DAG์˜ ์‹คํ–‰ ์ˆœ์„œ๋ฅผ ๋ช…์‹œํ•œ๋‹ค. 
# hello DAG ์ž‘์—…์ด ๋๋‚˜์•ผ goodbye DAG ์ž‘์—…์ด ์‹คํ–‰๋  ๊ฒƒ์ด๋‹ค.
print_hello >> print_goodbye


Task Decorator

 

์ด๋ฒˆ์—๋Š” Airflow์—์„œ ์ œ๊ณตํ•˜๋Š” Task Decorator๋ฅผ ์‚ฌ์šฉํ•ด๋ณด์ž.

์•„๊นŒ๋ณด๋‹ค ๊ตฌ์กฐ๊ฐ€ ๋” ๊ฐ„๋‹จํ•ด์งˆ ์ˆ˜ ์žˆ๋‹ค.

 

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

@task
def print_hello():
    print("hello!")
    return "hello!"

@task
def print_goodbye():
    print("goodbye!")
    return "goodbye!"

with DAG(
    dag_id = 'HelloWorld_v2',
    start_date = datetime(2022,5,5),
    catchup=False,
    tags=['example'],
    schedule = '0 2 * * *'
) as dag:

    # Assign the tasks to the DAG in order
    print_hello() >> print_goodbye()

 

If you put the @task decorator above a function, the function itself is defined as a task, so no Operator definition is needed.

When specifying the order at the bottom of the DAG, you simply use the function names.


๐ŸŸฆ S3์˜ csvํŒŒ์ผ ETL ์˜ˆ์ œ ํ”„๋กœ๊ทธ๋žจ Airflow ํฌํŒ…

 

์ €๋ฒˆ ๊ธ€์—์„œ Colab์œผ๋กœ ETL ์‹ค์Šตํ–ˆ๋˜ ๋‚ด์šฉ์„ Airflow๋กœ ๋™์ž‘ํ•˜๋„๋ก ํฌํŒ…ํ•ด๋ณด์ž.

(๋งํฌ)

 

์ €๋ฒˆ ์ฝ”๋“œ๋ฅผ ์งง๊ฒŒ ์š”์•ฝํ•˜์ž๋ฉด,

S3์— ์ €์žฅ๋œ name_gender.csv (์ด๋ฆ„, ์„ฑ๋ณ„๋กœ ๊ตฌ์„ฑ)์„ ๋‚ด Redshift ํ…Œ์ด๋ธ”๋กœ INSERTํ•˜๋Š” ์ฝ”๋“œ์˜€๋‹ค.

์ด ETL์„ ํ•˜๋‚˜์˜ Task์™€ ์—ฌ๋Ÿฌ ๊ฐœ์˜ Task๋กœ ๊ตฌ์„ฑํ•ด์„œ ํฌํŒ…ํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๊ฐ๊ฐ ์•Œ์•„๋ณธ๋‹ค.

 

๐Ÿ”ง ํ•˜๋‚˜์˜ Task๋กœ ๋งŒ๋“ค์–ด๋ณด๊ธฐ

๋จผ์ € extract, transform, load๋ฅผ ํ•˜๋‚˜์˜ Task๋กœ ๊ตฌ์„ฑํ•ด์„œ ํฌํŒ…ํ•ด๋ณด์ž.

ํ•˜๋‚˜์˜ Task๋กœ ๊ตฌ์„ฑํ•˜๊ธฐ ๋•Œ๋ฌธ์— ํ•˜๋‚˜์˜ PythonOperator๋ฅผ ์‚ฌ์šฉํ•˜๊ฒŒ ๋œ๋‹ค.

 

๋‹ค์Œ์˜ ์ฝ”๋“œ๋ฅผ ์ถ”๊ฐ€ํ•˜๋ฉด ๋œ๋‹ค.

 

def etl(**context):
    link = context["params"]["url"] # read the params defined on the PythonOperator
    task_instance = context['task_instance'] # information about this task instance
    execution_date = context['execution_date'] # scheduling-related data; covered in a later lesson

    logging.info(execution_date)

    # extract, transform, and load are bundled here, behaving as a single Task.
    data = extract(link)
    lines = transform(data)
    load(lines)


dag = DAG(
    dag_id = 'name_gender_v2',
    start_date = datetime(2023,4,6), # if this date is in the future, nothing runs
    schedule = '0 2 * * *',  # run every day at 02:00
    catchup = False,
    max_active_runs = 1, # don't let multiple runs execute at once
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)


task = PythonOperator(
    task_id = 'perform_etl',
    python_callable = etl, # call the function that bundles everything into one Task
    params = {
        'url': "enter the S3 URL"
    },
    dag = dag)

extract์—์„œ ์‚ฌ์šฉํ•œ context['params']['url']์˜ ๊ฒฝ์šฐ, PythonOperator์—์„œ ์ •์˜๋œ ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ค๊ฒŒ ๋œ๋‹ค.

์ด ๋ถ€๋ถ„์€ ์œ— ๊ธ€์—์„œ ์„ค๋ช…ํ–ˆ์—ˆ๋‹ค.

 

Extract, Transform, Load๊ฐ€ ๋™์ž‘ํ•  ํ•˜๋‚˜์˜ ํ•จ์ˆ˜ etl์„ ๋งŒ๋“ค์–ด PythonOperator ํ•จ์ˆ˜์— ๋„ฃ๋Š”๋‹ค.

dag๋„ ์œ— ๊ธ€์— ์„ค๋ช…ํ–ˆ๋˜ ๊ฒƒ์ฒ˜๋Ÿผ ์‹คํ–‰ ์ฃผ๊ธฐ ๋“ฑ๋“ฑ์„ ์ •ํ•ด์„œ ๋„ฃ์–ด์ฃผ์ž.

 

์ „์ฒด ์ฝ”๋“œ

๋”๋ณด๊ธฐ
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2


def get_Redshift_connection():
    host = "Redshift endpoint address"
    redshift_user = "ID"
    redshift_pass = "PASSWORD"
    port = 5439 # Redshift port
    dbname = "DB name"
    conn = psycopg2.connect(f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}")
    conn.set_session(autocommit=True)
    return conn.cursor()


def extract(url):
    logging.info("Extract started")
    f = requests.get(url)
    logging.info("Extract done")
    return (f.text)


def transform(text):
    logging.info("Transform started")
    lines = text.strip().split("\n")[1:] # skip the header line
    records = []
    for l in lines:
      (name, gender) = l.split(",") # CSV line: name,gender
      records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(records):
    logging.info("load started")

    schema = "Redshift schema name"
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.name_gender;") # clear the name_gender table in this schema
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')" # insert into the name_gender table
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
    logging.info("load done")

def etl(**context):
    link = context["params"]["url"]
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)

    data = extract(link)
    lines = transform(data)
    load(lines)


dag = DAG(
    dag_id = 'name_gender_v2',
    start_date = datetime(2023,4,6),
    schedule = '0 2 * * *',
    catchup = False,
    max_active_runs = 1,	
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)


task = PythonOperator(
    task_id = 'perform_etl',
    python_callable = etl,
    params = {
        'url': "S3 CSV file URL"
    },
    dag = dag)


๐Ÿ› ๏ธ ์—ฌ๋Ÿฌ ๊ฐœ์˜ Task๋กœ ๊ตฌ์„ฑํ•ด๋ณด๊ธฐ

 

์ด๋ฒˆ์—๋Š” extract, transform, load๋ฅผ ๊ฐ๊ฐ์˜ Task๋กœ ๊ตฌ์„ฑํ•˜์—ฌ ๋™์ž‘ํ•˜๋„๋ก ๊ตฌํ˜„ํ•ด๋ณด์ž.

์—ฌ๊ธฐ์„œ๋Š” ๊ฐ Task๊ฐ€ ๊ฐ’์„ ์–ด๋–ป๊ฒŒ ์ฃผ๊ณ  ๋ฐ›์„์ง€๊ฐ€ ์ค‘์š”ํ•˜๋‹ค.

Airflow์—์„œ๋Š” ๊ทธ ๊ฐ’์„ ์ฃผ๊ณ  ๋ฐ›๋Š” ๋ฐฉ์‹์œผ๋กœ Xcom์„ ์‚ฌ์šฉํ•œ๋‹ค.

 

Xcom

  • A way for Tasks (Operators) to exchange data
  • Typically, one Operator's return value is read by another Operator
  • This data is stored in the Airflow metadata DB, so it is hard to exchange large data this way
    • For large data, the usual pattern is to load it to S3 (or similar) and pass its location instead

We call xcom_pull on the task instance information:

context["task_instance"].xcom_pull(key="return_value", task_ids="transform")

 

์ด ์ฝ”๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด, task id๊ฐ€ transform์ธ ํ•จ์ˆ˜์˜ return ๊ฐ’์„ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋‹ค.

 

์—ฌ๋Ÿฌ ๊ฐœ์˜ Task๋กœ ๊ตฌ์„ฑํ•œ ๊ฒฝ์šฐ ์•ž์˜ ์ฝ”๋“œ์—์„œ ๋ณ€๊ฒฝ๋œ ๊ฐ๊ฐ์˜ ํ•จ์ˆ˜์™€ PythonOperator๋ฅผ ํ™•์ธํ•ด๋ณด์ž.

 

# ๋ชจ๋‘ ์ธ์ž๋กœ context๋ฅผ ๋ฐ›๊ฒŒ ๋˜์—ˆ๋‹ค.

def extract(**context):
    link = context["params"]["url"] 
    task_instance = context['task_instance']
    execution_date = context['execution_date']

    logging.info(execution_date)
    f = requests.get(link)
    return (f.text)


def transform(**context):
    logging.info("Transform started")

    # Use Xcom to receive extract's return value.
    text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")

    lines = text.strip().split("\n")[1:]
    records = []
    for l in lines:
      (name, gender) = l.split(",") # CSV line: name,gender
      records.append([name, gender])
    logging.info("Transform ended")
    return records


def load(**context):
    logging.info("load started")

    # Read the schema and table values defined on the PythonOperator.
    schema = context["params"]["schema"]
    table = context["params"]["table"]

    # Receive the transform function's return value via Xcom.
    records = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")

    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DELETE FROM {schema}.{table};")
        for r in records:
            name = r[0]
            gender = r[1]
            print(name, "-", gender)
            sql = f"INSERT INTO {schema}.{table} VALUES ('{name}', '{gender}')"
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        cur.execute("ROLLBACK;")
    logging.info("load done")
    
    
# The operators get distinct names so they don't shadow the functions above.
extract_task = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        'url':  Variable.get("csv_url") # this part is explained right below!
    },
    dag = dag)

transform_task = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    params = {
    },
    dag = dag)

load_task = PythonOperator(
    task_id = 'load',
    python_callable = load,
    params = {
        'schema': 'Redshift schema name',
        'table': 'name_gender' # store into the table named name_gender
    },
    dag = dag)

extract_task >> transform_task >> load_task # specify the task execution order

PythonOperator์—์„œ ์‚ฌ์šฉํ•œ dag๋Š” ์•„๊นŒ ์ „๊ณผ ๊ฐ™์•„์„œ ์ƒ๋žตํ–ˆ๋‹ค.

ํ•„์š”ํ•œ ๊ฐ’์„ ๊ฐ์ž์˜ PythonOperator์˜ params๋ฅผ ํ†ตํ•ด ์ „๋‹ฌ๋ฐ›๊ณ , Xcom์„ ์‚ฌ์šฉํ•ด์„œ ์ „ DAG์˜ ๋ฆฌํ„ด ๊ฐ’์„ ์ „๋‹ฌ๋ฐ›๋Š”๋‹ค.

Variable.get("csv_url")์˜ ๊ฒฝ์šฐ, ์™ธ๋ถ€์—์„œ Airflow์— ์ €์žฅ๋œ ์ •๋ณด๋ฅผ ๋ถˆ๋Ÿฌ์˜จ ๊ฒƒ์œผ๋กœ, ๋ฐ”๋กœ ๋ฐ‘์—์„œ ์„ค๋ช…ํ•˜๊ฒ ๋‹ค.


๐ŸŸฉ Airflow์—์„œ ๋ฏผ๊ฐํ•œ ์ •๋ณด ๊ด€๋ฆฌํ•˜๊ธฐ

 

extract์˜ PythonOperator์—์„œ ์‚ฌ์šฉํ•œ Variable.get("csv_url")์€ Airflow์˜ Variable์— ์ •์˜๋œ url์„ ๊ฐ€์ ธ์˜จ ๊ฒƒ์ด๋‹ค.

 

์•„๋ž˜์˜ Airflow ์›น UI์—์„œ Admin > Variables > + ๋ฒ„ํŠผ์„ ํ†ตํ•ด Variables๋ฅผ ์ถ”๊ฐ€ํ•˜๋ฉด ๋œ๋‹ค.

Airflow์˜ ์›น UI ํ™”๋ฉด

 

csv_url ์ด๋ฆ„์œผ๋กœ csvํŒŒ์ผ์˜ ์ฃผ์†Œ๋ฅผ ๋„ฃ์–ด์ฃผ์ž

 

์ด์ฒ˜๋Ÿผ Airflow์—์„œ๋Š” Variable์„ ๋”ฐ๋กœ ๊ตฌ์„ฑํ•˜๊ณ  ์ €์žฅํ•˜๋ฉด, ๋ถˆ๋Ÿฌ์™€์„œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

 

 

์ด๋ฒˆ์—๋Š” Redshift Connection ์ •๋ณด๋ฅผ Airflow์— ๋„ฃ์–ด๋ณด์ž.

 

Admin > Connections

 

Redshift DB์˜ ์ •๋ณด๋ฅผ ์ž…๋ ฅํ–ˆ๋‹ค.

 

์‚ฌ์šฉํ•  ๋•Œ๋Š” Connection id๋กœ ์ž…๋ ฅํ–ˆ๋˜ ์ •๋ณด๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋ฐ”๋กœ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

 

+

Actually running Airflow

Put the .py file into the airflow-setup/dags folder.

Check it in the web UI (a restart through Docker is needed).

The Variable values must be saved correctly, or the DAG will fail with an import error.

 

๊ธฐํƒ€ QnA

 

์—ฌ๋Ÿฌ ๊ฐœ์˜ Task์™€ ํ•˜๋‚˜์˜ Task ์žฅ๋‹จ์ ?

ํ•˜๋‚˜์˜ Task --> ์˜ค๋ž˜ ๊ฑธ๋ฆฌ๋Š” Task์˜ ๊ฒฝ์šฐ ์ฒ˜์Œ๋ถ€ํ„ฐ ๋‹ค์‹œ ์‹คํ–‰ํ•ด์•ผ ํ•˜์ง€๋งŒ, ์—ฌ๋Ÿฌ ๊ฐœ๋กœ ๋‚˜๋ˆ„๋ฉด ์‹คํŒจํ•œ ๋ถ€๋ถ„๋ถ€ํ„ฐ ์‹คํ–‰ํ•˜๋ฉด๋จ.

ํ•˜์ง€๋งŒ Task๋ฅผ ๋„ˆ๋ฌด ๋งŽ์ด ๋งŒ๋“ค๋ฉด DAG๊ฐ€ ์‹คํ–‰๋˜๋Š”๋ฐ ์˜ค๋ž˜ ๊ฑธ๋ฆฌ๊ณ  ์Šค์ผ€์ค„๋Ÿฌ์— ๋ถ€ํ•˜๊ฐ€ ๊ฐ

์ ๋‹นํžˆ ๋‚˜๋ˆ„๋Š”๊ฒŒ ํ•„์š”.

 

PostgresHook์˜ autocommit Default๋Š” False๋กœ, ์ด๋•Œ๋Š” BEGIN ์˜๋ฏธx

 

Airflow์˜ variable ๊ด€๋ฆฌ

 

airflow.cfg (the Airflow configuration file)

Where is the DAGs folder specified? Which key specifies it?

When a new DAG is created in the DAGs folder, when does the Airflow system actually become aware of it? > 5 minutes

What is the name of the key that determines this scan interval?

To control Airflow from outside through its API, which section must be changed?

What word must appear in a Variable's name for its value to be encrypted?

Something must be done for this configuration file to actually take effect. What is it?

Which key is used to encrypt the contents of the metadata DB? (the key that encrypts the MySQL/PostgreSQL contents?)