JUST DO IT!

DBT์˜ ์ถ”๊ฐ€ ๊ธฐ๋Šฅ ์•Œ์•„๋ณด๊ธฐ (seed, sources, snapshots, tests) - TIL230623 ๋ณธ๋ฌธ

TIL

DBT์˜ ์ถ”๊ฐ€ ๊ธฐ๋Šฅ ์•Œ์•„๋ณด๊ธฐ (seed, sources, snapshots, tests) - TIL230623

sunhokimDev 2023. 6. 23. 21:43

๐Ÿ“š KDT WEEK 12 DAY 5 TIL

  • Dbt Seed
  • Dbt Sources
  • Dbt Snapshots
  • Dbt tests

 


 

๐ŸŸฅ DBT Seed

 

๋งŽ์€ dimension ํ…Œ์ด๋ธ”๋“ค์€ ํฌ๊ธฐ๊ฐ€ ์ž‘๊ณ  ๋งŽ์ด ๋ณ€ํ•˜์ง€ ์•Š์œผ๋ฏ€๋กœ ์ด๋ฅผ ํŒŒ์ผํ˜•ํƒœ๋กœ ๋ฐ์ดํ„ฐ์›จ์–ดํ•˜์šฐ์Šค๋กœ ๋กœ๋“œํ•˜๋Š” ๋ฐฉ๋ฒ•

Seeds๋Š” ์ž‘์€ ํŒŒ์ผ ๋ฐ์ดํ„ฐ๋ฅผ ์ง€์นญํ•œ๋‹ค. (๋ณดํ†ต csv)

 

๋Œ€์ถฉ ๊ต‰์žฅํžˆ ์ž‘์€ csv ํŒŒ์ผ์„ seeds/์— ํ•˜๋‚˜ ๋งŒ๋“ค์—ˆ๋‹ค.

 

๊ต‰์žฅํžˆ ์ž‘์€ csvํŒŒ์ผ์ด๋‹ค.

 

dbt seed ๋ช…๋ น์–ด๋ฅผ ํ†ตํ•ด DB์— ์ ์žฌํ•  ์ˆ˜ ์žˆ๋‹ค.

 

 

colab์—์„œ Redshift์— ์—ฐ๊ฒฐํ•ด์„œ ํ…Œ์ด๋ธ”์„ ํ™•์ธํ•ด๋ณด์•˜๋‹ค.

 

์ •์ƒ์ ์œผ๋กœ ์ ์žฌ๋˜์—ˆ๋‹ค.

 


๐ŸŸฆ DBT Sources

Staging ํ…Œ์ด๋ธ”๋“ค ๋งŒ๋“ค ๋•Œ ์ž…๋ ฅ ํ…Œ์ด๋ธ”๋“ค์ด ์ž์ฃผ ๋ฐ”๋€๋‹ค๋ฉด? > ๊ฐ ํ…Œ์ด๋ธ”์— ๋ณ„์นญ ์ œ๊ณตํ•˜๋Š” ๊ธฐ๋Šฅ

์ฒ˜์Œ ์ž…๋ ฅ์ด ๋˜๋Š” ETL ํ…Œ์ด๋ธ”๋“ค ๋Œ€์ƒ์œผ๋กœ ๋ณ„์นญ์„ ์ œ๊ณตํ•˜๊ณ  ์ตœ์‹  ๋ ˆ์ฝ”๋“œ ์ฒดํฌ ๊ธฐ๋Šฅ๋„ ์ œ๊ณตํ•œ๋‹ค.

 

1. ๋ณ„์นญ ์ œ๊ณต

 

models/sources.yml ํŒŒ์ผ์„ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ˆ˜์ •

 

version: 2

sources:
  - name: sunhokim
    schema: raw_data
    tables:
      - name: metadata
        identifier: user_metadata
      - name: event
        identifier: user_event
      - name: variant
        identifier: user_variant

 

์Šคํ‚ค๋งˆ์™€ ํ…Œ์ด๋ธ”์— ๋ณ„์นญ์„ ์ œ๊ณตํ•ด์ฃผ์–ด, SQL๋ฌธ์„ ์ž‘์„ฑํ•  ๋•Œ ๋ณ„์นญ์œผ๋กœ ๋ถˆ๋Ÿฌ์˜ฌ ์ˆ˜ ์žˆ๊ฒŒ ๋œ๋‹ค.

๋“ค์—ฌ์“ฐ๊ธฐ์— ์ฃผ์˜ํ•ด์„œ ์ž‘์„ฑํ•˜์ž.

 

