스파크 Spark

[Spark] Pyspark DataFrame Method 정리

sseozytank 2024. 2. 20.

데이터 분석가가 Pyspark를 쓰는 이유라하면, 역시 대용량의 데이터를 효율적으로 처리하기 위함이다. 

Pyspark는 pandas 같은 문법으로도, SQL같은 문법으로도 활용이 가능하다해서 데이터 가공을 여러가지 코드로 작성해보며 Pyspark를 익혀보려고 한다.예제 데이터는 참고 문헌 첫번째에 있는 블로그와 같은 데이터를 사용했다! 그럼 천천히 따라해보도록 하자. 


 

 

👇 예제 데이터 👇

https://www.kaggle.com/datasets/mansoordaku/ckdisease

kidney_disease.csv
0.05MB

 

0.라이브러리 불러오기

import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

 

1.스파크 세션 연결 및 데이터 불러오기 

spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data = spark.read.option("header", True).csv("kidney_disease.csv")

 

2.불러온 데이터 확인 

데이터 스키마 확인 | printSchema() 

#데이터 스키마 확인
data.printSchema()

 

데이터 20개 확인 | show() 

#데이터 확인하기
data.show()

#truncate=False를 사용하면, 테이블 내용이 잘리지 않도록 보여줌 
data.show(truncate = False)

 

테이블에서 행을 가져오는 함수 | collect() 

#행 가져오기
data.collect()

 

데이터 서머리 | describe()

#행 가져오기
data.subcribe()

 

3.SELECT 

원하는 컬럼만 추출 | select("column")

#원하는 컬럼(id,age,bp,sg)만 출력 
data.select("id","age","bp","sg").show()

#원하는 컬럼만 SELECT 해서 새 DataFrame에 담기 
data_select=data.select("id","age","bp","sg")
data_select.show()

 

원하는 컬럼만 제외하여 추출 | select("column")

#원하는 컬럼(id,age,bp,sg)만 제외해서 출력
data.drop("id","age","bp","sg").show()

 

4.WHERE = FILTER

원하는 조건의 데이터만 추출 | filter(조건) or where (조건) 

#id=2인 데이터만 출력 
data.filter(data.id == 2).show()
data.where(data.id == 2).show()

data.filter("id = 2").show()
data.where("id = 2").show()

 

여러가지 조건 필터링 

data.filter((data.bp==60) & (data.ba=="present")).show() 
data.where((data.bp==60) & (data.ba=="present")).show()

data.filter("bp==60 AND ba=='present'").show()
data.where("bp==60 AND ba=='present'").show()

 

 

IN 조건 필터링 | isin() 

#in
data.filter(data.al.isin(1.0, 2.0)).show()
data.where(data.al.isin(1.0, 2.0)).show()

#not in
data.filter(~data.al.isin(1.0, 2.0)).show()
data.where(~data.al.isin(1.0, 2.0)).show()

 

LIKE 조건 필터링 | startswitch, endswitch, contains, like()

#not으로 시작하는 데이터
data.filter(data.ba.startswith("not")).show()
data.filter(data.ba.like("not%")).show()
data.where(data.ba.startswith("not")).show()
data.where(data.ba.like("not%")).show()

#sent로 끝나는 데이터
data.filter(data.ba.endswith("sent")).show()
data.filter(data.ba.like("%sent")).show()
data.where(data.ba.endswith("sent")).show()
data.where(data.ba.like("%sent")).show()

#not이 포함되어 있는 데이터
data.filter(data.ba.contains("not")).show()
data.filter(data.ba.like("%not%")).show()
data.where(data.ba.contains("not")).show()
data.where(data.ba.like("%not%")).show()

 

5.GROUP BY 

단일 컬럼에 대한 집계 함수 사용  | groupBy()   

- 집계 함수를 사용하려하니, 데이터형식이 다 string이라 사용이 안된다. numeric인 것들은 numeric으로 바꿔주자. 

from pyspark.sql.functions import col

