亞寵展、全球寵物產業風向標——亞洲寵物展覽會深度解析
917
2022-05-28
目錄
Spark內核原理
RDD 依賴
窄依賴(Narrow Dependency)
Shuffle 依賴(寬依賴 Wide Dependency)
如何區分寬窄依賴
DAG和Stage
詞匯表
Spark內存迭代
所以,我們說Spark比MR效率高主要就是2個原因:
為什么可以內存傳輸或者網絡直傳呢?
Spark基本概念
1.Application:應用,就是程序員編寫的Spark代碼,如WordCount代碼
2.Driver:驅動,就是用來執行main方法的JVM進程,里面會執行一些Drive端的代碼,如創建SparkContext,設置應用名,設置日志級別...
3.SparkContext:Spark運行時的上下文環境,用來和ClusterManager進行通信的,并進行資源的申請、任務的分配和監控等
4.ClusterManager:集群管理器,對于Standalone模式,就是Master,對于Yarn模式就是ResourceManager/ApplicationMaster,在集群上做統一的資源管理的進程
5.Worker:工作節點,是擁有CPU/內存的機器,是真正干活的節點
6.Executor:運行在Worker中的JVM進程!
7.RDD:彈性分布式數據集
8.DAG:有向無環圖,就是根據Action形成的RDD的執行流程圖---靜態的圖
9.Job:作業,按照DAG進行執行就形成了Job---按照圖動態的執行
10.Stage:DAG中,根據shuffle依賴劃分出來的一個個的執行階段!
11.Task:一個分區上的一系列操作(pipline上的一系列操作)就是一個Task,同一個Stage中的多個Task可以并行執行!(每一個Task由線程執行),所以也可以這樣說:Task(線程)是運行在Executor(進程)中的最小單位!
12.TaskSet:任務集,就是同一個Stage中的各個Task組成的集合!
Job調度流程
Spark運行基本流程
Spark的任務調度總體來說分兩路進行:Stage級的調度和Task級的調度
一個Spark應用程序包括Job、Stage及Task:
Spark內核原理
Spark的核心是根據RDD來實現的,Spark Scheduler則為Spark核心實現的重要一環,其作用就是任務調度。Spark的任務調度就是如何組織任務去處理RDD中每個分區的數據,根據RDD的依賴關系構建DAG,基于DAG劃分Stage,將每個Stage中的任務發到指定節點運行。基于Spark的任務調度原理,可以合理規劃資源利用,做到盡可能用最少的資源高效地完成任務計算。
以詞頻統計WordCount程序為例,Job執行是DAG圖:
RDD 依賴
RDD 的容錯機制是通過將 RDD 間轉移操作構建成有向無環圖來實現的。從抽象的角度看,RDD 間存在著血統繼承關系,其本質上是 RDD之間的依賴(Dependency)關系。
從圖的角度看,RDD 為節點,在一次轉換操作中,創建得到的新 RDD 稱為子 RDD,同時會產生新的邊,即依賴關系,子 RDD 依賴向上依賴的 RDD 便是父 RDD,可能會存在多個父 RDD。可以將這種依賴關系進一步分為兩類,分別是窄依賴(NarrowDependency)和 Shuffle 依賴(Shuffle Dependency 在部分文獻中也被稱為 Wide Dependency,即寬依賴)。
窄依賴(Narrow Dependency)
窄依賴中:即父 RDD 與子 RDD 間的分區是一對一的。換句話說父RDD中,一個
分區內的數據是不能被分割的,只能由子RDD中的一個分區整個利用。
上圖中 P代表 RDD中的每個分區(Partition),我們看到,RDD 中每個分區內的數據在上面的幾種轉移操作之后被一個分區所使用,即其依賴的父分區只有一個。比如圖中的 map、union 和 join 操作,都是窄依賴的。注意,join 操作比較特殊,可能同時存在寬、窄依賴。
Shuffle 依賴(寬依賴 Wide Dependency)
Shuffle 有“洗牌、攪亂”的意思,這里所謂的 Shuffle 依賴也會打亂原 RDD 結構的操作。具體來說,父 RDD 中的分區可能會被多個子 RDD 分區使用。因為父 RDD 中一個分區內的數據會被分割并發送給子 RDD 的所有分區,因此 Shuffle 依賴也意味著父 RDD與子 RDD 之間存在著 Shuffle 過程。
上圖中 P 代表 RDD 中的多個分區,我們會發現對于 Shuffle 類操作而言,結果 RDD 中的每個分區可能會依賴多個父 RDD 中的分區。需要說明的是,依賴關系是 RDD 到 RDD 之間的一種映射關系,是兩個 RDD 之間的依賴,如果在一次操作中涉及多個父 RDD,也有可能同時包含窄依賴和 Shuffle 依賴。
如何區分寬窄依賴
區分RDD之間的依賴為寬依賴還是窄依賴,主要在于父RDD分區數據與子RDD分區數據關系:
窄依賴:父RDD的一個分區只會被子RDD的一個分區依賴;
寬依賴:父RDD的一個分區會被子RDD的多個分區依賴,涉及Shuffle;
為什么要設計寬窄依賴??
1)、對于窄依賴來說
Spark可以并行計算
如果有一個分區數據丟失,只需要從父RDD的對應個分區重新計算即可,不需要重新計算整個任務,提高容錯。
2)、對應寬依賴來說
劃分Stage的依據,產生Shuffle
DAG和Stage
在圖論中,如果一個有向圖無法從任意頂點出發經過若干條邊回到該點,則這個圖是一個有向無環圖(DAG圖)。而在Spark中,由于計算過程很多時候會有先后順序,受制于某些任務必須比另一些任務較早執行的限制,必須對任務進行排隊,形成一個隊列的任務集合,這個隊列的任務集合就是DAG圖,每一個定點就是一個任務,每一條邊代表一種限制約束(Spark中的依賴關系)。
Spark中DAG生成過程的重點是對Stage的劃分,其劃分的依據是RDD的依賴關系,對于不同的依賴關系,高層調度器會進行不同的處理。
對于窄依賴,RDD之間的數據不需要進行Shuffle,多個數據處理可以在同一臺機器的內存中完成,所以窄依賴在Spark中被劃分為同一個Stage;
對于寬依賴,由于Shuffle的存在,必須等到父RDD的Shuffle處理完成后,才能開始接下來的計算,所以會在此處進行Stage的切分。
在Spark中,DAG生成的流程關鍵在于回溯,在程序提交后,高層調度器將所有的RDD看成是一個Stage,然后對此Stage進行從后往前的回溯,遇到Shuffle就斷開,遇到窄依賴,則歸并到同一個Stage。等到所有的步驟回溯完成,便生成一個DAG圖。
把DAG劃分成互相依賴的多個Stage,劃分依據是RDD之間的寬依賴,Stage是由一組并行的Task組成。Stage切割規則:從后往前,遇到寬依賴就切割Stage。
Stage計算模式:pipeline管道計算模式,pipeline只是一種計算思想、模式,來一條數據然后計算一條數據,把所有的邏輯走完,然后落地。準確的說:一個task處理一串分區的數據,整個計算邏輯全部走完。
詞匯表
http://spark.apache.org/docs/2.4.5/cluster-overview.html
The following table summarizes terms you’ll see used to refer to cluster concepts:
Term
Meaning
Application
User program built on Spark. Consists of a?driver program?and?executors?on the cluster.
Application jar
A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The user's jar should never include Hadoop or Spark libraries, however, these will be added at runtime.
Driver program
The process running the main() function of the application and creating the SparkContext
Cluster manager
An external service for acquiring resources on the cluster (e.g. standalone manager, Mesos, YARN)
Deploy mode
Distinguishes where the driver process runs. In "cluster" mode, the framework launches the driver inside of the cluster. In "client" mode, the submitter launches the driver outside of the cluster.
Worker node
Any node that can run application code in the cluster
Executor
A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.
Task
A unit of work that will be sent to one executor
Job
A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g.?save,?collect); you'll see this term used in the driver's logs.
Stage
Each job gets divided into smaller sets of tasks called?stages?that depend on each other (similar to the map and reduce stages in MapReduce); you'll see this term used in the driver's logs.
Spark內存迭代
我們說Spark的性能對比MR是劃時代的。主要原因是基于內存的迭代,具體是如何迭代的呢?
我們先明白一個概念:DAG
前面說過,DAG是一個有向無環圖,而有向無環圖中的每一個節點,就是一個個的算子。
首先,MR的計算模型中,只有2個算子,一個Map?一個Reduce。
僅有的兩個算子,就導致了許多復雜的任務很難用這兩個算子計算出來。
很多復雜任務需要進行MR任務的迭代計算,也就是一個MR結束后下一個MR緊接著啟動。
如果將這一整個復雜任務描述為DAG的話,類似于:
反之看一下算子豐富的Spark任務,如果這個復雜任務用Spark開發,其DAG可能是類似這樣:
所以,我們說Spark比MR效率高主要就是2個原因:
MR計算模型中,Map算子和Reduce算子進行數據傳輸需要通過硬盤進行
MR計算模型的算子匱乏,只有Map和Reduce兩個算子,導致復雜任務需要串接多個MR任務,中間的傳輸都經過HDFS硬盤
也就是M和R之間走硬盤,多個MR之間也走硬盤,同時涉及到多次的MapReduce任務的啟動和釋放,對效率很影響。
反觀Spark(Flink),由于算子豐富,任務基本上都能一個Spark任務搞定,這就避免了多個Spark任務串聯。同時,在Spark內部,多個算子之間的數據溝通是通過內存或者網絡進行直接傳輸的,避免了低效的硬盤傳輸。
為什么可以內存傳輸或者網絡直傳呢?
Spark的最小執行單位是Task也就是單個線程。Task運行在Executor內。一個節點可以有多個Executor,一個集群可以有多個節點。
一個算子可以被并行執行,每個并行就是一個線程(一個task)
如果算子A的所有Task在Executor1、3中執行,算子B的所有Task運行在Executor2、4中執行。
算子AB的關系是 先計算A然后基于A的結果計算B
那么執行可能為:
如果Executor1和3在同一個節點之上,那么內存傳輸即可
如果Executor3和5在不同節點上,那么數據走網絡傳輸即可
Spark會盡量安排DAG中的數據流轉在內存中流轉。盡量避免網絡。
實在不行走網絡,也比MR的硬盤快了太多了。
Spark基本概念
官方文檔:
http://spark.apache.org/docs/2.4.5/cluster-overview.html#glossary
Spark Application運行時,涵蓋很多概念,主要如下表格:
1.Application:應用,就是程序員編寫的Spark代碼,如WordCount代碼
2.Driver:驅動,就是用來執行main方法的JVM進程,里面會執行一些Drive端的代碼,如創建SparkContext,設置應用名,設置日志級別...
3.SparkContext:Spark運行時的上下文環境,用來和ClusterManager進行通信的,并進行資源的申請、任務的分配和監控等
4.ClusterManager:集群管理器,對于Standalone模式,就是Master,對于Yarn模式就是ResourceManager/ApplicationMaster,在集群上做統一的資源管理的進程
5.Worker:工作節點,是擁有CPU/內存的機器,是真正干活的節點
6.Executor:運行在Worker中的JVM進程!
7.RDD:彈性分布式數據集
8.DAG:有向無環圖,就是根據Action形成的RDD的執行流程圖---靜態的圖
9.Job:作業,按照DAG進行執行就形成了Job---按照圖動態的執行
10.Stage:DAG中,根據shuffle依賴劃分出來的一個個的執行階段!
11.Task:一個分區上的一系列操作(pipline上的一系列操作)就是一個Task,同一個Stage中的多個Task可以并行執行!(每一個Task由線程執行),所以也可以這樣說:Task(線程)是運行在Executor(進程)中的最小單位!
12.TaskSet:任務集,就是同一個Stage中的各個Task組成的集合!
Job調度流程
Spark運行基本流程
1.當一個Spark應用被提交時,首先需要為這個Spark Application構建基本的運行環境,即由任務控制節點(Driver)創建一個SparkContext(還會構建DAGScheduler和TaskScheduler)
2.SparkContext向資源管理器注冊并申請運行Executor資源;
3.資源管理器為Executor分配資源并啟動Executor進程,Executor運行情況將隨著心跳發送到資源管理器上;
4.SparkContext根據RDD的依賴關系構建成DAG圖,并提交給DAGScheduler進行解析劃分成Stage,并把該Stage中的Task組成的Taskset發送給TaskScheduler。
5.TaskScheduler將Task發放給Executor運行,同時SparkContext將應用程序代碼發放給Executor。
6.Executor將Task丟入到線程池中執行,把執行結果反饋給任務調度器,然后反饋給DAG調度器,運行完畢后寫入數據并釋放所有資源。
Spark Application應用的用戶代碼都是基于RDD的一系列計算操作,實際運行時,這些計算操作是Lazy執行的,并不是所有的RDD操作都會觸發Spark往Cluster上提交實際作業,基本上只有一些需要返回數據或者向外部輸出的操作才會觸發實際計算工作(Action算子),其它的變換操作基本上只是生成對應的RDD記錄依賴關系(Transformation算子)。
當RDD調用Action函數(比如count、saveTextFile或foreachPartition)時,觸發一個Job執行,調度中流程如下圖所示:
Spark RDD通過其Transactions操作,形成了RDD血緣關系圖,即DAG,最后通過Action的調用,觸發Job并調度執行。
Spark的任務調度總體來說分兩路進行:Stage級的調度和Task級的調度
DAGScheduler負責Stage級的調度,主要是將DAG依據RDD寬依賴切分成若干Stages,并將每個Stage打包成TaskSet交給TaskScheduler調度。
TaskScheduler負責Task級的調度,將DAGScheduler給過來的TaskSet按照指定的調度策略分發到Executor上執行,調度過程中SchedulerBackend負責提供可用資源,其中SchedulerBackend有多種實現,分別對接不同的資源管理系統。
一個Spark應用程序包括Job、Stage及Task:
Job/DAG是以Action方法為界,遇到一個Action方法則觸發一個Job;
Stage是Job的子集,以RDD寬依賴(即Shuffle)為界,遇到Shuffle做一次劃分;
Task是Stage的子集,以并行度(分區數)來衡量,分區數是多少,則有多少個task。
spark 大數據
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。