
Creating an Airflow DAG with the Yahoo Finance API - TIL230607

sunhokimDev 2023. 6. 8. 16:58

๐Ÿ“š KDT WEEK 10 DAY 3 TIL

  • Implementing an Airflow DAG for the Yahoo Finance API
    • Full Refresh approach
    • Incremental Update approach

 


 

Implementing an Airflow ETL pipeline with the Yahoo Finance API

 

To use the Yahoo Finance API, you first need to install the yfinance module.

 

Since I run Airflow in a Docker environment, I installed it inside the Docker container.

 

docker ps # find the Container ID of the Airflow scheduler
docker exec -it SchedulerContainerID sh # connect to the scheduler using the Container ID you found

# after connecting to the airflow container
pip install yfinance
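
Before wiring this into the DAG, a quick sanity check inside the container can confirm that the module works. The snippet below is just a rough sketch for that purpose, not part of the original code.

# quick check that yfinance is installed and returns data (sketch only)
import yfinance as yf

ticker = yf.Ticker("AAPL")
df = ticker.history()  # defaults to roughly the last month of daily prices
print(df[["Open", "High", "Low", "Close", "Volume"]].head())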

 

๐ŸŸฅ Implementing Full Refresh (copying and loading the entire dataset every time)

 

๋‹ค์Œ์€ Airflow DAG์— ๋„ฃ์„ Python ์ฝ”๋“œ์ด๋‹ค.

Extract์™€ Transform์„ ํ•ฉ์ณ์„œ get_historical_prices ํ•จ์ˆ˜๋กœ ๊ตฌํ˜„ํ•˜๊ณ , Load๋Š” load ํ•จ์ˆ˜๋กœ ๊ตฌํ˜„ํ•˜์˜€๋‹ค.

 

from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp

import yfinance as yf
import pandas as pd
import logging

# Airflow์— ๋“ฑ๋ก๋œ Connection ์ •๋ณด๋ฅผ ํ†ตํ•ด Redshift ์—ฐ๊ฒฐ
def get_Redshift_connection(autocommit=True):
    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    conn = hook.get_conn()
    conn.autocommit = autocommit
    return conn.cursor()


# Extract + Transform
@task
def get_historical_prices(symbol):
    ticket = yf.Ticker(symbol) # look up the stock on Yahoo Finance by its ticker symbol (the ID of a specific stock)
    data = ticket.history() # returns roughly the last month of price data for the stock as a DataFrame
    records = []

    for index, row in data.iterrows():
        date = index.strftime('%Y-%m-%d %H:%M:%S')

        # in order: date, open, high, low, close, volume
        records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])

    return records

# Load
@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
        cur.execute(f"""
CREATE TABLE {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")

        for r in records:
            sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        raise

    logging.info("load done")


with DAG(
    dag_id = 'UpdateSymbol',
    start_date = datetime(2023,5,30),
    catchup=False,
    tags=['API'],
    schedule = '0 10 * * *' # every day at 10:00
) as dag:

    results = get_historical_prices("AAPL") # AAPL is Apple's ticker symbol
    load("sunhokim_public", "stock_info", results) # schema name, table name, and the transform result, in that order

 

Airflow์˜ Connection ์ •๋ณด๋ฅผ ์—ฐ๊ฒฐํ•˜๋Š” ๋ฐฉ๋ฒ•์€ ๋ฐ”๋กœ ์ด์ „ ๊ธ€์„ ์ฐธ๊ณ ํ•˜๋ฉด ์•Œ ์ˆ˜ ์žˆ๋‹ค.

 

Airflow Decorator์ธ @task๋ฅผ ์‚ฌ์šฉํ–ˆ๊ธฐ ๋•Œ๋ฌธ์— ๊ฐ„๋‹จํ•˜๊ฒŒ ๊ตฌํ˜„์ด ๋œ ๋ชจ์Šต์ด๋‹ค.

Decorator์„ ์‚ฌ์šฉํ•˜๋ฉด ํ•จ์ˆ˜์— ์ธ์ž๋„ ์›ํ•˜๋Š” ์ด๋ฆ„์œผ๋กœ ์ง€์–ด ๋„ฃ์„ ์ˆ˜ ์žˆ๊ณ , ๊ฐ€๋…์„ฑ๋„ ๋†’์•„์ง„๋‹ค๋Š” ์ ์ด ํฐ ์žฅ์ ์ธ ๊ฒƒ ๊ฐ™๋‹ค.

 

Put this code in the airflow folder > dags folder and it will show up in the Airflow web UI. (It takes about five minutes to be picked up.)

 

์ด์ œ ์ •์ƒ์ ์œผ๋กœ ๊ตฌ๋™์ด ๋˜๋Š”์ง€ DAG๋ฅผ ์‹คํ–‰์‹œ์ผœ ๋ณด์ž.

์›น UI(๋กœ์ปฌํฌํŠธ)๋กœ๋„ ๊ฐ€๋Šฅํ•˜์ง€๋งŒ ๋‚˜๋Š” CLI๋กœ ์‹คํ–‰์‹œ์ผœ๋ณด์•˜๋‹ค.

 

Go back into the airflow environment we connected to earlier to install yfinance,

and enter the following commands.

cd dags # move to the dags folder
ls # check the files inside
airflow dags test UpdateSymbol 2023-05-30 # UpdateSymbol is the dag_id we set in the code above
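
Beyond reading the log output, one way to double-check the load is to query the table through the same Airflow connection. A minimal sketch, assuming the redshift_dev_db connection and the sunhokim_public.stock_info table from the DAG above:

from airflow.providers.postgres.hooks.postgres import PostgresHook

# peek at the loaded table (sketch only)
cur = PostgresHook(postgres_conn_id='redshift_dev_db').get_conn().cursor()
cur.execute("SELECT COUNT(*), MIN(date), MAX(date) FROM sunhokim_public.stock_info;")
print(cur.fetchone())  # expect roughly one month of trading days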

 

์ •์ƒ์ ์œผ๋กœ ๊ตฌ๋™๋˜์—ˆ๋‹ค๋ฉด, ๋‹ค์Œ๊ณผ ๊ฐ™์€ ํ™”๋ฉด์œผ๋กœ ๋งˆ๋ฌด๋ฆฌ๋  ๊ฒƒ์ด๋‹ค!

INSERT๊ฐ€ ์ •์ƒ์ ์œผ๋กœ ๋™์ž‘ํ•˜์˜€๊ณ , ๋งˆ๋ฌด๋ฆฌ๋˜์—ˆ๋‹ค.

 


 

๐ŸŸฆ Implementing Incremental Update (appending only the new data)

 

Full Refresh ๋ฐฉ์‹์˜ ๊ฒฝ์šฐ ํ•ญ์ƒ ํ•œ ๋‹ฌ์น˜์˜ ๋ฐ์ดํ„ฐ๋งŒ ํ…Œ์ด๋ธ”์— ์ €์žฅ๋˜์–ด ์žˆ์„ ๊ฒƒ์ด๋‹ค. (์†Œ์Šค ๋ฐ์ดํ„ฐ๊ฐ€ ๊ทธ๋ ‡๋‹ค) 

์ด๋ฒˆ์—๋Š” ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๊ฐ€ ์ƒ๊ธฐ๋ฉด ๋ณธ๋ž˜ ํ…Œ์ด๋ธ”์— ์ถ”๊ฐ€ํ•ด์ฃผ๋Š” ๋ฐฉ์‹์„ ๊ตฌํ˜„ํ•ด๋ณด์ž.

 

Starting from the Full Refresh code above, I added a _create_table function and modified the load function.

# pass drop_first=True to drop the table first if it already exists
def _create_table(cur, schema, table, drop_first):
    if drop_first:
        cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
    cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
    date date,
    "open" float,
    high float,
    low float,
    close float,
    volume bigint
);""")


@task
def load(schema, table, records):
    logging.info("load started")
    cur = get_Redshift_connection()
    try:
        cur.execute("BEGIN;")
        # ์›๋ณธ ํ…Œ์ด๋ธ”์ด ์—†์œผ๋ฉด ์ƒ์„ฑ - ํ…Œ์ด๋ธ”์ด ์ฒ˜์Œ ํ•œ๋ฒˆ ๋งŒ๋“ค์–ด์งˆ ๋•Œ ํ•„์š”ํ•œ ์ฝ”๋“œ
        _create_table(cur, schema, table, False)
        # ์ž„์‹œ ํ…Œ์ด๋ธ”๋กœ ์›๋ณธ ํ…Œ์ด๋ธ”์„ ๋ณต์‚ฌ
        cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
        for r in records:
            sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
            print(sql)
            cur.execute(sql)

        # ์›๋ณธ ํ…Œ์ด๋ธ” ์ƒ์„ฑ
        _create_table(cur, schema, table, True)
        # ์ž„์‹œ ํ…Œ์ด๋ธ” ๋‚ด์šฉ์„ ์›๋ณธ ํ…Œ์ด๋ธ”๋กœ ๋ณต์‚ฌ
        cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
        cur.execute("COMMIT;")   # cur.execute("END;")
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;") 
        raise
    logging.info("load done")

load ํ•จ์ˆ˜์˜ ์ž‘๋™ ๋ฐฉ์‹์€ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ด๋ฃจ์–ด์ง„๋‹ค.

 

์›๋ณธ ํ…Œ์ด๋ธ” ์—†์œผ๋ฉด ์ƒ์„ฑ > ์ž„์‹œ ํ…Œ์ด๋ธ”๋กœ ๋‚ด์šฉ ๋ณต์‚ฌ > ์ž„์‹œ ํ…Œ์ด๋ธ”์— ์†Œ์Šค ๋ฐ์ดํ„ฐ ์ถ”๊ฐ€

> ์›๋ณธ ํ…Œ์ด๋ธ” ์‚ญ์ œ ํ›„ ์žฌ์ƒ์„ฑ > ์›๋ณธ ํ…Œ์ด๋ธ”์— ์ค‘๋ณต์—†์ด ๋ฐ์ดํ„ฐ ์ถ”๊ฐ€

 

์ž„์‹œ ํ…Œ์ด๋ธ”์— ์›๋ณธ ํ…Œ์ด๋ธ”์˜ ๋‚ด์šฉ์ด ๋“ค์–ด๊ฐ„ ํ›„์— ์†Œ์Šค ๋ฐ์ดํ„ฐ๋„ ์ถ”๊ฐ€๋˜์—ˆ๊ธฐ ๋•Œ๋ฌธ์— ์ค‘๋ณต ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฐœ์ƒํ•  ๊ฒƒ์ด๋‹ค.(์ฒซ ์‹œ๋„๊ฐ€ ์•„๋‹ˆ๋ผ๋ฉด)

์ด๋•Œ ์›๋ณธ ํ…Œ์ด๋ธ”์— ์ž„์‹œ ํ…Œ์ด๋ธ”์˜ ๋‚ด์šฉ์„ DISTINCT๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ค‘๋ณต์—†์ด ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฝ์ž…ํ•˜๋ฉด ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๊ฐ€ ์ถ”๊ฐ€๋œ ํ˜•ํƒœ๋กœ ๋‚จ๋Š”๋‹ค.