MapReduce編程之Join多種應(yīng)用場景與使用

      網(wǎng)友投稿 1010 2022-05-29

      join操作概述

      在關(guān)系型數(shù)據(jù)庫中 Join 是非常常見的操作,各種優(yōu)化手段已經(jīng)到了極致。在海量數(shù)據(jù)的環(huán)境下,不可避免的也會碰到這種類型的需求, 例如在數(shù)據(jù)分析時需要連接從不同的數(shù)據(jù)源中獲取到數(shù)據(jù)。不同于傳統(tǒng)的單機模式,在分布式存儲下采用 MapReduce 編程模型,也有相應(yīng)的處理措施和優(yōu)化方法。

      我們先簡要地描述待解決的問題。假設(shè)有兩個數(shù)據(jù)集:氣象站數(shù)據(jù)庫和天氣記錄數(shù)據(jù)庫,并考慮如何合二為一。一個典型的查詢是:輸出氣象站的歷史信息,同時各行記錄也包含氣象站的元數(shù)據(jù)信息。

      Reduce join

      在Reudce端進行連接是MapReduce框架實現(xiàn)join操作最常見的方式,其具體的實現(xiàn)原理如下:

      Map端的主要工作:為來自不同表(文件)的key/value對打標簽以區(qū)別不同來源的記錄。然后用連接字段作為key,其余部分和新加的標志作為value,最后進行輸出。

      reduce端的主要工作:在reduce端以連接字段作為key的分組已經(jīng)完成,我們只需要在每一個分組當中將那些來源于不同文件的記錄(在map階段已經(jīng)打標志)分開,最后進行合并就ok了

      Reduce Join 實現(xiàn)方式一

      ● 適用場景:兩個表連接

      ● 實現(xiàn)方式:二次排序

      ● 代碼實現(xiàn):

      自定義TextPair作為JoinStationMapper和JoinRecordMapper的輸出key:

      package com.xxx.hadoop.join;

      import org.apache.hadoop.io.WritableComparable;

      import java.io.*;

      import org.apache.hadoop.io.*;

      public class TextPair implements WritableComparable {

      private? ? Text first;//Text 類型的實例變量 first

      private? ? Text second;//Text 類型的實例變量 second

      public TextPair() {set(new Text(),new Text());}

      public TextPair(String first, String second) { set(new Text(first),new Text(second));}

      public TextPair(Text first, Text second) {set(first, second);}

      public void set(Text first, Text second) {

      this.first = first;

      this.second = second;

      }

      public Text getFirst() { return first;}

      public Text getSecond() { return second;}

      //將對象轉(zhuǎn)換為字節(jié)流并寫入到輸出流out中

      public void write(DataOutput out)throws IOException {

      first.write(out);

      second.write(out);

      }

      //從輸入流in中讀取字節(jié)流反序列化為對象

      public void readFields(DataInput in)throws IOException {

      first.readFields(in);

      second.readFields(in);

      }

      @Override

      public int hashCode() {return first.hashCode() *163+second.hashCode();}

      @Override

      public boolean equals(Object o) {

      if(o instanceof TextPair) {

      TextPair tp = (TextPair) o;

      return first.equals(tp.first) && second.equals(tp.second);

      }

      return false;

      }

      @Override

      public String toString() {return first +"\t"+ second;}

      //排序

      public int compareTo(TextPair o) {

      // TODO Auto-generated method stub

      if(!first.equals(o.first)){

      return first.compareTo(o.first);

      }

      else if(!second.equals(o.second)){

      return second.compareTo(o.second);

      }else{return 0;}

      }

      }

      自定義分區(qū)KeyPartitioner

      package com.xxx.hadoop.join;

      import org.apache.hadoop.io.Text;

      import org.apache.hadoop.mapreduce.Partitioner;

      //joinkey + "0"

      public class KeyPartitioner? extends Partitioner< TextPair,Text>{

      public int getPartition(TextPair key,Text value,int numPartitions){

      return (key.getFirst().hashCode()&Integer.MAX_VALUE)% numPartitions;

      }

      }

      自定義分組GroupingComparator

      package com.xxx.hadoop.join;

      import org.apache.hadoop.io.Text;

      import org.apache.hadoop.io.WritableComparable;

      import org.apache.hadoop.io.WritableComparator;

      public class GroupingComparator extends WritableComparator{

      protected GroupingComparator(){

      super(TextPair.class, true);

      }

      @Override

      //Compare two WritableComparables.

      public int compare(WritableComparable w1, WritableComparable w2){

      TextPair ip1 = (TextPair) w1;

      TextPair ip2 = (TextPair) w2;

      Text l = ip1.getFirst();

      Text r = ip2.getFirst();

      return l.compareTo(r);

      }

      }

      由于 TextPair 經(jīng)過了二次排序,所以 reducer 會先接收到氣象站數(shù)據(jù)。因此從中抽取氣象站名稱,并將其作為后續(xù)每條輸出記錄的一部分寫到輸出文件。JoinReducer 的代碼如下所示。

      package com.xxx.hadoop.join;

      import java.io.IOException;

      import java.util.Iterator;

      import org.apache.hadoop.io.Text;

      import org.apache.hadoop.mapreduce.Reducer;

      public class JoinReducer extends Reducer< TextPair,Text,Text,Text>{

      protected void reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException{

      Iterator< Text> iter = values.iterator();

      Text stationName = new Text(iter.next());//氣象站名稱

      while(iter.hasNext()){

      Text record = iter.next();//天氣記錄的每條數(shù)據(jù)

      Text outValue = new Text(stationName.toString()+"\t"+record.toString());

      context.write(key.getFirst(),outValue);

      }

      }

      }

      下面我們定義作業(yè)的驅(qū)動類 ReduceJoinBySecondarySort,在該類中,關(guān)鍵在于根據(jù)組合鍵的第一個字段(即氣象站 ID)進行分區(qū)和分組,即使用一個自定義的 Partitioner 和 一個自定義的分組 comparator 作為TextPair 的嵌套類。ReduceJoinBySecondarySort 類的代碼如下所示。

      /*

      * 通過二次排序?qū)崿F(xiàn)reduce join

      * 適用場景:其中一個表的連接字段key唯一

      */

      public class ReduceJoinBySecondarySort extends Configured implements Tool{

      public int run(String[] args) throws Exception{

      Configuration conf = new Configuration();// 讀取配置文件

      Path mypath = new Path(args[2]);

      FileSystem hdfs = mypath.getFileSystem(conf);// 創(chuàng)建輸出路徑

      if (hdfs.isDirectory(mypath)) {

      hdfs.delete(mypath, true);

      }

      Job job = Job.getInstance(conf, "join");// 新建一個任務(wù)

      job.setJarByClass(ReduceJoinBySecondarySort.class);// 主類

      Path recordInputPath = new Path(args[0]);//天氣記錄數(shù)據(jù)源

      Path stationInputPath = new Path(args[1]);//氣象站數(shù)據(jù)源

      Path outputPath = new Path(args[2]);//輸出路徑

      MultipleInputs.addInputPath(job,recordInputPath,TextInputFormat.class,JoinRecordMapper.class);//讀取天氣記錄Mapper

      MultipleInputs.addInputPath(job,stationInputPath,TextInputFormat.class,JoinStationMapper.class);//讀取氣象站Mapper

      FileOutputFormat.setOutputPath(job,outputPath);

      job.setReducerClass(JoinReducer.class);// Reducer

      job.setPartitionerClass(KeyPartitioner.class);//自定義分區(qū)

      job.setGroupingComparatorClass(GroupingComparator.class);//自定義分組

      job.setMapOutputKeyClass(TextPair.class);

      job.setMapOutputValueClass(Text.class);

      job.setOutputKeyClass(Text.class);

      job.setOutputValueClass(Text.class);

      return job.waitForCompletion(true)?0:1;

      }

      public static void main(String[] args) throws Exception{

      String[] args0 = {"hdfs://xxx002:9000/xxx/records.txt"

      ,"hdfs://xxx002:9000/xxx/station.txt"

      ,"hdfs://xxx002:9000/xxx/ssReduceJoin-out"

      };

      int exitCode = ToolRunner.run(new ReduceJoinBySecondarySort(),args);

      System.exit(exitCode);

      }

      }

      下載該樣本數(shù)據(jù)上運行程序,獲得以下輸出結(jié)果。

      011990-99999SIHCCAJAVRI1950051507000

      011990-99999SIHCCAJAVRI19500515120022

      011990-99999SIHCCAJAVRI195005151800-11

      012650-99999TYNSET-HANSMOEN194903241200111

      012650-99999TYNSET-HANSMOEN19490324180078

      Reduce Join 實現(xiàn)方式二

      ● 適用場景:兩個表連接

      ● 實現(xiàn)方式:笛卡爾積

      ● 代碼實現(xiàn):

      /*

      * 兩個大表

      * 通過笛卡爾積實現(xiàn) reduce join

      * 適用場景:兩個表的連接字段key都不唯一(包含一對多,多對多的關(guān)系)

      */

      public class ReduceJoinByCartesianProduct {

      /**

      為來自不同表(文件)的key/value對打標簽以區(qū)別不同來源的記錄。

      然后用連接字段作為key,其余部分和新加的標志作為value,最后進行輸出。

      */

      public static class ReduceJoinByCartesianProductMapper extends Mapper{

      private Text joinKey=new Text();

      private Text combineValue=new Text();

      @Override

      protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

      String pathName=((FileSplit)context.getInputSplit()).getPath().toString();

      //如果數(shù)據(jù)來自于records,加一個records的標記

      if(pathName.endsWith("records.txt")){

      String[] valueItems=StringUtils.split(value.toString(),"\s+");

      //過濾掉臟數(shù)據(jù)

      if(valueItems.length!=3){

      return;

      }

      joinKey.set(valueItems[0]);

      combineValue.set("records.txt" + valueItems[1] + "\t" + valueItems[2]);

      }else if(pathName.endsWith("station.txt")){

      //如果數(shù)據(jù)來自于station,加一個station的標記

      String[] valueItems=StringUtils.split(value.toString(),"\s+");

      //過濾掉臟數(shù)據(jù)

      if(valueItems.length!=2){

      return;

      }

      joinKey.set(valueItems[0]);

      combineValue.set("station.txt" + valueItems[1]);

      }

      context.write(joinKey,combineValue);

      }

      }

      /*

      * reduce 端做笛卡爾積

      */

      public static class ReduceJoinByCartesianProductReducer extends Reducer{

      private List leftTable=new ArrayList();

      private List rightTable=new ArrayList();

      private Text result=new Text();

      @Override

      protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

      //一定要清空數(shù)據(jù)

      leftTable.clear();

      rightTable.clear();

      //相同key的記錄會分組到一起,我們需要把相同key下來自于不同表的數(shù)據(jù)分開,然后做笛卡爾積

      for(Text value : values){

      String val=value.toString();

      if(val.startsWith("station.txt")){

      leftTable.add(val.replaceFirst("station.txt",""));

      }else if(val.startsWith("records.txt")){

      rightTable.add(val.replaceFirst("records.txt",""));

      }

      }

      //笛卡爾積

      for(String leftPart:leftTable){

      for(String rightPart:rightTable){

      result.set(leftPart+"\t"+rightPart);

      context.write(key, result);

      }

      }

      }

      }

      public static void main(String[] args) throws Exception{

      Configuration conf = new Configuration();

      String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

      if (otherArgs.length < 2) {

      System.err.println("Usage: reducejoin [...] ");

      System.exit(2);

      }

      //輸出路徑

      Path mypath = new Path(otherArgs[otherArgs.length - 1]);

      FileSystem hdfs = mypath.getFileSystem(conf);// 創(chuàng)建輸出路徑

      if (hdfs.isDirectory(mypath)) {

      hdfs.delete(mypath, true);

      }

      Job job = Job.getInstance(conf, "ReduceJoinByCartesianProduct");

      job.setJarByClass(ReduceJoinByCartesianProduct.class);

      job.setMapperClass(ReduceJoinByCartesianProductMapper.class);

      job.setReducerClass(ReduceJoinByCartesianProductReducer.class);

      job.setOutputKeyClass(Text.class);

      job.setOutputValueClass(Text.class);

      //添加輸入路徑

      for (int i = 0; i < otherArgs.length - 1; ++i) {

      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));

      }

      //添加輸出路徑

      FileOutputFormat.setOutputPath(job,

      new Path(otherArgs[otherArgs.length - 1]));

      System.exit(job.waitForCompletion(true) ? 0 : 1);

      }

      }

      下載該樣本數(shù)據(jù)上運行程序,獲得以下輸出結(jié)果。

      011990-99999SIHCCAJAVRI1950051507000

      011990-99999SIHCCAJAVRI19500515120022

      011990-99999SIHCCAJAVRI195005151800-11

      012650-99999TYNSET-HANSMOEN194903241200111

      012650-99999TYNSET-HANSMOEN19490324180078

      Reduce Join 實現(xiàn)方式三

      ● 適用場景:一個大表和一個小表連接

      ● 實現(xiàn)方式:分布式緩存

      分布式緩存知識點補充:

      當 MapReduce 處理大型數(shù)據(jù)集間的 join 操作時,此時如果一個數(shù)據(jù)集很大而另外一個集合很小,以至于可以分發(fā)到集群中的每個節(jié)點之中。 這種情況下,我們就用到了 Hadoop 的分布式緩存機制,它能夠在任務(wù)運行過程中及時地將文件和存檔復(fù)制到任務(wù)節(jié)點以供使用。為了節(jié)約網(wǎng)絡(luò)寬帶,在每一個作業(yè)中, 各個文件通常只需要復(fù)制到一個節(jié)點一次。

      1、用法

      Hadoop 命令行選項中,有三個命令可以實現(xiàn)文件復(fù)制分發(fā)到任務(wù)的各個節(jié)點。

      1)用戶可以使用 -files 選項指定待分發(fā)的文件,文件內(nèi)包含以逗號隔開的 URL 列表。文件可以存放在本地文件系統(tǒng)、HDFS、或其它 Hadoop 可讀文件系統(tǒng)之中。 如果尚未指定文件系統(tǒng),則這些文件被默認是本地的。即使默認文件系統(tǒng)并非本地文件系統(tǒng),這也是成立的。

      2)用戶可以使用 -archives 選項向自己的任務(wù)中復(fù)制存檔文件,比如JAR 文件、ZIP 文件、tar 文件和 gzipped tar文件,這些文件會被解檔到任務(wù)節(jié)點。

      3)用戶可以使用 -libjars 選項把 JAR 文件添加到 mapper 和 reducer 任務(wù)的類路徑中。如果作業(yè) JAR 文件并非包含很多庫 JAR 文件,這點會很有用。

      2、工作機制

      當用戶啟動一個作業(yè),Hadoop 會把由 -files、-archives、和 -libjars 等選項所指定的文件復(fù)制到分布式文件系統(tǒng)之中。接著,在任務(wù)運行之前, tasktracker 將文件從分布式文件系統(tǒng)復(fù)制到本地磁盤(緩存)使任務(wù)能夠訪問文件。此時,這些文件就被視為 “本地化” 了。從任務(wù)的角度來看, 這些文件就已經(jīng)在那兒了,它并不關(guān)心這些文件是否來自 HDFS 。此外,有 -libjars 指定的文件會在任務(wù)啟動前添加到任務(wù)的類路徑(classpath)中。

      3、分布式緩存 API

      由于可以通過 Hadoop 命令行間接使用分布式緩存,大多數(shù)應(yīng)用不需要使用分布式緩存 API。然而,一些應(yīng)用程序需要用到分布式緩存的更高級的特性,這就需要直接使用 API 了。 API 包括兩部分:將數(shù)據(jù)放到緩存中的方法,以及從緩存中讀取數(shù)據(jù)的方法。

      1)首先掌握數(shù)據(jù)放到緩存中的方法,以下列舉 Job 中可將數(shù)據(jù)放入到緩存中的相關(guān)方法:

      在緩存中可以存放兩類對象:文件(files)和存檔(achives)。文件被直接放置在任務(wù)節(jié)點上,而存檔則會被解檔之后再將具體文件放置在任務(wù)節(jié)點上。

      2)其次掌握在 map 或者 reduce 任務(wù)中,使用 API 從緩存中讀取數(shù)據(jù)。

      public Path[] getLocalCacheFiles() throws IOException;

      public Path[] getLocalCacheArchives() throws IOException;

      public Path[] getFileClassPaths();

      public Path[] getArchiveClassPaths();

      我們可以使用 getLocalCacheFiles()和getLocalCacheArchives()方法獲取緩存中的文件或者存檔的引用。 當處理存檔時,將會返回一個包含解檔文件的目的目錄。相應(yīng)的,用戶可以通過 getFileClassPaths()和getArchivesClassPaths()方法獲取被添加到任務(wù)的類路徑下的文件和文檔。

      ● 代碼實現(xiàn):

      /*

      * 通過分布式緩存實現(xiàn)Reduce Join

      * 適用場景:其中一個表比較小,能放入內(nèi)存

      */

      public class ReduceJoinByDistributedCache? extends Configured implements Tool {

      //直接輸出大表數(shù)據(jù)records.txt

      public static class ReduceJoinByDistributedCacheMapper extends

      Mapper< LongWritable, Text, Text, Text> {

      public void map(LongWritable key, Text value, Context context)

      throws IOException, InterruptedException {

      String[] arr = StringUtils.split(value.toString(),"\s+");

      if (arr.length == 3) {

      context.write(new Text(arr[0]), value);

      }

      }

      }

      //在reduce 端通過緩存文件實現(xiàn)join操作

      public static class ReduceJoinByDistributedCacheReducer extends

      Reducer< Text, Text, Text, Text> {

      //定義Hashtable存放緩存數(shù)據(jù)

      private Hashtable< String, String> table = new Hashtable< String, String>();

      /**

      * 獲取分布式緩存文件

      */

      protected void setup(Context context) throws IOException,

      InterruptedException {

      BufferedReader br;

      String infoAddr = null;

      // 返回緩存文件路徑

      Path[] cacheFilesPaths = context.getLocalCacheFiles();

      for (Path path : cacheFilesPaths) {

      String pathStr = path.toString();

      br = new BufferedReader(new FileReader(pathStr));

      while (null != (infoAddr = br.readLine())) {

      // 按行讀取并解析氣象站數(shù)據(jù)

      String[] records = StringUtils.split(infoAddr.toString(),

      "\s+");

      if (null != records)//key為stationID,value為stationName

      table.put(records[0], records[1]);

      }

      }

      }

      public void reduce(Text key, Iterable< Text> values, Context context)

      throws IOException, InterruptedException {

      //天氣記錄根據(jù)stationId 獲取stationName

      String stationName = table.get(key.toString());

      for (Text value : values) {

      context.write(new Text(stationName), value);

      }

      }

      }

      public int run(String[] args) throws Exception {

      Configuration conf = new Configuration();

      String[] otherArgs = new GenericOptionsParser(conf, args)

      .getRemainingArgs();

      if (otherArgs.length < 2) {System.err.println("Usage: cache [...] ");

      System.exit(2);

      }

      //輸出路徑

      Path mypath = new Path(otherArgs[otherArgs.length - 1]);

      FileSystem hdfs = mypath.getFileSystem(conf);// 創(chuàng)建輸出路徑

      if (hdfs.isDirectory(mypath)) {

      hdfs.delete(mypath, true);

      }

      Job job = Job.getInstance(conf, "ReduceJoinByDistributedCache");

      //添加緩存文件

      job.addCacheFile(new Path(otherArgs[0]).toUri());//station.txt

      job.setJarByClass(ReduceJoinByDistributedCache.class);

      job.setMapperClass(ReduceJoinByDistributedCacheMapper.class);

      job.setReducerClass(ReduceJoinByDistributedCacheReducer.class);

      job.setOutputKeyClass(Text.class);

      job.setOutputValueClass(Text.class);

      //添加輸入路徑

      for (int i = 1; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));

      }

      //添加輸出路徑FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));return job.waitForCompletion(true) ? 0 : 1;

      }? ? public static void main(String[] args) throws Exception {? ? int ec = ToolRunner.run(new Configuration(),new ReduceJoinByDistributedCache(), args);? ? System.exit(ec);

      }

      }

      下載該樣本數(shù)據(jù)上運行程序,獲得以下輸出結(jié)果:

      Reduce Join的不足

      這里主要分析一下reduce join的一些不足。之所以會存在reduce join這種方式,是因為整體數(shù)據(jù)被分割了,每個map task只處理一部分數(shù)據(jù)而不能夠獲取到所有需要的join字段,因此我們可以充分利用mapreduce框架的特性,讓他按照join key進行分區(qū),將所有join key相同的記錄集中起來進行處理,所以reduce join這種方式就出現(xiàn)了。

      這種方式的缺點很明顯就是會造成map和reduce端也就是shuffle階段出現(xiàn)大量的數(shù)據(jù)傳輸,效率很低。

      Map join

      Map Join 實現(xiàn)方式一

      ● 使用場景:一張表十分小、一張表很大。

      ● 用法:

      在提交作業(yè)的時候先將小表文件放到該作業(yè)的DistributedCache中,然后從DistributeCache中取出該小表進行join (比如放到Hash Map等等容器中)。然后掃描大表,看大表中的每條記錄的join key /value值是否能夠在內(nèi)存中找到相同join key的記錄,如果有則直接輸出結(jié)果。

      DistributedCache是分布式緩存的一種實現(xiàn),它在整個MapReduce框架中起著相當重要的作用,他可以支撐我們寫一些相當復(fù)雜高效的分布式程序。說回到這里,JobTracker在作業(yè)啟動之前會獲取到DistributedCache的資源uri列表,并將對應(yīng)的文件分發(fā)到各個涉及到該作業(yè)的任務(wù)的TaskTracker上。另外,關(guān)于DistributedCache和作業(yè)的關(guān)系,比如權(quán)限、存儲路徑區(qū)分、public和private等屬性。

      ● 代碼實現(xiàn):

      /*

      * 通過分布式緩存實現(xiàn) map join

      * 適用場景:一個小表,一個大表

      */

      public class MapJoinByDistributedCache? extends Configured implements Tool {

      下載該樣本數(shù)據(jù)上運行程序,獲得以下輸出結(jié)果:

      Map Join 實現(xiàn)方式二

      ● 使用場景:一張表在數(shù)據(jù)庫、一張表很大。

      另外還有一種比較變態(tài)的Map Join方式,就是結(jié)合HBase來做Map Join操作。這種方式完全可以突破內(nèi)存的控制,使你毫無忌憚的使用Map Join,而且效率也非常不錯。

      Semi join

      Map Join 實現(xiàn)方式一

      ● 使用場景:一個大表(內(nèi)存放不下),一個超大表

      ● 實現(xiàn)方式:分布式緩存

      ● 用法:

      SemiJoin就是所謂的半連接,其實仔細一看就是reduce join的一個變種,就是在map端過濾掉一些數(shù)據(jù),在網(wǎng)絡(luò)中只傳輸參與連接的數(shù)據(jù)不參與連接的數(shù)據(jù)不必在網(wǎng)絡(luò)中進行傳輸,從而減少了shuffle的網(wǎng)絡(luò)傳輸量,使整體效率得到提高,其他思想和reduce join是一模一樣的。說得更加接地氣一點就是將小表中參與join的key單獨抽出來通過DistributedCach分發(fā)到相關(guān)節(jié)點,然后將其取出放到內(nèi)存中(可以放到HashSet中),在map階段掃描連接表,將join key不在內(nèi)存HashSet中的記錄過濾掉,讓那些參與join的記錄通過shuffle傳輸?shù)絩educe端進行join操作,其他的和reduce join都是一樣的。

      ● 代碼實現(xiàn):

      /*

      * 一個大表,一個小表

      * map 階段:Semi Join解決小表整個記錄內(nèi)存放不下的場景,過濾大表

      * reduce 階段:reduce side join

      */

      public class SemiJoin {

      /**

      * 為來自不同表(文件)的key/value對打標簽以區(qū)別不同來源的記錄。

      * 然后用連接字段作為key,其余部分和新加的標志作為value,最后進行輸出。

      */

      public static class SemiJoinMapper extends

      Mapper {

      // 定義Set集合保存小表中的key

      private Set joinKeys = new HashSet();

      private Text joinKey = new Text();

      private Text combineValue = new Text();

      /**

      * 獲取分布式緩存文件

      */

      protected void setup(Context context) throws IOException,

      InterruptedException {

      BufferedReader br;

      String infoAddr = null;

      // 返回緩存文件路徑

      Path[] cacheFilesPaths = context.getLocalCacheFiles();

      for (Path path : cacheFilesPaths) {

      String pathStr = path.toString();

      br = new BufferedReader(new FileReader(pathStr));

      while (null != (infoAddr = br.readLine())) {

      // 按行讀取并解析氣象站數(shù)據(jù)

      MapReduce編程之Join多種應(yīng)用場景與使用

      String[] records = StringUtils.split(infoAddr.toString(),

      "\\s+");

      if (null != records)// key為stationID

      joinKeys.add(records[0]);

      }

      }

      }

      @Override

      protected void map(Object key, Text value, Context context)

      throws IOException, InterruptedException {

      String pathName = ((FileSplit) context.getInputSplit()).getPath()

      .toString();

      // 如果數(shù)據(jù)來自于records,加一個records的標記

      if (pathName.endsWith("records-semi.txt")) {

      String[] valueItems = StringUtils.split(value.toString(),

      "\\s+");

      // 過濾掉臟數(shù)據(jù)

      if (valueItems.length != 3) {

      return;

      }

      if (joinKeys.contains(valueItems[0])) {

      joinKey.set(valueItems[0]);

      combineValue.set("records-semi.txt" + valueItems[1] + "\t"

      + valueItems[2]);

      context.write(joinKey, combineValue);

      }

      } else if (pathName.endsWith("station.txt")) {

      // 如果數(shù)據(jù)來自于station,加一個station的標記

      String[] valueItems = StringUtils.split(value.toString(),

      "\\s+");

      // 過濾掉臟數(shù)據(jù)

      if (valueItems.length != 2) {

      return;

      }

      joinKey.set(valueItems[0]);

      combineValue.set("station.txt" + valueItems[1]);

      context.write(joinKey, combineValue);

      }

      }

      }

      /*

      * reduce 端做笛卡爾積

      */

      public static class SemiJoinReducer extends

      Reducer {

      private List leftTable = new ArrayList();

      private List rightTable = new ArrayList();

      private Text result = new Text();

      public static void main(String[] args) throws Exception {

      Configuration conf = new Configuration();

      String[] otherArgs = new GenericOptionsParser(conf, args)

      .getRemainingArgs();

      if (otherArgs.length < 2) {

      System.err.println("Usage: semijoin [...] ");

      System.exit(2);

      }

      //輸出路徑

      Path mypath = new Path(otherArgs[otherArgs.length - 1]);

      FileSystem hdfs = mypath.getFileSystem(conf);// 創(chuàng)建輸出路徑

      if (hdfs.isDirectory(mypath)) {

      hdfs.delete(mypath, true);

      }

      Job job = Job.getInstance(conf, "SemiJoin");

      下載該樣本數(shù)據(jù)上運行程序,獲得以下輸出結(jié)果:

      Reduce join + BloomFilter

      ● 使用場景:一個大表(表中的key內(nèi)存仍然放不下),一個超大表

      在某些情況下,SemiJoin抽取出來的小表的key集合在內(nèi)存中仍然存放不下,這時候可以使用BloomFiler以節(jié)省空間。

      BloomFilter最常見的作用是:判斷某個元素是否在一個集合里面。它最重要的兩個方法是:add() 和membershipTest ()。

      因而可將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(但是在小表中的記錄一定不會過濾掉),這沒關(guān)系,只不過增加了少量的網(wǎng)絡(luò)IO而已。

      ● BloomFilter參數(shù)計算方式:

      n:小表中的記錄數(shù)。

      m:位數(shù)組大小,一般m是n的倍數(shù),倍數(shù)越大誤判率就越小,但是也有內(nèi)存限制,不能太大,這個值需要反復(fù)測試得出。

      k:hash個數(shù),最優(yōu)hash個數(shù)值為:k = ln2 * (m/n)

      ● 代碼實現(xiàn):

      下載該樣本數(shù)據(jù)上運行程序,獲得以下輸出:

      總結(jié):

      三種join方式適用于不同的場景,其處理效率上相差很大,其主要導(dǎo)致因素是網(wǎng)絡(luò)傳輸。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,寫分布式大數(shù)據(jù)處理程序的時最好要對整體要處理的數(shù)據(jù)分布情況作一個了解,這可以提高我們代碼的效率,使數(shù)據(jù)的傾斜度降到最低,使我們的代碼傾向性更好。

      分布式 Apache 緩存 MapReduce

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔相應(yīng)法律責任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:2019年12月前端面經(jīng)及總結(jié)(西安,杭州)
      下一篇:Kubernetes集群調(diào)度器原理剖析及思考
      相關(guān)文章
      www.91亚洲| 亚洲精品网站在线观看你懂的| 亚洲国产一区在线| 亚洲综合国产成人丁香五月激情| 亚洲综合婷婷久久| 精品亚洲综合久久中文字幕| 亚洲精品线路一在线观看| 亚洲精品A在线观看| 亚洲欧美国产日韩av野草社区| 亚洲一区二区三区在线观看网站| 亚洲国产片在线观看| 亚洲国产中文在线二区三区免| 亚洲国产日韩在线成人蜜芽| 亚洲AV一二三区成人影片| 亚洲一区二区影视| 亚洲熟女乱色一区二区三区| 亚洲欧洲专线一区| 国产精品无码亚洲精品2021| 亚洲Av无码国产情品久久| 青青青国产色视频在线观看国产亚洲欧洲国产综合 | 久久精品国产亚洲AV无码偷窥| 亚洲乱码日产一区三区| 国产A在亚洲线播放| 亚洲第一精品福利| 亚洲精品国产福利片| 亚洲人成人77777网站不卡| 亚洲高清有码中文字| 色综合久久精品亚洲国产| 亚洲国产成人精品91久久久| 国产亚洲一区二区三区在线不卡| 久久亚洲国产精品一区二区| 亚洲人成网站在线播放vr| 亚洲∧v久久久无码精品| 亚洲精品国产福利片| 2020国产精品亚洲综合网| 亚洲欧美日韩中文高清www777| 国产区图片区小说区亚洲区| 国产亚洲精品AA片在线观看不加载| 亚洲免费人成在线视频观看| 亚洲AV无码国产精品麻豆天美| 亚洲视频免费一区|