HTTP 狀態消息
1209
2025-04-02
MRS集群默認會帶上Python2.7.5和Python3.8.0兩個版本的Python。默認使用的是Python2.7.5。但是有時候我們希望使用的是我們指定的Python版本來運行pyspark任務,因此需要自行上傳對應的Python版本包。由于Python較依賴環境,不同環境編譯出來的Python版本可能并不通用。目前遇到過因為libffi和libffi-devel版本不一致導致pyspark運行的時候報錯。因此我們在上傳Python版本壓縮包之前最好在集群系統相同的Linux機器上編譯對應的Python。
本文主要介紹從購買華為云ECS到運行pyspark的wordcount樣例的過程。
1????? 準備環境
測試環境是一套MRS_3.1.0普通集群;
希望使用的Python版本是Python3.6.6;
1.1????? 購買華為云ECS
MRS使用的ECS系統版本一般都是EulerOS2.2或者EulerOS2.5,因此我們購買ECS的時候可以選擇2.2或者2.5的EulerOS系統。
如果這個ECS只用于編譯Python,需要的資源并不多,可以選擇最低規格的ECS。本次測試使用的規格為:通用計算型-2U4G EulerOS2.2
另外最好給ECS配置上EIP,方便訪問公網下載Python源碼。
購買ECS具體操作參考:https://support.huaweicloud.com/qs-ecs/zh-cn_topic_0030831985.html
1.2????? 編譯Python
使用以下命令安裝三方依賴軟件:
yum install zlib zlib-devel zip -y
下載對應Python版本源碼:
wget https://www.python.org/ftp/python/3.6.6/Python-3.6.6.tgz
解壓Python源碼:
tar -zxvf Python-3.6.6.tgz
創建安裝目錄:
mkdir /opt/python36
編譯Python:
cd Python-3.6.6 ./configure --prefix=/opt/python36
出現以下內容表明上述命令執行成功
make -j8
出現以下內容表明上述命令執行成功
make install
出現以下內容表明上述命令執行成功
至此,Python已經安裝完成。
1.3????? 安裝任務依賴模塊
系統默認已經帶上2.7版本Python,需要修改環境變量:
export PYTHON_HOME=/opt/python36 export PATH=$PYTHON_HOME/bin:$PATH
安裝三方模塊:
pip3 install helloword
本地測試是否安裝成功:
使用python3進入python交互界面,執行以下代碼:
import helloworld helloworld.say_hello("test")
如果出現以下內容則說明安裝成功:
1.4????? 打包Python.zip
cd /opt/python36/ zip -r python36.zip ./*
將壓縮包發送到需要使用的MRS客戶端節點上,我們以客戶端節點的/opt目錄為存放位置。
2????? 測試運行
解壓文件,配置環境變量:
cd /opt unzip python36.zip -d python36 export PYSPARK_PYTHON=/opt/python36/bin/python3
上傳壓縮包到HDFS上
hdfs dfs -mkdir /user/python hdfs dfs -put python36.zip /user/python
2.1????? 本地運行pyspark
使用pyspark啟動local模式的交互界面,執行以下代碼測試三方包是否生效:
>>> import helloworld Hello, Sara! >>> helloworld.say_hello("test") 'Hello, test!'
測試執行sql是否正常:
spark.sql("show tables").show() spark.sql("select count(*) from test1").show()
2.2????? pyspark on yarn client模式
cd /opt pyspark --master yarn --deploy-mode client \ --conf spark.pyspark.python=./python36.zip/bin/python3 \ --conf spark.pyspark.driver.python=/opt/python36/bin/python3 \ --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip
因為client模式下driver是在客戶端側運行,因此需要對driver的python環境單獨指定:
spark.pyspark.driver.python=/opt/python36/bin/python3
同樣使用上一步的代碼測試功能是否正常
增加測試executor是否拿到三方模塊檢查(/tmp/log1.txt是一個存放在hdfs上面的文本文件,內容不限定):
from pyspark import SparkContext sc = SparkContext.getOrCreate() inputPath = "/tmp/log1.txt" lines = sc.textFile(name = inputPath) words = lines.flatMap(lambda line:line.split(" "),True) pairWords = words.map(lambda word:(helloworld.say_hello(word),1),True) result = pairWords.reduceByKey(lambda v1,v2:v1+v2) result.foreach(lambda t :print(t))
可以看到executor日志里面打印了相關信息:
將最終結果打印到控制臺:
output=result.collect() print(output) for (word, count) in output: print(word,count)
2.3????? spark-submit on yarn client模式
將上面的測試命令寫到test.py腳本里面:
# -*- coding: utf-8 -* import helloworld from pyspark import SparkConf, SparkContext if __name__ == "__main__": helloworld.say_hello("test") #創建SparkConf conf = SparkConf().setAppName("wordcount") #創建SparkContext 注意參數要傳遞conf=conf sc = SparkContext(conf=conf) inputPath = "/tmp/log1.txt" lines = sc.textFile(name = inputPath) #每一行數據按照空格拆分 得到一個個單詞 words = lines.flatMap(lambda line:line.split(" "),True) #將每個單詞 組裝成一個tuple 計數1 pairWords = words.map(lambda word:(helloworld.say_hello(word),1),True) #reduceByKey進行匯總 result = pairWords.reduceByKey(lambda v1,v2:v1+v2) #executor上打印結果 result.foreach(lambda t :print(t)) #搜集所有結果 output = result.collect() #打印匯總結果 ptint(output) #分開打印結果 for (word, count) in output: print(word,count) #退出任務 sc.stop()
啟動命令:
spark-submit --master yarn --deploy-mode client \ --conf spark.pyspark.python=./Python/bin/python3 \ --conf spark.pyspark.driver.python=/opt/python36/bin/python3 \ --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip#Python \ test.py
查看executor日志和控制臺打印內容,確認結果有被打印。
2.4????? spark-submit on yarn cluster模式
依舊使用上面的test.py腳本運行任務。
啟動命令:
spark-submit --master yarn --deploy-mode cluster \ --conf spark.pyspark.python=./Python/bin/python3 \ --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip#Python \ test.py
查看executor日志和driver日志打印內容,確認結果有被打印。
3????? 結論
至此所有操作步驟都執行完成。關鍵操作就是編譯Python和spark on yarn的client與cluster模式下driver的Python環境配置。
參考文檔:
https://bbs.huaweicloud.com/blogs/168935
MapReduce服務 spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。