์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- DataFrame Hint
- aws
- mysql
- etl
- Spark
- Spark Partitioning
- k8s
- Speculative Execution
- Salting
- SQL
- colab
- spark executor memory
- Spark Caching
- backfill
- KDT_TIL
- redshift
- Kubernetes
- disk spill
- Airflow
- Kafka
- CI/CD
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- ๋น ๋ฐ์ดํฐ
- off heap memory
- Spark SQL
- Dag
- topic
- Spark ์ค์ต
- Docker
- AQE
- Today
- Total
JUST DO IT!
Airflow DAG - TIL230607 ๋ณธ๋ฌธ
๐ KDT WEEK 10 DAY 3 TIL
- Airflow ์์ ํ๋ก๊ทธ๋จ ํฌํ
- API DAG ์์ฑํ๊ธฐ
๐ฅ Hello World DAG ์์ ํ๋ก๊ทธ๋จ
DAG ๊ตฌ์กฐ ์์๋ณด๊ธฐ
dag = DAG(
dag_id = 'HelloWorld', # DAG ID
start_date = datetime(2022,5,5), # ์์์ผ
catchup=False, # ์๋ ์ค๋ช
์ฐธ๊ณ
tags=['example'], # DAG TAG
schedule = '0 2 * * *') # ๋ถ, ์, ์ผ, ์, ์์ผ ์์ผ๋ก ์คํ์ฃผ๊ธฐ ์ค์ / ๋งค์ผ 2์๋ง๋ค ์คํ๋๋ค.
- max_active_runs : ํ๋ฒ์ ๋์์ ์คํ๋ ์ ์๋ DAG ์ (Backfillํ ๋ ์ค์) (Worker์ ํ ๋น๋ CPU ์ดํฉ์ด ์ต๋)
- max_active_tasks : ์ด DAG์ ์ํ Task๊ฐ ํ ๋ฒ์ ๋์์ ์คํ๋ ์ ์๋ Task ์
- catchup
- DAG๋ฅผ ์คํ ์ค์ด๋ค๊ฐ ๋ค์ DAG๋ฅผ ์คํํ ์๊ฐ์ ๋๊ฒผ์ ๋๋ ์ด์ DAG๋ฅผ ๋ง์น๊ณ ๊ฐ ๊ฒ์ธ์ง
- start_date๊ฐ ํ์ฌ๋ณด๋ค ๊ณผ๊ฑฐ์ผ ๋ ๊ณผ๊ฑฐ์ ์คํ๋์ง ๋ชปํ ์ผ๊น์ง ๋ชจ๋ ์คํํ ๊ฒ์ธ์ง
PythonOperator๋ฅผ ์ฌ์ฉํด๋ณด์.
from airflow.operators.python import PythonOperator
# DAG ์ ์
dag = DAG(
dag_id = 'HelloWorld',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
)
# PythonOperator ์ ์
load_nps = PythonOperator(
dag=dag,
task_id='task_id', # id๋ก ์ฌ์ฉ๋ ์ด๋ฆ์ ์ง์
python_callable=python_func, # task ์คํ ์ ๋ถ๋ฌ์ฌ ํจ์ ์ง์
# ํ๋ผ๋ฏธํฐ๋ฅผ ๋๊ฒจ์ค ์๋ ์๋ค.
# dictionary ํํ๋ก ๋๊ธฐ๋ฉด ๋๋ค.
params={
'table': 'delighted_nps',
'schema': 'raw_data'
},
)
# PythonOperator์์ ๋ถ๋ฌ์ฌ ํจ์
def python_func(**cxt):
# ๋๊ฒจ์ค ํ๋ผ๋ฏธํฐ๋ฅผ ๋ถ๋ฌ์ฌ ๋๋ cxt["params"]["ํค"] ํํ๋ก ๋๊ธด๋ค.
table = cxt["params"]["table"]
schema = cxt["params"]["schema"]
ex_date = cxt["execution_date"]
# do what you need to do
PythonOperator๋ฅผ ํตํด ์ ์๋ dag์ ํจ์๋ฅผ ๋ฐ์ ๋ถ๋ฌ์ค๋ ๊ตฌ์กฐ์ด๋ค.
PythonOperator์์ params๋ฅผ ์ฌ์ ํํ๋ก ์ง์ ํ๋ฉด, ํจ์์์ ํด๋น ๊ฐ์ ๋ถ๋ฌ์ฌ ์ ์๋ค.
ํํ๋ cxt["params"]["ํค"] ํํ๋ฅผ ์ฌ์ฉํ๋ค. (๋ฌผ๋ก cxt๋ ํจ์์์ ๋ฐ์ ์ธ์์ ์ด๋ฆ์ ์ฌ์ฉํ๋ค)
์์
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# DAG
dag = DAG(
dag_id = 'HelloWorld',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *')
# hello์ goodbye ๋ ๊ฐ์ ํจ์๋ฅผ ์ ์ํ๋ค.
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
# ๋ ๊ฐ์ ํจ์๋ฅผ ์ฌ์ฉํ PythonOperator๋ฅผ ์ ์ํ๋ค.
print_hello = PythonOperator(
task_id = 'print_hello',
#python_callable param points to the function you want to run
python_callable = print_hello,
#dag param points to the DAG that this task is a part of
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
# DAG์ ์คํ ์์๋ฅผ ๋ช
์ํ๋ค.
# hello DAG ์์
์ด ๋๋์ผ goodbye DAG ์์
์ด ์คํ๋ ๊ฒ์ด๋ค.
print_hello >> print_goodbye
Task Decorator
์ด๋ฒ์๋ Airflow์์ ์ ๊ณตํ๋ Task Decorator๋ฅผ ์ฌ์ฉํด๋ณด์.
์๊น๋ณด๋ค ๊ตฌ์กฐ๊ฐ ๋ ๊ฐ๋จํด์ง ์ ์๋ค.
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# Assign the tasks to the DAG in order
print_hello() >> print_goodbye()
@task Decorator๋ฅผ ์ฌ์ฉํ ํจ์ ์์ ์ ์ํด์ฃผ๋ฉด, ํจ์ ์์ฒด๋ฅผ task๋ก ์ ์ํด์ฃผ์ด Operator ์ ์๊ฐ ํ์์์ด์ง๋ค.
์ด๋ ๋ฐ์ DAG์ ์์๋ฅผ ์ ์ํ ๋๋ ํจ์์ ์ด๋ฆ์ ์ฌ์ฉํ๋ฉด ๋๋ค.
๐ฆ S3์ csvํ์ผ ETL ์์ ํ๋ก๊ทธ๋จ Airflow ํฌํ
์ ๋ฒ ๊ธ์์ Colab์ผ๋ก ETL ์ค์ตํ๋ ๋ด์ฉ์ Airflow๋ก ๋์ํ๋๋ก ํฌํ ํด๋ณด์.
(๋งํฌ)
์ ๋ฒ ์ฝ๋๋ฅผ ์งง๊ฒ ์์ฝํ์๋ฉด,
S3์ ์ ์ฅ๋ name_gender.csv (์ด๋ฆ, ์ฑ๋ณ๋ก ๊ตฌ์ฑ)์ ๋ด Redshift ํ ์ด๋ธ๋ก INSERTํ๋ ์ฝ๋์๋ค.
์ด ETL์ ํ๋์ Task์ ์ฌ๋ฌ ๊ฐ์ Task๋ก ๊ตฌ์ฑํด์ ํฌํ ํ๋ ๋ฐฉ๋ฒ์ ๊ฐ๊ฐ ์์๋ณธ๋ค.
๐ง ํ๋์ Task๋ก ๋ง๋ค์ด๋ณด๊ธฐ
๋จผ์ extract, transform, load๋ฅผ ํ๋์ Task๋ก ๊ตฌ์ฑํด์ ํฌํ ํด๋ณด์.
ํ๋์ Task๋ก ๊ตฌ์ฑํ๊ธฐ ๋๋ฌธ์ ํ๋์ PythonOperator๋ฅผ ์ฌ์ฉํ๊ฒ ๋๋ค.
๋ค์์ ์ฝ๋๋ฅผ ์ถ๊ฐํ๋ฉด ๋๋ค.
def etl(**context):
link = context["params"]["url"] # PythonOperator์ params๋ฅผ ๊ฐ์ ธ์จ๋ค!
task_instance = context['task_instance'] # Task์ ์ ๋ณด๊ฐ ๋์ด๊ฐ๋ค.
execution_date = context['execution_date'] # ์ค์ผ์ฅด๋ง์ ๊ด๋ จ๋ ๋ฐ์ดํฐ๋ก, ๋ค์์ ๋ฐฐ์ด๋ค.
logging.info(execution_date)
# ์ฌ๊ธฐ์ extract, transform, load๊ฐ ๋ฌถ์ฌ ํ๋์ Task์ฒ๋ผ ๋์ํ๊ฒ ๋๋ค.
data = extract(link)
lines = transform(data)
load(lines)
dag = DAG(
dag_id = 'name_gender_v2',
start_date = datetime(2023,4,6), # ๋ ์ง๊ฐ ๋ฏธ๋์ธ ๊ฒฝ์ฐ ์คํ์ด ์๋จ
schedule = '0 2 * * *', # ๋งค์ผ 2์๋ง๋ค ์คํ
catchup = False,
max_active_runs = 1, # ํ ๋ฒ์ ์ฌ๋ฌ ๊ฐ๊ฐ ๋์ํ์ง ์๋๋ก
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl, # ํ๋์ Task๋ก ๋ฌถ์ ํจ์๋ฅผ ๋ถ๋ฌ์จ๋ค.
params = {
'url': "S3 ์ฃผ์๋ฅผ ์
๋ ฅ"
},
dag = dag)
extract์์ ์ฌ์ฉํ context['params']['url']์ ๊ฒฝ์ฐ, PythonOperator์์ ์ ์๋ ํ๋ผ๋ฏธํฐ๋ฅผ ๊ฐ์ ธ์ค๊ฒ ๋๋ค.
์ด ๋ถ๋ถ์ ์ ๊ธ์์ ์ค๋ช ํ์๋ค.
Extract, Transform, Load๊ฐ ๋์ํ ํ๋์ ํจ์ etl์ ๋ง๋ค์ด PythonOperator ํจ์์ ๋ฃ๋๋ค.
dag๋ ์ ๊ธ์ ์ค๋ช ํ๋ ๊ฒ์ฒ๋ผ ์คํ ์ฃผ๊ธฐ ๋ฑ๋ฑ์ ์ ํด์ ๋ฃ์ด์ฃผ์.
์ ์ฒด ์ฝ๋
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
def get_Redshift_connection():
host = "redshift ์๋ํฌ์ธํธ ์ฃผ์"
redshift_user = "ID"
redshift_pass = "PASSWORD"
port = 5439 # Redshift Port
dbname = "DB์ด๋ฆ"
conn = psycopg2.connect(f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}")
conn.set_session(autocommit=True)
return conn.cursor()
def extract(url):
logging.info("Extract started")
f = requests.get(url)
logging.info("Extract done")
return (f.text)
def transform(text):
logging.info("Transform started")
lines = text.strip().split("\n")[1:] # ์ฒซ ๋ฒ์งธ ๋ผ์ธ์ ์ ์ธํ๊ณ ์ฒ๋ฆฌ
records = []
for l in lines:
(name, gender) = l.split(",") # CSV ํ์ผ
records.append([name, gender])
logging.info("Transform ended")
return records
def load(records):
logging.info("load started")
schema = "Redshift ์คํค๋ง์ด๋ฆ"
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;") # ํด๋น ์คํค๋ง์ name_gender ํ
์ด๋ธ ๋ด์ฉ ์ ๊ฑฐ
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')" # ํด๋น ์คํค๋ง์ name_gender ํ
์ด๋ธ์ ์ฝ์
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
def etl(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
data = extract(link)
lines = transform(data)
load(lines)
dag = DAG(
dag_id = 'name_gender_v2',
start_date = datetime(2023,4,6),
schedule = '0 2 * * *',
catchup = False,
max_active_runs = 1,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
params = {
'url': "S3 CSVํ์ผ ์ฃผ์"
},
dag = dag)
๐ ๏ธ ์ฌ๋ฌ ๊ฐ์ Task๋ก ๊ตฌ์ฑํด๋ณด๊ธฐ
์ด๋ฒ์๋ extract, transform, load๋ฅผ ๊ฐ๊ฐ์ Task๋ก ๊ตฌ์ฑํ์ฌ ๋์ํ๋๋ก ๊ตฌํํด๋ณด์.
์ฌ๊ธฐ์๋ ๊ฐ Task๊ฐ ๊ฐ์ ์ด๋ป๊ฒ ์ฃผ๊ณ ๋ฐ์์ง๊ฐ ์ค์ํ๋ค.
Airflow์์๋ ๊ทธ ๊ฐ์ ์ฃผ๊ณ ๋ฐ๋ ๋ฐฉ์์ผ๋ก Xcom์ ์ฌ์ฉํ๋ค.
Xcom
- Task(Operator)๋ค๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๊ธฐ ์ํ ๋ฐฉ์
- ๋ณดํต ํ Operator์ ๋ฆฌํด๊ฐ์ ๋ค๋ฅธ Operator์์ ์ฝ๋ ํํ
- ์ด ๋ฐ์ดํฐ๋ค์ Airflow ๋ฉํ ๋ฐ์ดํฐ DB์ ์ ์ฅ๋๋ฏ๋ก ํฐ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๊ธฐ์๋ ์ด๋ ค์
- ํฐ ๋ฐ์ดํฐ์ ๊ฒฝ์ฐ S3 ๋ฑ์ ๋ก๋ํด์ ๊ทธ ์์น๋ฅผ ๋ฐ๋๊ฒ ์ผ๋ฐ์ ์ด๋ค.
task ์ ๋ณด๋ก๋ถํฐ xcom_pull์ ๋ถ๋ฌ์ค๋ ๋ฐฉ์์ ์ฌ์ฉํ๋ค.
context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
์ด ์ฝ๋๋ฅผ ์ฌ์ฉํ๋ฉด, task id๊ฐ transform์ธ ํจ์์ return ๊ฐ์ ๊ฐ์ ธ์ฌ ์ ์๋ค.
์ฌ๋ฌ ๊ฐ์ Task๋ก ๊ตฌ์ฑํ ๊ฒฝ์ฐ ์์ ์ฝ๋์์ ๋ณ๊ฒฝ๋ ๊ฐ๊ฐ์ ํจ์์ PythonOperator๋ฅผ ํ์ธํด๋ณด์.
# ๋ชจ๋ ์ธ์๋ก context๋ฅผ ๋ฐ๊ฒ ๋์๋ค.
def extract(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
f = requests.get(link)
return (f.text)
def transform(**context):
logging.info("Transform started")
# extract์ ๋ฆฌํด ๊ฐ์ ๋ฐ๊ธฐ ์ํด Xcom์ ์ฌ์ฉํ๋ค.
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
lines = text.strip().split("\n")[1:]
records = []
for l in lines:
(name, gender) = l.split(",") # CSV ํ์ผ
records.append([name, gender])
logging.info("Transform ended")
return records
def load(**context):
logging.info("load started")
#PythonOperator์์ ์ ์๋ shema์ table ๊ฐ์ ๊ฐ์ ธ์จ๋ค.
schema = context["params"]["schema"]
table = context["params"]["table"]
# transform ํจ์์ ๋ฆฌํด ๊ฐ์ Xcom์ผ๋ก ๊ฐ์ ธ์จ๋ค.
lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url") # ์ด ๋ถ๋ถ์ ์๋์์ ๋ฐ๋ก ์ค๋ช
ํ๋ค!
},
dag = dag)
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
params = {
},
dag = dag)
load = PythonOperator(
task_id = 'load',
python_callable = load,
params = {
'schema': 'Redshift ์คํค๋ง์ด๋ฆ',
'table': 'name_gender' # name_gender๋ผ๋ ์ด๋ฆ์ ํ
์ด๋ธ์ ์ ์ฅํ๋ค.
},
dag = dag)
extract >> transform >> load # DAG์ ์์๋ฅผ ๋ช
์ํ๋ค.
PythonOperator์์ ์ฌ์ฉํ dag๋ ์๊น ์ ๊ณผ ๊ฐ์์ ์๋ตํ๋ค.
ํ์ํ ๊ฐ์ ๊ฐ์์ PythonOperator์ params๋ฅผ ํตํด ์ ๋ฌ๋ฐ๊ณ , Xcom์ ์ฌ์ฉํด์ ์ DAG์ ๋ฆฌํด ๊ฐ์ ์ ๋ฌ๋ฐ๋๋ค.
Variable.get("csv_url")์ ๊ฒฝ์ฐ, ์ธ๋ถ์์ Airflow์ ์ ์ฅ๋ ์ ๋ณด๋ฅผ ๋ถ๋ฌ์จ ๊ฒ์ผ๋ก, ๋ฐ๋ก ๋ฐ์์ ์ค๋ช ํ๊ฒ ๋ค.
๐ฉ Airflow์์ ๋ฏผ๊ฐํ ์ ๋ณด ๊ด๋ฆฌํ๊ธฐ
extract์ PythonOperator์์ ์ฌ์ฉํ Variable.get("csv_url")์ Airflow์ Variable์ ์ ์๋ url์ ๊ฐ์ ธ์จ ๊ฒ์ด๋ค.
์๋์ Airflow ์น UI์์ Admin > Variables > + ๋ฒํผ์ ํตํด Variables๋ฅผ ์ถ๊ฐํ๋ฉด ๋๋ค.
์ด์ฒ๋ผ Airflow์์๋ Variable์ ๋ฐ๋ก ๊ตฌ์ฑํ๊ณ ์ ์ฅํ๋ฉด, ๋ถ๋ฌ์์ ์ฌ์ฉํ ์ ์๋ค.
์ด๋ฒ์๋ Redshift Connection ์ ๋ณด๋ฅผ Airflow์ ๋ฃ์ด๋ณด์.
์ฌ์ฉํ ๋๋ Connection id๋ก ์ ๋ ฅํ๋ ์ ๋ณด๋ฅผ ์ฌ์ฉํ๋ฉด ๋ฐ๋ก ์ฌ์ฉํ ์ ์๋ค.
+
Airflow ์ง์ง ์คํ
airflow-setup/dags ํด๋์ ํด๋น py ํ์ผ ๋ฃ๊ธฐ
์นUI์์ ํ์ธ (๋์ปค๋ก ์ฌ์คํํ์)
Variable ๊ฐ์ด ์ ์ ์ฅ๋์ด์์ด์ผ DAG Import ์ค๋ฅ๊ฐ ์์
๊ธฐํ QnA
์ฌ๋ฌ ๊ฐ์ Task์ ํ๋์ Task ์ฅ๋จ์ ?
ํ๋์ Task --> ์ค๋ ๊ฑธ๋ฆฌ๋ Task์ ๊ฒฝ์ฐ ์ฒ์๋ถํฐ ๋ค์ ์คํํด์ผ ํ์ง๋ง, ์ฌ๋ฌ ๊ฐ๋ก ๋๋๋ฉด ์คํจํ ๋ถ๋ถ๋ถํฐ ์คํํ๋ฉด๋จ.
ํ์ง๋ง Task๋ฅผ ๋๋ฌด ๋ง์ด ๋ง๋ค๋ฉด DAG๊ฐ ์คํ๋๋๋ฐ ์ค๋ ๊ฑธ๋ฆฌ๊ณ ์ค์ผ์ค๋ฌ์ ๋ถํ๊ฐ ๊ฐ
์ ๋นํ ๋๋๋๊ฒ ํ์.
PostgresHook์ autocommit Default๋ False๋ก, ์ด๋๋ BEGIN ์๋ฏธx
Airflow์ variable ๊ด๋ฆฌ
airflow.cfg (airflow ํ๊ฒฝ์ค์ )
DAGs ํด๋๋ ์ด๋์ ์ง์ ๋๋๊ฐ? ์ง์ ๋๋ ํค๊ฐ ๋ฌด์์ธ๊ฐ.
DAGs ํด๋์ ์๋ก์ด Dag๋ฅผ ๋ง๋ค๋ฉด ์ธ์ ์ค์ ๋ก Airflow ์์คํ ์์ ์๊ฒ ๋๋? > 5๋ถ
์ด ์ค์บ ์ฃผ๊ธฐ๋ฅผ ๊ฒฐ์ ํด์ฃผ๋ ํค์ ์ด๋ฆ์?
Airflow๋ฅผ APIํํ๋ก ์ธ๋ถ์์ ์กฐ์ํ๋ ค๋ฉด ์ด๋ ์น์ ์ ๋ณ๊ฒฝํด์ผํ๋๊ฐ
Variable์ ๋ณ์์ ๊ฐ์ด ์ํธํ๋๋ ค๋ฉด ๋ณ์์ ์ด๋ฆ์ ์ด๋ค ๋จ์ด๊ฐ ๋ค์ด๊ฐ์ผ ํ ๊น?
์ด ํ๊ฒฝ์ค์ ํ์ผ์ ์ค์ ๋ก ๋ฐ์ํ๋ ค๋ฉด ํ์ํ ์ผ์ด ์์. ๋ฌด์์ธ๊ฐ?
Metadata DB ๋ด์ฉ์ ์ํธํํ๋๋ฐ ์ฌ์ฉ๋๋ ํค๋? (MySQL, PostgreSQL๋ฅผ ์ํธํํ๋ ํค?)
'TIL' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Airflow ETL์์ Primary Key Uniqueness SQL๋ก ๋ณด์ฅํ๊ธฐ - TIL230608 (0) | 2023.06.08 |
---|---|
Yahoo finance API ์ฌ์ฉํด์ Airflow DAG ๋ง๋ค์ด๋ณด๊ธฐ - TIL230607 (0) | 2023.06.08 |
Redshift๋ฅผ Superset์ ์ฐ๋ํ๊ณ ๋์๋ณด๋ ๋ง๋ค๊ธฐ - TIL230526 (0) | 2023.05.26 |
Snowflake ์์๋ณด๊ณ , S3์์ COPYํด๋ณด๊ธฐ - TIL230525 (0) | 2023.05.25 |
Redshift ๊ณ ๊ธ ๊ธฐ๋ฅ -TIL230524 (1) | 2023.05.24 |