์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- CI/CD
- KDT_TIL
- Spark
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- backfill
- ๋น ๋ฐ์ดํฐ
- Kubernetes
- Spark Partitioning
- aws
- Salting
- disk spill
- SQL
- Docker
- k8s
- Dag
- AQE
- redshift
- off heap memory
- Spark ์ค์ต
- Airflow
- colab
- spark executor memory
- Kafka
- Speculative Execution
- etl
- topic
- mysql
- DataFrame Hint
- Spark SQL
- Spark Caching
- Today
- Total
JUST DO IT!
DBT์ ์ถ๊ฐ ๊ธฐ๋ฅ ์์๋ณด๊ธฐ (seed, sources, snapshots, tests) - TIL230623 ๋ณธ๋ฌธ
DBT์ ์ถ๊ฐ ๊ธฐ๋ฅ ์์๋ณด๊ธฐ (seed, sources, snapshots, tests) - TIL230623
sunhokimDev 2023. 6. 23. 21:43๐ KDT WEEK 12 DAY 5 TIL
- Dbt Seed
- Dbt Sources
- Dbt Snapshots
- Dbt tests
๐ฅ DBT Seed
๋ง์ dimension ํ ์ด๋ธ๋ค์ ํฌ๊ธฐ๊ฐ ์๊ณ ๋ง์ด ๋ณํ์ง ์์ผ๋ฏ๋ก ์ด๋ฅผ ํ์ผํํ๋ก ๋ฐ์ดํฐ์จ์ดํ์ฐ์ค๋ก ๋ก๋ํ๋ ๋ฐฉ๋ฒ
Seeds๋ ์์ ํ์ผ ๋ฐ์ดํฐ๋ฅผ ์ง์นญํ๋ค. (๋ณดํต csv)
๋์ถฉ ๊ต์ฅํ ์์ csv ํ์ผ์ seeds/์ ํ๋ ๋ง๋ค์๋ค.
dbt seed ๋ช ๋ น์ด๋ฅผ ํตํด DB์ ์ ์ฌํ ์ ์๋ค.
colab์์ Redshift์ ์ฐ๊ฒฐํด์ ํ ์ด๋ธ์ ํ์ธํด๋ณด์๋ค.
๐ฆ DBT Sources
Staging ํ ์ด๋ธ๋ค ๋ง๋ค ๋ ์ ๋ ฅ ํ ์ด๋ธ๋ค์ด ์์ฃผ ๋ฐ๋๋ค๋ฉด? > ๊ฐ ํ ์ด๋ธ์ ๋ณ์นญ ์ ๊ณตํ๋ ๊ธฐ๋ฅ
์ฒ์ ์ ๋ ฅ์ด ๋๋ ETL ํ ์ด๋ธ๋ค ๋์์ผ๋ก ๋ณ์นญ์ ์ ๊ณตํ๊ณ ์ต์ ๋ ์ฝ๋ ์ฒดํฌ ๊ธฐ๋ฅ๋ ์ ๊ณตํ๋ค.
1. ๋ณ์นญ ์ ๊ณต
models/sources.yml ํ์ผ์ ๋ค์๊ณผ ๊ฐ์ด ์์
version: 2
sources:
- name: sunhokim
schema: raw_data
tables:
- name: metadata
identifier: user_metadata
- name: event
identifier: user_event
- name: variant
identifier: user_variant
์คํค๋ง์ ํ ์ด๋ธ์ ๋ณ์นญ์ ์ ๊ณตํด์ฃผ์ด, SQL๋ฌธ์ ์์ฑํ ๋ ๋ณ์นญ์ผ๋ก ๋ถ๋ฌ์ฌ ์ ์๊ฒ ๋๋ค.
๋ค์ฌ์ฐ๊ธฐ์ ์ฃผ์ํด์ ์์ฑํ์.
๊ทธ๋ฆฌ๊ณ ํ ์ด๋ธ์ ์ฌ์ฉํ SQL ์ฟผ๋ฆฌ์์ ํ ํ๋ฆฟํํ๋ก ๋ณ์นญ์ ๋ถ๋ฌ์ค๋ฉด ๋๋ค.
ex)
๋ณ๊ฒฝ ์
WITH src_user_event AS (
SELECT * FROM raw_data.user_event
)
๋ณ๊ฒฝ ํ
WITH src_user_metadata AS (
SELECT * FROM {{ source("sunhokim", "event") }}
)
์์ฒ๋ผ ์ฝ๋๋ฅผ ๋ณ๊ฒฝํด๋ ์ ๊ณผ ๊ฐ์ด ๋๊ฐ์ด ๋์ํ๋ค.
2. ์ต์ ์ฑ (Freshness) ์ฒดํฌ
models/sources.yml ํ์ผ์ ๋ค์๊ณผ ๊ฐ์ด ์์ ํ๋ค.
version: 2
sources:
- name: sunhokim
schema: raw_data
tables:
- name: metadata
identifier: user_metadata
- name: event
identifier: user_event
loaded_at_field: datestamp
freshness:
warn_after: { count: 1, period: hour }
error_after: { count: 24, period: hour }
- name: variant
identifier: user_variant
event ํ ์ด๋ธ์ datestamp ํ๋๋ฅผ ๋น๊ตํ์ฌ ๋ฐ์ดํฐ๊ฐ ์ต์ ํ๋ ๋ฐ์ดํฐ์ธ์ง ํ์ธํ ์ ์๊ฒ๋๋ค.
freshness: ์ warn_after์ error_after:์์ datestamp๊ฐ ์ผ๋ง๋ ํ์ฌ ์๊ฐ๊ณผ ๋ฉ์ด์ก์๋ ๊ฒฝ๊ณ ๋ ์ค๋ฅ๋ฅผ ๋ณด๋ผ์ง ๊ฒฐ์ ํ๋ค.
์์์ ์์ฑํ warn_after์ ๊ฒฝ์ฐ, ๋ฐ์ดํฐ์ datestamp๊ฐ ํ์ฌ ์๊ฐ์์ 1์๊ฐ ~ 24์๊ฐ์ฐจ์ด ๋ ๋ Warn๋ฅผ ์ฃผ๊ณ ,
error_after๋ ๋ฐ์ดํฐ์ datastamp๊ฐ ํ์ฌ ์๊ฐ์์ 24์๊ฐ์ด์ ์ฐจ์ด๋ ๋ Error๋ฅผ ๋ณด๋ธ๋ค.
์ด๋ dbt source freshness ๋ช ๋ น์ด๋ก ์ฒดํฌํ ์ ์๋ค.
ํ์ฌ ๋ฐ์ดํฐ์๋ datestamp๊ฐ 2023-06-22 10:00:00์ธ ๋ฐ์ดํฐ๊ฐ ํ๋ ์ฝ์ ๋์๊ณ ,
dbt source freshness๋ฅผ ๋ช ๋ นํ ์๊ฐ์ 2023-06-23 21:00:00์ด์๋ค.
๋ฐ์ดํฐ์ datestamp๊ฐ ๋ช ๋ น์ ์คํํ ์๊ฐ๋ณด๋ค 24์๊ฐ์ด์ ๊ฒฝ๊ณผ๋์์ผ๋ Error๊ฐ ๋ํ๋๋ ๊ฒ์ด ๋ง๋ค.
๐ฉ Dbt Snapshots
ํ ์ด๋ธ์ ๋ณํ๋ฅผ ๊ณ์์ ์ผ๋ก ๊ธฐ๋กํจ์ผ๋ก์จ ๊ณผ๊ฑฐ ์์ ์ ํ ์ด๋ธ ๋ด์ฉ์ ํ์ธํ ์ ์์
์ค๋ ์ท์ ์ ์ฅํ๊ณ ์ถ์ ํ ์ด๋ธ์ snapshot/ ์ SQL๋ฌธ์ ์์ฑํด์ ์ ์ฅํ๋ฉด ๋๋ค.
snapshot/scd_user_metadata.sql
{% snapshot scd_user_metadata %}
{{
config(
target_schema='sunhokim_public',
unique_key='user_id',
strategy='timestamp',
updated_at='updated_at',
invalidate_hard_deletes=True
)
}}
SELECT * FROM {{ source('sunhokim', 'metadata') }}
{% endsnapshot %}
- target_schema : ์ค๋ ์ท์ ์ ์ฅํ ์คํค๋ง
- unique_key : ์ค๋ ์ท ํ ์ด๋ธ์ PK ์ง์
- strategy : ํ์คํ ๋ฆฌ๋ฅผ ์ก๋ ๊ธฐ์ค
- updated_at : ์ด๋ ์ปฌ๋ผ์ ๊ธฐ์ค์ผ๋ก ํ์คํ ๋ฆฌ๋ฅผ ์ก์์ง ์ค์
- invalidate_hard_deletes : True์ผ ๊ฒฝ์ฐ ์๋ณธ ๋ฐ์ดํฐ๊ฐ ์ญ์ ๋์ด๋ ์ค๋ ์ท์๋ ํด๋น ๋ฐ์ดํฐ๋ฅผ ์ ์งํ๊ฒ ๋จ
๊ทธ๋ฆฌ๊ณ dbt snapshot ๋ช ๋ น์ด๋ก ์ค๋ ์ท์ ์ง์ ํ ์คํค๋ง์ ํ ์ด๋ธ ์ด๋ฆ์ผ๋ก ์ ์ฅํ๋ค.
colab์์ scd_user_metadata ๋ด์ฉ์ ํ์ธํด๋ณด๋ฉด, ์ฌ๋ฌ ํ๋๊ฐ ๋ ์์ฑ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
dbt_vallid_from๊ณผ dbt_valid_to ํ๋๋ฅผ ํตํด ํด๋น ๋ ์ฝ๋๊ฐ ์ด๋ ๊ธฐ๊ฐ๋์ ์ ํจํ๋์ง ์ ์ ์๋๋ก ์ ์ฅ๋๋ค.
dbt_valid_from์ด ๋ฐ์ดํฐ๊ฐ ์ ์ฅ๋ ์์๋ ์ง์ด๊ณ , dbt_valid_to๊ฐ ์ ์ฅ๋ ๋ง์ง๋ง ๋ ์ง์ด๋ฏ๋ก,
๊ฐ์ฅ ์ต์ ์ ๋ฐ์ดํฐ์ผ ๊ฒฝ์ฐ dbt_valid_to ํ๋์ ๊ฐ์ NULL๋ก ์ ์ฅ๋๋ค.
์ด ์ํ์์ ์๊น ์ค๋ ์ท์ ์ ์ฅํ ํ ์ด๋ธ์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ๊ฟ์ฃผ์๋ค.
UPDATE raw_data.user_metadata
SET age = '20-29', updated_at = GETDATE()
WHERE user_id = 99;
๊ทธ๋ฆฌ๊ณ dbt snapshot ๋ช ๋ น์ด๋ฅผ ์คํํ๋ฉด, ์ ๋ฐ์ดํธ ๋ฐ์ดํฐ์ ๋ฐ๋ผ dbt_vaild_to๊ฐ ์ ๋ฐ์ดํธ๋ ๊ฒ์ด๋ค.
๐จ DBT TEST
๋ฐ์ดํฐ ํ์ง์ ํ ์คํธํ๋ ๋ฐฉ๋ฒ, Generic๊ณผ Singular ํ ์คํธ ๋ฐฉ๋ฒ์ผ๋ก ๋๋๋ค.
1. ๋ด์ฅ ์ผ๋ฐ ํ ์คํธ (Generic)
unique, not_null, accepted_values, relationships ๋ฑ์ ํ ์คํธ ์ง์
models/schema.yml ํ์ผ ์์ฑ์ ํตํด ๊ตฌํ
ex) dim_user_metadata ํ ์ด๋ธ์ user_id ์ปฌ๋ผ์ ์ค๋ณต์ฑ๊ณผ not null ์ฒดํฌํ๊ธฐ
version: 2
models:
- name: dim_user_metadata
columns:
- name: user_id
tests:
- unique
- not_null
dbt test ๋ช ๋ น์ด๋ฅผ ํตํด ๊ฐ๋จํ๊ฒ test๊ฐ ๊ฐ๋ฅํ๋ค.
2. ์ปค์คํ ํ ์คํธ (Singular)
SELECT์ผ๋ก ํ ์คํธ๋ฅผ ์ง์ ๊ฒฐ๊ณผ๊ฐ ๋ฆฌํด๋๋ฉด ์คํจ๋ก ๊ฐ์ฃผํ๋ ๋ฐฉ๋ฒ์ด๋ค.
WHERE์ ์ ํด๋นํ๋ ๋ฐ์ดํฐ๊ฐ ์์ผ๋ฉด ๋ฐ์ดํฐ ํ์ง์ ๋ฌธ์ ๊ฐ ์๋ ๊ฒ์ผ๋ก ์๊ฐํ๋ค.
tests/ ํด๋์ sql ํ์ผ์ ์์ฑํจ์ผ๋ก์จ ๊ตฌํํ๋ค.
ex) tests/dim_user_metadata.sql
SELECT
*
FROM (
SELECT
user_id, COUNT(1) cnt
FROM
{{ ref("dim_user_metadata") }}
GROUP BY 1
ORDER BY 2 DESC
LIMIT 1
)
WHERE cnt > 1
์ด์ฒ๋ผ dim_user_metadata ํ ์ด๋ธ์ user_id ์ปฌ๋ผ์ด primary key uniqueness๋ฅผ ๋ณด์ฅํ๋๋ก ์ค๊ณํ ์ ์๋ค.
dbt test --select dim_user_metadata ํํ๋ก ํน์ ํ ์ด๋ธ๋ง test๋ฅผ ์คํํ ์๋ ์๋ค.
'TIL' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
๋น ๋ฐ์ดํฐ์ Spark ์์๋ณด๊ธฐ - TIL230703 (0) | 2023.07.04 |
---|---|
๋ฐ์ดํฐ ์นดํ๋ก๊ทธ - TIL230623(2) (0) | 2023.06.23 |
๊ฐ๋จํ DBT ์ค์ตํด๋ณด๊ธฐ (with Redshift) - TIL230622(2) (0) | 2023.06.23 |
Airflow ์ด์์ ์ฃผ์์ฌํญ๊ณผ Airflow ๋์ ์๋น์ค ์์๋ณด๊ธฐ - TIL230622 (0) | 2023.06.22 |
Airflow Task Grouping๊ณผ Dynamic Dags ์์๋ณด๊ธฐ - TIL230621 (0) | 2023.06.22 |