大數(shù)據(jù)服務(wù)上云的思考">大數(shù)據(jù)服務(wù)上云的思考
689
2025-04-02
學(xué)習(xí)hadoop,必不可少的就是編寫 MapReduce 程序。當(dāng)然,對于簡單的分析程序,我們只需一個(gè)MapReduce任務(wù)就能搞定,然而對于比較復(fù)雜的分析程序,我們可能需要多個(gè)Job或者多個(gè)Map或者Reduce進(jìn)行分析計(jì)算。 本次主要說明的是多個(gè) Job 或者多個(gè) MapReduce 的編程形式。
MapReduce 的主要有以下幾種編程形式
迭代式 MapReduce:
MapReduce 迭代方式,通常是將上一個(gè) MapReduce 任務(wù)的輸出作為下一個(gè) MapReduce 任務(wù)的輸入,可只保留 MapReduce 任務(wù)的最終結(jié)果,中間數(shù)據(jù)可以刪除或保留,可根據(jù)業(yè)務(wù)需要自行決定。
迭代式 MapReduce 的示例代碼如下所示:
Configuration conf = new Configuration();
//第一個(gè) MapReduce 任務(wù)
Job job1 = new Job(conf,"job1");
.....
FileInputFormat.addInputPath(job1,input);//job1的輸入
FileOutputFromat.setOutputPath(job1,out1);//job1的輸出
job1.waitForCompletion(true);
//第二個(gè) Mapreduce 任務(wù)
Job job2 = new Job(conf,"job2");
.....
FileInputFormat.addInputPath(job2,out1);//job1的輸出作為job2的輸入
FileOutputFromat.setOutputPath(job2,out2);//job2 的輸出
job2.waitForCompletion(true);
//第三個(gè) Mapreduce 任務(wù)
Job job3 = new Job(conf,"job3");
.....
FileInputFormat.addInputPath(job3,out2);//job2的輸出作為job3的輸入
FileOutputFromat.setOutputPath(job3,out3);//job3 的輸出
job3.waitForCompletion(true);
.....
雖然 MapReduce 的迭代可實(shí)現(xiàn)多任務(wù)的執(zhí)行,但是它具有如下兩個(gè)缺點(diǎn):
1、每次迭代,如果所有 Job 對象重復(fù)創(chuàng)建,代價(jià)將非常高。
2、每次迭代,數(shù)據(jù)都要寫入本地,然后從本地讀取,I/O和網(wǎng)絡(luò)傳輸?shù)拇鷥r(jià)比較大。
依賴關(guān)系式 MapReuce:
依賴關(guān)系式 MapReduce主要是由 org.apache.hadoop.mapred.jobcontrol 包中的 JobControl 類來實(shí)現(xiàn)。JobControl 的實(shí)例表示一個(gè)作業(yè)的運(yùn)行圖, 你可以加入作業(yè)配置,然后告知 JobControl 實(shí)例作業(yè)之間的依賴關(guān)系。在一個(gè)線程中運(yùn)行 JobControl 時(shí),它將按照依賴順序來執(zhí)行這些作業(yè)。也可以查看進(jìn)程, 在作業(yè)結(jié)束后,可以查詢作業(yè)的所有狀態(tài)和每個(gè)失敗相關(guān)的錯(cuò)誤信息。如果一個(gè)作業(yè)失敗,JobControl 將不執(zhí)行與之有依賴關(guān)系的后續(xù)作業(yè)。
依賴關(guān)系式 MapReuce 的示例代碼如下所示:
注意:hadoop的JobControl類實(shí)現(xiàn)了線程Runnable接口。我們需要實(shí)例化一個(gè)線程來啟動(dòng)它。直接調(diào)用JobControl的run()方法,線程將無法結(jié)束。
線性鏈?zhǔn)?MapReduce
大量的數(shù)據(jù)處理任務(wù)涉及對記錄的預(yù)處理和后處理。
例如:在處理信息檢索的文檔時(shí),可能一步是移除 stop words(像a、the和is這樣經(jīng)常出現(xiàn)但不太有意義的詞),另一步做stemming(轉(zhuǎn)換一個(gè)詞的不同形式為相同的形式,例如轉(zhuǎn)換finishing和finished為finish)。
你可以為預(yù)處理與后處理步驟各自編寫一個(gè) MapReduce 作業(yè),并把它們鏈接起來。在這些步驟中可以使用IdentityReducer(或完全不同的 Reducer)。 由于過程中每一個(gè)步驟的中間結(jié)果都需要占用I/O和存儲(chǔ)資源,這種做法是低效的。另一種方法是自己寫 mapper去預(yù)先調(diào)用所有的預(yù)處理步驟,再讓reducer調(diào)用所有的后處理步驟。這將強(qiáng)制你采用模塊化和可組合的方式來構(gòu)建預(yù)處理和后處理。因此Hadoop引入了ChainMapper 和ChainReducer類來簡化預(yù)處理和后處理的構(gòu)成。
hadoop提供了專門的鏈?zhǔn)紺hainMapper和ChainReducer來處理線性鏈?zhǔn)組apReduce任務(wù)。在Map或者Reduce階段存在多個(gè)Mapper,這些Mapper像Linux管道一樣,前一個(gè)Mapper的輸出結(jié)果直接重定向到后一個(gè)Mapper的輸入,形成流水線。 其調(diào)用形式如下:
... //預(yù)處理
ChainMapper.addMapper(...);
ChainReducer.setReducer(...);
ChainReducer.addMapper(...);
... ?//后處理
//addMapper()調(diào)用的方法形式如下:
public static void addMapper(Job job,
Class< extends Mapper> mclass,
Class< extends K1> inputKeyClass,
Class< extends V1> inputValueClass,
Class< extends K2> outputKeyClass,
Class< extends V2> outputValueClass,
Configuration conf
)
addMapper()方法有8個(gè)參數(shù)。第一個(gè)和最后一個(gè)分別為全局的Job和本地的configuration對象。第二個(gè)參數(shù)是 Mapper類,負(fù)責(zé)數(shù)據(jù)處理。余下4個(gè)參數(shù) inputKeyClass、inputValueClass、outputKeyClass和outputValueClass是這個(gè)Mapper類中輸入/輸出類的類型。ChainReducer專門提供了一個(gè)setReducer()方法來設(shè)置整個(gè)作業(yè)唯一的Reducer,語法與addMapper()方法類似。
線性鏈?zhǔn)?MapReduce 的示例代碼如下所示:
Hadoop 大數(shù)據(jù) MapReduce
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。