JUST DO IT!

Spark ๋ฐ์ดํ„ฐ์ฒ˜๋ฆฌ ์‹ค์Šต - TIL230704 ๋ณธ๋ฌธ

TIL

Spark ๋ฐ์ดํ„ฐ์ฒ˜๋ฆฌ ์‹ค์Šต - TIL230704

sunhokimDev 2023. 7. 5. 23:39

๐Ÿ“š KDT WEEK 14 DAY 2 TIL

  • Spark ๋ฐ์ดํ„ฐ์ฒ˜๋ฆฌ
  • Spark ๋ฐ์ดํ„ฐ๊ตฌ์กฐ

๐ŸŸฅ Spark ๋ฐ์ดํ„ฐ์ฒ˜๋ฆฌ

  • ๋น…๋ฐ์ดํ„ฐ์˜ ํšจ์œจ์  ์ฒ˜๋ฆฌ โžก๏ธ ๋ณ‘๋ ฌ์ฒ˜๋ฆฌ โžก๏ธ ๋ฐ์ดํ„ฐ์˜ ๋ถ„์‚ฐ ํ•„์š”
  • ํ•˜๋‘ก ๋งต์˜ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋‹จ์œ„๋Š” ๋ฐ์ดํ„ฐ ๋ธ”๋ก(128MB, ์กฐ์ ˆ๊ฐ€๋Šฅ)
  • Spark์—์„œ๋Š” ์ด ๋ฐ์ดํ„ฐ ๋ธ”๋ก์„ ํŒŒํ‹ฐ์…˜(Partition)์ด๋ผ๊ณ  ๋ถ€๋ฅธ๋‹ค.

 

์ ์ ˆํ•œ ํŒŒํ‹ฐ์…˜์˜ ์ˆ˜ : Executor์˜ ์ˆ˜ x Executor์˜ CPU์˜ ์ˆ˜ โžก๏ธ ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ ์ตœ๋Œ€ํ™”

 

 

โ™’ Spark ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ํ๋ฆ„

๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์€ ์ž‘์€ ํŒŒํ‹ฐ์…˜๋“ค๋กœ ๊ตฌ์„ฑ๋œ๋‹ค.

์ž…๋ ฅ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์›ํ•˜๋Š” ๊ฒฐ๊ณผ๊ฐ€ ๋‚˜์˜ฌ ๋•Œ๊นŒ์ง€ ๋‹ค๋ฅธ ๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„์œผ๋กœ ๊ณ„์† ๋ณ€ํ™˜๋˜๋Š” ๊ณผ์ •์œผ๋กœ ํ๋ฅธ๋‹ค.

ex) sort, group by, filter, map, join.. ๋“ฑ์˜ ํ•จ์ˆ˜๋กœ ์ธํ•ด ๊ณ„์† ๋ณ€ํ™˜๋˜๋Š” ๊ฒƒ!

 

Data Frame and Partition

 

๐Ÿ”„๏ธ ์…”ํ”Œ๋ง

์œ„์—์„œ group by, sort ๊ฐ™์€ ์˜คํผ๋ ˆ์ด์…˜๋“ค์€ ์ƒˆ๋กœ์šด ํŒŒํ‹ฐ์…˜์ด ๋งŒ๋“ค์–ด์ง์—๋”ฐ๋ผ ํŒŒํ‹ฐ์…˜๊ฐ„์˜ ๋ฐ์ดํ„ฐ ์ด๋™ ํ•„์š”ํ•˜๋‹ค.

์ƒˆ๋กœ์šด ํŒŒํ‹ฐ์…˜์— ๋ฐ์ดํ„ฐ๋ฅผ ์ด๋™์‹œํ‚ค๋Š” ๊ณผ์ •์— ๋”ฐ๋ผ Data Skewness๊ฐ€ ๋ฐœ์ƒํ•˜๊ธฐ ์‰ฝ๋‹ค.

๋”ฐ๋ผ์„œ ์…”ํ”Œ๋ง์„ ์ตœ์†Œํ™”ํ•˜๊ณ  ํŒŒํ‹ฐ์…˜์„  ์ตœ์ ํ™”ํ•˜๋Š” ๊ฒƒ์ด ์ค‘์š”ํ•˜๋‹ค.

 

Data Skew

 


 

๐ŸŸฆ Spark ๋ฐ์ดํ„ฐ ๊ตฌ์กฐ

