機器學(xué)習(xí)服務(wù)提取圖片的特征向量">使用SAP Leonardo上的機器學(xué)習(xí)服務(wù)提取圖片的特征向量
795
2025-04-01
MapReduce 教程:簡介
在這篇 MapReduce 教程博客中,我將向您介紹 MapReduce,它是 Hadoop 框架中處理的核心構(gòu)建塊之一。在繼續(xù)之前,我建議您熟悉我在之前的HDFS 教程博客中介紹的 HDFS 概念。這將幫助您快速輕松地理解 MapReduce 概念。
在開始之前,讓我們對以下內(nèi)容做一個簡單的了解。
什么是大數(shù)據(jù)?
大數(shù)據(jù)可以稱為海量數(shù)據(jù),傳統(tǒng)的數(shù)據(jù)處理單元幾乎無法處理這些數(shù)據(jù)。大數(shù)據(jù)的一個更好的例子是目前流行的社交媒體網(wǎng)站,如 Facebook、Instagram、WhatsApp 和 YouTube。
什么是Hadoop?
Hadoop 是由 Apache Foundation 設(shè)計和部署的大數(shù)據(jù)框架。它是一種開源軟件實用程序,可在計算機網(wǎng)絡(luò)中并行工作,以找到大數(shù)據(jù)的解決方案并使用 MapReduce 算法對其進行處理。
谷歌在 2004 年 12 月發(fā)表了一篇關(guān)于 MapReduce 技術(shù)的論文。這成為了 Hadoop 處理模型的起源。因此,MapReduce 是一種編程模型,它允許我們對龐大的數(shù)據(jù)集進行并行和分布式處理。我在這個 MapReduce 教程博客中涵蓋的主題如下:
并行和分布式處理的傳統(tǒng)方式
什么是 MapReduce?
MapReduce 示例
MapReduce 的優(yōu)勢
MapReduce 程序
MapReduce 程序說明
MapReduce 用例:KMeans 算法
MapReduce 教程:傳統(tǒng)方式
讓我們了解一下,當 MapReduce 框架不存在時,并行和分布式處理過去是如何以傳統(tǒng)方式發(fā)生的。所以,讓我們舉個例子,我有一個天氣日志,其中包含從 2000 年到 2015 年的年平均氣溫。在這里,我想計算每年溫度最高的那一天。
因此,就像傳統(tǒng)方式一樣,我將數(shù)據(jù)拆分為更小的部分或塊,并將它們存儲在不同的機器中。然后,我將找到存儲在相 應(yīng)機器中的每個零件的最高溫度。最后,我將結(jié)合從每臺機器接收到的結(jié)果以獲得最終輸出。讓我們看看與這種傳統(tǒng)方法相關(guān)的挑戰(zhàn):
關(guān)鍵路徑問題:在不延遲下一個里程碑或?qū)嶋H完成日期的情況下完成工作所花費的時間。因此,如果任何?機器延遲工作,整個工作都會被延遲。
可靠性問題:如果任何處理部分數(shù)據(jù)的機器出現(xiàn)故障怎么辦?這種故障轉(zhuǎn)移的管理成為一個挑戰(zhàn)。
等分問題:我將如何將數(shù)據(jù)分成更小的塊,以便每臺機器都可以使用部分數(shù)據(jù)。換句話說,如何平均分配數(shù)據(jù),以免單個機器過載或未充分利用。
單個拆分可能會失敗:如果任何機器無法提供輸出,我將無法計算結(jié)果。所以,應(yīng)該有一種機制來保證系統(tǒng)的這種容錯能力。
結(jié)果的聚合:?應(yīng)該有一種機制來聚合每臺機器生成的結(jié)果以產(chǎn)生最終輸出。
這些是我在使用傳統(tǒng)方法并行處理大量數(shù)據(jù)集時必須單獨注意的問題。
為了克服這些問題,我們有 MapReduce 框架,它允許我們執(zhí)行這樣的并行計算,而無需擔心可靠性、容錯等問題。因此,MapReduce 為您提供了編寫代碼邏輯的靈活性,而無需關(guān)心系統(tǒng)的設(shè)計問題.
MapReduce 教程:什么是 MapReduce?
MapReduce 是一個編程框架,它允許我們在分布式環(huán)境中對大型數(shù)據(jù)集進行分布式和并行處理。
大數(shù)據(jù)Hadoop認證培訓(xùn)課程
講師指導(dǎo)的課程
真實案例研究
評估
終身訪問
MapReduce 由兩個不同的任務(wù)組成——Map 和 Reduce。
正如 MapReduce 名稱所暗示的那樣,reducer 階段發(fā)生在 mapper 階段完成之后。
因此,第一個是映射作業(yè),其中讀取并處理一個數(shù)據(jù)塊以生成鍵值對作為中間輸出。
Mapper 或 map 作業(yè)(鍵值對)的輸出是 Reducer 的輸入。
減速器從多個映射作業(yè)接收鍵值對。
然后,reducer 將這些中間數(shù)據(jù)元組(中間鍵值對)聚合成一組較小的元組或鍵值對,即最終輸出。
讓我們更多地了解 MapReduce 及其組件。MapReduce 主要有以下三個類。他們是,
映射器類
使用 MapReduce 進行數(shù)據(jù)處理的第一階段是Mapper 類。在這里,RecordReader 處理每個 Input 記錄并生成相應(yīng)的鍵值對。Hadoop 的 Mapper 存儲將這些中間數(shù)據(jù)保存到本地磁盤中。
輸入拆分
它是數(shù)據(jù)的邏輯表示。它代表了一個工作塊,其中包含 MapReduce 程序中的單個映射任務(wù)。
記錄閱讀器
它與 Input split 交互,將得到的數(shù)據(jù)以Key-Value?Pairs的形式進行轉(zhuǎn)換。
減速機類
映射器生成的中間輸出被饋送到減速器,減速器對其進行處理并生成最終輸出,然后將其保存在HDFS 中。
司機班
MapReduce 作業(yè)的主要組件是驅(qū)動程序類。它負責(zé)設(shè)置一個 MapReduce Job 來運行 Hadoop。我們用數(shù)據(jù)類型和它們各自的作業(yè)名稱來指定Mapper和Reducer類的名稱。
MapReduce 教程:MapReduce 的字數(shù)統(tǒng)計示例
讓我們通過一個例子來理解 MapReduce 是如何工作的,我有一個?名為 example.txt 的文本文件,其內(nèi)容如下:
現(xiàn)在,假設(shè)我們必須使用 MapReduce 對 sample.txt 執(zhí)行字數(shù)統(tǒng)計。因此,我們將找到獨特的詞和這些獨特詞的出現(xiàn)次數(shù)。
首先,我們將輸入分成三部分,如圖所示。這將在所有地圖節(jié)點之間分配工作。
然后,我們對每個映射器中的單詞進行標記,并為每個標記或單詞提供一個硬編碼值 (1)。將硬編碼值設(shè)為 1 的基本原理是每個單詞本身都會出現(xiàn)一次。
現(xiàn)在,將創(chuàng)建一個鍵值對列表,其中鍵是單個單詞,值是一個。因此,對于第一行(Dear Bear River),我們有 3 個鍵值對——Dear, 1;熊,1;River, 1. 映射過程在所有節(jié)點上保持不變。
在映射器階段之后,會發(fā)生分區(qū)過程,在該過程中進行排序和混洗,以便將所有具有相同鍵的元組發(fā)送到相應(yīng)的減速器。
因此,在排序和改組階段之后,每個 reducer 都會有一個唯一的鍵和與該鍵對應(yīng)的值列表。例如熊,[1,1];汽車,[1,1,1]...等
現(xiàn)在,每個 Reducer 計算該值列表中存在的值。如圖所示,reducer 獲取了一個值為 [1,1] 的鍵 Bear 的值列表。然后,它計算列表中 1 的數(shù)量,并給出最終輸出 - Bear, 2。
最后,然后收集所有輸出鍵/值對并將其寫入輸出文件中。
MapReduce 教程:MapReduce 的優(yōu)勢
MapReduce 的兩個最大優(yōu)點是:
1.并行處理:
在 MapReduce 中,我們將作業(yè)分配給多個節(jié)點,每個節(jié)點同時處理作業(yè)的一部分。因此,MapReduce 基于分而治之的范式,它幫助我們使用不同的機器處理數(shù)據(jù)。由于數(shù)據(jù)由多臺機器而不是一臺機器并行處理,因此處理數(shù)據(jù)所需的時間大大減少,如下圖(2)所示。
圖:?傳統(tǒng)方式對比。MapReduce 方式 – MapReduce 教程
2. 數(shù)據(jù)局部性:
我們不是將數(shù)據(jù)移動到處理單元,而是將處理單元移動到 MapReduce 框架中的數(shù)據(jù)。在傳統(tǒng)系統(tǒng)中,我們習(xí)慣于將數(shù)據(jù)帶到處理單元進行處理。但是,隨著數(shù)據(jù)的增長并變得非常龐大,將大量數(shù)據(jù)帶到處理單元會帶來以下問題:
將大量數(shù)據(jù)轉(zhuǎn)移到處理過程中成本高昂,并且會降低網(wǎng)絡(luò)性能。
處理需要時間,因為數(shù)據(jù)由單個單元處理,這成為瓶頸。
主節(jié)點可能會負擔過重并可能出現(xiàn)故障。
現(xiàn)在,MapReduce 允許我們通過將處理單元帶入數(shù)據(jù)來克服上述問題。因此,正如您在上圖中所看到的,數(shù)據(jù)分布在多個節(jié)點之間,每個節(jié)點處理駐留在其上的部分數(shù)據(jù)。這使我們具有以下優(yōu)勢:
將處理單元移到數(shù)據(jù)上是非常劃算的。
由于所有節(jié)點都并行處理其部分數(shù)據(jù),因此減少了處理時間。
每個節(jié)點都有一部分數(shù)據(jù)要處理,因此,節(jié)點不會負擔過重。
MapReduce 教程:MapReduce 示例程序
在深入細節(jié)之前,讓我們先看一下 MapReduce 示例程序,以便對 MapReduce 環(huán)境中的實際工作方式有一個基本的了解。我采用了相同的字數(shù)統(tǒng)計示例,我必須找出每個單詞的出現(xiàn)次數(shù)。不要擔心伙計們,如果你不明白當您第一次看到代碼時,請耐心等待我引導(dǎo)您完成 MapReduce 代碼的每個部分。
MapReduce 教程:MapReduce 程序講解
整個 MapReduce 程序可以從根本上分為三個部分:
映射器階段代碼
減速器階段代碼
驅(qū)動程序代碼
我們將依次理解這三個部分的代碼。
映射器代碼:
public static class Map extends Mapper
我們創(chuàng)建了一個 Map 類,它擴展了 MapReduce 框架中已經(jīng)定義的類 Mapper。
我們在類聲明之后使用尖括號定義輸入和輸出鍵/值對的數(shù)據(jù)類型。
Mapper 的輸入和輸出都是一個鍵/值對。
輸入:
的關(guān)鍵是什么,但在文本文件中的每一行的偏移:LongWritable
的值是每個單獨的線(如圖所示在右邊的圖):?文本
輸出:
的關(guān)鍵是切分詞:文本
在我們的例子中,我們有硬編碼的值是 1:IntWritable
示例 – 親愛的 1、熊 1 等。
我們已經(jīng)編寫了一個 java 代碼,其中我們對每個單詞進行了標記,并為它們分配了一個等于1的硬編碼值。
減速機代碼:
public static class Reduce extends Reducer
我們創(chuàng)建了一個 Reduce 類,它像 Mapper 一樣擴展了類 Reducer。
我們在類聲明之后使用尖括號定義輸入和輸出鍵/值對的數(shù)據(jù)類型,就像對 Mapper 所做的那樣。
Reducer 的輸入和輸出都是一個鍵值對。
輸入:
在關(guān)鍵的無非是那些已經(jīng)在整理和洗牌階段之后,已經(jīng)產(chǎn)生唯一詞:文本
該值是與每個鍵對應(yīng)的整數(shù)列表:IntWritable
示例 – Bear、[1, 1] 等。
輸出:
的關(guān)鍵是所有的唯一單詞出現(xiàn)在輸入文本文件:文本
該值是每個唯一單詞的出現(xiàn)次數(shù):IntWritable
例子——熊,2;汽車、3等
我們匯總了與每個鍵對應(yīng)的每個列表中存在的值,并生成了最終答案。
通常,為每個唯一的單詞創(chuàng)建一個 reducer,但是,您可以在 mapred-site.xml 中指定 reducer 的數(shù)量。
驅(qū)動程序代碼:
Configuration conf= new Configuration(); Job job = new Job(conf,"My Word Count Program"); job.setJarByClass(WordCount.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path(args[1]); //Configuring the input/output path from the filesystem into the job FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
在驅(qū)動程序類中,我們將 MapReduce 作業(yè)的配置設(shè)置為在 Hadoop 中運行。
我們指定作業(yè)的名稱,映射器和化簡器的輸入/輸出數(shù)據(jù)類型。
我們還指定了映射器和化簡器類的名稱。
還指定了輸入和輸出文件夾的路徑。
方法 setInputFormatClass () 用于指定 Mapper 將如何讀取輸入數(shù)據(jù)或工作單元是什么。在這里,我們選擇了 TextInputFormat,以便映射器一次從輸入文本文件中讀取一行。
main() 方法是驅(qū)動程序的入口點。在此方法中,我們?yōu)樽鳂I(yè)實例化一個新的 Configuration 對象。
C源頌歌:
package co.edureka.mapreduce; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.fs.Path; public class WordCount{ public static class Map extends Mapper<LongWritable,Text,Text,IntWritable> { public void map(LongWritable key, Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { value.set(tokenizer.nextToken()); context.write(value, new IntWritable(1)); } } } public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable> { public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException,InterruptedException { int sum=0; for(IntWritable x: values) { sum+=x.get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf= new Configuration(); Job job = new Job(conf,"My Word Count Program"); job.setJarByClass(WordCount.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); Path outputPath = new Path(args[1]); //Configuring the input/output path from the filesystem into the job FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //deleting the output path automatically from hdfs so that we don't have to delete it explicitly outputPath.getFileSystem(conf).delete(outputPath); //exiting the job only if the flag value becomes false System.exit(job.waitForCompletion(true) ? 0 : 1); } }
運行 MapReduce 代碼:
運行 MapReduce 代碼的命令是:
hadoop jar hadoop-mapreduce-example.jar WordCount /sample/input /sample/output
現(xiàn)在,我們將研究基于 MapReduce 算法的用例。
用例:使用 Hadoop 的 MapReduce 的 KMeans 集群。
KMeans 算法是最簡單的無監(jiān)督機器學(xué)習(xí)算法之一。通常,無監(jiān)督算法僅使用輸入向量從數(shù)據(jù)集進行推斷,而不參考已知或標記的結(jié)果。
使用 Python 和較小的數(shù)據(jù)集或.csv文件執(zhí)行 KMeans 算法很容易。但是,當涉及到在大數(shù)據(jù)級別執(zhí)行數(shù)據(jù)集時,通常的程序就不能再方便了。
這正是您使用大數(shù)據(jù)工具處理大數(shù)據(jù)的時候。Hadoop 的MapReduce。以下代碼片段是 MapReduce 執(zhí)行Mapper、Reducer和Driver作業(yè)的組件
//映射器類
public void map(LongWritable key, Text value, OutputCollector
//減速器類
public static class Reduce extends MapReduceBase implements Reducer
//驅(qū)動類
public static void run(String[] args) throws Exception { IN = args[0]; OUT = args[1]; String input = IN; String output = OUT + System.nanoTime(); String again_input = output; int iteration = 0; boolean isdone = false; while (isdone == false) { JobConf conf = new JobConf(KMeans.class); if (iteration == 0) { Path hdfsPath = new Path(input + CENTROID_FILE_NAME); DistributedCache.addCacheFile(hdfsPath.toUri(), conf); } else { Path hdfsPath = new Path(again_input + OUTPUT_FIE_NAME); DistributedCache.addCacheFile(hdfsPath.toUri(), conf); } conf.setJobName(JOB_NAME); conf.setMapOutputKeyClass(DoubleWritable.class); conf.setMapOutputValueClass(DoubleWritable.class); conf.setOutputKeyClass(DoubleWritable.class); conf.setOutputValueClass(Text.class); conf.setMapperClass(Map.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(input + DATA_FILE_NAME)); FileOutputFormat.setOutputPath(conf, new Path(output)); JobClient.runJob(conf); Path ofile = new Path(output + OUTPUT_FIE_NAME); FileSystem fs = FileSystem.get(new Configuration()); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(ofile))); List
現(xiàn)在,我們將通過完整的可執(zhí)行代碼
//源代碼
import java.io.IOException; import java.util.*; import java.io.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.mapred.Reducer; @SuppressWarnings("deprecation") public class KMeans { public static String OUT = "outfile"; public static String IN = "inputlarger"; public static String CENTROID_FILE_NAME = "/centroid.txt"; public static String OUTPUT_FILE_NAME = "/part-00000"; public static String DATA_FILE_NAME = "/data.txt"; public static String JOB_NAME = "KMeans"; public static String SPLITTER = "t| "; public static List
現(xiàn)在,你們對 MapReduce 框架有了基本的了解。您可能已經(jīng)意識到 MapReduce 框架如何幫助我們編寫代碼來處理 HDFS 中存在的大量數(shù)據(jù)。與 Hadoop 1.x 相比,Hadoop 2.x 中的 MapReduce 框架發(fā)生了重大變化。這些更改將在本 MapReduce 教程系列的下一篇博客中討論。我將在那個博客中分享一個可下載的綜合指南,它解釋了 MapReduce 程序的每個部分。
Hadoop MapReduce
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。