Spark์์ Parquet ๋ค๋ฃจ๊ธฐ ์ค์ต + Execution Plan ์์๋ณด๊ธฐ - TIL230706
๐ KDT WEEK 14 DAY 4 TIL
- Spark ํ์ผํฌ๋งท(Parquet)
- Spark Execution Plan
- Bucketing๊ณผ File System Partitioning
๐ฅ Spark ํ์ผํฌ๋งท
Parquet : Spark์ ๊ธฐ๋ณธ ํ์ผ ํฌ๋งท
Parquet ํ์ด์ง : http://parquet.incubator.apache.org/
Apache Parquet
The Apache Parquet Website
parquet.incubator.apache.org
Parquet์ Structured ํฌ๋งท์ผ๋ก, ์์ถ๋ ๋ฐ์ด๋๋ฆฌ ํ์ผ์ด๋ฉฐ ๋ด๋ถ์ ์คํค๋ง ์ ๋ณด(ํ๋ ํ์ ๊น์ง)๋ฅผ ๊ฐ์ง ํ์ผ ํฌ๋งท์ด๋ค.
Structured ex) Parquet, AVRO, ORC, SequenceFile ...
๐ป ์ค์ต(Colab)
์ฌ์ฉํ ํ๊ฒฝ ์ค์น
!pip install pyspark==3.3.1 py4j==0.10.9.5
Spark Session ์์ฑ - avro ํฌ๋งท์ ์ฌ์ฉํ๋ ค๋ฉด ๋ฐ๋ก ์ง์ ํ์
from pyspark.sql import *
from pyspark.sql.functions import *
# avro ํฌ๋งท์ Spark์ ๋ณ๋๋ก ์ง์ ์ด ํ์ํ๋ค.
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Spark Writing Demo") \
.master("local[3]") \
.config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.1") \
.getOrCreate()
๋ฐ์ดํฐ๋ฅผ ์ํ csv ํ์ผ ๋ก๋ฉ
# ๊ฐ์ง๊ณ ์๋ appl_stock.csv ํ์ผ ๋ก๋ฉ
df = spark.read.format("csv").load("appl_stock.csv")
csvํ์ผ์ด ๋ก๋ฉ๋ df์ ํํฐ์ ๊ฐ์, ๋ ์ฝ๋ ๊ฐ์๋ฅผ ์์๋ณด์.
print("Num Partitions before: " + str(df.rdd.getNumPartitions())) # ๋ช ๊ฐ์ ํํฐ์
์ด ์๋์ง?
df.groupBy(spark_partition_id()).count().show() # ํํฐ์
๋ณ๋ก ๋ช ๊ฐ์ ๋ ์ฝ๋๊ฐ ์๋์ง?
- .rdd.getNumPartitions() : ํด๋น ๋ฐ์ดํฐํ๋ ์์ ํํฐ์ ๊ฐ์ ์ป๊ธฐ
- spark_partition_id : ๋ฐ์ดํฐํ๋ ์์ ๊ตฌ์ฑํ๋ ํํฐ์ ๋ณ๋ก ์์ด๋ ๋ฆฌํด
df๋ฅผ ์ฌ์ฉํด์ df2์ ๋ค ๊ฐ์ ํํฐ์ , df3์ ๋ ๊ฐ์ ํํฐ์ ์ ๊ฐ์ง๋๋ก ์ค์
df2 = df.repartition(4) # ๋ฆฌํํฐ์
ํด์ ๋ค ๊ฐ๋ก ๋ง๋ค์ด์ค
df3 = df2.coalesce(2) # ํํฐ์
์ ์๋ฅผ ์ค์ฌ์ค(์
ํ๋ง์ ์ต์ํํ๋ ๋ฐฉํฅ์ผ๋ก)
๊ทธ๋ฆฌ๊ณ df, df2, df3๋ฅผ ๊ฐ๊ฐ avro, parquet, json์ผ๋ก ์ ์ฅํด๋ณด์.
# df : ํํฐ์
์ด ํ๋์๋ ๊ฒ
# avro๋ก ์ธ์ด๋ธ
# ์ต์
์ผ๋ก path ์ง์ ๊ฐ๋ฅ
df.write \
.format("avro") \
.mode("overwrite") \
.option("path", "dataOutput/avro/") \
.save()
# df2 : ํํฐ์
์ด ๋ค ๊ฐ์๋ ๊ฒ
# parquet ๋ก ์ธ์ด๋ธ
df2.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "dataOutput/parquet/") \
.save()
# df3 : ํํฐ์
์ด ๋ ๊ฐ์๋ ๊ฒ
# json ๋ก ์ธ์ด๋ธ
df3.write \
.format("json") \
.mode("overwrite") \
.option("path", "dataOutput/json/") \
.save()
๊ทธ๋ฆฌ๊ณ ls ๋ช ๋ น์ด ์ฌ์ฉํด์ ํ์ธํด๋ณด๋ฉด, ๊ฐ ํฌ๋งท๋ณ๋ก ํด๋๊ฐ ์ ์ฅ๋๊ณ ๊ทธ ์์ ๋ฐ์ดํฐ๊ฐ ์ ์ฅ๋๋ค.
๊ฐ ๋ด์ฉ์ ํ์ธํ๋ฉด ๋ค์๊ณผ ๊ฐ๋ค.
parquet์ ๊ฒฝ์ฐ snappy๊ฐ ๋ค์ ๋ถ์ด์ ์์ถ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
์ฃผ์์๋ ์ฐ์ฌ์๋ฏ snappy๋ spiltableํ ์์ถ ํํ๋ก, ์์ถ์ด ๋์์ด๋ ๋ธ๋ก๋จ์๋ก ๋๋ ์ ์๋ ์ ์ฉํ ์์ถ ๋จ์์ด๋ค.
์ด๋ฒ์๋ Parquet ํ์ผํฌ๋งท์ Schema Evolution ๊ธฐ๋ฅ์ ์ฌ์ฉํด๋ณด์.
๊ฐ๊ธฐ๋ค๋ฅธ ์คํค๋ง๋ฅผ ๊ฐ์ง ์ธ ๊ฐ์ Parquet ํ์ผ์ ๊ฐ์ ธ์จ๋ค๊ณ ๊ฐ์ ํ์.
๊ฐ parquet ํ์ผ์ด ์ ์ฅ๋์ด์๊ณ , ๋ถ๋ฌ์์ ๋ฐ์ดํฐ๋ฅผ show() ํ์๋ค.
์ด๋ฏธ์ง์์ ๋ณด์ด๋ฏ ๊ฐ parquet ํ์ผ์ ํ๋์ ์ปฌ๋ผ์ด ์ถ๊ฐ๋๋ ํ์์ผ๋ก ์คํค๋ง๊ฐ ๋ค๋ฅด๋ค.
Parquet์ Schema Evolution์ ์ง์ํ๊ธฐ ๋๋ฌธ์ ์ด๋ฅผ ๋ง์น ํ๋์ ์คํค๋ง์ฒ๋ผ ํ๋ฒ์ ๋ถ๋ฌ์ฌ ์๊ฐ ์๋ค.
.option("mergeSchema", True) ์ต์ ์ ์ฌ์ฉํด์ Schema Evolution ๊ธฐ๋ฅ์ ์ฌ์ฉํ์๋ค.
schema1์ด ์๋ ๊ฐ์ง๊ณ ์๋ ๋ฐ์ดํฐ์ฒ๋ผ ์ถ๊ฐ๋ ์ปฌ๋ผ์ ๋ฐ์ดํฐ๊ฐ ์์ผ๋ฉด NULL๋ก ์ฑ์์ง๋ค.
๐ฆ Spark์ Execution Plan
๐ Execution Plan : ๊ฐ๋ฐ์๊ฐ ์์ฑํ ์ฝ๋๋ฅผ Spark์ด ์ด๋ป๊ฒ ์ค์ ๋ก ์คํ์ ํ๋๊ฐ?
Spark์์ ๋ฐ์ดํฐ ํ๋ ์์ where, groupby, select ๋ฑ์ผ๋ก ์ฒ๋ฆฌํ ๋, ๋ช ๋ น์ด๋ง๋ค ๋ฐ๋ก๋ฐ๋ก ์ฒ๋ฆฌํ์ง ์๋๋ค.
ํด๋น ์์ ๋ค์ ๋ชจ์๋์๋ค๊ฐ, show()๋ collect() ๋ฑ์ ํตํด ๋ฐ์ดํฐ๋ฅผ ์ง์ ๋ณด์ฌ์ค์ผํ ๋ ํ๊บผ๋ฒ์ ์ฒ๋ฆฌํ๊ฒ ๋๋ค!
์ด๋ ๋ฐ์ดํฐ ํ๋ ์์ ์ง์ ์์ ์ ํ๋ ๋ช ๋ น์ด๋ค์ Transformations๋ผ๊ณ ํ๊ณ ,
show()๋ collect()์ฒ๋ผ ๋ณด์ฌ์ฃผ๊ฑฐ๋ Read, Write์ฒ๋ผ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๊ฐ ์๋ฃ๋์ด์ผ ์์ ์ด ๊ฐ๋ฅํ ๋ช ๋ น์ด๋ฅผ Actions๋ผ๊ณ ํ๋ค.
1. Transformations
- Narrow Dependencies : ๋
๋ฆฝ์ ์ธ Partition level ์์
/ ์
ํ๋ง x
- select, filter, map ๋ฑ๋ฑ
- Wide Dependencies : ์
ํ๋ง์ด ํ์ํ ์์
- groupby, reduceby, partitionby, repartition, coalesce ๋ฑ๋ฑ
2. Actions
- Read, Write, Show, Collect : Job์ ์คํ์์ผ ์ค์ ์ฝ๋๊ฐ ์คํ๋๋ ๋ช ๋ น์ด
- Lazy Execution : Job์ ํ๋ ๋ง๋ค์ด๋ด๊ณ ์ฝ๋๋ฅผ ์ค์ ๋ก ์ํ
- ์ต๋ํ ๋ ๋ง์ ์คํผ๋ ์ด์ ์ ๋ชจ์๋์๋ค๊ฐ ํ๋ฒ์ ์ต์ ํ๋ฅผ ํด์ผ ๋ ํจ์จ์ ์ด๊ธฐ ๋๋ฌธ!
๐งฑ ๊ธฐ๋ณธ ๊ตฌ์กฐ : Action > Job > 1 + Stages > 1 + Tasks
- Action : Job์ ํ๋ ๋ง๋ค์ด๋ด๊ณ , ์ฝ๋๊ฐ ์ค์ ๋ก ์คํ๋จ
- Job : ํ๋ ํน์ ๊ทธ ์ด์์ Stage๋ก ๊ตฌ์ฑ๋๋ค.
- Stage๋ ์ ํ๋ง์ด ๋ฐ์ํ๋ ๊ฒฝ์ฐ ์๋ก ์๊ธด๋ค. ex) groupby
- Stage : DAG ํํ๋ก ๊ตฌ์ฑ๋ Task๋ค์ด ์กด์ฌํ์ฌ ๋ณ๋ ฌ ์คํ๋๋ค.
- Task : ๊ฐ์ฅ ์์ ์คํ ์ ๋์ผ๋ก, Executor์ ์ํด ์คํ๋๋ค.
๐ป ์ค์ต(Colab)
1. WordCount.py
from pyspark.sql import *
from pyspark.sql.functions import *
# spark.sql.adaptive.enabled : Fasle๋ ์ดํดํ๊ธฐ ์ด๋ ค์ด ์ต์ ํ๋ Unable
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 3)
# ์ฌ๊ธฐ์ read์ ๊ฒฝ์ฐ ํค๋๋ฅผ ํ์ธํ์ง ์๊ธฐ ๋๋ฌธ์ JOB์ ์คํํ์ง๋ ์์
# ํค๋๋ฅผ ํ์ธํด์ผ ํ ๋๋ง ๋ฐ์ดํฐ๋ฅผ ์ง์ ํ์ธํ๋ฏ๋ก JOB์ ์คํ
df = spark.read.text("shakespeare.txt")
# value(๊ธฐ๋ณธ ์ปฌ๋ผ๋ช
)์ ๊ณต๋ฐฑ์ ๊ธฐ์ค์ผ๋ก ๋๋์ด word ์ปฌ๋ผ์ ๊ฐ๊ฐ ๋ฃ๊ณ , ๊ทธ ๊ฐ์๋ฅผ ์๋ SQL
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()
df_count.show()
input("stopping ...")
Groupby ๋๋ฌธ์ ์๋ก์ด Stage๊ฐ ์์ฑ๋ ๊ฒ์ ์ ์ ์๋ค.
2. JOIN.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession \
.builder \
.appName("Shuffle Join Demo") \
.master("local[3]") \
.config("spark.sql.shuffle.partitions", 3) \
.config("spark.sql.adaptive.enabled", False) \
.getOrCreate()
# json ํ์ผ์ read๋ ์คํค๋ง๋ฅผ ์๊ธฐ ์ํด ์ฝ์ด์ผํ๋ฏ๋ก read๋ ACTION์ด ๋๋ค.
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")
join_df.collect()
input("Waiting ...")
spark.stop()
๊ฐ๊ฐ json์ ํ์ผ์ ๋ถ๋ฌ์ค๋ job์์ Stage๋ฅผ ํ๋์ฉ ๊ฐ๊ฒ ๋๊ณ , JOIN์ ํตํด Stage๋ฅผ ํ๋ ๋ ์์ฑํ๊ฒ ๋๋ค.
์์ ์ด๋ฏธ์ง๊ฐ ๊ธฐ๋ณธ์ ์ธ JOIN์ Execution Plan ํํ๊ฐ ๋๋ค.
๐ฉ Bucketing๊ณผ File System Partitioning
๋ฐ์ดํฐ ์ ์ฅ์ ์ดํ ๋ฐ๋ณต์ฒ๋ฆฌ์ ์ต์ ํ๋ ๋ฐฉ๋ฒ์ผ๋ก ํ๋ ๊ฒ์ด ๋ชฉ์ ์ผ๋ก, Hive ๋ฉํ์คํ ์ด ์ฌ์ฉ์ด ํ์ํ๋ค.
1. Bucketing
- DataFrame์ ํน์ ID ๊ธฐ์ค์ผ๋ก ๋๋ ์ ํ ์ด๋ธ๋ก ์ ์ฅ
- ํ๋ฒ์ ์ ํ๋ง์ด ํ์ํ๋, ๋์ค์ ๋ค์ ์ ํ๋ง์ ๋ฐฉ์งํ๊ธฐ ์ํ ๋๋น์ฑ ์ด ๋จ
- ๋ฐ์ดํฐ์ ํน์ฑ์ ์ ์๊ณ ์๋ ๊ฒฝ์ฐ๋ค ์ฌ์ฉ์ด ๊ฐ๋ฅํ๋ค.
Aggregation, Window ํจ์, JOIN์์ ๋ง์ด ์ฌ์ฉ๋๋ ์ปฌ๋ผ์ด ์๋์ง ๊ฒ์ฌ > ์ด ์ปฌ๋ผ์ ๊ธฐ์ค์ผ๋ก ํ ์ด๋ธ ์ ์ฅ
2. File System Partitioning = Hive์ Partitioning
- ๋ฐ์ดํฐ์ ํน์ ์ปฌ๋ผ๋ค์ ๊ธฐ์ค์ผ๋ก ํด๋ ๊ตฌ์กฐ๋ฅผ ๋ง๋ค์ด ๋ฐ์ดํฐ ์ ์ฅ์ ์ต์ ํ > ๋ก๋ฉ, ํํฐ๋ง์ ์ค๋ฒํค๋ ์ค์
- Partition Key : ํด๋ ๊ตฌ์กฐ๋ฅผ ๋ง๋ค ํน์ ์ปฌ๋ผ
- DataFrameWriter์ partitionBy ์ฌ์ฉ
- Partiton Key๋ Cardinality๊ฐ ์ ์ ์ปฌ๋ผ์ด ์ข๋ค.
ex) ๋ก๊ทธ ํ์ผ์ ๋ฐ์ดํฐ ์์ฑ์๊ฐ ๊ธฐ๋ฐ์ผ๋ก ๋ฐ์ดํฐ ์ฝ๊ธฐ๋ฅผ ๋ง์ด ํ๊ฒ ๋จ.
์ด ๋ก๊ทธ ํ์ผ์ด ๊ต์ฅํ ํฌ๋ค๋ฉด ๋ฐ์ดํฐ ์์ฒด๋ฅผ ์ฐ๋-์-์ผ์ ํด๋ ๊ตฌ์กฐ๋ก ์ ์ฅํ๋ ๊ฒ! > ์ฝ๊ธฐ ๊ณผ์ ์ต์ํ, ๊ด๋ฆฌ ์ฌ์