๊ทธ๋ฆฌ๊ณ  ํ…Œ์ด๋ธ”์„ ์‚ฌ์šฉํ•œ SQL ์ฟผ๋ฆฌ์—์„œ ํ…œํ”Œ๋ฆฟํ˜•ํƒœ๋กœ ๋ณ„์นญ์„ ๋ถˆ๋Ÿฌ์˜ค๋ฉด ๋œ๋‹ค.

 

ex)

 

๋ณ€๊ฒฝ ์ „

WITH src_user_event AS (
    SELECT * FROM raw_data.user_event
)

 

๋ณ€๊ฒฝ ํ›„

WITH src_user_metadata AS (
    SELECT * FROM {{ source("sunhokim", "event") }}
)

 

์œ„์ฒ˜๋Ÿผ ์ฝ”๋“œ๋ฅผ ๋ณ€๊ฒฝํ•ด๋„ ์ „๊ณผ ๊ฐ™์ด ๋˜‘๊ฐ™์ด ๋™์ž‘ํ•œ๋‹ค.

 

 

2. ์ตœ์‹ ์„ฑ (Freshness) ์ฒดํฌ

models/sources.yml ํŒŒ์ผ์„ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ˆ˜์ •ํ•œ๋‹ค.

 

version: 2

sources:
  - name: sunhokim
    schema: raw_data
    tables:
      - name: metadata
        identifier: user_metadata
      - name: event
        identifier: user_event
        loaded_at_field: datestamp
        freshness:
          warn_after: { count: 1, period: hour } 
          error_after: { count: 24, period: hour }

      - name: variant
        identifier: user_variant

 

event ํ…Œ์ด๋ธ”์˜ datestamp ํ•„๋“œ๋ฅผ ๋น„๊ตํ•˜์—ฌ ๋ฐ์ดํ„ฐ๊ฐ€ ์ตœ์‹ ํ™”๋œ ๋ฐ์ดํ„ฐ์ธ์ง€ ํ™•์ธํ•  ์ˆ˜ ์žˆ๊ฒŒ๋œ๋‹ค.

 

freshness: ์˜ warn_after์™€ error_after:์—์„œ datestamp๊ฐ€ ์–ผ๋งˆ๋‚˜ ํ˜„์žฌ ์‹œ๊ฐ„๊ณผ ๋ฉ€์–ด์กŒ์„๋•Œ ๊ฒฝ๊ณ ๋‚˜ ์˜ค๋ฅ˜๋ฅผ ๋ณด๋‚ผ์ง€ ๊ฒฐ์ •ํ•œ๋‹ค.

 

์œ„์—์„œ ์ž‘์„ฑํ•œ warn_after์˜ ๊ฒฝ์šฐ, ๋ฐ์ดํ„ฐ์˜ datestamp๊ฐ€ ํ˜„์žฌ ์‹œ๊ฐ„์—์„œ 1์‹œ๊ฐ„ ~ 24์‹œ๊ฐ„์ฐจ์ด ๋‚ ๋•Œ Warn๋ฅผ ์ฃผ๊ณ ,

error_after๋Š” ๋ฐ์ดํ„ฐ์˜ datastamp๊ฐ€ ํ˜„์žฌ ์‹œ๊ฐ„์—์„œ 24์‹œ๊ฐ„์ด์ƒ ์ฐจ์ด๋‚ ๋•Œ Error๋ฅผ ๋ณด๋‚ธ๋‹ค.

 

 

์ด๋Š” dbt source freshness ๋ช…๋ น์–ด๋กœ ์ฒดํฌํ• ์ˆ˜ ์žˆ๋‹ค.

 

ํ˜„์žฌ ๋ฐ์ดํ„ฐ์—๋Š” datestamp๊ฐ€ 2023-06-22 10:00:00์ธ ๋ฐ์ดํ„ฐ๊ฐ€ ํ•˜๋‚˜ ์‚ฝ์ž…๋˜์—ˆ๊ณ ,

dbt source freshness๋ฅผ ๋ช…๋ นํ•œ ์‹œ๊ฐ„์€ 2023-06-23 21:00:00์ด์—ˆ๋‹ค.

 

Error๊ฐ€ ๋‚˜ํƒ€๋‚œ๋‹ค.

 

๋ฐ์ดํ„ฐ์˜ datestamp๊ฐ€ ๋ช…๋ น์„ ์‹คํ–‰ํ•œ ์‹œ๊ฐ„๋ณด๋‹ค 24์‹œ๊ฐ„์ด์ƒ ๊ฒฝ๊ณผ๋˜์—ˆ์œผ๋‹ˆ Error๊ฐ€ ๋‚˜ํƒ€๋‚˜๋Š” ๊ฒƒ์ด ๋งž๋‹ค.

 


 

