Spark์—์„œ Parquet ๋‹ค๋ฃจ๊ธฐ ์‹ค์Šต + Execution Plan ์•Œ์•„๋ณด๊ธฐ - TIL230706

sunhokimDev 2023. 7. 9. 23:19

๐Ÿ“š KDT WEEK 14 DAY 4 TIL

  • Spark ํŒŒ์ผํฌ๋งท(Parquet)
  • Spark Execution Plan
  • Bucketing๊ณผ File System Partitioning

 


๐ŸŸฅ Spark ํŒŒ์ผํฌ๋งท

 

Parquet : Spark์˜ ๊ธฐ๋ณธ ํŒŒ์ผ ํฌ๋งท

 

Parquet ํŽ˜์ด์ง€ : http://parquet.incubator.apache.org/

 


Parquet์€ Structured ํฌ๋งท์œผ๋กœ, ์••์ถ•๋œ ๋ฐ”์ด๋„ˆ๋ฆฌ ํŒŒ์ผ์ด๋ฉฐ ๋‚ด๋ถ€์— ์Šคํ‚ค๋งˆ ์ •๋ณด(ํ•„๋“œ ํƒ€์ž…๊นŒ์ง€)๋ฅผ ๊ฐ€์ง„ ํŒŒ์ผ ํฌ๋งท์ด๋‹ค.

Structured format examples: Parquet, Avro, ORC, SequenceFile, ...

 

๐Ÿ’ป ์‹ค์Šต(Colab)

 

์‚ฌ์šฉํ•  ํ™˜๊ฒฝ ์„ค์น˜

!pip install pyspark==3.3.1 py4j==0.10.9.5

 

Spark Session ์ƒ์„ฑ - avro ํฌ๋งท์„ ์‚ฌ์šฉํ•˜๋ ค๋ฉด ๋”ฐ๋กœ ์ง€์ •ํ•„์š”

from pyspark.sql import *
from pyspark.sql.functions import *

# avro ํฌ๋งท์€ Spark์— ๋ณ„๋„๋กœ ์ง€์ •์ด ํ•„์š”ํ•˜๋‹ค.
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Spark Writing Demo") \
        .master("local[3]") \
        .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.1") \
        .getOrCreate()

 

๋ฐ์ดํ„ฐ๋ฅผ ์œ„ํ•œ csv ํŒŒ์ผ ๋กœ๋”ฉ

# ๊ฐ€์ง€๊ณ  ์žˆ๋˜ appl_stock.csv ํŒŒ์ผ ๋กœ๋”ฉ
df = spark.read.format("csv").load("appl_stock.csv")

 

 

csvํŒŒ์ผ์ด ๋กœ๋”ฉ๋œ df์˜ ํŒŒํ‹ฐ์…˜ ๊ฐœ์ˆ˜, ๋ ˆ์ฝ”๋“œ ๊ฐœ์ˆ˜๋ฅผ ์•Œ์•„๋ณด์ž.

 

print("Num Partitions before: " + str(df.rdd.getNumPartitions())) # ๋ช‡ ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์ด ์žˆ๋Š”์ง€?
df.groupBy(spark_partition_id()).count().show() # ํŒŒํ‹ฐ์…˜๋ณ„๋กœ ๋ช‡ ๊ฐœ์˜ ๋ ˆ์ฝ”๋“œ๊ฐ€ ์žˆ๋Š”์ง€?

 

  • .rdd.getNumPartitions(): returns the DataFrame's partition count
  • spark_partition_id(): returns the ID of the partition each record belongs to

 

df๋ฅผ ์‚ฌ์šฉํ•ด์„œ df2์— ๋„ค ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜, df3์— ๋‘ ๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์„ ๊ฐ€์ง€๋„๋ก ์„ค์ •

df2 = df.repartition(4) # repartition into four partitions
df3 = df2.coalesce(2) # reduce the partition count (in a way that minimizes shuffling)

 

ํŒŒํ‹ฐ์…˜์„ ๋‚˜๋ˆด์„ ๋•Œ์˜ df, df2, df3์˜ ํŒŒํ‹ฐ์…˜ ์ •๋ณด

 

๊ทธ๋ฆฌ๊ณ  df, df2, df3๋ฅผ ๊ฐ๊ฐ avro, parquet, json์œผ๋กœ ์ €์žฅํ•ด๋ณด์ž.

 

# df : ํŒŒํ‹ฐ์…˜์ด ํ•˜๋‚˜์˜€๋˜ ๊ฒƒ
# avro๋กœ ์„ธ์ด๋ธŒ
# ์˜ต์…˜์œผ๋กœ path ์ง€์ •๊ฐ€๋Šฅ
df.write \
    .format("avro") \
    .mode("overwrite") \
    .option("path", "dataOutput/avro/") \
    .save()
    
# df2 : ํŒŒํ‹ฐ์…˜์ด ๋„ค ๊ฐœ์˜€๋˜ ๊ฒƒ
# parquet ๋กœ ์„ธ์ด๋ธŒ
df2.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("path", "dataOutput/parquet/") \
    .save()
    
# df3 : ํŒŒํ‹ฐ์…˜์ด ๋‘ ๊ฐœ์˜€๋˜ ๊ฒƒ
# json ๋กœ ์„ธ์ด๋ธŒ
df3.write \
    .format("json") \
    .mode("overwrite") \
    .option("path", "dataOutput/json/") \
    .save()

 

๊ทธ๋ฆฌ๊ณ  ls ๋ช…๋ น์–ด ์‚ฌ์šฉํ•ด์„œ ํ™•์ธํ•ด๋ณด๋ฉด, ๊ฐ ํฌ๋งท๋ณ„๋กœ ํด๋”๊ฐ€ ์ €์žฅ๋˜๊ณ  ๊ทธ ์•ˆ์— ๋ฐ์ดํ„ฐ๊ฐ€ ์ €์žฅ๋œ๋‹ค.

 

 

๊ฐ ๋‚ด์šฉ์„ ํ™•์ธํ•˜๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

 

parquet ํฌ๋งท๋งŒ์˜ ํŠน์ง•์ด ๋ณด์ธ๋‹ค.

 

parquet์˜ ๊ฒฝ์šฐ snappy๊ฐ€ ๋’ค์— ๋ถ™์–ด์„œ ์••์ถ•๋œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

์ฃผ์„์—๋„ ์“ฐ์—ฌ์žˆ๋“ฏ snappy๋Š” spiltableํ•œ ์••์ถ• ํ˜•ํƒœ๋กœ, ์••์ถ•์ด ๋˜์—ˆ์–ด๋„ ๋ธ”๋ก๋‹จ์œ„๋กœ ๋‚˜๋ˆŒ ์ˆ˜ ์žˆ๋Š” ์œ ์šฉํ•œ ์••์ถ• ๋‹จ์œ„์ด๋‹ค.

 

์ด๋ฒˆ์—๋Š” Parquet ํŒŒ์ผํฌ๋งท์˜ Schema Evolution ๊ธฐ๋Šฅ์„ ์‚ฌ์šฉํ•ด๋ณด์ž.

 

๊ฐ๊ธฐ๋‹ค๋ฅธ ์Šคํ‚ค๋งˆ๋ฅผ ๊ฐ€์ง„ ์„ธ ๊ฐœ์˜ Parquet ํŒŒ์ผ์„ ๊ฐ€์ ธ์˜จ๋‹ค๊ณ  ๊ฐ€์ •ํ•˜์ž.

๊ฐ parquet ํŒŒ์ผ์ด ์ €์žฅ๋˜์–ด์žˆ๊ณ , ๋ถˆ๋Ÿฌ์™€์„œ ๋ฐ์ดํ„ฐ๋ฅผ show() ํ•˜์˜€๋‹ค.

 

schema1.parquet

 

schema2.parquet

 

schema3.parquet

 

์ด๋ฏธ์ง€์—์„œ ๋ณด์ด๋“ฏ ๊ฐ parquet ํŒŒ์ผ์€ ํ•˜๋‚˜์˜ ์ปฌ๋Ÿผ์ด ์ถ”๊ฐ€๋˜๋Š” ํ˜•์‹์œผ๋กœ ์Šคํ‚ค๋งˆ๊ฐ€ ๋‹ค๋ฅด๋‹ค.

Parquet์€ Schema Evolution์„ ์ง€์›ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์ด๋ฅผ ๋งˆ์น˜ ํ•˜๋‚˜์˜ ์Šคํ‚ค๋งˆ์ฒ˜๋Ÿผ ํ•œ๋ฒˆ์— ๋ถˆ๋Ÿฌ์˜ฌ ์ˆ˜๊ฐ€ ์žˆ๋‹ค.

 

Schema Evolution

 

.option("mergeSchema", True) ์˜ต์…˜์„ ์‚ฌ์šฉํ•ด์„œ Schema Evolution ๊ธฐ๋Šฅ์„ ์‚ฌ์šฉํ•˜์˜€๋‹ค.

schema1์ด ์›๋ž˜ ๊ฐ€์ง€๊ณ  ์žˆ๋˜ ๋ฐ์ดํ„ฐ์ฒ˜๋Ÿผ ์ถ”๊ฐ€๋œ ์ปฌ๋Ÿผ์— ๋ฐ์ดํ„ฐ๊ฐ€ ์—†์œผ๋ฉด NULL๋กœ ์ฑ„์›Œ์ง„๋‹ค.

 

๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๋ฉด ์ด๋ ‡๋‹ค.

 


 

 

 

๐ŸŸฆ Spark์˜ Execution Plan

 

๐Ÿ“… Execution Plan : ๊ฐœ๋ฐœ์ž๊ฐ€ ์ž‘์„ฑํ•œ ์ฝ”๋“œ๋ฅผ Spark์ด ์–ด๋–ป๊ฒŒ ์‹ค์ œ๋กœ ์‹คํ–‰์„ ํ•˜๋Š”๊ฐ€?

 

Spark์—์„œ ๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„์„ where, groupby, select ๋“ฑ์œผ๋กœ ์ฒ˜๋ฆฌํ•  ๋•Œ, ๋ช…๋ น์–ด๋งˆ๋‹ค ๋ฐ”๋กœ๋ฐ”๋กœ ์ฒ˜๋ฆฌํ•˜์ง€ ์•Š๋Š”๋‹ค.

ํ•ด๋‹น ์ž‘์—…๋“ค์„ ๋ชจ์•„๋‘์—ˆ๋‹ค๊ฐ€, show()๋‚˜ collect() ๋“ฑ์„ ํ†ตํ•ด ๋ฐ์ดํ„ฐ๋ฅผ ์ง์ ‘ ๋ณด์—ฌ์ค˜์•ผํ•  ๋•Œ ํ•œ๊บผ๋ฒˆ์— ์ฒ˜๋ฆฌํ•˜๊ฒŒ ๋œ๋‹ค!

 

์ด๋•Œ ๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„์— ์ง์ ‘ ์ž‘์—…์„ ํ•˜๋Š” ๋ช…๋ น์–ด๋“ค์„ Transformations๋ผ๊ณ  ํ•˜๊ณ ,

show()๋‚˜ collect()์ฒ˜๋Ÿผ ๋ณด์—ฌ์ฃผ๊ฑฐ๋‚˜ Read, Write์ฒ˜๋Ÿผ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๊ฐ€ ์™„๋ฃŒ๋˜์–ด์•ผ ์ž‘์—…์ด ๊ฐ€๋Šฅํ•œ ๋ช…๋ น์–ด๋ฅผ Actions๋ผ๊ณ  ํ•œ๋‹ค.

 

1. Transformations

  • Narrow Dependencies: independent partition-level work / no shuffling
    • select, filter, map, etc.
  • Wide Dependencies: work that requires shuffling
    • groupBy, reduceByKey, partitionBy, repartition, etc. (coalesce is grouped here too, though it tries to minimize shuffling)

2. Actions

  • Read, Write, Show, Collect: commands that launch a Job and make the code actually run
  • Lazy Execution: one Job is created, and only then is the code actually executed
    • collecting as many operations as possible and optimizing them in one pass is more efficient! (a sketch follows below)

 

๐Ÿงฑ ๊ธฐ๋ณธ ๊ตฌ์กฐ : Action > Job > 1 + Stages > 1 + Tasks

 

  • Action : Job์„ ํ•˜๋‚˜ ๋งŒ๋“ค์–ด๋‚ด๊ณ , ์ฝ”๋“œ๊ฐ€ ์‹ค์ œ๋กœ ์‹คํ–‰๋จ
  • Job : ํ•˜๋‚˜ ํ˜น์€ ๊ทธ ์ด์ƒ์˜ Stage๋กœ ๊ตฌ์„ฑ๋œ๋‹ค.
    • Stage๋Š” ์…”ํ”Œ๋ง์ด ๋ฐœ์ƒํ•˜๋Š” ๊ฒฝ์šฐ ์ƒˆ๋กœ ์ƒ๊ธด๋‹ค. ex) groupby
  • Stage : DAG ํ˜•ํƒœ๋กœ ๊ตฌ์„ฑ๋œ Task๋“ค์ด ์กด์žฌํ•˜์—ฌ ๋ณ‘๋ ฌ ์‹คํ–‰๋œ๋‹ค.
  • Task : ๊ฐ€์žฅ ์ž‘์€ ์‹คํ–‰ ์œ ๋‹›์œผ๋กœ, Executor์— ์˜ํ•ด ์‹คํ–‰๋œ๋‹ค.

 

๐Ÿ’ป ์‹ค์Šต(Colab)

 

1. WordCount.py

 

from pyspark.sql import *
from pyspark.sql.functions import *

# spark.sql.adaptive.enabled: False disables adaptive optimizations that make the plan hard to follow
spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("SparkSchemaDemo") \
    .config("spark.sql.adaptive.enabled", False) \
    .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", 3)

# ์—ฌ๊ธฐ์˜ read์˜ ๊ฒฝ์šฐ ํ—ค๋”๋ฅผ ํ™•์ธํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์— JOB์„ ์‹คํ–‰ํ•˜์ง€๋Š” ์•Š์Œ
# ํ—ค๋”๋ฅผ ํ™•์ธํ•ด์•ผ ํ•  ๋•Œ๋งŒ ๋ฐ์ดํ„ฐ๋ฅผ ์ง์ ‘ ํ™•์ธํ•˜๋ฏ€๋กœ JOB์„ ์‹คํ–‰
df = spark.read.text("shakespeare.txt")

# value(๊ธฐ๋ณธ ์ปฌ๋Ÿผ๋ช…)์„ ๊ณต๋ฐฑ์„ ๊ธฐ์ค€์œผ๋กœ ๋‚˜๋ˆ„์–ด word ์ปฌ๋Ÿผ์— ๊ฐ๊ฐ ๋„ฃ๊ณ , ๊ทธ ๊ฐœ์ˆ˜๋ฅผ ์ƒˆ๋Š” SQL
df_count = df.select(explode(split(df.value, " ")).alias("word")).groupBy("word").count()

df_count.show()

input("stopping ...")

 

WordCount ์ฝ”๋“œ์˜ Execution Plan

 

Groupby ๋•Œ๋ฌธ์— ์ƒˆ๋กœ์šด Stage๊ฐ€ ์ƒ์„ฑ๋œ ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

 

 

2. JOIN.py

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession \
    .builder \
    .appName("Shuffle Join Demo") \
    .master("local[3]") \
    .config("spark.sql.shuffle.partitions", 3) \
    .config("spark.sql.adaptive.enabled", False) \
    .getOrCreate()

# json ํŒŒ์ผ์˜ read๋Š” ์Šคํ‚ค๋งˆ๋ฅผ ์•Œ๊ธฐ ์œ„ํ•ด ์ฝ์–ด์•ผํ•˜๋ฏ€๋กœ read๋Š” ACTION์ด ๋œ๋‹ค.
df_large = spark.read.json("large_data/")
df_small = spark.read.json("small_data/")

join_expr = df_large.id == df_small.id
join_df = df_large.join(df_small, join_expr, "inner")

join_df.collect()
input("Waiting ...")

spark.stop()

 

JOIN ์ฝ”๋“œ์˜ Execution Plan

 

๊ฐ๊ฐ json์„ ํŒŒ์ผ์„ ๋ถˆ๋Ÿฌ์˜ค๋Š” job์—์„œ Stage๋ฅผ ํ•˜๋‚˜์”ฉ ๊ฐ–๊ฒŒ ๋˜๊ณ , JOIN์„ ํ†ตํ•ด Stage๋ฅผ ํ•˜๋‚˜ ๋” ์ƒ์„ฑํ•˜๊ฒŒ ๋œ๋‹ค.

์œ„์˜ ์ด๋ฏธ์ง€๊ฐ€ ๊ธฐ๋ณธ์ ์ธ JOIN์˜ Execution Plan ํ˜•ํƒœ๊ฐ€ ๋œ๋‹ค.

 


 

๐ŸŸฉ Bucketing๊ณผ File System Partitioning

๋ฐ์ดํ„ฐ ์ €์žฅ์„ ์ดํ›„ ๋ฐ˜๋ณต์ฒ˜๋ฆฌ์— ์ตœ์ ํ™”๋œ ๋ฐฉ๋ฒ•์œผ๋กœ ํ•˜๋Š” ๊ฒƒ์ด ๋ชฉ์ ์œผ๋กœ, Hive ๋ฉ”ํƒ€์Šคํ† ์–ด ์‚ฌ์šฉ์ด ํ•„์š”ํ•˜๋‹ค.

 

1. Bucketing

  • Split a DataFrame on a specific ID and save it as a table
  • Costs one shuffle up front, but acts as insurance against many shuffles later
  • Usable when you know your data's characteristics well

Check whether some column is heavily used in aggregations, window functions, and JOINs > save the table bucketed on that column, as sketched below.
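
A minimal bucketing sketch (the bucket count, column, and table name are made up for illustration; bucketing requires saveAsTable so the metadata persists in the metastore):

# Bucket the DataFrame on its join key and persist it as a table.
# Later joins and aggregations on "id" can then skip the shuffle.
df_large.write \
    .mode("overwrite") \
    .bucketBy(16, "id") \
    .sortBy("id") \
    .saveAsTable("bucketed_large")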

 

2. File System Partitioning = Hive-style Partitioning

  • Build a folder structure keyed on specific columns to optimize storage > cuts loading and filtering overhead
  • Partition Key: the column(s) the folder structure is built from
  • Use DataFrameWriter's partitionBy
  • A Partition Key with low cardinality works best

 

ex) ๋กœ๊ทธ ํŒŒ์ผ์€ ๋ฐ์ดํ„ฐ ์ƒ์„ฑ์‹œ๊ฐ„ ๊ธฐ๋ฐ˜์œผ๋กœ ๋ฐ์ดํ„ฐ ์ฝ๊ธฐ๋ฅผ ๋งŽ์ด ํ•˜๊ฒŒ ๋จ.

์ด ๋กœ๊ทธ ํŒŒ์ผ์ด ๊ต‰์žฅํžˆ ํฌ๋‹ค๋ฉด ๋ฐ์ดํ„ฐ ์ž์ฒด๋ฅผ ์—ฐ๋„-์›”-์ผ์˜ ํด๋” ๊ตฌ์กฐ๋กœ ์ €์žฅํ•˜๋Š” ๊ฒƒ! > ์ฝ๊ธฐ ๊ณผ์ • ์ตœ์†Œํ™”, ๊ด€๋ฆฌ ์‰ฌ์›€