【Spark】(task1)PySpark基礎數據處理
學習總結
文章目錄
學習總結
一、Spark介紹
1.1 Scala和PySpark
1.2 Spark原理
1.3 一個具體栗子
二、安裝方式
三、測試是否安裝成功
四、Spark程序的模塊分類
五、數據處理任務
5.1 使用Python鏈接Spark環境
5.2 創建dateframe數據
5.3 用spark執行以下邏輯:找到數據行數、列數
5.4 用spark篩選class為1的樣本
5.5 用spark篩選language >90 或 math> 90的樣本
任務匯總:
一、Spark介紹
hadoop生態圈:
1.1 Scala和PySpark
(1)Scala 是一門多范式(multi-paradigm)的編程語言,設計初衷是要集成面向對象編程和函數式編程的各種特性。
Scala 運行在 Java 虛擬機上,并兼容現有的 Java 程序。
Scala 源代碼被編譯成 Java 字節碼,所以它可以運行于 JVM 之上,并可以調用現有的 Java 類庫。
(2)Apache Spark是用 Scala編程語言 編寫的。為了用Spark支持Python,Apache Spark社區發布了一個工具PySpark。使用PySpark,也可以使用Python編程語言中的 RDD 。
(3)PySpark提供了 PySpark Shell,它將Python API鏈接到spark核心并初始化Spark上下文。將Python與Spark集成就對數據科學研究更加方便。
Spark的開發語言是Scala,這是Scala在并行和并發計算方面優勢的體現,這是微觀層面函數式編程思想的一次勝利。此外,Spark在很多宏觀設計層面都借鑒了函數式編程思想,如接口、惰性求值和容錯等。
1.2 Spark原理
Spark是業界主流的大數據處理利器。
分布式:指的是計算節點之間不共享內存,需要通過網絡通信的方式交換數據。
Spark 是一個分布式計算平臺。Spark 最典型的應用方式就是建立在大量廉價的計算節點上,
這些節點可以是廉價主機,也可以是虛擬的 Docker Container(Docker 容器)
。
Spark 的架構圖中:
Spark 程序由 Manager Node(管理節點)進行調度組織
由 Worker Node(工作節點)進行具體的計算任務執行
最終將結果返回給 Drive Program(驅動程序)。
在物理的 Worker Node 上,數據還會分為不同的 partition(數據分片),可以說 partition 是 Spark 的基礎數據單元。
Spark 計算集群能夠比傳統的單機高性能服務器具備更強大的計算能力,就是由這些成百上千,甚至達到萬以上規模的工作節點并行工作帶來的。
1.3 一個具體栗子
那在執行一個具體任務的時候,Spark 是怎么協同這么多的工作節點,通過并行計算得出最終的結果呢?這里我們用一個任務來解釋一下 Spark 的工作過程。
一個具體任務過程:
(1)先從本地硬盤讀取文件 textFile;
(2)再從分布式文件系統 HDFS 讀取文件 hadoopFile;
(3)然后分別對它們進行處理;
(4)再把兩個文件按照 ID 都 join 起來得到最終的結果。
在 Spark 平臺上處理這個任務的時候,會將這個任務拆解成一個子任務 DAG(Directed Acyclic Graph,有向無環圖),再根據 DAG 決定程序各步驟執行的方法。從圖 2 中可以看到,這個 Spark 程序分別從 textFile 和 hadoopFile 讀取文件,再經過一系列 map、filter 等操作后進行 join,最終得到了處理結果。
最關鍵的過程是要理解
哪些是可以純并行處理的部分,哪些是必須 shuffle(混洗)和 reduce 的部分
:這里的 shuffle 指的是所有 partition 的數據必須進行洗牌后才能得到下一步的數據,最典型的操作就是圖 2 中的 groupByKey 操作和 join 操作。以 join 操作為例,必須對 textFile 數據和 hadoopFile 數據做全量的匹配才可以得到 join 后的 dataframe(Spark 保存數據的結構)。而 groupByKey 操作則需要對數據中所有相同的 key 進行合并,也需要全局的 shuffle 才能完成。
與之相比,map、filter 等操作僅需要逐條地進行數據處理和轉換,不需要進行數據間的操作,因此各 partition 之間可以完全并行處理。
在得到最終的計算結果之前,程序需要進行 reduce 的操作,從各 partition 上匯總統計結果,隨著 partition 的數量逐漸減小,reduce 操作的并行程度逐漸降低,直到將最終的計算結果匯總到 master 節點(主節點)上??梢哉f,shuffle 和 reduce 操作的觸發決定了純并行處理階段的邊界。
注意:
(1)
shuffle 操作需要在不同計算節點之間進行數據交換,非常消耗計算、通信及存儲資源
,因此 shuffle 操作是 spark 程序應該盡量避免的。shuffle可以理解為一個串行操作,需要等到在此之前的并行工作完成之后才可以順序開始。
(2)簡述Spark 的計算過程:Stage 內部數據高效并行計算,Stage 邊界處進行消耗資源的 shuffle 操作或者最終的 reduce 操作。
二、安裝方式
Windows 10:不適合開發程序,因為不支持命令行工具、隱藏坑較多、解決方案的資料較少
Windows Subsystem Linux (WSL):需要安裝較多軟件和配置較多環境變量,非常麻煩
ubuntu / CentOS:未嘗試,但與WSL比較相似
docker:簡單、高效、可遷移
docker方式(在ubuntu環境):
安裝docker:curl -fsSL https://get.docker.com | bash -s docker --mirror Aliyun;
拉取鏡像:docker pull jupyter/pyspark-notebook
創建容器:
docker run \ -d \ -p 8022:22 \ -p 4040:4040 \ -v /home/fyb:/data \ -e GRANT_SUDO=yes \ --name myspark \ jupyter/pyspark-notebook
1
2
3
4
5
6
7
8
配置docker容器的SSH登錄
安裝openssh-server等常用軟件:apt update && apt install openssh-server htop tmux
設置允許root通過ssh登錄:echo "PermitRootLogin yes" >> /etc/ssh/sshd_config
重啟ssh服務:service ssh --full-restart,設置root用戶密碼:passwd root
測試docker容器內的ssh是否設置成功:ssh root@127.0.0.1 -p 8022
容器內的配置python環境:
以root用戶登錄SSH會話后,安裝python依賴工具:apt install pip
安裝PySpark依賴包:pip3 install pyspark numpy pandas tqdm
測試是否正確安裝并執行了全部修改:python3 /usr/local/spark/examples/src/main/python/pi.py
三、測試是否安裝成功
四、Spark程序的模塊分類
五、數據處理任務
5.1 使用Python鏈接Spark環境
import pandas as pd from pyspark.sql import SparkSession # 創建spark應用 mypyspark spark = SparkSession.builder.appName('mypyspark').getOrCreate()
1
2
3
4
5
5.2 創建dateframe數據
這里和pandas等工具類似,創建表時注意這里的表頭組成的list列表,放在數據的后面。
test = spark.createDataFrame([('001','1',100,87,67,83,98), ('002','2',87,81,90,83,83), ('003','3',86,91,83,89,63), ('004','2',65,87,94,73,88), ('005','1',76,62,89,81,98), ('006','3',84,82,85,73,99), ('007','3',56,76,63,72,87), ('008','1',55,62,46,78,71), ('009','2',63,72,87,98,64)], ['number','class','language','math','english','physic','chemical']) test.show(5)
1
2
3
4
5
5.3 用spark執行以下邏輯:找到數據行數、列數
# 查看表前2行 test.head(2) test.describe().show() # 列出表頭屬性 test.columns # 列出第一行的數據 test.first() # 數據大小 shape print('test.shape: %s行 %s列'%(test.count(), len(test.columns))) # 上面打印出 test.shape: 9行 7列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
5.4 用spark篩選class為1的樣本
這里可以使用df.filter或df.where:
# 方法一 test.filter(test['class'] ==1).show() # 方法二 test.filter('class == 1' ).show()
1
2
3
4
5.5 用spark篩選language >90 或 math> 90的樣本
test.filter('language>90 or math>90').show() test.where('language>90 or math>90').show() test.filter((test['language']>90)|(test['math']>90)).show()
1
2
3
任務匯總:
Python spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。