스파크 Spark

[Spark] 원격 주피터 허브에서 로컬 Spark 세션 및 hdfs 불러오기

sseozytank 2024. 4. 17.

원격 주피터 허브에서, 로컬에서 띄운 스파크 세션을 불러오려면 이렇게 하면 된다. (단, 스파크 및 하둡이 모두 설치 및 환경변수 설정까지 완료되어 있다는 전제 하에) 

 

흑흑 글에는 에러 발생했을 때 이렇게 하세요 라고 쉽게 적혀져 있지만, 이 에러 해결법들을 찾기 위해 챗지피티와 맞짱도 뜨고 스택 오버플로우와 죽마고우가 되며... 팀장님을 괴롭히고.... 아무튼 그런 슬픈 과정들이 계속 있었다. 

후...

1.Spark만 먼저 연결해보기 

우선 spark 와 hadoop을 모두 실행 시켜 준다. (아래 경로는 본인에 맞게 수정)

~/hadoop/sbin/start-all.sh;  ~/spark-3.5.1-bin-hadoop3/sbin/start-all.sh;

 

jps으로 다 잘 켜졌는지 확인! 

jps

 

다 잘 실행되었다. 그럼이제 원격 주피터 서버로 가서 스파크 세션을 연결해보도록 하자. 주피터에서 아래 코드를 통해 실행시켜주면 된다. 

from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession.builder \
.appName("RemoteSparkSession") \
.master("spark://master:7077") \ #spark://xxx.xxx.x.xx:7077
.getOrCreate()

# SparkContext 가져오기
sc = spark.sparkContext

# 연결 여부 확인을 위한 메시지 출력
if sc is not None:
print("Spark 클러스터와의 연결이 확인되었습니다.")
else:
print("Spark 클러스터와의 연결을 확인할 수 없습니다.")

원격 주피터 서버에서 연결을 확인할 수 있다.

 

 

🙋‍♂️ 마스터는 어떻게 확인해요? 

jps 입력 시, 마스터 옆에 pid는 2243 이다. 

그럼 아래 명령어를 쳐주면 된다 .

netstat -tulnp | grep 2243

 

그럼 이렇게 나오는데, Spark는 7077 포트를 사용하니 저 파란 박스에 있는 부분을 Master 부분에 적어주면 된다! 

세션 설정이 완료 되었고, 연결이 확인 되었으면 아래 코드로 테스트해보도록 하자. 

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# 예제 데이터를 정의합니다.
data = [("John", 30), ("Alice", 35), ("Bob", 25)]

# 데이터 스키마를 정의합니다.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# 데이터 프레임을 생성합니다.
df = spark.createDataFrame(data, schema)

# 데이터 프레임을 출력합니다.
df.show()

 

 

무탈 없이 결과가 출력되는 것을 확인할 수 있음. 

 

 

⛔ 에러1.WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

 

리소스가 충분하지 않다는 오류가 뜨는데 , 나의 경우 리소스 문제가 아니었다. 그럼 리소스 문제인지 아닌지 확인하려면 ? Web ui에 stderr의 실제 로그를 확인해주면 된다. (참고로 웹 ui는 master:8080) 

 

 

들어가면, 상세 에러 로그를 볼 수 있다. 

 

에러가 만약 진짜 리소스 관련이라면 ? 

worker를 할당해주지 않아서거나, 내 서버가 감당할 수 없는 worker일수도 있음. 그런 고로 전자의 경우 아래, 

 

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient reso

I'm trying to run the spark examples from Eclipse and getting this generic error: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have

stackoverflow.com

후자의 경우 해당 블로그를 참고해서 에러를 해결하자. 

 

[Spark Streaming] WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that worke

Spark Streaming 관련해서 에러가 발생하였습니다. 에러는 아래 포스팅과 관련 있습니다. https://mjs1995.tistory.com/275 [Spark Streaming] py4j.protocol.Py4JJavaError: An error occurred while calling o121.start.: java.lang.IllegalSt

mjs1995.tistory.com

 

나의 경우 상세 로그는 SecurityManager: authentication disabled; ui acls disabled; users with view permissions: tank, jupyter- 주피터이름 ; groups with view permissions: EMPTY; users with modify permissions: tank, jupyter-주피터이름; groups with modify permissions: EMPTY Caused by: java.net.UnknownHostException: tank 

 

즉, 원격 주피터와 나의 리눅스간 통신을 하고 싶은데.. tank라는 이름을 도통 찾을 수가 없다는 것이다. 그럼 내 주소가 탱크라는 것을 알려주면 된다. 어떻게!? 바로 이렇게. sudo vi /etc/hosts [ip] [유저명]

 

[ip][띄어쓰기][이름]

도메인에 대한 지식이 있으면 좀 이해하기 쉬운데.. 정말 쉽게 말하자면, 주피터와 나는 전화통화를 해야하는데, 내 이름이 tank인건 알지만 내 전화번호를 모르는거임. 그래서 내 번호는 이거야! 라고 전화번호부에 저장해준다 한다고 생각하면 좀 편하다. 

 

이야호! 드디어 에러를 고쳤다!!!! 근데 또 다른 에러가 발생했다. 

 

⛔ 에러2.Caused by: java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID = 823754013007382808, local class serialVersionUID = 351머시기 

 

다행히 위에거에 비해 이건 훨씬 간단하게 해결이 된다. 바로 pyspark와 linux spark 버전이 일치하지 않아서 생기는 문제임. 그런고로 jupyter에서 pyspark 버전을 업그레이드 해주면 된다. 

pip install --upgrade pyspark

 

 

2.HDFS 불러오기 

진짜 스파크 연결이 끝났고! HDFS도 같이 불러와보자. 그 전에 hadoop의 서버에서 설정 하나를 해줘야하는데, 

cd ~/hadoop/etc/hadoop

vi core-site.xml

 

 

core-site.xml에서 아이피 부분을 0.0.0.0으로 바꾸어 다른 서버에서도 접속할 수 있도록 수신 대기를 해준다. (이미 0.0.0.0이면 그냥 냅둬도 된다. 아니면 내가 특정 ip만 수신하고 싶으면 그렇게 설정해줘도 무방) 

 

이후 주피터에서 위에 코드에서 한줄만 추가해주면 된다. 

from pyspark.sql import SparkSession
 
# SparkSession 생성
spark = SparkSession.builder \
    .appName("RemoteSparkSession") \
    .master("spark://master:7077") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://ip:9000") \
    .getOrCreate() \
 
 
# SparkContext 가져오기
sc = spark.sparkContext
 
# 연결 여부 확인을 위한 메시지 출력
if sc is not None:
    print("Spark 클러스터와의 연결이 확인되었습니다.")
else:
    print("Spark 클러스터와의 연결을 확인할 수 없습니다.")

 

🙋‍♂️ ip 부분엔 뭘 넣어야해용 ? 

hdfs 가 있는 서버의 ip를 넣어주면 된다. 

ifconfig

 

 

잘 설정되었다면, hdfs에 파일을 불러와보는 예제로 마무리 ! 빠이빠이! 

# HDFS의 CSV 파일 경로

# CSV 파일 읽기
df = spark.read.format("csv") \
    .option("header", "true") \
    .load("/test/smoking.csv")

# 데이터 프레임 보기
df.show()

 

hdfs의 데이터 불러온 것을 확인

 

 

댓글