MapReduce快速入門系列(7) | Shuffle之排序(sort)詳解及全排序

      網(wǎng)友投稿 1096 2022-05-28

      上篇博文給大家?guī)淼氖欠謪^(qū)的介紹以及怎樣自定義分區(qū),這次博主為大家?guī)淼氖顷P(guān)于排序的博文,希望大家能夠喜歡。

      目錄

      一. Shuffle之排序(sort)

      1.1 排序的簡單介紹

      1.2 排序的分類

      1.3 自定義排序

      二. WritableComparable排序案例

      2.1 需求

      2.2 需求分析

      2.3 編寫代碼

      1. FlowBean對象在在需求1基礎(chǔ)上增加了比較功能

      2. 編寫Mapper類

      3. 編寫Reducer類

      4. 編寫Driver類

      2.4 運(yùn)行及結(jié)果

      1. 運(yùn)行

      2. 結(jié)果

      一. Shuffle之排序(sort)

      今天我們講的是第六步,sort排序操作。

      1.1 排序的簡單介紹

      排序是MapReduce框架中最重要的操作之一。

      MapTask和ReduceTask均會對數(shù)據(jù)

      按照key

      進(jìn)行排序。該操作屬于Hadoop的默認(rèn)行為。

      任何應(yīng)用程序中的數(shù)據(jù)均會被排序,而不管邏輯上是否需要否需要

      默認(rèn)排序是按照

      字典順序排序

      ,且實(shí)現(xiàn)該排序的方法是

      快速排序

      對于MapTask,它會將處理的結(jié)果暫時放在環(huán)形緩沖區(qū)中,

      當(dāng)環(huán)形緩沖區(qū)使用率達(dá)到一定閾值后,再對緩沖區(qū)中的數(shù)據(jù)進(jìn)行一次快速排序

      ,并將這些有序數(shù)據(jù)溢寫到磁盤上,而當(dāng)數(shù)據(jù)處理完畢后,它會

      對磁盤上所有文件進(jìn)行歸并排序

      對于ReduceTask,它從每個MapTask上遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件,如果文件大小超過一定閾值,則溢寫磁盤上,否則存儲在內(nèi)存中,如果磁盤上文件數(shù)目達(dá)到一定閾值,則進(jìn)行一次歸并排序以生成一個更大文件;如果內(nèi)存中文件大小或者數(shù)目超過一定閾值,則進(jìn)行一次合并后將數(shù)據(jù)溢寫到磁盤上。當(dāng)所有數(shù)據(jù)拷貝完畢后,

      ReduceTask統(tǒng)一對內(nèi)存和磁盤上的所有數(shù)據(jù)進(jìn)行一次歸并排序

      1.2 排序的分類

      1. 部分排序

      MapReduce根據(jù)輸入記錄的鍵對數(shù)據(jù)集排序。

      保證輸出的每個文件內(nèi)部有序

      2. 全排序

      最終輸出結(jié)果只有一個文件,且文件內(nèi)部有序

      。實(shí)現(xiàn)方式是只設(shè)置一個ReduceTask。但該方法在處理大型文件時效率極低,因?yàn)橐慌_機(jī)器處理所有文件,完全喪失了MapReduce所提供的并行框架。

      3. 輔助排序:(GroupingComparator分組)

      在Reduce端對key進(jìn)行分組。應(yīng)用于:在接收的key為bean對象時,想讓一個活幾個字段相同(全部字段比較不相同)的key進(jìn)入到同一個reduce方法時,可以采用分組排序。

      4. 二次排序

      在自定義排序過程中,如果compareTo中的判斷條件為兩個即為二次排序。

      1.3 自定義排序

      原理分析:

      bean對象做為key傳輸,需要實(shí)現(xiàn)WritableComparable接口重寫compareTo方法,就可以實(shí)現(xiàn)排序。

      @Override public int compareTo(FlowBean o) { int result; // 按照總流量大小,倒序排列 if (sumFlow > bean.getSumFlow()) { result = -1; }else if (sumFlow < bean.getSumFlow()) { result = 1; }else { result = 0; } return result; }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      二. WritableComparable排序案例

      案例還是用的上兩篇博文的文檔。

      2.1 需求

      期望輸出數(shù)據(jù) 13509468723 7335 110349 117684 13736230513 2481 24681 27162 13956435636 132 1512 1644 13846544121 264 0 264

      1

      2

      3

      4

      5

      6

      2.2 需求分析

      1. 查看需求

      2. 查看我們需要排序的數(shù)據(jù)

      2.3 編寫代碼

      1. FlowBean對象在在需求1基礎(chǔ)上增加了比較功能

      package com.buwenbuhuo.WritableComparable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * @author 卜溫不火 * @create 2020-04-24 0:33 * com.buwenbuhuo.WritableComparable - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class FlowBean implements WritableComparable { private long upFlow; private long downFlow; private long sumFlow; public FlowBean() { } @Override public String toString() { return upFlow + "\t" + downFlow +"\t" + sumFlow; } public void set(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } /** * 序列化方法 * @param out 框架給我們提供的數(shù)據(jù)出口 * @throws IOException */ public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /** * 反序列化方法 * @param in 框架提供的數(shù)據(jù)來源 * @throws IOException */ public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } @Override public int compareTo(FlowBean o) { return Long.compare(o.sumFlow, this.sumFlow); } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      48

      49

      50

      51

      52

      53

      54

      55

      56

      57

      58

      59

      60

      61

      62

      63

      64

      65

      66

      67

      68

      69

      70

      71

      72

      73

      74

      75

      76

      77

      78

      79

      80

      81

      82

      83

      84

      85

      86

      2. 編寫Mapper類

      package com.buwenbuhuo.WritableComparable; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * @author 卜溫不火 * @create 2020-04-24 0:33 * com.buwenbuhuo.WritableComparable - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class SortMapper extends Mapper { private com.buwenbuhuo.WritableComparable.FlowBean flow = new FlowBean(); private Text phone = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 2 截取 String[] fields = value.toString().split("\t"); // 3 封裝對象 phone.set(fields[0]); flow.setUpFlow(Long.parseLong(fields[1])); flow.setDownFlow(Long.parseLong(fields[2])); flow.setSumFlow(Long.parseLong(fields[3])); // 輸出 context.write(flow, phone); } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      3. 編寫Reducer類

      package com.buwenbuhuo.WritableComparable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * @author 卜溫不火 * @create 2020-04-24 0:33 * com.buwenbuhuo.WritableComparable - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class SortReducer extends Reducer { @Override protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException { // 循環(huán)輸出,避免總流量相同情況 for (Text value : values) { context.write(value, key); } } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      4. 編寫Driver類

      package com.buwenbuhuo.WritableComparable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @author 卜溫不火 * @create 2020-04-24 0:33 * com.buwenbuhuo.WritableComparable - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class SortDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 獲取配置信息,或者job對象實(shí)例 Job job = Job.getInstance(new Configuration()); // 2 指定本程序的jar包所在的本地路徑j(luò)ob.setJarByClass(com.buwenbuhuo.WritableComparable.SortDriver.class); // 3 指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類 job.setMapperClass(SortMapper.class); job.setReducerClass(SortReducer.class); // 4 指定mapper輸出數(shù)據(jù)的kv類型job.setMapOutputKeyClass(com.buwenbuhuo.WritableComparable.FlowBean.class); job.setMapOutputValueClass(Text.class); // 5 指定最終輸出的數(shù)據(jù)的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 6 指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path("d:\output")); FileOutputFormat.setOutputPath(job, new Path("d:\output2")); // 7 將job中配置的相關(guān)參數(shù),以及job所用的java類所在的jar包, 提交給yarn去運(yùn)行 boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      MapReduce快速入門系列(7) | Shuffle之排序(sort)詳解及全排序

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      48

      49

      50

      2.4 運(yùn)行及結(jié)果

      1. 運(yùn)行

      2. 結(jié)果

      本期的分享就到這里了,小伙伴們有什么疑惑或好的建議可以積極在評論區(qū)留言~,博主會持續(xù)更新新鮮好玩的技術(shù),喜歡的小伙伴們不要忘了,記得要關(guān)注博主吶ヾ(?°?°?)??。

      MapReduce

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:自動駕駛仿CARLA使用記錄
      下一篇:更改文字、圖片和視頻大小(縮放)
      相關(guān)文章
      国产亚洲老熟女视频| 精品韩国亚洲av无码不卡区| 国产亚洲视频在线观看| 2020天堂在线亚洲精品专区| 亚洲综合国产精品| 亚洲AV福利天堂一区二区三| 亚洲人成伊人成综合网久久久 | 亚洲国产精品VA在线看黑人| 亚洲色偷偷综合亚洲AVYP| 亚洲无人区午夜福利码高清完整版| 亚洲性在线看高清h片| 亚洲区小说区图片区| 国产成人高清亚洲| 曰韩亚洲av人人夜夜澡人人爽| 久久亚洲2019中文字幕| 亚洲人成网77777色在线播放| 国产亚洲福利精品一区| 亚洲AV无码一区东京热久久 | 国产亚洲精品美女| 区三区激情福利综合中文字幕在线一区亚洲视频1 | 亚洲国产女人aaa毛片在线| 亚洲欧洲日韩国产综合在线二区| 久久亚洲AV成人无码电影| 亚洲美女视频一区二区三区| 亚洲国产视频一区| 日韩亚洲产在线观看| 亚洲AV日韩AV无码污污网站| 亚洲A∨精品一区二区三区| 亚洲日韩中文字幕日韩在线| 亚洲啪啪AV无码片| 久久亚洲国产成人精品性色| 亚洲一区二区三区久久久久| 亚洲精品天堂无码中文字幕| 内射无码专区久久亚洲| 国产gv天堂亚洲国产gv刚刚碰| 亚洲成av人片天堂网| 亚洲日韩国产精品无码av| 亚洲综合中文字幕无线码| 亚洲av乱码一区二区三区按摩| 亚洲男人在线无码视频| 国产亚洲精AA在线观看SEE|