使用DLI Flink SQL進行電商實時業務數據分析
業務場景介紹
場景描述
當前線上購物無疑是最火熱的購物方式,而電商平臺則又可以以多種方式接入,例如通過web方式訪問、通過app的方式訪問、通過微信小程序的方式訪問等等。而電商平臺則需要每天統計各平臺的實時訪問數據量、訂單數、訪問人數等等指標,從而能在顯示大屏上實時展示相關數據,方便及時了解數據變化,有針對性地調整營銷策略。而如何高效快捷地統計這些指標呢?
假設平臺已經將每個商品的訂單信息實時寫入Kafka中,這些信息包括訂單ID、訂單生成的渠道(即web方式、app方式等)、訂單時間、訂單金額、折扣后實際支付金額、支付時間、用戶ID、用戶姓名、訂單地區ID等信息。而我們需要做的,就是根據當前可以獲取到的業務數據,實時統計每種渠道的相關指標,輸出存儲到數據庫中,并進行大屏展示。
場景方案
場景任務
使用DLI Flink完成電商業務實時數據的分析處理,獲取各個渠道的銷售匯總數據。
數據說明
- 數據源表:電商業務訂單詳情寬表
字段名
字段類型
說明
order_id
string
訂單ID
order_channel
string
訂單生成的渠道(即web方式、app方式等)
order_time
string
訂單時間
pay_amount
double
訂單金額
real_pay
double
實際支付金額
pay_time
string
支付時間
user_id
string
用戶ID
user_name
string
用戶姓名
area_id
string
訂單地區ID
- 結果表:各渠道的銷售總額實時統計表。
字段名
字段類型
說明
begin_time
varchar(32)
開始統計指標的時間
channel_code
varchar(32)
渠道編號
channel_name
varchar(32)
渠道名
cur_gmv
double
當天GMV
cur_order_user_count
bigint
當天付款人數
cur_order_count
bigint
當天付款訂單數
last_pay_time
varchar(32)
最近結算時間
flink_current_time
varchar(32)
Flink數據處理時間
流程介紹
使用DLI Flink進行電商實時業務數據分析的操作過程主要包括7個步驟:
步驟1:注冊賬號。使用DLI對數據進行分析之前,需要注冊華為云賬號并進行實名認證。
步驟2:創建資源。在您的賬戶下創建作業需要的相關資源,涉及VPC、DMS、DLI、RDS。
步驟3:創建DMS topic并獲取連接地址。
步驟4:創建RDS數據庫表。
步驟5:創建增強型跨源打通網絡。
步驟6:創建并提交Flink作業。
步驟7:查詢結果,使用DLV進行大屏展示。
步驟1:注冊賬號
注冊華為云賬號并進行實名認證。
注冊賬號具體步驟可參考賬號注冊可參考賬號注冊。
實名認證具體步驟可參考實名認證。
如果您已完成華為云賬號注冊和實名認證,可跳過該步驟。
步驟2:創建資源
創建VPC,具體步驟可參考:創建VPC和子網。
創建DMS Kafka實例,具體步驟可參考:DMS Kafka入門指引。
創建RDS MySQL實例,具體步驟可參考:RDS MySQL快速入門。
創建DLI CCE隊列,具體步驟可參考:DLI 創建隊列。
創建DLV大屏,具體步驟可參考:DLV 創建大屏。
創建資源時注意以下幾點:
Kafka與MySQL實例創建時需指定VPC,該VPC需提前創建好,且網段不與后續創建的DLI隊列網段沖突;
DLI Flink Opensource語法目前僅支持容器化隊列(目前仍在封閉測試階段),因此創建隊列前需在官網提工單申請開通CCE隊列使用權限后,再創建DLI隊列。
請創建DLI隊列時請創建【包年包月】或者【按需-專屬資源】模式的通用隊列。
步驟3:創建DMS topic并獲取連接地址
點擊【服務列表】,搜索【DMS】,找到【分布式消息服務DMS】,點擊進入DMS頁面。在【Kafka專享版】頁面找到您所創建的Kafka實例。
進入實例詳情頁面。點擊【基本信息】,獲取【連接地址】。
點擊【Topic管理】,創建一個topic:trade_order_detail_info。
topic配置如下:
分區數:1
副本數:1
老化時間:72h
同步落盤:否
步驟4:創建RDS數據庫表
點擊【服務列表】,搜索【RDS】,找到【云數據庫RDS】,點擊進入RDS頁面。在【實例管理頁面】,會看到您已經創建的RDS實例,獲取其內網地址。
點擊所創建RDS實例的登錄按鈕,會跳轉到數據管理服務-DAS。輸入相關賬戶信息,點擊【測試連接】。顯示連接成功后,點擊【登錄】。
登錄RDS實例后,點擊【新建數據庫】按鈕,創建名稱為【dli-demo】的數據庫。
點擊【SQL操作】- 【SQL查詢】,執行如下SQL創建測試用MySQL表,表相關字段含義在【業務場景介紹】-【數據說明】部分有詳細介紹。
DROP TABLE `dli-demo`.`trade_channel_collect`; CREATE TABLE `dli-demo`.`trade_channel_collect` ( `begin_time` VARCHAR(32) NOT NULL, `channel_code` VARCHAR(32) NOT NULL, `channel_name` VARCHAR(32) NULL, `cur_gmv` DOUBLE UNSIGNED NULL, `cur_order_user_count` BIGINT UNSIGNED NULL, `cur_order_count` BIGINT UNSIGNED NULL, `last_pay_time` VARCHAR(32) NULL, `flink_current_time` VARCHAR(32) NULL, PRIMARY KEY (`begin_time`, `channel_code`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '各渠道的銷售總額實時統計';
步驟5:創建增強型跨源打通網絡
點擊【服務列表】,搜索【DLI】,找到【數據湖探索】,點擊進入DLI服務頁面。點擊【隊列管理】,會看到隊列列表里您所創建的通用隊列。
點擊【全局配置】-【服務授權】,選中【VPC Administrator】,點擊更新委托權限。此舉目的是賦予DLI操作用戶VPC資源的權限,用于創建VPC【對等連接】。
點擊【跨源連接】-【增強型跨源】-【創建】。配置如下:【綁定隊列】選擇您所創建的通用隊列,【虛擬私有云】和【子網】選擇 Kafka 與 MySQL 實例所在的 VPC 與子網。 點擊確定。
創建完成后,在跨源列表中,對應的跨源連接狀態會顯示為【已激活】。點擊跨源連接名稱,詳情頁面顯示連接狀態為【ACTIVE】。
測試隊列與RDS、DMS實例連通性。點擊【隊列管理】,選擇您所使用的隊列,點擊【更多】-【測試地址連通性】。輸入前序步驟3-2獲取的DMS Kafka實例連接地址和步驟4-2獲取的RDS MySQL實例內網地址,進行網絡連通性測試。測試結果顯示可達,則DLI隊列與Kafka、MySQL實例的網絡已經聯通。
如果測試結果不可達,需要修改實例所在VPC的安全組規則,放開9092(Kafka連接端口)、3306(MySQL連接端口)對DLI隊列的限制,DLI隊列網段信息可以在隊列【下拉詳情頁】獲取。
步驟6:創建并提交Flink作業
點擊DLI控制臺左側欄【作業管理】,選擇【Flink作業】。點擊“創建作業”,選擇作業類型為:Flink OpenSource SQL,名稱自定義。
點擊“確定”后進入編輯作業頁面,具體SQL示例如下所示,部分參數值需要根據RDS和DMS對應的信息進行修改。
--********************************************************************-- -- 數據源:trade_order_detail_info (訂單詳情寬表) --********************************************************************-- create table trade_order_detail ( order_id string, -- 訂單ID order_channel string, -- 渠道 order_time string, -- 訂單創建時間 pay_amount double, -- 訂單金額 real_pay double, -- 實際付費金額 pay_time string, -- 付費時間 user_id string, -- 用戶ID user_name string, -- 用戶名 area_id string -- 地區ID ) with ( "connector.type" = "kafka", "connector.version" = "0.10", "connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka連接地址 "connector.properties.group.id" = "trade_order", -- Kafka groupID "connector.topic" = "trade_order_detail_info", -- Kafka topic "format.type" = "json", "connector.startup-mode" = "latest-offset" ); --********************************************************************-- -- 結果表:trade_channel_collect (各渠道的銷售總額實時統計) --********************************************************************-- create table trade_channel_collect( begin_time string, --統計數據的開始時間 channel_code string, -- 渠道編號 channel_name string, -- 渠道名 cur_gmv double, -- 當天GMV cur_order_user_count bigint, -- 當天付款人數 cur_order_count bigint, -- 當天付款訂單數 last_pay_time string, -- 最近結算時間 flink_current_time string, primary key (begin_time, channel_code) not enforced ) with ( "connector.type" = "jdbc", "connector.url" = "jdbc:mysql://xxxx:3306/xxxx", -- mysql連接地址,jdbc格式 "connector.table" = "xxxx", -- mysql表名 "connector.driver" = "com.mysql.jdbc.Driver", "connector.username" = "xxx", -- mysql用戶名 "connector.password" = "xxxx", -- mysql密碼 "connector.write.flush.max-rows" = "1000", "connector.write.flush.interval" = "1s" ); --********************************************************************-- -- 臨時中間表 --********************************************************************-- create view tmp_order_detail as select * , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other" else t.order_channel end as channel_code --重新定義統計渠道 只有四個枚舉值[webShop、appShop、miniAppShop、other] , case when t.order_channel = "webShop" then _UTF16"網頁商城" when t.order_channel = "appShop" then _UTF16"app商城" when t.order_channel = "miniAppShop" then _UTF16"小程序商城" else _UTF16"其他" end as channel_name --渠道名稱 from ( select * , row_number() over(partition by order_id order by order_time desc ) as rn --去除重復訂單數據 , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time from trade_order_detail where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天數據,為了方便運行,這里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string) and real_pay is not null ) t where t.rn = 1; -- 按渠道統計各個指標 insert into trade_channel_collect select begin_time --統計數據的開始時間 , channel_code , channel_name , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --當天GMV , count(distinct user_id) as cur_order_user_count --當天付款人數 , count(1) as cur_order_count --當天付款訂單數 , max(pay_time) as last_pay_time --最近結算時間 , cast(LOCALTIMESTAMP as string) as flink_current_time --flink任務中的當前時間 from tmp_order_detail where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") group by begin_time, channel_code, channel_name;
作業邏輯說明:
首先,我們先定義一個Kafka源表,用來從Kafka指定topic中讀取消費數據;再定義一個結果表,用來通過JDBC向MySQL中寫入結果數據。
創建源表和結果表以后,需要實現相應的處理邏輯,以實現各個指標的統計。
為了簡化最終的處理邏輯,使用創建視圖進行數據預處理。首先利用over窗口條件和過濾條件結合以去除重復數據(該方式是利用了top N的方法),同時利用相應的內置函數concat和substr將當天的00:00:00作為統計的開始時間,當天的23:59:59作為統計結束時間,并篩選出支付時間在當天凌晨00:00:00后的訂單數據進行統計(為了方便模擬數據的構造,這里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string),請注意)。然后根據這些數據的訂單渠道利用內置的條件函數設置channel_code和channel_name的值,從而獲取了源表中的字段信息,以及begin_time、end_time和channel_code、channel_name的值。
最后,我們根據需要對相應指標進行統計和篩選,并將結果寫入到結果表中。
選擇所創建的DLI 通用隊列提交作業。
等待片刻,作業狀態會變為【運行中】。點擊作業名,可以查看作業詳細運行情況。
使用Kafka客戶端向指定topic發送數據,模擬實時數據流。具體方法可參考:DMS - 連接實例生產消費信息。
發送命令如下: sh kafka_2.11-2.3.0/bin/kafka-console-producer.sh --broker-list kafka連接地址 --topic 指定topic 示例數據如下: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}
訪問DLI【作業管理頁面】-【Flink作業】,點擊前序步驟提交的Flink作業。在作業詳情頁面,可以看到處理的數據記錄數。
步驟7:查詢結果
參考前序步驟4-2,登錄MySQL實例,執行如下SQL語句,即可查詢到經過Flink作業處理后的結果數據。
配置DLV大屏,執行SQL查詢RDS MySQL,即可以實現大屏實時展示。具體配置方法可參考:DLV開發大屏。
分布式消息服務 DMS 數據湖探索 DLI
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。