Spark ๋ฐ์ดํ„ฐ๋Š” Immutable(๋ถˆ๋ณ€ํ•˜๊ณ ) Distributed(๋ถ„์‚ฐ๋œ) ๋ฐ์ดํ„ฐ์ด๋‹ค.

 

๋จผ์ € ๋ฐ์ดํ„ฐ ์ข…๋ฅ˜๋กœ๋Š” RDD, DataFrame, Dataset์ด ์žˆ๋‹ค.

RDD(Resilient Distributed Dataset)

  • ๋กœ์šฐ๋ ˆ๋ฒจ ๋ฐ์ดํ„ฐ๋กœ, ํด๋Ÿฌ์Šคํ„ฐ๋‚ด์˜ ์„œ๋ฒ„์— ๋ถ„์‚ฐ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์ง€์นญ
  • ๋ ˆ์ฝ”๋“œ๋ณ„๋กœ ์กด์žฌํ•˜์ง€๋งŒ ์Šคํ‚ค๋งˆ๊ฐ€ ๋”ฐ๋กœ ์กด์žฌํ•˜์ง€ ์•Š์•„ ๊ตฌ์กฐํ™”๋œ ๋ฐ์ดํ„ฐ๋‚˜ ๋น„๊ตฌ์กฐํ™”๋œ ๋ฐ์ดํ„ฐ ๋ชจ๋‘ ์ง€์›
  • RDD๋Š” ๋‹ค์ˆ˜์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๊ตฌ์„ฑ๋˜๋ฉฐ, ๋กœ์šฐ๋ ˆ๋ฒจ์˜ ํ•จ์ˆ˜ํ˜• ๋ณ€ํ™˜(map, filter, flatMap ๋“ฑ)์„ ์ง€์›ํ•œ๋‹ค.
  • ์ผ๋ฐ˜ ํŒŒ์ด์ฌ ๋ฐ์ดํ„ฐ(๋ฆฌ์ŠคํŠธ)๋ฅผ parallelize์™€ collect ํ•จ์ˆ˜๋ฅผ ํ†ตํ•ด RDD๋กœ ๋ณ€ํ™˜ํ•˜๊ฑฐ๋‚˜ ๋‹ค์‹œ ๋˜๋Œ๋ฆด์ˆ˜ ์žˆ๋‹ค.

 

DataFrame๊ณผ Dataset

  • RDD์œ„์— ์กด์žฌํ•˜๋Š” ํ•˜์ด๋ ˆ๋ฒจ ๋ฐ์ดํ„ฐ๋กœ, RDD์™€๋Š” ๋‹ฌ๋ฆฌ ํ•„๋“œ ์ •๋ณด๋ฅผ ๊ฐ–๊ณ  ์žˆ์Œ(ํ…Œ์ด๋ธ”)
  • Dataset์€ ํƒ€์ž… ์ •๋ณด๊ฐ€ ์กด์žฌํ•˜์—ฌ ์ปดํŒŒ์ผ ์–ธ์–ด(Scala/Java) ์‚ฌ์šฉ๊ฐ€๋Šฅ
  • PySpark์—์„œ๋Š” DataFrame(ํŒ๋‹ค์Šค ๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„๊ณผ ํก์‚ฌ) ์‚ฌ์šฉ

 

๐Ÿ—๏ธ Spark ํ”„๋กœ๊ทธ๋žจ ๊ตฌ์กฐ

 

SparkSession

  • Spark ํ”„๋กœ๊ทธ๋žจ์˜ ์‹œ์ž‘ โžก๏ธ SparkSession์„ ๋งŒ๋“œ๋Š” ๊ฒƒ(Singleton)
  • Dataframe, SQL, Streaming, ML API ๋ชจ๋‘ ์ด ๊ฐ์ฒด๋กœ ํ†ต์‹ 
  • RDD ๊ด€๋ จ๋œ ์ž‘์—…์—๋Š” sparkContext ๊ฐ์ฒด ์‚ฌ์šฉ

 

Spark Session

 

Spark Session ํ™˜๊ฒฝ ๋ณ€์ˆ˜

  • spark.executor.memory : executor๋ณ„ ๋ฉ”๋ชจ๋ฆฌ
  • spark.executor.cores : executor๋ณ„ CPU ์ˆ˜
  • spark.driver.memory : driver ๋ฉ”๋ชจ๋ฆฌ
  • spark.sql.shuffle.partitions : Shuffleํ›„ Partiton์˜ ์ˆ˜(์ตœ๋Œ€๊ฐ’)

