์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 | 29 |
30 | 31 |
- KDT_TIL
- SQL
- backfill
- Dag
- Salting
- aws
- Spark ์ค์ต
- DataFrame Hint
- Docker
- Speculative Execution
- CI/CD
- Spark Caching
- Airflow
- mysql
- topic
- colab
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- Kafka
- etl
- AQE
- ๋น ๋ฐ์ดํฐ
- spark executor memory
- Spark
- redshift
- off heap memory
- Spark Partitioning
- k8s
- disk spill
- Kubernetes
- Spark SQL
- Today
- Total
JUST DO IT!
Airflow ETL์์ Primary Key Uniqueness SQL๋ก ๋ณด์ฅํ๊ธฐ - TIL230608 ๋ณธ๋ฌธ
Airflow ETL์์ Primary Key Uniqueness SQL๋ก ๋ณด์ฅํ๊ธฐ - TIL230608
sunhokimDev 2023. 6. 8. 22:19๐ KDT WEEK 10 DAY 4 TIL
- Primary Key Uniqueness ๋ณด์ฅํ๊ธฐ
๐ฅ Primary Key Uniqueness ๋ณด์ฅํ๊ธฐ
- PK : ํ ์ด๋ธ์์ ํ๋์ ๋ ์ฝ๋๋ฅผ ์ ์ผํ๊ฒ ์ง์นญํ ์ ์๋ ํ๋(๋ค)
- ๊ด๊ณํ ๋ฐ์ดํฐ๋ฒ ์ด์ค ์์คํ ์ด Primary key์ ๊ฐ์ด ์ค๋ณต ์กด์ฌํ๋ ๊ฒ์ ๋ง์์ค๋ค.
- ๋น ๋ฐ์ดํฐ ๊ธฐ๋ฐ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค๋ค์ Primary key Uniqueness๋ฅผ ๋ณด์ฅํ์ง ์์ผ๋ฏ๋ก ์์ง๋์ด๊ฐ ๋ณด์ฅํด์ผํจ
๋ ์จ ๋ฐ์ดํฐ API(https://openweathermap.org/api)๋ฅผ ๊ฐ์ ธ์ค๋ ๊ณผ์ ์์ PK Uniquness๋ฅผ ๋ณด์ฅํด๋ณด๋ ค๊ณ ํ๋ค.
SQL ๋ฌธ๋ฒ์ ์ฌ์ฉํด์ Upsert(๋ฐ์ดํฐ๊ฐ ์๋ ๊ฒฝ์ฐ ์๋ก ์ ๋ฐ์ดํธ)๋ฐฉ์์ ์ฌ์ฉํ ๊ฒ์ด๋ค.
์ฐธ๊ณ ๋ก ์ ์ฌ ๋์ DB์ธ Redshift์์๋ Upsert ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค๊ณ ํ๋ค. ์ด๊ฑด ๋จ์ ํ์ต์ฉ์ผ๋ก ๊ธฐ๋ก๋๋ ๊ธ์ด๋ค!
๋ค์์ ๋ ์จ ์ ๋ณด ํ ์ด๋ธ์ ์ฌ์ฉํ๋ค๊ณ ๊ฐ์ ํด๋ณด์.
CREATE TABLE sunhokim_public.weather_forecast(
date date primary key,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);
- date : ํด๋น ๋ ์ง(์ ๋ ์จ ์ ๋ณด)
- temp : ๊ธฐ์จ
- min_temp : ์ต์ ๊ธฐ์จ
- max_temp : ์ต๋ ๊ธฐ์จ
- created_date : ๋ ์จ ์ ๋ณด ์์ฑ ๋ ์ง > ๋ฐ์ดํฐ๊ฐ ๋ค์ด๊ฐ๋ฉด์ ํ์ฌ ์๊ฐ ์๋ ์ ๋ ฅ
๋ ์จ ์ ๋ณด์ ๊ฒฝ์ฐ ํด๋น ๋ ์ง์ ๊ฐ๊น์์ง์๋ก ๋ ์ ํํ ์ ๋ณด๊ฐ ๋๋ฏ๋ก created_date๊ฐ ์ต์ ์ผ์๋ก ๊ฐ์น์๋ ๋ฐ์ดํฐ๋ก ํ๋จํ๋ค.
๋ ์จ ์ ๋ณด๋ฅผ ์ด๋ค API๋ฅผ ํตํด ๊ฐ์ ธ์์ Airflow ETL์ Incremental Update(์๋ก์ด ๋ฐ์ดํฐ ์ฝ์ ) ๋ฐฉ์์ ์ฌ์ฉํ๋ค๋ฉด,
2์ผ์ฐจ 3์ผ์ฐจ ๋ฐ์ดํฐ์๋ ๊ฐ์ ๋ ์ง์ ์ค๋ณต ๋ ์จ ๋ฐ์ดํฐ๊ฐ ์๊ธธ ๊ฒ์ด๋ค.
์ด๋ ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํ๋ ๊ณผ์ ์์ ์ค๋ณต ๋ฐ์ดํฐ ์ค ๋ ๊ฐ์น์๋ ๋ฐ์ดํฐ๋ฅผ ์ฐ์ ํ์ฌ ์ ์ฌํด๋ณด์.
Airflow์์ Redshift๋ฅผ ์ฐ๊ฒฐํ๊ฑฐ๋, ETL์ ๋ง๋๋ ๊ณผ์ ์ ์ด์ ๊ธ์ ์ ์์ผ๋ ๋ชจ๋ ์๋ตํ์๋ค.
๋ฐ์ดํฐ ์ค ๊ฐ์ date ํ๋๋ฅผ ๋ฌถ์ด created_date๋ฅผ ์ ๋ ฌํ๋ ค๋ฉด, ROW_NUMBER() ํจ์๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค.
PARTITION BY์ date ํ๋๋ฅผ ๋ฃ์ด ๋ฌถ๊ณ , ORDER BY์ created_date๋ฅผ ๋ฃ๊ณ ๋ด๋ฆผ์ฐจ์(DESC)๋ฅผ ์ ์ฉํ๋ค.
ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
ํด๋น ํจ์๋ฅผ ์ฌ์ฉํด์ SELECT ๋ฌธ์ผ๋ก ์ถ๋ ฅํด๋ณด๋ฉด ๋ค์๊ณผ ๊ฐ์ ํํ๋ก ๋์ฌ ๊ฒ์ด๋ค!
date | created_date | temp | seq |
2021-01-01 | 2021-01-03 | 6 | 1 |
2021-01-01 | 2021-01-02 | 5 | 2 |
2021-01-01 | 2021-01-01 | 2 | 3 |
2021-01-02 | 2021-01-03 | 3 | 1 |
2021-01-02 | 2021-01-01 | 4 | 2 |
์ด ๋ถ๋ถ์์ WHERE์ ํตํด seq = 1์ธ ๋ถ๋ถ๋ง ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํ๋ฉด ๋ ๊ฒ์ด๋ค.
๋ฐ๋ผ์ ์ ์ฌ์์ ์ฌ์ฉํ INSERT ๋ฌธ์ ๋ค์๊ณผ ๊ฐ์ด ์ฌ์ฉํ๋ค.
INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;
๋ฐ์ดํฐ ์ ์ฌ์ INSERT๋ฌธ์ ์์ฒ๋ผ ์ฌ์ฉํ๋ฉด ์ค๋ณต๋ฐ์ดํฐ๋ ์๋ก ์ ๋ฐ์ดํธํ๊ณ , ์๋ก์ด ๋ฐ์ดํฐ๋ ์๋ก ์ ์ฌํ๊ฒ ๋๋ค.
์ ์ฒด ์ฝ๋
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from datetime import timedelta
import requests
import logging
import json
# Airflow์ ์ ์ฅ๋ ์ ๋ณด๋ฅผ ํตํด Redshift ์ฐ๊ฒฐ
def get_Redshift_connection():
# autocommit is False by default
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
return hook.get_conn().cursor()
# ETL (use Airflow Task Decorator)
@task
def etl(schema, table, lat, lon, api_key):
# ๋ ์จ API ๋ฐ์ดํฐ ์์ฒญ
url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
response = requests.get(url)
data = json.loads(response.text)
"""
{'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
"""
# ๋ฐ์ ์์ฒญ์ ๋ฐ์ดํฐ์ ๋ฐ๋ผ ํ์ํ ํ๋๋ง ์ ์ฅ
ret = []
for d in data["daily"]:
day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
cur = get_Redshift_connection()
# ์๋ณธ ํ
์ด๋ธ์ด ์๋ค๋ฉด ์์ฑ
create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
temp float,
min_temp float,
max_temp float,
created_date timestamp default GETDATE()
);"""
logging.info(create_table_sql)
# ์์ ํ
์ด๋ธ ์์ฑ (CTAS ์๋ณธํ
์ด๋ธ)
create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};"""
logging.info(create_t_sql)
try:
cur.execute(create_table_sql)
cur.execute(create_t_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# ์์ ํ
์ด๋ธ ๋ฐ์ดํฐ ์
๋ ฅ
insert_sql = f"INSERT INTO t VALUES " + ",".join(ret)
logging.info(insert_sql)
try:
cur.execute(insert_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# ๊ธฐ์กด ํ
์ด๋ธ ๋์ฒด
alter_sql = f"""DELETE FROM {schema}.{table};
INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;"""
logging.info(alter_sql)
try:
cur.execute(alter_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise
# DAG ์ ์
with DAG(
dag_id = 'Weather_to_Redshift_v2',
start_date = datetime(2022,8,24), # ๋ ์ง๊ฐ ๋ฏธ๋์ธ ๊ฒฝ์ฐ ์คํ์ด ์๋จ
schedule = '0 4 * * *', # ๋งค์ผ 4์๋ง๋ค
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
) as dag:
# ์์๋๋ก ์คํค๋ง์ด๋ฆ, ํ
์ด๋ธ์ด๋ฆ, ์์ธ์ขํ1, ์์ธ์ขํ2, ๋ ์จAPI KEY
etl("sunhokim_public", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key"))
'TIL' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
MySQL โก๏ธ Redshift์ Airflow ETL ๊ตฌํํด๋ณด๊ธฐ - TIL230609 (0) | 2023.06.09 |
---|---|
Airflow Backfill - TIL230608 (0) | 2023.06.08 |
Yahoo finance API ์ฌ์ฉํด์ Airflow DAG ๋ง๋ค์ด๋ณด๊ธฐ - TIL230607 (0) | 2023.06.08 |
Airflow DAG - TIL230607 (2) | 2023.06.07 |
Redshift๋ฅผ Superset์ ์ฐ๋ํ๊ณ ๋์๋ณด๋ ๋ง๋ค๊ธฐ - TIL230526 (0) | 2023.05.26 |