
Colab์—์„œ Spark SQL ๊ฐ„๋‹จ ์‹ค์Šตํ•ด๋ณด๊ธฐ(+ Hive ๋ฉ”ํƒ€์Šคํ† ์–ด) - TIL230705

sunhokimDev 2023. 7. 9. 00:37

๐Ÿ“š KDT WEEK 14 DAY 3 TIL

  • SparkSQL
  • SparkSQL Colab practice
  • Hive metastore
  • Trying out unit tests

 


 

๐ŸŸฅ SparkSQL

๊ตฌ์กฐํ™”๋œ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ Spark ๋ชจ๋“ˆ

  • ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— ํ…Œ์ด๋ธ” ์ด๋ฆ„์„ ์ง€์ •ํ•˜๋ฉด, SQL ํ•จ์ˆ˜ ์‚ฌ์šฉ ๊ฐ€๋Šฅ
  • HQL(Hive Query Language)์™€ ํ˜ธํ™˜ ์ œ๊ณตํ•˜์—ฌ, Hive ํ…Œ์ด๋ธ”๋“ค์„ ์ฝ๊ณ  ์“ธ ์ˆ˜ ์žˆ๋‹ค
  • SQL์ด ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ์ž‘์—…๋ณด๋‹ค ๊ฐ€๋…์„ฑ์ด ๋” ์ข‹๊ณ  Spark SQL Engine ์ตœ์ ํ™”ํ•˜๊ธฐ์—๋„ ์ข‹์Œ

 

How to use SQL (DataFrame = df)

df.createOrReplaceTempView("Tablename")

group_df = spark.sql("""
    SELECT gender, count(1) FROM Tablename GROUP BY gender
""")

print(group_df.collect())

์‚ฌ์šฉ๋ฐฉ๋ฒ•์ด ๊ต‰์žฅํžˆ ๊ฐ„๋‹จํ•˜๋‹ค.

ํ…Œ์ด๋ธ” ์ด๋ฆ„ ์ง€์ • --> ์ฟผ๋ฆฌ๋ฌธ --> collectํ•ด์„œ ์ถœ๋ ฅ

 


 

๐ŸŸฆ Practice

 

Installing PySpark

!pip install pyspark==3.3.1 py4j==0.10.9.5

 

SparkSession ์ƒ์„ฑ

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL #1") \
    .getOrCreate()

 

1. JOIN practice

vital = [
     { 'UserID': 100, 'VitalID': 1, 'Date': '2020-01-01', 'Weight': 75 },
     { 'UserID': 100, 'VitalID': 2, 'Date': '2020-01-02', 'Weight': 78 },
     { 'UserID': 101, 'VitalID': 3, 'Date': '2020-01-01', 'Weight': 90 },
     { 'UserID': 101, 'VitalID': 4, 'Date': '2020-01-02', 'Weight': 95 },
]

alert = [
    { 'AlertID': 1, 'VitalID': 4, 'AlertType': 'WeightIncrease', 'Date': '2020-01-01', 'UserID': 101},
    { 'AlertID': 2, 'VitalID': None, 'AlertType': 'MissingVital', 'Date': '2020-01-04', 'UserID': 100},
    { 'AlertID': 3, 'VitalID': None, 'AlertType': 'MissingVital', 'Date': '2020-01-05', 'UserID': 101}
]

# Load the Python lists into RDDs
rdd_vital = spark.sparkContext.parallelize(vital)
rdd_alert = spark.sparkContext.parallelize(alert)

# Convert the RDDs into DataFrames
df_vital = rdd_vital.toDF()
df_alert = rdd_alert.toDF()

# Join condition
join_expr = df_vital.VitalID == df_alert.VitalID
# left_table.join(right_table, join_condition, join_type)
df_vital.join(df_alert, join_expr, "inner").show()

 

๋ฐ์ดํ„ฐ๋ฅผ Spark์—์„œ ๋‹ค๋ฃจ๊ธฐ ์œ„ํ•ด์„œ๋Š”, ๋จผ์ € RDD์— ์˜ฌ๋ฆฐ๋’ค ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ํ™”ํ•ด์ค˜์•ผํ•œ๋‹ค.

 

INNER JOIN result

 

JOIN์€ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— .join์„ ์‚ฌ์šฉํ•˜๊ณ , ๋ช‡ ๊ฐ€์ง€ ์ธ์ž๋ฅผ ๋”ํ•ด ์กฐ์ธํ•œ๋‹ค.

์ฝ”๋“œ์—๋Š” ์กฐ์ธ ์กฐ๊ฑด(join_expr)์œผ๋กœ VitalID๋ฅผ ์ฃผ๊ณ , ์„ธ ๋ฒˆ์งธ ์ธ์ž๋กœ "inner"๋ฅผ ๋„ฃ์Œ์œผ๋กœ์จ INNER JOIN์„ ๋งŒ๋“ค์—ˆ๋‹ค.

 

The third argument also accepts left, right, full, and cross, as sketched below.
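For example, swapping the third argument changes the join type (same DataFrames as above):

# LEFT JOIN: keep every row of df_vital, matched or not
df_vital.join(df_alert, join_expr, "left").show()

# FULL OUTER JOIN: keep unmatched rows from both sides
df_vital.join(df_alert, join_expr, "full").show()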

 

You can also do the join with a SQL query.

 

# ์ฟผ๋ฆฌ๋ฌธ์„ ์‚ฌ์šฉํ•˜๋ ค๋ฉด ํ…Œ์ด๋ธ”์ด ํ•„์š”ํ•˜๋‹ค.
df_vital.createOrReplaceTempView("Vital")
df_alert.createOrReplaceTempView("Alert")

# INNER JOIN
df_inner_join = spark.sql("""
	SELECT * FROM Vital v
	JOIN Alert a ON v.vitalID = a.vitalID;
""")
df_inner_join.show()

 

Using a query this way is quite simple, too.

 

์ฟผ๋ฆฌ๋ฌธ์„ ์‚ฌ์šฉํ•œ INNER JOIN ๊ฒฐ๊ณผ

 

2. GROUP BY and window function practice

 

Again, we use SQL queries.

 

Here we use the user_session_channel, session_timestamp, and session_transaction tables (a loading sketch follows the list below).

 

  • session_timestamp : sessionid (user session ID), ts (session timestamp)
  • user_session_channel : userid, sessionid, channel (access channel)
  • session_transaction : sessionid, refunded (whether it was refunded), amount (purchase amount)
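The loading step isn't shown in the post; a minimal sketch, assuming each table exists as a CSV file with a header row (the file names are assumptions):

# Read each CSV and register it as a temp view so SQL can reference it
for name in ["user_session_channel", "session_timestamp", "session_transaction"]:
    df = spark.read.option("header", True).option("inferSchema", True).csv(f"{name}.csv")
    df.createOrReplaceTempView(name)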

 

# GROUP BY month and channel,
# then count visitors with DISTINCT userid

mon_channel_rev_df = spark.sql("""
    SELECT LEFT(sti.ts, 7) year_month,
      usc.channel channel,
      COUNT(DISTINCT userid) total_visitors
    FROM user_session_channel usc
    LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
    GROUP BY 1 ,2
    ORDER BY 1, 2""")

 

ํ‰์†Œ SQL๋ฌธ์„ ์‚ฌ์šฉํ•˜๋“ฏ์ด ์ฟผ๋ฆฌ๋ฌธ์„ ์ž‘์„ฑํ•˜๋ฉด ๋œ๋‹ค.

์›”๋ณ„ ์ฑ„๋„๋ณ„ ์ด ๋ฐฉ๋ฌธ์ž(์‚ฌ์šฉ์ž๋Š” ์œ ๋‹ˆํฌํ•˜๊ฒŒ)๋ฅผ ๊ณ„์‚ฐํ•˜๋Š” ์ฟผ๋ฆฌ๋ฌธ์ด๋‹ค!

 

mon_channel_rev_df result

 

 

 

More complex queries are possible, too.

 

mon_channel_rev_df = spark.sql("""
  SELECT LEFT(ts, 7) month,
       usc.channel,
       COUNT(DISTINCT userid) uniqueUsers,
       COUNT(DISTINCT (CASE WHEN amount >= 0 THEN userid END)) paidUsers,
       SUM(amount) grossRevenue,
       SUM(CASE WHEN refunded is not True THEN amount END) netRevenue,
       ROUND(COUNT(DISTINCT CASE WHEN amount >= 0 THEN userid END)*100
          / COUNT(DISTINCT userid), 2) conversionRate
   FROM user_session_channel usc
   LEFT JOIN session_timestamp t ON t.sessionid = usc.sessionid
   LEFT JOIN session_transaction st ON st.sessionid = usc.sessionid
   GROUP BY 1, 2
   ORDER BY 1, 2;
""")

 

์›”๋ณ„ ์ฑ„๋„๋ณ„ ์ด ๋ฐฉ๋ฌธ์ž(uniqueUsers), ๋งค์ถœ ๋ฐœ์ƒ ๋ฐฉ๋ฌธ์ž(paidUsers), ์ด ์ˆ˜์ต(grossRevenue), ์ˆœ ์ˆ˜์ต(netRevenue), ๊ตฌ๋งค ์ „ํ™˜๋ฅ (conversionRate)๋ฅผ ๊ณ„์‚ฐํ–ˆ๋‹ค.

 

mon_channel_rev_df result

 

์œˆ๋„์šฐ ํ•จ์ˆ˜๋„ ์‚ฌ์šฉํ•ด๋ณด์ž.

 

first_last_channel_df2 = spark.sql("""
SELECT DISTINCT A.userid,
    FIRST_VALUE(A.channel) OVER (PARTITION BY A.userid ORDER BY B.ts
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS First_Channel,
    LAST_VALUE(A.channel) OVER (PARTITION BY A.userid ORDER BY B.ts
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS Last_Channel
FROM user_session_channel A
LEFT JOIN session_timestamp B ON A.sessionid = B.sessionid""")

 

This query finds, for each userid, the first channel they came in on (First_Channel) and the last one (Last_Channel).

FIRST_VALUE and LAST_VALUE make this straightforward.
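The same result can also be computed with the DataFrame API's Window; a minimal sketch, where usc_df and sti_df are hypothetical DataFrames holding the two tables:

from pyspark.sql import Window
from pyspark.sql import functions as F

# A window spanning all of each user's rows, ordered by session time
w = (Window.partitionBy("userid").orderBy("ts")
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

first_last_df = (
    usc_df.join(sti_df, "sessionid", "left")  # hypothetical DataFrame names
          .select("userid",
                  F.first("channel").over(w).alias("First_Channel"),
                  F.last("channel").over(w).alias("Last_Channel"))
          .distinct()
)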

 

first_last_channel_df2 result

 


 

๐ŸŸฉ Using the Hive metastore

Hive stores all metadata about tables and partitions in its metastore. Let's use it from Spark.

 

Spark ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์™€ ํ…Œ์ด๋ธ”์— ๋Œ€ํ•ด ๋จผ์ € ์•Œ์•„๋ณด์ž.

  • Catalog: manages metadata about tables and views (in-memory by default, with a Hive-compatible option)
  • Table organization: tables are stored inside folders called databases
  • Storage-based tables:
    • Use HDFS and the Parquet format by default
    • Use a Hive-compatible metastore
    • Two kinds of tables exist, as in Hive: Managed and Unmanaged (External)
      • Managed Table
        • dataframe.write.saveAsTable("table_name") or SQL (CTAS) -> stored in HDFS
        • Data lives where spark.sql.warehouse.dir points; performs better than Unmanaged
      • Unmanaged (External) Table (see the sketch after this list)
        • Defines a schema over data that already exists in HDFS
        • Since the data already exists, only metadata is recorded in the catalog; dropping the table leaves the data intact

 

Spark์—์„œ๋Š” SparkSession ์ƒ์„ฑ์‹œ enableHiveSupport() ํ˜ธ์ถœํ•˜๋ฉด Hive์™€ ํ˜ธํ™˜๋˜๋Š” ๋ฉ”ํƒ€์Šคํ† ์–ด ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

 

๐Ÿ’ป Practicing the Hive metastore (Managed Table) in Colab

 

# Use the .enableHiveSupport() option to enable the Hive metastore

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Hive") \
    .enableHiveSupport() \
    .getOrCreate()

 

SparkSession ์ƒ์„ฑ์‹œ .enableHiveSupport()๋ฅผ ํ˜ธ์ถœํ•œ๋‹ค.

 

๋ฐ์ดํ„ฐ๋กœ๋Š” ์ €์žฅํ•ด๋‘” orders.csv ํŒŒ์ผ์„ ์‚ฌ์šฉํ•˜์˜€๋‹ค.

 

orders.csv์˜ ๋‚ด์šฉ

 

# DB ์ƒ์„ฑ
spark.sql("CREATE DATABASE IF NOT EXISTS TEST_DB")
spark.sql("USE TEST_DB")

#default๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ์ƒ์„ฑ๋˜๋Š” DB
spark.sql("SHOW DATABASES").show()

 

DB๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ๋‚˜๋ฉด, SHOW DATABASES๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์–ด๋–ค DB๊ฐ€ ์žˆ๋Š”์ง€ ํ™•์ธํ•ด๋ณด์ž.

 

default DB๋„ ๋ณด์ธ๋‹ค.

 

DB๋ฅผ ์ƒ์„ฑํ•œ ๋’ค, ls๋กœ ๋‚ด์šฉ์„ ํ™•์ธํ•ด๋ณด์ž.

 

ls -tl์˜ ๊ฒฐ๊ณผ

 

๋กœ์ปฌ์—์„œ ์ž‘์—…ํ•˜๋ฏ€๋กœ ๋กœ์ปฌ ๋””์Šคํฌ๋ฅผ HDFS๋กœ ์‚ฌ์šฉํ•œ๋‹ค. ์ด๊ณณ์— ๋ฉ”ํƒ€์Šคํ† ์–ด(metastore_db)๋„ ์ €์žฅ๋œ๋‹ค.

spark-warehouse๋Š” DBํด๋”๋กœ์จ, Spark์—์„œ Managed Table์„ ๋งŒ๋“ค๋ฉด ๊ทธ ํ…Œ์ด๋ธ”์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ์ €์žฅ๋˜๋Š” ๊ณณ์ด๋‹ค.

 

Now let's save orders into TEST_DB as a table.

# Save as a managed table
df.write.saveAsTable("TEST_DB.orders", mode="overwrite")

 

Then the data is stored in the spark-warehouse folder, under the test_db.db/orders/ path.

 

Confirmed with ls that the data was stored.

 

์ด ํ…Œ์ด๋ธ”์˜ ๋‚ด์šฉ์„ ํ™•์ธํ•˜๋Š” ๋ฐฉ๋ฒ•์€ ๋‘ ๊ฐ€์ง€๊ฐ€ ์žˆ๋‹ค.

 

spark.sql("SELECT * FROM TEST_DB.orders").show(5)
spark.table("TEST_DB.orders").show(5)

 

Both produce the same result.

 

๋ฉ”ํƒ€์Šคํ† ์–ด ๋‚ด์šฉ๋„ ํ™•์ธํ•ด๋ณด์ž.

 

metastore_db ํด๋”์˜ ์•ˆ์ชฝ ๋ชจ์Šต

 

You can also check the list of tables with a Spark command.

# List the tables
spark.catalog.listTables()

 

orders ํ…Œ์ด๋ธ”์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

 

์ €์žฅ๋œ orders ํ…Œ์ด๋ธ”์„ ์‚ฌ์šฉํ•ด์„œ CTAS๋กœ ์ƒˆ๋กœ์šด ํ…Œ์ด๋ธ”์„ ๋งŒ๋“ค์–ด๋ณด์•˜๋‹ค.

 

It's a query that counts the number of rows per order_id.
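The query itself only appeared as an image; here is a hedged reconstruction (the new table name orders_count and the alias cnt are assumptions):

# CTAS: create a new managed table directly from a query result
spark.sql("""
    CREATE TABLE IF NOT EXISTS TEST_DB.orders_count AS
    SELECT order_id, COUNT(1) AS cnt
    FROM TEST_DB.orders
    GROUP BY 1
""")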

 

ํ…Œ์ด๋ธ”์ด ํ•˜๋‚˜ ๋Š˜์–ด๋‚œ ๋ชจ์Šต

 

ํ…Œ์ด๋ธ”์ด ์ •์ƒ์ ์œผ๋กœ ํ•˜๋‚˜ ๋Š˜์–ด๋‚œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

 


 

๐ŸŸจ Unit test practice

์ฝ”๋“œ ์ƒ์˜ ํŠน์ • ๊ธฐ๋Šฅ์„ ํ…Œ์ŠคํŠธํ•˜๊ธฐ ์œ„ํ•ด ์ž‘์„ฑ๋˜๋Š” ์ฝ”๋“œ๋กœ,  ์ •ํ•ด์ง„ ์ž…๋ ฅ์„ ์ฃผ๊ณ  ์˜ˆ์ƒ๋œ ์ถœ๋ ฅ์ด ๋‚˜์˜ค๋Š”์ง€ ํ™•์ธํ•œ๋‹ค.

 

์ „์— ์‹ค์Šตํ•ด๋ณด์•˜๋˜ Spark UDF๋ฅผ ์‚ฌ์šฉํ•ด์„œ, ๊ฐ ํ•จ์ˆ˜์˜ ๊ธฐ๋Šฅ์„ ํ…Œ์ŠคํŠธํ•˜๋Š” ์ฝ”๋“œ๋ฅผ ์ž‘์„ฑํ•ด๋ณธ๋‹ค.

https://sunhokimdev.tistory.com/63

 


 

ํŒŒ์ด์ฌ์˜ ์œ ๋‹›ํ…Œ์ŠคํŠธ ๋ชจ๋“ˆ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ์ž‘์„ฑํ•œ๋‹ค.

 

First, write the UDF.

 

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *
import pandas as pd

# ๋Œ€๋ฌธ์ž๋กœ ๋งŒ๋“œ๋Š” ํ•จ์ˆ˜(UDF)
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
    return s.str.upper()

upperUDF = spark.udf.register("upper_udf", upper_udf_f)

 

Ideally the test code would live in a separate Python file,

but since this practice runs in Colab, the test class is defined in the same file.

 

from unittest import TestCase

class UtilsTestCase(TestCase):
    spark = None

    # Called automatically once when the tests start.
    # Creates a SparkSession and stores it in cls.spark;
    # other methods can access it through self.
    @classmethod
    def setUpClass(cls) -> None:
        cls.spark = SparkSession.builder \
            .appName("Spark Unit Test") \
            .getOrCreate()

    # The test method
    def test_upper_udf(self):
        test_data = [
            { "name": "John Kim" },
            { "name": "Johnny Kim"},
            { "name": "1234" }
        ]
        expected_results = [ "JOHN KIM", "JOHNNY KIM", "1234" ]

        upperUDF = self.spark.udf.register("upper_udf", upper_udf_f)
        test_df = self.spark.createDataFrame(test_data)
        names = test_df.select("name", upperUDF("name").alias("NAME")).collect()
        results = []
        for name in names:
            results.append(name["NAME"])
        self.assertCountEqual(results, expected_results)

    # Called last, after all tests have run.
    # Releases any resources allocated in setUpClass.
    @classmethod
    def tearDownClass(cls) -> None:
        cls.spark.stop()

 

ํŒŒ์ด์ฌ์˜ unittest์—์„œ TestCase๋ฅผ importํ•ด์„œ ์‚ฌ์šฉํ•œ๋‹ค.

ํ…Œ์ŠคํŠธ๊ฐ€ ์‹œ์ž‘๋˜๋ฉด setUpClass ํ•จ์ˆ˜๋ฅผ ๋จผ์ € ํ˜ธ์ถœํ•˜๊ฒŒ ๋œ๋‹ค.

์—ฌ๊ธฐ์„œ SparkSession์„ ๋งŒ๋“ค๋ฉด ๋˜๊ณ , ํ…Œ์ŠคํŠธ์— ํ•„์š”ํ•œ ๋ฆฌ์†Œ์Šค๊ฐ€ ์žˆ๋‹ค๋ฉด ์—ฌ๊ธฐ์„œ ํ• ๋‹นํ•˜๋ฉด ๋œ๋‹ค.

 

The actual testing happens in test_upper_udf:

put simply, it checks whether test_data, after passing through upperUDF, becomes expected_results.

Here assertCountEqual takes two lists and verifies that they are equal once sorted, i.e. they contain the same elements with the same counts, regardless of order.
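A quick standalone illustration of those semantics (this assumes Python 3.5+, where a bare TestCase can be instantiated for ad-hoc assertions):

from unittest import TestCase

tc = TestCase()
tc.assertCountEqual(["b", "a", "a"], ["a", "a", "b"])  # passes: same elements, any order
# tc.assertCountEqual(["a"], ["a", "a"])               # would fail: counts differ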

 

Now let's run the test.

import unittest

unittest.main(argv=[''], verbosity=2, exit=False)
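Passing argv=[''] keeps unittest from trying to parse the notebook's own command-line arguments, and exit=False stops it from calling sys.exit() when the run finishes, so the Colab kernel stays alive.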