2022-05-30
Today I'll walk you through using DLI (Data Lake Insight) with Flink SQL for real-time e-commerce business data analysis. If you follow Huawei Cloud's own help documentation you may run into a few problems, so below I share the issues I hit during the walkthrough to save you some detours.
Huawei Cloud official guide: https://support.huaweicloud.com/bestpractice-dli/dli_05_0006.html
Goal
Use DLI Flink to process e-commerce business data in real time and produce aggregated sales figures per channel.
Resource preparation
We need to create: ECS (Elastic Cloud Server), VPC (Virtual Private Cloud), DMS (Distributed Message Service), RDS (Relational Database Service), DLI (Data Lake Insight), and EIP (Elastic IP). All resources must be in the same region so they can reach each other.
If you prefer not to provision a public IP, download the JDK and the Kafka client in advance, then upload and install them through a remote ECS login.
JDK 1.8: https://www.oracle.com/java/technologies/downloads/#java8
Kafka client for 1.1.0 instances: https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
Kafka client for 2.3.0 instances: https://archive.apache.org/dist/kafka/2.3.0/kafka_2.11-2.3.0.tgz
Log in to the ECS remotely
Install the JDK
# Two installation options
tar -zxf jdk-8u144-linux-x64.tgz
rpm -ivh jdk-8u144-linux-x64.rpm
Install the Java JDK or JRE on the ECS and configure the JAVA_HOME and PATH environment variables: open the file with vi .bash_profile and add the following lines:
export JAVA_HOME=/opt/java/jdk1.8.0_144
export PATH=$JAVA_HOME/bin:$PATH
Run source .bash_profile to make the changes take effect.
# Verify the installation
java -version
Create the database tables
First create a database, open a SQL query window, and run the table-creation script below.
CREATE TABLE `trade_channel_collect` (
  `begin_time` varchar(32) DEFAULT NULL,
  `channel_name` varchar(32) DEFAULT NULL,
  `channel_code` varchar(32) DEFAULT NULL,
  `cur_gmv` double unsigned DEFAULT NULL,
  `cur_order_user_count` bigint(20) unsigned DEFAULT NULL COMMENT 'number of paying users',
  `cur_order_count` bigint(20) unsigned DEFAULT NULL COMMENT 'number of paid orders',
  `last_pay_time` varchar(32) DEFAULT NULL,
  `flink_current_time` varchar(32) DEFAULT NULL COMMENT 'Flink processing time'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `trade_order_detail_info` (
  `order_id` varchar(32) DEFAULT NULL COMMENT 'order ID',
  `order_channel` varchar(32) DEFAULT NULL COMMENT 'order source channel',
  `order_time` varchar(32) DEFAULT NULL COMMENT 'order time',
  `pay_amount` double unsigned DEFAULT NULL COMMENT 'order amount',
  `real_pay` double unsigned DEFAULT NULL COMMENT 'amount actually paid',
  `pay_time` varchar(32) DEFAULT NULL,
  `user_id` varchar(32) DEFAULT NULL,
  `user_name` varchar(32) DEFAULT NULL,
  `area_id` varchar(32) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
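Before sending test records to Kafka, it can help to sanity-check that each JSON record carries exactly the columns defined in trade_order_detail_info. This is an illustrative sketch, not part of the official walkthrough; the field list simply mirrors the DDL above.

```python
import json

# Columns of trade_order_detail_info, mirroring the DDL above
ORDER_FIELDS = {
    "order_id", "order_channel", "order_time", "pay_amount",
    "real_pay", "pay_time", "user_id", "user_name", "area_id",
}

def validate_order(line: str) -> bool:
    """Return True if the JSON line parses and has exactly the expected fields."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return False
    return set(record) == ORDER_FIELDS

sample = ('{"order_id":"202103241000000001", "order_channel":"webShop", '
          '"order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", '
          '"real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", '
          '"user_id":"0001", "user_name":"Alice", "area_id":"330106"}')
print(validate_order(sample))            # True
print(validate_order('{"order_id":1}'))  # False: fields are missing
```

A malformed record would otherwise surface only later, as a parse failure inside the Flink job.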
Create the Kafka instance
Once the Kafka instance is created, click its name to open it and create a topic.
Create the Flink job
Open DLI (Data Lake Insight) and create a Flink job under Job Management.
When creating the job, set "Queue" on the right to the DLI general-purpose queue you created. If the queue you created cannot be selected at this step, it is probably because permission to use CCE queues has not been enabled. Submit a service ticket on the official site to request CCE queue permission, then recreate the DLI CCE queue and continue with the remaining steps.
--********************************************************************--
-- Source table: trade_order_detail_info (order detail wide table)
--********************************************************************--
create table trade_order_detail (
  order_id string,      -- order ID
  order_channel string, -- channel
  order_time string,    -- order creation time
  pay_amount double,    -- order amount
  real_pay double,      -- amount actually paid
  pay_time string,      -- payment time
  user_id string,       -- user ID
  user_name string,     -- user name
  area_id string        -- area ID
) with (
  "connector.type" = "kafka",
  "connector.version" = "0.10",
  "connector.properties.bootstrap.servers" = "192.168.0.69:9092", -- Kafka broker address
  "connector.properties.group.id" = "kafka-test",                 -- Kafka consumer group ID
  "connector.topic" = "trade_order_detail_info",                  -- Kafka topic
  "format.type" = "json",
  "connector.startup-mode" = "latest-offset"
);

--********************************************************************--
-- Result table: trade_channel_collect (real-time sales totals per channel)
--********************************************************************--
create table trade_channel_collect(
  begin_time string,           -- start time of the statistics window
  channel_code string,         -- channel code
  channel_name string,         -- channel name
  cur_gmv double,              -- today's GMV
  cur_order_user_count bigint, -- today's number of paying users
  cur_order_count bigint,      -- today's number of paid orders
  last_pay_time string,        -- latest payment time
  flink_current_time string,
  primary key (begin_time, channel_code) not enforced
) with (
  "connector.type" = "jdbc",
  "connector.url" = "jdbc:mysql://192.168.0.241:3306/db_csdn_zeke", -- MySQL JDBC URL
  "connector.table" = "trade_channel_collect",                      -- MySQL table name
  "connector.driver" = "com.mysql.jdbc.Driver",
  "connector.username" = "root",     -- MySQL user
  "connector.password" = "Passw0rd", -- MySQL password
  "connector.write.flush.max-rows" = "1000",
  "connector.write.flush.interval" = "1s"
);

--********************************************************************--
-- Temporary intermediate view
--********************************************************************--
create view tmp_order_detail
as
select *
    , case when t.order_channel not in ("webShop", "appShop", "miniAppShop")
           then "other" else t.order_channel
      end as channel_code -- normalize the channel to exactly four values: [webShop, appShop, miniAppShop, other]
    , case when t.order_channel = "webShop"     then _UTF16"網頁商城"
           when t.order_channel = "appShop"     then _UTF16"app商城"
           when t.order_channel = "miniAppShop" then _UTF16"小程序商城"
           else _UTF16"其他"
      end as channel_name -- channel display name
from (
    select *
        , row_number() over(partition by order_id order by order_time desc) as rn -- deduplicate orders
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time
        , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time
    from trade_order_detail
    where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") -- take today's data; for ease of running, "2021-03-25 12:03:00" is used in place of cast(LOCALTIMESTAMP as string)
        and real_pay is not null
) t
where t.rn = 1;

-- Aggregate the metrics per channel
insert into trade_channel_collect
select begin_time                                            -- start time of the statistics window
    , channel_code
    , channel_name
    , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv  -- today's GMV
    , count(distinct user_id) as cur_order_user_count        -- today's number of paying users
    , count(1) as cur_order_count                            -- today's number of paid orders
    , max(pay_time) as last_pay_time                         -- latest payment time
    , cast(LOCALTIMESTAMP as string) as flink_current_time   -- current time inside the Flink job
from tmp_order_detail
where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00")
group by begin_time, channel_code, channel_name;
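The Flink SQL above boils down to three steps: deduplicate orders by order_id (keeping the record with the latest order_time), normalize the channel to four values, then aggregate GMV, distinct paying users, order count, and latest pay time per channel. A plain-Python sketch of that same logic, using a small in-memory sample instead of the Flink runtime, may help you reason about the expected results:

```python
from collections import defaultdict

# The three known channels; anything else maps to "other"
KNOWN_CHANNELS = {"webShop", "appShop", "miniAppShop"}

def collect_by_channel(orders, day="2021-03-25"):
    """Mimic the Flink job: dedupe by order_id, then aggregate per channel."""
    begin_time = day + " 00:00:00"
    # Keep only the latest record per order_id (row_number() ... where rn = 1)
    latest = {}
    for o in orders:
        prev = latest.get(o["order_id"])
        if prev is None or o["order_time"] > prev["order_time"]:
            latest[o["order_id"]] = o
    stats = defaultdict(lambda: {"cur_gmv": 0.0, "users": set(),
                                 "cur_order_count": 0, "last_pay_time": ""})
    for o in latest.values():
        if o.get("real_pay") is None or o["pay_time"] < begin_time:
            continue  # keep only today's paid orders
        code = o["order_channel"] if o["order_channel"] in KNOWN_CHANNELS else "other"
        s = stats[code]
        s["cur_gmv"] += float(o["real_pay"])
        s["users"].add(o["user_id"])
        s["cur_order_count"] += 1
        s["last_pay_time"] = max(s["last_pay_time"], o["pay_time"])
    return {code: {"cur_gmv": s["cur_gmv"],
                   "cur_order_user_count": len(s["users"]),
                   "cur_order_count": s["cur_order_count"],
                   "last_pay_time": s["last_pay_time"]}
            for code, s in stats.items()}

# Hypothetical sample rows, shaped like the Kafka test records
orders = [
    {"order_id": "1", "order_channel": "webShop", "order_time": "2021-03-25 08:08:08",
     "real_pay": "240.00", "pay_time": "2021-03-25 08:10:00", "user_id": "0004"},
    {"order_id": "2", "order_channel": "qqShop", "order_time": "2021-03-25 15:05:05",
     "real_pay": "400.00", "pay_time": "2021-03-25 15:10:00", "user_id": "0003"},
]
print(collect_by_channel(orders))
```

This is only a mental model: the real job runs continuously and upserts into MySQL on the (begin_time, channel_code) primary key, so each channel's row is overwritten as new data arrives.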
Remember to adjust: the Kafka connection settings and the database connection settings.
Create the queue
Create the queue only after all other preparation is complete: with pay-per-use billing, charging starts the moment the queue is created and is settled on the hour, not by actual usage time.
Create the cross-source (datasource) connection
Send data with the Kafka client
Use the Kafka client to send data to the specified topic, simulating a real-time data stream. The commands are as follows:
# Enter the directory kafka_2.11-2.3.0/bin
# Usage: sh ./kafka-console-producer.sh --broker-list <Kafka address> --topic <topic>
sh ./kafka-console-producer.sh --broker-list 192.168.0.69:9092 --topic trade_order_detail_info
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
{"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
{"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
{"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
{"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
{"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
Note: when entering the JSON data in the ECS terminal window, each record must be a single line with no line breaks inside it.
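Because each record must stay on one line, an alternative to typing them interactively is to serialize the records with a small script and pipe the resulting file into kafka-console-producer.sh. A minimal sketch; the file name orders.jsonl and the sample record are illustrative:

```python
import json

# Hypothetical order records, shaped like the test data above
orders = [
    {"order_id": "202103241000000001", "order_channel": "webShop",
     "order_time": "2021-03-24 10:00:00", "pay_amount": "100.00",
     "real_pay": "100.00", "pay_time": "2021-03-24 10:02:03",
     "user_id": "0001", "user_name": "Alice", "area_id": "330106"},
]

# json.dumps emits no line breaks by default, so each record stays on one line
lines = [json.dumps(order) for order in orders]
assert all("\n" not in line for line in lines)

# Write one record per line; pipe this file into kafka-console-producer.sh
with open("orders.jsonl", "w") as f:
    f.write("\n".join(lines) + "\n")
```

Then, for example: sh ./kafka-console-producer.sh --broker-list 192.168.0.69:9092 --topic trade_order_detail_info < orders.jsonl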
Flink作業啟動后會自動執行
數據庫查看結果
A friendly note
If you spot any problems in this article, feel free to leave a comment so we can discuss and improve together.
If the article helped you, give it a like; your encouragement keeps me going.
The "My Huawei Cloud experience journey" prize essay campaign is underway: https://bbs.huaweicloud.com/blogs/309059