JUST DO IT!

Guaranteeing Primary Key Uniqueness with SQL in an Airflow ETL - TIL230608

sunhokimDev 2023. 6. 8. 22:19

📚 KDT WEEK 10 DAY 4 TIL

  • Guaranteeing Primary Key Uniqueness

 


 

🟥 Guaranteeing Primary Key Uniqueness

  • PK: the field (or fields) that uniquely identifies a single record in a table.
  • A relational database system prevents duplicate primary key values from being stored.
  • Big-data data warehouses do not guarantee primary key uniqueness, so the engineer has to guarantee it.
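
Because such a warehouse will not reject a repeated key, a quick duplicate check is a useful guard. The following is a minimal sketch, using Python's built-in sqlite3 purely as a stand-in for the warehouse; the table name `forecast`, the column `temperature`, and the sample rows are made up for illustration.

```python
import sqlite3

# In-memory SQLite stands in for a warehouse that does not enforce the key.
conn = sqlite3.connect(":memory:")
# No PRIMARY KEY is declared on purpose, so duplicate dates are accepted.
conn.execute("CREATE TABLE forecast (date TEXT, temperature REAL)")
conn.executemany(
    "INSERT INTO forecast VALUES (?, ?)",
    [("2021-01-01", 2.0), ("2021-01-01", 5.0), ("2021-01-02", 4.0)],
)

# Any key that appears more than once violates the intended uniqueness.
duplicates = conn.execute(
    "SELECT date, COUNT(*) FROM forecast GROUP BY date HAVING COUNT(*) > 1"
).fetchall()
print(duplicates)
```

An empty result means every key is unique; anything else lists the offending keys and how often each occurs.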

I will try to guarantee PK uniqueness while loading data from a weather API (https://openweathermap.org/api).

Using SQL, I will take an upsert approach: if a record with the same key already exists it is updated with the new values, otherwise the record is inserted.
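
The upsert idea itself can be illustrated without a database. This is only a sketch of the semantics, not of how any warehouse implements it; the helper `upsert` and the dict-based "table" are hypothetical.

```python
def upsert(table: dict, key: str, row: dict) -> None:
    """Update the row stored under `key` if it exists, otherwise insert it."""
    # Assigning by key covers both cases: an existing entry is overwritten,
    # a missing one is created.
    table[key] = row


forecast = {"2021-01-01": {"temp": 2.0}}
upsert(forecast, "2021-01-01", {"temp": 5.0})  # existing key: row is replaced
upsert(forecast, "2021-01-02", {"temp": 4.0})  # new key: row is inserted
print(forecast)
```

Either way, each key ends up with exactly one row, which is the property the rest of this post enforces in SQL.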

Note that Redshift, the target DB here, is said to provide upsert functionality of its own, so this post is purely a learning record!

 

 

Suppose we use the following weather table.

 

CREATE TABLE sunhokim_public.weather_forecast(
    date date primary key,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);

 

  • date: the date the weather information is for
  • temp: temperature
  • min_temp: minimum temperature
  • max_temp: maximum temperature
  • created_date: when the weather record was created; filled in automatically with the current time when the row is inserted

A forecast becomes more accurate as its target date gets closer, so the more recent the created_date, the more valuable the row is considered.

 

If the weather data is pulled from an API and loaded by an Airflow ETL using Incremental Update (inserting the newly fetched data on each run), the loads on day 2 and day 3 will contain weather rows for dates that are already in the table, i.e. duplicates.

So, while loading, let's give priority to the more valuable row among the duplicates.

 

Connecting Airflow to Redshift and building the ETL itself were covered in a previous post, so both are omitted here.

 

To group rows that share the same date and order each group by created_date, use the ROW_NUMBER() function.

Put the date field in PARTITION BY to form the groups, and put created_date in ORDER BY with descending order (DESC).

ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq

 

Using this function in a SELECT statement produces output of the following form!

 

date        created_date  temp  seq
2021-01-01  2021-01-03    6     1
2021-01-01  2021-01-02    5     2
2021-01-01  2021-01-01    2     3
2021-01-02  2021-01-03    3     1
2021-01-02  2021-01-01    4     2
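
The ranking in the table above can be reproduced locally. This is a sketch that assumes SQLite 3.25 or later (the first version with window functions) as a stand-in for Redshift; the column is called `temperature` here only to keep the SQLite example free of keyword-like names.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (date TEXT, created_date TEXT, temperature REAL)")
conn.executemany(
    "INSERT INTO t VALUES (?, ?, ?)",
    [
        ("2021-01-01", "2021-01-01", 2),
        ("2021-01-01", "2021-01-02", 5),
        ("2021-01-01", "2021-01-03", 6),
        ("2021-01-02", "2021-01-01", 4),
        ("2021-01-02", "2021-01-03", 3),
    ],
)

# Number the rows inside each date group, newest created_date first.
rows = conn.execute(
    """
    SELECT date, created_date, temperature,
           ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) AS seq
    FROM t
    ORDER BY date, seq
    """
).fetchall()
for row in rows:
    print(row)
```

Within each date, the row with the latest created_date receives seq = 1, so filtering on seq = 1 keeps exactly one row per date.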

 

From this result, we only need to load the rows where seq = 1, which a WHERE clause selects.

So the INSERT statement used for loading looks like this.

 

INSERT INTO {schema}.{table}
SELECT date, temp, min_temp, max_temp FROM (
	SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
	FROM t
)
WHERE seq = 1;

 

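In this statement, t is a temporary table holding the existing rows plus the newly fetched ones. Emptying the target table and then running this INSERT, as the full code below does, replaces duplicated dates with their most recent row and loads new dates as new rows.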
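
The whole staging, delete, and reload cycle can be tried end to end on a small scale. The sketch below again uses SQLite (3.25+) in place of Redshift, with a simplified three-column table named `weather` and made-up rows; unlike the Redshift version, it carries created_date through explicitly instead of relying on a column default.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE weather (date TEXT, temperature REAL, created_date TEXT)")
conn.execute("INSERT INTO weather VALUES ('2021-01-01', 2, '2021-01-01')")
conn.execute("INSERT INTO weather VALUES ('2021-01-02', 4, '2021-01-01')")

# 1) Copy the existing rows into a staging table, 2) append the newly fetched rows.
conn.execute("CREATE TEMP TABLE t AS SELECT * FROM weather")
conn.executemany(
    "INSERT INTO t VALUES (?, ?, ?)",
    [
        ("2021-01-02", 3, "2021-01-02"),  # newer forecast for an existing date
        ("2021-01-03", 7, "2021-01-02"),  # brand-new date
    ],
)

# 3) Empty the target and reload only the newest row per date.
conn.execute("DELETE FROM weather")
conn.execute(
    """
    INSERT INTO weather
    SELECT date, temperature, created_date FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) AS seq
        FROM t
    )
    WHERE seq = 1
    """
)
result = conn.execute("SELECT date, temperature FROM weather ORDER BY date").fetchall()
print(result)
```

After the reload, 2021-01-02 holds the newer value, 2021-01-03 appears as a new row, and no date occurs twice.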

 

Full code


 

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

from datetime import datetime
from datetime import timedelta

import requests
import logging
import json

# Connect to Redshift using the connection stored in Airflow
def get_Redshift_connection():
    # autocommit is False by default
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    return hook.get_conn().cursor()

# ETL (use Airflow Task Decorator)
@task
def etl(schema, table, lat, lon, api_key):

    # Request data from the weather API
    url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts"
    response = requests.get(url)
    data = json.loads(response.text)

    """
    {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3}
    """
    
    # Keep only the fields we need from the response
    ret = []
    for d in data["daily"]:
        day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
        ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))

    cur = get_Redshift_connection()
    
    # Create the target table if it does not exist
    create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    temp float,
    min_temp float,
    max_temp float,
    created_date timestamp default GETDATE()
);"""
    logging.info(create_table_sql)

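    # Create a temp table as a copy of the target table (CTAS)
    # Note: CTAS does not copy column defaults, so rows inserted into t below get a NULL
    # created_date; Redshift ranks NULLs first under DESC, so they still receive seq = 1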
    create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};"""
    logging.info(create_t_sql)
    try:
        cur.execute(create_table_sql)
        cur.execute(create_t_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

    # Insert the newly fetched rows into the temp table
    insert_sql = f"INSERT INTO t VALUES " + ",".join(ret)
    logging.info(insert_sql)
    try:
        cur.execute(insert_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

    # Replace the contents of the target table
    alter_sql = f"""DELETE FROM {schema}.{table};
      INSERT INTO {schema}.{table}
      SELECT date, temp, min_temp, max_temp FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
        FROM t
      )
      WHERE seq = 1;"""
    logging.info(alter_sql)
    try:
        cur.execute(alter_sql)
        cur.execute("COMMIT;")
    except Exception as e:
        cur.execute("ROLLBACK;")
        raise

# DAG definition
with DAG(
    dag_id = 'Weather_to_Redshift_v2',
    start_date = datetime(2022,8,24), # the DAG does not run if this date is in the future
    schedule = '0 4 * * *',  # every day at 04:00
    max_active_runs = 1,
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
) as dag:

    # In order: schema name, table name, Seoul latitude, Seoul longitude, weather API key
    etl("sunhokim_public", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key"))
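
The field-extraction step of the ETL can be tested on its own, without Airflow or a network call. A minimal sketch follows: `extract_daily_rows` is a hypothetical helper, not part of the DAG above, and it pins the timestamp conversion to UTC (the DAG uses the machine's local timezone) so the result is the same everywhere. Returning tuples rather than pre-formatted strings also leaves the door open to passing the values as query parameters.

```python
from datetime import datetime, timezone


def extract_daily_rows(payload: dict) -> list:
    """Turn the "daily" list of the API response into (date, temp, min, max) tuples."""
    rows = []
    for d in payload["daily"]:
        # UTC is an assumption made here so the output does not depend on the host timezone.
        day = datetime.fromtimestamp(d["dt"], tz=timezone.utc).strftime("%Y-%m-%d")
        rows.append((day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"]))
    return rows


# Trimmed-down version of the sample response quoted inside the etl task.
sample = {"daily": [{"dt": 1622948400, "temp": {"day": 26.59, "min": 15.67, "max": 28.11}}]}
extracted = extract_daily_rows(sample)
print(extracted)
```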