JUST DO IT!

Learning and Practicing Spark Partitioning and AQE - TIL230725(2)


sunhokimDev 2023. 7. 25. 23:09

📚 KDT WEEK 17 DAY 2 TIL(2)

  • Dynamic Partition Pruning
  • Adjusting the Number of Spark Partitions - Repartition & Coalesce
  • DataFrame Hint
  • AQE

🟥 Dynamic Partition Pruning

Applying a filter that was placed on a non-partitioned table to a partitioned table as well, when the two are joined.

In general, it is more efficient to apply the condition (WHERE) to the large dataset so that it is filtered down to a small size first.

 

The image below shows the case where orders is a large partitioned table and date is a small non-partitioned table.

Even if the user (mistakenly) puts the condition on the date table, Spark automatically uses the filtered date values to prune the partitions of orders.

 

Dynamic Partition Pruning

 

It is enabled (true) by default, but it can be turned off (false) with the following command.

spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false") # OFF
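To see why this helps, here is a toy pure-Python model of the idea (not Spark code): the large `orders` table is stored as a dict from partition key to rows, the filter on the small `date` table is evaluated first, and only the `orders` partitions whose keys survive that filter are read. The table contents, column names, and the `join_with_pruning` helper are hypothetical.

```python
# Toy model of dynamic partition pruning (plain Python, not Spark code).

# Hypothetical large fact table, stored by partition key (date_id).
orders_partitions = {
    "2023-07-23": [{"order_id": 1, "amount": 10}, {"order_id": 2, "amount": 5}],
    "2023-07-24": [{"order_id": 3, "amount": 7}],
    "2023-07-25": [{"order_id": 4, "amount": 12}, {"order_id": 5, "amount": 3}],
}

# Hypothetical small, non-partitioned dimension table.
date_table = [
    {"date_id": "2023-07-23", "is_holiday": False},
    {"date_id": "2023-07-24", "is_holiday": False},
    {"date_id": "2023-07-25", "is_holiday": True},
]


def join_with_pruning(fact_partitions, dim_rows, dim_filter):
    """Join fact rows to filtered dimension rows, reading only matching partitions."""
    # 1. Evaluate the filter on the small table first.
    kept_keys = {row["date_id"] for row in dim_rows if dim_filter(row)}
    # 2. Use the surviving keys to prune partitions of the large table.
    scanned = [key for key in fact_partitions if key in kept_keys]
    # 3. Only the surviving partitions are read and joined.
    joined = [{**order, "date_id": key} for key in scanned for order in fact_partitions[key]]
    return scanned, joined


scanned, joined = join_with_pruning(orders_partitions, date_table, lambda r: r["is_holiday"])
print(scanned)      # ['2023-07-25']
print(len(joined))  # 2
```

Without pruning, all three `orders` partitions would be read and the non-matching rows discarded only after the join.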

 


 

🟦 Adjusting the Number of Spark Partitions - Repartition and Coalesce

 

1. Repartition

  • Increases the number of partitions to raise parallelism
  • Resizes very large or skewed partitions
  • However, too many partitions cause problems such as task-scheduling overhead
  • It triggers a shuffle, so using it unnecessarily actually increases time and cost

 

How to use repartition (hash-based when columns are given)

 

  • repartition(5) # 5 partitions, rows distributed round-robin
  • repartition(5, "city") # repartition into 5 partitions by the city column; multiple columns can be given
  • repartition("city") # the number of partitions follows spark.sql.shuffle.partitions; may differ when AQE is used
  • repartitionByRange(numPartitions, *cols) # splits by ranges of the given columns' values; the ranges are determined by sampling
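The three strategies behind these calls (round-robin for `repartition(n)`, hashing for `repartition(n, col)`, value ranges for `repartitionByRange`) can be modeled in plain Python. This is a simplified sketch, not Spark's implementation: Spark hashes with Murmur3 and estimates range boundaries by sampling, whereas here `zlib.crc32` stands in for the hash and the boundaries are computed exactly from the sorted values. The helper names and sample rows are hypothetical.

```python
import bisect
import zlib


def round_robin(rows, n):
    """repartition(n): deal rows out evenly, ignoring their values."""
    parts = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        parts[i % n].append(row)
    return parts


def hash_partition(rows, n, col):
    """repartition(n, col): rows with equal keys always land in the same partition."""
    parts = [[] for _ in range(n)]
    for row in rows:
        # crc32 is a deterministic stand-in for Spark's Murmur3 hash.
        h = zlib.crc32(str(row[col]).encode("utf-8"))
        parts[h % n].append(row)
    return parts


def range_partition(rows, n, col):
    """repartitionByRange(n, col): each partition holds a contiguous range of values."""
    values = sorted(row[col] for row in rows)
    # Upper bounds of the first n-1 ranges (Spark estimates these by sampling).
    bounds = [values[(i * len(values)) // n] for i in range(1, n)]
    parts = [[] for _ in range(n)]
    for row in rows:
        # Partition index = number of bounds strictly smaller than the value.
        parts[bisect.bisect_left(bounds, row[col])].append(row)
    return parts


rows = [
    {"city": c, "amount": a}
    for c, a in [("Seoul", 1), ("Busan", 2), ("Seoul", 3),
                 ("Incheon", 4), ("Seoul", 5), ("Daegu", 6)]
]

print([len(p) for p in round_robin(rows, 3)])                # [2, 2, 2]
print([len(p) for p in range_partition(rows, 3, "amount")])  # [3, 2, 1]
```

Round-robin always spreads rows evenly, while hash partitioning keeps every "Seoul" row in one partition, which is exactly how a frequent key turns into a skewed partition.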

 

2. Coalesce

  • Reduces the number of partitions by merging local partitions
  • Does not trigger a shuffle, but merging partitions can produce skewed partitions
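A rough model of why coalescing avoids a shuffle but can create skew: it only merges whole existing partitions, so the merged sizes add up unevenly. This sketch assigns consecutive partitions to the same output and works on partition sizes only; Spark's real coalescer also considers data locality, and the `coalesce` helper and sizes here are hypothetical.

```python
def coalesce(partition_sizes, n):
    """Merge existing partitions into at most n groups without moving rows between them."""
    n = min(n, len(partition_sizes))  # coalesce never increases the partition count
    merged = [0] * n
    for i, size in enumerate(partition_sizes):
        # Consecutive input partitions are assigned to the same output partition.
        merged[i * n // len(partition_sizes)] += size
    return merged


print(coalesce([10, 10, 10, 10, 10], 4))  # [20, 10, 10, 10]
```

Five equally sized inputs merged into four outputs leave one output twice as large as the others; a shuffle-based repartition(4) would instead redistribute individual rows.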

 

> Repartition practice

# Check how many rows are stored in each partition of the order table
from pyspark.sql.functions import spark_partition_id
spark.table("order").groupBy(spark_partition_id()).count().show()

The rows are not evenly distributed across partitions.

 

# Repartition evenly into 10 partitions, round-robin
order_10 = spark.table("order").repartition(10).cache()

order_10.groupBy(spark_partition_id()).count().show()

 

This time the rows are evenly distributed.

 

# Repartition by the "sku" column
sku_df = spark.table("order").repartition("sku")

# Without AQE, the number of partitions follows spark.sql.shuffle.partitions,
# but AQE is enabled by default, so the partition count is optimized automatically
sku_df.groupBy(spark_partition_id()).count().show()

 


 

🟧 DataFrame Hints

A way of suggesting to the Spark SQL Optimizer that it use a particular approach when building the execution plan.

 

1. Partitioning hints

You can tell the optimizer how you want the data to be partitioned.

 

For example, COALESCE, REPARTITION, and REPARTITION_BY_RANGE can be used; they take the number of partitions and/or columns as arguments (COALESCE takes only a partition count).

There is also a REBALANCE hint, which rebalances the output partitions so that their sizes, and therefore the sizes of the written files, are as similar as possible. (Requires AQE)

 

2. Join hints

These hints suggest which join strategy to use.

 

Options include BROADCAST (MAPJOIN is an alias), MERGE (sort merge join, the default), SHUFFLE_HASH (not available for full outer joins), and SHUFFLE_REPLICATE_NL.

 

How to use hints

 

1. DataFrame API

Call .hint() after a regular DataFrame operation to pass a hint to the Optimizer.

df1.join(df2, "id", "inner").hint("COALESCE", 3)
df1.join(df2.hint("broadcast"), "id", "inner").hint("COALESCE", 3) # with a join hint added

 

2. Spark SQL

Insert /*+ hint [, ...] */ into the query.

SELECT /*+ REPARTITION(3) */ * FROM table1
SELECT /*+ BROADCAST(table1) */ * FROM table1 JOIN table2 ON table1.key = table2.key

 


 

🟩 AQE (Adaptive Query Execution)

An optimization technique enabled by default since Spark 3.2

 

Partition์˜ ์ˆ˜๊ฐ€ ์ ์œผ๋ฉด ๋ณ‘๋ ฌ์„ฑ์ด ๋–จ์–ด์ง€๊ณ  OOM๊ณผ disk spill์˜ ๊ฐ€๋Šฅ์„ฑ์„ ๋†’์ด๊ณ ,

Partition์˜ ์ˆ˜๊ฐ€ ๋งŽ์œผ๋ฉด task scheduler์™€ task ์ƒ์„ฑ ๊ด€๋ จ๋œ ์˜ค๋ฒ„ํ—ค๋“œ๊ฐ€ ์ƒ๊ธฐ๋ฉฐ ๋„คํŠธ์›Œํฌ ๋ณ‘๋ชฉ์„ ์ดˆ๋ž˜ํ•œ๋‹ค.

 

What if the Spark engine's optimizer could decide the number of partitions on its own? >> AQE

 

On top of the usual optimization done when the query is planned, AQE also optimizes at runtime, aiming for more efficient plans.

 

"Dynamic query optimization that happens in the middle of query execution based on runtime statistics"

 

In other words, AQE changes the query plan dynamically, and it does so at the shuffle boundaries between stages.

It runs the stage DAG one stage at a time and, after each stage, checks whether a new optimization opportunity has appeared.

 

Cases where AQE helps

  • Dynamically adjusting the number of partitions after a shuffle
  • Dynamically changing the join plan (possible because runtime statistics are available)
  • Optimizing skew joins

 

⚙ How AQE Works

 

1. Dynamically coalescing shuffle partitions

 

The job deliberately starts with a large number of shuffle partitions, and whenever a stage finishes, AQE automatically coalesces them if needed, converging on a suitable number of partitions.

 

The initial number of partitions follows spark.sql.adaptive.coalescePartitions.initialPartitionNum, but that setting has no default value,

so by default it falls back to spark.sql.shuffle.partitions.

 

The figure below shows AQE deciding after a shuffle that there are too many partitions and adding a COALESCE step.

 

Dynamically coalescing shuffle partitions

 

This behavior is controlled by the settings below. Values in parentheses are the defaults.

 

  • spark.sql.adaptive.coalescePartitions.enabled : dynamically reduces the number of partitions after a shuffle. (True)
  • spark.sql.adaptive.coalescePartitions.parallelismFirst : whether to prioritize parallelism. There are two cases. (True)
    • If True > advisoryPartitionSizeInBytes is ignored and coalescing is decided by spark.sql.adaptive.coalescePartitions.minPartitionSize (1MB)
      • .minPartitionSize is the minimum partition size after coalescing; partitions smaller than this are coalesced.
    • If False (recommended by the official docs) > partitions are sized toward spark.sql.adaptive.advisoryPartitionSizeInBytes (64MB)
      • .advisoryPartitionSizeInBytes is the target partition size when reducing the number of partitions after a shuffle.
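The coalescing step can be approximated as a greedy pass over the shuffle partitions in order: keep merging neighbors until adding the next one would exceed a target size. This is a simplified sketch, not Spark's actual implementation (the real logic is more involved). It assumes the target is `advisoryPartitionSizeInBytes` when `parallelismFirst` is false and simply `minPartitionSize` when it is true, and it includes the `initialPartitionNum` fallback described above. The helper names, conf dictionaries, and partition sizes are hypothetical.

```python
MB = 1024 * 1024

# Defaults taken from the settings listed above.
DEFAULTS = {
    "spark.sql.shuffle.partitions": 200,
    "spark.sql.adaptive.coalescePartitions.parallelismFirst": True,
    "spark.sql.adaptive.coalescePartitions.minPartitionSize": 1 * MB,
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": 64 * MB,
}


def initial_partition_num(conf):
    """initialPartitionNum has no default, so fall back to spark.sql.shuffle.partitions."""
    merged = {**DEFAULTS, **conf}
    return merged.get(
        "spark.sql.adaptive.coalescePartitions.initialPartitionNum",
        merged["spark.sql.shuffle.partitions"],
    )


def target_size(conf):
    """Pick the size the coalescer aims for (simplified)."""
    merged = {**DEFAULTS, **conf}
    if merged["spark.sql.adaptive.coalescePartitions.parallelismFirst"]:
        return merged["spark.sql.adaptive.coalescePartitions.minPartitionSize"]
    return merged["spark.sql.adaptive.advisoryPartitionSizeInBytes"]


def coalesce_shuffle_partitions(sizes, target_bytes):
    """Greedily merge contiguous shuffle partitions; returns (start, end) ranges, end exclusive."""
    specs, start, current = [], 0, 0
    for i, size in enumerate(sizes):
        # Close the current group if adding this partition would exceed the target.
        if i > start and current + size > target_bytes:
            specs.append((start, i))
            start, current = i, 0
        current += size
    if sizes:
        specs.append((start, len(sizes)))
    return specs


sizes = [s * MB for s in [10, 20, 30, 5, 70, 1, 1]]
conf = {"spark.sql.adaptive.coalescePartitions.parallelismFirst": False}

print(initial_partition_num({}))                                # 200
print(coalesce_shuffle_partitions(sizes, target_size(conf)))    # [(0, 3), (3, 4), (4, 5), (5, 7)]
print(len(coalesce_shuffle_partitions(sizes, target_size({}))))  # 7
```

With the 64MB advisory target, seven shuffle partitions collapse into four; with `parallelismFirst` left at True, the tiny 1MB target keeps all seven, which is the parallelism-over-size trade-off the setting describes.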

 

2. Dynamically switching join strategies

 

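Uses runtime statistics to change the join plan on the fly.

This is effective when the static query plan (the initial plan) missed a BHJ (Broadcast Hash Join) opportunity, for example because the size of a join side could not be estimated accurately before execution.

After the shuffle, AQE reads the shuffle output through AQEShuffleRead and switches to a more suitable join strategy. This is less efficient than planning a broadcast hash join from the start, but it skips sorting both join sides and can read the shuffle files locally (spark.sql.adaptive.localShuffleReader.enabled) to save network traffic.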

 

  • spark.sql.join.preferSortMergeJoin : whether to prefer Sort Merge Join for joins (True)
  • spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold : maximum per-partition size allowed for building the local hash map of a shuffled hash join (0)
  • spark.sql.adaptive.autoBroadcastJoinThreshold : maximum size of a DataFrame that can be broadcast (no default of its own); -1 disables broadcasting
    • Defaults to the value of spark.sql.autoBroadcastJoinThreshold and only applies when AQE is enabled
    • The difference: spark.sql.adaptive.autoBroadcastJoinThreshold is what AQE uses at runtime to decide on a broadcast join, while spark.sql.autoBroadcastJoinThreshold is checked before any shuffle, when the initial plan decides whether to broadcast from the start.
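The runtime decision can be pictured as a function of the measured sizes after the shuffle. This is a simplified model of the rules above, not Spark's planner code: it ignores join types, hints, and the other conditions Spark checks, and it does not model what happens when preferSortMergeJoin is false. The function name and byte values are hypothetical; the 10MB broadcast default is the default of spark.sql.autoBroadcastJoinThreshold, which the adaptive threshold falls back to.

```python
MB = 1024 * 1024


def choose_join_strategy(left_bytes, right_bytes, partition_sizes,
                         broadcast_threshold=10 * MB,
                         local_map_threshold=0,
                         advisory_partition_size=64 * MB):
    """Pick a join strategy from runtime sizes (simplified model of AQE's rules)."""
    # 1. A side small enough to broadcast turns the join into a broadcast hash join.
    if broadcast_threshold != -1 and min(left_bytes, right_bytes) <= broadcast_threshold:
        return "broadcast_hash_join"
    # 2. If every shuffle partition fits in a local hash map, prefer shuffled hash join.
    if (local_map_threshold >= advisory_partition_size
            and all(size <= local_map_threshold for size in partition_sizes)):
        return "shuffled_hash_join"
    # 3. Otherwise keep the sort merge join from the static plan.
    return "sort_merge_join"


parts = [40 * MB] * 10
print(choose_join_strategy(500 * MB, 8 * MB, parts))                              # broadcast_hash_join
print(choose_join_strategy(500 * MB, 300 * MB, parts))                            # sort_merge_join
print(choose_join_strategy(500 * MB, 300 * MB, parts, local_map_threshold=64 * MB))  # shuffled_hash_join
print(choose_join_strategy(500 * MB, 8 * MB, parts, broadcast_threshold=-1))      # sort_merge_join
```

The second call shows why the default of 0 for maxShuffledHashJoinLocalMapThreshold leaves sort merge join in place unless you raise it.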

 

3. Dynamically optimizing skew joins

 

AQE's optimization for performance problems caused by skewed partitions.

After detecting skewed partitions, AQE splits each one into smaller pieces, duplicates the matching partition on the other side of the join, and joins each piece separately.

 

AQE treats a partition as skewed only if it meets both of the following conditions:

  • spark.sql.adaptive.skewJoin.skewedPartitionFactor : the partition is larger than this factor times the median partition size. (5)
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes : the partition is larger than this value. (256MB)

 

This feature also requires spark.sql.adaptive.skewJoin.enabled to be True (its default).
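The two conditions can be written as a small predicate. This sketch assumes the defaults listed above (factor 5, threshold 256MB) and a plain median over the partition sizes of one join side; the `skewed_partitions` helper and the sizes are hypothetical, and Spark's actual check lives inside its optimizer rule.

```python
import statistics

MB = 1024 * 1024


def skewed_partitions(sizes, factor=5.0, threshold=256 * MB):
    """Return indices of partitions AQE would treat as skewed (simplified)."""
    median = statistics.median(sizes)
    # Both conditions must hold: large relative to the median AND large in absolute terms.
    return [
        i for i, size in enumerate(sizes)
        if size > factor * median and size > threshold
    ]


sizes = [s * MB for s in [50, 60, 55, 1200, 58, 280]]
print(skewed_partitions(sizes))                               # [3]
print(skewed_partitions([s * MB for s in [1, 1, 1, 20]]))     # []
```

Here the median is 59MB, so the 280MB partition passes the absolute threshold but not the 5x-median rule (295MB), and only the 1200MB partition counts as skewed. In the second call, 20MB is 20x the median but far below 256MB, so nothing is split.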

 

If AQE judges PART.A0 to be a skewed partition,

it splits A0 into several smaller partitions.

The PART.B0 partition that A0 was originally going to be joined with is then replicated into as many copies as there are pieces, and each piece of A0 is joined with its own copy of B0.
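The split-and-replicate step can be sketched the same way: the skewed left partition is cut into chunks, and the matching right partition is paired with every chunk, so each pair can run as its own task. This is a simplified model that splits a list of rows by a fixed chunk size; Spark actually splits along the map outputs of the shuffle toward a target size, and the function names, chunk size, and rows here are hypothetical.

```python
def split_skewed_join(left_rows, right_rows, chunk_size):
    """Split a skewed left partition and replicate the right partition for each piece."""
    chunks = [left_rows[i:i + chunk_size] for i in range(0, len(left_rows), chunk_size)]
    # Every chunk of A0 is paired with its own copy of B0.
    return [(chunk, list(right_rows)) for chunk in chunks]


def join_pairs(pairs, key):
    """Inner-join each (left, right) pair independently, like separate tasks."""
    out = []
    for left, right in pairs:
        lookup = {}
        for rrow in right:
            lookup.setdefault(rrow[key], []).append(rrow)
        for lrow in left:
            for rrow in lookup.get(lrow[key], []):
                out.append({**rrow, **lrow})
    return out


part_a0 = [{"k": 1, "a": i} for i in range(10)]  # skewed: every row has key 1
part_b0 = [{"k": 1, "b": "x"}]

pairs = split_skewed_join(part_a0, part_b0, chunk_size=4)
print([len(chunk) for chunk, _ in pairs])  # [4, 4, 2]
print(len(join_pairs(pairs, "k")))         # 10
```

The split join produces the same 10 rows as joining A0 and B0 directly, but the work is spread over three smaller tasks instead of one oversized one.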