데이터 분석가가 Pyspark를 쓰는 이유라하면, 역시 대용량의 데이터를 효율적으로 처리하기 위함이다.
Pyspark는 pandas 같은 문법으로도, SQL같은 문법으로도 활용이 가능하다해서 데이터 가공을 여러가지 코드로 작성해보며 Pyspark를 익혀보려고 한다.예제 데이터는 참고 문헌 첫번째에 있는 블로그와 같은 데이터를 사용했다! 그럼 천천히 따라해보도록 하자.
👇 예제 데이터 👇
https://www.kaggle.com/datasets/mansoordaku/ckdisease
0.라이브러리 불러오기
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
1.스파크 세션 연결 및 데이터 불러오기
spark = SparkSession.builder.master("local[1]").appName("kidney_disease").getOrCreate()
data = spark.read.option("header", True).csv("kidney_disease.csv")
2.불러온 데이터 확인
데이터 스키마 확인 | printSchema()
#데이터 스키마 확인
data.printSchema()
데이터 20개 확인 | show()
#데이터 확인하기
data.show()
#truncate=False를 사용하면, 테이블 내용이 잘리지 않도록 보여줌
data.show(truncate = False)
테이블에서 행을 가져오는 함수 | collect()
#행 가져오기
data.collect()
데이터 서머리 | describe()
#행 가져오기
data.subcribe()
3.SELECT
원하는 컬럼만 추출 | select("column")
#원하는 컬럼(id,age,bp,sg)만 출력
data.select("id","age","bp","sg").show()
#원하는 컬럼만 SELECT 해서 새 DataFrame에 담기
data_select=data.select("id","age","bp","sg")
data_select.show()
원하는 컬럼만 제외하여 추출 | select("column")
#원하는 컬럼(id,age,bp,sg)만 제외해서 출력
data.drop("id","age","bp","sg").show()
4.WHERE = FILTER
원하는 조건의 데이터만 추출 | filter(조건) or where (조건)
#id=2인 데이터만 출력
data.filter(data.id == 2).show()
data.where(data.id == 2).show()
data.filter("id = 2").show()
data.where("id = 2").show()
여러가지 조건 필터링
data.filter((data.bp==60) & (data.ba=="present")).show()
data.where((data.bp==60) & (data.ba=="present")).show()
data.filter("bp==60 AND ba=='present'").show()
data.where("bp==60 AND ba=='present'").show()
IN 조건 필터링 | isin()
#in
data.filter(data.al.isin(1.0, 2.0)).show()
data.where(data.al.isin(1.0, 2.0)).show()
#not in
data.filter(~data.al.isin(1.0, 2.0)).show()
data.where(~data.al.isin(1.0, 2.0)).show()
LIKE 조건 필터링 | startswitch, endswitch, contains, like()
#not으로 시작하는 데이터
data.filter(data.ba.startswith("not")).show()
data.filter(data.ba.like("not%")).show()
data.where(data.ba.startswith("not")).show()
data.where(data.ba.like("not%")).show()
#sent로 끝나는 데이터
data.filter(data.ba.endswith("sent")).show()
data.filter(data.ba.like("%sent")).show()
data.where(data.ba.endswith("sent")).show()
data.where(data.ba.like("%sent")).show()
#not이 포함되어 있는 데이터
data.filter(data.ba.contains("not")).show()
data.filter(data.ba.like("%not%")).show()
data.where(data.ba.contains("not")).show()
data.where(data.ba.like("%not%")).show()
5.GROUP BY
단일 컬럼에 대한 집계 함수 사용 | groupBy()
- 집계 함수를 사용하려하니, 데이터형식이 다 string이라 사용이 안된다. numeric인 것들은 numeric으로 바꿔주자.
from pyspark.sql.functions import col
# 변환할 컬럼 리스트
columns_to_convert = ["id", "age", "bp", "sg", "al", "su", "bgr", "bu", "sc", "sod", "pot", "hemo", "pcv", "wc", "rc"]
# 모든 컬럼을 numeric으로 변환
for col_name in columns_to_convert:
data = data.withColumn(col_name, col(col_name).cast("float"))
#al의 value_counts()
data.groupBy("al").count().show()
#al을 기준으로 sc를 연산
data.groupBy("al").sum("sc").show()
data.groupBy("al").min("sc").show()
data.groupBy("al").max("sc").show()
data.groupBy("al").avg("sc").show()
여러 컬럼에 대한 집계 함수 사용 | groupBy()
data.groupBy("al","htn").avg("bu","hemo").show()
6.ORDER BY
from pyspark.sql.functions import asc, desc
data.orderBy(asc("al"), desc("htn")).show()
from pyspark.sql.functions import col
data.orderBy(col("al").asc(), col("htn").desc()).show()
7.JOIN
조인을 위한 데이터 쪼개기
data_a=data.select("id","age")
data_a=data_a.filter(data_a.id.isin(['1','2','3','5','7','9','10']))
data_b=data.select("id","bp","sg","al")
data_b=data_b.filter(data_a.id.isin(['2','3','5','6','8','10','11']))
data_a.show()
data_b.show()
inner join
#pandas merge 느낌
inner_data1=data_a.join(data_b, on='id',how='inner')
inner_data1.show()
inner_data2=data_a.join(data_b, 'id')
inner_data2.show()
#좀 더 SQL 스러운 느낌
inner_data3=data_a.join(data_b, data_a.id == data_b.id)
left join
#pandas merge 느낌
left_data1=data_a.join(data_b,'id',how='left')
left_data1.show()
left_data2=data_a.join(data_b,'id','left')
left_data2.show()
#SQL 느낌
left_data3=data_a.join(data_b, data_a.id == data_b.id,how='left')
left_data3.show()
right join
#pandas merge 느낌
right_data1=data_a.join(data_b,'id',how='right')
right_data1.show()
left_data2=data_a.join(data_b,'id','right')
left_data2.show()
#SQL 느낌
right_data3=data_a.join(data_b, data_a.id == data_b.id,how='right')
right_data3.show()
full outer join
full_data1=data_a.join(data_b, data_a.id == data_b.id, 'outer')
full_data2=data_a.join(data_b, data_a.id == data_b.id, 'full')
full_data3=data_a.join(data_b, data_a.id == data_b.id, 'fullouter')
cross join
cross_data=data_a.crossJoin(data_b)
cross_data.show()
판다스랑 SQL이랑 반반 섞어 놓은 것 같은 문법이라 어렵지는 않은데 엄청나게 헷갈릴 것 같은 느낌이 든다.
이렇게 데이터 프레임을 다룰 수도 있고, spark sql을 사용해서 sql처럼 데이터 프레임을 다뤄도되니 맘에 들거나 익숙한 코드를 골라 숙지하도록 하면 조금 더 편할 것 같다. 꿀팁이라면 코드 모음 페이지를 따로 만들어서 그 때 그 때 복붙해서 사용하는 것도 추천~
8.그 외 다양한 함수들
공식 Docs
[참고 자료]
https://3months.tistory.com/567
https://assaeunji.github.io/python/2022-03-26-pyspark/
https://eyeballs.tistory.com/442
https://brunch.co.kr/@yysttong/60
https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html
'스파크 Spark' 카테고리의 다른 글
[Spark] Pyspark 데이터 전처리 단골 코드 (Pyspark Method ver.) (0) | 2024.04.26 |
---|---|
[Spark] 원격 주피터 허브에서 로컬 Spark 세션 및 hdfs 불러오기 (0) | 2024.04.17 |
[Spark] 우분투(Ubuntu)에서 하둡(Hadoop)위에 Spark(스파크)를 올리고, jupyter notebook으로 열기 (0) | 2024.03.29 |
[Spark] Pyspark spark sql 사용하기 (0) | 2024.02.20 |
[Spark] Apache Spark란? (0) | 2024.02.19 |
댓글