๋“ฑ ์—ฌ๋Ÿฌ๊ฐ€์ง€๋ฅผ ์กฐ์œจํ•  ์ˆ˜ ์žˆ๊ณ , ์ด๋Š” SparkSession์„ ์ƒ์„ฑํ•˜๋ฉด์„œ ์„ค์ •ํ•ด์ค„ ์ˆ˜ ์žˆ๋‹ค.

 

ํ™˜๊ฒฝ ๋ณ€์ˆ˜ ์„ค์ • ๋ฐฉ๋ฒ•

from pyspark.sql import SparkSession

# SparkSession์€ ์‹ฑ๊ธ€ํ„ด
spark = SparkSession.builder\
 .master("local[*]")\
 .appName('PySpark Tutorial')\
 .config("spark.some.config.option1", "some-value") \ # ํ™˜๊ฒฝ ์„ค์ •์ด๋ฆ„, ์„ค์ • ๊ฐ’ ํ˜•ํƒœ๋กœ ์„ค์ •
 .config("spark.some.config.option2", "some-value") \
 .getOrCreate()

 

๋˜๋Š”

 

from pyspark.sql import SparkSession
from pyspark import SparkConf

# ํ™˜๊ฒฝ ๋ณ€์ˆ˜
conf = SparkConf()
conf.set("spark.app.name", "PySpark Tutorial")
conf.set("spark.master", "local[*]")

# SparkSession
spark = SparkSession.builder\
 .config(conf=conf) \ # ์—ฌ๊ธฐ์„œ ํ™˜๊ฒฝ ๋ณ€์ˆ˜ ์ง€์ •
 .getOrCreate()

 

Spark Session์ด ์ง€์›ํ•˜๋Š” ๋ฐ์ดํ„ฐ ์†Œ์Šค

 

  • HDFS ํŒŒ์ผ(csv, json, orc, text, parquet...)
  • HIVE ํ…Œ์ด๋ธ”
  • JDBC ๊ด€๊ณ„ํ˜• DB
  • ํด๋ผ์šฐ๋“œ ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ ์‹œ์Šคํ…œ
  • ์ŠคํŠธ๋ฆฌ๋ฐ ์‹œ์Šคํ…œ
  • ๊ทธ ์™ธ ์†Œ์Šค : https://spark.apache.org/docs/latest/sql-data-sources.html
 

Data Sources - Spark 3.4.1 Documentation

 

spark.apache.org

 

  • spark.read(DataFrameReader)๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์œผ๋กœ ๋กœ๋“œ
  • spark.write(DataFrameWriter)๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์ €์žฅ

 


 

๐ŸŸฉ Spark ์‹ค์Šต

 

์‹ค์Šต ํ™˜๊ฒฝ์€ Google Colab์—์„œ Spark Cluster Manager๋กœ local[n]์„ ์ง€์ •ํ•˜์—ฌ Local Standalone Spark์„ ์‚ฌ์šฉํ•œ๋‹ค.

์ด ํ™˜๊ฒฝ์€ ์ฃผ๋กœ ๊ฐœ๋ฐœ์ด๋‚˜ ๊ฐ„๋‹จํ•œ ํ…Œ์ŠคํŠธ ์šฉ๋„๋กœ ์‚ฌ์šฉ๋˜๋Š” ํ™˜๊ฒฝ์ด๋ฉฐ, JVM์—์„œ ๋ชจ๋“  ํ”„๋กœ์„ธ์Šค๋ฅผ ์‹คํ–‰ํ•˜๊ฒŒ ๋œ๋‹ค.

Spark Web UI๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ์ ‘๊ทผ ๋ถˆ๊ฐ€ํ•˜๊ณ , Py4J๋ฅผ ์ถ”๊ฐ€๋กœ ์„ค์น˜ํ•˜์—ฌ ํŒŒ์ด์ฌ์—์„œ JVM์˜ ์ž๋ฐ” ๊ฐ์ฒด๋ฅผ ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•œ๋‹ค.

 

Colab์—์„œ ์•„๋ž˜์˜ ๋ช…๋ น์–ด๋ฅผ ์ž…๋ ฅํ•˜์—ฌ Pyspark์™€ py4j๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

!pip install pyspark==3.3.1 py4j==0.10.9.5

 

๋จผ์ €, Spark๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด SparkSession์„ ์ƒˆ๋กœ ํ•˜๋‚˜ ์ƒ์„ฑํ•œ๋‹ค.

