使用DLI Flink SQL進行電商實時業務數據分析

      網友投稿 1189 2025-04-02

      業務場景介紹

      場景描述

      當前線上購物無疑是最火熱的購物方式,而電商平臺則又可以以多種方式接入,例如通過web方式訪問、通過app的方式訪問、通過微信小程序的方式訪問等等。而電商平臺則需要每天統計各平臺的實時訪問數據量、訂單數、訪問人數等等指標,從而能在顯示大屏上實時展示相關數據,方便及時了解數據變化,有針對性地調整營銷策略。而如何高效快捷地統計這些指標呢?

      假設平臺已經將每個商品的訂單信息實時寫入Kafka中,這些信息包括訂單ID、訂單生成的渠道(即web方式、app方式等)、訂單時間、訂單金額、折扣后實際支付金額、支付時間、用戶ID、用戶姓名、訂單地區ID等信息。而我們需要做的,就是根據當前可以獲取到的業務數據,實時統計每種渠道的相關指標,輸出存儲到數據庫中,并進行大屏展示。

      場景方案

      場景任務

      使用DLI Flink完成電商業務實時數據的分析處理,獲取各個渠道的銷售匯總數據。

      數據說明

      - 數據源表:電商業務訂單詳情寬表

      字段名

      字段類型

      說明

      order_id

      string

      訂單ID

      order_channel

      string

      訂單生成的渠道(即web方式、app方式等)

      order_time

      string

      訂單時間

      pay_amount

      double

      訂單金額

      real_pay

      double

      實際支付金額

      pay_time

      string

      支付時間

      user_id

      string

      用戶ID

      user_name

      string

      用戶姓名

      使用DLI Flink SQL進行電商實時業務數據分析

      area_id

      string

      訂單地區ID

      - 結果表:各渠道的銷售總額實時統計表。

      字段名

      字段類型

      說明

      begin_time

      varchar(32)

      開始統計指標的時間

      channel_code

      varchar(32)

      渠道編號

      channel_name

      varchar(32)

      渠道名

      cur_gmv

      double

      當天GMV

      cur_order_user_count

      bigint

      當天付款人數

      cur_order_count

      bigint

      當天付款訂單數

      last_pay_time

      varchar(32)

      最近結算時間

      flink_current_time

      varchar(32)

      Flink數據處理時間

      流程介紹

      使用DLI Flink進行電商實時業務數據分析的操作過程主要包括7個步驟:

      步驟1:注冊賬號。使用DLI對數據進行分析之前,需要注冊華為云賬號并進行實名認證。

      步驟2:創建資源。在您的賬戶下創建作業需要的相關資源,涉及VPC、DMS、DLI、RDS。

      步驟3:創建DMS topic并獲取連接地址。

      步驟4:創建RDS數據庫表。

      步驟5:創建增強型跨源打通網絡。

      步驟6:創建并提交Flink作業。

      步驟7:查詢結果,使用DLV進行大屏展示。

      步驟1:注冊賬號

      注冊華為云賬號并進行實名認證。

      注冊賬號具體步驟可參考賬號注冊可參考賬號注冊。

      實名認證具體步驟可參考實名認證。

      如果您已完成華為云賬號注冊和實名認證,可跳過該步驟。

      步驟2:創建資源

      創建VPC,具體步驟可參考:創建VPC和子網。

      創建DMS Kafka實例,具體步驟可參考:DMS Kafka入門指引。

      創建RDS MySQL實例,具體步驟可參考:RDS MySQL快速入門。

      創建DLI CCE隊列,具體步驟可參考:DLI 創建隊列。

      創建DLV大屏,具體步驟可參考:DLV 創建大屏。

      創建資源時注意以下幾點:

      Kafka與MySQL實例創建時需指定VPC,該VPC需提前創建好,且網段不與后續創建的DLI隊列網段沖突;

      DLI Flink Opensource語法目前僅支持容器化隊列(目前仍在封閉測試階段),因此創建隊列前需在官網提工單申請開通CCE隊列使用權限后,再創建DLI隊列。

      請創建DLI隊列時請創建【包年包月】或者【按需-專屬資源】模式的通用隊列。

      步驟3:創建DMS topic并獲取連接地址

      點擊【服務列表】,搜索【DMS】,找到【分布式消息服務DMS】,點擊進入DMS頁面。在【Kafka專享版】頁面找到您所創建的Kafka實例。

      進入實例詳情頁面。點擊【基本信息】,獲取【連接地址】。

      點擊【Topic管理】,創建一個topic:trade_order_detail_info。

      topic配置如下:

      分區數:1

      副本數:1

      老化時間:72h

      同步落盤:否

      步驟4:創建RDS數據庫表

      點擊【服務列表】,搜索【RDS】,找到【云數據庫RDS】,點擊進入RDS頁面。在【實例管理頁面】,會看到您已經創建的RDS實例,獲取其內網地址。

      點擊所創建RDS實例的登錄按鈕,會跳轉到數據管理服務-DAS。輸入相關賬戶信息,點擊【測試連接】。顯示連接成功后,點擊【登錄】。

      登錄RDS實例后,點擊【新建數據庫】按鈕,創建名稱為【dli-demo】的數據庫。

      點擊【SQL操作】- 【SQL查詢】,執行如下SQL創建測試用MySQL表,表相關字段含義在【業務場景介紹】-【數據說明】部分有詳細介紹。

      DROP TABLE `dli-demo`.`trade_channel_collect`; CREATE TABLE `dli-demo`.`trade_channel_collect` ( `begin_time` VARCHAR(32) NOT NULL, `channel_code` VARCHAR(32) NOT NULL, `channel_name` VARCHAR(32) NULL, `cur_gmv` DOUBLE UNSIGNED NULL, `cur_order_user_count` BIGINT UNSIGNED NULL, `cur_order_count` BIGINT UNSIGNED NULL, `last_pay_time` VARCHAR(32) NULL, `flink_current_time` VARCHAR(32) NULL, PRIMARY KEY (`begin_time`, `channel_code`) ) ENGINE = InnoDB DEFAULT CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '各渠道的銷售總額實時統計';

      步驟5:創建增強型跨源打通網絡

      點擊【服務列表】,搜索【DLI】,找到【數據湖探索】,點擊進入DLI服務頁面。點擊【隊列管理】,會看到隊列列表里您所創建的通用隊列。

      點擊【全局配置】-【服務授權】,選中【VPC Administrator】,點擊更新委托權限。此舉目的是賦予DLI操作用戶VPC資源的權限,用于創建VPC【對等連接】。

      點擊【跨源連接】-【增強型跨源】-【創建】。配置如下:【綁定隊列】選擇您所創建的通用隊列,【虛擬私有云】和【子網】選擇 Kafka 與 MySQL 實例所在的 VPC 與子網。 點擊確定。

      創建完成后,在跨源列表中,對應的跨源連接狀態會顯示為【已激活】。點擊跨源連接名稱,詳情頁面顯示連接狀態為【ACTIVE】。

      測試隊列與RDS、DMS實例連通性。點擊【隊列管理】,選擇您所使用的隊列,點擊【更多】-【測試地址連通性】。輸入前序步驟3-2獲取的DMS Kafka實例連接地址和步驟4-2獲取的RDS MySQL實例內網地址,進行網絡連通性測試。測試結果顯示可達,則DLI隊列與Kafka、MySQL實例的網絡已經聯通。

      如果測試結果不可達,需要修改實例所在VPC的安全組規則,放開9092(Kafka連接端口)、3306(MySQL連接端口)對DLI隊列的限制,DLI隊列網段信息可以在隊列【下拉詳情頁】獲取。

      步驟6:創建并提交Flink作業

      點擊DLI控制臺左側欄【作業管理】,選擇【Flink作業】。點擊“創建作業”,選擇作業類型為:Flink OpenSource SQL,名稱自定義。

      點擊“確定”后進入編輯作業頁面,具體SQL示例如下所示,部分參數值需要根據RDS和DMS對應的信息進行修改。

      --********************************************************************-- -- 數據源:trade_order_detail_info (訂單詳情寬表) --********************************************************************-- create table trade_order_detail ( order_id string, -- 訂單ID order_channel string, -- 渠道 order_time string, -- 訂單創建時間 pay_amount double, -- 訂單金額 real_pay double, -- 實際付費金額 pay_time string, -- 付費時間 user_id string, -- 用戶ID user_name string, -- 用戶名 area_id string -- 地區ID ) with ( "connector.type" = "kafka", "connector.version" = "0.10", "connector.properties.bootstrap.servers" = "xxxx:9092,xxxx:9092,xxxx:9092", -- Kafka連接地址 "connector.properties.group.id" = "trade_order", -- Kafka groupID "connector.topic" = "trade_order_detail_info", -- Kafka topic "format.type" = "json", "connector.startup-mode" = "latest-offset" ); --********************************************************************-- -- 結果表:trade_channel_collect (各渠道的銷售總額實時統計) --********************************************************************-- create table trade_channel_collect( begin_time string, --統計數據的開始時間 channel_code string, -- 渠道編號 channel_name string, -- 渠道名 cur_gmv double, -- 當天GMV cur_order_user_count bigint, -- 當天付款人數 cur_order_count bigint, -- 當天付款訂單數 last_pay_time string, -- 最近結算時間 flink_current_time string, primary key (begin_time, channel_code) not enforced ) with ( "connector.type" = "jdbc", "connector.url" = "jdbc:mysql://xxxx:3306/xxxx", -- mysql連接地址,jdbc格式 "connector.table" = "xxxx", -- mysql表名 "connector.driver" = "com.mysql.jdbc.Driver", "connector.username" = "xxx", -- mysql用戶名 "connector.password" = "xxxx", -- mysql密碼 "connector.write.flush.max-rows" = "1000", "connector.write.flush.interval" = "1s" ); --********************************************************************-- -- 臨時中間表 --********************************************************************-- create view tmp_order_detail as select * , case when t.order_channel not in ("webShop", "appShop", "miniAppShop") then "other" else t.order_channel end as channel_code --重新定義統計渠道 只有四個枚舉值[webShop、appShop、miniAppShop、other] , case when t.order_channel = "webShop" then _UTF16"網頁商城" when t.order_channel = "appShop" then _UTF16"app商城" when t.order_channel = "miniAppShop" then _UTF16"小程序商城" else _UTF16"其他" end as channel_name --渠道名稱 from ( select * , row_number() over(partition by order_id order by order_time desc ) as rn --去除重復訂單數據 , concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") as begin_time , concat(substr("2021-03-25 12:03:00", 1, 10), " 23:59:59") as end_time from trade_order_detail where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") --取今天數據,為了方便運行,這里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string) and real_pay is not null ) t where t.rn = 1; -- 按渠道統計各個指標 insert into trade_channel_collect select begin_time --統計數據的開始時間 , channel_code , channel_name , cast(COALESCE(sum(real_pay), 0) as double) as cur_gmv --當天GMV , count(distinct user_id) as cur_order_user_count --當天付款人數 , count(1) as cur_order_count --當天付款訂單數 , max(pay_time) as last_pay_time --最近結算時間 , cast(LOCALTIMESTAMP as string) as flink_current_time --flink任務中的當前時間 from tmp_order_detail where pay_time >= concat(substr("2021-03-25 12:03:00", 1, 10), " 00:00:00") group by begin_time, channel_code, channel_name;

      作業邏輯說明:

      首先,我們先定義一個Kafka源表,用來從Kafka指定topic中讀取消費數據;再定義一個結果表,用來通過JDBC向MySQL中寫入結果數據。

      創建源表和結果表以后,需要實現相應的處理邏輯,以實現各個指標的統計。

      為了簡化最終的處理邏輯,使用創建視圖進行數據預處理。首先利用over窗口條件和過濾條件結合以去除重復數據(該方式是利用了top N的方法),同時利用相應的內置函數concat和substr將當天的00:00:00作為統計的開始時間,當天的23:59:59作為統計結束時間,并篩選出支付時間在當天凌晨00:00:00后的訂單數據進行統計(為了方便模擬數據的構造,這里使用"2021-03-25 12:03:00"替代cast(LOCALTIMESTAMP as string),請注意)。然后根據這些數據的訂單渠道利用內置的條件函數設置channel_code和channel_name的值,從而獲取了源表中的字段信息,以及begin_time、end_time和channel_code、channel_name的值。

      最后,我們根據需要對相應指標進行統計和篩選,并將結果寫入到結果表中。

      選擇所創建的DLI 通用隊列提交作業。

      等待片刻,作業狀態會變為【運行中】。點擊作業名,可以查看作業詳細運行情況。

      使用Kafka客戶端向指定topic發送數據,模擬實時數據流。具體方法可參考:DMS - 連接實例生產消費信息。

      發送命令如下: sh kafka_2.11-2.3.0/bin/kafka-console-producer.sh --broker-list kafka連接地址 --topic 指定topic 示例數據如下: {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103252020200001", "order_channel":"webShop", "order_time":"2021-03-24 20:20:20", "pay_amount":"600.00", "real_pay":"480.00", "pay_time":"2021-03-25 00:00:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103260808080001", "order_channel":"webShop", "order_time":"2021-03-25 08:08:08", "pay_amount":"300.00", "real_pay":"240.00", "pay_time":"2021-03-25 08:10:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103261313130001", "order_channel":"webShop", "order_time":"2021-03-25 13:13:13", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-25 16:16:16", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"} {"order_id":"202103270606060001", "order_channel":"appShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"50.50", "real_pay":"50.50", "pay_time":"2021-03-25 06:07:00", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103270606060002", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"66.60", "real_pay":"66.60", "pay_time":"2021-03-25 06:07:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103270606060003", "order_channel":"miniAppShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"88.80", "real_pay":"88.80", "pay_time":"2021-03-25 06:07:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} {"order_id":"202103270606060004", "order_channel":"webShop", "order_time":"2021-03-25 06:06:06", "pay_amount":"99.90", "real_pay":"99.90", "pay_time":"2021-03-25 06:07:00", "user_id":"0004", "user_name":"Daisy", "area_id":"330102"}

      訪問DLI【作業管理頁面】-【Flink作業】,點擊前序步驟提交的Flink作業。在作業詳情頁面,可以看到處理的數據記錄數。

      步驟7:查詢結果

      參考前序步驟4-2,登錄MySQL實例,執行如下SQL語句,即可查詢到經過Flink作業處理后的結果數據。

      配置DLV大屏,執行SQL查詢RDS MySQL,即可以實現大屏實時展示。具體配置方法可參考:DLV開發大屏。

      分布式消息服務 DMS 數據湖探索 DLI

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:2019wps怎么解壓
      下一篇:這周開始wps excel使用一會就會異常退出(這周開始生產 英文)
      相關文章
      国产成人亚洲精品影院| 亚洲熟伦熟女新五十路熟妇| 亚洲伊人色欲综合网| 亚洲成AⅤ人影院在线观看| 亚洲heyzo专区无码综合| 99999久久久久久亚洲| 亚洲国产精品白丝在线观看| 亚洲高清资源在线观看| 99ri精品国产亚洲| 内射干少妇亚洲69XXX| 久久精品国产亚洲AV香蕉| 久久精品蜜芽亚洲国产AV| 亚洲视频在线免费看| 亚洲电影在线免费观看| 亚洲欧洲日产韩国在线| 亚洲国产高清在线精品一区| 亚洲人成电影院在线观看| 亚洲人成影院在线高清| 亚洲乱码中文论理电影| 亚洲一区二区影视| 精品国产日韩久久亚洲| 亚洲youwu永久无码精品| 久久精品国产亚洲av品善| 亚洲av无码专区在线观看素人| 国产亚洲精品美女| 亚洲色偷拍区另类无码专区| 国产亚洲?V无码?V男人的天堂| 国产亚洲AV手机在线观看| 亚洲精品无码久久久久| 亚洲av无码av制服另类专区| 亚洲一区免费观看| 亚洲国产精品综合福利专区| 亚洲一区欧洲一区| 九九精品国产亚洲AV日韩| 亚洲日韩在线中文字幕第一页| 国产成人麻豆亚洲综合无码精品| 国产亚洲一区二区精品| 亚洲男人都懂得羞羞网站| 亚洲国产成人综合| 亚洲成AV人片高潮喷水| 亚洲国产一区视频|