elasticsearch入門系列">elasticsearch入門系列
945
2022-05-29
Hello,大家好,本次為大家帶來的是Hadoop的序列化操作。
目錄
一. 序列化的簡單介紹
1.1. 什么是序列化
1.2. 為什么要序列化
1.3. 為什么不用Java的序列化
二. 自定義bean對象實現序列化接口
三. 序列化的實際操作展示
3.1. 需求
3.2. 需求分析
3.3. 編寫MapReduce程序
1. 編寫流量統計的Bean對象
2. 編寫Mapper類
3. 編寫Reducer類
4. 編寫Driver驅動類
5. 運行并查看結果
一. 序列化的簡單介紹
1.1. 什么是序列化
序列化
:把內存中的對象,轉換成字節序列(或其他數據傳輸協議)以便于存儲到磁盤(持久化)和網絡傳輸。
反序列化
:將收到字節序列(或其他數據傳輸協議)或者是磁盤的持久化數據,轉換成內存中的對象。
1.2. 為什么要序列化
一般來說,"活得"對象只生存在內存里,冠機斷點就沒有了。而且"活的"對象只能由本地的進程使用,不能被發送到網絡上的另外一臺計算機。
然而序列化可能存儲"活的"對象,可以將"活的"對象發送到遠程計算機。
1.3. 為什么不用Java的序列化
Java的序列化是一個重量級序列化框架(Serializable),一個對象被序列化后,會附帶很多額外的信息(各種校驗信息,Header,繼承體系等),不便于在網絡中高效傳輸。所以,Hadoop自己開發了以套系列化機制(Writable)。
Hadoop序列化特點:
1.緊湊:高效使用存儲空間
2.快速:讀寫數據的額外開銷小
3.可擴展:隨著通信協議的升級而可升級
4.互操作:支持多語言的互交
二. 自定義bean對象實現序列化接口
在企業開發中往往常用的基本序列化類型不能滿足所有需求,比如在Hadoop框架內部傳遞一個bean對象,那么該對象就需要實現序列化接口。
具體實現bean對象序列化步驟如下7步:
1. 必須實現Writable接口
2. 反序列化時,需要反射調用空參構造函數,所以必須有空參構造
public FlowBean() { super(); }
1
2
3
4
3. 重寫序列化方法
@Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); }
1
2
3
4
5
6
7
4. 重寫反序列化方法
@Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); }
1
2
3
4
5
6
7
5.
注意反序列化的順序和序列化的順序完全一致
6. 要想把結果顯示在文件中,需要重寫toString(),可用”\t”分開,方便后續用。
7. 如果需要將自定義的bean放在key中傳輸,則還需要實現Comparable接口,因為MapReduce框中的Shuffle過程要求對key必須能排序。詳見后面排序案例。
@Override public int compareTo(FlowBean o) { // 倒序排列,從大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }
1
2
3
4
5
6
三. 序列化的實際操作展示
3.1. 需求
統計每一個手機號耗費的總上行流量、下行流量、總流量
1. 輸入數據
如果有需要的可以復制下列數據并保存文檔,文檔名為:phone_data
1 13736230513 192.196.2.1 www.shouhu.com 2481 24681 200 2 13846544121 192.196.2.2 264 0 200 3 13956435636 192.196.2.3 132 1512 200 4 13966251146 192.168.2.1 240 0 404 5 18271575951 192.168.2.2 www.shouhu.com 1527 2106 200 6 18240717138 192.168.2.3 www.hao123.com 4116 1432 200 7 13590439668 192.168.2.4 1116 954 200 8 15910133277 192.168.2.5 www.hao123.com 3156 2936 200 9 13729199489 192.168.2.6 240 0 200 10 13630577991 192.168.2.7 www.shouhu.com 6960 690 200 11 15043685818 192.168.2.8 www.baidu.com 3659 3538 200 12 15959002129 192.168.2.9 www.hao123.com 1938 180 500 13 13560439638 192.168.2.10 918 4938 200 14 13470253144 192.168.2.11 180 180 200 15 13682846555 192.168.2.12 www.qq.com 1938 2910 200 16 13992314666 192.168.2.13 www.gaga.com 3008 3720 200 17 13509468723 192.168.2.14 www.qinghua.com 7335 110349 404 18 18390173782 192.168.2.15 www.sogou.com 9531 2412 200 19 13975057813 192.168.2.16 www.baidu.com 11058 48243 200 20 13768778790 192.168.2.17 120 120 200 21 13568436656 192.168.2.18 www.alibaba.com 2481 24681 200 22 13568436656 192.168.2.19 1116 954 200
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2. 輸出數據格式:
7 13590439668 192.168.2.4 1116 954 200 id 手機號碼 網絡ip 上行流量 下行流量 網絡狀態碼
1
2
3
3. 期望輸出數據格式
13560436666 1116 954 2070 手機號碼 上行流量 下行流量 總流量
1
2
3
3.2. 需求分析
3.3. 編寫MapReduce程序
1. 編寫流量統計的Bean對象
package com.buwenbuhuo.flowsun; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /** * @author 卜溫不火 * @create 2020-04-23 14:14 * com.buwenbuhuo.flowsun - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ // 1 實現writable接口 public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow; //2 反序列化時,需要反射調用空參構造函數,所以必須有 public FlowBean() { } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } public void set(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } //3 寫序列化方法 public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } //4 反序列化方法 //5 反序列化方法讀順序必須和寫序列化方法的寫順序必須一致 public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
2. 編寫Mapper類
package com.buwenbuhuo.flowsun; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @author 卜溫不火 * @create 2020-04-23 14:14 * com.buwenbuhuo.flowsun - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class FlowMapper extends Mapper
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
3. 編寫Reducer類
package com.buwenbuhuo.flowsun; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * @author 卜溫不火 * @create 2020-04-23 14:15 * com.buwenbuhuo.flowsun - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ public class FlowReducer extends Reducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
4. 編寫Driver驅動類
package com.buwenbuhuo.flowsun; /** * @author 卜溫不火 * @create 2020-04-23 14:14 * com.buwenbuhuo.flowsun - the name of the target package where the new class or interface will be created. * mapreduce0422 - the name of the current project. */ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class FlowDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 獲取job實例 Job job = Job.getInstance(new Configuration()); // 2.設置類路徑 job.setJarByClass(FlowDriver.class); // 3 指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); // 4 指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 5 指定最終輸出的數據的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); // 6 指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 7 將job中配置的相關參數,以及job所用的java類所在的jar包, 提交給yarn去運行 boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
5. 運行并查看結果
1. 運行
2. 查看結果
本期的分享就到這里了,小伙伴們有什么疑惑或好的建議可以積極在評論區留言~,博主會持續更新新鮮好玩的技術,喜歡的小伙伴們不要忘了,記得要關注博主吶ヾ(?°?°?)??。
Hadoop MapReduce
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。