華為FusionInsight MRS實戰 - Flink增強特性之可視化開發平臺FlinkSever開發學習
華為FusionInsight MRS實戰 - Flink增強特性之可視化開發平臺FlinkSever開發學習

背景說明
隨著流計算的發展,挑戰不再僅限于數據量和計算量,業務變得越來越復雜。如何提高開發者的效率,降低流計算的門檻,對推廣實時計算非常重要。
SQL 是數據處理中使用最廣泛的語言,它允許用戶簡明扼要地展示其業務邏輯。Flink 作為流批一體的計算引擎自1.7.2版本開始引入Flink SQL的特性,并不斷發展。之前,用戶可能需要編寫上百行業務代碼,使用 SQL 后,可能只需要幾行 SQL 就可以輕松搞定。
但是真正的要將Flink SQL開發工作投入到實際的生產場景中,如果使用原生的API接口進行作業的開發還是存在門檻較高,易用性低,SQL代碼可維護性差的問題。新需求由業務人員提交給IT人員,IT人員排期開發。從需求到上線,周期長,導致錯失新業務最佳市場時間窗口。同時,IT人員工作繁重,大量相似Flink作業,成就感低。
華為Flink可視化開發平臺FlinkServer優勢:
提供基于Web的可視化開發平臺,只需要寫SQL即可開發作業,極大降低作業開發門檻。
通過作業平臺能力開放,支持業務人員自行編寫SQL開發作業,快速應對需求,并將IT人員從繁瑣的Flink作業開發工作中解放出來;
同時支持流作業和批作業;
支持常見的Connector,包括Kafka、Redis、HDFS等
下面將以kafka為例分別使用原生API接口以及FlinkServer進行作業開發,對比突出FlinkServer的優勢
場景說明
參考已發論壇帖 《華為FusionInsight MRS FlinkSQL 復雜嵌套Json解析最佳實踐》
需要使用FlinkSQL從一個源kafka topic接收cdl復雜嵌套json數據并進行解析,將解析后的數據發送到另一個kafka topic里
使用原生API接口方案開發flink sql操作步驟
前提條件
完成MRS Flink客戶端的安裝以及配置
完成Flink SQL原生接口相關配置
操作步驟
使用如下命令首先啟動Flink集群
source /opt/hadoopclient/bigdata_env kinit developuser cd /opt/hadoopclient/Flink/flink ./bin/yarn-session.sh -t ssl/
使用如下命令啟動Flink SQL Client
cd /opt/hadoopclient/Flink/flink/bin ./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml
使用如下flink sql創建源端kafka表,并提取需要的信息:
CREATE TABLE huditableout_source( `schema` ROW < `fields` ARRAY< ROW
使用如下flink sql創建目標端kafka表:
CREATE TABLE huditableout( type1 VARCHAR(32), optional1 BOOLEAN, field1 VARCHAR(32), type2 VARCHAR(32), optional2 BOOLEAN, field2 VARCHAR(32), ts BIGINT, uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT, localts TIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout2', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' );
使用如下flink sql將源端kafka流表寫入到目標端kafka流表中
insert into huditableout select type1, optional1, field1, type2, optional2, field2, ts, uid, uname, age, sex, mostlike, lastview, totalcost, localts from huditableout_source;
檢查測試結果
消費生產源kafka topic的數據(由cdl生成)
消費目標端kafka topic解析后的數據(flink sql任務生成的結果)
可以登錄flink原生界面查看任務
使用flink sql client方式查看結果
首先使用命令set execution.result-mode=tableau; 可以讓查詢結果直接輸出到終端
使用flink sql查詢上面已創建好的流表
select * from huditableout
注意:因為是kafka流表,所以查詢結果只會顯示select任務啟動之后寫進該topic的數據
使用FlinkServer可視化開發平臺方案開發flink sql操作步驟
前提條件
參考產品文檔 《基于用戶和角色的鑒權》章節創建一個具有“FlinkServer管理操作權限”的用戶,使用該用戶訪問Flink Server
操作步驟
登錄FlinkServer選擇作業管理
創建任務cdl_kafka_json_test3并輸入flink sql
說明: 可以看到開發flink sql任務時在FlinkServer界面可以自行設置flink集群規模
CREATE TABLE huditableout_source( `schema` ROW < `fields` ARRAY< ROW
點擊語義校驗,確保語義校驗通過
點擊提交并啟動任務
檢查測試結果
消費生產源kafka topic的數據(由cdl生成)
消費目標端kafka topic解析后的數據(flink sql任務生成的結果)
EI企業智能 Flink FusionInsight MapReduce SQL
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。