
A Look at Spark Features, Scheduling, and Memory Configuration - TIL230724

sunhokimDev 2023. 7. 25. 01:56

📚 KDT WEEK 17 DAY 1 TIL

  • Other Spark features
  • How Spark allocates resources (resource scheduling)
  • Spark Executor memory configuration

 

 


🟥 Other Spark Features

 

1. Broadcast Variable

  • Broadcasts lookup tables and the like to avoid shuffling (similar to a broadcast join)
  • Used to give every node a copy of a large input dataset in an efficient way
  • Used to ship lookup or dimension tables (small tables of 10-20MB) to the Executors
  • Invoked via spark.sparkContext.broadcast

 

How to send a lookup table (file) into a UDF

  1. Closure : the UDF uses a plain Python data structure > serialized once per Task
  2. Broadcast : the UDF uses a broadcast data structure > serialized once per Worker Node

The Closure and Broadcast approaches are easy to understand from the example code below.

 

BroadCast ๋ฐ์ดํ„ฐ์…‹์˜ ํŠน์ง•

  • Worker node๋กœ ๊ณต์œ ๋˜๋Š” ๋ณ€๊ฒฝ ๋ถˆ๊ฐ€ ๋ฐ์ดํ„ฐ์ด๋‹ค.
  • Worker node๋ณ„๋กœ ํ•œ๋ฒˆ ๊ณต์œ ๋˜๊ณ  ์บ์‹ฑ๋œ๋‹ค.
  • Task Memory์•ˆ์— ๋“ค์–ด๊ฐˆ ์ˆ˜ ์žˆ๋Š” ํฌ๊ธฐ์ด์–ด์•ผ ํ•œ๋‹ค.

ex) Example code

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# UDF
def my_func(code: str) -> str:
    # return prdCode.get(code)  # Closure: serialized per Task -> inefficient
    return bdData.value.get(code)  # Broadcast: serialized per Worker Node -> efficient


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Demo") \
        .master("local[3]") \
        .getOrCreate()

    # Load the small lookup file (data/lookup.csv) into prdCode as a dictionary
    prdCode = spark.read.csv("data/lookup.csv").rdd.collectAsMap()

    # Broadcast the dictionary as bdData;
    # the UDF defined above reads it through bdData.value
    bdData = spark.sparkContext.broadcast(prdCode)

    data_list = [("98312", "2021-01-01", "1200", "01"),
                 ("01056", "2021-01-02", "2345", "01"),
                 ("98312", "2021-02-03", "1200", "02"),
                 ("01056", "2021-02-04", "2345", "02"),
                 ("02845", "2021-02-05", "9812", "02")]
    df = spark.createDataFrame(data_list) \
        .toDF("code", "order_date", "price", "qty")

    spark.udf.register("my_udf", my_func, StringType())

    # Create a Product column whose values come from the UDF
    df.withColumn("Product", expr("my_udf(code)")) \
        .show()

 


 

2. Accumulators

 

A kind of global variable used to count occurrences of particular events; very similar to Hadoop's counters.

They are typically used to count records with abnormal values.

 

Characteristics

  • An accumulator can be given a name only when created in Scala, and only named accumulators appear in the Spark Web UI.
  • Updating it from a DataFrame/RDD foreach (an action) is the accurate approach, since updates made inside transformations can be applied more than once when tasks are retried.

 

ex) Example code

# Turn a simple list into a DataFrame
data = [1, 2, 3, 4, 5]
df_test = spark.createDataFrame(data, "int").toDF("value")

# Accumulator
accumulator = spark.sparkContext.accumulator(0)

# With .foreach, the function receives each record and adds its value to the accumulator
def add_to_accumulator(row):
    global accumulator
    accumulator += row["value"]

df_test.foreach(add_to_accumulator)

print("Accumulator value: ", accumulator.value)

 

(Screenshot: accumulator execution result)

 


 

3. Speculative Execution

๋Š๋ฆฐ ํƒœ์Šคํฌ๋ฅผ ๋‹ค๋ฅธ Worker node์— ์žˆ๋Š” Executor์—์„œ ์ค‘๋ณต ์‹คํ–‰

Worker node์˜ ํ•˜๋“œ์›จ์–ด ์ด์Šˆ๋กœ ๋Š๋ ค์ง„ Task๋ผ๋ฉด, ๋น ๋ฅธ ์‹คํ–‰์„ ๋ณด์žฅํ•˜๋‚˜ Data Skew๋กœ ์ธํ•œ ๋Š๋ฆผ์ด๋ผ๋ฉด ๋ฆฌ์†Œ์Šค ๋‚ญ๋น„

spark.speculation์œผ๋กœ ์ปจํŠธ๋กคํ•˜๋ฉฐ, ๊ธฐ๋ณธ์€ False๋กœ ๋น„ํ™œ์„ฑํ™”๋˜์–ด ์žˆ๋‹ค.

 

Speculative Execution ํ™˜๊ฒฝ๋ณ€์ˆ˜ (๊ธฐ๋ณธ๊ฐ’)

  • spark.speculation.interval : ๋Š๋ฆฐ ํƒœ์Šคํฌ ์ฒดํฌ ์ฃผ๊ธฐ (100ms)
  • spark.speculation.multiplier : ์ผ๋ฐ˜ ํƒœ์Šคํฌ๋“ค์˜ ์‹คํ–‰ ์‹œ๊ฐ„์— ์ด ๋ณ€์ˆ˜๊ฐ’์„ ๊ณฑํ•ด ๋Š๋ฆฐ ํƒœ์Šคํฌ ๊ฒฐ์ • (1.5)
  • spark.speculation.quantile : ์ „์ฒด ํƒœ์Šคํฌ์˜ ์™„๋ฃŒ์œจ์ด ์ด ๋ณ€์ˆ˜๊ฐ’์„ ๋„˜์œผ๋ฉด ๋Š๋ฆฐ ํƒœ์Šคํฌ ์ฒดํฌ (0.75)
  • spark.speculation.minTaskRuntime : ๋Š๋ฆฐ ํƒœ์Šคํฌ๋กœ ํŠน์ •ํ•˜๋Š” ์ตœ์†Œ ์‹คํ–‰ ์‹œ๊ฐ„ (100ms)
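
As a minimal sketch of turning this on (option names as above; the values shown just restate the defaults), speculation could be enabled when building the session:

from pyspark.sql import SparkSession

# Minimal sketch: enable speculative execution; values restate the defaults above.
spark = SparkSession.builder \
    .appName("SpeculationDemo") \
    .config("spark.speculation", "true") \
    .config("spark.speculation.interval", "100ms") \
    .config("spark.speculation.multiplier", "1.5") \
    .config("spark.speculation.quantile", "0.75") \
    .getOrCreate()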

 


 

🟦 How Spark Allocates Resources (Resource Scheduling)

 

Resource allocation across Spark Applications

  • Decided by the underlying resource manager.
  • YARN supports FIFO, FAIR (share evenly), and CAPACITY (by priority).
  • Static Allocation (default)
    • Once an application is granted resources, it keeps them until it finishes. > inefficient
  • Dynamic Allocation
    • Executors are released and requested again as the situation demands.
    • Worth enabling when multiple Spark Applications share a single resource manager.
    • ex) spark-submit --num-executors 100 --executor-cores 4 --executor-memory 32G
    • Requires spark.dynamicAllocation.enabled = true and spark.dynamicAllocation.shuffleTracking.enabled = true (see the sketch after this list)
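
A minimal sketch of those settings in session-builder form (the min/max executor counts are illustrative additions, not from the original):

from pyspark.sql import SparkSession

# Minimal sketch: enable dynamic allocation; min/max executor counts are illustrative.
spark = SparkSession.builder \
    .appName("DynamicAllocationDemo") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "2") \
    .config("spark.dynamicAllocation.maxExecutors", "100") \
    .getOrCreate()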

 

Resource allocation among Jobs within a Spark Application

>> Spark Scheduler : the policy for dividing resources among the jobs inside one application

  1. FIFO : the Job that requested resources first gets priority (default)
  2. FAIR : distributes resources evenly to every Job in round-robin fashion
    • Resources can be divided into pools, a form that takes priorities into account.
    • Within a pool, allocation can again be set to FAIR or FIFO.

 

(Diagram: FIFO on the left, FAIR on the right; because job2 is small, FAIR is the more efficient of the two here.)

 

ex) FAIR example

 

fair.xml

<?xml version="1.0"?>
<allocations>
    <pool name="production">
        <schedulingMode>FAIR</schedulingMode>
        <weight>1</weight>
        <minShare>2</minShare>
    </pool>
    <pool name="test">
        <schedulingMode>FIFO</schedulingMode>
        <weight>2</weight>
        <minShare>3</minShare>
    </pool>
</allocations>

 

fair.py

from pyspark.sql import SparkSession
import threading
import time

# Runs a join in the pool named pool_name, as defined in fair.xml.
# sparkContext.setLocalProperty must be called inside the thread that runs the job.
def do_job(f1, f2, id, pool_name, format="json"):
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool_name)
    print(spark.sparkContext.getLocalProperty("spark.scheduler.pool"))
    if format == "json":
        df1 = spark.read.json(f1)
        df2 = spark.read.json(f2)
    else:
        df1 = spark.read.csv(f1, header=True)
        df2 = spark.read.csv(f2, header=True)

    outputs.append(df1.join(df2, id, "inner").count())

# To use the FAIR scheduler, configure the session as below.
# The tiny autoBroadcastJoinThreshold effectively disables broadcast joins,
# so both jobs run as shuffle joins.
spark = SparkSession\
    .builder\
    .appName("Fair Scheduler Demo")\
    .config("spark.sql.autoBroadcastJoinThreshold", "50B")\
    .config("spark.scheduler.mode", "FAIR")\
    .config("spark.scheduler.allocation.file", "fair.xml")\
    .getOrCreate()

outputs = []

# Time the two jobs running in parallel threads
start_time = time.time()
thread1 = threading.Thread(
    target=do_job,
    args=(
        "small_data/",
        "large_data/",
        "id",
        "production"
    )
)

thread2 = threading.Thread(
    target=do_job,
    args=(
        "user_event.csv",
        "user_metadata.csv",
        "user_id",
        "test",
        "csv"
    )
)

thread1.start()
thread2.start()

thread1.join()  # wait until each thread finishes
thread2.join()

end_time = time.time()

# Print the elapsed time
print(f"Time taken with FAIR Scheduler: {(end_time - start_time) * 1000:.2f} ms")

 


 

🟨 Spark Executor Memory Configuration

 

Spark Application = one Driver + one or more Executors

 

Role of the Driver

  • Runs the main function and creates the SparkSession and SparkContext
  • Converts the user's code into tasks, producing a DAG
  • Launches and manages the tasks with the help of the resource manager
  • Exposes all of this through the Web UI (port 4040)

 

Executor์˜ ๋ฉ”๋ชจ๋ฆฌ ๊ตฌ์„ฑ

 

Driver memory setting ex) spark.driver.memory = 4GB

Executor memory setting ex) spark.executor.memory = 8GB

 

On YARN, however, the memory allocated to the Driver and Executors cannot exceed yarn.nodemanager.resource.memory-mb.

 

The memory allocated to an Executor is JVM heap memory, and it is divided into the three regions below.

 

  • Reserved Memory : dedicated to the Spark engine, not adjustable (300MB)
  • Spark Memory : used for DataFrame operations and caching; split into two memory pools
    • Storage Memory Pool : the part of Spark Memory used for caching
    • Executor Memory Pool : the part used for DataFrame operations
  • User Memory : UDFs, RDD conversion operations, etc.

Setting spark.memory.fraction = 0.6 gives Spark Memory 60% of the heap left after the 300MB of Reserved Memory is subtracted, and the remainder goes to User Memory.

So it is worth adjusting spark.memory.fraction to match how much User Memory you need; the worked sketch below runs the numbers.

 

Executor์˜ ๋ณ‘๋ ฌ์ฒ˜๋ฆฌ

spark.executor.cores = 4๋กœ ์ง€์ •ํ•ด์ฃผ๋ฉด, ํ•˜๋‚˜์˜ Executor์•ˆ์— 4๊ฐœ์˜ CPU ์Šฌ๋กฏ(Thread)๊ฐ€ ์ƒ๊ฒจ ๋ณ‘๋ ฌ์ฒ˜๋ฆฌ๊ฐ€ ๊ฐ€๋Šฅํ•ด์ง„๋‹ค.

์ด ์Šฌ๋กฏ๋“ค์€ JVM Heap Memory๋ฅผ ๊ณต์œ ํ•˜๊ฒŒ ๋œ๋‹ค.

 

As noted above, Spark Memory is split into the Storage Memory Pool and the Executor Memory Pool.

 

Executor Memory Pool Management > Unified Memory Manager

The default behavior is fair allocation across the tasks that are currently running: each running task takes memory from the pool.

If only one task is running, it can take all the memory; if two are running, they split it between them.

 

Eviction๊ณผ Disk Spill (Unified Memory Manager)

The Storage Memory Pool and the Executor Memory Pool are divided according to the ratio set by spark.memory.storageFraction.

The default is 0.5, but when one side runs short of memory, it can borrow from the other.
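
Continuing the sketch above with that default ratio (still the 8GB example heap):

# Continuing the worked example: split Spark Memory by storageFraction
storage_fraction = 0.5                                  # spark.memory.storageFraction
storage_pool = spark_memory * storage_fraction          # ~2368 MB, for caching
executor_pool = spark_memory * (1 - storage_fraction)   # ~2368 MB, for operations
# The boundary is soft: either pool can borrow from the other when it runs short.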

 

But what happens if the other side's memory also runs out?

Eviction occurs, restoring the ratio set by spark.memory.storageFraction.

Eviction is the process of handing borrowed memory back; as the memory is returned, the data stored in it is moved to disk. This is called a Disk Spill.

If even a disk spill is impossible, an OOM (Out of Memory) error occurs.

 

How is memory for everything else allocated?

 

  • When spark.memory.offHeap.enabled = true, spark.memory.offHeap.size configures separate memory for Non-JVM work.
  • spark.executor.pyspark.memory configures separate memory for the Python process, but it defaults to 0; unless it is set explicitly, Python uses the Non-JVM (overhead) memory.
  • spark.python.worker.memory tunes the memory given to Py4J (default 512MB). (see the sketch after this list)
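
A minimal sketch of these options in session-builder form (the 1g sizes are illustrative, not from the original):

from pyspark.sql import SparkSession

# Minimal sketch: off-heap and Python-side memory settings; sizes are illustrative.
spark = SparkSession.builder \
    .appName("OffHeapDemo") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "1g") \
    .config("spark.executor.pyspark.memory", "1g") \
    .getOrCreate()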

 

On-Heap memory and Off-Heap memory

Everything described so far is on-heap memory, and this is where Spark works best.

However, JVM heap memory is subject to garbage collection (automatic memory cleanup), which carries a cost.

To keep that cost from growing, memory outside the JVM, off-heap memory, can be used alongside it.

 

Off-heap memory includes Overhead Memory, created as a fixed share of the executor memory size.

It can be adjusted with the spark.executor.memoryOverhead option; by default it is 10% of the executor memory, with a floor of 384MB.

 

Earlier, offHeap.size was mentioned as the separate memory for Non-JVM work; that memory is also off-heap.

Spark 3.x is optimized for off-heap operation: it can manage memory directly without the JVM, and uses this for DataFrames.

 

๋”ฐ๋ผ์„œ Off Heap ๋ฉ”๋ชจ๋ฆฌ์˜ ํฌ๊ธฐ๋Š” spark.executor.memoryOverhead + spark.offHeap.size์™€ ๊ฐ™๋‹ค.

 

ex)

spark.executor.memory = 8GB

spark.executor.memoryOverhead = 0.1 (the default 10% factor)

(Diagram: Executor memory view)
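
A rough sketch of the numbers behind that view, assuming the defaults above (10% overhead factor with a 384MB floor, off-heap disabled):

# Worked example: total memory for an 8GB executor (values in MB)
executor_memory = 8 * 1024                   # on-heap (JVM) memory
overhead = max(0.1 * executor_memory, 384)   # overhead memory: ~819 MB
off_heap_size = 0                            # spark.memory.offHeap.size (unset here)

total = executor_memory + overhead + off_heap_size
print(f"Total requested from the cluster: {total:.0f} MB")  # ~9011 MB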

 


 

Spark memory issues (OOM)

 

Driver OOM cases

  • Running collect on a large dataset
  • Broadcast joining a large dataset
  • Too many tasks being created

 

Executor OOM cases

  • Data Skew
  • executor.cores set too high, so many threads share the memory > less usable memory per task, causing issues
    • Don't raise the executor core count too far chasing parallelism; around 1-5 cores is appropriate.

 

++) Communication between the JVM and Python

Spark is a JVM application, but PySpark runs as a Python process; since it cannot run inside the JVM, it cannot use JVM memory.

That is why separate memory is allocated through the pyspark.memory (Python process) and worker.memory (Py4J) settings mentioned above.

If they are not set, the overhead memory is used automatically.

 

 

Python and the JVM

 

Inside an Executor there are both a JVM process and a Python Worker,

and it is Py4J's role to perform object serialization/deserialization between the two.

 

(Diagram: the two communicate through Py4J.)