SparkSession์„ ์ƒ์„ฑํ•˜๋ฉด์„œ local[*]์„ ์ง€์ •ํ•˜๋ฉด ๋œ๋‹ค.

 

from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\ # LocalMode, ์ปดํ“จํ„ฐ์˜ CPU์˜ ์ˆ˜๋งŒํผ ์“ฐ๋ ˆ๋“œ๋ฅผ Executor์— ์ƒ์„ฑ
        .appName('PySpark Tutorial')\
        .getOrCreate() # SparkSession์€ Singleton ๊ฐ์ฒด๋กœ, ์—ฌ๋Ÿฌ๊ฐœ๋ฅผ ๋งŒ๋“ค์ง€ ์•Š๋Š”๋‹ค.

 

Colab์—์„œ์˜ ๊ฒฐ๊ณผ ํ™”๋ฉด

 

1. Python ๊ฐ์ฒด RDD๋กœ ๋ณ€ํ™˜ํ•˜๊ธฐ

 

๋ฐ์ดํ„ฐ๋กœ๋Š” ๋ฆฌ์ŠคํŠธ์•ˆ์— Stringํ˜•ํƒœ๋กœ json์„ ์ €์žฅํ•˜์˜€๋‹ค.

์ด ๋ฐ์ดํ„ฐ๋ฅผ parallelize ํ•จ์ˆ˜๋กœ RDD ๋ณ€ํ™˜, collect ํ•จ์ˆ˜๋กœ ๋‹ค์‹œ ๋˜๋Œ๋ฆด ์ˆ˜ ์žˆ๋‹ค.

 

์•„๋ž˜ ์ฝ”๋“œ์—์„œ๋Š” parallelize > map(json ํŒŒ์‹ฑ) > collect ๊ณผ์ •์„ ํ†ตํ•ด ๋ฐ์ดํ„ฐ๊ฐ€ ๊ฐ€๊ณต๋˜๋Š” ๊ณผ์ •์„ ๊ฑฐ์น˜๊ฒŒ๋œ๋‹ค.

name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]
rdd = spark.sparkContext.parallelize(name_list_json) # parallelize ํ•จ์ˆ˜๋กœ RDD ๋ณ€ํ™˜
rdd.count() # rdd์— ์ €์žฅ๋œ ๋ฐ์ดํ„ฐ ๊ฐœ์ˆ˜ ์ถœ๋ ฅ

import json
parsed_rdd = rdd.map(lambda el:json.loads(el)) # json์œผ๋กœ ํŒŒ์‹ฑ
parsed_rdd.collect() # collect ํ•จ์ˆ˜๋กœ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ์Œ, json์œผ๋กœ ํŒŒ์‹ฑ๋œ ํ˜•ํƒœ๋กœ ๊ฐ€์ ธ์˜จ๋‹ค.

parsed_name_rdd = rdd.map(lambda el:json.loads(el)["name"]) # ์ด๋Ÿฐ ํ˜•ํƒœ๋„ ๊ฐ€๋Šฅ
parsed_name_rdd.collect()

 

์‹คํ–‰ ๊ฒฐ๊ณผ

 

parsed_rdd๋งŒ์œผ๋กœ๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ์ถœ๋ ฅ๋˜์ง€ ์•Š๋Š” ๊ฑธ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

.collect() ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•ด์„œ RDD์—์„œ ๋‹ค์‹œ ์ด์ชฝ์œผ๋กœ ๊ฐ€์ ธ์˜ค๋Š” ๊ณผ์ •์ด ํ•„์š”ํ•œ ๊ฒƒ์ด๋‹ค.

 

 

2. ํŒŒ์ด์ฌ ๋ฆฌ์ŠคํŠธ๋ฅผ ๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„์œผ๋กœ ๋ณ€ํ™˜

 

.createDataFrame() ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด, ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์œผ๋กœ ๋ณ€ํ™˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

from pyspark.sql.types import StringType 

# ๋‘ ๋ฒˆ์งธ ์ธ์ž๋กœ ์Šคํ‚ค๋งˆ ์ง€์ •, ์ผ๋‹จ StringType์œผ๋กœ ์ง€์ •ํ•œ๋‹ค.
# ์ด ๊ฒฝ์šฐ, ๊ธฐ๋ณธ์ ์œผ๋กœ value๋ผ๋Š” ํ•„๋“œ๋กœ ์ƒ์„ฑ๋œ๋‹ค. (์‹คํ–‰ ๊ฒฐ๊ณผ ์ฐธ๊ณ )
df = spark.createDataFrame(name_list_json, StringType()) 

