從python編譯到運行pyspark樣例

      網友投稿 1209 2025-04-02

      MRS集群默認會帶上Python2.7.5和Python3.8.0兩個版本的Python。默認使用的是Python2.7.5。但是有時候我們希望使用的是我們指定的Python版本來運行pyspark任務,因此需要自行上傳對應的Python版本包。由于Python較依賴環境,不同環境編譯出來的Python版本可能并不通用。目前遇到過因為libffi和libffi-devel版本不一致導致pyspark運行的時候報錯。因此我們在上傳Python版本壓縮包之前最好在集群系統相同的Linux機器上編譯對應的Python。

      本文主要介紹從購買華為云ECS到運行pyspark的wordcount樣例的過程。

      1????? 準備環境

      測試環境是一套MRS_3.1.0普通集群;

      希望使用的Python版本是Python3.6.6;

      1.1????? 購買華為云ECS

      MRS使用的ECS系統版本一般都是EulerOS2.2或者EulerOS2.5,因此我們購買ECS的時候可以選擇2.2或者2.5的EulerOS系統。

      如果這個ECS只用于編譯Python,需要的資源并不多,可以選擇最低規格的ECS。本次測試使用的規格為:通用計算型-2U4G EulerOS2.2

      另外最好給ECS配置上EIP,方便訪問公網下載Python源碼。

      購買ECS具體操作參考:https://support.huaweicloud.com/qs-ecs/zh-cn_topic_0030831985.html

      1.2????? 編譯Python

      使用以下命令安裝三方依賴軟件:

      yum install zlib zlib-devel zip -y

      下載對應Python版本源碼:

      wget https://www.python.org/ftp/python/3.6.6/Python-3.6.6.tgz

      解壓Python源碼:

      tar -zxvf Python-3.6.6.tgz

      創建安裝目錄:

      mkdir /opt/python36

      編譯Python:

      cd Python-3.6.6 ./configure --prefix=/opt/python36

      出現以下內容表明上述命令執行成功

      make -j8

      出現以下內容表明上述命令執行成功

      make install

      出現以下內容表明上述命令執行成功

      至此,Python已經安裝完成。

      1.3????? 安裝任務依賴模塊

      系統默認已經帶上2.7版本Python,需要修改環境變量:

      export PYTHON_HOME=/opt/python36 export PATH=$PYTHON_HOME/bin:$PATH

      安裝三方模塊:

      pip3 install helloword

      本地測試是否安裝成功:

      使用python3進入python交互界面,執行以下代碼:

      import helloworld helloworld.say_hello("test")

      如果出現以下內容則說明安裝成功:

      1.4????? 打包Python.zip

      cd /opt/python36/ zip -r python36.zip ./*

      將壓縮包發送到需要使用的MRS客戶端節點上,我們以客戶端節點的/opt目錄為存放位置。

      2????? 測試運行

      解壓文件,配置環境變量:

      cd /opt unzip python36.zip -d python36 export PYSPARK_PYTHON=/opt/python36/bin/python3

      上傳壓縮包到HDFS上

      hdfs dfs -mkdir /user/python hdfs dfs -put python36.zip /user/python

      2.1????? 本地運行pyspark

      使用pyspark啟動local模式的交互界面,執行以下代碼測試三方包是否生效:

      >>> import helloworld Hello, Sara! >>> helloworld.say_hello("test") 'Hello, test!'

      測試執行sql是否正常:

      spark.sql("show tables").show() spark.sql("select count(*) from test1").show()

      2.2????? pyspark on yarn client模式

      cd /opt pyspark --master yarn --deploy-mode client \ --conf spark.pyspark.python=./python36.zip/bin/python3 \ --conf spark.pyspark.driver.python=/opt/python36/bin/python3 \ --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip

      因為client模式下driver是在客戶端側運行,因此需要對driver的python環境單獨指定:

      spark.pyspark.driver.python=/opt/python36/bin/python3

      同樣使用上一步的代碼測試功能是否正常

      增加測試executor是否拿到三方模塊檢查(/tmp/log1.txt是一個存放在hdfs上面的文本文件,內容不限定):

      from pyspark import SparkContext sc = SparkContext.getOrCreate() inputPath = "/tmp/log1.txt" lines = sc.textFile(name = inputPath) words = lines.flatMap(lambda line:line.split(" "),True) pairWords = words.map(lambda word:(helloworld.say_hello(word),1),True) result = pairWords.reduceByKey(lambda v1,v2:v1+v2) result.foreach(lambda t :print(t))

      可以看到executor日志里面打印了相關信息:

      將最終結果打印到控制臺:

      output=result.collect() print(output) for (word, count) in output: print(word,count)

      2.3????? spark-submit on yarn client模式

      將上面的測試命令寫到test.py腳本里面:

      # -*- coding: utf-8 -* import helloworld from pyspark import SparkConf, SparkContext if __name__ == "__main__": helloworld.say_hello("test") #創建SparkConf conf = SparkConf().setAppName("wordcount") #創建SparkContext 注意參數要傳遞conf=conf sc = SparkContext(conf=conf) inputPath = "/tmp/log1.txt" lines = sc.textFile(name = inputPath) #每一行數據按照空格拆分 得到一個個單詞 words = lines.flatMap(lambda line:line.split(" "),True) #將每個單詞 組裝成一個tuple 計數1 pairWords = words.map(lambda word:(helloworld.say_hello(word),1),True) #reduceByKey進行匯總 result = pairWords.reduceByKey(lambda v1,v2:v1+v2) #executor上打印結果 result.foreach(lambda t :print(t)) #搜集所有結果 output = result.collect() #打印匯總結果 ptint(output) #分開打印結果 for (word, count) in output: print(word,count) #退出任務 sc.stop()

      啟動命令:

      spark-submit --master yarn --deploy-mode client \ --conf spark.pyspark.python=./Python/bin/python3 \ --conf spark.pyspark.driver.python=/opt/python36/bin/python3 \ --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip#Python \ test.py

      查看executor日志和控制臺打印內容,確認結果有被打印。

      2.4????? spark-submit on yarn cluster模式

      依舊使用上面的test.py腳本運行任務。

      啟動命令:

      spark-submit --master yarn --deploy-mode cluster \ --conf spark.pyspark.python=./Python/bin/python3 \ --conf spark.yarn.dist.archives=hdfs://hacluster/user/python/python36.zip#Python \ test.py

      查看executor日志和driver日志打印內容,確認結果有被打印。

      3????? 結論

      從python編譯到運行pyspark樣例

      至此所有操作步驟都執行完成。關鍵操作就是編譯Python和spark on yarn的client與cluster模式下driver的Python環境配置。

      參考文檔:

      https://bbs.huaweicloud.com/blogs/168935

      MapReduce服務 spark

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:模具企業業務生產管理系統(模具加工系統)
      下一篇:尾注刪不掉的回車符怎么辦(腳注刪不掉的回車符)
      相關文章
      亚洲性无码一区二区三区| 亚洲一区二区三区高清视频| 亚洲福利在线播放| 亚洲国产成人久久综合碰碰动漫3d| 亚洲中文无码av永久| 亚洲男人的天堂在线播放| 久久精品国产96精品亚洲 | 亚洲精品夜夜夜妓女网| 亚洲s码欧洲m码吹潮| 亚洲黄色免费电影| 亚洲熟妇无码AV在线播放| 亚洲综合精品伊人久久| 2020天堂在线亚洲精品专区| 精品亚洲麻豆1区2区3区| 亚洲综合一区二区国产精品| 亚洲婷婷五月综合狠狠爱| 亚洲精品白浆高清久久久久久| 在线观看亚洲av每日更新| 亚洲第一永久AV网站久久精品男人的天堂AV | 亚洲kkk4444在线观看| 亚洲最大福利视频| 亚洲欧美日本韩国| 亚洲一区二区免费视频| 久久久久久亚洲精品影院| 亚洲乱亚洲乱妇24p| 亚洲av无码专区国产不乱码| 国产偷国产偷亚洲清高APP| 亚洲国产系列一区二区三区| 亚洲一区二区三区成人网站| 亚洲人成自拍网站在线观看| 国产精品亚洲专区无码不卡| 亚洲日韩精品国产3区| 性色av极品无码专区亚洲| 亚洲精品成人久久久| 一本色道久久88综合亚洲精品高清 | 中文字幕精品无码亚洲字| 亚洲男人的天堂在线va拉文| 国产亚洲av人片在线观看| 亚洲一区精品无码| 亚洲图片一区二区| 亚洲香蕉久久一区二区三区四区|