Implementing an Airflow ETL from MySQL ➡️ Redshift - TIL230609

sunhokimDev 2023. 6. 9. 22:03

📚 KDT WEEK 10 DAY 4 TIL

  • MySQL ➡️ Redshift Airflow ETL implementation
    • Prerequisites
    • ETL code
    • Running a backfill

 


🟥 Implementing an Airflow ETL from MySQL (OLTP) to Redshift (OLAP)

ํ”„๋กœ๋•์…˜ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค(MySQL)์—์„œ ๋ฐ์ดํ„ฐ ์›จ์–ดํ•˜์šฐ์Šค(Redshift)๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ „์†กํ•˜๋Š” ETL์„ Airflow๋กœ ๊ตฌํ˜„ํ•ด๋ณด์ž.

 

🛠️ Prerequisites

1. Setting up permissions

 

First, the components need permission to talk to each other.

  • Airflow DAG access to S3: create an AWS IAM user with read/write permission on the S3 bucket and take note of its access key and secret key
  • Redshift access to S3: create an IAM Role that lets Redshift access S3 and attach it to the Redshift cluster

๊ถŒํ•œ์„ ์„ค์ •ํ•  ๋•Œ๋Š” S3FullAccess ๊ถŒํ•œ์ด ํ•„์š”ํ•˜๋‹ค.

S3FullAccess ๊ถŒํ•œ์ด ์œ„ํ—˜ํ•˜๋‹ค ์‹ถ์œผ๋ฉด, ๊ถŒํ•œ ์ƒ์„ฑ ์ค‘ Create policy > JSON > "Resource" ์— ์‚ฌ์šฉํ•  S3 ๋ฒ„ํ‚ท๋งŒ ํ• ๋‹นํ•˜์ž.
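For reference, a scoped-down policy of that shape looks roughly like the sketch below. This is only an assumption-level example: the bucket name is the one used later in this post, and the exact action list depends on what your DAG and the Redshift COPY actually need. It is written as a Python dict here so it can be dumped straight into the JSON editor.

import json

# Sketch of a least-privilege alternative to S3FullAccess (assumed bucket name).
s3_bucket_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {   # listing the bucket itself
            "Effect": "Allow",
            "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
            "Resource": "arn:aws:s3:::grepp-data-engineering",
        },
        {   # reading/writing objects inside the bucket
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
            "Resource": "arn:aws:s3:::grepp-data-engineering/*",
        },
    ],
}

print(json.dumps(s3_bucket_policy, indent=2))  # paste the output into Create policy > JSON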

 

2. MySQL setup in Docker

Log in to the Airflow scheduler Docker container as the root user and run the following.

 

docker ps # find the CONTAINER ID of the airflow-scheduler container
docker exec --user root -it CONTAINERID sh # enter the Airflow scheduler as root using that CONTAINER ID
apt-get update # refresh the apt package index (sudo is unnecessary since we are already root)
apt-get install -y default-libmysqlclient-dev # MySQL client development library
apt-get install -y gcc # C compiler needed to build the MySQL client module
pip3 install --ignore-installed "apache-airflow-providers-mysql" # (re)install the Airflow MySQL provider
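After the installs, a quick import check from python3 inside the same container confirms that the client library and the provider are usable (just a throwaway sanity check, not part of the original steps):

import MySQLdb  # provided by mysqlclient, which is why libmysqlclient-dev and gcc were installed
from airflow.providers.mysql.hooks.mysql import MySqlHook  # comes from apache-airflow-providers-mysql

print(MySQLdb.version_info)  # e.g. (2, 2, 4, 'final', 0)
print(MySqlHook)             # importing without an error is the real check; nothing connects here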

 

 

3. Airflow setup

 

๋‹ค์Œ์œผ๋กœ, Airflow Connection ์„ธํŒ…์ด ํ•„์š”ํ•˜๋‹ค.

 

1) MySQL

In the Airflow web UI, go to Admin > Connections and add a MySQL connection.

 

2) AWS S3

In the Airflow web UI, go to Admin > Connections and add an Amazon Web Services connection.

Enter the Access Key and Secret Access Key you received when creating the IAM user.
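The same two connections can also be registered in code instead of the web UI. The sketch below is only illustrative: the conn_ids match the ones referenced in the DAG further down, but the host, login, and keys are placeholders for your own values.

from airflow.models import Connection
from airflow.utils.session import create_session

mysql_conn = Connection(
    conn_id="mysql_conn_id",            # referenced as sql_conn_id in the DAG
    conn_type="mysql",
    host="your-mysql-host",             # placeholder
    schema="prod",
    login="your-user",                  # placeholder
    password="your-password",           # placeholder
    port=3306,
)

aws_conn = Connection(
    conn_id="aws_conn_id",              # referenced as aws_conn_id in the DAG
    conn_type="aws",
    login="YOUR_ACCESS_KEY_ID",         # IAM user's Access Key
    password="YOUR_SECRET_ACCESS_KEY",  # IAM user's Secret Access Key
)

with create_session() as session:       # commits both rows to Airflow's metadata DB
    session.add_all([mysql_conn, aws_conn])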

 

4. Data and table setup

 

MySQL setup (the source data is already loaded):

CREATE TABLE prod.nps(
    id INT NOT NULL AUTO_INCREMENT primary key,
    created_at timestamp,
    score smallint
);
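(If you are reproducing this without the course data, a couple of throwaway rows can be pushed in through the same connection; the values below are made up purely for testing.)

from airflow.providers.mysql.hooks.mysql import MySqlHook

# Illustrative test rows only; in the original setup the data is already loaded.
MySqlHook(mysql_conn_id="mysql_conn_id").run(
    "INSERT INTO prod.nps (created_at, score) VALUES "
    "('2023-06-08 10:00:00', 9), ('2023-06-09 11:30:00', 3);"
)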

 

Create a table in Redshift with the same fields and data types.

This table is where the data delivered by the ETL will land.
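A minimal sketch of that DDL, run here through PostgresHook with the redshift_dev_db connection that the DAG below also uses (assuming that connection already exists). AUTO_INCREMENT is left off on the Redshift side because the id values are copied over from MySQL as-is:

from airflow.providers.postgres.hooks.postgres import PostgresHook

# Same fields and data types as prod.nps on the MySQL side.
PostgresHook(postgres_conn_id="redshift_dev_db").run("""
    CREATE TABLE IF NOT EXISTS sunhokim_public.nps (
        id INT NOT NULL PRIMARY KEY,
        created_at TIMESTAMP,
        score SMALLINT
    );
""")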

 

 

⚙️ What we'll use

 

Airflow Operator

 

  • SqlToS3Operator: loads the result of a SQL query against MySQL into S3
  • S3ToRedshiftOperator: loads data from S3 into a Redshift table (uses the COPY command)

 

🧩 Code

 

ETL code (Full Refresh)

 

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable

from datetime import datetime
from datetime import timedelta

import requests
import logging
import psycopg2
import json


dag = DAG(
    dag_id = 'MySQL_to_Redshift',
    start_date = datetime(2022,8,24), # if this date is in the future, the DAG will not run
    schedule = '0 9 * * *', # every day at 09:00
    max_active_runs = 1, # don't let more than one run of this DAG execute at once
    catchup = False,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

schema = "sunhokim_public" # Redshift schema to load into
table = "nps" # Redshift table to load into
s3_bucket = "grepp-data-engineering" # S3 bucket the extracted data is staged in
s3_key = schema + "-" + table # S3 key (path) the data is written to

mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps', # task id
    query = "SELECT * FROM prod.nps", # pull everything from MySQL's prod.nps table
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id", # MySQL Connection ID set in the Airflow web UI
    aws_conn_id = "aws_conn_id", # AWS Connection ID set in the Airflow web UI
    verify = False,
    replace = True, # overwrite the file if one already exists at this S3 path
    pd_kwargs={"index": False, "header": False}, # don't write the index or header (pandas options)
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps', # task id
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'], # the source data in S3 is a CSV file
    method = 'REPLACE', # Full Refresh: completely replace whatever is already in the Redshift table
    redshift_conn_id = "redshift_dev_db", # Redshift Connection ID set in the Airflow web UI
    aws_conn_id = "aws_conn_id",
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

How the data is loaded depends on the method passed to S3ToRedshiftOperator.

With method = 'REPLACE', the table's contents are completely replaced.

Besides REPLACE there are also UPSERT and APPEND; UPSERT is covered below.

 

ETL code (Incremental Update)

 

Only the tasks that changed from the Full Refresh code above are shown here.

The difference is that method = 'UPSERT' is used.

 

mysql_to_s3_nps = SqlToS3Operator(
    task_id = 'mysql_to_s3_nps',
    
    # UPSERT needs a SQL statement that filters on date; Airflow's execution_date is used for that.
    # Airflow substitutes the value of execution_date for {{ execution_date }} at run time.
    query = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')",
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    sql_conn_id = "mysql_conn_id",
    aws_conn_id = "aws_conn_id",
    verify = False,
    replace = True,
    pd_kwargs={"index": False, "header": False},    
    dag = dag
)

s3_to_redshift_nps = S3ToRedshiftOperator(
    task_id = 's3_to_redshift_nps',
    s3_bucket = s3_bucket,
    s3_key = s3_key,
    schema = schema,
    table = table,
    copy_options=['csv'],
    redshift_conn_id = "redshift_dev_db",
    aws_conn_id = "aws_conn_id",    

    method = "UPSERT", # use the UPSERT method
    upsert_keys = ["id"], # the primary-key field(s); used to detect duplicate rows during the UPSERT
    dag = dag
)

mysql_to_s3_nps >> s3_to_redshift_nps

The UPSERT method requires an upsert_keys value.

UPSERT loads the data like an Update for rows that already exist and like an Insert for new rows.

upsert_keys is a required parameter that takes the primary-key field(s) used to identify duplicate rows.
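Conceptually, method = 'UPSERT' stages the S3 file in a temporary table and then swaps the matching rows in the target. The SQL below is a simplification of that idea, not the exact statements the operator issues:

# Rough idea of what happens for method="UPSERT" with upsert_keys=["id"] (simplified).
conceptual_upsert = """
CREATE TEMP TABLE nps_staging (LIKE sunhokim_public.nps);

COPY nps_staging
FROM 's3://grepp-data-engineering/sunhokim_public-nps'
CREDENTIALS '...'   -- filled in from aws_conn_id by the operator
CSV;

BEGIN;
DELETE FROM sunhokim_public.nps
USING nps_staging
WHERE sunhokim_public.nps.id = nps_staging.id;   -- the upsert_keys
INSERT INTO sunhokim_public.nps SELECT * FROM nps_staging;
COMMIT;
"""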

 

A Full Refresh doesn't need a backfill (you can simply run it again), so none was implemented for it,

but an Incremental Update needs its SQL written with backfills in mind, which is where execution_date comes in.

 

Looking at the query in SqlToS3Operator, it only pulls the rows whose creation date matches execution_date.

It helps to think of execution_date as the date of the data you want to bring in (the date the data was created).

 

Because Airflow manages execution_date itself, running a backfill automatically fills it in with the dates whose runs failed, or with the dates you explicitly request.
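To see the value each run actually receives, a small throwaway task like this can be added next to the others; the ds that Airflow passes into the task context is the same logical date it substitutes into {{ execution_date }} (this is just for checking, not part of the pipeline):

import logging  # already imported at the top of the DAG file
from airflow.operators.python import PythonOperator  # likewise already imported

def print_logical_date(ds, **context):
    # ds is the logical (execution) date as a YYYY-MM-DD string,
    # i.e. the date whose prod.nps rows this run is responsible for.
    logging.info("Loading rows whose created_at date is %s", ds)

print_date = PythonOperator(
    task_id="print_logical_date",
    python_callable=print_logical_date,
    dag=dag,
)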

 

Put these files in the Airflow folder > dags and run them.

 

🔙 Running a backfill

 

Using the Incremental Update ETL code from above, let's check that a backfill works correctly.

 

To reload one month of data from July 2018, use the following command.

airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01

 

There are a few things to watch out for here.

 

  • catchup = True must be set in the DAG code.
  • The DAG must be implemented as an Incremental Update that uses execution_date.
  • The start date (2018-07-01) is included, but the end date (2018-08-01) is not.
  • The runs are not executed in date order (July 1, July 2, July 3, ...); they run in an arbitrary order.
    • If you want them to run in date order, add 'depends_on_past': True to default_args (see the sketch after this list).
  • It is relatively safe to run a backfill one run at a time, with max_active_runs = 1.
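Putting those points together, the DAG definition for the incremental version ends up looking roughly like this. It is a sketch: the dag_id is made up, and only the settings relevant to backfill differ from the Full Refresh DAG above.

from airflow import DAG
from datetime import datetime, timedelta

dag = DAG(
    dag_id = 'MySQL_to_Redshift_v2',   # hypothetical id for the incremental DAG
    start_date = datetime(2018, 7, 1), # must be on or before the backfill range
    schedule = '0 9 * * *',
    max_active_runs = 1,               # one run at a time keeps the backfill safe
    catchup = True,                    # required, otherwise the missed runs are never created
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
        'depends_on_past': True,       # process the dates in order
    }
)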