스파크 Spark

[Spark] Pyspark 데이터 전처리 단골 코드 (Pyspark Method ver.)

sseozytank 2024. 4. 26.

 

👆 목차를 활용하면 편하게 읽을 수 있다  (오른쪽에 목차가 같이 따라다녀요!)

pandas와 비슷한 pyspark...! 하지만 묘하게 헷갈린다. 자주 쓰는 코드들을 정리해보자. 내가 만드는 코드집 우선 전처리편 👩‍🎨 

1.컬럼명 변경 

샘플 데이터 

data = [
    (5.1, 3.5, 1.4, 0.2, "Iris-setosa"),
    (4.9, 3.0, 1.4, 0.2, "Iris-setosa"),
    (4.7, 3.2, 1.3, 0.2, "Iris-setosa"),
    (4.6, 3.1, 1.5, 0.2, "Iris-setosa"),
    (5.0, 3.6, 1.4, 0.2, "Iris-setosa")
]

# DataFrame 생성
df = spark.createDataFrame(data, ["_c0", "_c1", "_c2", "_c3", "_c4"])

 

withColumn Renamed 사용 

.withColumnRenamed를 계속 추가해주면, 원하는 컬럼만 한번에 컬럼명 변경이 가능하다.

#df.withColumnRenamed("기존컬럼명", "바꿀컬럼명")
df=df.withColumnRenamed("_c0", "sepal_length").withColumnRenamed("_c1", "sepal_width")

 

 

컬럼명에 패턴이 있을 경우에는 for문 사용 

지금처럼 _c1 , _c2 같이 기존 컬럼명에 패턴이 있는 경우 for문으로 코딩해줄 수 있다. start=n 옵션을 줘서 n번째 행부터 시작하게 하거나, 바꿀 컬럼의 리스트를 처음 몇개만 설정해줄 수도 있다. 

# 컬럼명 설정
columns = ["sepal_length", "sepal_width", "petal_length", "petal_width","class"]
for i, col_name in enumerate(columns):
    df = df.withColumnRenamed("_c{}".format(i), col_name)

 

2. null 처리 

샘플 데이터 

data = [
    (5.1, 3.5, 1.4, None, "Iris-setosa"),
    (4.9, None, 1.4, 0.2, "Iris-setosa"),
    (4.7, 3.2, None, 0.2, "Iris-setosa"),
    (None, None, 1.5, 0.2, "Iris-setosa"),
    (5.0, 3.6, 1.4, 0.2, None)
]

# DataFrame 생성
df = spark.createDataFrame(data, ["sepal_length", "sepal_width", "petal_length", "petal_width", "class"])
from pyspark.sql.functions import col

(1) null 찾기 

전체 데이터에서 null값이 있는지 확인

null값이 있으면, 어느 컬럼에서 몇 개인지 까지 확인 가능 

from pyspark.sql.functions import count, when, isnull
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

 

 

특정 컬럼에서 null값이 있는지 확인

#df.filter(col("검사할 컬럼명").isNull()).count()
df.filter(col("sepal_length").isNull()).count()

 

(2) null 삭제 

전체 데이터에서 null이 있는 행 삭제 

모든 행에서 하나 이상의 null 값을 포함하는 행을 삭제 

df=df.na.drop()

df.na.drop().show()

 

 

특정 컬럼에 대해서만 null값 삭제 

해당 컬럼에 null값을 포함하는 행만 삭제 

df=df.na.drop(subset=["column_name"])

df.na.drop(subset["sepal_length"]).show()

 

(3) null 대체

전체 데이터에서 모든 null값을 대체 

df_filled = df.fillna(0)

df_filled.show()

 

특정 컬럼의 null값을 원하는값으로 대체

#df.fillna({"컬럼명1": 바꿀값, "컬러명2": 바꿀값})
df_filled = df.fillna({"sepal_length": 100, "sepal_width": 200})

df_filled.show()

3. 이상치 처리 

샘플 데이터 

data = [(1, 10), (2, 15), (9, 50), (10, 55),(11,4545)]
df = spark.createDataFrame(data, ["id", "value"])

사분위수 활용 이상치 제거  

from pyspark.sql.functions import col

#df.approxQuantile("컬럼명",[확률값1,확률값2].상대오차(낮을 수록 근사치))
quantiles = df.approxQuantile("value", [0.25, 0.75], 0.0)
Q1 = quantiles[0]
Q3 = quantiles[1]
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

filtered_df = df.filter((col("value") >= lower_bound) & (col("value") <= upper_bound))

 

 

 

4. 데이터 타입 변환 

샘플 데이터 

data = [(1, 10), (2, 15), (9, 50), (10, 55)]
df = spark.createDataFrame(data, ["id", "value"])

 

(1) 데이터 타입 확인

전체 컬럼에 대해서 데이터 타입 확인

df.printSchema()

 

특정 컬럼에 대해서만 데이터 타입 확인

#df.dtypes[df.columns.index("컬럼명")]
df.dtypes[df.columns.index("value")]

 

(2) 데이터 타입 변환

데이터 샘플

#df.dtypes[df.columns.index("컬럼명")]
df.dtypes[df.columns.index("value")]

 

전체 컬럼에 대해서 데이터 타입 일괄 변환

from pyspark.sql.functions import col

# 모든 열을 문자열로 변환
for col_name in df.columns:
    df = df.withColumn(col_name, col(col_name).cast("string"))
    
#string,integer or int, float or double, boolean, timestamp

 

특정 컬럼에 대한 데이터 타입 변경

from pyspark.sql.types import *

#df = df.withColumn("타입 변환 후 사용할 컬럼명",df["타입 변환할 컬럼명"].cast(바꿀타입())
df = df.withColumn("id", df["id"].cast(StringType()))

#StringType: 문자열 타입
#IntegerType: 정수 타입
#FloatType: 부동 소수점 타입
#DoubleType: 배정밀도 부동 소수점 타입
#BooleanType: 불리언 타입
#DateType: 날짜 타입
#TimestampType: 타임스탬프 타입
#ArrayType: 배열 타입
#MapType: 맵 타입
#StructType: 구조체 타입
#DecimalType: 소수 타입

 

 

5. 날짜 데이터 전처리 

데이터 샘플

data = [("2024-01-01",),
        ("2024-01-02",),
        ("2024-01-03",),
        ("2024-01-04",),
        ("2024-01-05",)]

schema = ["date"]

df = spark.createDataFrame(data, schema)

날짜 연/월/일 분리 

from pyspark.sql.functions import year, month, dayofmonth

# 연도, 월, 일 컬럼 추가
df = df.withColumn("year", year("date"))
df = df.withColumn("month", month("date"))
df = df.withColumn("day", dayofmonth("date"))

 

 

댓글