๐ŸŸฉ Dbt Snapshots

 

ํ…Œ์ด๋ธ”์˜ ๋ณ€ํ™”๋ฅผ ๊ณ„์†์ ์œผ๋กœ ๊ธฐ๋กํ•จ์œผ๋กœ์จ ๊ณผ๊ฑฐ ์‹œ์ ์˜ ํ…Œ์ด๋ธ” ๋‚ด์šฉ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Œ

 

์Šค๋ƒ…์ƒท์„ ์ €์žฅํ•˜๊ณ  ์‹ถ์€ ํ…Œ์ด๋ธ”์€ snapshot/ ์— SQL๋ฌธ์„ ์ž‘์„ฑํ•ด์„œ ์ €์žฅํ•˜๋ฉด ๋œ๋‹ค.

 

snapshot/scd_user_metadata.sql

{% snapshot scd_user_metadata %}
{{
    config(
    target_schema='sunhokim_public',
    unique_key='user_id',
    strategy='timestamp',
    updated_at='updated_at',
    invalidate_hard_deletes=True
    )
}}
SELECT * FROM {{ source('sunhokim', 'metadata') }}
{% endsnapshot %}

 

  • target_schema : ์Šค๋ƒ…์ƒท์„ ์ €์žฅํ•  ์Šคํ‚ค๋งˆ
  • unique_key : ์Šค๋ƒ…์ƒท ํ…Œ์ด๋ธ”์˜ PK ์ง€์ •
  • strategy : ํžˆ์Šคํ† ๋ฆฌ๋ฅผ ์žก๋Š” ๊ธฐ์ค€
  • updated_at : ์–ด๋Š ์ปฌ๋Ÿผ์„ ๊ธฐ์ค€์œผ๋กœ ํžˆ์Šคํ† ๋ฆฌ๋ฅผ ์žก์„์ง€ ์„ค์ •
  • invalidate_hard_deletes : True์ผ ๊ฒฝ์šฐ ์›๋ณธ ๋ฐ์ดํ„ฐ๊ฐ€ ์‚ญ์ œ๋˜์–ด๋„ ์Šค๋ƒ…์ƒท์—๋Š” ํ•ด๋‹น ๋ฐ์ดํ„ฐ๋ฅผ ์œ ์ง€ํ•˜๊ฒŒ ๋จ

 

๊ทธ๋ฆฌ๊ณ  dbt snapshot ๋ช…๋ น์–ด๋กœ ์Šค๋ƒ…์ƒท์„ ์ง€์ •ํ•œ ์Šคํ‚ค๋งˆ์— ํ…Œ์ด๋ธ” ์ด๋ฆ„์œผ๋กœ ์ €์žฅํ•œ๋‹ค.

 

์„ฑ๊ณต์ ์œผ๋กœ snapshot์ด ์ €์žฅ๋˜์—ˆ๋‹ค.

 

colab์—์„œ scd_user_metadata ๋‚ด์šฉ์„ ํ™•์ธํ•ด๋ณด๋ฉด, ์—ฌ๋Ÿฌ ํ•„๋“œ๊ฐ€ ๋” ์ƒ์„ฑ๋œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

 

dbt_* ํ˜•ํƒœ๋กœ 4๊ฐœ์˜ ํ•„๋“œ๊ฐ€ ๋” ์ƒ์„ฑ๋˜์—ˆ๋‹ค.

 

dbt_vallid_from๊ณผ dbt_valid_to ํ•„๋“œ๋ฅผ ํ†ตํ•ด ํ•ด๋‹น ๋ ˆ์ฝ”๋“œ๊ฐ€ ์–ด๋Š ๊ธฐ๊ฐ„๋™์•ˆ ์œ ํšจํ–ˆ๋Š”์ง€ ์•Œ ์ˆ˜ ์žˆ๋„๋ก ์ €์žฅ๋œ๋‹ค.

dbt_valid_from์ด ๋ฐ์ดํ„ฐ๊ฐ€ ์ €์žฅ๋œ ์‹œ์ž‘๋‚ ์งœ์ด๊ณ , dbt_valid_to๊ฐ€ ์ €์žฅ๋œ ๋งˆ์ง€๋ง‰ ๋‚ ์งœ์ด๋ฏ€๋กœ,

๊ฐ€์žฅ ์ตœ์‹ ์˜ ๋ฐ์ดํ„ฐ์ผ ๊ฒฝ์šฐ dbt_valid_to ํ•„๋“œ์˜ ๊ฐ’์€ NULL๋กœ ์ €์žฅ๋œ๋‹ค.

 

