์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- Spark Caching
- redshift
- KDT_TIL
- Dag
- mysql
- DataFrame Hint
- Speculative Execution
- colab
- Spark ์ค์ต
- disk spill
- Kafka
- off heap memory
- Spark Partitioning
- aws
- Airflow
- spark executor memory
- ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
- SQL
- k8s
- etl
- Spark SQL
- Docker
- topic
- CI/CD
- AQE
- ๋น ๋ฐ์ดํฐ
- backfill
- Kubernetes
- Salting
- Spark
- Today
- Total
JUST DO IT!
Spark ๋ฐ์ดํฐ์ฒ๋ฆฌ ์ค์ต 2 (์ปฌ๋ผ๋ช ๊ณผ ํ์ ์ถ๊ฐํ๊ธฐ + ์ ๊ทํํ์ + Pandas์ ๋น๊ต)- TIL230704 ๋ณธ๋ฌธ
Spark ๋ฐ์ดํฐ์ฒ๋ฆฌ ์ค์ต 2 (์ปฌ๋ผ๋ช ๊ณผ ํ์ ์ถ๊ฐํ๊ธฐ + ์ ๊ทํํ์ + Pandas์ ๋น๊ต)- TIL230704
sunhokimDev 2023. 7. 6. 01:26๐ KDT WEEK 14 DAY 2 TIL
- Pandas๋ก ์ฒ๋ฆฌํ๊ธฐ
- Spark๋ก ์ฒ๋ฆฌํ๊ธฐ
- Spark ํ๊ฒฝ ์ค์ ๋ฐ csv ํ์ผ ์ฒ๋ฆฌํ๊ธฐ
- ๋ฐ์ดํฐ ํ๋ ์์ ํจ์ ์ฌ์ฉํด๋ณด๊ธฐ
- Spark SQL๋ก ์ฒ๋ฆฌํด๋ณด๊ธฐ
- ์์๋๋ฉด ์ข์ ๊ธฐ๋ฅ
์ด์ ๊ธ์์ Spark ๋ฐ์ดํฐ์ฒ๋ฆฌ ์ค์ต์ ํด๋ณด์๋ค.
https://sunhokimdev.tistory.com/60
์ ๋ฒ๊ณผ ๊ฐ์ ํ๊ฒฝ์ ์ฌ์ฉํ๋ค.
Colab์์ ์ ๋ ฅํ ๋๋ ์ ์ผ ์์ ! ์ ๋ถ์ด๋ฉด ๋๋ค.
pip install pyspark==3.3.1 py4j==0.10.9.5
์ฌ์ฉํ ๋ฐ์ดํฐ(1800.csv)
๊ฐ๋จํ csv ํ์ผ๋ก, ํค๋๊ฐ ์๋ ํํ์ด๋ค. ์ฝ๋์์ ์ปฌ๋ผ๋ช ์ ๋ฐ๋ก ์ถ๊ฐํ ๊ฒ์ด๋ค.
Pandas ์ฒ๋ฆฌ vs Spark ์ฒ๋ฆฌ
๐ฅ Pandas๋ก ์ฒ๋ฆฌํ๋ ๊ฒฝ์ฐ
1800.csv ํ์ผ์ ๋ถ๋ฌ์์ ๋ช ๋ฒ์ ๊ณผ์ ์ ํตํด ํ์ํ ๋ฐ์ดํฐ๋ฅผ ์ป์ด๋ธ๋ค.
import pandas as pd
pd_df = pd.read_csv(
"1800.csv",
names=["stationID", "date", "measure_type", "temperature"], # ํค๋๊ฐ ์์ผ๋ฏ๋ก ์ด๋ฆ ์ง์
usecols=[0, 1, 2, 3] # ์ฒ์ ๋ค๊ฐ์ ์ปฌ๋ผ๋ง ์ฝ๋๋ค
)
pd_df.head() # ๋ฐ์ดํฐ ํ์ธ
# measure_type ์ปฌ๋ผ์ด TMIN์ธ ๋ฐ์ดํฐ๋ง ํํฐ๋ง
pd_minTemps = pd_df[pd_df['measure_type'] == "TMIN"]
# ๋ ๊ฐ์ ์ปฌ๋ผ๋ง ์ ํ
pd_stationTemps = pd_minTemps[["stationID", "temperature"]]
# stationID๋ก Groupby + temperature๊ฐ ๊ฐ์ฅ ์์ ๊ฒ
pd_minTempsByStation = pd_stationTemps.groupby(["stationID"]).min("temperature")
๐ฆ Spark๋ก ์ฒ๋ฆฌํ๊ธฐ
1) spark ํ๊ฒฝ ์ค์ ๋ฐ csv ํ์ผ ๋ก๋ํ๊ธฐ
csv ํ์ผ์ ๋ก๋ํ ๋ค, ์ปฌ๋ผ์ ์ง์ ํด์ฃผ์๋ค.
from pyspark.sql import SparkSession
from pyspark import SparkConf
# Spark ํ๊ฒฝ ์ค์
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #1")
conf.set("spark.master", "local[*]") # Google Colab์ CPU ์๋งํผ ํ ๋น
spark = SparkSession.builder\
.config(conf=conf)\
.getOrCreate()
# csv ํ์ผ๋ก๋ํ๊ธฐ (์ปฌ๋ผ, ํ์
์ง์ ์์ด) > ํ์
์ด ๋ชจ๋ string์ด๋จ
df = spark.read.format("csv").load("1800.csv") # spark.read.csv("1800.csv")
# csv ํ์ผ๋ก๋ํ๊ธฐ (ํ๋์ฉ ์ปฌ๋ผ์ง์ ํ๊ธฐ)
df = spark.read.format("csv")\
.load("1800.csv")\
.toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
# .option์ผ๋ก ์คํค๋ง๋ฅผ ์ถ์ธกํ๋ผ๊ณ ํธ์ถํ๋ฉด, spark์ด ๋ ์ฝ๋ ๋ช ๊ฐ๋ฅผ ๋ณด๊ณ ์ถ์ธกํด์ ํ์
๋ฃ์ด์ค
df = spark.read.format("csv")\
.option("inferSchema", "true")\
.load("1800.csv")\
.toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")
๊ฐ๊ฐ ์คํค๋ง๋ฅผ ์ถ๋ ฅํด๋ดค์ ๋, ์ปฌ๋ผ๋ช ๊ณผ ํ์ ์ด ๋ค๋ฆ์ ๋น๊ตํด๋ณด์.
ํ์ ์ ๋ช ์ํด์ฃผ์ง ์์ผ๋ฉด ๊ทธ๋ฅ String ํ์ ์ผ๋ก ๋ถ์ฌ๋ฒ๋ฆฐ๋ค.
ํ์ง๋ง csv ํ์ผ์ ๋ก๋ํ๋ ๊ณผ์ ์์ option์ผ๋ก inferSchema๋ฅผ ์ฃผ๋ฉด, ์์์ ์ถ์ธกํด์ค๋ค.
์ถ๊ฐ์ ์ธ ๋ฐฉ๋ฒ์ผ๋ก, ๋ช ์์ ์ผ๋ก ํ์ ์ ์๋ ค์ฃผ๋ ๋ฐฉ๋ฒ๋ ์๋ค.
์ด๋ ๊ฒ ๋ช ์ํด์ฃผ๋ ๋ฐฉ๋ฒ์ด ํ์คํ ๊ฒ ๊ฐ๋ค.
๋ช ์ํด์ค ๋ ์ฌ์ฉํ๋ ํ์ ์ผ๋ก๋ ๋ค์์ ๋งํฌ๋ฅผ ์ฐธ๊ณ ํ์.
pyspark.sql.types์ ํ์ ๋ฆฌ์คํธ : https://spark.apache.org/docs/latest/sql-ref-datatypes.html
2) ๋ฐ์ดํฐ ํ๋ ์์ ํจ์ ์ฌ์ฉํด๋ณด๊ธฐ
Pandas์์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ๋์ฒ๋ผ ๋ถ๋ถ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ค.
# measure_type = "TMIN" ๊ฐ์ ์ฐพ๋ ์ธ ๊ฐ์ง ๋ฐฉ๋ฒ(๊ฒฐ๊ณผ ๋์ผ)
minTemps = df.filter(df.measure_type == "TMIN")
minTemps = df.where(df.measure_type == "TMIN")
minTemps = df.where("measure_type = 'TMIN'") # SQL where ์กฐ๊ฑด์ ๋ช
์ํ๋ฏ์ด
# stationID ์ปฌ๋ผ์ผ๋ก GroupBy + temperature์ ์ต์๊ฐ์ ํด๋นํ๋ ๋ฐ์ดํฐ ์ฐพ๊ธฐ
minTempsByStation = minTemps.groupBy("stationID").min("temperature")
# ๋ ๊ฐ์ง ์ปฌ๋ผ์ ์ง์ ํด์ SELECTํ๋ ๋ ๊ฐ์ง ๋ฐฉ๋ฒ(๊ฒฐ๊ณผ ๋์ผ)
stationTemps = minTemps[["stationID", "temperature"]]
stationTemps = minTemps.select("stationID", "temperature")
# ๊ฒฐ๊ณผ ์ถ๋ ฅํด๋ณด๊ธฐ
results = minTempsByStation.collect()
for result in results:
print(result[0] + "\t{:.2f}F".format(result[1]))
3) Spark SQL๋ก ์ฒ๋ฆฌํ๊ธฐ
๊ฐ๋จํ๊ฒ SQL ์ฟผ๋ฆฌ๋ฌธ์ ์ฌ์ฉํด์, ์ด๋ ต์ง ์๊ฒ ์ฒ๋ฆฌํ ์ ์๋ค.
4) ์์๋๋ฉด ์ข์ ๊ธฐ๋ฅ!
a) agg ํจ์ ์ปฌ๋ผ์ ์ด๋ฆ ์ง์ ํ๊ธฐ(withColumnRenamed, sql.functions)
# ๋ค์ withColumnRenamed ์ฌ์ฉ
df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")
# sql.functions ์ฌ์ฉํด์ ๊ฐ๋จํ๊ฒ ์ฌ๋ฌ ๊ฐ ๋ช
์ํ๊ธฐ
import pyspark.sql.functions as f
df.groupBy("cust_id") \
.agg(
f.sum('amount_spent').alias('sum'),
f.max('amount_spent').alias('max'),
f.avg('amount_spent').alias('avg')).collect()
b) ํ ์คํธ ํ์ผ ๋ถ๋ฌ์์ Spark๋ก regex(์ ๊ทํํ์) ์ ์ฉํด์ ์ถ์ถํด๋ณด๊ธฐ
transfer_cost.txt(์ผ๋ถ)
On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85004 is $25.68 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 19.86 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 20.52 at Haul Today
On 2021-01-04 the cost per ton from 85001 to 85010 is 20.72 at Haul Today
# Spark ์ค์ ์ ์ ์ค์ต๊ณผ ๋์ผ
import pyspark.sql.functions as F
from pyspark.sql.types import *
schema = StructType([ StructField("text", StringType(), True)])
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt") # txt ํ์ผ์ด๋ฏ๋ก ,text ์ฌ์ฉ
transfer_cost_df.show(truncate=False) # truncate=False ์ฌ์ฉํด์ ์๋ฆฌ๋ ๊ฑฐ ์์ด ์ผ๋ถ(20๊ฐ)๋ฅผ ์ถ๋ ฅ
# regex ์ ์ฉํด์ text ๋ด์ฉ์ ๊ฐ ์ปฌ๋ผ์ผ๋ก ๋ฃ๊ธฐ
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'
# withColumn("์ถ๊ฐํ๊ฑฐ๋ ์กด์ฌํ๋ ์ปฌ๋ผ์ด๋ฆ", "์ฑ์์ง ๊ฐ")
# regexp_extract("์ถ์ถํ ์ปฌ๋ผ", "ํ์
", ๋งค์นญ๋๋ ๊ฒ์ค ๋ช ๋ฒ์งธ์ธ์ง(1๋ถํฐ์์))
df_with_new_columns = transfer_cost_df\
.withColumn('week', regexp_extract('text', regex_str, 1))\
.withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
.withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
.withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
.withColumn('vendor', regexp_extract(col('text'), regex_str, 5))
final_df = df_with_new_columns.drop("text") # ๊ธฐ์กด์ text ์ปฌ๋ผ์ drop
c) ๋ด์ฉ์ csv๊ณผ json์ผ๋ก ์ ์ฅํด๋ณด๊ธฐ
# ๋ก์ปฌ์ csv ํ์ผ๋ก ์ ์ฅํ์ง๋ง, ํด๋๋ก ์ ์ฅ๋จ
final_df.write.csv("extracted.csv")
# json ํ์ผ๋ก ์ ์ฅํ์ง๋ง, ํด๋๋ก ์ ์ฅ๋จ
final_df.write.format("json").save("extracted.json")
์๋ ๋น ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ฉ์ด๋ฏ๋ก, ํด๋๋ก ์ ์ฅ๋์ด ๋ฐ์ดํฐ ๋ธ๋ก๋จ์๋ก ํด๋ ์์ ๋๋์ด ์ ์ฅ๋๋ค.
์ค์ต ๋ฐ์ดํฐ๋ ๊ทธ๋ ๊ฒ ํฌ์ง ์์์ ํ๋๋ก๋ง ์ ์ฅ๋์๊ณ , ๊ทธ ์์ ํ์ธํด๋ณด๋ฉด ์ ์ ์ฅ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
'TIL' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Spark UDF์ explode ๊ธฐ๋ฅ Colab์์ ์ค์ตํ๊ธฐ - TIL230705 (0) | 2023.07.09 |
---|---|
Colab์์ Spark SQL ๊ฐ๋จ ์ค์ตํด๋ณด๊ธฐ(+ Hive ๋ฉํ์คํ ์ด) - TIL230705 (0) | 2023.07.09 |
Spark ๋ฐ์ดํฐ์ฒ๋ฆฌ ์ค์ต - TIL230704 (0) | 2023.07.05 |
๋น ๋ฐ์ดํฐ์ Spark ์์๋ณด๊ธฐ - TIL230703 (0) | 2023.07.04 |
๋ฐ์ดํฐ ์นดํ๋ก๊ทธ - TIL230623(2) (0) | 2023.06.23 |