MRS 3.0.x獲取Kafka服務指標Java樣例
912
2022-05-29
相信小伙伴們看了小菌上一篇博客《什么是MapReduce(入門篇)》后,對MapReduce的概念有了更深的認知!本篇博客,小菌為大家帶來的則是MapReduce的實戰——統計指定文本文件中每一個單詞出現的總次數。
我們先來確定初始的數據源,即wordcount.txt文件!
跟之前使用API一樣,我們同樣需要在IDEA中使用JAVA代碼來書寫MapReduce!
項目POM文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
定義一個mapper類
package demo02; //首先要定義四個泛型的類型 //keyin: LongWritable valuein: Text //keyout: Text valueout:IntWritable import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @Auther: 封茗囧菌 * @Date: 2019/11/11 17:43 * @Description: * 需求:在一堆給定的文本文件中統計輸出每一個單詞出現的總次數 */ // long,int,String ... 是java中的數據類型,hadoop中并不識別.hadoop中有對應的數據類型 public class WordCountMapper extends Mapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
定義一個reducer類
package demo02; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @Auther: 封茗囧菌 * @Date: 2019/11/11 17:56 * @Description: */ //計算單詞的總和 public class WordCountReducer extends Reducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
定義一個主類,用來描述job并提交job
package demo02; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @Auther: 封茗囧菌 * @Date: 2019/11/11 18:06 * @Description: */ public class WordCountRunner{ //把業務邏輯相關的信息(哪個是mapper,哪個是reducer,要處理的數據在哪里,輸出的結果放哪里……)描述成一個job對象 //把這個描述好的job提交給集群去運行 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //創建一個Configuration實體類對象 Configuration conf = new Configuration(); Job wcjob = Job.getInstance(conf); // 指定我這個job所在的jar包 // wcjob.setJar("/home/hadoop/wordcount.jar"); wcjob.setJarByClass(WordCountRunner.class); wcjob.setMapperClass(WordCountMapper.class); wcjob.setReducerClass(WordCountReducer.class); //設置我們的業務邏輯Mapper 類的輸出 key 和 value 的數據類型 wcjob.setMapOutputKeyClass(Text.class); wcjob.setMapOutputValueClass(LongWritable.class); //設置我們的業務邏輯 Reducer 類的輸入key 和 value 的數據類型 wcjob.setMapOutputKeyClass(Text.class); wcjob.setOutputValueClass(LongWritable.class); long startTime=System.currentTimeMillis(); //獲取開始時間 //指定要處理的數據所在的位置 FileInputFormat.setInputPaths(wcjob,"G:\\wordcount.txt"); //指定處理完成之后的結果所保存的位置 FileOutputFormat.setOutputPath(wcjob, new Path("E:\\result")); // 向yarn集群提交這個job boolean res = wcjob.waitForCompletion(true); long endTime=System.currentTimeMillis(); //獲取結束時間 System.out.println(res?0:1); System.out.println("程序運行時間: "+(endTime-startTime)+"ms"); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
錯誤提醒:
如果遇到這個錯誤,
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/":root:supergroup:drwxr-xr-x
直接將hdfs-site.xml當中的權限關閉即可
1
2
3
4
重啟hdfs集群,重新運行即可。(可以把項目打成jar包放在虛擬機上運行,也可以在IDEA上直接運行!這里介紹的是在IDEA上運行的效果)
讓我們來查看一下效果!
在我們本地的E盤上,出現了result文件夾
打開進入并用Notepad++ 打開文件查看內容!發現統計的結果已經呈現在里面了!說明我們的程序運行成功了!
思路回顧:
每讀取一行數據,MapReduce就會調用一次map方法,在map方法中我們把每行數據用空格" "分隔成一個數組,遍歷數組,把數組中的每一個元素作為key,1作為value作為map的輸出傳遞給reduce。reduce把收集到的數據根據key值進行分區,把每個分區的內容進行單獨計算,并把結果輸出。
本期的分享就到這里了,小伙伴們有什么疑惑或好的建議可以積極在評論區留言~,小菌會持續更新新鮮好玩的技術,喜歡的小伙伴們不要忘了,關注小菌吶ヾ(?°?°?)??。
Hadoop MapReduce
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。