MapReduce編程模型和計算框架

      網友投稿 768 2025-04-01

      1 概述

      源于Google的MapReduce論文,發表于2004年12月。Hadoop MapReduce是Google MapReduce的克隆版。Hadoop問世前,已有分布式計算,但都是專用系統,僅處理某一類計算,比如進行大規模數據排序。這樣的系統無法復用到其他大數據計算場景,每種應用都需要開發與維護專門系統。

      而Hadoop MapReduce造就了大數據計算通用編程。只要遵循MapReduce編程模型編寫業務處理邏輯代碼,就能運行在Hadoop分布式集群。我們只需關心業務邏輯,無需關心系統調用與運行環境。

      大數據計算的核心思路:移動計算比移動數據劃算。既然計算方法跟傳統計算方法不同,移動計算而非移動數據,那用傳統編程模型進行大數據計算就會遇到很多困難,因此Hadoop使用MapReduce的編程模型。

      MapReduce編程模型并非Hadoop原創,也非Google原創,但Google和Hadoop創造性地將MapReduce編程模型用到大數據計算,讓復雜的各種各樣的機器學習、數據挖掘、SQL處理等大數據計算變得簡單。

      Hadoop解決大規模數據分布式計算的方案——MapReduce。

      MapReduce既是編程模型,也是計算框架:開發人員必須基于MapReduce編程模型進行code,然后將程序通過MapReduce計算框架分發到Hadoop集群中運行。

      MapReduce編程模型執行步驟

      準備map處理的輸入數據

      Mapper處理

      Shuffle

      Reduce處理

      結果輸出

      InputFormat

      決定在哪里以及怎樣持久化作業結果。Hadoop為不同類型的格式提供了一系列的類和接口,實現自定義操作只要繼承其中的某個類或接口即可。你可能已經熟悉了默認的OutputFormat,也就是TextOutputFormat,它是一種以行分隔,包含制表符界定的鍵值對的文本文件格式。盡管如此,對多數類型的數據而言,如再常見不過的數字,文本序列化會浪費一些空間,由此帶來的結果是運行時間更長且資源消耗更多。為了避免文本文件的弊端,Hadoop提供了SequenceFileOutputformat,它將對象表示成二進制形式而不再是文本文件,并將結果進行壓縮。

      Split

      Split:交由MapReduce作業來處理的數據塊,是MapReduce中最小的計算單元。

      HDFS:blocksize是HDFS中最小的存儲單元,128M

      默認他們兩一一對應,當然我們也可以手工設置他們之間的關系(不建議)

      2 MapReduce編程模型

      模型只包含Map、Reduce兩個過程:

      map的輸入是一對值,經過map計算后輸出一對值;然后將相同Key合并,形成

      再將這個輸入reduce,經過計算輸出零個或多個

      強大,不管是關系代數運算(SQL計算),還是矩陣運算(圖計算),大數據領域幾乎所有的計算需求都可以通過MapReduce實現。

      3 WordCount

      文本處理中詞頻統計的問題,就是統計文本中每一個單詞出現的次數。如果只是統計一篇文章的詞頻,幾十KB到幾MB的數據,只需要寫一個程序,將數據讀入內存,建一個Hash表記錄每個詞出現的次數就可以了。這個統計過程你可以看下面這張圖。

      3.1 單機處理

      # 文本前期處理 strl_ist = str.replace('\n', '').lower().split(' ') count_dict = {} # 如果字典里有該單詞則加1,否則添加入字典 for str in strl_ist: if str in count_dict.keys(): count_dict[str] = count_dict[str] + 1 else: count_dict[str] = 1

      建個Hash表,然后將字符串里的每個詞放進Hash表。若該詞第一次放到Hash表,就新建一個KV對,K=該詞,V=1。若Hash表里已有該詞,則給該詞的V + 1。

      小數據量用單機統計詞頻很簡單,但若想統計互聯網所有網頁(萬億計)的詞頻數(Google這種體量搜索引擎的需求),不可能寫一個程序把全世界的網頁都讀入內存,這時候就需要用MapReduce編程。

      3.2 WordCount的MapReduce

      public class WordCount { public static class TokenizerMapper extends Mapper{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } }

      MapReduce版本WordCount程序的核心是map、reduce函數。

      map函數的輸入主要是一個對,在這個例子里,Value是要統計的所有文本中的一行數據,Key在一般計算中都不會用到。

      public void map(Object key, Text value, Context context )

      map函數的計算過程是,將這行文本中的單詞提取出來,針對每個單詞輸出一個這樣的KV對。

      MapReduce計算框架會將這些收集起來,將相同的word放在一起,形成>這樣的數據,然后將其輸入給reduce函數。

      public void reduce(Text key, Iterable values, Context context )

      這里reduce的輸入參數Values就是由很多個1組成的集合,而Key就是具體的單詞word。

      reduce將這個集合里的1求和,再將單詞(word)和這個和(sum)組成一個,即輸出,即一個單詞和它的詞頻統計總和。

      一個map函數能針對一部分數據進行運算,這就能將一個大數據切分成很多塊(HDFS),MapReduce計算框架為每個數據塊分配一個map函數去計算,從而實現大數據的分布式計算。

      假設有兩個數據塊的文本數據需要進行詞頻統計,MapReduce計算過程:

      但這樣一個MapReduce程序要想在分布式環境中執行,并處理海量的大規模數據,還需一個計算框架,調度執行這個MapReduce程序,使它在分布式的集群中并行運行,這個計算框架也叫MapReduce。

      所以,說MapReduce時:

      可能指編程模型,即一個MapReduce程序

      也可能是指計算框架,調度執行大數據的分布式計算

      這個過程有兩個關鍵問題:

      如何為每個數據塊分配一個Map計算任務,即代碼如何發送到數據塊所在服務器,發送后如何啟動,啟動后如何知道自己需要計算的數據在文件什么位置(BlockID是啥)

      處于不同服務器的map輸出的 ,如何把相同的Key聚合在一起發送給Reduce任務進行處理

      這倆問題對應MapReduce計算過程圖中的兩處“MapReduce框架處理”:

      MapReduce作業啟動和運行

      MapReduce數據合并與連接

      4 MapReduce作業啟動和運行機制

      以Hadoop 1為例,MapReduce運行過程涉及三類關鍵進程:

      4.1 大數據應用進程

      啟動MapReduce程序的主入口。指定Map和Reduce類、輸入輸出文件路徑等,并提交作業給Hadoop集群,即JobTracker進程。這是由用戶啟動的MapReduce程序進程,如WordCount程序。

      4.2 JobTracker進程

      根據要處理的輸入數據量,命令TaskTracker進程啟動相應數量的Map和Reduce進程任務,并管理整個作業生命周期的任務調度和監控。

      這是Hadoop集群的常駐進程,JobTracker進程在整個Hadoop集群全局唯一。

      4.3 TaskTracker進程

      負責啟動和管理Map進程以及Reduce進程。因為需要每個數據塊都有對應的map函數,TaskTracker進程通常和HDFS的DataNode進程啟動在同一個服務器。即Hadoop集群中絕大多數服務器同時運行DataNode進程和TaskTracker進程。

      JobTracker進程和TaskTracker進程是主從關系:

      主服務器通常只有一臺或另有一臺備機提供高可用,但運行時只有一臺服務器對外服務,真正起作用的只有一臺

      從服務器可能有幾百上千臺,所有從服務器聽從主服務器的控制和調度安排。主服務器負責為應用程序分配服務器資源以及作業執行的調度,具體計算操作在從服務器完成

      MapReduce主服務器JobTracker,從服務器TaskTracker。HDFS主服務器NameNode,從服務器DataNode,Yarn、Spark等都是類似的一主多從的服務器架構:主服務器一臺,掌控全局;從服務器多臺,負責具體事情。這樣很多臺服務器可以有效組織起來,對外表現出一個統一又強大的計算能力。

      5 作業啟動和計算過程

      1、應用進程JobClient將用戶作業jar包存儲在HDFS,這些JAR包后續會分發給Hadoop集群中的服務器執行MapReduce計算

      2、應用程序提交job作業給JobTracker

      3、JobTracker根據作業調度策略創建JobInProcess樹,每個作業都會有一個自己的JobInProcess樹

      4、JobInProcess根據輸入數據分片數目(一般就是數據塊的數目)和設置的Reduce數目創建相應數量的TaskInProcess

      5、TaskTracker進程和JobTracker進程定時通信

      6、若TaskTracker有空閑的計算資源(有空閑CPU核心),JobTracker就會給它分配任務。分配任務時會根據TaskTracker的服務器名字匹配在同一臺機器上的數據塊計算任務給它,使啟動的計算任務正好處理本機上的數據,實現“移動計算比移動數據劃算”

      7、TaskTracker收到任務后根據任務類型(Map or Reduce)和任務參數(作業jar包路徑、輸入數據文件路徑、要處理的數據在文件中的起始位置和偏移量、數據塊多個備份的DataNode主機名等),啟動相應的Map或Reduce進程。

      8、Map或者Reduce進程啟動后,檢查本地是否有要執行任務的JAR包文件,若無,就去HDFS下載,然后加載Map或Reduce代碼開始執行。

      9、若為Map進程,從HDFS讀數據(通常要讀取的數據塊正好存儲在本機);若為Reduce進程,將結果數據寫到HDFS

      這樣,MapReduce能將大數據作業計算任務分布在整個Hadoop集群中運行,每個Map計算任務要處理的數據通常都能從本地磁盤讀取到。

      而我們要做的僅是編寫一個map函數和一個reduce函數,其余一切都由MapReduce計算框架自動完成!

      6 MapReduce數據合并與連接機制

      WordCount想統計相同單詞在所有輸入數據中出現的次數,而一個Map只能處理一部分數據,一個熱門單詞幾乎會出現在所有Map中,這意味著同一個單詞必須要合并到一起進行統計才能得到正確結果。

      幾乎所有的大數據計算場景都需要處理數據關聯,像WordCount這種簡單的只要對Key進行合并,對于像數據庫的join操作這種復雜的,需要對兩種類型(或者更多類型)的數據根據Key進行連接。

      在map輸出與reduce輸入之間,MapReduce計算框架處理數據合并與連接操作-shuffle

      7 shuffle

      每個Map任務的計算結果都會寫入本地文件系統,等Map任務快計算完成時,MapReduce計算框架會啟動shuffle過程,在Map任務進程調用一個Partitioner接口,對Map產生的每個KV進行Reduce分區選擇,然后通過HTTP通信發送給對應的Reduce進程。

      這樣不管Map位于哪個服務器節點,相同K一定會被發到相同Reduce進程。Reduce任務進程對收到的KV進行排序和合并,相同K放一起,組成一個傳遞給Reduce執行。

      map輸出的shuffle到哪個Reduce進程是關鍵,由Partitioner實現,MapReduce框架默認的Partitioner用Key的哈希值對Reduce任務數量取模,相同K落在相同Reduce任務ID:

      /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }

      分布式計算需要將不同服務器上的相關數據合并到一起進行下一步計算,這就是shuffle。

      只要是大數據批處理計算,一定有shuffle過程,只有讓數據關聯起來,數據的內在關系和價值才會呈現出來。若不理解shuffle,在map和reduce編程中肯定困惑,如何正確設計map的輸出和reduce的輸入。shuffle也是整個MapReduce過程中最難、最消耗性能,MapReduce早期的一半代碼都是shuffle處理。

      8 優點

      海量數量離線處理

      易開發

      易運行

      9 缺點

      實時流式計算

      MapReduce架構

      1.x 架構

      MapReduce 2.x 架構

      Java 實現 wordCount

      重構

      Combiner編程

      Partitoner

      總結

      MapReduce編程模型和計算框架

      模型是人們對一類事物的概括與抽象,可以幫助我們更好地理解事物的本質,更方便地解決問題。比如,數學公式是我們對物理與數學規律的抽象,地圖和沙盤是我們對地理空間的抽象,軟件架構圖是軟件工程師對軟件系統的抽象。

      通過抽象,我們更容易把握事物的內在規律,而不是被紛繁復雜的事物表象所迷惑,更進一步深刻地認識這個世界。通過抽象,伽利略發現力是改變物體運動的原因,而不是使物體運動的原因,為全人類打開了現代科學的大門。

      遇到問題時,停下來思考:這個問題為什么會出現,它揭示出來背后的規律是什么,我應該如何做。甚至有時候會把這些優秀的人帶入進思考:如果是大佬,他會如何看待、如何解決這個問題。通過這種不斷地訓練,雖然和那些最優秀的人相比還是有巨大的差距,但是仍然能夠感受到自己的進步。

      Hadoop 分布式

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:如何在Excel中檢查年份是否為a年?
      下一篇:怎么把只讀模式改為讀寫模式(怎樣改只讀模式)
      相關文章
      久久精品国产亚洲αv忘忧草| 亚洲日韩v无码中文字幕| 亚洲综合精品香蕉久久网| 欧美亚洲精品一区二区| 亚洲人av高清无码| 国产成人亚洲综合一区| 亚洲av专区无码观看精品天堂| 亚洲国产一区在线观看| 亚洲国产精品白丝在线观看| 亚洲一卡二卡三卡四卡无卡麻豆 | 亚洲五月丁香综合视频| 91亚洲精品麻豆| 亚洲国产成人手机在线电影bd| 亚洲女人18毛片水真多| 亚洲精品视频久久| 亚洲男人的天堂久久精品 | 激情综合色五月丁香六月亚洲| 国产成人亚洲精品狼色在线| 亚洲无线观看国产精品| 亚洲成AV人片在线观看无| 亚洲国产精品婷婷久久| 亚洲无线电影官网| 亚洲毛片基地日韩毛片基地| 亚洲精品中文字幕无乱码麻豆 | 亚洲深深色噜噜狠狠爱网站| 亚洲AV综合色区无码一区爱AV| 亚洲AV午夜成人片| 亚洲熟妇无码爱v在线观看| 2020国产精品亚洲综合网| 亚洲乱色熟女一区二区三区蜜臀| 理论亚洲区美一区二区三区| 亚洲无码高清在线观看| 亚洲精品国产美女久久久| 亚洲五月六月丁香激情| 亚洲伊人久久大香线蕉啊| 亚洲国产区男人本色| 亚洲免费一区二区| 亚洲AV日韩AV永久无码免下载| 亚洲第一精品电影网| 亚洲无人区码一二三码区别图片| 男人的天堂亚洲一区二区三区|