์ด ์ƒํƒœ์—์„œ ์•„๊นŒ ์Šค๋ƒ…์ƒท์„ ์ €์žฅํ•œ ํ…Œ์ด๋ธ”์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ”๊ฟ”์ฃผ์—ˆ๋‹ค.

 

UPDATE raw_data.user_metadata
SET age = '20-29', updated_at = GETDATE()
WHERE user_id = 99;

 

๊ทธ๋ฆฌ๊ณ  dbt snapshot ๋ช…๋ น์–ด๋ฅผ ์‹คํ–‰ํ•˜๋ฉด, ์—…๋ฐ์ดํŠธ ๋ฐ์ดํ„ฐ์— ๋”ฐ๋ผ dbt_vaild_to๊ฐ€ ์—…๋ฐ์ดํŠธ๋  ๊ฒƒ์ด๋‹ค.

 


 

๐ŸŸจ DBT TEST

 

๋ฐ์ดํ„ฐ ํ’ˆ์งˆ์„ ํ…Œ์ŠคํŠธํ•˜๋Š” ๋ฐฉ๋ฒ•, Generic๊ณผ Singular ํ…Œ์ŠคํŠธ ๋ฐฉ๋ฒ•์œผ๋กœ ๋‚˜๋‰œ๋‹ค.

 

1. ๋‚ด์žฅ ์ผ๋ฐ˜ ํ…Œ์ŠคํŠธ (Generic)

unique, not_null, accepted_values, relationships ๋“ฑ์˜ ํ…Œ์ŠคํŠธ ์ง€์›

models/schema.yml ํŒŒ์ผ ์ƒ์„ฑ์„ ํ†ตํ•ด ๊ตฌํ˜„

 

ex) dim_user_metadata ํ…Œ์ด๋ธ”์˜ user_id ์ปฌ๋Ÿผ์— ์ค‘๋ณต์„ฑ๊ณผ not null ์ฒดํฌํ•˜๊ธฐ

version: 2
models:
  - name: dim_user_metadata
    columns:
    - name: user_id
      tests:
        - unique
        - not_null

 

dbt test ๋ช…๋ น์–ด๋ฅผ ํ†ตํ•ด ๊ฐ„๋‹จํ•˜๊ฒŒ test๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค.

 

 

2. ์ปค์Šคํ…€ ํ…Œ์ŠคํŠธ (Singular)

SELECT์œผ๋กœ ํ…Œ์ŠคํŠธ๋ฅผ ์งœ์„œ ๊ฒฐ๊ณผ๊ฐ€ ๋ฆฌํ„ด๋˜๋ฉด ์‹คํŒจ๋กœ ๊ฐ„์ฃผํ•˜๋Š” ๋ฐฉ๋ฒ•์ด๋‹ค.

WHERE์ ˆ์— ํ•ด๋‹นํ•˜๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ์žˆ์œผ๋ฉด ๋ฐ์ดํ„ฐ ํ’ˆ์งˆ์— ๋ฌธ์ œ๊ฐ€ ์žˆ๋Š” ๊ฒƒ์œผ๋กœ ์ƒ๊ฐํ•œ๋‹ค.

tests/ ํด๋”์— sql ํŒŒ์ผ์„ ์ƒ์„ฑํ•จ์œผ๋กœ์จ ๊ตฌํ˜„ํ•œ๋‹ค.

 

ex) tests/dim_user_metadata.sql

SELECT
 *
FROM (
    SELECT
    user_id, COUNT(1) cnt
    FROM
    {{ ref("dim_user_metadata") }}
    GROUP BY 1
    ORDER BY 2 DESC
    LIMIT 1
)
WHERE cnt > 1

์ด์ฒ˜๋Ÿผ dim_user_metadata ํ…Œ์ด๋ธ”์˜ user_id ์ปฌ๋Ÿผ์ด primary key uniqueness๋ฅผ ๋ณด์žฅํ•˜๋„๋ก ์„ค๊ณ„ํ•  ์ˆ˜ ์žˆ๋‹ค.

 

dbt test --select dim_user_metadata ํ˜•ํƒœ๋กœ ํŠน์ • ํ…Œ์ด๋ธ”๋งŒ test๋ฅผ ์‹คํ–‰ํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

 

์„ฑ๊ณต์ ์œผ๋กœ ํ…Œ์ŠคํŠธ๊ฐ€ ์™„๋ฃŒ๋˜์—ˆ๋‹ค.