elasticsearch入門系列">elasticsearch入門系列
1096
2022-05-28
上篇博文給大家?guī)淼氖欠謪^(qū)的介紹以及怎樣自定義分區(qū),這次博主為大家?guī)淼氖顷P(guān)于排序的博文,希望大家能夠喜歡。
目錄
一. Shuffle之排序(sort)
1.1 排序的簡單介紹
1.2 排序的分類
1.3 自定義排序
二. WritableComparable排序案例
2.1 需求
2.2 需求分析
2.3 編寫代碼
1. FlowBean對象在在需求1基礎(chǔ)上增加了比較功能
2. 編寫Mapper類
3. 編寫Reducer類
4. 編寫Driver類
2.4 運(yùn)行及結(jié)果
1. 運(yùn)行
2. 結(jié)果
一. Shuffle之排序(sort)
今天我們講的是第六步,sort排序操作。
1.1 排序的簡單介紹
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均會對數(shù)據(jù)
按照key
進(jìn)行排序。該操作屬于Hadoop的默認(rèn)行為。
任何應(yīng)用程序中的數(shù)據(jù)均會被排序,而不管邏輯上是否需要否需要
。
默認(rèn)排序是按照
字典順序排序
,且實(shí)現(xiàn)該排序的方法是
快速排序
。
對于MapTask,它會將處理的結(jié)果暫時放在環(huán)形緩沖區(qū)中,
當(dāng)環(huán)形緩沖區(qū)使用率達(dá)到一定閾值后,再對緩沖區(qū)中的數(shù)據(jù)進(jìn)行一次快速排序
,并將這些有序數(shù)據(jù)溢寫到磁盤上,而當(dāng)數(shù)據(jù)處理完畢后,它會
對磁盤上所有文件進(jìn)行歸并排序
。
對于ReduceTask,它從每個MapTask上遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件,如果文件大小超過一定閾值,則溢寫磁盤上,否則存儲在內(nèi)存中,如果磁盤上文件數(shù)目達(dá)到一定閾值,則進(jìn)行一次歸并排序以生成一個更大文件;如果內(nèi)存中文件大小或者數(shù)目超過一定閾值,則進(jìn)行一次合并后將數(shù)據(jù)溢寫到磁盤上。當(dāng)所有數(shù)據(jù)拷貝完畢后,
ReduceTask統(tǒng)一對內(nèi)存和磁盤上的所有數(shù)據(jù)進(jìn)行一次歸并排序
。
1.2 排序的分類
1. 部分排序
MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序。
保證輸出的每個文件內(nèi)部有序
。
2. 全排序
最終輸出結(jié)果只有一個文件,且文件內(nèi)部有序
。實(shí)現(xiàn)方式是只設(shè)置一個ReduceTask。但該方法在處理大型文件時效率極低,因?yàn)橐慌_機(jī)器處理所有文件,完全喪失了MapReduce所提供的并行框架。
3. 輔助排序:(GroupingComparator分組)
在Reduce端對key進(jìn)行分組。應(yīng)用于:在接收的key為bean對象時,想讓一個活幾個字段相同(全部字段比較不相同)的key進(jìn)入到同一個reduce方法時,可以采用分組排序。
4. 二次排序
在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序。
1.3 自定義排序
原理分析:
bean對象做為key傳輸,需要實(shí)現(xiàn)WritableComparable接口重寫compareTo方法,就可以實(shí)現(xiàn)排序。
@Override public int compareTo(FlowBean o) { int result; // 按照總流量大小,倒序排列 if (sumFlow > bean.getSumFlow()) { result = -1; }else if (sumFlow < bean.getSumFlow()) { result = 1; }else { result = 0; } return result; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
二. WritableComparable排序案例
案例還是用的上兩篇博文的文檔。
2.1 需求
期望輸出數(shù)據(jù) 13509468723 7335 110349 117684 13736230513 2481 24681 27162 13956435636 132 1512 1644 13846544121 264 0 264
1
2
3
4
5
6
2.2 需求分析
1. 查看需求
2. 查看我們需要排序的數(shù)據(jù)
2.3 編寫代碼
1. FlowBean對象在在需求1基礎(chǔ)上增加了比較功能
package com.buwenbuhuo.WritableComparable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * @author 卜溫不火 * @create 2020-04-24 0:33 * com.buwenbuhuo.WritableComparable - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class FlowBean implements WritableComparable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
2. 編寫Mapper類
package com.buwenbuhuo.WritableComparable; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * @author 卜溫不火 * @create 2020-04-24 0:33 * com.buwenbuhuo.WritableComparable - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class SortMapper extends Mapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
3. 編寫Reducer類
package com.buwenbuhuo.WritableComparable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @author 卜溫不火 * @create 2020-04-24 0:33 * com.buwenbuhuo.WritableComparable - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class SortReducer extends Reducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
4. 編寫Driver類
package com.buwenbuhuo.WritableComparable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author 卜溫不火 * @create 2020-04-24 0:33 * com.buwenbuhuo.WritableComparable - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class SortDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 獲取配置信息,或者job對象實(shí)例 Job job = Job.getInstance(new Configuration()); // 2 指定本程序的jar包所在的本地路徑j(luò)ob.setJarByClass(com.buwenbuhuo.WritableComparable.SortDriver.class); // 3 指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類 job.setMapperClass(SortMapper.class); job.setReducerClass(SortReducer.class); // 4 指定mapper輸出數(shù)據(jù)的kv類型job.setMapOutputKeyClass(com.buwenbuhuo.WritableComparable.FlowBean.class); job.setMapOutputValueClass(Text.class); // 5 指定最終輸出的數(shù)據(jù)的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 6 指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path("d:\output")); FileOutputFormat.setOutputPath(job, new Path("d:\output2")); // 7 將job中配置的相關(guān)參數(shù),以及job所用的java類所在的jar包, 提交給yarn去運(yùn)行 boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2.4 運(yùn)行及結(jié)果
1. 運(yùn)行
2. 結(jié)果
本期的分享就到這里了,小伙伴們有什么疑惑或好的建議可以積極在評論區(qū)留言~,博主會持續(xù)更新新鮮好玩的技術(shù),喜歡的小伙伴們不要忘了,記得要關(guān)注博主吶ヾ(?°?°?)??。
MapReduce
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。