Spark ๋ฐ์ดํ„ฐ์ฒ˜๋ฆฌ ์‹ค์Šต 2 (์ปฌ๋Ÿผ๋ช…๊ณผ ํƒ€์ž… ์ถ”๊ฐ€ํ•˜๊ธฐ + ์ •๊ทœํ‘œํ˜„์‹ + Pandas์™€ ๋น„๊ต)- TIL230704

sunhokimDev 2023. 7. 6. 01:26

๐Ÿ“š KDT WEEK 14 DAY 2 TIL

  • Processing with Pandas
  • Processing with Spark
    • Setting up the Spark environment and handling the csv file
    • Trying out functions on the DataFrame
    • Processing with Spark SQL
    • Features good to know

 

 


 

์ด์ „ ๊ธ€์—์„œ Spark ๋ฐ์ดํ„ฐ์ฒ˜๋ฆฌ ์‹ค์Šต์„ ํ•ด๋ณด์•˜๋‹ค.

https://sunhokimdev.tistory.com/60

 

Spark ๋ฐ์ดํ„ฐ์ฒ˜๋ฆฌ ์‹ค์Šต - TIL230704

๐Ÿ“š KDT WEEK 14 DAY 2 TIL Spark ๋ฐ์ดํ„ฐ์ฒ˜๋ฆฌ Spark ๋ฐ์ดํ„ฐ๊ตฌ์กฐ ๐ŸŸฅ Spark ๋ฐ์ดํ„ฐ์ฒ˜๋ฆฌ ๋น…๋ฐ์ดํ„ฐ์˜ ํšจ์œจ์  ์ฒ˜๋ฆฌ โžก๏ธ ๋ณ‘๋ ฌ์ฒ˜๋ฆฌ โžก๏ธ ๋ฐ์ดํ„ฐ์˜ ๋ถ„์‚ฐ ํ•„์š” ํ•˜๋‘ก ๋งต์˜ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋‹จ์œ„๋Š” ๋ฐ์ดํ„ฐ ๋ธ”๋ก(128MB, ์กฐ์ ˆ

sunhokimdev.tistory.com

 

์ €๋ฒˆ๊ณผ ๊ฐ™์€ ํ™˜๊ฒฝ์„ ์‚ฌ์šฉํ•œ๋‹ค.

Colab์—์„œ ์ž…๋ ฅํ•  ๋•Œ๋Š” ์ œ์ผ ์•ž์— ! ์„ ๋ถ™์ด๋ฉด ๋œ๋‹ค.

pip install pyspark==3.3.1 py4j==0.10.9.5

 

์‚ฌ์šฉํ•œ ๋ฐ์ดํ„ฐ(1800.csv)

 

๊ฐ„๋‹จํ•œ csv ํŒŒ์ผ๋กœ, ํ—ค๋”๊ฐ€ ์—†๋Š” ํ˜•ํƒœ์ด๋‹ค. ์ฝ”๋“œ์—์„œ ์ปฌ๋Ÿผ๋ช…์„ ๋”ฐ๋กœ ์ถ”๊ฐ€ํ•  ๊ฒƒ์ด๋‹ค.

 

 


 

Pandas processing vs Spark processing

 

๐ŸŸฅ Processing with Pandas

 

1800.csv ํŒŒ์ผ์„ ๋ถˆ๋Ÿฌ์™€์„œ ๋ช‡ ๋ฒˆ์˜ ๊ณผ์ •์„ ํ†ตํ•ด ํ•„์š”ํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ์–ป์–ด๋‚ธ๋‹ค.

 

import pandas as pd

pd_df = pd.read_csv(
    "1800.csv",
    names=["stationID", "date", "measure_type", "temperature"], # no header, so name the columns here
    usecols=[0, 1, 2, 3] # read only the first four columns
)

pd_df.head() # inspect the data

# keep only the rows whose measure_type is TMIN
pd_minTemps = pd_df[pd_df['measure_type'] == "TMIN"]

# select just the two columns we need
pd_stationTemps = pd_minTemps[["stationID", "temperature"]]

# group by stationID and take the minimum temperature per station
pd_minTempsByStation = pd_stationTemps.groupby(["stationID"]).min()

 

Execution result

 


 

๐ŸŸฆ Processing with Spark

 

1) Setting up the Spark environment and loading the csv file

 

csv ํŒŒ์ผ์„ ๋กœ๋“œํ•œ ๋’ค, ์ปฌ๋Ÿผ์„ ์ง€์ •ํ•ด์ฃผ์—ˆ๋‹ค.

from pyspark.sql import SparkSession
from pyspark import SparkConf

# Spark environment setup
conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #1")
conf.set("spark.master", "local[*]") # use as many threads as Google Colab has CPU cores

spark = SparkSession.builder\
        .config(conf=conf)\
        .getOrCreate()

# load the csv file (no column names or types specified) -> every column becomes string
df = spark.read.format("csv").load("1800.csv") # same as spark.read.csv("1800.csv")

# load the csv file and name each column
df = spark.read.format("csv")\
    .load("1800.csv")\
    .toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")

# asking for schema inference via .option makes Spark look at a sample of records and guess each column's type
df = spark.read.format("csv")\
    .option("inferSchema", "true")\
    .load("1800.csv")\
    .toDF("stationID", "date", "measure_type", "temperature", "_c4", "_c5", "_c6", "_c7")

 

๊ฐ ์ปฌ๋Ÿผ๋ณ„ ํƒ€์ž…์— ์ง‘์ค‘ํ•ด๋ณด์ž.

 

๊ฐ๊ฐ ์Šคํ‚ค๋งˆ๋ฅผ ์ถœ๋ ฅํ•ด๋ดค์„ ๋•Œ, ์ปฌ๋Ÿผ๋ช…๊ณผ ํƒ€์ž…์ด ๋‹ค๋ฆ„์„ ๋น„๊ตํ•ด๋ณด์ž. 

ํƒ€์ž…์„ ๋ช…์‹œํ•ด์ฃผ์ง€ ์•Š์œผ๋ฉด ๊ทธ๋ƒฅ String ํƒ€์ž…์œผ๋กœ ๋ถ™์—ฌ๋ฒ„๋ฆฐ๋‹ค.

But if the inferSchema option is passed while loading the csv file, Spark guesses the types for you.
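
As a quick check (a minimal sketch, not from the original post), printSchema() makes the difference visible:

# without inferSchema: every column comes back as string
spark.read.format("csv").load("1800.csv").printSchema()

# with inferSchema: Spark samples the records and assigns types (temperature should become numeric)
spark.read.format("csv").option("inferSchema", "true").load("1800.csv").printSchema()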

 

์ถ”๊ฐ€์ ์ธ ๋ฐฉ๋ฒ•์œผ๋กœ, ๋ช…์‹œ์ ์œผ๋กœ ํƒ€์ž…์„ ์•Œ๋ ค์ฃผ๋Š” ๋ฐฉ๋ฒ•๋„ ์žˆ๋‹ค.

 

 

Specifying the schema explicitly like this seems to be the most reliable option.

 

For the types you can use when declaring a schema, refer to the following link.

pyspark.sql.types์˜ ํƒ€์ž… ๋ฆฌ์ŠคํŠธ : https://spark.apache.org/docs/latest/sql-ref-datatypes.html

 


 

 

2) ๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„์— ํ•จ์ˆ˜ ์‚ฌ์šฉํ•ด๋ณด๊ธฐ

 

Pandas์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•  ๋•Œ์ฒ˜๋Ÿผ ๋ถ€๋ถ„์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•œ๋‹ค.

# three ways to find the rows where measure_type = "TMIN" (same result)
minTemps = df.filter(df.measure_type == "TMIN")
minTemps = df.where(df.measure_type == "TMIN")
minTemps = df.where("measure_type = 'TMIN'") # written like a SQL WHERE clause

# group by the stationID column and take the minimum temperature per station
minTempsByStation = minTemps.groupBy("stationID").min("temperature")

# two ways to SELECT two specific columns (same result)
stationTemps = minTemps[["stationID", "temperature"]]
stationTemps = minTemps.select("stationID", "temperature")

# print the result
results = minTempsByStation.collect()
for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))
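
collect() brings the rows back to the driver as a list of Row objects; for a quick look at the grouped result, calling show() on the DataFrame also works (a small addition, not part of the original code):

# display the grouped result directly without collecting it to the driver
minTempsByStation.show()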

 

 

Result

 

3) Processing with Spark SQL

 

You can also get the same result by simply writing a SQL query, which keeps things easy.
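
The query from class is not shown here, so this is a minimal sketch of how the same TMIN-per-station result could be produced with Spark SQL, assuming the df loaded above and a view name I made up (station_1800):

# register the DataFrame as a temporary view so it can be queried with SQL
df.createOrReplaceTempView("station_1800")

minTempsByStationSQL = spark.sql("""
    SELECT stationID, MIN(temperature) AS min_temperature
    FROM station_1800
    WHERE measure_type = 'TMIN'
    GROUP BY stationID
""")

minTempsByStationSQL.show()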

 

4) ์•Œ์•„๋‘๋ฉด ์ข‹์€ ๊ธฐ๋Šฅ!

 

a) Naming the columns produced by aggregate functions (withColumnRenamed, sql.functions)

 

# rename the aggregate column afterwards with withColumnRenamed
# (this df is a different example DataFrame with cust_id and amount_spent columns)
df_ca = df.groupBy("cust_id").sum("amount_spent").withColumnRenamed("sum(amount_spent)", "sum")

# use pyspark.sql.functions to name several aggregates at once
import pyspark.sql.functions as f
df.groupBy("cust_id") \
   .agg(
       f.sum('amount_spent').alias('sum'),
       f.max('amount_spent').alias('max'),
       f.avg('amount_spent').alias('avg')).collect()

 

b) ํ…์ŠคํŠธ ํŒŒ์ผ ๋ถˆ๋Ÿฌ์™€์„œ Spark๋กœ regex(์ •๊ทœํ‘œํ˜„์‹) ์ ์šฉํ•ด์„œ ์ถ”์ถœํ•ด๋ณด๊ธฐ

 

transfer_cost.txt(์ผ๋ถ€)

On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85004 is $25.68 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 19.86 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 20.52 at Haul Today
On 2021-01-04 the cost per ton from 85001 to 85010 is 20.72 at Haul Today

 

# Spark ์„ค์ •์€ ์œ„ ์‹ค์Šต๊ณผ ๋™์ผ

import pyspark.sql.functions as F
from pyspark.sql.types import *

schema = StructType([ StructField("text", StringType(), True)])
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt") # txt ํŒŒ์ผ์ด๋ฏ€๋กœ ,text ์‚ฌ์šฉ

transfer_cost_df.show(truncate=False) # truncate=False ์‚ฌ์šฉํ•ด์„œ ์ž˜๋ฆฌ๋Š” ๊ฑฐ ์—†์ด ์ผ๋ถ€(20๊ฐœ)๋ฅผ ์ถœ๋ ฅ

# regex ์ ์šฉํ•ด์„œ text ๋‚ด์šฉ์„ ๊ฐ ์ปฌ๋Ÿผ์œผ๋กœ ๋„ฃ๊ธฐ
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'

# withColumn("์ถ”๊ฐ€ํ•˜๊ฑฐ๋‚˜ ์กด์žฌํ•˜๋Š” ์ปฌ๋Ÿผ์ด๋ฆ„", "์ฑ„์›Œ์งˆ ๊ฐ’")
# regexp_extract("์ถ”์ถœํ•  ์ปฌ๋Ÿผ", "ํƒ€์ž…", ๋งค์นญ๋˜๋Š” ๊ฒƒ์ค‘ ๋ช‡ ๋ฒˆ์งธ์ธ์ง€(1๋ถ€ํ„ฐ์‹œ์ž‘))
df_with_new_columns = transfer_cost_df\
    .withColumn('week', regexp_extract('text', regex_str, 1))\
    .withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
    .withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
    .withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
    .withColumn('vendor', regexp_extract(col('text'), regex_str, 5))
    
final_df = df_with_new_columns.drop("text") # ๊ธฐ์กด์˜ text ์ปฌ๋Ÿผ์€ drop

 

 

c) ๋‚ด์šฉ์„ csv๊ณผ json์œผ๋กœ ์ €์žฅํ•ด๋ณด๊ธฐ

 

# save as a csv file locally, but the output is written as a folder
final_df.write.csv("extracted.csv")

# save as json, again written as a folder
final_df.write.format("json").save("extracted.json")

 

 

์›๋ž˜ ๋น…๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ์šฉ์ด๋ฏ€๋กœ, ํด๋”๋กœ ์ €์žฅ๋˜์–ด ๋ฐ์ดํ„ฐ ๋ธ”๋ก๋‹จ์œ„๋กœ ํด๋” ์•ˆ์— ๋‚˜๋ˆ„์–ด ์ €์žฅ๋œ๋‹ค.

์‹ค์Šต ๋ฐ์ดํ„ฐ๋Š” ๊ทธ๋ ‡๊ฒŒ ํฌ์ง€ ์•Š์•„์„œ ํ•˜๋‚˜๋กœ๋งŒ ์ €์žฅ๋˜์—ˆ๊ณ , ๊ทธ ์•ˆ์„ ํ™•์ธํ•ด๋ณด๋ฉด ์ž˜ ์ €์žฅ๋œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.