Spark UDF์™€ explode ๊ธฐ๋Šฅ Colab์—์„œ ์‹ค์Šตํ•˜๊ธฐ - TIL230705

sunhokimDev 2023. 7. 9. 16:29

๐Ÿ“š KDT WEEK 14 DAY 3 TIL

  • UDF
  • UDAF
  • Explode

 


โš’๏ธ UDF - User Defined Function

DataFrame์ด๋‚˜ SQL์—์„œ ์ ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ์‚ฌ์šฉ์ž ์ •์˜ ํ•จ์ˆ˜

  • Scalar ํ•จ์ˆ˜ : UPPER, LOWER ...
  • Aggregation ํ•จ์ˆ˜(UDAF) : SUM, MIN, MAX

 

Google Colab์—์„œ ์‹ค์Šต์„ ์ง„ํ–‰ํ•œ๋‹ค.

 

pyspark์™€ py4j ํ™˜๊ฒฝ์„ค์น˜

!pip install pyspark==3.3.1 py4j==0.10.9.5

 

SparkSession ์ƒ์„ฑ

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark UDF") \
    .getOrCreate()

 

๊ฐ„๋‹จํ•œ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ ์ƒ์„ฑ

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

 


 

๐ŸŸฅ UDF ์‹ค์Šต

 

pyspark์—์„œ udf๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ๋Š” pyspark.sql.functions๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

 

1. ๋žŒ๋‹คํ•จ์ˆ˜ UDF

 

# ๋žŒ๋‹คํ•จ์ˆ˜ UDF
# F.udf์— ๋žŒ๋‹คํ•จ์ˆ˜ ๋„ฃ๊ธฐ

import pyspark.sql.functions as F
from pyspark.sql.types import *

upperUDF = F.udf(lambda z:z.upper())

df.withColumn("Curated Name", upperUDF("Name")) \
  .show(truncate=False)

 

F.udf(๋žŒ๋‹คํ•จ์ˆ˜) ํ˜•ํƒœ๋กœ ์ €์žฅํ•ด์„œ ํŒŒ์ด์ฌ์—์„œ ํ•จ์ˆ˜๋ฅผ ์“ฐ๋“ฏ ์‚ฌ์šฉํ•œ๋‹ค.

withColumn์€ ์ƒˆ๋กœ์šด ์ปฌ๋Ÿผ์„ ์ถ”๊ฐ€ํ•˜๋Š” ๊ธฐ๋Šฅ์œผ๋กœ์จ upperUDF ํ•จ์ˆ˜์˜ ๊ฒฐ๊ณผ๋ฅผ Curated Name ์ปฌ๋Ÿผ์— ์ถ”๊ฐ€ํ•˜๊ฒŒ ๋œ๋‹ค.

 

 

2. ํŒŒ์ด์ฌ ํ•จ์ˆ˜

 

# ํŒŒ์ด์ฌ ํ•จ์ˆ˜ UDF
# F.udf์— ํŒŒ์ด์ฌํ•จ์ˆ˜ ๋„ฃ๊ธฐ

def upper_udf(s):
    return s.upper()
    
upperUDF = F.udf(upper_udf, StringType())

df.withColumn("Curated Name", upperUDF("Name")) \
   .show(truncate=False)

 

์ผ๋ฐ˜์ ์œผ๋กœ ํ•จ์ˆ˜๋ฅผ ์ •์˜ํ•ด์„œ ์ด๋ฅผ F.udf์— ๋„ฃ์–ด์ฃผ๋Š” ํ˜•ํƒœ์ด๋‹ค.

๋žŒ๋‹คํ•จ์ˆ˜์™€ ๋น„์Šทํ•˜์ง€๋งŒ ๋” ๋ช…ํ™•ํ•˜๊ณ  ์ดํ•ดํ•˜๊ธฐ ์‰ฌ์šด ํ˜•ํƒœ์ธ ๊ฒƒ ๊ฐ™๋‹ค.

 

 

 

์ถ”๊ฐ€๋กœ withColumn์œผ๋กœ ์ปฌ๋Ÿผ์„ ๋งค๋ฒˆ ์ถ”๊ฐ€ํ•˜์ง€ ์•Š๊ณ , SELECT์—์„œ ๋ฐ”๋กœ ์›ํ•˜๋Š” ์ปฌ๋Ÿผ๊ณผ ๊ฐ™์ด ๋ณผ ์ˆ˜๋„ ์žˆ๋‹ค.

 

Name ์ปฌ๋Ÿผ๊ณผ Curated Name ์ปฌ๋Ÿผ์ด ์ถœ๋ ฅ

 

3. Pandas UDF

 

ํ•จ์ˆ˜๋ฅผ ๋ ˆ์ฝ”๋“œ ์ง‘ํ•ฉ ๋‹จ์œ„๋กœ ์ฒ˜๋ฆฌํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋‹ค๋ฅธ ํ•จ์ˆ˜๋ณด๋‹ค ํšจ์œจ์ ์ธ ๋ฐฉ๋ฒ•์ด๋‹ค.

 

# ํŒ๋‹ค์Šค udf
# ์ด ๋ฐฉ๋ฒ•์€ ๋ ˆ์ฝ”๋“œ ์ง‘ํ•ฉ์ด ๋“ค์–ด์™€์„œ ์ฒ˜๋ฆฌํ•˜๋ฏ€๋กœ ํšจ์œจ์ ์ธ ๋ฐฉ๋ฒ•์ด๋‹ค

from pyspark.sql.functions import pandas_udf
import pandas as pd

# Define the UDF
# ์ž…๋ ฅํƒ€์ž… -> ์ถœ๋ ฅํƒ€์ž… ํ˜•ํƒœ๋กœ ์ •์˜ํ•œ๋‹ค.
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
    return s.str.upper()
    
# ์œ„์—์„œ ์ •์˜ํ•œ ํŒŒ์ด์ฌ upper ํ•จ์ˆ˜๋ฅผ ๊ทธ๋Œ€๋กœ ์‚ฌ์šฉ
# SQL ์ฟผ๋ฆฌ์—์„œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋„๋ก ์ฒซ ๋ฒˆ์งธ ์ธ์ž์— ์‚ฌ์šฉํ•  ์ด๋ฆ„์„ ๋งŒ๋“ค์–ด์ค€๋‹ค.
upperUDF = spark.udf.register("upper_udf", upper_udf_f)
spark.sql("SELECT upper_udf('aBcD')").show()

 

spark.udf.register๋กœ SQL ์ฟผ๋ฆฌ์•ˆ์—์„œ ์‚ฌ์šฉํ•  ํ•จ์ˆ˜ ์ด๋ฆ„์„ ๋งŒ๋“ค๊ณ , spark.sql์•ˆ์—์„œ UDF๋ฅผ ์‚ฌ์šฉํ–ˆ๋‹ค.

 

 

 


 

๐ŸŸฆ UDAF ์‹ค์Šต

UDF ํ•จ์ˆ˜๋กœ Aggregation(SUM, AVERAGE, MIN.. ๋“ฑ)์„ ์‚ฌ์šฉํ•˜๋Š” ํ˜•ํƒœ์ด๋‹ค.

 

Pandas UDF๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

# dataframe์— User Define Aggregation Fuction ์‚ฌ์šฉํ•ด๋ณด๊ธฐ

from pyspark.sql.functions import pandas_udf
import pandas as pd

# Define the UDF
@pandas_udf(FloatType())
# pd.Series๋ฅผ ์ž…๋ ฅ์œผ๋กœ ๋ฐ›์•„ float ๊ฐ’์„ ์ถœ๋ ฅ์œผ๋กœ ๋ƒ„
def average_udf_f(v: pd.Series) -> float:
    return v.mean()

averageUDF = spark.udf.register('average_udf', average_udf_f)
spark.sql('SELECT average_udf(a) FROM test').show() # a์˜ ํ‰๊ท ๊ฐ’ ๊ณ„์‚ฐํ•˜๊ธฐ (a๋กœ ๊ทธ๋ฃนํ•˜์—ฌ ํ‰๊ท ๊ฐ’ ๊ณ„์‚ฐ)

 

์œ„์—์„œ UDF๋ฅผ ์‚ฌ์šฉํ•œ ํ˜•ํƒœ์™€ ๋น„์Šทํ•œ ํ˜•ํƒœ์ด๋‹ค.

ํ•˜์ง€๋งŒ, Aggregation ํ•จ์ˆ˜๋Š” withColumn์„ ์‚ฌ์šฉํ•˜์ง€ ๋ชปํ•œ๋‹ค๊ณ  ํ•œ๋‹ค.

 

๋‹ค๋ฅธ ๊ธฐ๋Šฅ์œผ๋กœ, ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์— .agg๋ฅผ ์‚ฌ์šฉํ•ด์„œ ๊ฐ’์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

 

averageUDF๋ฅผ ์‚ฌ์šฉํ•œ ๋ชจ์Šต

 

++) There is also a way to list the UDFs I created alongside the built-in functions.

for f in spark.catalog.listFunctions():
    print(f[0])
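
Since that list is long, a quick sketch to filter it down to registered UDFs:

print([f.name for f in spark.catalog.listFunctions() if "udf" in f.name])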

 

์—ฌ๋Ÿฌ ํ•จ์ˆ˜ ์ค‘์—์„œ ๋‚ด๊ฐ€ ๋งŒ๋“  ํ•จ์ˆ˜ average_udf๊ฐ€ ๋ณด์ธ๋‹ค.

 


 

๐ŸŸฉ Explode

 

์ด๋ฆ„๋ณ„๋กœ ์‚ฌ์šฉํ•˜๋Š” ํ”„๋กœ๊ทธ๋ž˜๋ฐ ์–ธ์–ด์™€, ๊ทธ ์‚ฌ๋žŒ์˜ ํŠน์ง•์„ ๋„ฃ์–ด๋‘” ๊ฐ„๋‹จํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฃผ์—ˆ๋‹ค.

์‚ฌ์šฉํ•˜๋Š” ์–ธ์–ด๋Š” ์—ฌ๋Ÿฌ ๊ฐœ๊ฐ€ ์žˆ์„ ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์— ํ•œ ์ปฌ๋Ÿผ์— ๋ฆฌ์ŠคํŠธ๋กœ ์ €์žฅํ•˜๊ฒŒ ๋˜์—ˆ๋‹ค.

์ด ์–ธ์–ด๋“ค์„ ํ•˜๋‚˜์˜ ๋ ˆ์ฝ”๋“œ๋กœ ์ชผ๊ฐœ์„œ ๋ณ„๊ฐœ์˜ ๋ ˆ์ฝ”๋“œ๋กœ ์ƒ์„ฑ(explode)ํ•ด๋ณด์ž.

 

arrayData = [
        ('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
        ('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
        ('Robert',['CSharp',''],{'hair':'red','eye':''}),
        ('Washington',None,None),
        ('Jefferson',['1','2'],{})]

df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])
df.show()

 

 

๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๋ฉด, ํ•œ ์ปฌ๋Ÿผ์— ๋ฆฌ์ŠคํŠธ ํ˜•ํƒœ์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ์‚ฝ์ž…๋œ ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

 

์—ฌ๊ธฐ์„œ explode๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋ฆฌ์ŠคํŠธ์˜ ๊ฐ ๋ฐ์ดํ„ฐ๋ณ„๋กœ ๋ ˆ์ฝ”๋“œ๋ฅผ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค.

# Split the knownLanguages field into one new record per language
from pyspark.sql.functions import explode
df2 = df.select(df.name, explode(df.knownLanguages))  # select the name column plus the exploded knownLanguages column
df2.printSchema()
df2.show()

 

Name ์ปฌ๋Ÿผ์ด ์ค‘๋ณต๋˜๊ณ , ์–ธ์–ด๋ณ„๋กœ ๋ ˆ์ฝ”๋“œ๊ฐ€ ์ƒ์„ฑ๋˜์—ˆ๋‹ค!

 


 

์ด๋ฒˆ์—๋Š” Dictionary ํ˜•ํƒœ๋กœ ์ €์žฅ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ตฌ์กฐ(struct)๋กœ ์ €์žฅํ•˜์—ฌ ๋ถˆ๋Ÿฌ์˜ค๋Š” ์‹ค์Šต์„ ํ•ด๋ณด์ž.

 

์—ฌ๊ธฐ์„œ๋Š” ๋”ฐ๋กœ ์ €์žฅํ•œ orders.csv ํŒŒ์ผ์„ ์‚ฌ์šฉํ–ˆ๋‹ค.

 

orders.csv์˜ ์ผ๋ถ€ ๋‚ด์šฉ

 

items ์ปฌ๋Ÿผ์— name, quantity, id๋ณ„๋กœ ๊ฐ value๊ฐ€ ์ €์žฅ๋œ ํ˜•ํƒœ์ด๋‹ค.

์ด ๊ตฌ์กฐ๋ฅผ ๋ช…ํ™•ํ•˜๊ฒŒ ๋ช…์‹œํ•ด์„œ ๊ฐ๊ฐ Key๋ฅผ ๋ถˆ๋Ÿฌ์™”์„ ๋•Œ value๊ฐ€ ๋‚˜ํƒ€๋‚˜๋„๋ก ์ฝ”๋”ฉํ•ด๋ณด์ž.

 

๋จผ์ € csv ํŒŒ์ผ์„ ๋ถˆ๋Ÿฌ์˜จ๋‹ค.

from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType

order = spark.read.options(delimiter='\t').option("header","true").csv("orders.csv")  # tab-delimited file, hence \t

 

์—ฌ๊ธฐ์„œ printSchema๋ฅผ ์‚ฌ์šฉํ•ด์„œ ํƒ€์ž…์„ ํ™•์ธํ•ด๋ณด๋ฉด, ์•„์ง ๊ตฌ์กฐ์—†์ด String์„ ๊ฐ€์ง€๊ณ  ์žˆ์Œ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

 

์•„์ง์€ items ์ปฌ๋Ÿผ์— ๊ตฌ์กฐ๊ฐ€ ์—†๋‹ค.

 

ArrayType์„ ์‚ฌ์šฉํ•ด์„œ ๊ตฌ์กฐ๋ฅผ ํ•˜๋‚˜ ๋งŒ๋“ค์–ด์ค€๋‹ค.

 

# Define a structure for the items column
# using the DataFrame API
struct = ArrayType(
    StructType([
        StructField("name", StringType()),
        StructField("id", StringType()),
        StructField("quantity", LongType())
    ])
)

# explode items using from_json plus the struct defined above, matching the shape of items
order.withColumn("item", explode(from_json("items", struct))).show(truncate=False)

 

items์•ˆ์˜ ํ˜•ํƒœ๋Œ€๋กœ ๊ตฌ์กฐ๋ฅผ ์ •์˜ํ•˜๊ณ , explode์— from_json(์ปฌ๋Ÿผ์ด๋ฆ„, ์‚ฌ์šฉํ•  ๊ตฌ์กฐ)๋ฅผ ๋„ฃ์–ด ๊ตฌ์กฐ๋ฅผ ๋„ฃ์—ˆ๋‹ค.

 

withColumn์„ ์‚ฌ์šฉํ–ˆ๊ธฐ ๋•Œ๋ฌธ์—, ๊ธฐ์กด์˜ ์ปฌ๋Ÿผ์€ ๋‚จ๊ณ  ์ถ”๊ฐ€ ์ปฌ๋Ÿผ์ด ๋ถ™์—ˆ๋‹ค.

 

์ƒˆ๋กœ ์ถ”๊ฐ€๋œ item ์ปฌ๋Ÿผ์— value ๊ฐ’๋งŒ ๋‚จ์•„ ์ €์žฅ๋˜์—ˆ๋‹ค.

์ด์ œ ํ•„์š”์—†๋Š” ๊ธฐ์กด ์ปฌ๋Ÿผ items๋ฅผ dropํ•˜๊ณ , ๋ฐ”๋€ ์Šคํ‚ค๋งˆ๋ฅผ ํ™•์ธํ•ด๋ณด์ž.

 

# Remove the items column
order_items = order.withColumn("item", explode(from_json("items", struct))).drop("items")
order_items.printSchema()  # check the changed schema

 

item ์ปฌ๋Ÿผ์— ๊ตฌ์กฐ๊ฐ€ ๋“ค์–ด๊ฐ„ ๋ชจ์Šต!

 

์ด ๊ตฌ์กฐ๋ฅผ ํ…Œ์ด๋ธ”์—์„œ ํ™œ์šฉํ•ด๋ณด์ž!

 

# ํ…Œ์ด๋ธ”๋กœ ๋งŒ๋“ค๊ธฐ
order_items.createOrReplaceTempView("order_items")

# ์ฟผ๋ฆฌ๋ฌธ
# item์˜ quantity๋ฅผ ์‚ฌ์šฉํ•˜๋ ค๋ฉด item.quantity๋กœ ์‚ฌ์šฉํ•˜๋ฉด ๋œ๋‹ค!
spark.sql("""
    SELECT order_id, CAST(average_udf(item.quantity) as decimal) avg_count
    FROM order_items
    GROUP BY 1
    ORDER BY 2 DESC""").show(5)

 

์ฟผ๋ฆฌ๋ฌธ์˜ ๊ฒฐ๊ณผ