MRS Flink使用SQL-Client對接Hive
一、準備環境

1.根據產品文檔安裝Flink客戶端;
2.將sql-client-defaults.yaml放入/opt/client/Flink/flink/conf中
3.將jaas.conf放入/opt/client/Flink/flink/conf中
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=false
useTicketCache=true
debug=false;
};
4.添加sql-client.sh中添加在JVM_ARGS參數:
JVM_ARGS="-Djava.security.auth.login.config=/opt/client/Flink/flink/conf/jaas.conf $JVM_ARGS"
二、啟動Flink集群
例如:yarn-session.sh -t ssl -d
三、啟動SQL-Client
./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml
四、運行SQL
CREATE TABLE kafkaSourceTable (
order_id VARCHAR,
shop_id VARCHAR,
member_id VARCHAR,
trade_amt DOUBLE
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'order_sql',
'connector.properties.bootstrap.servers' = '10.162.147.217:21005',
'connector.properties.zookeeper.connect' = '10.162.147.217:24002',
'connector.properties.group.id' = 'test-consumer-group',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
);
CREATE TABLE kafkaSinkTable(shop_id VARCHAR, member_id VARCHAR) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'order_sql',
'connector.properties.bootstrap.servers' = '10.162.147.217:21005',
'connector.properties.zookeeper.connect' = '10.162.147.217:24002',
'update-mode' = 'append',
'format.type' = 'json'
);
INSERT INTO
kafkaSinkTable
SELECT
shop_id,
member_id
FROM
kafkaSourceTable;
SELECT
shop_id,
member_id
FROM
kafkaSourceTable;
五、對接Hive
1)修改sql-client-defaults.yaml
catalogs:
- name: myhive
type: hive
hive-conf-dir: /opt/clienrc5/Hive/config
hive-version: 3.1.0
2)在/opt/clienrc5/Hive/config/hive-site.xml添加配置
3)啟動sql-client
use catalog myhive;
SET table.sql-dialect=hive;
CREATE TABLE IF NOT EXISTS hive_dialect_tbl (
`id` int ,
`name` string ,
`age` int
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
SET table.sql-dialect=default;
CREATE TABLE datagen (
`id` int ,
`name` string ,
`age` int
) WITH (
'connector' = 'datagen',
'rows-per-second'='1'
);
INSERT INTO hive_dialect_tbl SELECT * FROM datagen;
select * from hive_dialect_tbl;
EI企業智能 Flink FusionInsight
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。