์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Salting
- Spark Caching
- Spark SQL
- etl
- aws
- Docker
- Kafka
- Spark ์ค์ต
- SQL
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- colab
- AQE
- disk spill
- Spark Partitioning
- Speculative Execution
- KDT_TIL
- Spark
- mysql
- Dag
- DataFrame Hint
- ๋น ๋ฐ์ดํฐ
- CI/CD
- Airflow
- backfill
- spark executor memory
- k8s
- redshift
- off heap memory
- topic
- Kubernetes
- Today
- Total
JUST DO IT!
Spark์์ Parquet ๋ค๋ฃจ๊ธฐ ์ค์ต + Execution Plan ์์๋ณด๊ธฐ - TIL230706 ๋ณธ๋ฌธ
Spark์์ Parquet ๋ค๋ฃจ๊ธฐ ์ค์ต + Execution Plan ์์๋ณด๊ธฐ - TIL230706
sunhokimDev 2023. 7. 9. 23:19๐ KDT WEEK 14 DAY 4 TIL
- Spark ํ์ผํฌ๋งท(Parquet)
- Spark Execution Plan
- Bucketing๊ณผ File System Partitioning
๐ฅ Spark ํ์ผํฌ๋งท
Parquet : Spark์ ๊ธฐ๋ณธ ํ์ผ ํฌ๋งท
Parquet ํ์ด์ง : http://parquet.incubator.apache.org/
Parquet์ Structured ํฌ๋งท์ผ๋ก, ์์ถ๋ ๋ฐ์ด๋๋ฆฌ ํ์ผ์ด๋ฉฐ ๋ด๋ถ์ ์คํค๋ง ์ ๋ณด(ํ๋ ํ์ ๊น์ง)๋ฅผ ๊ฐ์ง ํ์ผ ํฌ๋งท์ด๋ค.
Structured ex) Parquet, AVRO, ORC, SequenceFile ...
๐ป ์ค์ต(Colab)
์ฌ์ฉํ ํ๊ฒฝ ์ค์น
!pip install pyspark==3.3.1 py4j==0.10.9.5
Spark Session ์์ฑ - avro ํฌ๋งท์ ์ฌ์ฉํ๋ ค๋ฉด ๋ฐ๋ก ์ง์ ํ์
from pyspark.sql import *
from pyspark.sql.functions import *
# avro ํฌ๋งท์ Spark์ ๋ณ๋๋ก ์ง์ ์ด ํ์ํ๋ค.
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Spark Writing Demo") \
.master("local[3]") \
.config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.1") \
.getOrCreate()
๋ฐ์ดํฐ๋ฅผ ์ํ csv ํ์ผ ๋ก๋ฉ
# ๊ฐ์ง๊ณ ์๋ appl_stock.csv ํ์ผ ๋ก๋ฉ
df = spark.read.format("csv").load("appl_stock.csv")
csvํ์ผ์ด ๋ก๋ฉ๋ df์ ํํฐ์ ๊ฐ์, ๋ ์ฝ๋ ๊ฐ์๋ฅผ ์์๋ณด์.
print("Num Partitions before: " + str(df.rdd.getNumPartitions())) # ๋ช ๊ฐ์ ํํฐ์
์ด ์๋์ง?
df.groupBy(spark_partition_id()).count().show() # ํํฐ์
๋ณ๋ก ๋ช ๊ฐ์ ๋ ์ฝ๋๊ฐ ์๋์ง?
- .rdd.getNumPartitions() : ํด๋น ๋ฐ์ดํฐํ๋ ์์ ํํฐ์ ๊ฐ์ ์ป๊ธฐ
- spark_partition_id : ๋ฐ์ดํฐํ๋ ์์ ๊ตฌ์ฑํ๋ ํํฐ์ ๋ณ๋ก ์์ด๋ ๋ฆฌํด
df๋ฅผ ์ฌ์ฉํด์ df2์ ๋ค ๊ฐ์ ํํฐ์ , df3์ ๋ ๊ฐ์ ํํฐ์ ์ ๊ฐ์ง๋๋ก ์ค์
df2 = df.repartition(4) # ๋ฆฌํํฐ์
ํด์ ๋ค ๊ฐ๋ก ๋ง๋ค์ด์ค
df3 = df2.coalesce(2) # ํํฐ์
์ ์๋ฅผ ์ค์ฌ์ค(์
ํ๋ง์ ์ต์ํํ๋ ๋ฐฉํฅ์ผ๋ก)
๊ทธ๋ฆฌ๊ณ df, df2, df3๋ฅผ ๊ฐ๊ฐ avro, parquet, json์ผ๋ก ์ ์ฅํด๋ณด์.
# df : ํํฐ์
์ด ํ๋์๋ ๊ฒ
# avro๋ก ์ธ์ด๋ธ
# ์ต์
์ผ๋ก path ์ง์ ๊ฐ๋ฅ
df.write \
.format("avro") \
.mode("overwrite") \
.option("path", "dataOutput/avro/") \
.save()
# df2 : ํํฐ์
์ด ๋ค ๊ฐ์๋ ๊ฒ
# parquet ๋ก ์ธ์ด๋ธ
df2.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "dataOutput/parquet/") \
.save()
# df3 : ํํฐ์
์ด ๋ ๊ฐ์๋ ๊ฒ
# json ๋ก ์ธ์ด๋ธ
df3.write \
.format("json") \
.mode("overwrite") \
.option("path", "dataOutput/json/") \
.save()
๊ทธ๋ฆฌ๊ณ ls ๋ช ๋ น์ด ์ฌ์ฉํด์ ํ์ธํด๋ณด๋ฉด, ๊ฐ ํฌ๋งท๋ณ๋ก ํด๋๊ฐ ์ ์ฅ๋๊ณ ๊ทธ ์์ ๋ฐ์ดํฐ๊ฐ ์ ์ฅ๋๋ค.
๊ฐ ๋ด์ฉ์ ํ์ธํ๋ฉด ๋ค์๊ณผ ๊ฐ๋ค.
parquet์ ๊ฒฝ์ฐ snappy๊ฐ ๋ค์ ๋ถ์ด์ ์์ถ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
์ฃผ์์๋ ์ฐ์ฌ์๋ฏ snappy๋ spiltableํ ์์ถ ํํ๋ก, ์์ถ์ด ๋์์ด๋ ๋ธ๋ก๋จ์๋ก ๋๋ ์ ์๋ ์ ์ฉํ ์์ถ ๋จ์์ด๋ค.
์ด๋ฒ์๋ Parquet ํ์ผํฌ๋งท์ Schema Evolution ๊ธฐ๋ฅ์ ์ฌ์ฉํด๋ณด์.
๊ฐ๊ธฐ๋ค๋ฅธ ์คํค๋ง๋ฅผ ๊ฐ์ง ์ธ ๊ฐ์ Parquet ํ์ผ์ ๊ฐ์ ธ์จ๋ค๊ณ ๊ฐ์ ํ์.
๊ฐ parquet ํ์ผ์ด ์ ์ฅ๋์ด์๊ณ , ๋ถ๋ฌ์์ ๋ฐ์ดํฐ๋ฅผ show() ํ์๋ค.
์ด๋ฏธ์ง์์ ๋ณด์ด๋ฏ ๊ฐ parquet ํ์ผ์ ํ๋์ ์ปฌ๋ผ์ด ์ถ๊ฐ๋๋ ํ์์ผ๋ก ์คํค๋ง๊ฐ ๋ค๋ฅด๋ค.
Parquet์ Schema Evolution์ ์ง์ํ๊ธฐ ๋๋ฌธ์ ์ด๋ฅผ ๋ง์น ํ๋์ ์คํค๋ง์ฒ๋ผ ํ๋ฒ์ ๋ถ๋ฌ์ฌ ์๊ฐ ์๋ค.
.option("mergeSchema", True) ์ต์ ์ ์ฌ์ฉํด์ Schema Evolution ๊ธฐ๋ฅ์ ์ฌ์ฉํ์๋ค.
schema1์ด ์๋ ๊ฐ์ง๊ณ ์๋ ๋ฐ์ดํฐ์ฒ๋ผ ์ถ๊ฐ๋ ์ปฌ๋ผ์ ๋ฐ์ดํฐ๊ฐ ์์ผ๋ฉด NULL๋ก ์ฑ์์ง๋ค.
๐ฆ Spark์ Execution Plan
๐ Execution Plan : ๊ฐ๋ฐ์๊ฐ ์์ฑํ ์ฝ๋๋ฅผ Spark์ด ์ด๋ป๊ฒ ์ค์ ๋ก ์คํ์ ํ๋๊ฐ?
Spark์์ ๋ฐ์ดํฐ ํ๋ ์์ where, groupby, select ๋ฑ์ผ๋ก ์ฒ๋ฆฌํ ๋, ๋ช ๋ น์ด๋ง๋ค ๋ฐ๋ก๋ฐ๋ก ์ฒ๋ฆฌํ์ง ์๋๋ค.
ํด๋น ์์ ๋ค์ ๋ชจ์๋์๋ค๊ฐ, show()๋ collect() ๋ฑ์ ํตํด ๋ฐ์ดํฐ๋ฅผ ์ง์ ๋ณด์ฌ์ค์ผํ ๋ ํ๊บผ๋ฒ์ ์ฒ๋ฆฌํ๊ฒ ๋๋ค!
์ด๋ ๋ฐ์ดํฐ ํ๋ ์์ ์ง์ ์์ ์ ํ๋ ๋ช ๋ น์ด๋ค์ Transformations๋ผ๊ณ ํ๊ณ ,
show()๋ collect()์ฒ๋ผ ๋ณด์ฌ์ฃผ๊ฑฐ๋ Read, Write์ฒ๋ผ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๊ฐ ์๋ฃ๋์ด์ผ ์์ ์ด ๊ฐ๋ฅํ ๋ช ๋ น์ด๋ฅผ Actions๋ผ๊ณ ํ๋ค.
1. Transformations
- Narrow Dependencies : ๋
๋ฆฝ์ ์ธ Partition level ์์
/ ์
ํ๋ง x
- select, filter, map ๋ฑ๋ฑ
- Wide Dependencies : ์
ํ๋ง์ด ํ์ํ ์์
- groupby, reduceby, partitionby, repartition, coalesce ๋ฑ๋ฑ
2. Actions
- Read, Write, Show, Collect : Job์ ์คํ์์ผ ์ค์ ์ฝ๋๊ฐ ์คํ๋๋ ๋ช ๋ น์ด
- Lazy Execution : Job์ ํ๋ ๋ง๋ค์ด๋ด๊ณ ์ฝ๋๋ฅผ ์ค์ ๋ก ์ํ
- ์ต๋ํ ๋ ๋ง์ ์คํผ๋ ์ด์ ์ ๋ชจ์๋์๋ค๊ฐ ํ๋ฒ์ ์ต์ ํ๋ฅผ ํด์ผ ๋ ํจ์จ์ ์ด๊ธฐ ๋๋ฌธ!
๐งฑ ๊ธฐ๋ณธ ๊ตฌ์กฐ : Action > Job > 1 + Stages > 1 + Tasks
- Action : Job์ ํ๋ ๋ง๋ค์ด๋ด๊ณ , ์ฝ๋๊ฐ ์ค์ ๋ก ์คํ๋จ
- Job : ํ๋ ํน์ ๊ทธ ์ด์์ Stage๋ก ๊ตฌ์ฑ๋๋ค.
- Stage๋ ์ ํ๋ง์ด ๋ฐ์ํ๋ ๊ฒฝ์ฐ ์๋ก ์๊ธด๋ค. ex) groupby
- Stage : DAG ํํ๋ก ๊ตฌ์ฑ๋ Task๋ค์ด ์กด์ฌํ์ฌ ๋ณ๋ ฌ ์คํ๋๋ค.
- Task : ๊ฐ์ฅ ์์ ์คํ ์ ๋์ผ๋ก, Executor์ ์ํด ์คํ๋๋ค.
๐ป ์ค์ต(Colab)
1. WordCount.py
from pyspark.sql import *
from pyspark.sql.functions import *
# spark.sql.adaptive.enabled : Fasle๋ ์ดํดํ๊ธฐ ์ด๋ ค์ด ์ต์ ํ๋ Unable
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 3)
# ์ฌ๊ธฐ์ read์ ๊ฒฝ์ฐ ํค๋๋ฅผ ํ์ธํ์ง ์๊ธฐ ๋๋ฌธ์ JOB์ ์คํํ์ง๋ ์์
# ํค๋๋ฅผ ํ์ธํด์ผ ํ ๋๋ง ๋ฐ์ดํฐ๋ฅผ ์ง์ ํ์ธํ๋ฏ๋ก JOB์ ์คํ
df = spark.read.text("shakespeare.txt")
# value(๊ธฐ๋ณธ ์ปฌ๋ผ๋ช
)์ ๊ณต๋ฐฑ์ ๊ธฐ์ค์ผ๋ก ๋๋์ด word ์ปฌ๋ผ์ ๊ฐ๊ฐ ๋ฃ๊ณ , ๊ทธ ๊ฐ์๋ฅผ ์๋ SQL
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()
df_count.show()
input("stopping ...")
Groupby ๋๋ฌธ์ ์๋ก์ด Stage๊ฐ ์์ฑ๋ ๊ฒ์ ์ ์ ์๋ค.
2. JOIN.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession \
.builder \
.appName("Shuffle Join Demo") \
.master("local[3]") \
.config("spark.sql.shuffle.partitions", 3) \
.config("spark.sql.adaptive.enabled", False) \
.getOrCreate()
# json ํ์ผ์ read๋ ์คํค๋ง๋ฅผ ์๊ธฐ ์ํด ์ฝ์ด์ผํ๋ฏ๋ก read๋ ACTION์ด ๋๋ค.
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")
join_df.collect()
input("Waiting ...")
spark.stop()
๊ฐ๊ฐ json์ ํ์ผ์ ๋ถ๋ฌ์ค๋ job์์ Stage๋ฅผ ํ๋์ฉ ๊ฐ๊ฒ ๋๊ณ , JOIN์ ํตํด Stage๋ฅผ ํ๋ ๋ ์์ฑํ๊ฒ ๋๋ค.
์์ ์ด๋ฏธ์ง๊ฐ ๊ธฐ๋ณธ์ ์ธ JOIN์ Execution Plan ํํ๊ฐ ๋๋ค.
๐ฉ Bucketing๊ณผ File System Partitioning
๋ฐ์ดํฐ ์ ์ฅ์ ์ดํ ๋ฐ๋ณต์ฒ๋ฆฌ์ ์ต์ ํ๋ ๋ฐฉ๋ฒ์ผ๋ก ํ๋ ๊ฒ์ด ๋ชฉ์ ์ผ๋ก, Hive ๋ฉํ์คํ ์ด ์ฌ์ฉ์ด ํ์ํ๋ค.
1. Bucketing
- DataFrame์ ํน์ ID ๊ธฐ์ค์ผ๋ก ๋๋ ์ ํ ์ด๋ธ๋ก ์ ์ฅ
- ํ๋ฒ์ ์ ํ๋ง์ด ํ์ํ๋, ๋์ค์ ๋ค์ ์ ํ๋ง์ ๋ฐฉ์งํ๊ธฐ ์ํ ๋๋น์ฑ ์ด ๋จ
- ๋ฐ์ดํฐ์ ํน์ฑ์ ์ ์๊ณ ์๋ ๊ฒฝ์ฐ๋ค ์ฌ์ฉ์ด ๊ฐ๋ฅํ๋ค.
Aggregation, Window ํจ์, JOIN์์ ๋ง์ด ์ฌ์ฉ๋๋ ์ปฌ๋ผ์ด ์๋์ง ๊ฒ์ฌ > ์ด ์ปฌ๋ผ์ ๊ธฐ์ค์ผ๋ก ํ ์ด๋ธ ์ ์ฅ
2. File System Partitioning = Hive์ Partitioning
- ๋ฐ์ดํฐ์ ํน์ ์ปฌ๋ผ๋ค์ ๊ธฐ์ค์ผ๋ก ํด๋ ๊ตฌ์กฐ๋ฅผ ๋ง๋ค์ด ๋ฐ์ดํฐ ์ ์ฅ์ ์ต์ ํ > ๋ก๋ฉ, ํํฐ๋ง์ ์ค๋ฒํค๋ ์ค์
- Partition Key : ํด๋ ๊ตฌ์กฐ๋ฅผ ๋ง๋ค ํน์ ์ปฌ๋ผ
- DataFrameWriter์ partitionBy ์ฌ์ฉ
- Partiton Key๋ Cardinality๊ฐ ์ ์ ์ปฌ๋ผ์ด ์ข๋ค.
ex) ๋ก๊ทธ ํ์ผ์ ๋ฐ์ดํฐ ์์ฑ์๊ฐ ๊ธฐ๋ฐ์ผ๋ก ๋ฐ์ดํฐ ์ฝ๊ธฐ๋ฅผ ๋ง์ด ํ๊ฒ ๋จ.
์ด ๋ก๊ทธ ํ์ผ์ด ๊ต์ฅํ ํฌ๋ค๋ฉด ๋ฐ์ดํฐ ์์ฒด๋ฅผ ์ฐ๋-์-์ผ์ ํด๋ ๊ตฌ์กฐ๋ก ์ ์ฅํ๋ ๊ฒ! > ์ฝ๊ธฐ ๊ณผ์ ์ต์ํ, ๊ด๋ฆฌ ์ฌ์