Guide to Using Spark on CloudTable in an MRS 3.X Cluster

Community contribution · 890 · 2025-04-01

1. Refer to the official documentation, the development guide for 2.x and earlier (https://support.huaweicloud.com/devg-mrs/mrs_06_0187.html): Development Guide (for 2.x and earlier) -> Spark Application Development -> Spark on HBase. In the sample project's pom file, set hbase.version to 1.3.1-mrs-1.9.0, and replace the CreateTable, TableInputData, and TableOutputData classes in the sample code with the content below. (The hbase.zookeeper.quorum value differs between CloudTable clusters; contact the CloudTable SRE team to obtain it.)
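
For reference, a minimal sketch of the pom change, assuming the sample project's pom declares its HBase dependencies through an hbase.version property (the dependency shown is only an illustration; keep whatever dependency list the sample already has):

<properties>
    <!-- Point the sample at the MRS 1.9.0 build of HBase 1.3.1 -->
    <hbase.version>1.3.1-mrs-1.9.0</hbase.version>
</properties>

<dependencies>
    <!-- Example dependency that picks up the property above -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
</dependencies>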



The TableInputData class is as follows:

package com.huawei.bigdata.spark.examples;

import java.io.File;
import java.io.IOException;
import java.util.List;

import scala.Tuple4;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import com.huawei.hadoop.security.LoginUtil;

/**
 * Input data to hbase table.
 */
public class TableInputData {
    public static void main(String[] args) throws IOException {
        Configuration hadoopConf = new Configuration();
        if ("kerberos".equalsIgnoreCase(hadoopConf.get("hadoop.security.authentication"))) {
            // Security mode
            final String userPrincipal = "sparkuser";
            final String USER_KEYTAB_FILE = "user.keytab";
            String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
            String krbFile = filePath + "krb5.conf";
            String userKeyTableFile = filePath + USER_KEYTAB_FILE;

            String ZKServerPrincipal = "zookeeper/hadoop.hadoop.com";
            String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
            String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";

            LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeyTableFile);
            LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal);
            LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf);
        }

        // Create the configuration parameter to connect to HBase.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Set the CloudTable ZooKeeper quorum (all three nodes in one comma-separated value).
        String ip = "cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com";
        String ip1 = "cloudtable-yan-zk2-xOogaSa3.mycloudtable.com";
        String ip2 = "cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com";
        hbConf.set("hbase.zookeeper.quorum", ip + "," + ip1 + "," + ip2);

        // Declare the information of the table.
        Table table = null;
        String tableName = "shb1";
        byte[] familyName = Bytes.toBytes("info");
        Connection connection = null;
        try {
            // Connect to HBase.
            connection = ConnectionFactory.createConnection(hbConf);
            // Obtain the table object.
            table = connection.getTable(TableName.valueOf(tableName));
            List<Tuple4<String, String, String, String>> data = jsc.textFile(args[0]).map(
                new Function<String, Tuple4<String, String, String, String>>() {
                    public Tuple4<String, String, String, String> call(String s) throws Exception {
                        String[] tokens = s.split(",");
                        return new Tuple4<String, String, String, String>(tokens[0], tokens[1], tokens[2], tokens[3]);
                    }
                }).collect();

            Integer i = 0;
            for (Tuple4<String, String, String, String> line : data) {
                Put put = new Put(Bytes.toBytes("row" + i));
                put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
                put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
                put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
                put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
                i += 1;
                table.put(put);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                try {
                    // Close the HTable.
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    // Close the HBase connection.
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            jsc.stop();
        }
    }
}

The TableOutputData class is as follows:

package com.huawei.bigdata.spark.examples;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import scala.Tuple2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.huawei.hadoop.security.LoginUtil;

/**
 * Get data from table.
 */
public class TableOutputData {
    public static void main(String[] args) throws IOException {
        Configuration hadoopConf = new Configuration();
        if ("kerberos".equalsIgnoreCase(hadoopConf.get("hadoop.security.authentication"))) {
            // Security mode
            final String userPrincipal = "sparkuser";
            final String USER_KEYTAB_FILE = "user.keytab";
            String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
            String krbFile = filePath + "krb5.conf";
            String userKeyTableFile = filePath + USER_KEYTAB_FILE;

            String ZKServerPrincipal = "zookeeper/hadoop.hadoop.com";
            String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
            String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";

            LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeyTableFile);
            LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal);
            LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf);
        }

        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");

        // Create the configuration parameter to connect to HBase. The hbase-site.xml must be included in the classpath.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // Set the CloudTable ZooKeeper quorum (all three nodes in one comma-separated value).
        String ip = "cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com";
        String ip1 = "cloudtable-yan-zk2-xOogaSa3.mycloudtable.com";
        String ip2 = "cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com";
        hbConf.set("hbase.zookeeper.quorum", ip + "," + ip1 + "," + ip2);

        // Declare the information of the table to be queried.
        Scan scan = new org.apache.hadoop.hbase.client.Scan();
        scan.addFamily(Bytes.toBytes("info"));
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");
        hbConf.set(TableInputFormat.SCAN, scanToString);

        // Obtain the data in the table through the Spark interface.
        JavaPairRDD<ImmutableBytesWritable, Result> rdd =
            jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        // Traverse every row in the HBase table and print the results.
        List<Tuple2<ImmutableBytesWritable, Result>> rddList = rdd.collect();
        for (int i = 0; i < rddList.size(); i++) {
            Tuple2<ImmutableBytesWritable, Result> t2 = rddList.get(i);
            ImmutableBytesWritable key = t2._1();
            Iterator<Cell> it = t2._2().listCells().iterator();
            while (it.hasNext()) {
                Cell c = it.next();
                String family = Bytes.toString(CellUtil.cloneFamily(c));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
                String value = Bytes.toString(CellUtil.cloneValue(c));
                Long tm = c.getTimestamp();
                System.out.println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm);
            }
        }
        jsc.stop();
    }
}

2. Add the required entries to the /etc/hosts file on every node of the cluster (the entries differ between CloudTable clusters; contact the CloudTable service SRE team to obtain them):
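
For illustration only, the entries take the standard /etc/hosts form of IP address followed by hostname. The addresses below are placeholders (the real values come from the CloudTable SRE); the hostnames are the CloudTable ZooKeeper nodes already used in the sample code above:

# Placeholder addresses - replace with the values provided for your CloudTable cluster
192.168.0.101  cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com
192.168.0.102  cloudtable-yan-zk2-xOogaSa3.mycloudtable.com
192.168.0.103  cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com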

3. From a 1.9.0 cluster, take the following HBase 1.3.1 JARs under /opt/client/Spark/spark/jars/: hbase-client-1.3.1-mrs-1.9.0.jar, hbase-common-1.3.1-mrs-1.9.0.jar, hbase-hadoop2-compat-1.3.1-mrs-1.9.0.jar, hbase-protocol-1.3.1-mrs-1.9.0.jar, hbase-server-1.3.1-mrs-1.9.0.jar, and htrace-core-3.1.0-incubating.jar. Place them under /opt/client/Spark2x/spark/jars/ on the 3.0.1 cluster, and remove the HBase 2.3.2 JARs from /opt/client/Spark2x/spark/jars/. A shell sketch of the swap follows.
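
A minimal sketch of the swap on a 3.0.1 cluster node, assuming the six 1.9.0 JARs listed above have already been copied into a local directory such as /tmp/hbase-1.3.1-jars/ (directory name is illustrative) and that the bundled JARs carry 2.3.2 in their file names:

# Move the bundled HBase 2.3.2 JARs out of the Spark2x jars directory
mkdir -p /opt/client/Spark2x/spark/jars/hbase-2.3.2-backup
mv /opt/client/Spark2x/spark/jars/hbase-*2.3.2*.jar /opt/client/Spark2x/spark/jars/hbase-2.3.2-backup/

# Copy in the HBase 1.3.1 JARs taken from the 1.9.0 cluster
cp /tmp/hbase-1.3.1-jars/*.jar /opt/client/Spark2x/spark/jars/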

Finally, run the official sample code following the steps in the official guide to complete the setup; a rough submission sketch follows.
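
The run commands are not reproduced in the original post. As a rough sketch only, assuming the sample is packaged as SparkOnHbaseJavaExample.jar (name illustrative), the table shb1 has already been created (for example with the sample's CreateTable class), and jobs are submitted from the 3.0.1 cluster client in YARN client mode:

# Write data into the CloudTable table shb1 (the argument is the input CSV on HDFS)
spark-submit --master yarn --deploy-mode client \
  --class com.huawei.bigdata.spark.examples.TableInputData \
  SparkOnHbaseJavaExample.jar /tmp/input.txt

# Read the table back and print every cell
spark-submit --master yarn --deploy-mode client \
  --class com.huawei.bigdata.spark.examples.TableOutputData \
  SparkOnHbaseJavaExample.jar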

