大數據“復活”記
905
2025-03-31
前言
本章主要講述Spark基本概念,了解Spark中RDD、DataSet 、DataFrame結構的異同點。理解Spark SQL、Spark Streaming、Structured Streaming三種常用組件的特點。
目標
學完本章后,您將能夠:
理解Spark應用場景,學據Spark特點。
掌握Spark計算能力及具技術架構。
一、Spark概述
Spark簡介
2009年誕生于美國加州大學伯克利分校AMP實驗室。
Apache Spark是一種基于內存的快速、通用、可擴展的大數據計算引擎。
Spark是一站式解決方案,集批處理、實時流處理、交互式查詢、圖計算與機器學習于一體。
Spark應用場景
批處理可用于ETL(抽取、轉換、加載)。
機器學習可用于自動判斷淘寶的買家評論是好評還是差評。
交互式分析可用于查詢Hive數據倉庫。
流處理可用于頁面點擊流分析,推薦系統,輿情分析等實時業務。
Spark的特點
Spark具有簡潔、表達能力強、高效的特點
輕:Spark核心代碼只有3萬行
快:Spark對小數據Spark對小數據的延遲
靈:Spark提供了不同層面的靈活性,它支持批處理計算、流處理計算、圖處理計算和實時響應查詢這些計算方式
巧:巧妙借力現有大數據組件,spark與Hadoop大數據平臺已經無縫的融合了,Spark能夠利用Yarn作為資源管理調度器
Spark采用傳統的計算引擎,十分之一的資源可以提高大約三倍的效率。另外隨著數據容量的增加,spark處理時間并沒有呈現指數級別的增加,而是線性增加,所以說Spark是一種高可靠、高可用、并且高效的大數據分布式計算引擎。
二、Spark數據結構
Spark核心概念RDD
RDD(Resilient Distributed Datasets)即彈性分布式數據集,是一個只讀的,可分區的分布式數據集。
RDD默認存儲在內存,當內存不足時,溢寫到磁盤。
RDD 數據以分區的形式在集群中存儲。
RDD具有血統機制(Lineage),發生數據丟失時,可快速進行數據恢復。
基于RDD的流式計算任務可描述為:從穩定的物理存儲(如分布式文件系統)中加載記錄,記錄被傳入由一組確定性操作構成的DAG,然后寫回穩定存儲。另外RDD還可以將數據集緩存到內存中,使得在多個操作之間可以重用數據集,基于這個特點可以很方便地構建迭代型應用(圖計算、機器學習等)或者交互式數據分析應用。
RDD的依賴關系
RDD的依賴關系主要是分為了寬依賴和窄依賴,左邊是窄依賴,右邊是寬依賴。從圖中我們可以發現,對于父RDD的每一個分區,最多只被一個RDD的一個分區所使用,這個呢叫做窄依賴。寬依賴對于父RDD的每一個分區,被子RDD的多個分區所引用。另外寬依賴是RDD進行的劃分stage的標準。從圖中我們還可以了解另外一個概念,就是血統的概念。血統就是指的是從一個RDD變化到另外一個RDD,它的鏈接。
寬窄依賴的區別 - - 算子
窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用。如map,filter、union
寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition。如groupByKey、reduceByKey、sortByKey
寬窄依賴的區別-容錯性
假如某個節點出故障了:
窄依賴:只要重算和子RDD分區對應的父RDD分區即可;寬依賴:極端情況下,所有的父RDD分區都要進行重新計算。
如下圖所示,b1分區丟失,則需要重新計算a1,a2和a3
寬窄依賴的區別-傳輸
寬依賴往往對應著shuffle操作,需要在運行過程中將同一個父RDD的分區傳入到不同的子RDD分區中,中間可能涉及多個節點之間的數據傳輸;
窄依賴的每個父RDD的分區只會傳入到一個子RDD分區中,通常可以在一個節點內完成轉換。
RDD的Stage劃分
Spark任務會根據RDD之間的依賴關系,形成一個DAG有向無環圖,DAG會提交給DAGScheduler,DAGScheduler會把DAG劃分相互依賴的多個stage,劃分stage的依據就是RDD之間的寬窄依賴。遇到寬依賴就劃分stage,每個stage包含一個或多個task任務。然后將這些task以taskSet的形式提交給TaskScheduler運行。 stage是由一組并行的task組成。
(1) 窄依賴(narrow dependencies)
可以支持在同一個集群Executor上,以pipeline管道形式順序執行多條命令,例如在執行了map后,緊接著執行filter。分區內的計算收斂,不需要依賴所有分區的數據,可以并行地在不同節點進行計算。所以它的失敗恢復也更有效,因為它只需要重新計算丟失的parent partition即可
(2)寬依賴(shuffle dependencies)
則需要所有的父分區都是可用的,必須等RDD的parent partition數據全部ready之后才能開始計算,可能還需要調用類似MapReduce之類的操作進行跨節點傳遞。從失敗恢復的角度看,shuffle dependencies 牽涉RDD各級的多個parent partition。
劃分完stage之后,同一個stage里面只有窄依賴,沒有寬依賴,可以實現流水線計算,stage中的每一個分區對應一個task,同一個stage中就有很多可以并行運行的task
RDD操作類型
Spark中的操作大致可以分為創建操作、轉換操作、控制操作和行為操作。
創建操作(Creation Operation):用于RDD創建工作。RDD創建只有兩種方法,一種是來自于內存集合和外部存儲系統,另一種是通過轉換操作生成的RDD。
轉換操作(Transformation Operation):將RDD通過一定的操作轉變成新的RDD,RDD的轉換操作是惰性操作,它只是定義了一個新的RDD,并沒有立即執行。
控制操作(Control Operation):進行RDD持久化,可以讓RDD按不同的存儲策略保存在磁盤或者內存中,比如cache接口默認將RDD緩存在內存中。
行動操作(Action Operation):能夠觸發Spark運行的操作。Spark中行動操作分為兩類,一類操作輸出計算結果,另一類將RDD保存到外部文件系統或者數據庫中。
1、創建操作
目前有兩種類型的基礎RDD:
并行集合接收一個已經存在的集合,然后進行并行計算。
外部存儲:在一個文件的每條記錄上運行函數。只要文件系統是HDFS,或者Hadoop支持的任意存儲系統即可。
這兩種類型的RDD都可以通過相同的方式進行操作,從而獲得子RDD等一系列拓形成血統關系圖。
2、控制操作
Spark可以將RDD持久化到內存或磁盤文件系統中,把RDD持久化到內存中可以極大地提高迭代計算以及各計算模型之間的數據共享,一般情況下執行節點60%內存用于緩存數據,剩下的40%用于運行任務。Spark中使用persist和cache操作進行持久化,其中cache是persist()的特例。
3、轉換操作-Transformation算子
Transformation
map(func):對調用map的RDD數據集中的每個element都使用func,然后返回一個新的RDD。
filter(func):對調用filter的RDD數據集中的每個元素都使用func,然后返回一個包含使func為true的元素構成的RDD。
reduceBykey(func,[numTasks]): 類似groupBykey,但是每一個key對應的value會根據提供的func進行計算以得到一個新的值。
join(otherDataset,[numTasks]):(K,W),返回(K,(V, W))同時支持leftOuterJoin,rightOutJoin,和fullOuterJoin。
4、行動操作– Action算子
reduce(func):根據函數聚合數據集里的元素。
collect():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組。
count():統計數據集中元素個數。
first():獲取數據集第一個元素。
take(n):獲取數據集最上方的幾個元素,返回—個數組。
saveAsTextFile(path):把dataset寫到一個textfile中,或者HDFS,Spark把每條記錄都轉換為一行記錄,然后寫到file中。
DataFrame概念
與RDD類似,DataFrame也是一個不可變彈性分布式數據集。除了數據以外,還記錄數據的結構信息,即schema。類似二維表格。
DataFrame的查詢計劃可以通過Spark Catalyst Optimiser進行優化,即使Spark經驗并不豐富,用Dataframe寫得程序也可以盡量被轉化為高效的形式予以執行。
DataSet概念
Dataframe是Dataset的特例,DataFrame=Dataset[Row],所以可以通過as方法將Dataframe轉換為Dataset。Row是一個通用的類型,所有的表結構信息都用Row來表示。
DataSet是強類型的,可以有Dataset[Car],Dataset[Person]等。
DataFrame、DataSet、RDD表現形式的區別
如上圖所示,對RDD中的數據它是不知道類型,也不知道結構。對于DataFrame中的數據,我們知道結構,但是我們并不知道這種結構下對應的數據是什么類型。比如說這里面的張三,他是一個字符串,但是dDataFrame在編譯的時候并不知道張三是一個字符串,我們只有取出來了進行轉換之后,才變成了字符串。因此在編譯的時候,DataFrame容易出錯。再看Dataset中的每一個記錄都是有類型的,因此在編譯的時候能夠去發現一些錯誤。比如說我們對這里的字符串進行減法操作,DataFrame就不能夠發現,但是對于Dataset就能夠發現。因為DataFrame和Dataset它具有結構性,因此它能夠使用spark SQL中的優化操作,能夠對執行的流程進行優化。即使我們對spark不是非常熟悉,也可以寫出了非常高效的spark語句。
三、Spark原理與架構
Spark體系架構
在spark中,spark庫是居于核心地位的。spark庫是類似于MapReduce一種分布式的內存計算架構,其中的Standalone支持單獨的資源管理調度,也支持Yarn或者Mesos的資源管理調度。在Spark庫之上有Spark SQL。Spark SQL是用于處理結構化數據以及SQL查詢的一種工具,Spark Streaming是叫做vp處理,它將流數據劃分成了vp,然后就有spark庫進行分析處理,MLlib和GraphX是spark的機器學習庫以及圖學習庫。另外Structured Streaming這個融入了Flume的一些特性,提升了spark的流處理的特性。我們可以認為是Structured 是Spark Streaming的提升。
典型案例-WordCount
Word Count 顧名思義就是對單詞進行計數,首先會對文件中的單詞做統計計數,然后輸出出現次數最多的 單詞。
單詞計數是最簡單也是最能體現MapReduce思想的程序之一,可以稱為MapReduce版"Hello World",該程序的完整代碼可以在Hadoop安裝包的"src/examples"目錄下找到。單詞計數主要完成功能是:統計一系列文本文件中每個單詞出現的次數
分別通過map和reduce方法統計文本中每個單詞出現的次數,然后按照字母的順序排列輸出,Map過程首先是多個map并行提取多個句子里面的單詞然后分別列出來每個單詞,出現次數為1,全部列舉出來。
Reduce過程首先將相同key的數據進行查找分組然后合并,比如對于key為Hello的數據分組為:
Spark SQL概述
Spark SQL是Spark中用于結構化數據處理的模塊。在Spark應用中,可以無縫的使用SQL語句亦或是DataFrame API對結構化數據進行查詢。
Spark SQL vs Hive區別:
Spark SQL的執行引擎為Spark core,Hive默認執行引擎為MapReduce。
Spark SQL的執行速度是Hive的10-100倍
Spark SQL不支持buckets,Hive支持。
聯系:
Spark SQL依賴Hive的元數據。
Spark SQL兼容絕大部分Hive的語法和函數。
Spark SQL可以使用Hive的自定義函數。
Structured Streaming概述
Structured Streaming是構建在Spark SQL引擎上的流式數據處理引擎。可以像使用靜態RDD數據那樣編寫流式計算過程。當流數據連續不斷的產生時,Spark sQL將會增示的、持續不斷的處理這些數據,并將結果更新到結果集中。
如上圖所示structured streaming的計算流程。structured streaming將流數據看作成無界表新的記錄,當新的數據產生時是structured streaming將新的數據作為無界表新的行。那么structured streaming將新的數據當作了靜態的RDD一樣的利用spark sql所提供的api進行了分析處理。
接下來我們看一個structured streaming的計算模型示例。這里我們采用了nc輸入數據,在1時刻我們輸入的數據就變成了dataframe。對這個dataframe進行分析處理得到一個結果,在2時刻又輸入了新的數據,那這個新的數據也插不到這個所謂的無界表中,產生新的數據。在3時刻又增加了兩個單詞,也是插入到這個無界表中產生新的結果。structure streaming它引入一個特征狀態。當隨著數據數據增加的時候,它計算出來的結果會合并先前的計算結果,產生最終的計算結果。
Spark Streaming概述
Spark Streaming的基本原理是將實時輸入數據流以時間片(秒級)為單位進行拆分,然后經Spark引擎以類似批處理的方式處理每個時間片數據。
Spark Streaming類似于Apache Storm,用于流式數據的處理。Spark Streaming有高吞吐量和容錯能力強等特點,而且Spark Streaming易用、容錯、易整合到Spark體系 。Spark Streaming支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數據輸入后可以用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結果也能保存在很多地方,如HDFS,數據庫等。另外Spark Streaming也能和MLlib(機器學習)以及Graphx完美融合。
Spark Streaming使用離散化流(discretized stream)作為抽象表示,叫作DStream。DStream 是隨時間推移而收到的數據的序列。在內部,每個時間區間收到的數據都作為 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(因此 得名“離散化”)。
窗口在Dstream上滑動,合并和操作落入窗口內的RDDs,產生窗口化的RDDs;
窗口長度:窗口的持續時間
滑動窗口間隔:窗口操作執行的時間間隔
Spark Streaming vs Storm
從流式數據處理的角度上來說,Spark Streaming與Storm存在一些差距。比如說Storm是一個真正的純實時的流處理引擎,處理的速度可以達到了毫秒級。另外Storm支持了完善的事務機制,但是Spark Streaming對事物機制支撐不夠完善。如果說針對一些場景要求,流數據的處理不多不少,我們需要使用Storm,對一些小型規模的公司來說,大數據資源是有限的,因此我們需要動態的調整的并行度,Storm也可以很好的支撐這種應用場景。對于Spark Streaming來說,它最大的優勢是它產生于了spark的生態圈,它可以無縫的與Spark 以及Spark SQL進行的連接。對于軟件產生出來的實時數據可以無縫的批處理,以及spark sql進行查詢分析。
總結
本章節主要介紹了Spark的基本概念、技術架構,涉及SparkSQL、 StructuredStreaming、Spark Streaming多個組件基本功能。
學習推薦
華為Learning網站:?https://support.huawei.com/learning
華為e學云:?https://support.huawei.com/learning/elearning
華為Support案例庫:https://support.huawei.com/enterprise
本文整理自華為云社區【內容共創系列】活動。
查看活動詳情:https://bbs.huaweicloud.com/blogs/314887
相關任務詳情:Spark基于內存的分布式計算
spark 分布式
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。