微吼云上線多路互動直播服務 加速多場景互動直播落地
1012
2025-04-03
MapReduce 是一種可用于數據處理的編程模型。該模型比較簡單,但要想寫出有用的程序卻不太容易。Hadoop 可以運行各種語言版本的 MapReduce 程序, 比如Java、Ruby、Python和C++語言版本。最重要的是,MapReduce 程序本質上是并行運行的,因此可以將大規模的數據分析任務分發給任何一個擁有足夠多機器的數據中心。 MapReduce 的優勢在于處理大規模數據集,所以這里我們先來看一個數據集。
氣象數據集
在篇要講的是,寫一個挖掘氣象數據的程序。氣象數據是通過分布在美國全國各地區的很多氣象傳感器每隔一小時進行收集,這些數據是半結構化數據且是按照記錄方式存儲的,因此非常適合使用 MapReduce 程序來統計分析。
MapReduce 是一種可用于數據處理的編程模型。該模型比較簡單,但要想寫出有用的程序卻不太容易。Hadoop 可以運行各種語言版本的 MapReduce 程序, 比如Java、Ruby、Python和C++語言版本。最重要的是,MapReduce 程序本質上是并行運行的,因此可以將大規模的數據分析任務分發給任何一個擁有足夠多機器的數據中心。 MapReduce 的優勢在于處理大規模數據集,所以這里我們先來看一個數據集。
氣象數據集
在篇要講的是,寫一個挖掘氣象數據的程序。氣象數據是通過分布在美國全國各地區的很多氣象傳感器每隔一小時進行收集,這些數據是半結構化數據且是按照記錄方式存儲的,因此非常適合使用 MapReduce 程序來統計分析。
我們使用的數據來自美國國家氣候數據中心、美國國家海洋和大氣管理局(簡稱 NCDC NOAA),這些數據按行并以 ASCII 格式存儲,其中每一行是一條記錄。 下面我們展示一行采樣數據,其中重要的字段被突出顯示。該行數據被分割成很多行以突出每個字段,但在實際文件中,這些字段被整合成一行且沒有任何分隔符。
數據文件按照氣象站和日期進行組織,每個氣象站都是一個總目錄,而且每個氣象站下面從 1980 年到 2010 年,每一年又都作為一個子目錄。 因為美國有成千上萬個氣象站,所以整個數據集由大量的小文件組成。通常情況下,處理少量的大型文件更容易、更有效,因此,這些數據需要經過預處理,將每個氣象站的數據文件拼接成一個單獨的文件。 預處理過的數據文件示例如下所示:
數據集導入HDFS
一旦數據下載并解壓到本地目錄之后,我們通過已經安裝好hadoop-eclipse-plugin-xxx.jar插件,很容易將氣象站數據導入HDFS。首先通過插件連接 HDFS(連接地址:hdfs://xxx002:9000):
規劃好氣象站數據集在HDFS中的目錄結構(hdfs://xxx002:9000/middle/weather),然后右鍵點擊weather目錄選擇Upload files to DFS,最后選中需要上傳的氣象站數據集。
當所有數據集上傳至HDFS,我們就可以使用MapReduce jobs來統計分析氣象站數據集。
我們也可以通過命令行訪問剛剛上傳至HDFS的數據集:
[hadoop@xxx002 hadoop]$ bin/hdfs dfs -ls /middle/weather/
使用 Hadoop 來分析數據
為了充分利用 Hadoop 提供的并行處理優勢,我們需要將查詢表示成 MapReduce 作業。完成本地小規模測試之后,就可以把作業部署到集群上面運行。 那么 MapReduce 作業到底由哪幾個部分組成的呢?接下來我們詳細介紹。
map 和 reduce
MapReduce 任務過程分為兩個處理階段:map 階段和reduce階段 。每個階段都以鍵值對作為輸入和輸出,其類型由程序員自己來選擇。程序員還需要寫兩個函數:map 函數和 reduce 函數。
在這里,map 階段的輸入是 NCDC NOAA 原始數據。我們選擇文本格式作為輸入格式,將數據集的每一行作為文本輸入。鍵是某一行起始位置相對于文件起始位置的偏移量,不過我們不需要這個信息,所以將其忽略。
我們的 map 函數很簡單。由于我們只對氣象站和氣溫感興趣,所以只需要取出這兩個字段數據。在本課程中,map 函數只是一個數據準備階段, 通過這種方式來準備數據,使 reducer 函數能夠繼續對它進行處理:即統計出每個氣象站 30 年來的平均氣溫。map 函數還是一個比較合適去除已損記錄的地方,在 map 函數里面,我們可以篩掉缺失的或者錯誤的氣溫數據。
為了全面了解 map 的工作方式,我們考慮以下輸入數據的示例:
1985 07 31 02? ?200? ? 94 10137? ?220? ? 26? ? ?1? ? ?0 -9999
1985 07 31 03? ?172? ? 94 10142? ?240? ? ?0? ? ?0? ? ?0 -9999
1985 07 31 04? ?156? ? 83 10148? ?260? ? 10? ? ?0? ? ?0 -9999
1985 07 31 05? ?133? ? 78 -9999? ?250? ? ?0 -9999? ? ?0 -9999
1985 07 31 06? ?122? ? 72 -9999? ? 90? ? ?0 -9999? ? ?0? ? ?0
1985 07 31 07? ?117? ? 67 -9999? ? 60? ? ?0 -9999? ? ?0 -9999
1985 07 31 08? ?111? ? 61 -9999? ? 90? ? ?0 -9999? ? ?0 -9999
1985 07 31 09? ?111? ? 61 -9999? ? 60? ? ?5 -9999? ? ?0 -9999
1985 07 31 10? ?106? ? 67 -9999? ? 80? ? ?0 -9999? ? ?0 -9999
1985 07 31 11? ?100? ? 56 -9999? ? 50? ? ?5 -9999? ? ?0 -9999
這些行以鍵/值對的方式作為 map 函數的輸入:
(0,1985 07 31 02? ?200? ? 94 10137? ?220? ? 26? ? ?1? ? ?0 -9999)
(62,1985 07 31 03? ?172? ? 94 10142? ?240? ? ?0? ? ?0? ? ?0 -9999)
(124,1985 07 31 04? ?156? ? 83 10148? ?260? ? 10? ? ?0? ? ?0 -9999)
(186,1985 07 31 05? ?133? ? 78 -9999? ?250? ? ?0 -9999? ? ?0 -9999)
(248,1985 07 31 06? ?122? ? 72 -9999? ? 90? ? ?0 -9999? ? ?0? ? ?0)
(310,1985 07 31 07? ?117? ? 67 -9999? ? 60? ? ?0 -9999? ? ?0 -9999)
(371,1985 07 31 08? ?111? ? 61 -9999? ? 90? ? ?0 -9999? ? ?0 -9999)
(434,1985 07 31 09? ?111? ? 61 -9999? ? 60? ? ?5 -9999? ? ?0 -9999)
(497,1985 07 31 10? ?106? ? 67 -9999? ? 80? ? ?0 -9999? ? ?0 -9999)
(560,1985 07 31 11? ?100? ? 56 -9999? ? 50? ? ?5 -9999? ? ?0 -9999)
鍵(key)是文件中的偏移量,map 函數并不需要這個信息,所以將其忽略。map 函數的功能僅限于提取氣象站和氣溫信息,并將它們作為輸出。
map 函數的輸出經由 MapReduce 框架處理后,最后發送到 reduce 函數。這個處理過程基于鍵來對鍵值對進行排序和分組。因此在這個示例中,reduce 函數看到的是如下輸入:
(03103,[200,172,156,133,122,117,111,111,106,100])
每個氣象站后面緊跟著一系列氣溫數據,reduce 函數現在要做的是遍歷整個列表并統計出平均氣溫:
03103 132
這是最終輸出結果即每一個氣象站歷年的平均氣溫。
下圖代表了 MapReduce 高層設計:
Java MapReduce
我們明白 MapReduce 程序的工作原理之后,下一步就是寫代碼實現它。我們需要編寫三塊代碼內容:一個 map 函數、一個 reduce 函數和一些用來運行作業的代碼。
map 函數由 Mapper 類實現來表示,Mapper 聲明一個 map() 虛方法,其內容由我們自己來實現。
下面我們來編寫 Mapper 類,實現 map() 方法,提取氣象站和氣溫數據。
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
*
* @function 統計美國各個氣象站30年來的平均氣溫
* @author cs
*
*/
public class Temperature extends Configured implements Tool {
public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> {
/**
* @function Mapper 解析氣象站數據
* @input key=偏移量? value=氣象站數據
* @output key=weatherStationId value=temperature
*/
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString(); //每行氣象數據
int temperature = Integer.parseInt(line.substring(14, 19).trim());//每小時氣溫值
if (temperature != -9999) { //過濾無效數據
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String weatherStationId = fileSplit.getPath().getName().substring(5, 10);//通過文件名稱提取氣象站id
context.write(new Text(weatherStationId), new IntWritable(temperature));
}
}
}
}
這個 Mapper 類是一個泛型類型,它有四個形參類型,分別指定 map 函數的輸入鍵、輸入值、輸出鍵和輸出值的類型。 就本示例來說,輸入鍵是一個長整數偏移量,輸入值是一行文本,輸出鍵是氣象站id,輸出值是氣溫(整數)。Hadoop 本身提供了一套可優化網絡序列化傳輸的基本類型,而不是使用 java 內嵌的類型。這些類型都在 org.apache.hadoop.io 包中。 這里使用 LongWritable 類型(相當于 Java 的 Long 類型)、Text 類型(相當于 Java 中的 String 類型)和 IntWritable 類型(相當于 Java 的 Integer 類型)。
map() 方法的輸入是一個鍵(key)和一個值(value),我們首先將 Text 類型的 value 轉換成 Java 的 String 類型, 之后使用 substring()方法截取我們業務需要的值。map() 方法還提供了 Context 實例用于輸出內容的寫入。 在這種情況下,我們將氣象站id按 Text 對象進行讀/寫(因為我們把氣象站id當作鍵),將氣溫值封裝在 IntWritale 類型中。只有氣溫數據不缺失,這些數據才會被寫入輸出記錄中。
我們以上面類似的方法用 Reducer 來定義 reduce 函數,統計每個氣象站的平均氣溫:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
*
* @function 統計美國各個氣象站30年來的平均氣溫
* @author cs
*
*/
public class Temperature extends Configured implements Tool {
/**
*
* @function Reducer 統計美國各個氣象站的平均氣溫
* @input key=weatherStationId? value=temperature
* @output key=weatherStationId value=average(temperature)
*/
public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException {
int sum = 0;
int count = 0;
//統計每個氣象站的氣溫值總和
for (IntWritable val : values) {
sum += val.get();
count++;
}
//求每個氣象站的氣溫平均值
result.set(sum / count);
context.write(key, result);
}
}
}
同樣,reduce 函數也有四個形式參數類型用于指定輸入和輸出類型。reduce 函數的輸入類型必須匹配 map 函數的輸出類型:即 Text 類型和 IntWritable 類型。 在這種情況下,reduce 函數的輸出類型也必須是 Text 和 IntWritable 類型,分別是氣象站id和平均氣溫。在 map 的輸出結果中,所有相同的氣象站(key)被分配到同一個reduce執行,這個平均氣溫就是針對同一個氣象站(key),通過循環所有氣溫值(values)求和并求平均數所得到的。
第三部分代碼負責運行 MapReduce 作業 :
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
*
* @function 統計美國各個氣象站30年來的平均氣溫
* @author cs
*
*/
public class Temperature extends Configured implements Tool {
/**
* @function 任務驅動方法
* @param args
* @return
* @throws Exception
*/
@Override
public int run(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();//讀取配置文件
Path mypath = new Path(args[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
Job job = new Job(conf, "temperature");//新建一個任務
job.setJarByClass(Temperature.class);// 設置主類
FileInputFormat.addInputPath(job, new Path(args[0]));// 輸入路徑
FileOutputFormat.setOutputPath(job, new Path(args[1]));// 輸出路徑
job.setMapperClass(TemperatureMapper.class);// Mapper
job.setReducerClass(TemperatureReducer.class);// Reducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
return job.waitForCompletion(true)?0:1;//提交任務
}
/**
* @function main 方法
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
//數據輸入路徑和輸出路徑
String[] args0 = {
"hdfs://xxx002:9000/middle/weather/",
"hdfs://xxx002:9000/middle/weather/out/"
};
int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
System.exit(ec);
}
}
Configuration 類讀取 Hadoop 的配置文件,如 site-core.xml、mapred-site.xml、hdfs-site.xml 等。
Job 對象指定作業執行規范,我們可以用它來控制整個作業的運行。我們在 Hadoop 集群上運行這個作業時,要把代碼打包成一個 JAR 文件(Hadoop 在集群上發布這個文件)。 不必明確指定 JAR 文件的名稱,在 Job 對象的 setJarByClass 方法中傳遞一個類即可,Hadoop 利用這個類來查找包含它的 JAR 文件,進而找到相關的 JAR 文件。
構造 Job 對象之后,需要指定輸入和輸出數據的路徑。調用 FileInputFormat 類的靜態方法 addInputPath() 來定義輸入數據的路徑,這個路徑可以是單個的文件、一個目錄(此時,將目錄下所有文件當作輸入)或符合特定文件模式的一系列文件。由函數名可知,可以多次調用 addInputPath() 來實現多路徑的輸入。 調用 FileOutputFormat 類中的靜態方法 setOutputPath() 來指定輸出路徑(只能有一個輸出路徑)。這個方法指定的是 reduce 函數輸出文件的寫入目錄。 在運行作業前該目錄是不應該存在的,否則 Hadoop 會報錯并拒絕運行作業。這種預防措施的目的是防止數據丟失(長時間運行的作業如果結果被意外覆蓋,肯定是件可怕的事情)。
接著,通過 setMapperClass() 和 setReducerClass() 指定 map 類型和reduce 類型。
setOutputKeyClass() 和 setOutputValueClass() 控制 map 和 reduce 函數的輸出類型,正如本例所示,這兩個輸出類型一般都是相同的。如果不同,則通過 setMapOutputKeyClass()和setMapOutputValueClass()來設置 map 函數的輸出類型。
輸入的類型通過 InputFormat 類來控制,我們的例子中沒有設置,因為使用的是默認的 TextInputFormat(文本輸入格式)。
在設置定義 map 和 reduce 函數的類之后,可以開始運行作業。Job 中的 waitForCompletion() 方法提交作業并等待執行完成。該方法中的布爾參數是個詳細標識,所以作業會把進度寫到控制臺。 waitForCompletion() 方法返回一個布爾值,表示執行的成(true)敗(false),這個布爾值被轉換成程序的退出代碼 0 或者 1。
運行測試
編寫好 MapReduce 作業之后,通常要拿一個小型數據集進行測試以排除代碼問題。程序測試成功之后,我們通過 eclipse 工具將 MapReduce作業打成jar包即temperature.jar,然后然后上傳至/home/hadoop/xxx/ 目錄下,由 hadoop 腳本來執行。
[hadoop@xxx002 hadoop-2.2.0-x64]$ export HADOOP_CLASSPATH=/home/hadoop/xxx/temperature.jar
[hadoop@xxx002 hadoop-2.2.0-x64]$ hadoop com.xxx.hadoop.advance.Temperature
如果調用 hadoop 命令的第一個參數是類名,Hadoop 就會啟動一個 JVM 來運行這個類。使用 hadoop 命令運行作業比直接使用 Java 命令來運行更方便,因為前者將 Hadoop 庫文件(及其依賴關系)路徑加入到類路徑參數中, 同時也能獲得 Hadoop 的配置文件。需要定義一個 HADOOP_CLASSPATH 環境變量用于添加應用程序類的路徑,然后由 Hadoop 腳本來執行相關操作。
運行作業所得到的輸出提供了一些有用的信息。例如我們可以看到,這個作業有指定的標識,即job_1432108212572_0001,并且執行了一個 map 任務和一個 reduce 任務。
輸出的最后一部分,以 Counter 為標題,顯示 Hadoop 上運行的每個作業的一些統計信息。這些信息對檢查數據是否按照預期進行處理非常有用。
輸出數據寫入out 目錄,其中每個 reducer 都有一個輸出文件。我們的例子中只有一個 reducer,所以只能找到一個名為 part-00000 的文件:
[hadoop@xxx002 hadoop-2.2.0-x64]$ hadoop fs -cat /weather/out/part-r-00000
03103? ?132
這個結果和我們之前手動尋找的結果一樣。我們把這個結果解釋為編號為03103氣象站的平均氣溫為13.2攝氏度。
注意:上面輸出結果的輸入數據集是示例中的10條數據,而不是下載的整個數據集。如果大家使用整個數據集跑出的結果不一致,很正常。
監控 Hadoop
Ambari 儀表盤和監視器工具為我們提供了比較直觀的Hadoop集群信息。一旦Hadoop 集群啟動,Ambari可以監控整個集群的健康狀況。
另外我們還可以通過Azkaban 來負責MapReduce的作業調度,同時它能提供MapReduce運行信息和詳細的執行日志,包括運行時間:
至于Azkaban 作業調度以及Ambari集群部署監控,在后續的篇幅中再說明。 目前這個階段重點掌握MapReduce的編程套路即可。
數據可視化
當 MapReduce Job執行完畢,我們可以將結果集入庫然后使用可視化技術對數據進行可視化。
如有問題歡迎探討 謝謝!
Hadoop MapReduce
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。