์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- SQL
- Spark SQL
- k8s
- KDT_TIL
- ๋น ๋ฐ์ดํฐ
- Airflow
- aws
- topic
- Spark Partitioning
- Spark
- AQE
- redshift
- etl
- spark executor memory
- Kubernetes
- Docker
- Speculative Execution
- backfill
- mysql
- DataFrame Hint
- Spark ์ค์ต
- Dag
- CI/CD
- off heap memory
- Salting
- Kafka
- colab
- Spark Caching
- disk spill
- Today
- Total
JUST DO IT!
MySQL โก๏ธ Redshift์ Airflow ETL ๊ตฌํํด๋ณด๊ธฐ - TIL230609 ๋ณธ๋ฌธ
๐ KDT WEEK 10 DAY 4 TIL
- MySQL โก๏ธ Redshift Airflow ETL ๊ตฌํ
- ์ฌ์ ์์
- ETL ์ฝ๋
- Backfill ๊ตฌ๋
๐ฅ MySQL(OLTP)์์ Redshift(OLAP)๋ก Airflow ETL ๊ตฌํํด๋ณด๊ธฐ
ํ๋ก๋์ ๋ฐ์ดํฐ๋ฒ ์ด์ค(MySQL)์์ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค(Redshift)๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์กํ๋ ETL์ Airflow๋ก ๊ตฌํํด๋ณด์.
๐ ๏ธ ์ฌ์ ์์
1. ๊ถํ ์ค์
๋จผ์ , ์๋ก๊ฐ์ ๊ถํ ์ค์ ์ด ์ฌ์ ์ ํ์ํ๋ค.
- Airflow DAG์์ S3 ์ ๊ทผ ๊ถํ : AWS IAM User(S3 ๋ฒํท ์ฝ๊ธฐ, ์ฐ๊ธฐ ๊ถํ) ์์ฑํด์ access key, secret key ๋ฐ๊ธฐ
- https://sunhokimdev.tistory.com/34 > Snowflake ์ค์ต > Snowflake์์ AWS ์ ๊ทผํ๊ธฐ ์ฐธ๊ณ
- Redshift๊ฐ S3 ์ ๊ทผ > Redshift์ S3๋ฅผ ์ ๊ทผํ ์ ์๋ ์ญํ (Role)์ ๋ง๋ค๊ณ ์ด๋ฅผ Redshift์ ์ง์
- https://sunhokimdev.tistory.com/32 > Redshift ๋ฒํฌ์ ๋ฐ์ดํธ + ์ค์ต > IAM์ ํตํด~ ์ฐธ๊ณ
๊ถํ์ ์ค์ ํ ๋๋ S3FullAccess ๊ถํ์ด ํ์ํ๋ค.
S3FullAccess ๊ถํ์ด ์ํํ๋ค ์ถ์ผ๋ฉด, ๊ถํ ์์ฑ ์ค Create policy > JSON > "Resource" ์ ์ฌ์ฉํ S3 ๋ฒํท๋ง ํ ๋นํ์.
2. Docker์ MySQL ์ธํ
Airflow Scheduler Docker Container์ root ์ ์ ๋ก ๋ก๊ทธ์ธํด์ ์คํํ๋ค.
docker ps # ํด๋น ๋ช
๋ น์ด๋ก airflow-scheduler์ CONTAINER ID๋ฅผ ์ฐพ๋๋ค.
docker exec --user root -it CONTAINERID sh # ์ฐพ์ CONTAINER ID๋ฅผ ๋ฃ์ด ๋ฃจํธ ๊ณ์ ์ผ๋ก Airflow ์ค์ผ์ค๋ฌ ์ง์
sudo apt-get update #apt-get ์
๋ฐ์ดํธ
sudo apt-get install -y default-libmysqlclient-dev # MySQL ๋ค์ด๋ก๋
sudo apt-get install -y gcc # MySQL์ ํ์ํ C++ ์ปดํ์ผ๋ฌ ๋ค์ด๋ก๋
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql" # airflow mysql ๋ชจ๋ (์ฌ)์ค์น
3. Airflow ์ธํ
๋ค์์ผ๋ก, Airflow Connection ์ธํ ์ด ํ์ํ๋ค.
1) MySQL
Airflow ์น UI > Admin > Connections์์ MySQL์ ์ถ๊ฐํ๋ค.
2) AWS S3
Airflow ์น UI > Admin > Connections์์ Amazon Web Services์ ์ถ๊ฐํ๋ค.
IAM ์ ์ ์์ฑํ ๋ ๋ฐ์ Access Key์ Secret Acceess Key๋ฅผ ์ ๋ ฅํ๋ฉด ๋๋ค.
4. ๋ฐ์ดํฐ ๋ฐ ํ ์ด๋ธ ์ธํ
MySQL ์ธํ ์ ๋ณด(๋ฐ์ดํฐ ์ ์ฌ์๋ฃ)
CREATE TABLE prod.nps(
id INT NOT NULL AUTO_INCREMENT primary key,
created_at timestamp,
score smallint
);
Redshift๋ ๊ฐ์ ํ๋์ ๋ฐ์ดํฐ ํ์ ์ผ๋ก ํ ์ด๋ธ์ ์์ฑํด๋๋ค.
์ด ํ ์ด๋ธ์ด ETL์ ํตํด ์ ์ก๋ฐ์ ๋ฐ์ดํฐ๊ฐ ์ ์ฌ๋ ๊ณต๊ฐ์ด ๋ ๊ฒ์ด๋ค.
โ๏ธ ์ฌ์ฉํ ๊ธฐ๋ฅ
Airflow Operator
- SqlToS3Operator : MySQL์ SQL ๊ฒฐ๊ณผ๋ฅผ S3์ ์ ์ฌํ๋ Operator
- S3ToRedshiftOperator : S3์ ๋ฐ์ดํฐ๋ฅผ Redshift ํ ์ด๋ธ์ ์ ์ฌํ๋ Operator (COPY ์ปค๋งจ๋ ์ฌ์ฉ)
๐งฉ ์ฝ๋
ETL ์ฝ๋(Full Refresh)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift',
start_date = datetime(2022,8,24), # ๋ ์ง๊ฐ ๋ฏธ๋์ธ ๊ฒฝ์ฐ ์คํ์ด ์๋จ
schedule = '0 9 * * *', # ๋งค์ผ 9์๋ง๋ค
max_active_runs = 1, # ๋์์ ์ฌ๋ฌ๊ฐ์ DAG๊ฐ ๋์ํ์ง ์๋๋ก
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "sunhokim_public" # ์ ์ฌ๋ Redshift ์คํค๋ง์ด๋ฆ
table = "nps" # ์ ์ฌ๋ Redshift ํ
์ด๋ธ์ด๋ฆ
s3_bucket = "grepp-data-engineering" # S3์์ ๊ฐ์ ธ์ฌ ๋ฐ์ดํฐ์ ๋ฒํท ์ด๋ฆ
s3_key = schema + "-" + table # ์ ์ฌ๋ S3 PATH ์ ๋ณด
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps', # task_id ์ง์
query = "SELECT * FROM prod.nps", # MySQL์ prod.nps ํ
์ด๋ธ์ ๋ชจ๋ ์ ๋ณด
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id", # Airflow ์น UI์์ ์ค์ ํ MySQL Connection ID
aws_conn_id = "aws_conn_id", # Airflow ์น UI์์ ์ค์ ํ S3 Connection ID
verify = False,
replace = True, # S3 PATH์ ๋์ผํ ํ์ผ์ด ์กด์ฌํ ์ ๋์ฒดํ ๊ฒ์ธ์ง ์ ๋ฌด
pd_kwargs={"index": False, "header": False}, # index์ header ์ ๋ณด๋ ๊ฐ์ ธ์ค์ง ์๋๋ค (pandas ์ต์
)
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps', # task_id ์ง์
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'], # S3์ ์์ค๋ฐ์ดํฐ๋ csv ํ์ผ
method = 'REPLACE', # Full Refresh > Redshift์ ๋ณธ๋ ์กด์ฌํ๋ ๋ฐ์ดํฐ๋ ์์ ํ ๋์ฒด
redshift_conn_id = "redshift_dev_db", # Airflow ์น UI์์ ์ค์ ํ Redshift Connection ID
aws_conn_id = "aws_conn_id",
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
S3ToRedshiftOperator์์ ์ฌ์ฉํ method์ ๋ฐฉ์์ ๋ฐ๋ผ ๋ฐ์ดํฐ ์ ์ฌ๋ฐฉ์์ด ๋ฐ๋๋ค.
method = 'REPLACE' ์ ์ฌ์ฉํ๋ฉด ํ ์ด๋ธ์ ๋ด์ฉ์ด ์์ ํ ์๋ก ๊ต์ฒด๋๋ค.
REPLACE์ด์ธ์๋ UPSERT, APPEND๊ฐ ์กด์ฌํ๋ฉฐ, UPSERT์ ๊ฒฝ์ฐ ์๋์์ ๋ค๋ฃจ์๋ค.
ETL์ฝ๋(Incremental Update)
์ Full Refresh ์ฝ๋์์ ๋ณ๊ฒฝ๋ ํจ์๋ง ๊ฐ์ ธ์๋ค.
์ฌ๊ธฐ์ method = 'UPSERT' ๋ฐฉ์์ ์ฌ์ฉํ๋ค.
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
# UPSERT ์ฌ์ฉ์ ๋ ์ง์ ํํฐ๋๋ SQL๋ฌธ์ด ํ์ํ๋ค. Airflow์ execution_date๋ฅผ ์ฌ์ฉํ๋ค.
# {{ execution_date }}๋ Airflow๊ฐ execution_date์ ํด๋นํ๋ ๊ฐ์ผ๋ก ๋ฐ๊พธ์ด ๋ฃ์ด์ค๋ค.
query = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')",
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
method = "UPSERT", # UPSERT ๋ฐฉ์ ์ฌ์ฉ
upsert_keys = ["id"], # PK์ ํด๋นํ๋ ํ๋๋ฅผ ๋ฃ์ด์ฃผ๋ฉด ๋๋ค. UPSERT์ ์ค๋ณต ์ฒดํฌ์ ์ฌ์ฉ๋๋ ํ๋์ด๋ค.
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
UPSERT ๋ฐฉ์์๋ upsert_keys ๊ฐ์ด ํ์ํ๋ค.
UPSERT๋ ์ค๋ณต๋ ๋ฐ์ดํฐ์ ๋ํด์๋ Update, ์๋ก์ด ๋ฐ์ดํฐ์ ๋ํด์๋ Insert ๋์์ฒ๋ผ ๋ฐ์ดํฐ๋ฅผ ์ ์ฌํ๋ค.
upsert_keys ๊ฐ์ ์ค๋ณต๋ ๋ฐ์ดํฐ๋ฅผ ๊ฐ๋ ค๋ผ ๋ ์ฌ์ฉํ๋ Primary Key ํ๋๋ฅผ ๋ฐ๊ธฐ ์ํ ํ์ ๋ณ์์ด๋ค.
๋ํ Full Refresh ๋ฐฉ์์๋ Backfill์ด ํ์ ์์ด์ ๊ตฌํํ์ง ์์์ง๋ง,(๊ทธ๋ฅ ๋ค์ ํ๋ฒ ์คํํ๋ฉด ๋๋๊น)
Incremental Update์์๋ Backfill์ ์ํ SQL ๊ตฌ๋ฌธ ๋ณ๊ฒฝ์ด ํ์ํ๋ค. ์ฌ๊ธฐ์ execution_date๋ฅผ ํ์ฉํ๋ค.
SqlToS3Operator์ ์ฟผ๋ฆฌ๋ฌธ์ ํ์ธํด๋ณด๋ฉด, ๋ฐ์ดํฐ ์์ฑ๋ ์ง๊ฐ execution_date์ ํด๋นํ๋ ๋ฐ์ดํฐ๋ง ๊ฐ์ ธ์ค๊ณ ์๋ค.
์ฐธ๊ณ ๋ก execution_date๋ ๊ฐ์ ธ์ค๊ณ ์ถ์ ๋ฐ์ดํฐ์ ๋ ์ง(๋ฐ์ดํฐ๊ฐ ์์ฑ๋ ๋ ์ง)๋ผ๊ณ ์๊ฐํ๋ฉด ์ฝ๋ค.
Airflow๋ execution_date๋ฅผ ์์์ ์ฒ๋ฆฌํ๊ธฐ ๋๋ฌธ์,
๋ง์ฝ์ Backfill์ ์คํํ์ ๋ ํ์ดํ๋ผ์ธ ๊ตฌ๋์ด ์คํจํ ๋ ์ง๋ฅผ ์๋์ผ๋ก execution_date๋ก ์ฑ์์ฃผ๊ฑฐ๋, ์ฌ์ฉ์๊ฐ ์์ฒญํ ๋ ์ง๋ก ์ฑ์์ฃผ๊ฒ๋๋ค.
์ด ํ์ผ๋ค์ Airflow ํด๋ > dags์ ๋ฃ๊ณ ๊ตฌ๋ํด๋ณด์.
๐ Backfill ๊ตฌ๋
์๊น Incremental Update๋ฅผ ๊ตฌํํ ETL ์ฝ๋๋ฅผ ์ฌ์ฉํด์ Backfill์ด ์ ์์ ์ผ๋ก ๋์ํ๋์ง ํ์ธํด๋ณด์.
๊ณผ๊ฑฐ 2018๋ 7์ ํ๋ฌ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๋ค์ ๋ถ๋ฌ์ค๊ณ ์ถ์ผ๋ฉด ๋ค์์ ๋ช ๋ น์ด๋ฅผ ์ฌ์ฉํ๋ค.
airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
์ฌ๊ธฐ์ ์กฐ๊ธ ์ฃผ์ํด์ผํ ์ ์ด ์๋ค.
- dag ์ฝ๋์ catchup = True ์ค์ ์ด ๋์ด์์ด์ผ ํ๋ค.
- execution_date๋ฅผ ์ฌ์ฉํด์ Incremental Update ์ฝ๋๋ก ๊ตฌํ๋์ด ์์ด์ผํ๋ค.
- start_date(2018-07-01)๋ ํฌํจํ์ง๋ง, end_date(2018-08-01)๋ ํฌํจํ์ง ์๋๋ค.
- 7์ 1์ผ, 7์ 2์ผ, 7์ 3์ผ์ฒ๋ผ ์์๋๋ก ์ํํ์ง ์๊ณ ๋๋ค์ผ๋ก ์ํํ๋ค.
- ๋ ์ง์์ผ๋ก ํ๊ณ ์ถ๋ค๋ฉด default_args = {... 'depends_on_past' : True ... } ๊ฐ ํ์ํ๋ค.
- Backfill์ max_active_runs = 1 ์ธ ์ํ๋ก ํ๋์ฉ ๋๋ฆฌ๋๊ฒ ๋น๊ต์ ์์ ํ๋ค.