Feasibility Study on Replacing the Data Source Library
2025-04-01
1. Refer to the official documentation, Developer Guide for 2.x and Earlier (https://support.huaweicloud.com/devg-mrs/mrs_06_0187.html): Developer Guide (for 2.x and earlier) -> Spark Application Development -> Spark on HBase, and set hbase.version in the sample code's pom file to:
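The target value is not recorded in this document. Purely as a non-authoritative sketch, the property typically sits in the <properties> block of the sample's pom.xml; the placeholder below is an assumption and must be replaced with the HBase client version required for the target cluster.

<properties>
    <!-- Placeholder only: substitute the HBase client version required for the
         target cluster; the concrete value is not given in this document. -->
    <hbase.version>REPLACE_WITH_TARGET_HBASE_VERSION</hbase.version>
</properties>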
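The sample project referenced in step 1 also contains a CreateTable class that prepares the target table; its listing is not reproduced here. The following is only a minimal, hypothetical sketch (the class name CreateTableSketch and the reliance on hbase-site.xml being on the classpath are assumptions, not the official sample) of creating the "shb1" table with the "info" column family used by the two classes below.

package com.huawei.bigdata.spark.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Hypothetical sketch: create the "shb1" table with an "info" column family.
 */
public class CreateTableSketch {
    public static void main(String[] args) throws IOException {
        // Assumes hbase-site.xml on the classpath supplies the connection settings;
        // alternatively set hbase.zookeeper.quorum explicitly as in the classes below.
        Configuration hbConf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(hbConf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("shb1");
            if (!admin.tableExists(tableName)) {
                HTableDescriptor tableDesc = new HTableDescriptor(tableName);
                // Single column family "info", matching the reads/writes in the sample classes.
                tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes("info")));
                admin.createTable(tableDesc);
            }
        }
    }
}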
The TableInputData class is as follows:

package com.huawei.bigdata.spark.examples;

import java.io.File;
import java.io.IOException;
import java.util.List;

import scala.Tuple4;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import com.huawei.hadoop.security.LoginUtil;

/**
 * Input data to hbase table.
 */
public class TableInputData {
    public static void main(String[] args) throws IOException {
        Configuration hadoopConf = new Configuration();
        if ("kerberos".equalsIgnoreCase(hadoopConf.get("hadoop.security.authentication"))) {
            // Security mode: log in with the keytab before touching HBase/ZooKeeper.
            final String userPrincipal = "sparkuser";
            final String USER_KEYTAB_FILE = "user.keytab";
            String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
            String krbFile = filePath + "krb5.conf";
            String userKeyTableFile = filePath + USER_KEYTAB_FILE;

            String ZKServerPrincipal = "zookeeper/hadoop.hadoop.com";
            String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
            String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";

            LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeyTableFile);
            LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal);
            LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf);
        }

        // Create the configuration parameter to connect the HBase.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // hbase.zookeeper.quorum is a single comma-separated list; setting it three times
        // would only keep the last value, so join the CloudTable ZooKeeper nodes here.
        String ip = "cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com";
        String ip1 = "cloudtable-yan-zk2-xOogaSa3.mycloudtable.com";
        String ip2 = "cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com";
        hbConf.set("hbase.zookeeper.quorum", ip + "," + ip1 + "," + ip2);

        // Declare the information of the table.
        Table table = null;
        String tableName = "shb1";
        byte[] familyName = Bytes.toBytes("info");
        Connection connection = null;
        try {
            // Connect to the HBase.
            connection = ConnectionFactory.createConnection(hbConf);
            // Obtain the table object.
            table = connection.getTable(TableName.valueOf(tableName));

            // Read the input text file and split every line into a 4-field tuple.
            List<Tuple4<String, String, String, String>> data = jsc.textFile(args[0]).map(
                new Function<String, Tuple4<String, String, String, String>>() {
                    public Tuple4<String, String, String, String> call(String s) throws Exception {
                        String[] tokens = s.split(",");
                        return new Tuple4<String, String, String, String>(tokens[0], tokens[1], tokens[2], tokens[3]);
                    }
                }).collect();

            Integer i = 0;
            for (Tuple4<String, String, String, String> line : data) {
                Put put = new Put(Bytes.toBytes("row" + i));
                put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));
                put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));
                put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));
                put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));
                i += 1;
                table.put(put);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (table != null) {
                try {
                    // Close the HTable.
                    table.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    // Close the HBase connection.
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            jsc.stop();
        }
    }
}
The TableOutputData class is as follows:

package com.huawei.bigdata.spark.examples;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import scala.Tuple2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.huawei.hadoop.security.LoginUtil;

/**
 * Get data from table.
 */
public class TableOutputData {
    public static void main(String[] args) throws IOException {
        Configuration hadoopConf = new Configuration();
        if ("kerberos".equalsIgnoreCase(hadoopConf.get("hadoop.security.authentication"))) {
            // Security mode: log in with the keytab before touching HBase/ZooKeeper.
            final String userPrincipal = "sparkuser";
            final String USER_KEYTAB_FILE = "user.keytab";
            String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
            String krbFile = filePath + "krb5.conf";
            String userKeyTableFile = filePath + USER_KEYTAB_FILE;

            String ZKServerPrincipal = "zookeeper/hadoop.hadoop.com";
            String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
            String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";

            LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeyTableFile);
            LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal);
            LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf);
        }

        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");

        // Create the configuration parameter to connect the HBase. The hbase-site.xml must be included in the classpath.
        SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

        // hbase.zookeeper.quorum is a single comma-separated list; setting it three times
        // would only keep the last value, so join the CloudTable ZooKeeper nodes here.
        String ip = "cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com";
        String ip1 = "cloudtable-yan-zk2-xOogaSa3.mycloudtable.com";
        String ip2 = "cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com";
        hbConf.set("hbase.zookeeper.quorum", ip + "," + ip1 + "," + ip2);

        // Declare the information of the table to be queried.
        Scan scan = new org.apache.hadoop.hbase.client.Scan();
        scan.addFamily(Bytes.toBytes("info"));
        org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = Base64.encodeBytes(proto.toByteArray());
        hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");
        hbConf.set(TableInputFormat.SCAN, scanToString);

        // Obtain the data in the table through the Spark interface.
        JavaPairRDD<ImmutableBytesWritable, Result> rdd =
            jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

        // Traverse every row in the HBase table and print the results.
        List<Tuple2<ImmutableBytesWritable, Result>> rddList = rdd.collect();
        for (int i = 0; i < rddList.size(); i++) {
            Tuple2<ImmutableBytesWritable, Result> t2 = rddList.get(i);
            ImmutableBytesWritable key = t2._1();
            Iterator<Cell> it = t2._2().listCells().iterator();
            while (it.hasNext()) {
                Cell c = it.next();
                String family = Bytes.toString(CellUtil.cloneFamily(c));
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));
                String value = Bytes.toString(CellUtil.cloneValue(c));
                Long tm = c.getTimestamp();
                System.out.println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm);
            }
        }
        jsc.stop();
    }
}
2. Add the following entries to the /etc/hosts file on every node of the cluster (the entries differ between CloudTable clusters; contact the SRE of the CloudTable service to obtain them):
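The concrete entries are not reproduced here; they vary per CloudTable cluster and must be obtained from the CloudTable SRE. Purely as a hypothetical illustration of the expected format, the lines map the CloudTable ZooKeeper access domain names used in the sample code to their addresses (the IPs below are placeholders, not real values):

# Placeholder IPs; obtain the real addresses from the CloudTable SRE.
192.0.2.11    cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com
192.0.2.12    cloudtable-yan-zk2-xOogaSa3.mycloudtable.com
192.0.2.13    cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com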
3. From a 1.9.0 cluster, take the HBase 1.3.1 jars under /opt/client/Spark/spark/jars/:
hbase-client-1.3.1-mrs-1.9.0.jar, hbase-common-1.3.1-mrs-1.9.0.jar, hbase-hadoop2-compat-1.3.1-mrs-1.9.0.jar, hbase-protocol-1.3.1-mrs-1.9.0.jar, hbase-server-1.3.1-mrs-1.9.0.jar, htrace-core-3.1.0-incubating.jar
and copy them into /opt/client/Spark2x/spark/jars/ on the 3.0.1 cluster, then move the HBase 2.3.2 jars out of /opt/client/Spark2x/spark/jars/.
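A minimal shell sketch of this jar swap, assuming the copy is done with scp from a 1.9.0 client node (the host name and the backup directory name are placeholders; adjust to the actual environment):

# On the 3.0.1 cluster client: set aside the bundled HBase 2.3.2 jars
# (backup directory name is arbitrary; keep it outside the jars/ directory).
mkdir -p /opt/client/Spark2x/spark/hbase-2.3.2-bak
mv /opt/client/Spark2x/spark/jars/hbase-*.jar /opt/client/Spark2x/spark/hbase-2.3.2-bak/

# Copy the HBase 1.3.1 jars from a node of the 1.9.0 cluster (host name is a placeholder).
for jar in hbase-client-1.3.1-mrs-1.9.0.jar hbase-common-1.3.1-mrs-1.9.0.jar \
           hbase-hadoop2-compat-1.3.1-mrs-1.9.0.jar hbase-protocol-1.3.1-mrs-1.9.0.jar \
           hbase-server-1.3.1-mrs-1.9.0.jar htrace-core-3.1.0-incubating.jar; do
    scp root@mrs19-client-node:/opt/client/Spark/spark/jars/"$jar" /opt/client/Spark2x/spark/jars/
done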
Run the official sample code following the steps below to complete the process:
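The detailed execution steps are not reproduced here. Purely as a rough, non-authoritative sketch, the two sample classes above would typically be submitted from the MRS Spark2x client roughly as follows (the jar name, input path, and deploy mode are assumptions):

# Load the MRS client environment (typical client path).
source /opt/client/bigdata_env

# Write the CSV input into the shb1 table (args[0] is the input path; jar name and path are placeholders).
spark-submit --master yarn --deploy-mode client \
    --class com.huawei.bigdata.spark.examples.TableInputData \
    /opt/SparkOnHbaseJavaExample.jar /tmp/input_data.txt

# Scan the table back and print every cell.
spark-submit --master yarn --deploy-mode client \
    --class com.huawei.bigdata.spark.examples.TableOutputData \
    /opt/SparkOnHbaseJavaExample.jar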