帶你體驗數據湖DLI Flink SQL進行電商實時業務數據分析丨【我的華為云體驗之旅】

      網友投稿 1010 2022-05-30

      今天就來帶你體驗一下數據湖DLI,Flink SQL進行電商實時業務數據分析,如果使用華為云上的幫助文檔,可能你會遇到一些問題,下面就把體驗過程中遇到的一些問題分享給大家,減少走彎路。

      華為云的官方地址:https://support.huaweicloud.com/bestpractice-dli/dli_05_0006.html

      目標任務

      使用DLI Flink完成電商業務實時數據的分析處理,獲取各個渠道的銷售匯總數據。

      資源準備

      我們需要創建:ECS(彈性云服務器),VPC(虛擬私有云)、DMS(分布式消息服務)、RDS(云數據庫),DLI(數據湖探索)、EIP(彈性公網IP),所有資源都要在同一個區域,這樣相互之間才能連通。

      如果不想開通公網IP,提前下載:JDK,Kafka客戶端,通過 ECS 遠程登錄進行上傳并安裝。

      JDK1.8-:https://www.oracle.com/java/technologies/downloads/#java8

      Kafka 1.1.0版本實例的-:https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz

      Kafka 2.3.0版本實例的-:https://archive.apache.org/dist/kafka/2.3.0/kafka_2.11-2.3.0.tgz

      ECS遠程登錄

      安裝JDK

      # 兩種安裝方式 tar -zxf jdk-8u144-linux-x64.tgz rpm -ivh jdk-8u144-linux-x64.rpm

      ECS安裝Java JDK或JRE,配置JAVA_HOME與PATH環境變量,使用命令 vi .bash_profile 打開文件,添加如下行:

      export JAVA_HOME=/opt/java/jdk1.8.0_144 export PATH=$JAVA_HOME/bin:$PATH

      執行source .bash_profile命令使修改生效

      # 驗證是否安裝成功 java -version

      數據庫創建表

      帶你體驗數據湖DLI Flink SQL進行電商實時業務數據分析丨【我的華為云體驗之旅】

      先創建一個數據庫,打開SQL查詢窗口,執行下面創建表的腳本。

      CREATE TABLE `trade_channel_collect` ( `begin_time` varchar(32) DEFAULT NULL, `channel_name` varchar(32) DEFAULT NULL, `channel_code` varchar(32) DEFAULT NULL, `cur_gmv` double unsigned DEFAULT NULL, `cur_order_user_count` bigint(20) unsigned DEFAULT NULL COMMENT '付款人數', `cur_order_count` bigint(20) unsigned DEFAULT NULL COMMENT '付款訂單數', `last_pay_time` varchar(32) DEFAULT NULL, `flink_current_time` varchar(32) DEFAULT NULL COMMENT 'Flink數據處理時間' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `trade_order_detail_info` ( `order_id` varchar(32) DEFAULT NULL COMMENT '訂單ID', `order_channel` varchar(32) DEFAULT NULL COMMENT '訂單來源渠道', `order_time` varchar(32) DEFAULT NULL COMMENT '訂單時間', `pay_amount` double unsigned DEFAULT NULL COMMENT '金額', `real_pay` double unsigned DEFAULT NULL COMMENT '支付金額', `pay_time` varchar(32) DEFAULT NULL, `user_id` varchar(32) DEFAULT NULL, `user_name` varchar(32) DEFAULT NULL, `area_id` varchar(32) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

      創建Kafka

      創建好了Kafka,點名稱進入后創建Topic。

      創建Flink作業

      打開數據湖 DLI,作業管理里面創建Flink作業。

      創建作業的時候右側“所屬隊列”要選擇所創建的DLI通用隊列,如果該步驟無法選擇創建的CCE隊列,則可能是因為沒有提前開通CCE隊列使用權限。請在官網提工單申請開通CCE隊列使用權限后,再重新創建DLI CCE隊列,繼續后續步驟。

      --********************************************************************-- -- 數據源:trade_order_detail_info (訂單詳情寬表) --********************************************************************-- create table trade_order_detail ( order_id string, -- 訂單ID order_channel string, -- 渠道 order_time string, -- 訂單創建時間 pay_amount double, -- 訂單金額 real_pay double, -- 實際付費金額 pay_time string, -- 付費時間 user_id string, -- 用戶ID user_name string, -- 用戶名 area_id string -- 地區ID ) with ( "connector.type" = "kafka", "connector.version" = "0.10", "connector.properties.bootstrap.servers" = "192.168.0.69:9092", -- Kafka連接地址 "connector.properties.group.id" = "kafka-test", -- Kafka groupID "connector.topic" = "trade_order_detail_info", -- Kafka topic "format.type" = "json", "connector.startup-mode" = "latest-offset" ); --********************************************************************-- -- 結果表:trade_channel_collect (各渠道的銷售總額實時統計) --********************************************************************-- create table trade_channel_collect( begin_time string, --統計數據的開始時間 channel_code string, -- 渠道編號 channel_name string, -- 渠道名 cur_gmv double, -- 當天GMV cur_order_user_count bigint, -- 當天付款人數 cur_order_count bigint, -- 當天付款訂單數 last_pay_time string, -- 最近結算時間 flink_current_time string, primary key (begin_time, channel_code) not enforced ) with ( "connector.type" = "jdbc", "connector.url" = "jdbc:mysql://192.168.0.241:3306/db_csdn_zeke", -- mysql連接地址,jdbc格式 "connector.table" = "trade_channel_collect", -- mysql表名 "connector.driver" = "com.mysql.jdbc.Driver", "connector.username" = "root", -- mysql用戶名 "connector.password" = "Passw0rd", -- mysql密碼 "connector.write.flush.max-rows" = "1000", "connector.write.flush.interval" = "1s" ); --********************************************************************-- -- 臨時中間表 --********************************************************************-- create view tmp_order_detail as select * , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other" else t.order_channel end as channel_code --重新定義統計渠道 只有四個枚舉值[webShop、appShop、miniAppShop、other] , case when t.order_channel = "webShop" then _UTF16"網頁商城" when t.order_channel = "appShop" then _UTF16"app商城" when t.order_channel = "miniAppShop" then _UTF16"小程序商城" else _UTF16"其他" end as channel_name --渠道名稱 from ( select * , row_number() over(partition by order_id order by order_time desc ) as rn --去除重復訂單數據 , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time from trade_order_detail where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天數據,為了方便運行,這里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string) and real_pay is not null ) t where t.rn = 1; -- 按渠道統計各個指標 insert into trade_channel_collect select begin_time --統計數據的開始時間 , channel_code , channel_name , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --當天GMV , count(distinct user_id) as cur_order_user_count --當天付款人數 , count(1) as cur_order_count --當天付款訂單數 , max(pay_time) as last_pay_time --最近結算時間 , cast(LOCALTIMESTAMP as string) as flink_current_time --flink任務中的當前時間 from tmp_order_detail where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") group by begin_time, channel_code, channel_name;

      注意修改地方:Kafka連接配置,數據庫連接配置

      創建隊列

      創建隊列一定要在所有準備工作完成后在創建,它的按需收費計算規則是創建就收費,并且以整點為結算時間,并不是按照使用時長。

      創建跨源連接

      Kafka客戶端發送數據

      使用Kafka客戶端向指定topic發送數據,模擬實時數據流,命令如下:

      # 進入目錄 kafka_2.11-2.3.0/bin # sh ./kafka-console-producer.sh --broker-list kafka連接地址 --topic 指定topic sh ./kafka-console-producer.sh --broker-list 192.168.0.69:9092 --topic trade_order_detail_info

      {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}

      注意:在ECS服務器命令窗口輸入JSON數據,數據不能存在換行。

      Flink作業啟動后會自動執行

      數據庫查看結果

      溫馨提示

      文章內容如果寫的存在問題歡迎留言指出,讓我們共同交流,共同探討,共同進步~~~

      文章如果對你有幫助,動動你的小手點個贊,鼓勵一下,給我前行的動力。

      【我的華為云體驗之旅】有獎征文火熱進行中:https://bbs.huaweicloud.com/blogs/309059

      云數據庫 MySQL 分布式消息服務 DMS 數據湖探索 DLI

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:測試行業工作4年了,從只會“點點點”到了現在的測試開發,年薪30W+,還好當時沒有放棄
      下一篇:智能車競賽創意組別對應的FQA
      相關文章
      亚洲日韩中文字幕| 亚洲AV无码专区国产乱码4SE| 亚洲精品视频观看| 无码久久精品国产亚洲Av影片| 久久久久亚洲AV综合波多野结衣 | 亚洲乱码国产一区三区| 亚洲伊人久久综合影院| 亚洲色欲久久久久综合网| 亚洲裸男gv网站| 国产亚洲精品资在线| 中文字幕亚洲综合久久男男| 亚洲色婷婷综合开心网| 亚洲国模精品一区 | 亚洲午夜久久久精品电影院| 亚洲另类春色校园小说| 中文字幕乱码亚洲无线三区 | 亚洲免费视频播放| 久久亚洲国产最新网站| 亚洲日韩精品无码专区加勒比☆| 亚洲七久久之综合七久久| 亚洲变态另类一区二区三区| 小说区亚洲自拍另类| 亚洲精品国产电影| 亚洲人成网77777亚洲色 | 亚洲∧v久久久无码精品| 亚洲高清美女一区二区三区| 亚洲精品美女久久久久| 亚洲av无码专区在线| 亚洲综合色一区二区三区| 亚洲成a∨人片在无码2023| 亚洲国产精品毛片av不卡在线| 国产亚洲精品影视在线产品| 国产A在亚洲线播放| 久久久久亚洲精品无码蜜桃| 亚洲国产午夜电影在线入口| 亚洲综合国产成人丁香五月激情| 校园亚洲春色另类小说合集 | 亚洲Aⅴ无码一区二区二三区软件| 亚洲精品无码久久久久AV麻豆| 亚洲综合在线另类色区奇米| 无码欧精品亚洲日韩一区|