์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- aws
- Spark Partitioning
- Kafka
- Airflow
- Docker
- AQE
- SQL
- Speculative Execution
- colab
- disk spill
- off heap memory
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- Spark
- Spark ์ค์ต
- Salting
- Kubernetes
- etl
- mysql
- ๋น ๋ฐ์ดํฐ
- Spark Caching
- backfill
- topic
- Spark SQL
- spark executor memory
- CI/CD
- DataFrame Hint
- KDT_TIL
- Dag
- k8s
- redshift
- Today
- Total
JUST DO IT!
Spark ๋ฐ์ดํฐ์ฒ๋ฆฌ ์ค์ต - TIL230704 ๋ณธ๋ฌธ
๐ KDT WEEK 14 DAY 2 TIL
- Spark ๋ฐ์ดํฐ์ฒ๋ฆฌ
- Spark ๋ฐ์ดํฐ๊ตฌ์กฐ
๐ฅ Spark ๋ฐ์ดํฐ์ฒ๋ฆฌ
- ๋น ๋ฐ์ดํฐ์ ํจ์จ์ ์ฒ๋ฆฌ โก๏ธ ๋ณ๋ ฌ์ฒ๋ฆฌ โก๏ธ ๋ฐ์ดํฐ์ ๋ถ์ฐ ํ์
- ํ๋ก ๋งต์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋จ์๋ ๋ฐ์ดํฐ ๋ธ๋ก(128MB, ์กฐ์ ๊ฐ๋ฅ)
- Spark์์๋ ์ด ๋ฐ์ดํฐ ๋ธ๋ก์ ํํฐ์ (Partition)์ด๋ผ๊ณ ๋ถ๋ฅธ๋ค.
์ ์ ํ ํํฐ์ ์ ์ : Executor์ ์ x Executor์ CPU์ ์ โก๏ธ ๋ณ๋ ฌ ์ฒ๋ฆฌ ์ต๋ํ
โ Spark ๋ฐ์ดํฐ ์ฒ๋ฆฌ ํ๋ฆ
๋ฐ์ดํฐํ๋ ์์ ์์ ํํฐ์ ๋ค๋ก ๊ตฌ์ฑ๋๋ค.
์ ๋ ฅ ๋ฐ์ดํฐํ๋ ์์ ์ํ๋ ๊ฒฐ๊ณผ๊ฐ ๋์ฌ ๋๊น์ง ๋ค๋ฅธ ๋ฐ์ดํฐ ํ๋ ์์ผ๋ก ๊ณ์ ๋ณํ๋๋ ๊ณผ์ ์ผ๋ก ํ๋ฅธ๋ค.
ex) sort, group by, filter, map, join.. ๋ฑ์ ํจ์๋ก ์ธํด ๊ณ์ ๋ณํ๋๋ ๊ฒ!
๐๏ธ ์ ํ๋ง
์์์ group by, sort ๊ฐ์ ์คํผ๋ ์ด์ ๋ค์ ์๋ก์ด ํํฐ์ ์ด ๋ง๋ค์ด์ง์๋ฐ๋ผ ํํฐ์ ๊ฐ์ ๋ฐ์ดํฐ ์ด๋ ํ์ํ๋ค.
์๋ก์ด ํํฐ์ ์ ๋ฐ์ดํฐ๋ฅผ ์ด๋์ํค๋ ๊ณผ์ ์ ๋ฐ๋ผ Data Skewness๊ฐ ๋ฐ์ํ๊ธฐ ์ฝ๋ค.
๋ฐ๋ผ์ ์ ํ๋ง์ ์ต์ํํ๊ณ ํํฐ์ ์ ์ต์ ํํ๋ ๊ฒ์ด ์ค์ํ๋ค.
๐ฆ Spark ๋ฐ์ดํฐ ๊ตฌ์กฐ
Spark ๋ฐ์ดํฐ๋ Immutable(๋ถ๋ณํ๊ณ ) Distributed(๋ถ์ฐ๋) ๋ฐ์ดํฐ์ด๋ค.
๋จผ์ ๋ฐ์ดํฐ ์ข ๋ฅ๋ก๋ RDD, DataFrame, Dataset์ด ์๋ค.
RDD(Resilient Distributed Dataset)
- ๋ก์ฐ๋ ๋ฒจ ๋ฐ์ดํฐ๋ก, ํด๋ฌ์คํฐ๋ด์ ์๋ฒ์ ๋ถ์ฐ๋ ๋ฐ์ดํฐ๋ฅผ ์ง์นญ
- ๋ ์ฝ๋๋ณ๋ก ์กด์ฌํ์ง๋ง ์คํค๋ง๊ฐ ๋ฐ๋ก ์กด์ฌํ์ง ์์ ๊ตฌ์กฐํ๋ ๋ฐ์ดํฐ๋ ๋น๊ตฌ์กฐํ๋ ๋ฐ์ดํฐ ๋ชจ๋ ์ง์
- RDD๋ ๋ค์์ ํํฐ์ ์ผ๋ก ๊ตฌ์ฑ๋๋ฉฐ, ๋ก์ฐ๋ ๋ฒจ์ ํจ์ํ ๋ณํ(map, filter, flatMap ๋ฑ)์ ์ง์ํ๋ค.
- ์ผ๋ฐ ํ์ด์ฌ ๋ฐ์ดํฐ(๋ฆฌ์คํธ)๋ฅผ parallelize์ collect ํจ์๋ฅผ ํตํด RDD๋ก ๋ณํํ๊ฑฐ๋ ๋ค์ ๋๋๋ฆด์ ์๋ค.
DataFrame๊ณผ Dataset
- RDD์์ ์กด์ฌํ๋ ํ์ด๋ ๋ฒจ ๋ฐ์ดํฐ๋ก, RDD์๋ ๋ฌ๋ฆฌ ํ๋ ์ ๋ณด๋ฅผ ๊ฐ๊ณ ์์(ํ ์ด๋ธ)
- Dataset์ ํ์ ์ ๋ณด๊ฐ ์กด์ฌํ์ฌ ์ปดํ์ผ ์ธ์ด(Scala/Java) ์ฌ์ฉ๊ฐ๋ฅ
- PySpark์์๋ DataFrame(ํ๋ค์ค ๋ฐ์ดํฐ ํ๋ ์๊ณผ ํก์ฌ) ์ฌ์ฉ
๐๏ธ Spark ํ๋ก๊ทธ๋จ ๊ตฌ์กฐ
SparkSession
- Spark ํ๋ก๊ทธ๋จ์ ์์ โก๏ธ SparkSession์ ๋ง๋๋ ๊ฒ(Singleton)
- Dataframe, SQL, Streaming, ML API ๋ชจ๋ ์ด ๊ฐ์ฒด๋ก ํต์
- RDD ๊ด๋ จ๋ ์์ ์๋ sparkContext ๊ฐ์ฒด ์ฌ์ฉ
Spark Session ํ๊ฒฝ ๋ณ์
- spark.executor.memory : executor๋ณ ๋ฉ๋ชจ๋ฆฌ
- spark.executor.cores : executor๋ณ CPU ์
- spark.driver.memory : driver ๋ฉ๋ชจ๋ฆฌ
- spark.sql.shuffle.partitions : Shuffleํ Partiton์ ์(์ต๋๊ฐ)
๋ฑ ์ฌ๋ฌ๊ฐ์ง๋ฅผ ์กฐ์จํ ์ ์๊ณ , ์ด๋ SparkSession์ ์์ฑํ๋ฉด์ ์ค์ ํด์ค ์ ์๋ค.
ํ๊ฒฝ ๋ณ์ ์ค์ ๋ฐฉ๋ฒ
from pyspark.sql import SparkSession
# SparkSession์ ์ฑ๊ธํด
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark Tutorial')\
.config("spark.some.config.option1", "some-value") \ # ํ๊ฒฝ ์ค์ ์ด๋ฆ, ์ค์ ๊ฐ ํํ๋ก ์ค์
.config("spark.some.config.option2", "some-value") \
.getOrCreate()
๋๋
from pyspark.sql import SparkSession
from pyspark import SparkConf
# ํ๊ฒฝ ๋ณ์
conf = SparkConf()
conf.set("spark.app.name", "PySpark Tutorial")
conf.set("spark.master", "local[*]")
# SparkSession
spark = SparkSession.builder\
.config(conf=conf) \ # ์ฌ๊ธฐ์ ํ๊ฒฝ ๋ณ์ ์ง์
.getOrCreate()
Spark Session์ด ์ง์ํ๋ ๋ฐ์ดํฐ ์์ค
- HDFS ํ์ผ(csv, json, orc, text, parquet...)
- HIVE ํ ์ด๋ธ
- JDBC ๊ด๊ณํ DB
- ํด๋ผ์ฐ๋ ๊ธฐ๋ฐ ๋ฐ์ดํฐ ์์คํ
- ์คํธ๋ฆฌ๋ฐ ์์คํ
- ๊ทธ ์ธ ์์ค : https://spark.apache.org/docs/latest/sql-data-sources.html
- spark.read(DataFrameReader)๋ฅผ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐํ๋ ์์ผ๋ก ๋ก๋
- spark.write(DataFrameWriter)๋ฅผ ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐํ๋ ์์ ์ ์ฅ
๐ฉ Spark ์ค์ต
์ค์ต ํ๊ฒฝ์ Google Colab์์ Spark Cluster Manager๋ก local[n]์ ์ง์ ํ์ฌ Local Standalone Spark์ ์ฌ์ฉํ๋ค.
์ด ํ๊ฒฝ์ ์ฃผ๋ก ๊ฐ๋ฐ์ด๋ ๊ฐ๋จํ ํ ์คํธ ์ฉ๋๋ก ์ฌ์ฉ๋๋ ํ๊ฒฝ์ด๋ฉฐ, JVM์์ ๋ชจ๋ ํ๋ก์ธ์ค๋ฅผ ์คํํ๊ฒ ๋๋ค.
Spark Web UI๋ ๊ธฐ๋ณธ์ ์ผ๋ก ์ ๊ทผ ๋ถ๊ฐํ๊ณ , Py4J๋ฅผ ์ถ๊ฐ๋ก ์ค์นํ์ฌ ํ์ด์ฌ์์ JVM์ ์๋ฐ ๊ฐ์ฒด๋ฅผ ์ฌ์ฉ ๊ฐ๋ฅํ๊ฒ ํ๋ค.
Colab์์ ์๋์ ๋ช ๋ น์ด๋ฅผ ์ ๋ ฅํ์ฌ Pyspark์ py4j๋ฅผ ์ฌ์ฉํ๋ค.
!pip install pyspark==3.3.1 py4j==0.10.9.5
๋จผ์ , Spark๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด SparkSession์ ์๋ก ํ๋ ์์ฑํ๋ค.
SparkSession์ ์์ฑํ๋ฉด์ local[*]์ ์ง์ ํ๋ฉด ๋๋ค.
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\ # LocalMode, ์ปดํจํฐ์ CPU์ ์๋งํผ ์ฐ๋ ๋๋ฅผ Executor์ ์์ฑ
.appName('PySpark Tutorial')\
.getOrCreate() # SparkSession์ Singleton ๊ฐ์ฒด๋ก, ์ฌ๋ฌ๊ฐ๋ฅผ ๋ง๋ค์ง ์๋๋ค.
1. Python ๊ฐ์ฒด RDD๋ก ๋ณํํ๊ธฐ
๋ฐ์ดํฐ๋ก๋ ๋ฆฌ์คํธ์์ Stringํํ๋ก json์ ์ ์ฅํ์๋ค.
์ด ๋ฐ์ดํฐ๋ฅผ parallelize ํจ์๋ก RDD ๋ณํ, collect ํจ์๋ก ๋ค์ ๋๋๋ฆด ์ ์๋ค.
์๋ ์ฝ๋์์๋ parallelize > map(json ํ์ฑ) > collect ๊ณผ์ ์ ํตํด ๋ฐ์ดํฐ๊ฐ ๊ฐ๊ณต๋๋ ๊ณผ์ ์ ๊ฑฐ์น๊ฒ๋๋ค.
name_list_json = [ '{"name": "keeyong"}', '{"name": "benjamin"}', '{"name": "claire"}' ]
rdd = spark.sparkContext.parallelize(name_list_json) # parallelize ํจ์๋ก RDD ๋ณํ
rdd.count() # rdd์ ์ ์ฅ๋ ๋ฐ์ดํฐ ๊ฐ์ ์ถ๋ ฅ
import json
parsed_rdd = rdd.map(lambda el:json.loads(el)) # json์ผ๋ก ํ์ฑ
parsed_rdd.collect() # collect ํจ์๋ก ๊ฐ์ ธ์ฌ ์ ์์, json์ผ๋ก ํ์ฑ๋ ํํ๋ก ๊ฐ์ ธ์จ๋ค.
parsed_name_rdd = rdd.map(lambda el:json.loads(el)["name"]) # ์ด๋ฐ ํํ๋ ๊ฐ๋ฅ
parsed_name_rdd.collect()
์คํ ๊ฒฐ๊ณผ
parsed_rdd๋ง์ผ๋ก๋ ๋ฐ์ดํฐ๊ฐ ์ถ๋ ฅ๋์ง ์๋ ๊ฑธ ๋ณผ ์ ์๋ค.
.collect() ํจ์๋ฅผ ์ฌ์ฉํด์ RDD์์ ๋ค์ ์ด์ชฝ์ผ๋ก ๊ฐ์ ธ์ค๋ ๊ณผ์ ์ด ํ์ํ ๊ฒ์ด๋ค.
2. ํ์ด์ฌ ๋ฆฌ์คํธ๋ฅผ ๋ฐ์ดํฐ ํ๋ ์์ผ๋ก ๋ณํ
.createDataFrame() ํจ์๋ฅผ ์ฌ์ฉํ๋ฉด, ๋ฐ์ดํฐํ๋ ์์ผ๋ก ๋ณํํ ์ ์๋ค.
from pyspark.sql.types import StringType
# ๋ ๋ฒ์งธ ์ธ์๋ก ์คํค๋ง ์ง์ , ์ผ๋จ StringType์ผ๋ก ์ง์ ํ๋ค.
# ์ด ๊ฒฝ์ฐ, ๊ธฐ๋ณธ์ ์ผ๋ก value๋ผ๋ ํ๋๋ก ์์ฑ๋๋ค. (์คํ ๊ฒฐ๊ณผ ์ฐธ๊ณ )
df = spark.createDataFrame(name_list_json, StringType())
df.count() # ๋ฐ์ดํฐ ๊ฐ์
df.printSchema() # ์คํค๋ง ํ์ธ ๊ฐ๋ฅ
df.select('*').collect() # ๋ชจ๋ ๋ฐ์ดํฐ ํ์ธ
3. RDD๋ฅผ DataFrame์ผ๋ก ๋ณํ
RDD ๋ฐ์ดํฐ์ ๊ฒฝ์ฐ .toDF() ํจ์๋ฅผ ์ฌ์ฉํ๋ฉด ๋ฐ์ดํฐํ๋ ์์ด ๋๋ค.
df_parsed_rdd = parsed_rdd.toDF() # parsed_rdd๋ ์๊น json์ผ๋ก ํ์ฑ๋ ํํ๋ก, toDF ํจ์๋ก ๊ฐ๋จํ ๋ณํ ๊ฐ๋ฅ
df_parsed_rdd.printSchema() # ์คํค๋ง ํ์ธ
df_parsed_rdd.select('name').collect()
4. csv ํ์ผ์ Spark ๋ฐ์ดํฐ ํ๋ ์์ผ๋ก ๋ก๋ํด๋ณด๊ธฐ
df = spark.read.option("header", True).csv("name_gender.csv") # ํค๋๊ฐ ์์ผ๋ฉด ์ด๋ ๊ฒํด์ผ ์ ์์ธ์
df.printSchema() # ์คํค๋ง ํ์ธ
df.show() # ํ
์ด๋ธ ํ์ธ
df.head(5) # ์์์๋ถํฐ ๋ฐ์ดํฐ 5๊ฐ ํ์ธ
df.groupby(["gender"]).count().collect() # groupby ์ฌ์ฉํ๊ธฐ, collect๋ฅผ ๊ผญ ์จ์ผ ๋ณด์ธ๋ค.
5. ๋ฐ์ดํฐํ๋ ์์ ํ ์ด๋ธ๋ทฐ๋ก ๋ง๋ค์ด์ SparkSQL ์ฒ๋ฆฌํ๊ธฐ
spark.sql์ ์ฌ์ฉํ๋ฉด ๋ฐ๋ก ์ฟผ๋ฆฌ๋ฌธ์ ์ ์ฉํ ์๋ ์๋ค.
df.createOrReplaceTempView("namegender") # df๋ฅผ ๋ง์น ํ
์ด๋ธ์ฒ๋ผ ์ฌ์ฉ, ์ด๋ฆ์ namegender๋ก ์ค์
namegender_group_df = spark.sql("SELECT gender, count(1) FROM namegender GROUP BY 1") # ์ฟผ๋ฆฌ๋ฌธ์ผ๋ก ๋ฐ์ดํฐ ํ๋ ์ ์์ฑ
namegender_group_df.collect() # collect๋ฅผ ํตํด ๊ฐ์ ธ์ค๊ธฐ
spark.catalog.listTables() # ํ
์ด๋ธ ๋ณด๊ธฐ
6. ํํฐ์ ์ ๊ณ์ฐํด๋ณด๊ณ , ๋ฆฌํํฐ์ ํด๋ณด๊ธฐ
namegender_group_df.rdd.getNumPartitions() # ํํฐ์
์ด ํ๋์ธ๊ฑธ ํ์ธ
two_namegender_group_df = namegender_group_df.repartition(2) # ๋ณ๋ค๋ฅธ ์ง์ ์ด ์์ผ๋ฉด ๋๋คํ๊ฒ ๋ ๊ฐ๋ก ๋๋
two_namegender_group_df.rdd.getNumPartitions() # ์ด๋ฒ์๋ ๋ ๊ฐ์ธ ๊ฒ์ ํ์ธ