萬字直通面試Flink雙流JOIN

      網友投稿 629 2022-05-29

      今天和大家聊聊Flink雙流Join問題。這是一個高頻面試點,也是工作中常遇到的一種真實場景。

      如何保證Flink雙流Join準確性和及時性、除了窗口join還存在哪些實現方式、究竟如何回答才能完全打動面試官呢。。你將在本文中找到答案。

      1 引子

      1.1 數據庫SQL中的JOIN

      我們先來看看數據庫SQL中的JOIN操作。如下所示的訂單查詢SQL,通過將訂單表的id和訂單詳情表order_id關聯,獲取所有訂單下的商品信息。

      select a.id as '訂單id', a.order_date as '下單時間', a.order_amount as '訂單金額', b.order_detail_id as '訂單詳情id', b.goods_name as '商品名稱', b.goods_price as '商品價格', b.order_id as '訂單id' from dwd_order_info_pfd a right join dwd_order_detail_pfd b on a.id = b.order_id

      這是一段很簡單的SQL代碼,就不詳細展開敘述了。此處主要引出SQL中的JOIN類型,這里用到的是 right join , 即右連接。

      left join: 保留左表全部數據和右表關聯數據,右表非關聯數據置NULL

      right join: 保留右表全部數據和左表關聯數據,左表非關聯數據置NULL

      inner join: 保留左表關聯數據和右邊關聯數據

      cross join: 保留左表和右表數據笛卡爾積

      今天和大家聊聊Flink雙流Join問題。這是一個高頻面試點,也是工作中常遇到的一種真實場景。

      如何保證Flink雙流Join準確性和及時性、除了窗口join還存在哪些實現方式、究竟如何回答才能完全打動面試官呢。。你將在本文中找到答案。

      1 引子

      1.1 數據庫SQL中的JOIN

      我們先來看看數據庫SQL中的JOIN操作。如下所示的訂單查詢SQL,通過將訂單表的id和訂單詳情表order_id關聯,獲取所有訂單下的商品信息。

      select a.id as '訂單id', a.order_date as '下單時間', a.order_amount as '訂單金額', b.order_detail_id as '訂單詳情id', b.goods_name as '商品名稱', b.goods_price as '商品價格', b.order_id as '訂單id' from dwd_order_info_pfd a right join dwd_order_detail_pfd b on a.id = b.order_id

      這是一段很簡單的SQL代碼,就不詳細展開敘述了。此處主要引出SQL中的JOIN類型,這里用到的是 right join , 即右連接。

      left join: 保留左表全部數據和右表關聯數據,右表非關聯數據置NULL

      right join: 保留右表全部數據和左表關聯數據,左表非關聯數據置NULL

      inner join: 保留左表關聯數據和右邊關聯數據

      cross join: 保留左表和右表數據笛卡爾積

      基于關聯鍵值逐行關聯匹配,過濾表數據并生成最終結果,提供給下游數據分析使用。

      就此打住,關于數據庫SQL中的JOIN原理不再多贅述,感興趣的話大家可自行研究,下面我們將目光轉移到大數據領域看看吧。

      1.2 離線場景下的JOIN

      假設存在這樣一個場景:

      已知Mysql數據庫中訂單表和訂單明細表,且滿足一對多的關系,統計T-1天所有訂單的商品分布詳情。

      聰明的大家肯定已經給出了答案,沒錯~就是上面的SQL:

      select a.*, b.* from dwd_order_info_pfd a right join dwd_order_detail_pfd b on a.id = b.order_id

      現在修改下條件:已知訂單表和訂單明細表均為億級別數據,求相同場景下的分析結果。

      咋辦?此時關系型數據庫貌似不大合適了~開始放大招:使用大數據計算引擎來解決。

      考慮到T-1統計場景對時效性要求很低,可以使用Hive SQL來處理,底層跑Mapreduce任務。如果想提高運行速度,換成Flink或Spark計算引擎,使用內存計算。

      至于查詢SQL和上面一樣,并將其封裝成一個定時調度任務, 等系統調度運行。如果結果不正確的話,由于數據源和數據靜態不變,大不了重跑,看起來感覺皆大歡喜~

      可是好景不長,產品冤家此時又給了你一個無法拒絕的需求:我要實時統計!!

      2 實時場景下的JOIN

      還是上面的場景,此時數據源換成了實時訂單流和實時訂單明細流,比如Kafka的兩個topic,要求實時統計每分鐘內所有訂單下的商品分布詳情。

      現在情況貌似變得復雜了起來,簡單分析下:

      數據源。實時數據流,和靜態流不同,數據是實時流入的且動態變化,需要計算程序支持實時處理機制。

      關聯性。前面提到靜態數據執行多次join操作,左表和右表能關聯的數據是很恒定的;而實時數據流(左右表)如果進入時機不一致,原本可以關聯的數據會關聯不上或者發生錯誤。

      延遲性。實時統計,提供分鐘甚至秒級別響應結果。

      由于流數據join的特殊性,在滿足實時處理機制、低延遲、強關聯性的前提下,看來需要制定完善的數據方案,才能實現真正的流數據JOIN。

      2.1 方案思路

      我們知道訂單數據和訂單明細數據是一對多的關系,即一條訂單數據對應著多條商品明細數據,畢竟買一件商品也是那么多郵費,不如打包團購。。而一條明細數據僅對應一條訂單數據。

      這樣,雙流join策略可以考慮如下思路:

      當數據流為訂單數據時。無條件保留,無論當前是否關聯到明細數據,均留作后續join使用。

      當數據流為明細數據時。在關聯到其訂單數據后,就可以say goodbye了,否則暫時保留等待下一次與訂單數據的邂逅。

      完成所有處于同一時段內的訂單數據和訂單明細數據join, 清空存儲狀態

      實際生產場景中,需要考慮更多的復雜情況,包括JOIN過程的數據丟失等異常情況的處理,此處僅示意。

      好了,看起來我們已經有了一個馬馬虎虎的實時流JOIN方案雛形。

      貌似可以準備動手大干一場了~ 別著急,有人已經幫我們偷偷的實現了:Apache Flink

      3 Flink的雙流JOIN

      Apache Flink 是一個框架和分布式處理引擎,用于對無界和有界數據流進行有狀態計算。Flink 被設計在所有常見的集群環境中運行,以內存執行速度和任意規模來執行計算。

      ——來自Flink官網定義

      這里我們只需要知道Flink是一個實時計算引擎就行了,主要關注其如何實現雙流JOIN。

      3.1 內部運行機制

      內存計算:Flink任務優先在內存中計算,內存不夠時保存到訪問高效的磁盤,提供秒級延遲響應。

      狀態強一致性:Flink使用一致性快照保存狀態,并定期檢查本地狀態、持久存儲來保證狀態一致性。

      分布式執行: Flink應用程序可以劃分為無數個并行任務在集群中執行,幾乎無限量使用CPU、主內存、磁盤和網絡IO。

      內置高級編程模型:Flink編程模型抽象為SQL、Table、DataStream|DataSet API、Process四層,并封裝成豐富功能的算子,其中就包含JOIN類型的算子。

      仔細看看,我們前面章節討論的實時流JOIN方案的前提是否都滿足了呢?

      實時處理機制: Flink天生即實時計算引擎

      低延遲: Flink內存計算秒級延遲

      強關聯性: Flink狀態一致性和join類算子

      不由感嘆, 這個Flink果然強啊~

      保持好奇心,我們去瞅瞅Flink雙流join的真正奧義!!

      3.2 JOIN實現機制

      Flink雙流JOIN主要分為兩大類。一類是基于原生State的Connect算子操作,另一類是基于窗口的JOIN操作。其中基于窗口的JOIN可細分為window join和interval join兩種。

      實現原理:底層原理依賴Flink的State狀態存儲,通過將數據存儲到State中進行關聯join, 最終輸出結果。

      恍然大悟, Flink原來是通過State狀態來緩存等待join的實時流。

      這里給大家拋出一個問題:

      用redis存儲可不可以,state存儲相比redis存儲的區別?

      更多細節歡迎大家一起探討,添加個人- youlong525 拉您進群,還有免費Flink PDF領取~

      回到正題,這幾種方式到底是如何實現雙流JOIN的?我們接著往下看。

      注意: 后面內容將多以文字 + 代碼的形式呈現,避免枯燥,我放了一堆原創示意圖~

      4 基于Window Join的雙流JOIN實現機制

      顧名思義,此類方式利用Flink的窗口機制實現雙流join。通俗理解,將兩條實時流中元素分配到同一個時間窗口中完成Join。

      底層原理: 兩條實時流數據緩存在Window State中,當窗口觸發計算時,執行join操作。

      4.1 join算子

      先看看Window join實現方式之一的join算子。這里涉及到Flink中的窗口(window)概念,因此Window Joinan按照窗口類型區分的話某種程度來說可以細分出3種:

      Tumbling Window Join (滾動窗口)

      Tumbling Window Join (滾動窗口)

      Sliding Window Join (滑動窗口)

      Sliding Window Join (滑動窗口)

      Session Widnow Join(會話窗口)

      Session Widnow Join(會話窗口)

      兩條流數據按照關聯主鍵在(滾動、滑動、會話)窗口內進行inner join, 底層基于State存儲,并支持處理時間和事件時間兩種時間特征,看下源碼:

      源碼核心總結:windows窗口 + state存儲 + 雙層for循環執行join()

      現在讓我們把時間軸往回拉一點點,在實時場景JOIN那里我們收到了這樣的需求:統計每分鐘內所有訂單下的商品明細分布。

      OK, 使用join算子小試牛刀一下。我們定義60秒的滾動窗口,將訂單流和訂單明細流通過order_id關聯,得到如下的程序:

      val env = ... // kafka 訂單流 val orderStream = ... // kafka 訂單明細流 val orderDetailStream = ... orderStream.join(orderDetailStream) .where(r => r._1) //訂單id .equalTo(r => r._2) //訂單id .window(TumblingProcessTimeWindows.of( Time.seconds(60))) .apply {(r1, r2) => r1 + " : " + r2} .print()

      整個代碼其實很簡單,概要總結下:

      定義兩條輸入實時流A、B

      A流調用join(b流)算子

      關聯關系定義: where為A流關聯鍵,equalTo為B流關聯鍵,都是訂單id

      定義window窗口(60s間隔)

      apply方法定義邏輯輸出

      這樣只要程序穩定運行,就能夠持續不斷的計算每分鐘內訂單分布詳情,貌似解決問題了奧~

      還是別高興太早,別忘了此時的join類型是inner join。復習一下知識: inner join指的是僅保留兩條流關聯上的數據。

      這樣雙流中沒關聯上的數據豈不是都丟掉了?別擔心,Flink還提供了另一個window join操作: coGroup算子。

      4.2 coGroup算子

      coGroup算子也是基于window窗口機制,不過coGroup算子比Join算子更加靈活,可以按照用戶指定的邏輯匹配左流或右流數據并輸出。

      換句話說,我們通過自己指定雙流的輸出來達到left join和right join的目的。

      現在來看看在相同場景下coGroup算子是如何實現left join:

      #這里看看java算子的寫法 orderDetailStream .coGroup(orderStream) .where(r -> r.getOrderId()) .equalTo(r -> r.getOrderId()) .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) .apply(new CoGroupFunction>() { @Override public void coGroup(Iterable orderDetailRecords, Iterable orderRecords, Collector> collector) { for (OrderDetail orderDetaill : orderDetailRecords) { boolean flag = false; for (Order orderRecord : orderRecords) { // 右流中有對應的記錄 collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price())); flag = true; } if (!flag) { // 右流中沒有對應的記錄 collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null)); } } } }) .print();

      這里需要說明幾點:

      join算子替換為coGroup算子

      兩條流依然需要在一個window中且定義好關聯條件

      apply方法中自定義判斷,此處對右值進行判斷:如果有值則進行連接輸出,否則右邊置為NULL。

      可以這么說,現在我們已經徹底搞定了窗口雙流JOIN。

      只要你給我提供具體的窗口大小,我就能通過join或coGroup算子鼓搗出各種花樣join,而且使用起來特別簡單。

      但是假如此時我們親愛的產品又提出了一個小小問題:

      大促高峰期,商品數據某時段會寫入不及時,時間可能比訂單早也可能比訂單晚,還是要計算每分鐘內的訂單商品分布詳情,沒問題吧~

      當然有問題:兩條流如果步調不一致,還用窗口來控制能join的上才怪了~ 很容易等不到join流窗口就自動關閉了。

      還好,我知道Flink提供了Interval join機制。

      5 基于Interval Join的雙流JOIN實現機制

      Interval Join根據右流相對左流偏移的時間區間(interval)作為關聯窗口,在偏移區間窗口中完成join操作。

      有點不好理解,我畫個圖看下:

      stream2.time ∈ (stream1.time +low, stream1.time +high)

      滿足數據流stream2在數據流stream1的 interval(low, high)偏移區間內關聯join。interval越大,關聯上的數據就越多,超出interval的數據不再關聯。

      實現原理:interval join也是利用Flink的state存儲數據,不過此時存在state失效機制ttl,觸發數據清理操作。

      這里再引出一個問題:

      state的ttl機制需要怎么設置?不合理的ttl設置會不會撐爆內存?

      我會在后面的文章中深入講解下State的ttl機制,歡迎大家一起探討~

      下面簡單看下interval join的代碼實現過程:

      val env = ... // kafka 訂單流 val orderStream = ... // kafka 訂單明細流 val orderDetailStream = ... orderStream.keyBy(_.1) // 調用intervalJoin關聯 .intervalJoin(orderDetailStream._2) // 設定時間上限和下限 .between(Time.milliseconds(-30), Time.milliseconds(30)) .process(new ProcessWindowFunction()) class ProcessWindowFunction extends ProcessJoinFunction...{ override def processElement(...) { collector.collect((r1, r2) => r1 + " : " + r2) } }

      訂單流在流入程序后,等候(low,high)時間間隔內的訂單明細流數據進行join, 否則繼續處理下一個流。

      從代碼中我們發現,interval join需要在兩個KeyedStream之上操作,即keyBy(),并在between()方法中指定偏移區間的上下界。

      需要注意的是interval join實現的也是inner join,且目前只支持事件時間。

      6 基于Connect的雙流JOIN實現機制

      前面在使用Window join或者Interval Join來實現雙流join的時候,我發現了其中的共性:

      無論哪種實現方式,Flink內部都將join過程透明化,在算子中封裝了所有的實現細節。

      這是什么?是編程語言中的抽象概念~ 隱藏底層細節,對外暴露統一API, 大幅簡化程序編碼。

      可是這樣會引來一個問題:如果程序報錯或者數據異常,如何快速進行調優排查,直接看源碼嗎?不大現實。。

      這里介紹基于Connect算子實現的雙流JOIN方法,我們可自己控制雙流JOIN處理邏輯,同時保持過程時效性和準確性。

      6.1 Connect算子原理

      對兩個DataStream執行connect操作,將其轉化為ConnectedStreams, 生成的Streams可以調用不同方法在兩個實時流上執行,且雙流之間可以共享狀態。

      圖上我們可以看到,兩個數據流被connect之后,只是被放在了同一個流中,內部依然保持各自的數據和形式,兩個流相互獨立。

      [DataStream1, DataStream2] -> ConnectedStreams[1,2]

      萬字直通面試:Flink雙流JOIN

      這樣,我們可以在Connect算子底層的ConnectedStreams中編寫代碼,自行實現雙流JOIN的邏輯處理。

      6.2 技術實現

      1.調用connect算子,根據orderid進行分組,并使用process算子分別對兩條流進行處理。

      orderStream.connect(orderDetailStream) .keyBy("orderId", "orderId") .process(new orderProcessFunc());

      2.process方法內部進行狀態編程, 初始化訂單、訂單明細和定時器的ValueState狀態。

      private ValueState orderState; private ValueState orderDetailState; private ValueState timeState; // 初始化狀態Value orderState = getRuntimeContext().getState( new ValueStateDescriptor ("order-state",Order.class)); ····

      3.為每個進入的數據流保存state狀態并創建定時器。在時間窗口內另一個流達到時進行join并輸出,完成后刪除定時器。

      @Override public void processElement1(Order value, Context ctx, Collector> out){ if (orderDetailState.value() == null){ //明細數據未到,先把訂單數據放入狀態 orderState.update(value); //建立定時器,60秒后觸發 Long ts = (value.getEventTime()+10)*1000L; ctx.timerService().registerEventTimeTimer( ts); timeState.update(ts); }else{ //明細數據已到,直接輸出到主流 out.collect(new Tuple2<>(value,orderDetailS tate.value())); //刪除定時器 ctx.timerService().deleteEventTimeTimer (timeState.value()); //清空狀態,注意清空的是支付狀態 orderDetailState.clear(); timeState.clear(); } } ... @Override public void processElement2(){ ... }

      4.未及時達到的數據流觸發定時器輸出到側輸出流,左流先到而右流未到,則輸出左流,反之輸出右連流。

      @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector> out) { // 實現左連接 if (orderState.value() != null){ ctx.output(new OutputTag("left-jo in") {}, orderState.value().getTxId()); // 實現右連接 }else{ ctx.output(new OutputTag("left-jo in") {}, orderDetailState.value().getTxId()); } orderState.clear(); orderDetailState.clear(); timeState.clear(); }

      總體思想: 基于數據時間實現訂單數據及訂單明細數據的關聯,超時或者缺失則由側輸出流輸出。

      在connect中針對訂單流和訂單明細流,先創建定時器并保存state狀態,處于窗口內就進行join, 否則進入側輸出流。

      7 雙流JOIN的優化與總結

      為什么我的雙流join時間到了卻不觸發,一直沒有輸出

      檢查一下watermark的設置是否合理,數據時間是否遠遠大于watermark和窗口時間,導致窗口數據經常為空

      state數據保存多久,會內存爆炸嗎

      state自帶有ttl機制,可以設置ttl過期策略,觸發Flink清理過期state數據。建議程序中的state數據結構用完后手動clear掉。

      我的雙流join傾斜怎么辦

      join傾斜三板斧: 過濾異常key、拆分表減少數據、打散key分布。當然可以的話我建議加內存!加內存!加內存!!

      想實現多流join怎么辦

      目前無法一次實現,可以考慮先union然后再二次處理;或者先進行connnect操作再進行join操作,僅建議~

      join過程延遲、沒關聯上的數據會丟失嗎

      這個一般來說不會,join過程可以使用側輸出流存儲延遲流;如果出現節點網絡等異常,Flink checkpoint也可以保證數據不丟失。

      某日

      面試官: Flink雙流join了解嗎? 簡單說說其實現原理。

      某君: Flink雙流JOIN是。。。

      本文完。

      》》》更多優質文章請關注gzh:大數據兵工廠

      Flink Scala 大數據 實時流計算服務 CS

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:KubeEdge v0.2發布,全球首個K8S原生的邊緣計算平臺開放云端代碼
      下一篇:OpenCV中的圖像處理 —— 直方圖大章(查找、繪制及分析+均衡化+二維直方圖+直方圖反投影)
      相關文章
      亚洲乱人伦中文字幕无码| 亚洲日韩激情无码一区| 亚洲A丁香五香天堂网| 2020年亚洲天天爽天天噜| 久久精品视频亚洲| 亚洲免费人成在线视频观看| 高清在线亚洲精品国产二区| 亚洲国产精品自在自线观看| 亚洲色最新高清av网站| 亚洲午夜无码久久久久软件| 亚洲专区中文字幕| 亚洲Av无码一区二区二三区| 亚洲Av高清一区二区三区| 亚洲中文字幕久在线| 亚洲宅男精品一区在线观看| 自拍偷区亚洲国内自拍| 亚洲人成色99999在线观看| 亚洲日本一线产区和二线 | 亚洲福利视频一区| 99久久亚洲精品无码毛片| 亚洲av不卡一区二区三区| 亚洲一区二区影院| 亚洲视频免费在线看| 亚洲一区二区三区深夜天堂| 亚洲中文无码永久免| 亚洲AV成人精品日韩一区| 相泽南亚洲一区二区在线播放| 亚洲AV无码一区二三区| 国产亚洲精品免费视频播放| 亚洲成A人片777777| 噜噜噜亚洲色成人网站∨ | 亚洲精品国产精品乱码不卡| 亚洲最大AV网站在线观看| 亚洲AV无码乱码在线观看富二代| 久久精品九九亚洲精品| 激情综合亚洲色婷婷五月 | 日韩亚洲人成网站| 国产亚洲成人久久| 亚洲人成依人成综合网| 亚洲一区免费在线观看| 鲁死你资源站亚洲av|