์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Speculative Execution
- topic
- spark executor memory
- mysql
- off heap memory
- AQE
- disk spill
- Airflow
- Salting
- Dag
- DataFrame Hint
- Spark Caching
- colab
- Docker
- CI/CD
- Spark ์ค์ต
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- ๋น ๋ฐ์ดํฐ
- aws
- KDT_TIL
- Kafka
- backfill
- Spark
- SQL
- etl
- Spark SQL
- redshift
- k8s
- Kubernetes
- Spark Partitioning
- Today
- Total
JUST DO IT!
Yahoo finance API ์ฌ์ฉํด์ Airflow DAG ๋ง๋ค์ด๋ณด๊ธฐ - TIL230607 ๋ณธ๋ฌธ
Yahoo finance API ์ฌ์ฉํด์ Airflow DAG ๋ง๋ค์ด๋ณด๊ธฐ - TIL230607
sunhokimDev 2023. 6. 8. 16:58๐ KDT WEEK 10 DAY 3 TIL
- Yahoo finance API Airflow DAG ์ฝ๋ ๊ตฌํ
- Full Refresh ๋ฐฉ์ ๊ตฌํ
- Incremental Update ๋ฐฉ์ ๊ตฌํ
Yahoo finance API๋ฅผ ์ฌ์ฉํ์ฌ Airflow ETL ๊ตฌํํ๊ธฐ
๋จผ์ Yahoo finance API๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด yfinance ๋ชจ๋์ ๋ค์ด๋ก๋๋ฐ์์ผํ๋ค.
๋๋ Airflow๋ฅผ ๋์ปค ํ๊ฒฝ์์ ์ฌ์ฉํ๊ณ ์์ผ๋ฏ๋ก ๋์ปค์ ์ค์น๋ฅผ ํด์ฃผ์๋ค.
docker ps # Airflow scheduler์ Container ID๋ฅผ ํ์ธํ์
docker exec -it SchedulerContainerID sh # ์ฐพ์ scheduler์ Container ID๋ฅผ ์
๋ ฅํด์ ์ ์ํ๋ค.
# airflow ์ ์ ํ
pip install yfinance
๐ฅ Full Refresh(๋งค๋ฒ ์ ์ฒด๋ฅผ ๋ณต์ฌํด์ฌ ์ ์ฌํ๋ ๋ฐฉ์) ๊ตฌํ
๋ค์์ Airflow DAG์ ๋ฃ์ Python ์ฝ๋์ด๋ค.
Extract์ Transform์ ํฉ์ณ์ get_historical_prices ํจ์๋ก ๊ตฌํํ๊ณ , Load๋ load ํจ์๋ก ๊ตฌํํ์๋ค.
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp
import yfinance as yf
import pandas as pd
import logging
# Airflow์ ๋ฑ๋ก๋ Connection ์ ๋ณด๋ฅผ ํตํด Redshift ์ฐ๊ฒฐ
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
# Extract + Transform
@task
def get_historical_prices(symbol):
ticket = yf.Ticker(symbol) # ์ฌ๋ณผ(ํน์ ์ฃผ์์ ID? ๊ฐ์๊ฒ)์ ํตํด yahoo finance ์ฃผ์ ์ ๋ณด ๊ฐ์ ธ์ค๊ธฐ
data = ticket.history() # ์ง๋ 30์ผ๊ฐ์ ํด๋น ์ฃผ์ ์ ๋ณด๋ฅผ DataFrame ํํ๋ก ๋ฐํ
records = []
for index, row in data.iterrows():
date = index.strftime('%Y-%m-%d %H:%M:%S')
# ์ฐจ๋ก๋ก ๋ ์ง, ์์์ฃผ๊ฐ, ์ํ๊ฐ, ํํ๊ฐ, ์ข
๋ฃ์ฃผ๊ฐ, ๋งค๋๋งค์๊ฑด
records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
return records
# Load
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint
);""")
for r in records:
sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
print(sql)
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
with DAG(
dag_id = 'UpdateSymbol',
start_date = datetime(2023,5,30),
catchup=False,
tags=['API'],
schedule = '0 10 * * *' # ๋งค์ผ 10์ ์ ๊ฐ๋ง๋ค
) as dag:
results = get_historical_prices("AAPL") # ์ ํ ์ฃผ์์ ์ฌ๋ณผ AAPL
load("sunhokim_public", "stock_info", results) # ์์๋๋ก ์คํค๋ง์ด๋ฆ, ํ
์ด๋ธ์ด๋ฆ, transform ํจ์์ ๊ฒฐ๊ณผ
Airflow์ Connection ์ ๋ณด๋ฅผ ์ฐ๊ฒฐํ๋ ๋ฐฉ๋ฒ์ ๋ฐ๋ก ์ด์ ๊ธ์ ์ฐธ๊ณ ํ๋ฉด ์ ์ ์๋ค.
Airflow Decorator์ธ @task๋ฅผ ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์ ๊ฐ๋จํ๊ฒ ๊ตฌํ์ด ๋ ๋ชจ์ต์ด๋ค.
Decorator์ ์ฌ์ฉํ๋ฉด ํจ์์ ์ธ์๋ ์ํ๋ ์ด๋ฆ์ผ๋ก ์ง์ด ๋ฃ์ ์ ์๊ณ , ๊ฐ๋ ์ฑ๋ ๋์์ง๋ค๋ ์ ์ด ํฐ ์ฅ์ ์ธ ๊ฒ ๊ฐ๋ค.
์ด ์ฝ๋๋ฅผ airflowํด๋ > dags ํด๋์ ๋ฃ์ผ๋ฉด Airflow ์น UI์ ๋ํ๋ ๊ฒ์ด๋ค. (๋ฐ์์ 5๋ถ์ ๋ ๊ฑธ๋ฆผ)
์ด์ ์ ์์ ์ผ๋ก ๊ตฌ๋์ด ๋๋์ง DAG๋ฅผ ์คํ์์ผ ๋ณด์.
์น UI(๋ก์ปฌํฌํธ)๋ก๋ ๊ฐ๋ฅํ์ง๋ง ๋๋ CLI๋ก ์คํ์์ผ๋ณด์๋ค.
์ด์ ์๊น yfinance๋ฅผ ์ค์นํ๊ธฐ ์ํด ์ ์ํ๋ airflow ํ๊ฒฝ์ ๋ค์ ๋ค์ด๊ฐ์,
๋ค์์ ๋ช ๋ น์ด๋ฅผ ์ ๋ ฅํ๋ค.
cd dags # dags ํด๋๋ก ์ด๋
ls # ์์ ์๋ ํ์ผ ํ์ธ
airflow dags test UpdateSymbol 2023-05-30 # UpdateSymbol์ด ์๊น ๋ง๋ค์๋ ์ฝ๋ํ์ผ์ ์ด๋ฆ์ด๋ค.
์ ์์ ์ผ๋ก ๊ตฌ๋๋์๋ค๋ฉด, ๋ค์๊ณผ ๊ฐ์ ํ๋ฉด์ผ๋ก ๋ง๋ฌด๋ฆฌ๋ ๊ฒ์ด๋ค!
๐ฆ Incremental Update(์๋ก์ด ๋ฐ์ดํฐ๋ง ๊ณ์ ์ถ๊ฐ) ๊ตฌํ
Full Refresh ๋ฐฉ์์ ๊ฒฝ์ฐ ํญ์ ํ ๋ฌ์น์ ๋ฐ์ดํฐ๋ง ํ ์ด๋ธ์ ์ ์ฅ๋์ด ์์ ๊ฒ์ด๋ค. (์์ค ๋ฐ์ดํฐ๊ฐ ๊ทธ๋ ๋ค)
์ด๋ฒ์๋ ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ์๊ธฐ๋ฉด ๋ณธ๋ ํ ์ด๋ธ์ ์ถ๊ฐํด์ฃผ๋ ๋ฐฉ์์ ๊ตฌํํด๋ณด์.
์๊น Full Refresh ์ฝ๋์์ _create_table ํจ์๋ฅผ ์ถ๊ฐํ๊ณ load ํจ์๋ฅผ ์์ ํ์๋ค.
# drop_first๋ ํ
์ด๋ธ์ด ์ด๋ฏธ ์กด์ฌํ ๊ฒฝ์ฐ True๋ฅผ ์ธ์๋ก ๋ฃ๋๋ค
def _create_table(cur, schema, table, drop_first):
if drop_first:
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint
);""")
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
# ์๋ณธ ํ
์ด๋ธ์ด ์์ผ๋ฉด ์์ฑ - ํ
์ด๋ธ์ด ์ฒ์ ํ๋ฒ ๋ง๋ค์ด์ง ๋ ํ์ํ ์ฝ๋
_create_table(cur, schema, table, False)
# ์์ ํ
์ด๋ธ๋ก ์๋ณธ ํ
์ด๋ธ์ ๋ณต์ฌ
cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
for r in records:
sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
print(sql)
cur.execute(sql)
# ์๋ณธ ํ
์ด๋ธ ์์ฑ
_create_table(cur, schema, table, True)
# ์์ ํ
์ด๋ธ ๋ด์ฉ์ ์๋ณธ ํ
์ด๋ธ๋ก ๋ณต์ฌ
cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
cur.execute("COMMIT;") # cur.execute("END;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
load ํจ์์ ์๋ ๋ฐฉ์์ ๋ค์๊ณผ ๊ฐ์ด ์ด๋ฃจ์ด์ง๋ค.
์๋ณธ ํ ์ด๋ธ ์์ผ๋ฉด ์์ฑ > ์์ ํ ์ด๋ธ๋ก ๋ด์ฉ ๋ณต์ฌ > ์์ ํ ์ด๋ธ์ ์์ค ๋ฐ์ดํฐ ์ถ๊ฐ
> ์๋ณธ ํ ์ด๋ธ ์ญ์ ํ ์ฌ์์ฑ > ์๋ณธ ํ ์ด๋ธ์ ์ค๋ณต์์ด ๋ฐ์ดํฐ ์ถ๊ฐ
์์ ํ ์ด๋ธ์ ์๋ณธ ํ ์ด๋ธ์ ๋ด์ฉ์ด ๋ค์ด๊ฐ ํ์ ์์ค ๋ฐ์ดํฐ๋ ์ถ๊ฐ๋์๊ธฐ ๋๋ฌธ์ ์ค๋ณต ๋ฐ์ดํฐ๊ฐ ๋ฐ์ํ ๊ฒ์ด๋ค.(์ฒซ ์๋๊ฐ ์๋๋ผ๋ฉด)
์ด๋ ์๋ณธ ํ ์ด๋ธ์ ์์ ํ ์ด๋ธ์ ๋ด์ฉ์ DISTINCT๋ฅผ ์ฌ์ฉํ์ฌ ์ค๋ณต์์ด ๋ฐ์ดํฐ๋ฅผ ์ฝ์ ํ๋ฉด ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ์ถ๊ฐ๋ ํํ๋ก ๋จ๋๋ค.
'TIL' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Airflow Backfill - TIL230608 (0) | 2023.06.08 |
---|---|
Airflow ETL์์ Primary Key Uniqueness SQL๋ก ๋ณด์ฅํ๊ธฐ - TIL230608 (0) | 2023.06.08 |
Airflow DAG - TIL230607 (2) | 2023.06.07 |
Redshift๋ฅผ Superset์ ์ฐ๋ํ๊ณ ๋์๋ณด๋ ๋ง๋ค๊ธฐ - TIL230526 (0) | 2023.05.26 |
Snowflake ์์๋ณด๊ณ , S3์์ COPYํด๋ณด๊ธฐ - TIL230525 (0) | 2023.05.25 |