์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Salting
- SQL
- mysql
- Speculative Execution
- Spark SQL
- AQE
- Spark Caching
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- Airflow
- backfill
- Dag
- Kubernetes
- off heap memory
- redshift
- Kafka
- Spark ์ค์ต
- spark executor memory
- aws
- Docker
- ๋น ๋ฐ์ดํฐ
- disk spill
- CI/CD
- topic
- k8s
- etl
- DataFrame Hint
- colab
- Spark
- KDT_TIL
- Spark Partitioning
- Today
- Total
JUST DO IT!
Spark UDF์ explode ๊ธฐ๋ฅ Colab์์ ์ค์ตํ๊ธฐ - TIL230705 ๋ณธ๋ฌธ
๐ KDT WEEK 14 DAY 3 TIL
- UDF
- UDAF
- Explode
โ๏ธ UDF - User Defined Function
DataFrame์ด๋ SQL์์ ์ ์ฉํ ์ ์๋ ์ฌ์ฉ์ ์ ์ ํจ์
- Scalar ํจ์ : UPPER, LOWER ...
- Aggregation ํจ์(UDAF) : SUM, MIN, MAX
Google Colab์์ ์ค์ต์ ์งํํ๋ค.
pyspark์ py4j ํ๊ฒฝ์ค์น
!pip install pyspark==3.3.1 py4j==0.10.9.5
SparkSession ์์ฑ
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark UDF") \
.getOrCreate()
๊ฐ๋จํ ๋ฐ์ดํฐํ๋ ์ ์์ฑ
columns = ["Seqno","Name"]
data = [("1", "john jones"),
("2", "tracey smith"),
("3", "amy sanders")]
df = spark.createDataFrame(data=data,schema=columns)
df.show(truncate=False)
๐ฅ UDF ์ค์ต
pyspark์์ udf๋ฅผ ์ฌ์ฉํ ๋๋ pyspark.sql.functions๋ฅผ ์ฌ์ฉํ๋ค.
1. ๋๋คํจ์ UDF
# ๋๋คํจ์ UDF
# F.udf์ ๋๋คํจ์ ๋ฃ๊ธฐ
import pyspark.sql.functions as F
from pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name")) \
.show(truncate=False)
F.udf(๋๋คํจ์) ํํ๋ก ์ ์ฅํด์ ํ์ด์ฌ์์ ํจ์๋ฅผ ์ฐ๋ฏ ์ฌ์ฉํ๋ค.
withColumn์ ์๋ก์ด ์ปฌ๋ผ์ ์ถ๊ฐํ๋ ๊ธฐ๋ฅ์ผ๋ก์จ upperUDF ํจ์์ ๊ฒฐ๊ณผ๋ฅผ Curated Name ์ปฌ๋ผ์ ์ถ๊ฐํ๊ฒ ๋๋ค.
2. ํ์ด์ฌ ํจ์
# ํ์ด์ฌ ํจ์ UDF
# F.udf์ ํ์ด์ฌํจ์ ๋ฃ๊ธฐ
def upper_udf(s):
return s.upper()
upperUDF = F.udf(upper_udf, StringType())
df.withColumn("Curated Name", upperUDF("Name")) \
.show(truncate=False)
์ผ๋ฐ์ ์ผ๋ก ํจ์๋ฅผ ์ ์ํด์ ์ด๋ฅผ F.udf์ ๋ฃ์ด์ฃผ๋ ํํ์ด๋ค.
๋๋คํจ์์ ๋น์ทํ์ง๋ง ๋ ๋ช ํํ๊ณ ์ดํดํ๊ธฐ ์ฌ์ด ํํ์ธ ๊ฒ ๊ฐ๋ค.
์ถ๊ฐ๋ก withColumn์ผ๋ก ์ปฌ๋ผ์ ๋งค๋ฒ ์ถ๊ฐํ์ง ์๊ณ , SELECT์์ ๋ฐ๋ก ์ํ๋ ์ปฌ๋ผ๊ณผ ๊ฐ์ด ๋ณผ ์๋ ์๋ค.
3. Pandas UDF
ํจ์๋ฅผ ๋ ์ฝ๋ ์งํฉ ๋จ์๋ก ์ฒ๋ฆฌํ๊ธฐ ๋๋ฌธ์ ๋ค๋ฅธ ํจ์๋ณด๋ค ํจ์จ์ ์ธ ๋ฐฉ๋ฒ์ด๋ค.
# ํ๋ค์ค udf
# ์ด ๋ฐฉ๋ฒ์ ๋ ์ฝ๋ ์งํฉ์ด ๋ค์ด์์ ์ฒ๋ฆฌํ๋ฏ๋ก ํจ์จ์ ์ธ ๋ฐฉ๋ฒ์ด๋ค
from pyspark.sql.functions import pandas_udf
import pandas as pd
# Define the UDF
# ์
๋ ฅํ์
-> ์ถ๋ ฅํ์
ํํ๋ก ์ ์ํ๋ค.
@pandas_udf(StringType())
def upper_udf_f(s: pd.Series) -> pd.Series:
return s.str.upper()
# ์์์ ์ ์ํ ํ์ด์ฌ upper ํจ์๋ฅผ ๊ทธ๋๋ก ์ฌ์ฉ
# SQL ์ฟผ๋ฆฌ์์ ์ฌ์ฉํ ์ ์๋๋ก ์ฒซ ๋ฒ์งธ ์ธ์์ ์ฌ์ฉํ ์ด๋ฆ์ ๋ง๋ค์ด์ค๋ค.
upperUDF = spark.udf.register("upper_udf", upper_udf_f)
spark.sql("SELECT upper_udf('aBcD')").show()
spark.udf.register๋ก SQL ์ฟผ๋ฆฌ์์์ ์ฌ์ฉํ ํจ์ ์ด๋ฆ์ ๋ง๋ค๊ณ , spark.sql์์์ UDF๋ฅผ ์ฌ์ฉํ๋ค.
๐ฆ UDAF ์ค์ต
UDF ํจ์๋ก Aggregation(SUM, AVERAGE, MIN.. ๋ฑ)์ ์ฌ์ฉํ๋ ํํ์ด๋ค.
Pandas UDF๋ฅผ ์ฌ์ฉํ๋ค.
# dataframe์ User Define Aggregation Fuction ์ฌ์ฉํด๋ณด๊ธฐ
from pyspark.sql.functions import pandas_udf
import pandas as pd
# Define the UDF
@pandas_udf(FloatType())
# pd.Series๋ฅผ ์
๋ ฅ์ผ๋ก ๋ฐ์ float ๊ฐ์ ์ถ๋ ฅ์ผ๋ก ๋
def average_udf_f(v: pd.Series) -> float:
return v.mean()
averageUDF = spark.udf.register('average_udf', average_udf_f)
spark.sql('SELECT average_udf(a) FROM test').show() # a์ ํ๊ท ๊ฐ ๊ณ์ฐํ๊ธฐ (a๋ก ๊ทธ๋ฃนํ์ฌ ํ๊ท ๊ฐ ๊ณ์ฐ)
์์์ UDF๋ฅผ ์ฌ์ฉํ ํํ์ ๋น์ทํ ํํ์ด๋ค.
ํ์ง๋ง, Aggregation ํจ์๋ withColumn์ ์ฌ์ฉํ์ง ๋ชปํ๋ค๊ณ ํ๋ค.
๋ค๋ฅธ ๊ธฐ๋ฅ์ผ๋ก, ๋ฐ์ดํฐํ๋ ์์ .agg๋ฅผ ์ฌ์ฉํด์ ๊ฐ์ ํ์ธํ ์ ์๋ค.
++) ๊ธฐ์กด์ ํจ์์ ํจ๊ป ๋ด๊ฐ ๋ง๋ UDF๋ฅผ ํ์ธํ ์ ์๋ ๋ฐฉ๋ฒ์ด ์๋ค.
for f in spark.catalog.listFunctions():
print(f[0])
๐ฉ Explode
์ด๋ฆ๋ณ๋ก ์ฌ์ฉํ๋ ํ๋ก๊ทธ๋๋ฐ ์ธ์ด์, ๊ทธ ์ฌ๋์ ํน์ง์ ๋ฃ์ด๋ ๊ฐ๋จํ ๋ฐ์ดํฐ๋ฅผ ์ฃผ์๋ค.
์ฌ์ฉํ๋ ์ธ์ด๋ ์ฌ๋ฌ ๊ฐ๊ฐ ์์ ์ ์๊ธฐ ๋๋ฌธ์ ํ ์ปฌ๋ผ์ ๋ฆฌ์คํธ๋ก ์ ์ฅํ๊ฒ ๋์๋ค.
์ด ์ธ์ด๋ค์ ํ๋์ ๋ ์ฝ๋๋ก ์ชผ๊ฐ์ ๋ณ๊ฐ์ ๋ ์ฝ๋๋ก ์์ฑ(explode)ํด๋ณด์.
arrayData = [
('James',['Java','Scala'],{'hair':'black','eye':'brown'}),
('Michael',['Spark','Java',None],{'hair':'brown','eye':None}),
('Robert',['CSharp',''],{'hair':'red','eye':''}),
('Washington',None,None),
('Jefferson',['1','2'],{})]
df = spark.createDataFrame(data=arrayData, schema = ['name','knownLanguages','properties'])
df.show()
๋ฐ์ดํฐ๋ฅผ ๋ณด๋ฉด, ํ ์ปฌ๋ผ์ ๋ฆฌ์คํธ ํํ์ ๋ฐ์ดํฐ๊ฐ ์ฝ์ ๋ ๊ฒ์ ์ ์ ์๋ค.
์ฌ๊ธฐ์ explode๋ฅผ ์ฌ์ฉํ๋ฉด ๋ฆฌ์คํธ์ ๊ฐ ๋ฐ์ดํฐ๋ณ๋ก ๋ ์ฝ๋๋ฅผ ์์ฑํ ์ ์๋ค.
# knownLanguages ํ๋๋ฅผ ์ธ์ด๋ณ๋ก ์๋ผ์ ์๋ก์ด ๋ ์ฝ๋๋ก ์์ฑ
from pyspark.sql.functions import explode
df2 = df.select(df.name,explode(df.knownLanguages)) # knownLanguages ๋ฅผ explodeํ ์ปฌ๋ผ + name ์ปฌ๋ผ์ผ๋ก select
df2.printSchema()
df2.show()
์ด๋ฒ์๋ Dictionary ํํ๋ก ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ๊ตฌ์กฐ(struct)๋ก ์ ์ฅํ์ฌ ๋ถ๋ฌ์ค๋ ์ค์ต์ ํด๋ณด์.
์ฌ๊ธฐ์๋ ๋ฐ๋ก ์ ์ฅํ orders.csv ํ์ผ์ ์ฌ์ฉํ๋ค.
items ์ปฌ๋ผ์ name, quantity, id๋ณ๋ก ๊ฐ value๊ฐ ์ ์ฅ๋ ํํ์ด๋ค.
์ด ๊ตฌ์กฐ๋ฅผ ๋ช ํํ๊ฒ ๋ช ์ํด์ ๊ฐ๊ฐ Key๋ฅผ ๋ถ๋ฌ์์ ๋ value๊ฐ ๋ํ๋๋๋ก ์ฝ๋ฉํด๋ณด์.
๋จผ์ csv ํ์ผ์ ๋ถ๋ฌ์จ๋ค.
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType
order = spark.read.options(delimiter='\t').option("header","true").csv("orders.csv") # ํญ์ผ๋ก ๊ตฌ๋ถ๋ ํ์ผ์ด๋ผ \t ์ฌ์ฉ
์ฌ๊ธฐ์ printSchema๋ฅผ ์ฌ์ฉํด์ ํ์ ์ ํ์ธํด๋ณด๋ฉด, ์์ง ๊ตฌ์กฐ์์ด String์ ๊ฐ์ง๊ณ ์์์ ์ ์ ์๋ค.
ArrayType์ ์ฌ์ฉํด์ ๊ตฌ์กฐ๋ฅผ ํ๋ ๋ง๋ค์ด์ค๋ค.
# items ์ปฌ๋ผ์ ๊ตฌ์กฐ ์ก๊ธฐ
# ๋ฐ์ดํฐํ๋ ์์ ์ด์ฉํด์ ํด๋ณด๊ธฐ
struct = ArrayType(
StructType([
StructField("name", StringType()),
StructField("id", StringType()),
StructField("quantity", LongType())
])
)
# items ํํ๋ฅผ ๊ณ ๋ คํด์ from_json + ์๊น ๋ง๋ items ๊ตฌ์กฐ๋ฅผ ํ์ฉํด์ explode
order.withColumn("item", explode(from_json("items", struct))).show(truncate=False)
items์์ ํํ๋๋ก ๊ตฌ์กฐ๋ฅผ ์ ์ํ๊ณ , explode์ from_json(์ปฌ๋ผ์ด๋ฆ, ์ฌ์ฉํ ๊ตฌ์กฐ)๋ฅผ ๋ฃ์ด ๊ตฌ์กฐ๋ฅผ ๋ฃ์๋ค.
์๋ก ์ถ๊ฐ๋ item ์ปฌ๋ผ์ value ๊ฐ๋ง ๋จ์ ์ ์ฅ๋์๋ค.
์ด์ ํ์์๋ ๊ธฐ์กด ์ปฌ๋ผ items๋ฅผ dropํ๊ณ , ๋ฐ๋ ์คํค๋ง๋ฅผ ํ์ธํด๋ณด์.
# items ์ปฌ๋ผ ์ ๊ฑฐ
order_items = order.withColumn("item", explode(from_json("items", struct))).drop("items")
์ด ๊ตฌ์กฐ๋ฅผ ํ ์ด๋ธ์์ ํ์ฉํด๋ณด์!
# ํ
์ด๋ธ๋ก ๋ง๋ค๊ธฐ
order_items.createOrReplaceTempView("order_items")
# ์ฟผ๋ฆฌ๋ฌธ
# item์ quantity๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด item.quantity๋ก ์ฌ์ฉํ๋ฉด ๋๋ค!
spark.sql("""
SELECT order_id, CAST(average_udf(item.quantity) as decimal) avg_count
FROM order_items
GROUP BY 1
ORDER BY 2 DESC""").show(5)