์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- disk spill
- Spark ์ค์ต
- off heap memory
- aws
- Speculative Execution
- Spark SQL
- k8s
- Salting
- SQL
- Spark
- DataFrame Hint
- spark executor memory
- mysql
- Dag
- etl
- AQE
- CI/CD
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- Spark Caching
- Spark Partitioning
- Airflow
- Docker
- ๋น ๋ฐ์ดํฐ
- topic
- colab
- redshift
- backfill
- Kubernetes
- Kafka
- KDT_TIL
- Today
- Total
JUST DO IT!
SQL ํ์ต - Transaction(for Python) & ์ถ๊ฐ SQL ๋ฌธ๋ฒ (TIL 230512) ๋ณธ๋ฌธ
SQL ํ์ต - Transaction(for Python) & ์ถ๊ฐ SQL ๋ฌธ๋ฒ (TIL 230512)
sunhokimDev 2023. 5. 12. 22:06๐ KDT WEEK 6 DAY 5 TIL
- Transaction
- Python์์ ํธ๋์ญ์ ์ฌ์ฉํ๊ธฐ
- ์ ์ฉํ SQL ๋ฌธ๋ฒ
๐ฅ Transaction
Atomicํ๊ฒ ์คํ๋์ด์ผ ํ๋ SQL๋ค์ ๋ฌถ์ด์ ํ๋์ ์์ ์ฒ๋ผ ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ
- DDL ๋๋ DML ์ค ๋ ์ฝ๋๋ฅผ ์์ , ์ถ๊ฐ, ์ญ์ ํ ๊ฒ์๋ง ์๋ฏธ๊ฐ ์์
- SELECT์๋ ํธ๋์ญ์ ๋ถ๊ฐ
- BEGIN ~ END or BEGIN ~ COMMIT ์ฌ์ด์ ์ฟผ๋ฆฌ๋ฌธ ์์ฑ
- Atomic : ํธ๋์ญ์ ์ ๊ณผ์ ์ด ๋ชจ๋ ์ฑ๊ณต๋๊ฑฐ๋ ๋ชจ๋ ์คํจ๋์ด์ผํจ
- ROLLBACK : ์ด์ ์ํ๋ก ๋์๊ฐ
ex) ์ํ์ ๊ณ์ข ์ธ์ถ๊ณผ ์ ๊ธ์ด ๋ฌถ์ฌ์ ์คํ๋์ด์ผ ํ๋ฏ๋ก ํธ๋์ญ์ ์ด ํ์ํจ
BEGIN;
A์ ๊ณ์ข๋ก๋ถํฐ ์ธ์ถ;
B์ ๊ณ์ข๋ก ์
๊ธ;
END; -- COMMIT๊ณผ ๋์ผ
- ๊ณ์ข ์ธ์ถ๊ณผ ์ ๊ธ์ด ๋ง์น ํ๋์ ๋ช ๋ น์ด์ฒ๋ผ ์ฒ๋ฆฌ๋๋ค.
- BEGIN ์ด์ ์ ์ํ๋ก ๋์๊ฐ๋ ค๋ฉด ROLLBACK ์คํ
commit mode
- autocommit
- True : ๋ชจ๋ ๋ ์ฝ๋์ ์์ /์ญ์ /์ถ๊ฐ ์์ ์ด ๋ฐ๋ก DB์ ์ฐ์ฌ์ง
- False : ๋ชจ๋ ๋ ์ฝ๋์ ์์ /์ญ์ /์ถ๊ฐ ์์
์ด COMMIT์ด ํธ์ถ๋ ๋๊น์ง ์ ์ฅ X
- False์ ๊ฒฝ์ฐ .commit(), .rollback()์ผ๋ก DB ์ ์ฅ ์กฐ์จ
DELETE FROM vs TRUNCATE
DELETE FROM table_name (DELETE * FROM ๊ณผ๋ ๋ค๋ฆ)
- ํ ์ด๋ธ์์ ๋ชจ๋ ๋ ์ฝ๋๋ฅผ ์ญ์ ๊ฐ๋ฅ
- WHERE ์ฌ์ฉ๊ฐ๋ฅ โก๏ธ ํน์ ๋ ์ฝ๋๋ง ์ญ์ ๊ฐ๋ฅ (DROP TABLE๊ณผ์ ์ฐจ์ด์ !)
TRUNCATE table_name
- ํ ์ด๋ธ์์ ๋ชจ๋ ๋ ์ฝ๋๋ฅผ ์ญ์
- DELETE FROM ๋ณด๋ค ์๋๊ฐ ๋น ๋ฅด๋ค
- WHERE, Transaction ์ฌ์ฉ๋ถ๊ฐ โก๏ธ ๋กค๋ฐฑ๋ถ๊ฐ
๐ฆ ์ค์ต
Redshift Connet in Python
.connect()์ ์์ฑํ๋ Redshift ์ ๋ณด๋ฅผ ์ฑ์๋ฃ์ผ๋ฉด ๋๋ค. (autocommit = False)
import psycopg2
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=autocommit)
cur = conn.cursor()
cur.execute("์ฟผ๋ฆฌ๋ฌธ")
res = cur.fetchall()
for r in res:
print(r)
cur.execute() ์์ ์ฟผ๋ฆฌ๋ฌธ์ ํตํด ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์์ ์ถ๋ ฅํ๋ ์ฝ๋์ด๋ค.
1. autocommit = False์ด๋ฏ๋ก DB ๋ฐ์ดํฐ ์กฐ์์ด ํ์ฌ ์ธ์ ์์๋ ์ด๋ค์ง๋, ์ค์ ์ ์ฅ๋์ง๋ ์๋๋ค.
2. conn.commit() ์ผ๋ก ์ค์ DB์ ์ ์ฅํ ์ ์๋ค.
3. conn.rollback() ์ผ๋ก ์คํ๋๊ธฐ ์ด์ ์ํ๋ก ๋์๊ฐ ์ ์๋ค.
๋ค์์ transaction์ ์์์ด๋ค.
try:
cur.execute("DELETE FROM test_name_gender;")
cur.execute("INSERT INTO test_name_gender VALUES ('Claire', 'Female');")
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print(error)
conn.rollback()
finally :
conn.close()
test_name_gender ํ ์ด๋ธ์ ๋ฐ์ดํฐ๋ฅผ ์ญ์ , ์ถ๊ฐํ๋ ๋ฑ ๋ฐ์ดํฐ๋ฅผ ์กฐ์ํ์๋ค.
ํ์ง๋ง try ๊ณผ์ ์์ ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ค๋ฉด conn.commit()์ด ์คํ๋์ง ์๊ณ conn.rollback()์ด ์คํ๋๋ค.
autocommit=True์ธ ๊ฒฝ์ฐ, ๋ฐ์ดํฐ ์กฐ์์ด๋ ์๋์ผ๋ก commit๋๋ฏ๋ก ํธ๋์ญ์ ์ ๋ค๋ฃจ๋ ค๋ฉด ๋ค์์ ๋ฐฉ์์ ๋ฐ๋ฅธ๋ค.
cur.execute("BEGIN;")
cur.execute("DELETE FROM test_name_gender;")
cur.execute("INSERT INTO test_name_gender VALUES ('Benjamin', 'Male');")
cur.execute("END;")
- cur.execute()๋ฅผ ์ฌ์ฉํ์ฌ ํ ์ค์ฉ ๋ช ๋ น์ด๋ฅผ ์คํํ๋ค.
- BEGIN๊ณผ END(COMMIT)์ผ๋ก ํธ๋์ญ์ ์ range๋ฅผ ์ค์ ํ๋ค.
๐จ ์์๋๋ฉด ์ ์ฉํ SQL ๋ฌธ๋ฒ๋ค
ํ ์ด๋ธ ์ฐ์ฐ
- UNION : ์ฌ๋ฌ๊ฐ์ ํ
์ด๋ธ์ด๋ SELECT ๊ฒฐ๊ณผ๋ฅผ ํ๋์ ๊ฒฐ๊ณผ๋ก ํฉ์ณ์ค๋ค.
- UNION ALL์ ๊ฒฝ์ฐ ์ค๋ณต์ ์ ๊ฑฐํ์ง ์๊ณ ๋ชจ๋ ๊ฒฐ๊ณผ๋ก ํฉ์น๋ค.
- EXCEPT : ํ๋์ SELECT ๊ฒฐ๊ณผ์์ ๋ค๋ฅธ SELECT ๊ฒฐ๊ณผ๋ฅผ ๋นผ์ค๋ค.
- INTERSECT : ์ฌ๋ฌ๊ฐ์ SELECT๋ฌธ์์ ๊ฐ์ ๋ ์ฝ๋๋ง ์ฐพ์์ค๋ค.
NULL ์ฒ๋ฆฌ
- COALESCE(A, B, C, ...) : A๋ถํฐ ์ฐจ๋ก๋๋ก NULL๊ฐ์ด ๋์ค๋ฉด ๊ทธ ๊ฐ์ ๋ฆฌํดํ๊ณ ๋ชจ๋ NULL์ด๋ฉด NULL ๋ฆฌํด
- NULL๊ฐ์ ๋ค๋ฅธ ๊ฐ์ผ๋ก ๋ฐ๊พธ๊ณ ์ถ์ ๋ ์ฌ์ฉ
- NULLIF(A, B) : A์ B๊ฐ ๊ฐ์ผ๋ฉด NULL์ ๋ฆฌํดํ๋ค.
LISTAGG
๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด ์ ์ ์๋ฏ์ด ์ญ ๊ฒฐ๊ณผ๋ฅผ ํ ๋ฌธ์์ด๋ก ํฉ์ณ์ค๋ค.
- LISTAGG(channel) ์ LISTAGG(channel, '->') ๋ก ๋ฐ๊ฟ์ฃผ๋ฉด, -> ๊ฐ ๋ฌธ์์ด์ ๊ตฌ๋ถ์๋ก ์ถ๊ฐ๋๋ค.
- GROUP BY์ Aggํจ์์ด๋ฏ๋ก GROUP BY๊ฐ ํ์ํ๋ค.
- LISTAGG ๋ค์ชฝ์ ์ ์ธ๋ WITHIN์ ์ ๋ ฌ์ ๋ด๋นํ๋ค. WITHIN์ด ์์ผ๋ฉด ๋ฌด์์๋ก ๋ฌธ์์ด์ด ์์ฑ๋๋ค.
WINDOW
- ex) FIRST_VALUE, LAST_VALUE, ROW_NUMBER... > ๋ฐ์ ์์ ์ฐธ๊ณ
- ํจ์์ด๋ฆ() OVER ( PARTITION BY ... ORDER BY ...) ํํ๋ฅผ ์ง๋ ๊ฒ์ด ํน์ง
1. LAG : ์ด์ ๋๋ ๋ค์ ํน์ ํ๋ ๊ฐ์ ๊ฐ์ ธ์ฌ ์ ์๋ ํจ์
ORDER BY userid, ts๊ฐ ๋ค์ชฝ์ ์ ์ธ๋์ด์๋ค๊ณ ๊ฐ์ ํ๋ค.
LAG(channel,1) : ๋ด ์ ํํฐ์ ์ ์ฑ๋์ ๊ฐ์ ธ์จ๋ค.
PARTITION BY userId ORDER BY ts : userId๊ฐ ํํฐ์ ์ ๊ธฐ์ค์ด ๋๊ณ , ts๊ฐ ์ ๋ ฌ ๊ธฐ์ค์ด ๋์ด์ค๋ค.
JSON Parsing Functions
JSON ํฌ๋งท์ ํ์ฉํ์ฌ Parsing ๊ฐ๋ฅํ ํจ์
SELECT JSON_EXTRACT_PATH_TEXT('JSON ๋ฌธ์์ด', 'f4','f6');
--> f4 ํ๊ทธ์ f6 ์์๋ฅผ ์ฐพ์ ๊ฐ์ ๋ฐํํด์ค๋ค.
JSON์ ๊ตฌ์กฐ๋ฅผ ๋ฏธ๋ฆฌ ์๊ณ ์์ด์ผ ํ๋ค๋ ๊ฒ์ด ํ
์์
1. ์ฌ์ฉ์๋ณ๋ก ์ฒ์ ์ฑ๋๊ณผ ๋ง์ง๋ง ์ฑ๋ ์์๋ด๊ธฐ
๋ดํ์ด
SELECT usc.userid, ROW_NUMBER() OVER (PARTITION BY usc.userid ORDER BY ts.ts) as nn
FROM raw_data.session_transaction as trans
INNER JOIN raw_data.user_session_channel as usc ON trans.sessionid = usc.sessionid
INNER JOIN raw_data.session_timestamp as ts ON trans.sessionid = ts.sessionid
์ฒ์ ์ฑ๋๊ณผ ๋ง์ง๋ง ์ฑ๋๋ฟ๋ง ์๋๋ผ ์ ์ฒด ์ฑ๋์ด ๋ชจ๋ ์ถ๋ ฅ๋๋ค.
FIRST_VALUE, LAST_VALUE ์ฌ์ฉํ๊ธฐ
SELECT DISTINCT
A.userid,
FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS First_Channel,
LAST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS Last_Channel
FROM raw_data.user_session_channel A
LEFT JOIN raw_data.session_timestamp B ON A.sessionid = B.sessionid;
rows between ~ following ๊ตฌ๋ฌธ์ ์๋์ฐ์ ๋ฒ์๋ฅผ ์ ์ฒด๋ก ํ๋ ๊ตฌ์ ์ด๋ค.
CTE ๋น๋ฉ๋ธ๋ก ์ฌ์ฉํ๊ธฐ
WITH first AS (
SELECT userid, ts, channel, ROW_NUMBER() OVER(PARTITION BY userid ORDER BY ts) seq
FROM raw_data.user_session_channel usc
JOIN raw_data.session_timestamp st ON usc.sessionid = st.sessionid
), last AS(
SELECT userid, ts, channel, ROW_NUMBER() OVER(PARTITION BY userid ORDER BY ts DESC) seq
FROM raw_data.user_session_channel usc
JOIN raw_data.session_timestamp st ON usc.sessionid = st.sessionid
)
SELECT first.userid AS userid, first.channel AS first_channel, last.channel AS last_channel
FROM first
JOIN last ON first.userid = last.userid and last.seq = 1
WHERE first.seq = 1
2.. Gross Revenue(Refund ํฌํจ๋งค์ถ)์ด ๊ฐ์ฅ ํฐ userId 10๊ฐ ์ฐพ๊ธฐ
๋ด ํ์ด(์ ๋ต)
SELECT usc.userid, SUM(trans.amount) as GrossRevenue
FROM raw_data.session_transaction as trans
INNER JOIN raw_data.user_session_channel as usc ON trans.sessionid = usc.sessionid
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
SUM OVER ํ์ฉํด๋ณด๊ธฐ
SELECT
DISTINCT usc.userid,
SUM(amount) OVER(PARTITION BY usc.userid)
FROM raw_data.user_session_channel AS usc
JOIN raw_data.session_transaction AS revenue ON revenue.sessionid = usc.sessionid
ORDER BY 2 DESC
์์ฒ๋ผ GROUP BY๋ฅผ ์ ๊ฑฐํ๊ณ SUM์์ PARTITION BY๋ฅผ ํ์ฉํ ์ ์๋ค.
ํ์ง๋ง ์ด๋ฌ๋ฉด SUM์ด userid์ ์ค๋ณตํ์๋งํผ ๋ช ๋ฒ์ฉ ์ถ๋ ฅ๋๊ธฐ ๋๋ฌธ์ DISTINCT๋ฅผ ํ์ฉํ๋ค.
์ฌ์ค ์ด๋ณด๋ค ์์ ์๋ ๋ต์ด ํจ์ฌ ํจ์จ์ ์ด๊ณ ๊น๋ํ๋ค.
3. raw_data.nps ํ ์ด๋ธ์ ๋ฐํ์ผ๋ก ์๋ณ NPS ๊ณ์ฐํ๊ธฐ
๋ด ํ์ด
%%sql
SELECT
LEFT(created_at,7) as month,
ROUND(SUM(CASE WHEN score >= 9 THEN 1 END) / COUNT(1)::float , 3) - ROUND(SUM(CASE WHEN score <= 6 THEN 1 END) / COUNT(1)::float , 3) as NPS
FROM raw_data.nps
GROUP BY 1
ํ ์ด๋ธ ํ๋ ๋ง๋ค์ด์ ๋ช ์ํ๊ธฐ
SELECT
month,
ROUND((promoters-detractors)::float/total_count*100, 2) AS overall_nps
FROM (
SELECT
LEFT(created_at, 7) AS month,
COUNT(CASE WHEN score >= 9 THEN 1 END) AS promoters,
COUNT(CASE WHEN score <= 6 THEN 1 END) AS detractors,
COUNT(CASE WHEN score > 6 AND score < 9 THEN 1 END) AS passives,
COUNT(1) AS total_count
FROM raw_data.nps
GROUP BY 1
ORDER BY 1
)
0์ผ๋ก ๋๋ ๋๋ฅผ ์ ์ธํ๊ธฐ ์ํ ์ฒ๋ฆฌ๊ฐ ๋์ด์์ง๋ ์๋ค.
'TIL' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
AWS IAM, S3, CI/CD, ์ข ํฉ์ค์ต - TIL230517 (0) | 2023.05.17 |
---|---|
AWS EC2 ๊ธฐ๋ณธ ์ฉ์ด ๋ฐ ์ธ์คํด์ค ์์ฑํ๊ธฐ(TIL 230515) (0) | 2023.05.15 |
SQL ํ์ต - JOIN (TIL 230511) (0) | 2023.05.12 |
SQL ํ์ต - GROUP BY, CTAS (TIL 230510) (0) | 2023.05.10 |
Redshift cluster ์์ฑ ๋ฐ SQL ๋ฐ์ดํฐ ํ์ง ์ฒดํฌ (TIL 230509) (0) | 2023.05.10 |