์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Dag
- aws
- KDT_TIL
- Speculative Execution
- topic
- Kubernetes
- spark executor memory
- etl
- backfill
- Kafka
- ๋น ๋ฐ์ดํฐ
- colab
- k8s
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- mysql
- redshift
- Spark Partitioning
- Spark ์ค์ต
- Airflow
- CI/CD
- Spark SQL
- Salting
- AQE
- disk spill
- Spark Caching
- SQL
- DataFrame Hint
- off heap memory
- Docker
- Spark
- Today
- Total
JUST DO IT!
Spark์์ ๋ฐ์ดํฐ Caching ํ๋ ๋ฐฉ๋ฒ, ์ค์ตํด๋ณด๊ธฐ! - TIL230725 ๋ณธ๋ฌธ
Spark์์ ๋ฐ์ดํฐ Caching ํ๋ ๋ฐฉ๋ฒ, ์ค์ตํด๋ณด๊ธฐ! - TIL230725
sunhokimDev 2023. 7. 25. 15:39๐ KDT WEEK 17 DAY 2 TIL
- Caching ์ด๋ก ๋ฐ ์ค์ต
- Caching BestPractices
๐ Caching
์์ฃผ ์ฌ์ฉ๋๋ ๋ฐ์ดํฐํ๋ ์์ ๋ฉ๋ชจ๋ฆฌ์ ์ ์งํ์ฌ ์ฒ๋ฆฌ์๋๋ฅผ ์ฆ๊ฐ์ํค๋ ๋ฐฉ๋ฒ
ํ์ง๋ง ๋ฉ๋ชจ๋ฆฌ ์๋น๋ฅผ ์ฆ๊ฐ์ํค๋ฏ๋ก ๋ถํ์ํ๊ฒ ๋ชจ๋ ๊ฑธ ์บ์ฑํ ํ์๋ ์๋ค.
DataFrame์ Cachingํ๋ ๋ฐฉ๋ฒ
- cache()์ persist()๋ฅผ ์ฌ์ฉํ๋ฉด ๊ฐ๋ฅํ๊ณ , ๋ฉ๋ชจ๋ฆฌ๋ ๋์คํฌ์ ์ ์ฅํ๊ฒ ๋๋ค.
- ๋ชจ๋ lazy execution์ ํด๋นํ์ฌ ํ์ํ๊ธฐ ์ ๊น์ง๋ ์บ์ฑํ์ง ์๋๋ค.
- caching์ ํญ์ ํํฐ์ ๋จ์๋ก ๋ฉ๋ชจ๋ฆฌ์ ๋ณด์กด๋๋ฏ๋ก, ํ๋์ ํํฐ์ ์ด ๋ถ๋ถ์ ์ผ๋ก ์บ์ฑ๋์ง๋ ์๋๋ค.
์ค์ต
1. .cache() ์ฌ์ฉํด์ ๋ฐ์ดํฐํ๋ ์ ์บ์ฑํ๊ธฐ
# SparkSession available as 'spark'
# 1~99999์ ์ซ์๊ฐ ์ ์ฅ๋์ด ๊ฐ ์ ๊ณฑ์ ์ปฌ๋ผ(square)์ ๊ฐ์ง df์์ฑ >> df10_square
df = spark.range(1, 100000).toDF("id")
df10 = df.repartition(10)
from pyspark.sql.functions import expr
df10_square = df10.withColumn("square", expr("id*id"))
# caching
df10_square.cache() # ์ค์ ๋ก ๋ฐ๋ก ์บ์ฑ๋์ง๋ ์์
df10_square.take(10) # 10๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๊ธฐ, ์ฌ๊ธฐ์ ์ค์ ๋ก ํ์ํ ๋งํผ ์บ์ฑ๋จ!
df10_square.count() # ๋ชจ๋ ๋ฐ์ดํฐ๊ฐ ์ฐ์ฐ์ ํ์ํ๋ฏ๋ก ์ฌ๊ธฐ์ ๋ชจ๋ ๋ฐ์ดํฐ๊ฐ ์บ์ฑ๋จ!
df10_square.unpersist() # ์บ์ฑํด์ , ์ด๊ฑด ๋ฐ๋ก ์ ์ฉ๋จ
.take(10)์ ๊ฒฝ์ฐ 10๊ฐ์ ๋ฐ์ดํฐ๋ง ์บ์ฑ๋๋ฏ๋ก ๋ชจ๋ ํํฐ์ ์ด ์บ์ฑ๋์ง ์๊ณ ํ๋๋ง ์บ์ฑ๋์๊ณ ,
.count()์ ๊ฒฝ์ฐ ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ ์บ์ฑํ๋ฏ๋ก ๋ชจ๋ ํํฐ์ (10๊ฐ)๊ฐ ์บ์ฑ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
2. SparkSQL ์ฌ์ฉํด์ ์ฟผ๋ฆฌ๋ก ์บ์ฑํ๊ธฐ
# Spark SQL ์ฌ์ฉํด์ ์บ์ฑํด๋ณด๊ธฐ
df10_square.createOrReplaceTempView("df10_square")
spark.sql("CACHE TABLE df10_square") # SqarkSQL์ ๊ฒฝ์ฐ ๋ฐ๋ก ๋ชจ๋ ๋ฐ์ดํฐ๊ฐ ์บ์ฑ๋จ
# spark.sql("CACHE lazy TABLE df10_square") # lazy๋ฅผ ๋ถ์ด๋ฉด ํ์ํ ๋ ์บ์ฑํ๋๋ก ํ ์๋ ์์
spark.sql("UNCACHE TABLE df10_square") # ์บ์ฑ ํด์
spark.catalog.isCached("df10_square") # df10_square๊ฐ ์บ์ฑ๋ ์ํ์ธ์ง ํ์ธํ๋ ๋ช
๋ น์ด > False
๊ทธ๋ ๋ค๋ฉด ์บ์ฑ๋ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ๋ ์ธ์ ์ผ๊น?
Spark Optimizer๋ ์๋ฏธ์ ์ผ๋ก ๋์ผํ ์ฟผ๋ฆฌ์ ๋ํด ์๋ฒฝํ ๋ฉ๋ชจ๋ฆฌ์ ์๋ ๋ฐ์ดํฐ๋ก ๋์ํ์ง๋ ๋ชปํ๋ค.
# ํน์ ์กฐ๊ฑด์ผ๋ก ํํฐ๋ง๋ ๊ฒฐ๊ณผ๋ฅผ ์บ์ฑ
df10_squared_filtered = df10_square.select("id", "square").filter("id > 50000").cache()
df10_squared_filetered.count()
# ์์ ์์๋ง ๋ค๋ฅด๊ณ ์๋ฏธ์ ์ผ๋ก ๊ฐ์ ๋ช
๋ น
df10_square.filter("id > 50000").select("id","square").count()
# Physical Plan ํ์ธํ๋ ๋ช
๋ น์ด, Memory์์ ๊ฐ์ ธ์๋ค๋ ๋ด์ฉ์ ํ์ธํ ์ ์๋ค.
df10_square.filter("id > 50000").select("id","square").explain()
Memory์ ์๋ ํ ์ด๋ธ์ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ InMemoryTableScan์ด๋ผ๋ ๋ฌธ๊ตฌ๊ฐ ๋ณด์ธ๋ค.
๋ง์ฝ ์์ ํ ๋์ผํ ๋ช ๋ น (df10_square.select("id", "square").filter("id > 50000"))์ ์ฌ์ฉํ๊ฑฐ๋, ์บ์ฑ๋ ๋ณ์(df10_squared_filtered)๋ฅผ ์ฌ์ฉํด์ ์ด๋ค ๋ช ๋ น์ ํ๋ค๋ฉด, ์ด๋๋ ์บ์ฑ๋ ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ๊ฒ ๋๋ค.
3. Persist ์ฌ์ฉํด์ ์บ์ฑํ๊ธฐ
import pyspark
# .persist(pyspark.StorageLevel(์ธ์))๋ฅผ ํตํด persist์ ์ต์
์กฐ์
# cache์ ๋ง์ฐฌ๊ฐ์ง๋ก ๋ฐ๋ก ์บ์ฑ๋์ง๋ ์๊ณ , ์ฌ์ฉ๋ ๋ ์บ์ฑ
df_persisted = df10.withColumn("suqare", expr("id*id")).persist(
pyspark.StorageLevel(False, True, False, True, 1)
)
pyspark.StorageLevel์ ์ธ์์ ์๋ฏธ๋ ๋ค์์ ์์๋ก ์๋ฏธํ๋ค.
- useDisk : ๋์คํฌ ์ ์ฅ์ฌ๋ถ
- useMemory : ๋ฉ๋ชจ๋ฆฌ ์ ์ฅ์ฌ๋ถ
- useOffHeap : OffHeap ๋ฉ๋ชจ๋ฆฌ ์ ์ฅ์ฌ๋ถ (Off Heap ์ค์ ํ์)
- deserialized : ๋ฐ์ดํฐ Serialization ์ฌ๋ถ, True์ ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ์ฆ๊ฐ + CPU ๊ณ์ฐ ๊ฐ์ / True๋ ๋ฉ๋ชจ๋ฆฌ์์๋ง ๊ฐ๋ฅ
- replication : ๋ช ๊ฐ์ ๋ณต์ฌ๋ณธ์ ์๋ก ๋ค๋ฅธ executor์ ์ ์ฅํ ์ง ๊ฒฐ์
๐ Caching Best Practices
- ์บ์ฑ๋ ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ ๋๋ ์บ์ฑ๋ ๋ณ์๋ฅผ ์ฌ์ฉํจ์ผ๋ก์จ ๋ถ๋ช ํ๊ฒ ์ฌ์ฌ์ฉํ๊ธฐ
- ์ปฌ๋ผ์ด ๋ง์ ๋ฐ์ดํฐ์ ์ ํ์ํ ์ปฌ๋ผ๋ง ์บ์ฑํ๊ธฐ
- ๋ถํ์ํ ๋๋ uncacheํ๊ธฐ
- ๋๋ก๋ ๋งค๋ฒ ์๋ก ๊ณ์ฐํ๋ ๊ฒ์ด ์บ์ฑ๋ณด๋ค ๋น ๋ฅผ์๋ ์๋ค!
- > Parquet ํฌ๋งท์ ํฐ ๋ฐ์ดํฐ์ ์ ์บ์ฑํ ๊ฒฝ์ฐ
- > ์บ์ฑ๊ฒฐ๊ณผ๊ฐ ๋๋ฌด ์ปค์ ๋ฉ๋ชจ๋ฆฌ์๋ง ์์ ์ ์๊ณ , ๋์คํฌ์๋ ์ ์ฅ๋ ๊ฒฝ์ฐ ๋ฑ๋ฑ..
'TIL' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Spark Partitioning๊ณผ AQE ์์๋ณด๊ณ ์ค์ตํ๊ธฐ - TIL230725(2) (0) | 2023.07.25 |
---|---|
Spark ๊ธฐ๋ฅ๊ณผ ์ค์ผ์ค๋ง, ๋ฉ๋ชจ๋ฆฌ ๊ตฌ์ฑ ์์๋ณด๊ธฐ - TIL230724 (0) | 2023.07.25 |
๋จธ์ ๋ฌ๋์ ์ํ ํ๋ฅ ๊ธฐ์ด - TIL230719 (0) | 2023.07.24 |
๋จธ์ ๋ฌ๋์ ์ํ ๊ธฐ์ด ์ ํ ๋์ ์์๋ณด๊ธฐ - TIL230718 (0) | 2023.07.20 |
jupyter์์ ๋จธ์ ๋ฌ๋ End to End ์ค์ตํด๋ณด๊ธฐ - TIL230717 (0) | 2023.07.20 |