df.count() # ๋ฐ์ดํ„ฐ ๊ฐœ์ˆ˜
df.printSchema() # ์Šคํ‚ค๋งˆ ํ™•์ธ ๊ฐ€๋Šฅ
df.select('*').collect() # ๋ชจ๋“  ๋ฐ์ดํ„ฐ ํ™•์ธ

 

 

 

3. RDD๋ฅผ DataFrame์œผ๋กœ ๋ณ€ํ™˜

 

RDD ๋ฐ์ดํ„ฐ์˜ ๊ฒฝ์šฐ .toDF() ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์ด ๋œ๋‹ค.

 

df_parsed_rdd = parsed_rdd.toDF() # parsed_rdd๋Š” ์•„๊นŒ json์œผ๋กœ ํŒŒ์‹ฑ๋œ ํ˜•ํƒœ๋กœ, toDF ํ•จ์ˆ˜๋กœ ๊ฐ„๋‹จํžˆ ๋ณ€ํ™˜ ๊ฐ€๋Šฅ
df_parsed_rdd.printSchema() # ์Šคํ‚ค๋งˆ ํ™•์ธ
df_parsed_rdd.select('name').collect()

 

 

4. csv ํŒŒ์ผ์„ Spark ๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„์œผ๋กœ ๋กœ๋“œํ•ด๋ณด๊ธฐ

 

df = spark.read.option("header", True).csv("name_gender.csv") # ํ—ค๋”๊ฐ€ ์žˆ์œผ๋ฉด ์ด๋ ‡๊ฒŒํ•ด์•ผ ์ •์ƒ์ธ์‹
df.printSchema() # ์Šคํ‚ค๋งˆ ํ™•์ธ

df.show() # ํ…Œ์ด๋ธ” ํ™•์ธ
df.head(5) # ์œ„์—์„œ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ 5๊ฐœ ํ™•์ธ

df.groupby(["gender"]).count().collect() # groupby ์‚ฌ์šฉํ•˜๊ธฐ, collect๋ฅผ ๊ผญ ์จ์•ผ ๋ณด์ธ๋‹ค.

 

 

5. ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ํ…Œ์ด๋ธ”๋ทฐ๋กœ ๋งŒ๋“ค์–ด์„œ SparkSQL ์ฒ˜๋ฆฌํ•˜๊ธฐ

 

spark.sql์„ ์‚ฌ์šฉํ•˜๋ฉด ๋ฐ”๋กœ ์ฟผ๋ฆฌ๋ฌธ์„ ์ ์šฉํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

 

df.createOrReplaceTempView("namegender") # df๋ฅผ ๋งˆ์น˜ ํ…Œ์ด๋ธ”์ฒ˜๋Ÿผ ์‚ฌ์šฉ, ์ด๋ฆ„์„ namegender๋กœ ์„ค์ •
namegender_group_df = spark.sql("SELECT gender, count(1) FROM namegender GROUP BY 1") # ์ฟผ๋ฆฌ๋ฌธ์œผ๋กœ ๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„ ์ƒ์„ฑ
namegender_group_df.collect() # collect๋ฅผ ํ†ตํ•ด ๊ฐ€์ ธ์˜ค๊ธฐ
spark.catalog.listTables() # ํ…Œ์ด๋ธ” ๋ณด๊ธฐ

 

 

6. ํŒŒํ‹ฐ์…˜ ์ˆ˜ ๊ณ„์‚ฐํ•ด๋ณด๊ณ , ๋ฆฌํŒŒํ‹ฐ์…˜ํ•ด๋ณด๊ธฐ

 

namegender_group_df.rdd.getNumPartitions() # ํŒŒํ‹ฐ์…˜์ด ํ•˜๋‚˜์ธ๊ฑธ ํ™•์ธ
two_namegender_group_df = namegender_group_df.repartition(2) # ๋ณ„๋‹ค๋ฅธ ์ง€์ •์ด ์—†์œผ๋ฉด ๋žœ๋คํ•˜๊ฒŒ ๋‘ ๊ฐœ๋กœ ๋‚˜๋ˆ”
two_namegender_group_df.rdd.getNumPartitions() # ์ด๋ฒˆ์—๋Š” ๋‘ ๊ฐœ์ธ ๊ฒƒ์„ ํ™•์ธ