
Spark์—์„œ ๋ฐ์ดํ„ฐ Caching ํ•˜๋Š” ๋ฐฉ๋ฒ•, ์‹ค์Šตํ•ด๋ณด๊ธฐ! - TIL230725

sunhokimDev 2023. 7. 25. 15:39

๐Ÿ“š KDT WEEK 17 DAY 2 TIL

  • Caching ์ด๋ก  ๋ฐ ์‹ค์Šต
  • Caching BestPractices

 

๐ŸŽŸ Caching

์ž์ฃผ ์‚ฌ์šฉ๋˜๋Š” ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ๋ฉ”๋ชจ๋ฆฌ์— ์œ ์ง€ํ•˜์—ฌ ์ฒ˜๋ฆฌ์†๋„๋ฅผ ์ฆ๊ฐ€์‹œํ‚ค๋Š” ๋ฐฉ๋ฒ•

ํ•˜์ง€๋งŒ ๋ฉ”๋ชจ๋ฆฌ ์†Œ๋น„๋ฅผ ์ฆ๊ฐ€์‹œํ‚ค๋ฏ€๋กœ ๋ถˆํ•„์š”ํ•˜๊ฒŒ ๋ชจ๋“  ๊ฑธ ์บ์‹ฑํ•  ํ•„์š”๋Š” ์—†๋‹ค.

 

DataFrame์„ Cachingํ•˜๋Š” ๋ฐฉ๋ฒ•

  • cache()์™€ persist()๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๊ฐ€๋Šฅํ•˜๊ณ , ๋ฉ”๋ชจ๋ฆฌ๋‚˜ ๋””์Šคํฌ์— ์ €์žฅํ•˜๊ฒŒ ๋œ๋‹ค.
  • ๋ชจ๋‘ lazy execution์— ํ•ด๋‹นํ•˜์—ฌ ํ•„์š”ํ•˜๊ธฐ ์ „๊นŒ์ง€๋Š” ์บ์‹ฑํ•˜์ง€ ์•Š๋Š”๋‹ค.
  • caching์€ ํ•ญ์ƒ ํŒŒํ‹ฐ์…˜ ๋‹จ์œ„๋กœ ๋ฉ”๋ชจ๋ฆฌ์— ๋ณด์กด๋˜๋ฏ€๋กœ, ํ•˜๋‚˜์˜ ํŒŒํ‹ฐ์…˜์ด ๋ถ€๋ถ„์ ์œผ๋กœ ์บ์‹ฑ๋˜์ง€๋Š” ์•Š๋Š”๋‹ค.

 

์‹ค์Šต

 

1. Caching a DataFrame with .cache()

# SparkSession available as 'spark'

# Create a df holding the numbers 1~99999 plus a column of their squares (square) >> df10_square
df = spark.range(1, 100000).toDF("id")
df10 = df.repartition(10)
from pyspark.sql.functions import expr
df10_square = df10.withColumn("square", expr("id*id"))

# caching
df10_square.cache()     # not actually cached yet
df10_square.take(10)    # fetch 10 rows; only as much data as needed gets cached here!
df10_square.count()     # the computation needs every row, so all the data gets cached here!
df10_square.unpersist() # drop the cache; this takes effect immediately

 

df10_square.take(10)์˜ ์›น UI ๊ฒฐ๊ณผ

 

df10_square.count()์˜ ์›น UI ๊ฒฐ๊ณผ

 

.take(10)์˜ ๊ฒฝ์šฐ 10๊ฐœ์˜ ๋ฐ์ดํ„ฐ๋งŒ ์บ์‹ฑ๋˜๋ฏ€๋กœ ๋ชจ๋“  ํŒŒํ‹ฐ์…˜์ด ์บ์‹ฑ๋˜์ง€ ์•Š๊ณ  ํ•˜๋‚˜๋งŒ ์บ์‹ฑ๋˜์—ˆ๊ณ ,

.count()์˜ ๊ฒฝ์šฐ ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋ฅผ ์บ์‹ฑํ•˜๋ฏ€๋กœ ๋ชจ๋“  ํŒŒํ‹ฐ์…˜(10๊ฐœ)๊ฐ€ ์บ์‹ฑ๋œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

 

2. Caching a table with a Spark SQL query

# Caching with Spark SQL
df10_square.createOrReplaceTempView("df10_square")

spark.sql("CACHE TABLE df10_square")        # with Spark SQL, all the data is cached immediately (eager)
# spark.sql("CACHE LAZY TABLE df10_square") # adding LAZY defers caching until the data is needed
spark.sql("UNCACHE TABLE df10_square")      # drop the cache

spark.catalog.isCached("df10_square") # checks whether df10_square is currently cached > False

 

๊ทธ๋ ‡๋‹ค๋ฉด ์บ์‹ฑ๋œ ๋ฉ”๋ชจ๋ฆฌ๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ๋Š” ์–ธ์ œ์ผ๊นŒ?

 

Spark Optimizer๋Š” ์˜๋ฏธ์ ์œผ๋กœ ๋™์ผํ•œ ์ฟผ๋ฆฌ์— ๋Œ€ํ•ด ์™„๋ฒฝํžˆ ๋ฉ”๋ชจ๋ฆฌ์— ์žˆ๋Š” ๋ฐ์ดํ„ฐ๋กœ ๋Œ€์‘ํ•˜์ง€๋Š” ๋ชปํ•œ๋‹ค.