# 변환할 컬럼 리스트
columns_to_convert = ["id", "age", "bp", "sg", "al", "su", "bgr", "bu", "sc", "sod", "pot", "hemo", "pcv", "wc", "rc"]

# 모든 컬럼을 numeric으로 변환
for col_name in columns_to_convert:
    data = data.withColumn(col_name, col(col_name).cast("float"))
#al의 value_counts()
data.groupBy("al").count().show()

#al을 기준으로 sc를 연산
data.groupBy("al").sum("sc").show()
data.groupBy("al").min("sc").show()
data.groupBy("al").max("sc").show()
data.groupBy("al").avg("sc").show()

 

여러 컬럼에 대한 집계 함수 사용  | groupBy()   

data.groupBy("al","htn").avg("bu","hemo").show()

 

6.ORDER BY 

from pyspark.sql.functions import asc, desc
data.orderBy(asc("al"), desc("htn")).show()

from pyspark.sql.functions import col
data.orderBy(col("al").asc(), col("htn").desc()).show()

 

7.JOIN

조인을 위한 데이터 쪼개기 

data_a=data.select("id","age")
data_a=data_a.filter(data_a.id.isin(['1','2','3','5','7','9','10']))

data_b=data.select("id","bp","sg","al")
data_b=data_b.filter(data_a.id.isin(['2','3','5','6','8','10','11']))
data_a.show()
data_b.show()

좌 - data_a , 우 - data_b

inner join

#pandas merge 느낌
inner_data1=data_a.join(data_b, on='id',how='inner')
inner_data1.show()

inner_data2=data_a.join(data_b, 'id')
inner_data2.show()


#좀 더 SQL 스러운 느낌
inner_data3=data_a.join(data_b, data_a.id == data_b.id)

 

1번 2번 3번 당연히 같은 결과를 보여주며, inner join이기 때문에 data_a와 data_b는 당연히 순서를 바꿔도 같은 결과를 출력한다.

 

left join

#pandas merge 느낌 
left_data1=data_a.join(data_b,'id',how='left')
left_data1.show()

left_data2=data_a.join(data_b,'id','left')
left_data2.show()

#SQL 느낌
left_data3=data_a.join(data_b, data_a.id == data_b.id,how='left')
left_data3.show()

 

 

right join

#pandas merge 느낌
right_data1=data_a.join(data_b,'id',how='right')
right_data1.show()

left_data2=data_a.join(data_b,'id','right')
left_data2.show()

#SQL 느낌
right_data3=data_a.join(data_b, data_a.id == data_b.id,how='right')
right_data3.show()

 

full outer join

full_data1=data_a.join(data_b, data_a.id == data_b.id, 'outer')  
full_data2=data_a.join(data_b, data_a.id == data_b.id, 'full')  
full_data3=data_a.join(data_b, data_a.id == data_b.id, 'fullouter')

 

cross join

cross_data=data_a.crossJoin(data_b)
cross_data.show()

 

 

 

판다스랑 SQL이랑 반반 섞어 놓은 것 같은 문법이라 어렵지는 않은데 엄청나게 헷갈릴 것 같은 느낌이 든다. 

이렇게 데이터 프레임을 다룰 수도 있고, spark sql을 사용해서 sql처럼 데이터 프레임을 다뤄도되니 맘에 들거나 익숙한 코드를 골라 숙지하도록 하면 조금 더 편할 것 같다. 꿀팁이라면 코드 모음 페이지를 따로 만들어서 그 때 그 때 복붙해서 사용하는 것도 추천~ 

 

 

8.그 외 다양한 함수들 

공식 Docs

 

pyspark.sql.DataFrame — PySpark 3.1.3 documentation

Calculate the sample covariance for the given columns, specified by their names, as a double value.

spark.apache.org

 

 

[참고 자료]

https://3months.tistory.com/567

https://assaeunji.github.io/python/2022-03-26-pyspark/

https://eyeballs.tistory.com/442

https://brunch.co.kr/@yysttong/60

https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html

 

 

 

 

 

댓글