์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- redshift
- etl
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- aws
- disk spill
- topic
- AQE
- SQL
- KDT_TIL
- Kafka
- backfill
- Spark Caching
- CI/CD
- Spark
- spark executor memory
- Dag
- Airflow
- Salting
- ๋น ๋ฐ์ดํฐ
- mysql
- colab
- Spark ์ค์ต
- off heap memory
- DataFrame Hint
- Docker
- Spark SQL
- Speculative Execution
- Spark Partitioning
- k8s
- Kubernetes
- Today
- Total
JUST DO IT!
Spark ๊ธฐ๋ฅ๊ณผ ์ค์ผ์ค๋ง, ๋ฉ๋ชจ๋ฆฌ ๊ตฌ์ฑ ์์๋ณด๊ธฐ - TIL230724 ๋ณธ๋ฌธ
Spark ๊ธฐ๋ฅ๊ณผ ์ค์ผ์ค๋ง, ๋ฉ๋ชจ๋ฆฌ ๊ตฌ์ฑ ์์๋ณด๊ธฐ - TIL230724
sunhokimDev 2023. 7. 25. 01:56๐ KDT WEEK 17 DAY 1 TIL
- Spark ๊ธฐํ ๊ธฐ๋ฅ
- Spark ๋ฆฌ์์ค ํ ๋น ๋ฐฉ์(๋ฆฌ์์ค ์ค์ผ์ค๋ง)
- Spark Executor ๋ฉ๋ชจ๋ฆฌ ๊ตฌ์ฑ
๐ฅ Spark ๊ธฐํ ๊ธฐ๋ฅ
1. Broadcast Variable
- ๋ฃฉ์ ํ ์ด๋ธ๋ฑ์ ๋ธ๋ก๋์บ์คํ ํ์ฌ ์ ํ๋ง์ ๋ง๋ ๋ฐฉ์ (๋ธ๋ก๋์บ์คํธ ์กฐ์ธ๊ณผ ๋น์ท)
- ๋ชจ๋ ๋ ธ๋์ ํฐ ๊ท๋ชจ์ Input Dataset์ ํจ๊ณผ์ ์ธ ๋ฐฉ๋ฒ์ผ๋ก ์ค ๋ ์ฌ์ฉํ๋ค.
- ๋ฃฉ์ or ๋๋ฉ์ ํ ์ด๋ธ(10-20MB์ ์์ ํ ์ด๋ธ)์ Executor๋ก ์ ์กํ๋๋ฐ ์ฌ์ฉ
- spark.sparkContext.broadcast๋ก ํธ์ถ
๋ฃฉ์ ํ ์ด๋ธ(ํ์ผ)์ UDF๋ก ๋ณด๋ด๋ ๋ฐฉ๋ฒ
- Closure : UDF์์์ ํ์ด์ฌ ๋ฐ์ดํฐ ๊ตฌ์กฐ๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ > Task ๋จ์์ Serialization
- BroadCast : UDF์์์ ๋ธ๋ก๋์บ์คํธ๋ ๋ฐ์ดํฐ ๊ตฌ์กฐ๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ > Worker Node๋จ์์ Serialization
์ด Closure์ BroadCast ๋ฐฉ๋ฒ์ ๋ฐ์ ์์ ์ฝ๋๋ฅผ ์ฐธ๊ณ ํ๋ฉด ์ดํดํ๊ธฐ ์ฝ๋ค.
BroadCast ๋ฐ์ดํฐ์ ์ ํน์ง
- Worker node๋ก ๊ณต์ ๋๋ ๋ณ๊ฒฝ ๋ถ๊ฐ ๋ฐ์ดํฐ์ด๋ค.
- Worker node๋ณ๋ก ํ๋ฒ ๊ณต์ ๋๊ณ ์บ์ฑ๋๋ค.
- Task Memory์์ ๋ค์ด๊ฐ ์ ์๋ ํฌ๊ธฐ์ด์ด์ผ ํ๋ค.
ex) ์์ ์ฝ๋
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# UDF
def my_func(code: str) -> str:
# return prdCode.get(code) # Closure, Task๋จ์์ Serialization์ผ๋ก ๋นํจ์จ์
return bdData.value.get(code) # Broadcast, WorkerNode๋จ์์ Serialization์ผ๋ก ํจ์จ์
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Demo") \
.master("local[3]") \
.getOrCreate()
# ์์ ๋ฐ์ดํฐ(lookup.csv)์ ๋ถ๋ฌ์ prdCode์ ๋์
๋๋ฆฌํํ๋ก ์ ์ฅ
prdCode = spark.read.csv("data/lookup.csv").rdd.collectAsMap()
# ๋์
๋๋ฆฌ ํํ์ ๋ฐ์ดํฐ๋ฅผ ๋ธ๋ก๋์บ์คํธ ๋ณ์๋ก bdData๋ก ์ ์ฅ
# ์ด ์ ์ฅ๋ bdData๋ฅผ ์์์ ์ ์ํ UDF์์ ์ฌ์ฉํ๊ฒ ๋๋ค
bdData = spark.sparkContext.broadcast(prdCode)
data_list = [("98312", "2021-01-01", "1200", "01"),
("01056", "2021-01-02", "2345", "01"),
("98312", "2021-02-03", "1200", "02"),
("01056", "2021-02-04", "2345", "02"),
("02845", "2021-02-05", "9812", "02")]
df = spark.createDataFrame(data_list) \
.toDF("code", "order_date", "price", "qty")
spark.udf.register("my_udf", my_func, StringType())
# Product๋ผ๋ ์ปฌ๋ผ์ ์์ฑํ์ฌ UDF์ ๋ฐ๋ผ ๊ฐ ๋ฃ๊ธฐ
df.withColumn("Product", expr("my_udf(code)")) \
.show()
2. Accumulators
์ผ์ข ์ ์ ์ญ ๋ณ์๋ก, ํน์ ์ด๋ฒคํธ์ ์๋ฅผ ๊ธฐ๋กํ๋๋ฐ ์ฌ์ฉ๋จ ํ๋ก์ ์นด์ดํฐ์ ํก์ฌํ๋ค.
๋ณดํต ๋น์ ์์ ์ธ ๊ฐ์ ๋ ์ฝ๋์ ์๋ฅผ ์ธ๋๋ฐ ์ฌ์ฉํ๋ค.
ํน์ง
- ์ค์นผ๋ผ๋ก ๋ง๋ค๋ฉด ์ด๋ฆ์ ์ค ์ ์์ง๋ง, ๊ทธ ์ด์ธ์๋ ๋ถ๊ฐ๋ฅํ๊ณ ์ด๋ฆ์์ด์ผ Spark Web UI์ ๋ํ๋๋ค.
- DataFrame/RDD Foreach ๋ฐฉ์์ผ๋ก ๊ตฌํํ๋ ๊ฒ์ด ์ ํํ๋ค. (์ค๋ฅ ๊ฐ๋ฅ์ฑ ์ค์ด๊ธฐ)
ex) ์์ ์ฝ๋
# ๋ค์ ๋จ์ ๋ฐฐ์ด์ ๋ฐ์ดํฐํ๋ ์ํ
data = [1, 2, 3, 4, 5]
df_test = spark.createDataFrame(data, "int").toDF("value")
# Accumulator
accumulator = spark.sparkContext.accumulator(0)
# .foreach๋ฅผ ์ฌ์ฉํ๋ฉด ๋ ์ฝ๋๋ณ๋ก ๋ฐ์ดํฐ๊ฐ ๋ค์ด๊ฐ value๋ฅผ accumulator์ ๋ํ๊ฒ๋๋ค.
def add_to_accumulator(row):
global accumulator
accumulator += row["value"]
df_test.foreach(add_to_accumulator)
print("Accumulator value: ", accumulator.value)
3. Speculative Execution
๋๋ฆฐ ํ์คํฌ๋ฅผ ๋ค๋ฅธ Worker node์ ์๋ Executor์์ ์ค๋ณต ์คํ
Worker node์ ํ๋์จ์ด ์ด์๋ก ๋๋ ค์ง Task๋ผ๋ฉด, ๋น ๋ฅธ ์คํ์ ๋ณด์ฅํ๋ Data Skew๋ก ์ธํ ๋๋ฆผ์ด๋ผ๋ฉด ๋ฆฌ์์ค ๋ญ๋น
spark.speculation์ผ๋ก ์ปจํธ๋กคํ๋ฉฐ, ๊ธฐ๋ณธ์ False๋ก ๋นํ์ฑํ๋์ด ์๋ค.
Speculative Execution ํ๊ฒฝ๋ณ์ (๊ธฐ๋ณธ๊ฐ)
- spark.speculation.interval : ๋๋ฆฐ ํ์คํฌ ์ฒดํฌ ์ฃผ๊ธฐ (100ms)
- spark.speculation.multiplier : ์ผ๋ฐ ํ์คํฌ๋ค์ ์คํ ์๊ฐ์ ์ด ๋ณ์๊ฐ์ ๊ณฑํด ๋๋ฆฐ ํ์คํฌ ๊ฒฐ์ (1.5)
- spark.speculation.quantile : ์ ์ฒด ํ์คํฌ์ ์๋ฃ์จ์ด ์ด ๋ณ์๊ฐ์ ๋์ผ๋ฉด ๋๋ฆฐ ํ์คํฌ ์ฒดํฌ (0.75)
- spark.speculation.minTaskRuntime : ๋๋ฆฐ ํ์คํฌ๋ก ํน์ ํ๋ ์ต์ ์คํ ์๊ฐ (100ms)
๐ฆ Spark ๋ฆฌ์์ค ํ ๋น ๋ฐฉ์ (๋ฆฌ์์ค ์ค์ผ์ค๋ง)
Spark Application๋ค๊ฐ์ ๋ฆฌ์์ค ํ ๋น
- ๊ธฐ๋ฐ์ด ๋๋ ๋ฆฌ์์ค ๋งค๋์ ๊ฐ ๊ฒฐ์ ํ๊ฒ ๋๋ค.
- YARN์ ๊ฒฝ์ฐ FIFO, FAIR(๊ท ๋ฑํ), CAPACITY(์ฐ์ ์์์ ๋ฐ๋ผ) ์ง์
- Static Allocation (๊ธฐ๋ณธ)
- ํ๋ฒ ๋ฆฌ์์ค๋ฅผ ํ ๋น๋ฐ์ผ๋ฉด, ํด๋น ๋ฆฌ์์ค๋ฅผ ๋๊น์ง ๋ค๊ณ ๊ฐ๋ ๊ฒ์ด ๊ธฐ๋ณธ์ด๋ค. > ๋นํจ์จ์
- Dynamic Allocation
- ์ํฉ์๋ฐ๋ผ executor๋ฅผ ๋ฆด๋ฆฌ์คํ๊ธฐ๋ํ๊ณ , ์๊ตฌํ๊ธฐ๋ ํ๋ค.
- ๋ค์์ Spark Application๋ค์ด ํ๋์ ๋ฆฌ์์ค ๋งค๋์ ๋ฅผ ๊ณต์ ํ๋ค๋ฉด ํ์ฑํํ๋ ๊ฒ์ด ์ข๋ค.
- ex) spark-submit --num-executors 100 --executor-cores 4 --executor-memory 32G
- spark.dynamicAllocation.enabled = true, spark.dynamicAllocation.shuffleTracking.enabled = true ํ์
Spark Application์์์ Job๋ค๊ฐ์ ๋ฆฌ์์ค ํ ๋น
>> Spark Scheduler : ํ๋์ Application์์์ ์ก๋ค์ ๋ฆฌ์์ค๋ฅผ ๋๋ ์ฃผ๋ ์ ์ฑ
- FIFO : ๋ฆฌ์์ค๋ฅผ ์ฒ์ ์์ฒญํ Job์๊ฒ ์ฐ์ ์์ (๊ธฐ๋ณธ๋ฐฉ์)
- FAIR : ๋ผ์ด๋๋ก๋น ๋ฐฉ์์ผ๋ก ๋ชจ๋ Job์๊ฒ ๊ณ ๋ฅด๊ฒ ๋ฆฌ์์ค๋ฅผ ๋ถ๋ฐฐ
- Pool์ด๋ ํํ๋ก ๋ฆฌ์์ค๋ฅผ ๋๋ ์ ์ฐ์ ์์๋ฅผ ๊ณ ๋ คํ ํํ๋ก ์ฌ์ฉํ๋ค.
- Pool์์์ ๋ฆฌ์์ค ๋ถ๋ฐฐ๋ FAIR ํน์ FIFO๋ก ์ง์ ๊ฐ๋ฅํ๋ค.
ex) FAIR ์์
fair.xml
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
fair.py
from pyspark.sql import SparkSession
import threading
import time
# fair.xml์์ pool_name์ ํด๋นํ๋ ํ์์ ํ ๋น
# sparkContext.setLocalProperty๋ Thread์์์ ์ฌ์ฉ๋์ด์ผ ํ๋ค.
def do_job(f1, f2, id, pool_name, format="json"):
spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool_name)
print(spark.sparkContext.getLocalProperty("spark.scheduler.pool"))
if format == 'json':
df1 = spark.read.json(f1)
df2 = spark.read.json(f2)
else:
df1 = spark.read.csv(f1, header=True)
df2 = spark.read.csv(f2, header=True)
outputs.append(df1.join(df2, id, "Inner").count())
# FAIR Scheduler๋ฅผ ์ฌ์ฉํ ๋๋ ์๋์ฒ๋ผ ์ค์ ํด์ฃผ๋ฉด ๋๋ค.
spark = SparkSession\
.builder\
.appName("Fair Scheduler Demo")\
.config("spark.sql.autoBroadcastJoinThreshold", "50B")\
.config("spark.scheduler.mode", "FAIR")\
.config("spark.scheduler.allocation.file", "fair.xml")\
.getOrCreate()
outputs = []
# thread๋ฅผ ๋ณ๋ ฌ์ฒ๋ฆฌํ๋ ์๊ฐ ์ธก์ ํด๋ณด๊ธฐ
start_time_fifo = time.time()
thread1 = threading.Thread(
target=do_job,
args=(
"small_data/",
"large_data/",
"id",
"production"
)
)
thread2 = threading.Thread(
target=do_job,
args=(
"user_event.csv",
"user_metadata.csv",
"user_id",
"test",
"csv"
)
)
thread1.start()
thread2.start()
thread1.join() # ๋๋ ๋๊น์ง ๋๊ธฐํ๋ผ๋ ๋ช
๋ น์ด
thread1.join()
end_time_fifo = time.time()
# ์์ ์๊ฐ ์ถ๋ ฅํ๊ธฐ
print(f"Time taken with FAIR Scheduler: {(end_time_fifo - start_time_fifo) * 1000:.2f} ms")
๐จ Spark Executor ๋ฉ๋ชจ๋ฆฌ ๊ตฌ์ฑ
Spark Application = ํ๋์ Driver + ํ๋ ์ด์์ Executor
Driver ์ญํ
- main ํจ์ ์คํํ๊ณ SparkSession๊ณผ SparkContext๋ฅผ ์์ฑ
- ์ฌ์ฉ์๊ฐ ๋ง๋ ์ฝ๋๋ฅผ ํ์คํฌ๋ก ๋ณํํ์ฌ DAG ์์ฑ
- ๋ฆฌ์์ค ๋งค๋์ ์ ๋์์ ๋ฐ์ ํ์คํฌ๋ค์ ์คํํ๊ณ ๊ด๋ฆฌ
- ์ ์ ๋ณด๋ค์ Web UI๋ก ๋ ธ์ถ(4040ํฌํธ)
Executor์ ๋ฉ๋ชจ๋ฆฌ ๊ตฌ์ฑ
Driver ๋ฉ๋ชจ๋ฆฌ ๊ตฌ์ฑ ex) spark.driver.memory = 4GB
Executor ๋ฉ๋ชจ๋ฆฌ ๊ตฌ์ฑ ex) spark.executor.memory = 8GB
๋จ, Driver์ Executor์ ํ ๋นํ๋ ๋ฉ๋ชจ๋ฆฌ๋ YARN์ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ yarn.nodemanager.resource.memory-mb๋ณด๋ค ํด ์ ์๋ค.
Executor์ ํ ๋น๋ ๋ฉ๋ชจ๋ฆฌ๋ JVM Heap Memory๋ก์จ, 3๊ฐ์ง์ ๋ฉ๋ชจ๋ฆฌ๋ก ๊ตฌ๋ถ๋๋ค.
- Reserved Memory : Spark Engine ์ ์ฉ ๋ฉ๋ชจ๋ฆฌ๋ก์จ ๋ณ๊ฒฝ ๋ถ๊ฐ(300MB)
- Spark Memory : ๋ฐ์ดํฐ ํ๋ ์ ๊ด๋ จ ์์
๊ณผ ์บ์ฑ, ๋ ๊ฐ์ง์ Memory Pool๋ก ๋๋๋ค.
- Storage Memory Pool : Spark ๋ฉ๋ชจ๋ฆฌ ์ค ์บ์ฑ์ ์ฌ์ฉ๋๋ ๋ฉ๋ชจ๋ฆฌ
- Executor Memory Pool : DataFrame Operation์ ํด๋นํ๋ ๋ฉ๋ชจ๋ฆฌ
- User Memory : UDF, RDD conversion operation ๋ฑ
spark.memory.fraction = 0.6์ฒ๋ผ ์กฐ์ ํ๋ฉด, Spark Memory์ Reserved Memory ๊ณต๊ฐ(300MB)์ ์ ์ธํ 60ํผ์ผํธ์ ๋ฉ๋ชจ๋ฆฌ๊ฐ ํ ๋น๋๊ณ , ๋๋จธ์ง๊ฐ User Memory์ ํ ๋น๋๋ค.
๋ฐ๋ผ์ User Memory ํ์์ ๋ฐ๋ผ spark.memory.fraction ์ต์ ์ ์กฐ์ ํด์ฃผ๋ฉด ์ข๋ค.
Executor์ ๋ณ๋ ฌ์ฒ๋ฆฌ
spark.executor.cores = 4๋ก ์ง์ ํด์ฃผ๋ฉด, ํ๋์ Executor์์ 4๊ฐ์ CPU ์ฌ๋กฏ(Thread)๊ฐ ์๊ฒจ ๋ณ๋ ฌ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํด์ง๋ค.
์ด ์ฌ๋กฏ๋ค์ JVM Heap Memory๋ฅผ ๊ณต์ ํ๊ฒ ๋๋ค.
Executor Memory Pool Management > Unified Memory Manager
๋์์ค์ธ ํ์คํฌ ๋์์ผ๋ก Fair Allocation์ด ๊ธฐ๋ณธ ๋์์ผ๋ก, ์คํ์ค์ธ ํ์คํฌ๊ฐ ๋ฉ๋ชจ๋ฆฌ๋ ๊ฐ์ ธ๊ฐ๋ ๊ตฌ์กฐ๋ค.
1๊ฐ๋ง ์คํ๋๋ค๋ฉด ํ๋์ ํ์คํฌ๊ฐ ๋ชจ๋ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๊ฐ์ ธ๊ฐ๊ณ , ๋ ๊ฐ๊ฐ ์คํ๋๋ค๋ฉด ๋ ํ์คํฌ๊ฐ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๋๋ ๊ฐ๋๋ค.
Eviction๊ณผ Disk Spill (Unified Memory Manager)
Storage Memory Pool๊ณผ Executor Memory Pool์ spark.memory.storageFraction ์ต์ ์ผ๋ก ์ค์ ๋ ๋น์จ์ ๋ฐ๋ผ ๋๋๋ค.
๊ธฐ๋ณธ์ 0.5์ด์ง๋ง, ์ด๋ ํ์ชฝ์์ ๋ฉ๋ชจ๋ฆฌ๊ฐ ๋ถ์กฑํด์ง๋ฉด ๋ค๋ฅธ์ชฝ์ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๋น๋ ค์ธ ์ ์๋ค.
ํ์ง๋ง ๋ค๋ฅธ์ชฝ ๋ฉ๋ชจ๋ฆฌ๋ ๋ถ์กฑํ๋ฉด ์ด๋ป๊ฒ ๋ ๊น?
spark.memory.storageFraction์ผ๋ก ์ง์ ๋ ๋น์จ์ด ์ง์ผ์ง๋ฉด์ eviction์ด ๋ฐ์ํ๋ค.
eviction์ ๋น๋ ค์จ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๋ค์ ๋๋๋ ค์ฃผ๋ ๊ณผ์ ์ ๋งํ๊ณ , ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๋ค์ ๋๋๋ ค์ฃผ๋ ๊ณผ์ ์์ ๋ฉ๋ชจ๋ฆฌ์ ์ ์ฅ๋์ด ์๋ ๋ฐ์ดํฐ๋ ๋์คํฌ๋ก ์ฎ๊ฒจ์ง๊ฒ ๋๋ค. ์ด๊ฑธ Disk Spill์ด๋ผ๊ณ ๋ถ๋ฅธ๋ค.
Disk Spill๋ ๋ถ๊ฐ๋ฅํด์ง๋ฉด OOM(Out of Memory)๊ฐ ๋ฐ์ํ๋ค.
์ด์ธ์ ์์ ์ ์ฌ์ฉ๋๋ ๋ฉ๋ชจ๋ฆฌ๋ ์ด๋ป๊ฒ ํ ๋นํ ๊น?
- spark.memory.offHeap.enabled = True์ผ ๋, spark.memory.offHeap.size๋ฅผ ํตํด Non-JVM ์์ ์ ๋ณ๋ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๊ตฌ์ฑํ ์ ์๋ค.
- spark.executor.pyspark.memory๋ฅผ ํตํด ํ์ด์ฌ ํ๋ก์ธ์ค์ ์ง์ ๋๋ ๋ณ๋์ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๊ตฌ์ฑํ ์ ์์ง๋ง, ๊ธฐ๋ณธ๊ฐ์ 0์ผ๋ก์จ ๋ฐ๋ก ์ง์ ๋์ง ์์ผ๋ฉด Non-JVM ๋ฉ๋ชจ๋ฆฌ(์ค๋ฒํค๋ ๋ฉ๋ชจ๋ฆฌ)๋ฅผ ์ฌ์ฉํ๋ค.
- spark.python.worker.memory๋ฅผ ํตํด Py4J์ ์ง์ ๋๋ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์กฐ์จํ๋ค.(๊ธฐ๋ณธ๊ฐ 512(MB))
On Heap ๋ฉ๋ชจ๋ฆฌ์ Off Heap ๋ฉ๋ชจ๋ฆฌ
์ด์ ๊น์ง ์ค๋ช ํ ๋ฉ๋ชจ๋ฆฌ๊ฐ On Heap ๋ฉ๋ชจ๋ฆฌ์ ํด๋นํ๊ณ , Spark๋ ์ด On Heap ๋ฉ๋ชจ๋ฆฌ์์ ๊ฐ์ฅ ์ ๋์ํ๊ฒ ๋๋ค.
ํ์ง๋ง JVM Heap Memory๋ Garbage collection(๋์ ๋ฉ๋ชจ๋ฆฌ ์ ๋ฆฌ)์ ๋์์ด ๋๊ณ , ๊ทธ์ ๋ฐ๋ผ ๋น์ฉ์ด ๋ฐ์ํ๋ค.
์ด ๋น์ฉ ์ฆ๊ฐ๋ฅผ ๋ง๊ธฐ ์ํด ๊ฐ์ด ์ฌ์ฉํ ์ ์๋ ๋ฉ๋ชจ๋ฆฌ๊ฐ JVM ๋ฐ์ ์๋ ๋ฉ๋ชจ๋ฆฌ, Off Heap ๋ฉ๋ชจ๋ฆฌ๊ฐ ๋๋ค.
Off Heap ๋ฉ๋ชจ๋ฆฌ์๋ executor memory ์ฌ์ด์ฆ์ ์ผ์ ๋น์จ๋ก ์์ฑ๋๋ Overhead Memory๊ฐ ์๋ค.
spark.executor.memory.Overhead ์ต์ ์ ํตํด ๋น์จ ์กฐ์ ์ด ๊ฐ๋ฅํ๊ณ , ์ต์ 384MB ์ต๋ 0.1(10%)์ด ํ ๋น๋๋ค.
์์์ Non-JVM ์์ ์ ์ํ ๋ณ๋ ๋ฉ๋ชจ๋ฆฌ๋ก offHeap.size๋ฅผ ์กฐ์ ํ๋ค๊ณ ์ธ๊ธํ์๋ค. ์ด ๋ฉ๋ชจ๋ฆฌ๋๋ Off Heap์ ํด๋นํ๋ค.
Spark 3.x๋ Off Heap memory ์์ ์ ์ต์ ํ๋์ด JVM ์์ด ์ง์ ๋ฉ๋ชจ๋ฆฌ ๊ด๋ฆฌ๊ฐ ๊ฐ๋ฅํ๋ฉฐ, DataFrame์ฉ์ผ๋ก ์ฌ์ฉํ๋ค.
๋ฐ๋ผ์ Off Heap ๋ฉ๋ชจ๋ฆฌ์ ํฌ๊ธฐ๋ spark.executor.memoryOverhead + spark.offHeap.size์ ๊ฐ๋ค.
ex)
spark.executor.memory = 8GB
spark.executor.memoryOverhead = 0.1
Spark ๋ฉ๋ชจ๋ฆฌ ์ด์ (OOM)
Driver OOM ์ผ์ด์ค
- ํฐ ๋ฐ์ดํฐ์ ์ collect ์คํ
- ํฐ ๋ฐ์ดํฐ์ ์ Broadcast JOIN
- ํ์คํฌ๊ฐ ๋๋ฌด ๋ง์ด ๋ฐ์ํ ๋
Executor OOM ์ผ์ด์ค
- Data Skew
- executor.cores๊ฐ ๋๋ฌด ๋ง์ ๋ง์ ์์ Thread๋ค์ด ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๊ณต์ ํ ๋ > ์ฌ์ฉ๊ฐ๋ฅํ ๋ฉ๋ชจ๋ฆฌ๊ฐ ์ ์ด์ ธ ์ด์๋ฐ์
- ๋์ ๋ณ๋ ฌ์ฑ์ ๋ฐ๋ผ๊ณ executor cores์๋ฅผ ๋๋ฌด ๋๋ฆฌ๋ฉด ์๋๊ณ , 1~5๊ฐ ์ ๋๊ฐ ์ ๋นํ๋ค.
++) JVM๊ณผ Python์ ํต์
Spark์ JVM Application์ด์ง๋ง PySpark์ Python ํ๋ก์ธ์ค๋ก, JVM์์ ๋์ํ์ง ๋ชปํด JVM ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
๋ฐ๋ผ์ ์์์ ์ธ๊ธํ pyspark.memory(Python ํ๋ก์ธ์ค)์ worker.memory(Py4J)์ ๋ฐ๋ก ๋ฉ๋ชจ๋ฆฌ๋ฅผ ํ ๋นํ๋ค.
ํ ๋นํ์ง ์์ผ๋ฉด ์๋์ผ๋ก Overhead ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ฌ์ฉํ๊ฒ๋๋ค.
Executor์์๋ JVM ํ๋ก์ธ์ค์ Python Worker๊ฐ ์กด์ฌํ๊ฒ ๋๋๋ฐ,
์ด ๋๊ฐ์ ์ค๋ธ์ ํธ serialization/deserialization์ ์ํํด์ฃผ๋ ๊ฒ์ด Py4J์ ์ญํ ์ด ๋๋ค.