์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- topic
- etl
- Salting
- off heap memory
- backfill
- Kubernetes
- Spark Caching
- redshift
- ๋น ๋ฐ์ดํฐ
- Airflow
- Spark SQL
- mysql
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- aws
- Spark ์ค์ต
- KDT_TIL
- Docker
- Dag
- SQL
- Spark
- spark executor memory
- Spark Partitioning
- Speculative Execution
- disk spill
- CI/CD
- k8s
- colab
- Kafka
- AQE
- DataFrame Hint
- Today
- Total
JUST DO IT!
Spark Partitioning๊ณผ AQE ์์๋ณด๊ณ ์ค์ตํ๊ธฐ - TIL230725(2) ๋ณธ๋ฌธ
Spark Partitioning๊ณผ AQE ์์๋ณด๊ณ ์ค์ตํ๊ธฐ - TIL230725(2)
sunhokimDev 2023. 7. 25. 23:09๐ KDT WEEK 17 DAY 2 TIL(2)
- Dynamic Partition Pruning
- Spark Partition ์ ์กฐ์ ํ๊ธฐ - Repartition & Coalesce
- DataFrame Hint
- AQE
๐ฅ Dynamic Partition Pruning
๋น Partition ํ ์ด๋ธ์ ์ ์ฉ๋ ํํฐ๋ง์ Partition ํ ์ด๋ธ์ ์ ์ฉํด๋ณด๋ ๊ฒ
์ผ๋ฐ์ ์ผ๋ก ํฐ ๋ฐ์ดํฐ์ ์ ์กฐ๊ฑด๋ฌธ(WHERE)์ ์ ์ฉํจ์ผ๋ก์จ ํฐ ๋ฐ์ดํฐ๋ฅผ ๋จผ์ ์๊ฒ ํํฐ๋งํ๋ ๊ฒ์ด ํจ์จ์ ์ด๋ค.
์๋ ์ด๋ฏธ์ง๋ orders ํ ์ด๋ธ์ด ํํฐ์ ์ด ์๋ ํฐ ํ ์ด๋ธ์ด๊ณ , date๊ฐ ํํฐ์ ์ด ์๋ ์์ ํ ์ด๋ธ์ผ ๋,
์ฌ์ฉ์๊ฐ ์ค์๋ก date ํ ์ด๋ธ์ ์กฐ๊ฑด๋ฌธ์ ๊ฑธ์์ ๋ ์๋์ผ๋ก ์์ ๋๋ ๊ณผ์ ์ด๋ค.
๊ธฐ๋ณธ ์ค์ ์ True๋ก ๋์ด์์ง๋ง, ๋ค์์ ๋ช ๋ น์ด๋ฅผ ์ ๋ ฅํ๋ฉด False๋ก ๋ฐ๊ฟ ์๋ ์๋ค.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false") # OFF
๐ฆ Spark Partiton ์ ์กฐ์ - Repartition๊ณผ Coalesce
1. Repartition
- ํํฐ์ ์๋ฅผ ์ฆ๊ฐ์์ผ ๋ณ๋ ฌ์ฑ์ ์ฆ๋
- ๊ต์ฅํ ํฐ ํํฐ์ ์ด๋ Skew ํํฐ์ ์ ํฌ๊ธฐ ์กฐ์
- ํ์ง๋ง ํํฐ์ ์ ์๊ฐ ๋๋ฌด ๋ง์์ง๋ฉด ํ์คํฌ ์ค์ผ์ค๋ง์ ์ค๋ฒํค๋ ๋ฑ์ ์ด์ ๋ฐ์
- Shuffling์ด ๋ฐ์ํ๋ฏ๋ก ๋ถํ์ํ๊ฒ ์ฌ์ฉ๋ ๊ฒฝ์ฐ ์คํ๋ ค ์๊ฐ๊ณผ ๋น์ฉ์ด ์ฆ๊ฐ
Repartition ์ฌ์ฉ ๋ฐฉ๋ฒ(Hash ๊ธฐ๋ฐ)
- repartition(5)
- rapartition(5, "city") # city ์ปฌ๋ผ์ ๊ธฐ์ค์ผ๋ก 5๊ฐ์ ํํฐ์ ์ผ๋ก repartition, ์ปฌ๋ผ์ ์ฌ๋ฌ ๊ฐ๋ ์ ๋ ฅ ๊ฐ๋ฅ
- repartition("city") # spark.sql.shuffle.partitions ๊ฐ์ ๋ฐ๋ผ repartition, AQE๋ฅผ ์ฌ์ฉํ๋ค๋ฉด ๋ค๋ฅผ ์ ์์
- repartitionByRange(numPartitions, *cols) # ์ง์ ๋ ์ปฌ๋ผ ๊ฐ์ ๋ฒ์๋ฅผ ๊ธฐ์ค์ผ๋ก ๋๋, ์์์ ์ํ๋งํด์ ๋๋๋ค.
2. Coalesce
- ํํฐ์ ์ ์๋ฅผ ์ค์ด๋ ์ฉ๋๋ก, ๋ก์ปฌ ํํฐ์ ๋ค์ Merge
- Shuffling์ด ๋ฐ์ํ์ง ์์ง๋ง, ํํฐ์ ์ ํฉ์น๋ ๊ณผ์ ์์ Skew ํํฐ์ ์ด ๋ฐ์ํ ์ ์์
> Repartition ์ค์ต
# order ํ
์ด๋ธ์ ํํฐ์
๋ณ ๋ช ๊ฐ์ ๋ฐ์ดํฐ๊ฐ ์ ์ฅ๋์๋์ง ํ์ธํด๋ณด๊ธฐ
from pyspark.sql.functions import spark_partition_id
spark.table("order").groupBy(spark_partition_id()).count().show()
# ๋ผ์ด๋๋ก๋นํํ๋ก ๊ท ๋ฑํ๊ฒ 10๊ฐ์ ํํฐ์
์ผ๋ก ๋ฆฌํํฐ์
๋จ
order_10 = spark.table("order").repartition(10).cache()
order_10.groupBy(spark_partition_id()).count().show()
# "sku" ์ปฌ๋ผ์ผ๋ก ๋ฆฌํํฐ์
sku_df = spark.table("order").repartition("sku")
# AQE๊ฐ ํ์ฑํ๋์ด ์์ง ์์ผ๋ฉด spark.sql.shuffle.partitions ๊ฐ์ ๋ฐ๋ผ ๋ฆฌํํฐ์
# ํ์ง๋ง ๊ธฐ๋ณธ์ ์ผ๋ก ํ์ฑํ๋์ด์๊ธฐ ๋๋ฌธ์ ์์์ ์ต์ ํ
sku_df.groupBy(spark_partition_id()).count().show()
๐ง DataFrame ๊ด๋ จ ํํธ
Spark SQL Optimizer์๊ฒ Execution plan์ ๋ง๋ฌ์ ์์ด ํน์ ํ ๋ฐฉ์์ ์ฌ์ฉํ๋๋ก ์ ์ํ๋ ๊ฒ
1. Partitioning ๊ด๋ จ ํํธ
์ด๋ค ๋ฐฉ์์ผ๋ก Partitioning์ ํ๊ณ ์ถ์์ง ๋ณด๋ผ ์ ์๋ค.
ex)
COALESCE, REPARTITION, REPARTITON_BY_RANGE๋ฅผ ์ฌ์ฉํ ์ ์๊ณ , ๋ค์ ์ธ์๋ก ํํฐ์ ๊ฐ์์ ์ปฌ๋ผ์ ๋ฐ์ ์ ์๋ค.
REBALANCE๋ผ๋ ์ต์ ๋ ์๋๋ฐ, ํ์ผ์ ํฌ๊ธฐ๋ฅผ ์ต๋ํ ๋น์ทํ๊ฒ ๋ง๋ค์ด์ ์ ์ฅํด์ฃผ๋ ์ต์ ์ด๋ค.(AQE ํ์)
2. Join ๊ด๋ จ ํํธ
Join ๋ฐฉ์์ ๊ด๋ จ๋ ํํธ๋ฅผ ์ ๊ณตํ๋ค.
์ต์ ์ผ๋ก BROADCAST, MAPJOIN, MERGE(๊ธฐ๋ณธ), SHUFFLE_HASH(Full Outer Join์ ๋ถ๊ฐ), SHUFFLE_REPLICATE_NL ๋ฑ์ด ์๋ค.
ํํธ ์ฌ์ฉ ๋ฐฉ์
1. DataFrame API
์ผ๋ฐ์ ์ธ DataFrame Operation ๋ค๋ก .hint๋ฅผ ์ฌ์ฉํจ์ผ๋ก์จ Optimizer์๊ฒ ํํธ๋ฅผ ์ ๊ณตํ๋ค.
df1.join(df2, "id", "inner").hint("COALESCE", 3)
df1.join(df2.hint("broadcast"), "id", "inner").hint("COALESCE", 3) # JOIN ํํธ๋ฅผ ์ถ๊ฐํ ๊ฒฝ์ฐ
2. Spark SQL
/*+ hint [, ...] */ ๋ฅผ ์ฟผ๋ฆฌ๋ฌธ์์ ์ฝ์ ํ์ฌ ์ฌ์ฉํ๋ค.
SELECT /*+ REPARTITION(3) */ * FROM TABLE
SELECT /*+ BROADCAST(table1) */ * FROM table1 JOIN table2 ON table1.key = table2.key
๐ฉ AQE (Adaptive Query Execution)
Spark 3.2๋ถํฐ ๊ธฐ๋ณธ์ผ๋ก ์ ์ฉ๋๋ ์ต์ ํ ํ ํฌ๋
Partition์ ์๊ฐ ์ ์ผ๋ฉด ๋ณ๋ ฌ์ฑ์ด ๋จ์ด์ง๊ณ OOM๊ณผ disk spill์ ๊ฐ๋ฅ์ฑ์ ๋์ด๊ณ ,
Partition์ ์๊ฐ ๋ง์ผ๋ฉด task scheduler์ task ์์ฑ ๊ด๋ จ๋ ์ค๋ฒํค๋๊ฐ ์๊ธฐ๋ฉฐ ๋คํธ์ํฌ ๋ณ๋ชฉ์ ์ด๋ํ๋ค.
Spark Engine Optimizer๊ฐ ์์์ Partition์ ์๋ฅผ ๊ฒฐ์ ํ ์ ์๋ค๋ฉด? >> AQE
AQE๋ ๊ธฐ์กด์ parsing time ์ต์ ํ๋ฟ๋ง ์๋๋ผ runtime ์ต์ ํ๊น์ง ๋ณํ๋์ด ๋ ํจ์จ์ ์ธ ์ต์ ํ๋ฅผ ์ถ๊ตฌํ๋ค.
"Dynamic query optimization that happens in the middle of query execution based on runtime statistics"
๋ค์๋งํด AQE๋ ๋์ ์ผ๋ก ์ฟผ๋ฆฌ ํ๋์ ๋ฐ๊พธ๋๋ฐ, ๊ทธ ์์ ์ด Shuffling์ ํ๊ธฐ ์ ํ(Staging)์ด๋ค.
Stage DAG๋ฅผ ์์ฐจ์ ์ผ๋ก ์คํํด๋ณด๊ณ , ๋งค๋ฒ ์๋ก์ด ์ต์ ํ ๊ธฐํ๊ฐ ์๋์ง ์กฐ์ฌํ๋ค.
AQE๊ฐ ํ์ํ ๊ฒฝ์ฐ๋ค
- Shufflingํ Partition์ ์๋ฅผ ๋์ ์ผ๋ก ์กฐ์ ํ๊ธฐ
- Join Plan ๋์ ์ผ๋ก ๋ณ๊ฒฝํ๊ธฐ(runtime ํต๊ณ๋ฅผ ๊ฐ์ ธ์ค๋ฏ๋ก ๊ฐ๋ฅ)
- Skew Join ์ต์ ํ
โ AQE ๋์
1. Dynamically coalescing shuffle partitions
๊ธฐ๋ณธ์ ์ผ๋ก Job์ ์คํํ๋ฉด์ ๋ด๋ถ์ ์ผ๋ก ๋ง์ ์์ ํํฐ์ ์ ์ผ๋ถ๋ฌ ์์ฑํ๊ณ , ๋งค Stage๊ฐ ์ข ๋ฃ๋ ๋๋ง๋ค ํ์ํ๋ค๋ฉด ์๋์ผ๋ก Coalesce๋ฅผ ์ํํ๋ฉด์ ์ ๋นํ ์์ ํํฐ์ ์๋ฅผ ๋ง์ถฐ๊ฐ๋ ๋ฐฉ์์ด๋ค.
์ฒซ ํํฐ์ ์ ์๋ spark.sql.adaptive.coalescePartitions.initialPartitionNum ๊ฐ์ ๋ฐ๋ฅด์ง๋ง, ๊ธฐ๋ณธ๊ฐ์ ์์ด์
๊ธฐ๋ณธ์ ์ผ๋ก spark.sql.shuffle.partitions ์ ๊ฐ์ ๋ฐ๋ฅธ๋ค.
์๋ ๊ทธ๋ฆผ์ Shuffleํ์ ํํฐ์ ์ ์๊ฐ ๋ง๋ค๊ณ ํ๋จํด์ COALESCE ๋์์ ์ถ๊ฐํ๋ ๋ชจ์ต์ด๋ค.
์ด๋ ์๋์ ์ค์ ๋ณ์์ ๋ฐ๋ผ ์กฐ์ ๋๋ค. ๊ดํธ ๊ฐ์ ๊ธฐ๋ณธ๊ฐ์ด๋ค.
- spark.sql.adaptive.coalescePartitions.enabled : ์ ํ ํ ํํฐ์ ์๋ฅผ ๋์ ์ผ๋ก ์ค์ธ๋ค. (True)
- spark.sql.adaptive.coalescePartitions.parallelismFirst : ๋ณ๋ ฌ์ฑ์ ๋ณด์ฅํ ๊ฒ์ธ์ง ์ ํ๋ค. ๋ ๊ฒฝ์ฐ๋ก ๋๋๋ค. (True)
- True์ธ ๊ฒฝ์ฐ > spark.sql.adaptive.coalescePartitions.minPartitionSize ๊ฐ์ ๋ฐ๋ผ Coalescing ๊ฒฐ์ (1MB)
- .minPartitionSize๋ Coalescing ํ ํํฐ์ ์ ์ต์ ํฌ๊ธฐ๋ก์จ, ์ด ๊ฐ๋ณด๋ค ํํฐ์ ์ ํฌ๊ธฐ๊ฐ ์์ผ๋ฉด Coalesce
- False์ธ ๊ฒฝ์ฐ(๊ณต์๋ฌธ์ ์ถ์ฒ) > spark.sql.adaptive.advisoryPartitionSizeInBytes ๊ฐ์ผ๋ก ๋ง์ถฐ์ ํํฐ์
๋ (64MB)
- .advisoryPartitionSizeInBytes๋ ์ ํ๋ง ํ ํํฐ์ ์๋ฅผ ์ค์ผ ๋ ๋ชฉํ๋ก ํ๋ ํํฐ์ ์ ํฌ๊ธฐ์ด๋ค.
- True์ธ ๊ฒฝ์ฐ > spark.sql.adaptive.coalescePartitions.minPartitionSize ๊ฐ์ ๋ฐ๋ผ Coalescing ๊ฒฐ์ (1MB)
2. Dynamically switching join strategies
runtime ํต๊ณ๋ฅผ ํตํด Join ํ๋์ ๋์ ์ผ๋ก ๋ฐ๊ฟ ๋ค์ ์คํํ๋ ๋ฐฉ๋ฒ
Static Query Plan(์ฒ์ ๊ธฐ๋ณธ ํ๋)์ด ์ฌ๋ฌ ์ด์ ๋ก BHJ(Broadcast Hash Join) ๊ธฐํ๋ฅผ ๋์น ๊ฒฝ์ฐ์ ํจ์จ์ ์ธ ๋ฐฉ๋ฒ์ด๋ค.
Shuffling ํ AQEShuffleRead๋ฅผ ์ํํ์ฌ ์๋ง๋ Join ๋ฐฉ์์ผ๋ก ๋ณ๊ฒฝํ๋ค. Join ๋ฐฉ์์ ๋ง๋ ์ ํ๋ง์ ๋ค์ ์คํํ๊ฒ๋๋ค.
- spark.sql.join.preferSortMergeJoin : Join์ Sort Merge Join ๊ธฐ๋ณธ ์ฌ์ฉ ์ฌ๋ถ (True)
- spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold : Hash Join์ ํํฐ์ ๋ณ ํด์๋งต ์ต๋ ํฌ๊ธฐ (0)
- spark.sql.adaptive.autoBroadcastJoinThreshold : Broadcast ๊ฐ๋ฅํ ๋ฐ์ดํฐํ๋ ์ ์ต๋ ํฌ๊ธฐ (์์), -1์ด๋ฉด ์ฌ์ฉx
- spark.sql.autoBroadcastJoinThreshold์ ๊ธฐ๋ณธ๊ฐ์ด ๋์ผํ๊ณ , AQE ํ์ฑํ ํ์
- spark.sql.adaptive.autoBroadcastJoinThreshold ์ spark.sql.autoBroadcastJoinThreshold ์ ์ฐจ์ด๋ ๊ฐ๊ฐ AQE์์ BroadCast Join ํ๋จ์ ์ฌ์ฉํ๊ณ , Shuffling ์ ์ ์ฒ์๋ถํฐ BroadCastJoin์ด ํ์ํ์ง ํ์ธํ๋ ๋ณ์์ด๋ค.
3. Dynamically optimizing skew joins
Skew ํํฐ์ ์ผ๋ก ์ธํ ์ฑ๋ฅ ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด AQE๊ฐ ์ต์ ํํ๋ ๋ฐฉ๋ฒ
Skew ํํฐ์ ์ ์กด์ฌ ์ฌ๋ถ ํ์ ํ, Skew ํํฐ์ ์ ์๊ฒ ๋๋๊ณ ์๋ ์กฐ์ธ ํํฐ์ ์ ์ค๋ณต ์์ฑํ์ฌ ์กฐ์ธ์ ์ํํ๋ค.
Skew Partition์ธ์ง ํ๋จํ๋ ์กฐ๊ฑด์ ๋ค์์ ์ค์ ๋ณ์๋ฅผ ๋ชจ๋ ๋ง์กฑํด์ผ AQE๊ฐ Skew Partition์ผ๋ก ํ๋จํ๋ค.
- spark.sql.adaptive.skewJoin.skewedPartitionFactor : ์ค๊ฐ ํํฐ์ ํฌ๊ธฐ๋ณด๋ค ์ด ๊ฐ์ ๋ฐฐ์๋งํผ ํ๋ค. (5)
- spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes : ์ด ๊ฐ๋ณด๋ค ํฐ ํฌ๊ธฐ์ด๋ค. (256(MB))
๊ทธ๋ฆฌ๊ณ ์ด ๊ธฐ๋ฅ์ ์ฌ์ฉํ๋ ค๋ฉด spark.sql.adaptive.skewJoin.enabled๋ฅผ True๋ก ์ฃผ์ด์ผ ํ๋ค.
Skew Partition์ผ๋ก ํ๋จ๋ Partition.A0๋ ์ฌ๋ฌ ๊ฐ์ ํํฐ์ ์ผ๋ก ๋๋์ด์ง๋ค.
๊ทธ๋ฆฌ๊ณ ์๋ ์กฐ์ธ๋๋ ค๋ PART.B0์ด ์ฌ๋ฌ ๊ฐ์ ์ค๋ณต ํํฐ์ ์ผ๋ก ๋ณต์ ๋์ด ๊ฐ๊ฐ ์กฐ์ธํ๊ฒ ๋๋ค.