# ํŠน์ • ์กฐ๊ฑด์œผ๋กœ ํ•„ํ„ฐ๋ง๋œ ๊ฒฐ๊ณผ๋ฅผ ์บ์‹ฑ
df10_squared_filtered = df10_square.select("id", "square").filter("id > 50000").cache()
df10_squared_filetered.count()

# ์œ„์™€ ์ˆœ์„œ๋งŒ ๋‹ค๋ฅด๊ณ  ์˜๋ฏธ์ ์œผ๋กœ ๊ฐ™์€ ๋ช…๋ น
df10_square.filter("id > 50000").select("id","square").count()

# Physical Plan ํ™•์ธํ•˜๋Š” ๋ช…๋ น์–ด, Memory์—์„œ ๊ฐ€์ ธ์™”๋‹ค๋Š” ๋‚ด์šฉ์„ ํ™•์ธํ•  ์ˆ˜ ์—†๋‹ค.
df10_square.filter("id > 50000").select("id","square").explain()

 

 

์œ„ ์˜ˆ์ œ์˜ ๋งˆ์ง€๋ง‰ ์ฝ”๋“œ ๊ฒฐ๊ณผ, Memory์— ๊ด€๋ จ๋œ ๋‚ด์šฉ์€ ์—†๋‹ค.

 

Memory์— ์žˆ๋Š” ํ…Œ์ด๋ธ”์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ InMemoryTableScan์ด๋ผ๋Š” ๋ฌธ๊ตฌ๊ฐ€ ๋ณด์ธ๋‹ค.

๋งŒ์•ฝ ์™„์ „ํžˆ ๋™์ผํ•œ ๋ช…๋ น (df10_square.select("id", "square").filter("id > 50000"))์„ ์‚ฌ์šฉํ•˜๊ฑฐ๋‚˜, ์บ์‹ฑ๋œ ๋ณ€์ˆ˜(df10_squared_filtered)๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์–ด๋–ค ๋ช…๋ น์„ ํ•œ๋‹ค๋ฉด, ์ด๋•Œ๋Š” ์บ์‹ฑ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•˜๊ฒŒ ๋œ๋‹ค.

 

3. Caching with persist()

import pyspark

# .persist(pyspark.StorageLevel(args)) lets you control the persist options
# like cache(), it is lazy: nothing is cached until the data is used
df_persisted = df10.withColumn("square", expr("id*id")).persist(
	pyspark.StorageLevel(False, True, False, True, 1)
)

 

pyspark.StorageLevel์˜ ์ธ์ž์˜ ์˜๋ฏธ๋Š” ๋‹ค์Œ์˜ ์ˆœ์„œ๋กœ ์˜๋ฏธํ•œ๋‹ค.

  • useDisk : ๋””์Šคํฌ ์ €์žฅ์—ฌ๋ถ€
  • useMemory : ๋ฉ”๋ชจ๋ฆฌ ์ €์žฅ์—ฌ๋ถ€
  • useOffHeap : OffHeap ๋ฉ”๋ชจ๋ฆฌ ์ €์žฅ์—ฌ๋ถ€ (Off Heap ์„ค์ •ํ•„์š”)
  • deserialized : ๋ฐ์ดํ„ฐ Serialization ์—ฌ๋ถ€, True์‹œ ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ์ฆ๊ฐ€ + CPU ๊ณ„์‚ฐ ๊ฐ์†Œ / True๋Š” ๋ฉ”๋ชจ๋ฆฌ์—์„œ๋งŒ ๊ฐ€๋Šฅ
  • replication : ๋ช‡ ๊ฐœ์˜ ๋ณต์‚ฌ๋ณธ์„ ์„œ๋กœ ๋‹ค๋ฅธ executor์— ์ €์žฅํ• ์ง€ ๊ฒฐ์ •

 


 

๐Ÿ† Caching Best Practices

  • ์บ์‹ฑ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ๋Š” ์บ์‹ฑ๋œ ๋ณ€์ˆ˜๋ฅผ ์‚ฌ์šฉํ•จ์œผ๋กœ์จ ๋ถ„๋ช…ํ•˜๊ฒŒ ์žฌ์‚ฌ์šฉํ•˜๊ธฐ
  • ์ปฌ๋Ÿผ์ด ๋งŽ์€ ๋ฐ์ดํ„ฐ์…‹์€ ํ•„์š”ํ•œ ์ปฌ๋Ÿผ๋งŒ ์บ์‹ฑํ•˜๊ธฐ
  • ๋ถˆํ•„์š”ํ•  ๋•Œ๋Š” uncacheํ•˜๊ธฐ
  • ๋•Œ๋กœ๋Š” ๋งค๋ฒˆ ์ƒˆ๋กœ ๊ณ„์‚ฐํ•˜๋Š” ๊ฒƒ์ด ์บ์‹ฑ๋ณด๋‹ค ๋น ๋ฅผ์ˆ˜๋„ ์žˆ๋‹ค!
    • > Parquet ํฌ๋งท์˜ ํฐ ๋ฐ์ดํ„ฐ์…‹์„ ์บ์‹ฑํ•œ ๊ฒฝ์šฐ
    • > ์บ์‹ฑ๊ฒฐ๊ณผ๊ฐ€ ๋„ˆ๋ฌด ์ปค์„œ ๋ฉ”๋ชจ๋ฆฌ์—๋งŒ ์žˆ์„ ์ˆ˜ ์—†๊ณ , ๋””์Šคํฌ์—๋„ ์ €์žฅ๋œ ๊ฒฝ์šฐ ๋“ฑ๋“ฑ..