์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Docker
- off heap memory
- Speculative Execution
- Spark Partitioning
- KDT_TIL
- k8s
- CI/CD
- Kafka
- Kubernetes
- DataFrame Hint
- colab
- etl
- disk spill
- SQL
- Dag
- redshift
- Spark SQL
- Spark Caching
- aws
- Spark ์ค์ต
- Airflow
- Salting
- AQE
- ๋น ๋ฐ์ดํฐ
- Spark
- topic
- mysql
- spark executor memory
- backfill
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- Today
- Total
JUST DO IT!
Colab์์ Spark SQL ๊ฐ๋จ ์ค์ตํด๋ณด๊ธฐ(+ Hive ๋ฉํ์คํ ์ด) - TIL230705 ๋ณธ๋ฌธ
Colab์์ Spark SQL ๊ฐ๋จ ์ค์ตํด๋ณด๊ธฐ(+ Hive ๋ฉํ์คํ ์ด) - TIL230705
sunhokimDev 2023. 7. 9. 00:37๐ KDT WEEK 14 DAY 3 TIL
- SparkSQL
- SparkSQL Colab ์ค์ต
- Hive ๋ฉํ์คํ ์ด
- ์ ๋ ํ ์คํธํด๋ณด๊ธฐ
๐ฅ SparkSQL
๊ตฌ์กฐํ๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ํ Spark ๋ชจ๋
- ๋ฐ์ดํฐํ๋ ์์ ํ ์ด๋ธ ์ด๋ฆ์ ์ง์ ํ๋ฉด, SQL ํจ์ ์ฌ์ฉ ๊ฐ๋ฅ
- HQL(Hive Query Language)์ ํธํ ์ ๊ณตํ์ฌ, Hive ํ ์ด๋ธ๋ค์ ์ฝ๊ณ ์ธ ์ ์๋ค
- SQL์ด ๋ฐ์ดํฐํ๋ ์ ์์ ๋ณด๋ค ๊ฐ๋ ์ฑ์ด ๋ ์ข๊ณ Spark SQL Engine ์ต์ ํํ๊ธฐ์๋ ์ข์
SQL ์ฌ์ฉ๋ฐฉ๋ฒ ( ๋ฐ์ดํฐํ๋ ์ = df )
df.createOrReplaceTempView("Tablename")
group_df = spark.sql("""
SELECT gender, count(1) FROM Tablename GROUP BY
""")
print(group_df.collect())
์ฌ์ฉ๋ฐฉ๋ฒ์ด ๊ต์ฅํ ๊ฐ๋จํ๋ค.
ํ ์ด๋ธ ์ด๋ฆ ์ง์ --> ์ฟผ๋ฆฌ๋ฌธ --> collectํด์ ์ถ๋ ฅ
๐ฆ ์ค์ต
PySpark ์ค์น
!pip install pyspark==3.3.1 py4j==0.10.9.5
SparkSession ์์ฑ
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL #1") \
.getOrCreate()
1. JOIN ์ค์ต
vital = [
{ 'UserID': 100, 'VitalID': 1, 'Date': '2020-01-01', 'Weight': 75 },
{ 'UserID': 100, 'VitalID': 2, 'Date': '2020-01-02', 'Weight': 78 },
{ 'UserID': 101, 'VitalID': 3, 'Date': '2020-01-01', 'Weight': 90 },
{ 'UserID': 101, 'VitalID': 4, 'Date': '2020-01-02', 'Weight': 95 },
]
alert = [
{ 'AlertID': 1, 'VitalID': 4, 'AlertType': 'WeightIncrease', 'Date': '2020-01-01', 'UserID': 101},
{ 'AlertID': 2, 'VitalID': None, 'AlertType': 'MissingVital', 'Date': '2020-01-04', 'UserID': 100},
{ 'AlertID': 3, 'VitalID': None, 'AlertType': 'MissingVital', 'Date': '2020-01-05', 'UserID': 101}
]
# rdd์ ์ฌ๋ฆฌ๊ธฐ
rdd_vital = spark.sparkContext.parallelize(vital)
rdd_alert = spark.sparkContext.parallelize(alert)
# ๋ฐ์ดํฐํ๋ ์ํ
df_vital = rdd_vital.toDF()
df_alert = rdd_alert.toDF()
#์กฐ์ธ ์กฐ๊ฑด
join_expr = df_vital.VitalID == df_alert.VitalID
# ์ผ์ชฝํ
์ด๋ธ . join (์ค๋ฅธ์ชฝํ
์ด๋ธ, ์กฐ์ธ ์กฐ๊ฑด, ์กฐ์ธ ์ข
๋ฅ)
df_vital.join(df_alert, join_expr, "inner").show()
๋ฐ์ดํฐ๋ฅผ Spark์์ ๋ค๋ฃจ๊ธฐ ์ํด์๋, ๋จผ์ RDD์ ์ฌ๋ฆฐ๋ค ๋ฐ์ดํฐํ๋ ์ํํด์ค์ผํ๋ค.
JOIN์ ๋ฐ์ดํฐํ๋ ์์ .join์ ์ฌ์ฉํ๊ณ , ๋ช ๊ฐ์ง ์ธ์๋ฅผ ๋ํด ์กฐ์ธํ๋ค.
์ฝ๋์๋ ์กฐ์ธ ์กฐ๊ฑด(join_expr)์ผ๋ก VitalID๋ฅผ ์ฃผ๊ณ , ์ธ ๋ฒ์งธ ์ธ์๋ก "inner"๋ฅผ ๋ฃ์์ผ๋ก์จ INNER JOIN์ ๋ง๋ค์๋ค.
์ธ ๋ฒ์งธ ์ธ์ ์กฐ๊ฑด์ผ๋ก๋ left, right, full, cross๋ฅผ ๋ฃ์ ์ ์๋ค.
SQL ์ฟผ๋ฆฌ๋ฌธ์ผ๋ก ์กฐ์ธ์ ํ ์๋ ์๋ค.
# ์ฟผ๋ฆฌ๋ฌธ์ ์ฌ์ฉํ๋ ค๋ฉด ํ
์ด๋ธ์ด ํ์ํ๋ค.
df_vital.createOrReplaceTempView("Vital")
df_alert.createOrReplaceTempView("Alert")
# INNER JOIN
df_inner_join = spark.sql("""
SELECT * FROM Vital v
JOIN Alert a ON v.vitalID = a.vitalID;
""")
df_inner_join.show()
์ด๋ ๊ฒ ์ฟผ๋ฆฌ๋ฌธ์ ์ฌ์ฉํ๋ฉด ๊ต์ฅํ ๊ฐ๋จํ๋ค.
2. GROUP BY , Window ํจ์์ค์ต
์ญ์ SQL ์ฟผ๋ฆฌ๋ฌธ์ ์ฌ์ฉํ๋ค.
์ฌ๊ธฐ์๋ user_session_channel, session_timestamp, session_transaction ํ ์ด๋ธ์ ๊ฐ์ ธ์๋ค.
- session_timestamp : sessionid(์ฌ์ฉ์ ์ธ์ ID), ts(์ธ์ ์๊ฐ)
- user_session_channel : userid, sessionid, channel(์ ์ํ ์ฑ๋)
- session_transaction : sessionid, refunded(ํ๋ถ ์ ๋ฌด), amount(๊ตฌ๋งค ๊ธ์ก)
# GROUP BY๋ก ์๋ณ, ์ฑ๋๋ณ๋ก ๋ฌถ์
# ๊ทธ๋ฆฌ๊ณ DISTINCT ํ userid๋ก ๋ฐฉ๋ฌธ์๊ณ์ฐ
mon_channel_rev_df = spark.sql("""
SELECT LEFT(sti.ts, 7) year_month,
usc.channel channel,
COUNT(DISTINCT userid) total_visitors
FROM user_session_channel usc
LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
GROUP BY 1 ,2
ORDER BY 1, 2""")
ํ์ SQL๋ฌธ์ ์ฌ์ฉํ๋ฏ์ด ์ฟผ๋ฆฌ๋ฌธ์ ์์ฑํ๋ฉด ๋๋ค.
์๋ณ ์ฑ๋๋ณ ์ด ๋ฐฉ๋ฌธ์(์ฌ์ฉ์๋ ์ ๋ํฌํ๊ฒ)๋ฅผ ๊ณ์ฐํ๋ ์ฟผ๋ฆฌ๋ฌธ์ด๋ค!
๋ ๋ณต์กํ ์ฟผ๋ฆฌ๋ฌธ๋ ๊ฐ๋ฅํ๋ค.
mon_channel_rev_df = spark.sql("""
SELECT LEFT(ts, 7) month,
usc.channel,
COUNT(DISTINCT userid) uniqueUsers,
COUNT(DISTINCT (CASE WHEN amount >= 0 THEN userid END)) paidUsers,
SUM(amount) grossRevenue,
SUM(CASE WHEN refunded is not True THEN amount END) netRevenue,
ROUND(COUNT(DISTINCT CASE WHEN amount >= 0 THEN userid END)*100
/ COUNT(DISTINCT userid), 2) conversionRate
FROM user_session_channel usc
LEFT JOIN session_timestamp t ON t.sessionid = usc.sessionid
LEFT JOIN session_transaction st ON st.sessionid = usc.sessionid
GROUP BY 1, 2
ORDER BY 1, 2;
""")
์๋ณ ์ฑ๋๋ณ ์ด ๋ฐฉ๋ฌธ์(uniqueUsers), ๋งค์ถ ๋ฐ์ ๋ฐฉ๋ฌธ์(paidUsers), ์ด ์์ต(grossRevenue), ์ ์์ต(netRevenue), ๊ตฌ๋งค ์ ํ๋ฅ (conversionRate)๋ฅผ ๊ณ์ฐํ๋ค.
์๋์ฐ ํจ์๋ ์ฌ์ฉํด๋ณด์.
first_last_channel_df2 = spark.sql("""
SELECT DISTINCT A.userid,
FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS First_Channel,
LAST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS Last_Channel
FROM user_session_channel A
LEFT JOIN session_timestamp B
ON A.sessionid = B.sessionid""")
userid์ ๋ํด ์ฒ์ ์ ์ํ ์ฑ๋(First_Channel)๊ณผ ๋ง์ง๋ง ์ฑ๋(Last_Channel)์ ๊ตฌํ๋ ์ฟผ๋ฆฌ๋ฌธ์ด๋ค.
FIRST_VALUE์ LAST_VALUE๋ฅผ ์ฌ์ฉํด์ ๊ฐ๋จํ๊ฒ ์์๋ผ ์ ์๋ค.
๐ฉ Hive ๋ฉํ์คํ ์ด ์ฌ์ฉํ๊ธฐ
ํ์ด๋ธ๋ ํ ์ด๋ธ๊ณผ ํํฐ์ ๊ณผ ๊ด๋ จ๋ ๋ฉํ์ ๋ณด๋ฅผ ๋ชจ๋ ๋ฉํ์คํ ์ด์ ์ ์ฅํ๋ค. ์ด๊ฑธ Spark์์ ์ฌ์ฉํด๋ณด์.
Spark ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ํ ์ด๋ธ์ ๋ํด ๋จผ์ ์์๋ณด์.
- ์นดํ๋ก๊ทธ : ํ ์ด๋ธ๊ณผ ๋ทฐ์ ๊ดํ ๋ฉํ ๋ฐ์ดํฐ ๊ด๋ฆฌ(๋ฉ๋ชจ๋ฆฌ ๊ธฐ๋ฐ, ํ์ด๋ธ์ ํธํ)
- ํ ์ด๋ธ ๊ด๋ฆฌ ๋ฐฉ์ : ๋ฐ์ดํฐ๋ฒ ์ด์ค๋ผ ๋ถ๋ฅด๋ ํด๋์์ ํ ์ด๋ธ์ด ์ ์ฅ๋๋ ๊ตฌ์กฐ
- ์คํ ๋ฆฌ์ง ๊ธฐ๋ฐ ํ
์ด๋ธ :
- ๊ธฐ๋ณธ์ ์ผ๋ก HDFS์ Parquet ํฌ๋งท์ ์ฌ์ฉ
- Hive์ ํธํ๋๋ ๋ฉํ ์คํ ์ด ์ฌ์ฉ
- Managed Table, Unmanaged(External) Table ๋ ์ข
๋ฅ์ ํ
์ด๋ธ์ด ์กด์ฌ(Hive์ ๋์ผ)
- Managed Table
- dataframe.saveAsTable("ํ ์ด๋ธ์ด๋ฆ") or SQL(CTAS) -> HDFS์ ์ ์ฅ
- spark.sql.warehouse.dir๊ฐ ๊ฐ๋ฅดํค๋ ์์น์ ๋ฐ์ดํฐ ์ ์ฅ, Unmanaged๋ณด๋ค ์ฑ๋ฅ์ข์
- Unmanaged(External) Table
- ์ด๋ฏธ HDFS์ ์กด์ฌํ๋ ๋ฐ์ดํฐ์ ์คํค๋ง๋ฅผ ์ ์ํด์ ์ฌ์ฉ
- ๋ฐ์ดํฐ๊ฐ ์ด๋ฏธ ์กด์ฌํ๋ฏ๋ก ๋ฉํ๋ฐ์ดํฐ๋ง ์นดํ๋ก๊ทธ์ ๊ธฐ๋ก๋๋ฉฐ, ์ญ์ ๋์ด๋ ๋ฐ์ดํฐ๋ ๊ทธ๋๋ก์
- Managed Table
Spark์์๋ SparkSession ์์ฑ์ enableHiveSupport() ํธ์ถํ๋ฉด Hive์ ํธํ๋๋ ๋ฉํ์คํ ์ด ์ฌ์ฉํ ์ ์๋ค.
๐ป Hive ๋ฉํ์คํ ์ด(Managed Table) Colab์์ ์ค์ตํด๋ณด๊ธฐ
# Hive ๋ฉํ์คํ ์ด๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด .enableHiveSupport() ์ต์
์ฌ์ฉ
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark Hive") \
.enableHiveSupport() \
.getOrCreate()
SparkSession ์์ฑ์ .enableHiveSupport()๋ฅผ ํธ์ถํ๋ค.
๋ฐ์ดํฐ๋ก๋ ์ ์ฅํด๋ orders.csv ํ์ผ์ ์ฌ์ฉํ์๋ค.
# DB ์์ฑ
spark.sql("CREATE DATABASE IF NOT EXISTS TEST_DB")
spark.sql("USE TEST_DB")
#default๋ ๊ธฐ๋ณธ์ ์ผ๋ก ์์ฑ๋๋ DB
spark.sql("SHOW DATABASES").show()
DB๋ฅผ ์์ฑํ๊ณ ๋๋ฉด, SHOW DATABASES๋ฅผ ์ฌ์ฉํด์ ์ด๋ค DB๊ฐ ์๋์ง ํ์ธํด๋ณด์.
DB๋ฅผ ์์ฑํ ๋ค, ls๋ก ๋ด์ฉ์ ํ์ธํด๋ณด์.
๋ก์ปฌ์์ ์์ ํ๋ฏ๋ก ๋ก์ปฌ ๋์คํฌ๋ฅผ HDFS๋ก ์ฌ์ฉํ๋ค. ์ด๊ณณ์ ๋ฉํ์คํ ์ด(metastore_db)๋ ์ ์ฅ๋๋ค.
spark-warehouse๋ DBํด๋๋ก์จ, Spark์์ Managed Table์ ๋ง๋ค๋ฉด ๊ทธ ํ ์ด๋ธ์ ๋ฐ์ดํฐ๊ฐ ์ ์ฅ๋๋ ๊ณณ์ด๋ค.
๊ทธ๋ฆฌ๊ณ TEST_DB์ orders๋ฅผ ํ ์ด๋ธ๋ก ์ ์ฅํด๋ณด์.
# ํ
์ด๋ธ๋ก ์ ์ฅ
df.write.saveAsTable("TEST_DB.orders", mode="overwrite")
๊ทธ๋ฌ๋ฉด, spark-warehouse ํด๋์ test_db.db/orders/ ๊ฒฝ๋ก๋ก ๋ฐ์ดํฐ๊ฐ ์ ์ฅ๋๋ค.
์ด ํ ์ด๋ธ์ ๋ด์ฉ์ ํ์ธํ๋ ๋ฐฉ๋ฒ์ ๋ ๊ฐ์ง๊ฐ ์๋ค.
spark.sql("SELECT * FROM TEST_DB.orders").show(5)
spark.table("TEST_DB.orders").show(5)
๋ ๋ค ๊ฐ์ ๊ฒฐ๊ณผ๊ฐ ๋์จ๋ค.
๋ฉํ์คํ ์ด ๋ด์ฉ๋ ํ์ธํด๋ณด์.
spark ๋ช ๋ น์ด๋ก ํ ์ด๋ธ ๋ฆฌ์คํธ๋ฅผ ํ์ธํ ์๋ ์๋ค.
# ํ
์ด๋ธ ๋ฆฌ์คํธ ํ์ธ
spark.catalog.listTables()
์ ์ฅ๋ orders ํ ์ด๋ธ์ ์ฌ์ฉํด์ CTAS๋ก ์๋ก์ด ํ ์ด๋ธ์ ๋ง๋ค์ด๋ณด์๋ค.
ํ ์ด๋ธ์ด ์ ์์ ์ผ๋ก ํ๋ ๋์ด๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
๐จ ์ ๋ ํ ์คํธ ์ค์ต
์ฝ๋ ์์ ํน์ ๊ธฐ๋ฅ์ ํ ์คํธํ๊ธฐ ์ํด ์์ฑ๋๋ ์ฝ๋๋ก, ์ ํด์ง ์ ๋ ฅ์ ์ฃผ๊ณ ์์๋ ์ถ๋ ฅ์ด ๋์ค๋์ง ํ์ธํ๋ค.
์ ์ ์ค์ตํด๋ณด์๋ Spark UDF๋ฅผ ์ฌ์ฉํด์, ๊ฐ ํจ์์ ๊ธฐ๋ฅ์ ํ ์คํธํ๋ ์ฝ๋๋ฅผ ์์ฑํด๋ณธ๋ค.
https://sunhokimdev.tistory.com/63
ํ์ด์ฌ์ ์ ๋ํ ์คํธ ๋ชจ๋์ ๊ธฐ๋ฐ์ผ๋ก ์์ฑํ๋ค.
๋จผ์ , UDF๋ฅผ ์์ฑํ๋ค.
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *
import pandas as pd
# ๋๋ฌธ์๋ก ๋ง๋๋ ํจ์(UDF)
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
return s.str.upper()
upperUDF = spark.udf.register("upper_udf", upper_udf_f)
๊ทธ๋ฆฌ๊ณ ๋ค๋ฅธ python ํ์ผ์ ํ ์คํธ ์ฝ๋๋ฅผ ์์ฑํ๋ ๊ฒ์ด ์ข์ง๋ง,
colab ํ๊ฒฝ์์ ์ค์ตํ๊ธฐ ๋๋ฌธ์ ๋์ผํ ํ์ผ์์ ํ ์คํธ ํด๋์ค๋ฅผ ๋ง๋ค์๋ค.
from unittest import TestCase
class UtilsTestCase(TestCase):
spark = None
# Test๊ฐ ์์ํ๋ฉด์ ์๋์ผ๋ก ์ฒ์ ๋ถ๋ฌ์ค๋ ํจ์
# SparkSession์ ํ๋ ๋ง๋ค์ด์ spark์ ์ ์ฅํ๋ค.
# ๋ค๋ฅธ ํจ์์์๋ ์ด๊ฑธ self๋ก ๊ฐ์ ธ์ฌ ์ ์๋ค.
@classmethod
def setUpClass(cls) -> None:
cls.spark = SparkSession.builder \
.appName("Spark Unit Test") \
.getOrCreate()
# ํ
์คํธ ํจ์
def test_upper_udf(self):
test_data = [
{ "name": "John Kim" },
{ "name": "Johnny Kim"},
{ "name": "1234" }
]
expected_results = [ "JOHN KIM", "JOHNNY KIM", "1234" ]
upperUDF = self.spark.udf.register("upper_udf", upper_udf_f)
test_df = self.spark.createDataFrame(test_data)
names = test_df.select("name", upperUDF("name").alias("NAME")).collect()
results = []
for name in names:
results.append(name["NAME"])
self.assertCountEqual(results, expected_results)
# ๋ง์ง๋ง์ ํธ์ถ๋๋ ํจ์์ด๋ค.
# setUpClass์์ ํ ๋น๋ ์์์ด ์๋ค๋ฉด ์ฌ๊ธฐ์ ์๋์ผ๋ก ๋ง์ง๋ง์ ๋ฆด๋ฆฌ์คํด์ฃผ๋ ์ญํ ์ ํ๋ค.
@classmethod
def tearDownClass(cls) -> None:
cls.spark.stop()
ํ์ด์ฌ์ unittest์์ TestCase๋ฅผ importํด์ ์ฌ์ฉํ๋ค.
ํ ์คํธ๊ฐ ์์๋๋ฉด setUpClass ํจ์๋ฅผ ๋จผ์ ํธ์ถํ๊ฒ ๋๋ค.
์ฌ๊ธฐ์ SparkSession์ ๋ง๋ค๋ฉด ๋๊ณ , ํ ์คํธ์ ํ์ํ ๋ฆฌ์์ค๊ฐ ์๋ค๋ฉด ์ฌ๊ธฐ์ ํ ๋นํ๋ฉด ๋๋ค.
test_upper_udf ํจ์๋ฅผ ํตํด ํ ์คํธ๋ฅผ ํ๊ฒ ๋๋๋ฐ,
๊ฐ๋จํ ์๊ฐํ๋ฉด test_data๊ฐ upperUDF๋ฅผ ๊ฑฐ์ณ์ expected_results๊ฐ ๋๋์ง ํ์ธํ๋ ๊ณผ์ ์ด๋ค.
์ฌ๊ธฐ์ assertCountEqual์ด ๋ ๊ฐ์ ๋ฆฌ์คํธ๋ฅผ ๋ฐ์ ๋ฆฌ์คํธ๋ฅผ Sortingํ์ ๋ ๋์ผํ ์ง ํ์ธํด์ฃผ๊ฒ ๋๋ค.
์ด์ ํ ์คํธ๋ฅผ ์คํํด๋ณด์.
import unittest
unittest.main(argv=[''], verbosity=2, exit=False)