์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Spark Partitioning
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- Kubernetes
- k8s
- Spark
- Docker
- CI/CD
- Speculative Execution
- spark executor memory
- colab
- Airflow
- Spark ์ค์ต
- Kafka
- KDT_TIL
- redshift
- ๋น ๋ฐ์ดํฐ
- aws
- Spark Caching
- mysql
- AQE
- Spark SQL
- disk spill
- etl
- backfill
- Dag
- Salting
- off heap memory
- DataFrame Hint
- SQL
- topic
- Today
- Total
JUST DO IT!
Spark์์ Parquet ๋ค๋ฃจ๊ธฐ ์ค์ต + Execution Plan ์์๋ณด๊ธฐ - TIL230706 ๋ณธ๋ฌธ
Spark์์ Parquet ๋ค๋ฃจ๊ธฐ ์ค์ต + Execution Plan ์์๋ณด๊ธฐ - TIL230706
sunhokimDev 2023. 7. 9. 23:19๐ KDT WEEK 14 DAY 4 TIL
- Spark ํ์ผํฌ๋งท(Parquet)
- Spark Execution Plan
- Bucketing๊ณผ File System Partitioning
๐ฅ Spark ํ์ผํฌ๋งท
Parquet : Spark์ ๊ธฐ๋ณธ ํ์ผ ํฌ๋งท
Parquet ํ์ด์ง : http://parquet.incubator.apache.org/
Parquet์ Structured ํฌ๋งท์ผ๋ก, ์์ถ๋ ๋ฐ์ด๋๋ฆฌ ํ์ผ์ด๋ฉฐ ๋ด๋ถ์ ์คํค๋ง ์ ๋ณด(ํ๋ ํ์ ๊น์ง)๋ฅผ ๊ฐ์ง ํ์ผ ํฌ๋งท์ด๋ค.
Structured ex) Parquet, AVRO, ORC, SequenceFile ...
๐ป ์ค์ต(Colab)
์ฌ์ฉํ ํ๊ฒฝ ์ค์น
!pip install pyspark==3.3.1 py4j==0.10.9.5
Spark Session ์์ฑ - avro ํฌ๋งท์ ์ฌ์ฉํ๋ ค๋ฉด ๋ฐ๋ก ์ง์ ํ์
from pyspark.sql import *
from pyspark.sql.functions import *
# avro ํฌ๋งท์ Spark์ ๋ณ๋๋ก ์ง์ ์ด ํ์ํ๋ค.
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Spark Writing Demo") \
.master("local[3]") \
.config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.1") \
.getOrCreate()
๋ฐ์ดํฐ๋ฅผ ์ํ csv ํ์ผ ๋ก๋ฉ
# ๊ฐ์ง๊ณ ์๋ appl_stock.csv ํ์ผ ๋ก๋ฉ
df = spark.read.format("csv").load("appl_stock.csv")
csvํ์ผ์ด ๋ก๋ฉ๋ df์ ํํฐ์ ๊ฐ์, ๋ ์ฝ๋ ๊ฐ์๋ฅผ ์์๋ณด์.
print("Num Partitions before: " + str(df.rdd.getNumPartitions())) # ๋ช ๊ฐ์ ํํฐ์
์ด ์๋์ง?
df.groupBy(spark_partition_id()).count().show() # ํํฐ์
๋ณ๋ก ๋ช ๊ฐ์ ๋ ์ฝ๋๊ฐ ์๋์ง?
- .rdd.getNumPartitions() : ํด๋น ๋ฐ์ดํฐํ๋ ์์ ํํฐ์ ๊ฐ์ ์ป๊ธฐ
- spark_partition_id : ๋ฐ์ดํฐํ๋ ์์ ๊ตฌ์ฑํ๋ ํํฐ์ ๋ณ๋ก ์์ด๋ ๋ฆฌํด
df๋ฅผ ์ฌ์ฉํด์ df2์ ๋ค ๊ฐ์ ํํฐ์ , df3์ ๋ ๊ฐ์ ํํฐ์ ์ ๊ฐ์ง๋๋ก ์ค์
df2 = df.repartition(4) # ๋ฆฌํํฐ์
ํด์ ๋ค ๊ฐ๋ก ๋ง๋ค์ด์ค
df3 = df2.coalesce(2) # ํํฐ์
์ ์๋ฅผ ์ค์ฌ์ค(์
ํ๋ง์ ์ต์ํํ๋ ๋ฐฉํฅ์ผ๋ก)
๊ทธ๋ฆฌ๊ณ df, df2, df3๋ฅผ ๊ฐ๊ฐ avro, parquet, json์ผ๋ก ์ ์ฅํด๋ณด์.
# df : ํํฐ์
์ด ํ๋์๋ ๊ฒ
# avro๋ก ์ธ์ด๋ธ
# ์ต์
์ผ๋ก path ์ง์ ๊ฐ๋ฅ
df.write \
.format("avro") \
.mode("overwrite") \
.option("path", "dataOutput/avro/") \
.save()
# df2 : ํํฐ์
์ด ๋ค ๊ฐ์๋ ๊ฒ
# parquet ๋ก ์ธ์ด๋ธ
df2.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "dataOutput/parquet/") \
.save()
# df3 : ํํฐ์
์ด ๋ ๊ฐ์๋ ๊ฒ
# json ๋ก ์ธ์ด๋ธ
df3.write \
.format("json") \
.mode("overwrite") \
.option("path", "dataOutput/json/") \
.save()
๊ทธ๋ฆฌ๊ณ ls ๋ช ๋ น์ด ์ฌ์ฉํด์ ํ์ธํด๋ณด๋ฉด, ๊ฐ ํฌ๋งท๋ณ๋ก ํด๋๊ฐ ์ ์ฅ๋๊ณ ๊ทธ ์์ ๋ฐ์ดํฐ๊ฐ ์ ์ฅ๋๋ค.
๊ฐ ๋ด์ฉ์ ํ์ธํ๋ฉด ๋ค์๊ณผ ๊ฐ๋ค.
parquet์ ๊ฒฝ์ฐ snappy๊ฐ ๋ค์ ๋ถ์ด์ ์์ถ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
์ฃผ์์๋ ์ฐ์ฌ์๋ฏ snappy๋ spiltableํ ์์ถ ํํ๋ก, ์์ถ์ด ๋์์ด๋ ๋ธ๋ก๋จ์๋ก ๋๋ ์ ์๋ ์ ์ฉํ ์์ถ ๋จ์์ด๋ค.
์ด๋ฒ์๋ Parquet ํ์ผํฌ๋งท์ Schema Evolution ๊ธฐ๋ฅ์ ์ฌ์ฉํด๋ณด์.
๊ฐ๊ธฐ๋ค๋ฅธ ์คํค๋ง๋ฅผ ๊ฐ์ง ์ธ ๊ฐ์ Parquet ํ์ผ์ ๊ฐ์ ธ์จ๋ค๊ณ ๊ฐ์ ํ์.
๊ฐ parquet ํ์ผ์ด ์ ์ฅ๋์ด์๊ณ , ๋ถ๋ฌ์์ ๋ฐ์ดํฐ๋ฅผ show() ํ์๋ค.
์ด๋ฏธ์ง์์ ๋ณด์ด๋ฏ ๊ฐ parquet ํ์ผ์ ํ๋์ ์ปฌ๋ผ์ด ์ถ๊ฐ๋๋ ํ์์ผ๋ก ์คํค๋ง๊ฐ ๋ค๋ฅด๋ค.
Parquet์ Schema Evolution์ ์ง์ํ๊ธฐ ๋๋ฌธ์ ์ด๋ฅผ ๋ง์น ํ๋์ ์คํค๋ง์ฒ๋ผ ํ๋ฒ์ ๋ถ๋ฌ์ฌ ์๊ฐ ์๋ค.
.option("mergeSchema", True) ์ต์ ์ ์ฌ์ฉํด์ Schema Evolution ๊ธฐ๋ฅ์ ์ฌ์ฉํ์๋ค.
schema1์ด ์๋ ๊ฐ์ง๊ณ ์๋ ๋ฐ์ดํฐ์ฒ๋ผ ์ถ๊ฐ๋ ์ปฌ๋ผ์ ๋ฐ์ดํฐ๊ฐ ์์ผ๋ฉด NULL๋ก ์ฑ์์ง๋ค.
๐ฆ Spark์ Execution Plan
๐ Execution Plan : ๊ฐ๋ฐ์๊ฐ ์์ฑํ ์ฝ๋๋ฅผ Spark์ด ์ด๋ป๊ฒ ์ค์ ๋ก ์คํ์ ํ๋๊ฐ?
Spark์์ ๋ฐ์ดํฐ ํ๋ ์์ where, groupby, select ๋ฑ์ผ๋ก ์ฒ๋ฆฌํ ๋, ๋ช ๋ น์ด๋ง๋ค ๋ฐ๋ก๋ฐ๋ก ์ฒ๋ฆฌํ์ง ์๋๋ค.
ํด๋น ์์ ๋ค์ ๋ชจ์๋์๋ค๊ฐ, show()๋ collect() ๋ฑ์ ํตํด ๋ฐ์ดํฐ๋ฅผ ์ง์ ๋ณด์ฌ์ค์ผํ ๋ ํ๊บผ๋ฒ์ ์ฒ๋ฆฌํ๊ฒ ๋๋ค!
์ด๋ ๋ฐ์ดํฐ ํ๋ ์์ ์ง์ ์์ ์ ํ๋ ๋ช ๋ น์ด๋ค์ Transformations๋ผ๊ณ ํ๊ณ ,
show()๋ collect()์ฒ๋ผ ๋ณด์ฌ์ฃผ๊ฑฐ๋ Read, Write์ฒ๋ผ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๊ฐ ์๋ฃ๋์ด์ผ ์์ ์ด ๊ฐ๋ฅํ ๋ช ๋ น์ด๋ฅผ Actions๋ผ๊ณ ํ๋ค.
1. Transformations
- Narrow Dependencies : ๋
๋ฆฝ์ ์ธ Partition level ์์
/ ์
ํ๋ง x
- select, filter, map ๋ฑ๋ฑ
- Wide Dependencies : ์
ํ๋ง์ด ํ์ํ ์์
- groupby, reduceby, partitionby, repartition, coalesce ๋ฑ๋ฑ
2. Actions
- Read, Write, Show, Collect : Job์ ์คํ์์ผ ์ค์ ์ฝ๋๊ฐ ์คํ๋๋ ๋ช ๋ น์ด
- Lazy Execution : Job์ ํ๋ ๋ง๋ค์ด๋ด๊ณ ์ฝ๋๋ฅผ ์ค์ ๋ก ์ํ
- ์ต๋ํ ๋ ๋ง์ ์คํผ๋ ์ด์ ์ ๋ชจ์๋์๋ค๊ฐ ํ๋ฒ์ ์ต์ ํ๋ฅผ ํด์ผ ๋ ํจ์จ์ ์ด๊ธฐ ๋๋ฌธ!
๐งฑ ๊ธฐ๋ณธ ๊ตฌ์กฐ : Action > Job > 1 + Stages > 1 + Tasks
- Action : Job์ ํ๋ ๋ง๋ค์ด๋ด๊ณ , ์ฝ๋๊ฐ ์ค์ ๋ก ์คํ๋จ
- Job : ํ๋ ํน์ ๊ทธ ์ด์์ Stage๋ก ๊ตฌ์ฑ๋๋ค.
- Stage๋ ์ ํ๋ง์ด ๋ฐ์ํ๋ ๊ฒฝ์ฐ ์๋ก ์๊ธด๋ค. ex) groupby
- Stage : DAG ํํ๋ก ๊ตฌ์ฑ๋ Task๋ค์ด ์กด์ฌํ์ฌ ๋ณ๋ ฌ ์คํ๋๋ค.
- Task : ๊ฐ์ฅ ์์ ์คํ ์ ๋์ผ๋ก, Executor์ ์ํด ์คํ๋๋ค.
๐ป ์ค์ต(Colab)
1. WordCount.py
from pyspark.sql import *
from pyspark.sql.functions import *
# spark.sql.adaptive.enabled : Fasle๋ ์ดํดํ๊ธฐ ์ด๋ ค์ด ์ต์ ํ๋ Unable
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.config("spark.sql.adaptive.enabled", False) \
.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 3)
# ์ฌ๊ธฐ์ read์ ๊ฒฝ์ฐ ํค๋๋ฅผ ํ์ธํ์ง ์๊ธฐ ๋๋ฌธ์ JOB์ ์คํํ์ง๋ ์์
# ํค๋๋ฅผ ํ์ธํด์ผ ํ ๋๋ง ๋ฐ์ดํฐ๋ฅผ ์ง์ ํ์ธํ๋ฏ๋ก JOB์ ์คํ
df = spark.read.text("shakespeare.txt")
# value(๊ธฐ๋ณธ ์ปฌ๋ผ๋ช
)์ ๊ณต๋ฐฑ์ ๊ธฐ์ค์ผ๋ก ๋๋์ด word ์ปฌ๋ผ์ ๊ฐ๊ฐ ๋ฃ๊ณ , ๊ทธ ๊ฐ์๋ฅผ ์๋ SQL
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()
df_count.show()
input("stopping ...")
Groupby ๋๋ฌธ์ ์๋ก์ด Stage๊ฐ ์์ฑ๋ ๊ฒ์ ์ ์ ์๋ค.
2. JOIN.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession \
.builder \
.appName("Shuffle Join Demo") \
.master("local[3]") \
.config("spark.sql.shuffle.partitions", 3) \
.config("spark.sql.adaptive.enabled", False) \
.getOrCreate()
# json ํ์ผ์ read๋ ์คํค๋ง๋ฅผ ์๊ธฐ ์ํด ์ฝ์ด์ผํ๋ฏ๋ก read๋ ACTION์ด ๋๋ค.
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")
join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")
join_df.collect()
input("Waiting ...")
spark.stop()
๊ฐ๊ฐ json์ ํ์ผ์ ๋ถ๋ฌ์ค๋ job์์ Stage๋ฅผ ํ๋์ฉ ๊ฐ๊ฒ ๋๊ณ , JOIN์ ํตํด Stage๋ฅผ ํ๋ ๋ ์์ฑํ๊ฒ ๋๋ค.
์์ ์ด๋ฏธ์ง๊ฐ ๊ธฐ๋ณธ์ ์ธ JOIN์ Execution Plan ํํ๊ฐ ๋๋ค.
๐ฉ Bucketing๊ณผ File System Partitioning
๋ฐ์ดํฐ ์ ์ฅ์ ์ดํ ๋ฐ๋ณต์ฒ๋ฆฌ์ ์ต์ ํ๋ ๋ฐฉ๋ฒ์ผ๋ก ํ๋ ๊ฒ์ด ๋ชฉ์ ์ผ๋ก, Hive ๋ฉํ์คํ ์ด ์ฌ์ฉ์ด ํ์ํ๋ค.
1. Bucketing
- DataFrame์ ํน์ ID ๊ธฐ์ค์ผ๋ก ๋๋ ์ ํ ์ด๋ธ๋ก ์ ์ฅ
- ํ๋ฒ์ ์ ํ๋ง์ด ํ์ํ๋, ๋์ค์ ๋ค์ ์ ํ๋ง์ ๋ฐฉ์งํ๊ธฐ ์ํ ๋๋น์ฑ ์ด ๋จ
- ๋ฐ์ดํฐ์ ํน์ฑ์ ์ ์๊ณ ์๋ ๊ฒฝ์ฐ๋ค ์ฌ์ฉ์ด ๊ฐ๋ฅํ๋ค.
Aggregation, Window ํจ์, JOIN์์ ๋ง์ด ์ฌ์ฉ๋๋ ์ปฌ๋ผ์ด ์๋์ง ๊ฒ์ฌ > ์ด ์ปฌ๋ผ์ ๊ธฐ์ค์ผ๋ก ํ ์ด๋ธ ์ ์ฅ
2. File System Partitioning = Hive์ Partitioning
- ๋ฐ์ดํฐ์ ํน์ ์ปฌ๋ผ๋ค์ ๊ธฐ์ค์ผ๋ก ํด๋ ๊ตฌ์กฐ๋ฅผ ๋ง๋ค์ด ๋ฐ์ดํฐ ์ ์ฅ์ ์ต์ ํ > ๋ก๋ฉ, ํํฐ๋ง์ ์ค๋ฒํค๋ ์ค์
- Partition Key : ํด๋ ๊ตฌ์กฐ๋ฅผ ๋ง๋ค ํน์ ์ปฌ๋ผ
- DataFrameWriter์ partitionBy ์ฌ์ฉ
- Partiton Key๋ Cardinality๊ฐ ์ ์ ์ปฌ๋ผ์ด ์ข๋ค.
ex) ๋ก๊ทธ ํ์ผ์ ๋ฐ์ดํฐ ์์ฑ์๊ฐ ๊ธฐ๋ฐ์ผ๋ก ๋ฐ์ดํฐ ์ฝ๊ธฐ๋ฅผ ๋ง์ด ํ๊ฒ ๋จ.
์ด ๋ก๊ทธ ํ์ผ์ด ๊ต์ฅํ ํฌ๋ค๋ฉด ๋ฐ์ดํฐ ์์ฒด๋ฅผ ์ฐ๋-์-์ผ์ ํด๋ ๊ตฌ์กฐ๋ก ์ ์ฅํ๋ ๊ฒ! > ์ฝ๊ธฐ ๊ณผ์ ์ต์ํ, ๊ด๋ฆฌ ์ฌ์