MapReduce多種輸入格式(默認(rèn)的mapreduce輸入格式)

      網(wǎng)友投稿 1030 2022-05-30

      文件是 MapReduce 任務(wù)數(shù)據(jù)的初始存儲(chǔ)地。正常情況下,輸入文件一般是存儲(chǔ)在 HDFS 里面。這些文件的格式可以是任意的:我們可以使用基于行的日志文件, 也可以使用二進(jìn)制格式,多行輸入記錄或者其它一些格式。這些文件一般會(huì)很大,達(dá)到數(shù)十GB,甚至更大。那么 MapReduce 是如何讀取這些數(shù)據(jù)的呢?下面我們首先學(xué)習(xí) InputFormat 接口。

      InputFormat 接口

      InputFormat 接口決定了輸入文件如何被 Hadoop 分塊(split up)與接受。InputFormat 能夠從一個(gè) job 中得到一個(gè) split 集合(InputSplit[]),然后再為這個(gè) split 集合配上一個(gè)合適的 RecordReader(getRecordReader)來讀取每個(gè)split中的數(shù)據(jù)。 下面我們來看一下 InputFormat 接口由哪些抽象方法組成。

      InputFormat 的抽象類方法

      InputFormat 包含兩個(gè)抽象方法,如下所示。

      public abstract class InputFormat< K, V> {

      public abstract List< InputSplit> getSplits(JobContext context) throws IOException,InterruptedException;

      public abstract RecordReader< K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,InterruptedException;

      }

      getSplits(JobContext context) 方法負(fù)責(zé)將一個(gè)大數(shù)據(jù)邏輯分成許多片。比如數(shù)據(jù)庫表有 100 條數(shù)據(jù),按照主鍵 ID 升序存儲(chǔ)。 假設(shè)每 20 條分成一片,這個(gè) List 的大小就是 5,然后每個(gè) InputSplit 記錄兩個(gè)參數(shù),第一個(gè)為這個(gè)分片的起始 ID,第二個(gè)為這個(gè)分片數(shù)據(jù)的大小,這里是20.很明顯 InputSplit 并沒有真正存儲(chǔ)數(shù)據(jù)。只是提供了一個(gè)如何將數(shù)據(jù)分片的方法。

      createRecordReader(InputSplit split,TaskAttemptContext context) 方法根據(jù) InputSplit 定義的方法,返回一個(gè)能夠讀取分片記錄的 RecorderReader。getSplit 用來獲取由輸入文件計(jì)算出來的 InputSplit, 后面會(huì)看到計(jì)算 InputSplit 時(shí),會(huì)考慮輸入文件是否可分割、文件存儲(chǔ)時(shí)分塊的大小和文件大小等因素;而createRecordReader() 提供了前面說的 RecorderReader 的實(shí)現(xiàn), 將Key-Value 對從 InputSplit 中正確讀出來,比如LineRecorderReader,它是以偏移值為Key,每行的數(shù)據(jù)為 Value,這使所有 createRecorderReader() 返回 LineRecorderReader 的 InputFormat 都是以偏移值為Key,每行數(shù)據(jù)為 Value 的形式讀取輸入分片的。

      其實(shí)很多時(shí)候并不需要我們實(shí)現(xiàn) InputFormat 來讀取數(shù)據(jù),Hadoop 自帶有很多數(shù)據(jù)輸入格式,已經(jīng)實(shí)現(xiàn)了 InputFormat接口。

      InputFormat 接口實(shí)現(xiàn)類

      InputFormat 接口實(shí)現(xiàn)類有很多,其層次結(jié)構(gòu)如下圖所示:

      FileInputFormat

      FileInputFormat 是所有使用文件作為其數(shù)據(jù)源的 InputFormat 實(shí)現(xiàn)的基類,它的主要作用是指出作業(yè)的輸入文件位置。因?yàn)樽鳂I(yè)的輸入被設(shè)定為一組路徑, 這對指定作業(yè)輸入提供了很強(qiáng)的靈活性。FileInputFormat 提供了四種靜態(tài)方法來設(shè)定 Job 的輸入路徑:

      public static void addInputPath(Job job,Path path);

      public static void addInputPaths(Job job,String commaSeparatedPaths);

      public static void setInputPaths(Job job,Path... inputPaths);

      public static void setInputPaths(Job job,String commaSeparatedPaths);

      addInputPath()和addInputPaths()方法可以將一個(gè)或多個(gè)路徑加入路徑列表,可以分別調(diào)用這兩種方法來建立路徑列表。 setInputPaths()方法一次設(shè)定完整的路徑列表,替換前面調(diào)用中在 Job 上所設(shè)置的所有路徑。它們具體的使用方法,看如下示例。

      FileInputFormat.addInputPath(job, new Path("hdfs://single.hadoop.xxx.com:9000/hadoop/inputPath1"));//設(shè)置一個(gè)源路徑

      FileInputFormat.addInputPaths(job, "hdfs://single.hadoop.xxx.com:9000/hadoop/inputPath1,hdfs://single.hadoop.xxx.com:9000/hadoop/inputPath2,...");//設(shè)置多個(gè)源路徑,多個(gè)源路徑之間用逗號(hào)分開

      FileInputFormat.setInputPaths(job, inputPaths);//inputPaths是一個(gè)Path類型的數(shù)組,可以包含多個(gè)源路徑,比如hdfs://single.hadoop.xxx.com:9000/hadoop/inputPath1,hdfs://single.hadoop.xxx.com:9000/hadoop/inputPath2,...等

      FileInputFormat.setInputPaths(job, "hdfs://single.hadoop.xxx.com:9000/hadoop/inputPath1,hdfs://single.hadoop.xxx.com:9000/hadoop/inputPath2,...");//設(shè)置多個(gè)源路徑,多個(gè)源路徑之間用逗號(hào)分開

      add 方法和 set 方法允許指定包含的文件。如果需要排除特定文件,可以使用 FileInputFormat 的 setInputPathFilter()方法設(shè)置一個(gè)過濾器:

      public static void setInputPathFilter(Job job,Class< ?extends PathFilter filter);

      過濾器的詳細(xì)討論,請點(diǎn)擊PathFilter,這里不再深入討論。即使不設(shè)置過濾器,F(xiàn)ileInputFormat 也會(huì)使用一個(gè)默認(rèn)的過濾器來排除隱藏文件。 如果通過調(diào)用 setInputPathFilter()設(shè)置了過濾器,它會(huì)在默認(rèn)過濾器的基礎(chǔ)上進(jìn)行過濾。換句話說,自定義的過濾器只能看到非隱藏文件。

      對于輸入的數(shù)據(jù)源是文件類型的情況下,Hadoop 不僅擅長處理非結(jié)構(gòu)化文本數(shù)據(jù),而且可以處理二進(jìn)制格式的數(shù)據(jù), 但它們的基類都是FileInputFormat。下面我們介紹的幾種常用輸入格式,都實(shí)現(xiàn)了FileInputFormat基類。

      1、TextInputFormat

      TextInputFormat 是默認(rèn)的 InputFormat。每條記錄是一行輸入。鍵是LongWritable 類型,存儲(chǔ)該行在整個(gè)文件中的字節(jié)偏移量。 值是這行的內(nèi)容,不包括任何行終止符(換行符合回車符),它被打包成一個(gè) Text 對象。

      以下是一個(gè)示例,比如,一個(gè)分片包含了如下4條文本記錄:

      Rich learning form

      Intelligent learning engine

      Learning more convenient

      From the real demand for more close to the enterprise

      每條記錄表示為以下鍵/值對:

      (0,Rich learning form)

      (19,Intelligent learning engine)

      (47,Learning more convenient)

      (72,From the real demand for more close to the enterprise)

      很明顯,鍵并不是行號(hào)。一般情況下,很難取得行號(hào),因?yàn)槲募醋止?jié)而不是按行切分為分片。

      2、KeyValueTextInputFormat

      每一行均為一條記錄, 被分隔符(缺省是tab(\t))分割為key(Text),value(Text)。可以通過 mapreduce.input.keyvaluelinerecordreader.key.value,separator屬性(或者舊版本 API 中的 key.value.separator.in.input.line)來設(shè)定分隔符。 它的默認(rèn)值是一個(gè)制表符。

      以下是一個(gè)示例,輸入是一個(gè)包含4條記錄的分片。其中——>表示一個(gè)(水平方向的)制表符:

      line1 ——>Rich learning form

      line2 ——>Intelligent learning engine

      line3 ——>Learning more convenient

      line4 ——>From the real demand for more close to the enterprise

      每條記錄表示為以下鍵/值對:

      (line1,Rich learning form)

      (line2,Intelligent learning engine)

      (line3,Learning more convenient)

      (line4,From the real demand for more close to the enterprise)

      此時(shí)的鍵是每行排在制表符之前的 Text 序列。

      3、NLineInputFormat

      通過 TextInputFormat 和 KeyValueTextInputFormat,每個(gè) Mapper 收到的輸入行數(shù)不同。行數(shù)取決于輸入分片的大小和行的長度。 如果希望 Mapper 收到固定行數(shù)的輸入,需要將 NLineInputFormat 作為 InputFormat。與 TextInputFormat 一樣, 鍵是文件中行的字節(jié)偏移量,值是行本身。N 是每個(gè) Mapper 收到的輸入行數(shù)。N 設(shè)置為1(默認(rèn)值)時(shí),每個(gè) Mapper 正好收到一行輸入。 mapreduce.input.lineinputformat.linespermap 屬性(在舊版本 API 中的 mapred.line.input.format.linespermap 屬性)實(shí)現(xiàn) N 值的設(shè)定。

      以下是一個(gè)示例,仍然以上面的4行輸入為例:

      Rich learning form

      Intelligent learning engine

      Learning more convenient

      From the real demand for more close to the enterprise

      例如,如果 N 是2,則每個(gè)輸入分片包含兩行。一個(gè) mapper 收到前兩行鍵值對:

      (0,Rich learning form)

      (19,Intelligent learning engine)

      另一個(gè) mapper 則收到后兩行:

      (47,Learning more convenient)

      (72,From the real demand for more close to the enterprise)

      這里的鍵和值與 TextInputFormat 生成的一樣。

      4、SequenceFileInputFormat

      用于讀取 sequence file。鍵和值由用戶定義。序列文件為 Hadoop 專用的壓縮二進(jìn)制文件格式。它專用于一個(gè) MapReduce 作業(yè)和其它 MapReduce 作業(yè)之間的傳送數(shù)據(jù)(適用與多個(gè) MapReduce 鏈接操作)。

      多個(gè)輸入

      雖然一個(gè) MapReduce 作業(yè)的輸入可能包含多個(gè)輸入文件,但所有文件都由同一個(gè) InputFormat 和 同一個(gè) Mapper 來解釋。 然而,數(shù)據(jù)格式往往會(huì)隨時(shí)間演變,所以必須寫自己的 Mapper 來處理應(yīng)用中的遺留數(shù)據(jù)格式問題。或者,有些數(shù)據(jù)源會(huì)提供相同的數(shù)據(jù), 但是格式不同。

      這些問題可以使用 MultipleInputs 類來妥善處理,它允許為每條輸入路徑指定 InputFormat 和 Mapper。例如,我們想把英國 Met Office 的氣象站數(shù)據(jù)和 NCDC 的氣象站數(shù)據(jù)放在一起來統(tǒng)計(jì)平均氣溫,則可以按照下面的方式來設(shè)置輸入路徑。

      MultipleInputs.addInputPath(job,ncdcInputPath,TextInputFormat.class,NCDCTemperatureMapper.class);

      MultipleInputs.addInputPath(job,metofficeInputPath,TextInputFormat.class,MetofficeTemperatureMapper.class);

      這段代碼取代了對 FileInputFormat.addInputPath()和job.setMapperClass() 的常規(guī)調(diào)用。Met Office 和 NCDC 的數(shù)據(jù)都是文本文件,所以對兩者都使用 TextInputFormat 數(shù)據(jù)類型。 但這兩個(gè)數(shù)據(jù)源的行格式不同,所以我們使用了兩個(gè)不一樣的 Mapper,分別為NCDCTemperatureMapper和MetofficeTemperatureMapper。重要的是兩個(gè) Mapper 的輸出類型一樣,因此,reducer 看到的是聚集后的 map 輸出,并不知道這些輸入是由不同的 Mapper 產(chǎn)生的。

      MultipleInputs 類還有一個(gè)重載版本的 addInputPath() 方法,它沒有 Mapper參數(shù)。如果有多種輸入格式而只有一個(gè) Mapper(通過 Job 的 setMapperClass()方法設(shè)定),這種方法很有用。其具體方法如下所示。

      public static void addInputPath(Job job,Path path,class< ? extends InputFormat> inputFormatClass);

      DBInputFormat

      DBInputFormat 這種輸入格式用于使用 JDBC 從關(guān)系數(shù)據(jù)庫中讀取數(shù)據(jù)。因?yàn)樗鼪]有任何共享能力,所以在訪問數(shù)據(jù)庫的時(shí)候必須非常小心,在數(shù)據(jù)庫中運(yùn)行太多的 mapper 讀數(shù)據(jù)可能會(huì)使數(shù)據(jù)庫受不了。 正是由于這個(gè)原因,DBInputFormat 最好用于加載少量的數(shù)據(jù)集。與之相對應(yīng)的輸出格式是DBOutputFormat,它適用于將作業(yè)輸出數(shù)據(jù)(中等規(guī)模的數(shù)據(jù))轉(zhuǎn)存到數(shù)據(jù)庫。

      自定義 InputFormat

      有時(shí)候 Hadoop 自帶的輸入格式,并不能完全滿足業(yè)務(wù)的需求,所以需要我們根據(jù)實(shí)際情況自定義 InputFormat 類。而數(shù)據(jù)源一般都是文件數(shù)據(jù),那么自定義 InputFormat時(shí)繼承 FileInputFormat 類會(huì)更為方便,從而不必考慮如何分片等復(fù)雜操作。 自定義輸入格式我們分為以下幾步:

      1、繼承 FileInputFormat 基類。

      2、重寫 FileInputFormat 里面的 isSplitable() 方法。

      3、重寫 FileInputFormat 里面的 createRecordReader()方法。

      按照上述步驟如何自定義輸入格式呢?下面我們通過一個(gè)示例加強(qiáng)理解。

      我們?nèi)∮幸环輰W(xué)生五門課程的期末考試成績數(shù)據(jù),現(xiàn)在我們希望統(tǒng)計(jì)每個(gè)學(xué)生的總成績和平均成績。 樣本數(shù)據(jù)如下所示,每行數(shù)據(jù)的數(shù)據(jù)格式為:學(xué)號(hào)、姓名、語文成績、數(shù)學(xué)成績、英語成績、物理成績、化學(xué)成績。

      19020090040 秦心芯 123 131 100 95 100

      19020090006 李磊 99 92 100 90 100

      19020090017 唐一建 90 99 100 89 95

      19020090031 曾麗麗 100 99 97 79 96

      19020090013 羅開俊 105 115 94 45 100

      19020090039 周世海 114 116 93 31 97

      19020090020 王正偉 109 98 88 47 99

      19020090025 謝瑞彬 94 120 100 50 73

      19020090007 于微 89 78 100 66 99

      19020090012 劉小利 87 82 89 71 99

      下面我們就編寫程序,實(shí)現(xiàn)自定義輸入并求出每個(gè)學(xué)生的總成績和平均成績。分為以下幾個(gè)步驟:

      第一步:為了便于每個(gè)學(xué)生學(xué)習(xí)成績的計(jì)算,這里我們需要自定義一個(gè) ScoreWritable 類實(shí)現(xiàn) WritableComparable 接口,將學(xué)生各門成績封裝起來。

      package com.xxx.hadoop.junior;

      import java.io.DataInput;

      import java.io.DataOutput;

      import java.io.IOException;

      import org.apache.hadoop.io.WritableComparable;

      /**

      * 學(xué)習(xí)成績讀寫類

      * 數(shù)據(jù)格式參考:19020090017 張三 90 99 100 89 95

      * @author Bertron

      */

      public class ScoreWritable implements WritableComparable< Object > {

      private float Chinese;

      private float Math;

      private float English;

      private float Physics;

      private float Chemistry;

      public ScoreWritable(){

      }

      public ScoreWritable(float Chinese,float Math,float English,float Physics,float Chemistry){

      this.Chinese = Chinese;

      this.Math = Math;

      this.English = English;

      this.Physics = Physics;

      this.Chemistry = Chemistry;

      }

      public void set(float Chinese,float Math,float English,float Physics,float Chemistry){

      this.Chinese = Chinese;

      this.Math = Math;

      this.English = English;

      this.Physics = Physics;

      this.Chemistry = Chemistry;

      }

      public float getChinese() {

      return Chinese;

      }

      public float getMath() {

      return Math;

      }

      public float getEnglish() {

      return English;

      }

      public float getPhysics() {

      return Physics;

      }

      public float getChemistry() {

      return Chemistry;

      }

      @Override

      public void readFields(DataInput in) throws IOException {

      Chinese = in.readFloat();

      Math = in.readFloat();

      English = in.readFloat();

      Physics = in.readFloat();

      Chemistry = in.readFloat();

      }

      @Override

      public void write(DataOutput out) throws IOException {

      out.writeFloat(Chinese);

      out.writeFloat(Math);

      out.writeFloat(English);

      out.writeFloat(Physics);

      out.writeFloat(Chemistry);

      }

      @Override

      public int compareTo(Object o) {

      return 0;

      }

      }

      第二步:自定義輸入格式 ScoreInputFormat 類,首先繼承 FileInputFormat,然后分別重寫 isSplitable() 方法和 createRecordReader() 方法。 需要注意的是,重寫createRecordReader()方法,其實(shí)也就是重寫其返回的對象ScoreRecordReader。ScoreRecordReader 類繼承 RecordReader,實(shí)現(xiàn)數(shù)據(jù)的讀取。

      package com.xxx.hadoop.junior;

      import java.io.IOException;

      import org.apache.hadoop.conf.Configuration;

      import org.apache.hadoop.fs.FSDataInputStream;

      import org.apache.hadoop.fs.FileSystem;

      import org.apache.hadoop.fs.Path;

      import org.apache.hadoop.io.Text;

      import org.apache.hadoop.mapreduce.InputSplit;

      import org.apache.hadoop.mapreduce.RecordReader;

      import org.apache.hadoop.mapreduce.TaskAttemptContext;

      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

      import org.apache.hadoop.mapreduce.lib.input.FileSplit;

      import org.apache.hadoop.util.LineReader;

      /**

      * 自定義學(xué)生成績讀寫InputFormat

      * 數(shù)據(jù)格式參考:19020090017 張三 90 99 100 89 95

      * @author Bertron

      */

      public class ScoreInputFormat extends FileInputFormat< Text,ScoreWritable > {

      @Override

      protected boolean isSplitable(JobContext context, Path filename) {

      // TODO Auto-generated method stub

      return false;

      }

      @Override

      public RecordReader< Text,ScoreWritable > createRecordReader(InputSplit inputsplit,

      TaskAttemptContext context) throws IOException, InterruptedException {

      return new ScoreRecordReader();

      }

      //RecordReader 中的兩個(gè)參數(shù)分別填寫我們期望返回的key/value類型,我們期望key為Text類型,value為ScoreWritable類型封裝學(xué)生所有成績

      public static class ScoreRecordReader extends RecordReader< Text, ScoreWritable > {

      public LineReader in;//行讀取器

      public Text lineKey;//自定義key類型

      public ScoreWritable lineValue;//自定義value類型

      public Text line;//每行數(shù)據(jù)類型

      @Override

      public void close() throws IOException {

      if(in !=null){

      in.close();

      }

      }

      @Override

      public Text getCurrentKey() throws IOException, InterruptedException {

      return lineKey;

      }

      @Override

      public ScoreWritable getCurrentValue() throws IOException,

      InterruptedException {

      return lineValue;

      }

      @Override

      public float getProgress() throws IOException, InterruptedException {

      return 0;

      }

      @Override

      public void initialize(InputSplit input, TaskAttemptContext context)

      throws IOException, InterruptedException {

      FileSplit split=(FileSplit)input;

      Configuration job=context.getConfiguration();

      Path file=split.getPath();

      FileSystem fs=file.getFileSystem(job);

      FSDataInputStream filein=fs.open(file);

      in=new LineReader(filein,job);

      line=new Text();

      lineKey=new Text();

      lineValue = new ScoreWritable();

      }

      //此方法讀取每行數(shù)據(jù),完成自定義的key和value

      @Override

      public boolean nextKeyValue() throws IOException, InterruptedException {

      int linesize=in.readLine(line);//每行數(shù)據(jù)

      if(linesize==0) return false;

      String[] pieces = line.toString().split("\s+");//解析每行數(shù)據(jù)

      if(pieces.length != 7){

      throw new IOException("Invalid record received");

      }

      //將學(xué)生的每門成績轉(zhuǎn)換為 float 類型

      float a,b,c,d,e;

      try{

      a = Float.parseFloat(pieces[2].trim());

      b = Float.parseFloat(pieces[3].trim());

      c = Float.parseFloat(pieces[4].trim());

      d = Float.parseFloat(pieces[5].trim());

      e = Float.parseFloat(pieces[6].trim());

      }catch(NumberFormatException nfe){

      throw new IOException("Error parsing floating poing value in record");

      }

      lineKey.set(pieces[0]+"\t"+pieces[1]);//完成自定義key數(shù)據(jù)

      lineValue.set(a, b, c, d, e);//封裝自定義value數(shù)據(jù)

      return true;

      }

      }

      }

      在上述類中,我們只需根據(jù)自己的需求,重點(diǎn)編寫nextKeyValue()方法即可,其它的方法比較固定,仿造著編碼就可以了。

      第三步:編寫 MapReduce 程序,統(tǒng)計(jì)學(xué)生總成績和平均成績。這里 MapReduce 程序仿造前面模板編寫就可以了,很簡單。

      package com.xxx.hadoop.junior;

      import java.io.IOException;

      import org.apache.hadoop.conf.Configuration;

      import org.apache.hadoop.conf.Configured;

      MapReduce多種輸入格式(默認(rèn)的mapreduce輸入格式)

      import org.apache.hadoop.fs.FileSystem;

      import org.apache.hadoop.fs.Path;

      import org.apache.hadoop.io.Text;

      import org.apache.hadoop.mapreduce.Job;

      import org.apache.hadoop.mapreduce.Mapper;

      import org.apache.hadoop.mapreduce.Reducer;

      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

      import org.apache.hadoop.util.Tool;

      import org.apache.hadoop.util.ToolRunner;

      /**

      * 學(xué)生成績統(tǒng)計(jì)Hadoop程序

      * 數(shù)據(jù)格式參考:19020090017 張三 90 99 100 89 95

      * @author HuangBQ

      */

      public class ScoreCount extends Configured implements Tool {

      public static class ScoreMapper extends Mapper< Text, ScoreWritable, Text, ScoreWritable > {

      @Override

      protected void map(Text key, ScoreWritable value, Context context)

      throws IOException, InterruptedException {

      context.write(key, value);

      }

      }

      public static class ScoreReducer extends Reducer< Text, ScoreWritable, Text, Text > {

      private Text text = new Text();

      protected void reduce(Text Key, Iterable< ScoreWritable > Values, Context context)

      throws IOException, InterruptedException {

      float totalScore=0.0f;

      float averageScore = 0.0f;

      for(ScoreWritable ss:Values){

      totalScore +=ss.getChinese()+ss.getMath()+ss.getEnglish()+ss.getPhysics()+ss.getChemistry();

      averageScore +=totalScore/5;

      }

      text.set(totalScore+"\t"+averageScore);

      context.write(Key, text);

      }

      }

      @Override

      public int run(String[] args) throws Exception {

      Configuration conf = new Configuration();//讀取配置文件

      Path mypath = new Path(args[1]);

      FileSystem hdfs = mypath.getFileSystem(conf);//創(chuàng)建輸出路徑

      if (hdfs.isDirectory(mypath)) {

      hdfs.delete(mypath, true);

      }

      Job job = new Job(conf, "ScoreCount");//新建任務(wù)

      job.setJarByClass(ScoreCount.class);//設(shè)置主類

      FileInputFormat.addInputPath(job, new Path(args[0]));// 輸入路徑

      FileOutputFormat.setOutputPath(job, new Path(args[1]));// 輸出路徑

      job.setMapperClass(ScoreMapper.class);// Mapper

      job.setReducerClass(ScoreReducer.class);// Reducer

      job.setMapOutputKeyClass(Text.class);// Mapper key輸出類型

      job.setMapOutputValueClass(ScoreWritable.class);// Mapper value輸出類型

      job.setInputFormatClass(ScoreInputFormat.class);//設(shè)置自定義輸入格式

      job.waitForCompletion(true);

      return 0;

      }

      public static void main(String[] args) throws Exception {

      String[] args0 = {

      "hdfs://single.hadoop.xxx.com:9000/junior/score.txt",

      "hdfs://single.hadoop.xxx.com:9000/junior/score-out/"

      };

      int ec = ToolRunner.run(new Configuration(), new ScoreCount(), args0);

      System.exit(ec);

      }

      }

      需要注意的是,上面我們自定義的輸入格式ScoreInputFormat,需要在 MapReduce 程序中做如下設(shè)置。

      job.setInputFormatClass(ScoreInputFormat.class);//設(shè)置自定義輸入格式

      一般情況下,并不需要我們自定義輸入格式,Hadoop 自帶有很多種輸入格式,基本滿足我們工作的需要。

      Hadoop MapReduce

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:我的非洲經(jīng)歷回憶錄(我的非洲歷險(xiǎn)記)
      下一篇:Kubernetes安裝部署操作手冊(kubernetes安裝詳解)
      相關(guān)文章
      精品亚洲aⅴ在线观看| 国产91成人精品亚洲精品| 久久精品夜色噜噜亚洲A∨| 亚洲欧美第一成人网站7777 | 日韩成人精品日本亚洲| 亚洲日韩一区精品射精| 国产精品亚洲一区二区麻豆| 亚洲专区一路线二| 亚洲无限乱码一二三四区| 亚洲精品欧洲精品| 亚洲欧洲日本天天堂在线观看| 久久久久久亚洲AV无码专区| 久久亚洲免费视频| 亚洲网站在线观看| 中文字幕亚洲综合精品一区| 久久久久亚洲AV片无码下载蜜桃| 亚洲视频中文字幕| 亚洲综合久久成人69| 亚洲精品91在线| 亚洲五月综合缴情婷婷| 日韩亚洲国产综合高清| 亚洲日韩精品无码AV海量| 久久亚洲精品无码av| 亚洲AV中文无码乱人伦| 亚洲一区视频在线播放| 亚洲综合av永久无码精品一区二区 | 91在线亚洲综合在线| 亚洲人成色4444在线观看| 亚洲AV综合色区无码一二三区| 欧美色欧美亚洲另类二区| 国产精品亚洲综合| 亚洲一区无码精品色| 国产亚洲成AV人片在线观黄桃| 亚洲国产精品嫩草影院在线观看 | 国产 亚洲 中文在线 字幕| 亚洲av无码成人精品国产| 亚洲av午夜成人片精品电影| 中文字幕亚洲激情| 亚洲AV无码乱码在线观看裸奔| 亚洲黄色免费观看| 亚洲AV成人影视在线观看|