華為FusionInsight MRS實戰 - Flink增強特性之可視化開發平臺FlinkSever開發學習

      網友投稿 1145 2025-03-31

      華為FusionInsight MRS實戰 - Flink增強特性之可視化開發平臺FlinkSever開發學習


      背景說明

      隨著流計算的發展,挑戰不再僅限于數據量和計算量,業務變得越來越復雜。如何提高開發者的效率,降低流計算的門檻,對推廣實時計算非常重要。

      SQL 是數據處理中使用最廣泛的語言,它允許用戶簡明扼要地展示其業務邏輯。Flink 作為流批一體的計算引擎自1.7.2版本開始引入Flink SQL的特性,并不斷發展。之前,用戶可能需要編寫上百行業務代碼,使用 SQL 后,可能只需要幾行 SQL 就可以輕松搞定。

      但是真正的要將Flink SQL開發工作投入到實際的生產場景中,如果使用原生的API接口進行作業的開發還是存在門檻較高,易用性低,SQL代碼可維護性差的問題。新需求由業務人員提交給IT人員,IT人員排期開發。從需求到上線,周期長,導致錯失新業務最佳市場時間窗口。同時,IT人員工作繁重,大量相似Flink作業,成就感低。

      華為Flink可視化開發平臺FlinkServer優勢:

      提供基于Web的可視化開發平臺,只需要寫SQL即可開發作業,極大降低作業開發門檻。

      通過作業平臺能力開放,支持業務人員自行編寫SQL開發作業,快速應對需求,并將IT人員從繁瑣的Flink作業開發工作中解放出來;

      同時支持流作業和批作業;

      支持常見的Connector,包括Kafka、Redis、HDFS等

      下面將以kafka為例分別使用原生API接口以及FlinkServer進行作業開發,對比突出FlinkServer的優勢

      場景說明

      參考已發論壇帖 《華為FusionInsight MRS FlinkSQL 復雜嵌套Json解析最佳實踐》

      需要使用FlinkSQL從一個源kafka topic接收cdl復雜嵌套json數據并進行解析,將解析后的數據發送到另一個kafka topic里

      使用原生API接口方案開發flink sql操作步驟

      前提條件

      完成MRS Flink客戶端的安裝以及配置

      完成Flink SQL原生接口相關配置

      華為FusionInsight MRS實戰 - Flink增強特性之可視化開發平臺FlinkSever開發學習

      操作步驟

      使用如下命令首先啟動Flink集群

      source /opt/hadoopclient/bigdata_env kinit developuser cd /opt/hadoopclient/Flink/flink ./bin/yarn-session.sh -t ssl/

      使用如下命令啟動Flink SQL Client

      cd /opt/hadoopclient/Flink/flink/bin ./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml

      使用如下flink sql創建源端kafka表,并提取需要的信息:

      CREATE TABLE huditableout_source( `schema` ROW < `fields` ARRAY< ROW> >, payload ROW < `TIMESTAMP` BIGINT, `data` ROW < uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT> >, type1 as `schema`.`fields`[1].type, optional1 as `schema`.`fields`[1].optional, field1 as `schema`.`fields`[1].field, type2 as `schema`.`fields`[2].type, optional2 as `schema`.`fields`[2].optional, field2 as `schema`.`fields`[2].field, ts as payload.`TIMESTAMP`, uid as payload.`data`.uid, uname as payload.`data`.uname, age as payload.`data`.age, sex as payload.`data`.sex, mostlike as payload.`data`.mostlike, lastview as payload.`data`.lastview, totalcost as payload.`data`.totalcost, localts as LOCALTIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' );

      使用如下flink sql創建目標端kafka表:

      CREATE TABLE huditableout( type1 VARCHAR(32), optional1 BOOLEAN, field1 VARCHAR(32), type2 VARCHAR(32), optional2 BOOLEAN, field2 VARCHAR(32), ts BIGINT, uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT, localts TIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout2', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' );

      使用如下flink sql將源端kafka流表寫入到目標端kafka流表中

      insert into huditableout select type1, optional1, field1, type2, optional2, field2, ts, uid, uname, age, sex, mostlike, lastview, totalcost, localts from huditableout_source;

      檢查測試結果

      消費生產源kafka topic的數據(由cdl生成)

      消費目標端kafka topic解析后的數據(flink sql任務生成的結果)

      可以登錄flink原生界面查看任務

      使用flink sql client方式查看結果

      首先使用命令set execution.result-mode=tableau; 可以讓查詢結果直接輸出到終端

      使用flink sql查詢上面已創建好的流表

      select * from huditableout

      注意:因為是kafka流表,所以查詢結果只會顯示select任務啟動之后寫進該topic的數據

      使用FlinkServer可視化開發平臺方案開發flink sql操作步驟

      前提條件

      參考產品文檔 《基于用戶和角色的鑒權》章節創建一個具有“FlinkServer管理操作權限”的用戶,使用該用戶訪問Flink Server

      操作步驟

      登錄FlinkServer選擇作業管理

      創建任務cdl_kafka_json_test3并輸入flink sql

      說明: 可以看到開發flink sql任務時在FlinkServer界面可以自行設置flink集群規模

      CREATE TABLE huditableout_source( `schema` ROW < `fields` ARRAY< ROW> >, payload ROW < `TIMESTAMP` BIGINT, `data` ROW < uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT> >, type1 as `schema`.`fields`[1].type, optional1 as `schema`.`fields`[1].optional, field1 as `schema`.`fields`[1].field, type2 as `schema`.`fields`[2].type, optional2 as `schema`.`fields`[2].optional, field2 as `schema`.`fields`[2].field, ts as payload.`TIMESTAMP`, uid as payload.`data`.uid, uname as payload.`data`.uname, age as payload.`data`.age, sex as payload.`data`.sex, mostlike as payload.`data`.mostlike, lastview as payload.`data`.lastview, totalcost as payload.`data`.totalcost, localts as LOCALTIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); CREATE TABLE huditableout( type1 VARCHAR(32), optional1 BOOLEAN, field1 VARCHAR(32), type2 VARCHAR(32), optional2 BOOLEAN, field2 VARCHAR(32), ts BIGINT, uid INT, uname VARCHAR(32), age INT, sex VARCHAR(30), mostlike VARCHAR(30), lastview VARCHAR(30), totalcost INT, localts TIMESTAMP ) WITH( 'connector' = 'kafka', 'topic' = 'huditableout2', 'properties.bootstrap.servers' = '172.16.9.113:21007,172.16.9.117:21007,172.16.9.118:21007', 'properties.group.id' = 'example', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true', 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.hadoop.com' ); insert into huditableout select type1, optional1, field1, type2, optional2, field2, ts, uid, uname, age, sex, mostlike, lastview, totalcost, localts from huditableout_source;

      點擊語義校驗,確保語義校驗通過

      點擊提交并啟動任務

      檢查測試結果

      消費生產源kafka topic的數據(由cdl生成)

      消費目標端kafka topic解析后的數據(flink sql任務生成的結果)

      EI企業智能 Flink FusionInsight MapReduce SQL

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:定制家居門店管理
      下一篇:電子表格中的不包含用什么符號(電子表格不等于符號)
      相關文章
      久久影视国产亚洲| 国产精品亚洲а∨无码播放不卡| 亚洲av综合av一区二区三区| 亚洲国产日韩综合久久精品| 亚洲一级视频在线观看| 亚洲六月丁香六月婷婷蜜芽| 亚洲中文无码线在线观看| 亚洲最大在线观看| 亚洲精品国产福利在线观看| 久久久久久亚洲Av无码精品专口 | 综合自拍亚洲综合图不卡区| 亚洲成a人片在线观看中文动漫 | 亚洲乱码在线卡一卡二卡新区| 亚洲成aⅴ人片在线影院八| 亚洲天堂一区二区三区| 亚洲熟妇色自偷自拍另类| 亚洲成人免费网址| 亚洲精品中文字幕无乱码麻豆| 亚洲国产日韩在线成人蜜芽| 亚洲a∨无码男人的天堂| 亚洲情A成黄在线观看动漫软件| 伊人久久亚洲综合影院首页| 亚洲色大网站WWW永久网站| 亚洲国产午夜精品理论片在线播放| 亚洲AV香蕉一区区二区三区| 噜噜噜亚洲色成人网站| 久久影视国产亚洲| 久久精品亚洲一区二区 | 亚洲视频日韩视频| 亚洲H在线播放在线观看H| 亚洲一线产区二线产区区| 亚洲AV电影天堂男人的天堂| 亚洲av无码天堂一区二区三区| 亚洲精品第一国产综合境外资源| 中文字幕亚洲图片| 亚洲成av人影院| 亚洲毛片免费视频| 一本色道久久88—综合亚洲精品| 亚洲成AV人片在WWW| 亚洲天堂在线视频| 久久青青成人亚洲精品|