面試高頻|一文詳解Flink背壓
不知為何,最近的我開始走下坡路了。。。
1 故事的開始
此刻,我抬頭看了一眼坐在對面的這個家伙: 格子衫、中等身材,略高的鼻梁下頂著一副黑框眼鏡,微瞇的目光透出絲絲倦意,正一眨不眨地盯著我看。
我心里直犯嘀咕: 我又有什么好看的呢?不過是A君你用來換取面包、汽車的工具罷了。雖然陪伴了五年的時光,想來也就是如此~
說到這,忘了自我介紹了。我叫Flink,當然,我還是喜歡你們叫我的全名: Apache Flink,因為這樣聽起來很有科技感。 我是目前最火的大數據實時計算引擎之一。
之所以敢這么說,是因為目前我在實時領域確實處于獨領風騷的地位,不信請看下面的統計:
此處需要@一下我的老大哥:Apache Spark,我聽說一度出現過"Flink的出現,Spark是否慢慢成為雞肋"的言論。咱也不敢說,也不敢問,對于前輩還是保持尊重和理性。
"咳"~ 一聲輕咳把我拉回了現實,A君又開始調試代碼了~
2 我開始有壓力了
其實我是在上周和A君再次遇見的,之前聽說他在我的好朋友:Kafka那里呆了一周,好像是準備搞一個大事情。
等到他找到我, 才知道公司準備建設實時數倉。需要我和Kafka兄弟一起加入,處理億級別實時數據。
對于實時數倉我大抵是了解的。再看看A君的老大拿出的架構方案,心中暗喜:這可是到了我的專業領域。
整體架構并不難,很好理解。
程序實時獲取源數據,放置kafka ods層存儲
進行ods->dwd->dws層實時加工計算,結果寫進kafka
再加一條離線處理流程,作為備用
不知為何,最近的我開始走下坡路了。。。
1 故事的開始
此刻,我抬頭看了一眼坐在對面的這個家伙: 格子衫、中等身材,略高的鼻梁下頂著一副黑框眼鏡,微瞇的目光透出絲絲倦意,正一眨不眨地盯著我看。
我心里直犯嘀咕: 我又有什么好看的呢?不過是A君你用來換取面包、汽車的工具罷了。雖然陪伴了五年的時光,想來也就是如此~
說到這,忘了自我介紹了。我叫Flink,當然,我還是喜歡你們叫我的全名: Apache Flink,因為這樣聽起來很有科技感。 我是目前最火的大數據實時計算引擎之一。
之所以敢這么說,是因為目前我在實時領域確實處于獨領風騷的地位,不信請看下面的統計:
此處需要@一下我的老大哥:Apache Spark,我聽說一度出現過"Flink的出現,Spark是否慢慢成為雞肋"的言論。咱也不敢說,也不敢問,對于前輩還是保持尊重和理性。
"咳"~ 一聲輕咳把我拉回了現實,A君又開始調試代碼了~
2 我開始有壓力了
其實我是在上周和A君再次遇見的,之前聽說他在我的好朋友:Kafka那里呆了一周,好像是準備搞一個大事情。
等到他找到我, 才知道公司準備建設實時數倉。需要我和Kafka兄弟一起加入,處理億級別實時數據。
對于實時數倉我大抵是了解的。再看看A君的老大拿出的架構方案,心中暗喜:這可是到了我的專業領域。
整體架構并不難,很好理解。
程序實時獲取源數據,放置kafka ods層存儲
進行ods->dwd->dws層實時加工計算,結果寫進kafka
再加一條離線處理流程,作為備用
我看了一眼旁邊躍躍欲試的Kafka兄弟,互相點了點頭,我們開始吧~
作為老搭檔,我和Kafka兄弟配合的很默契,A君也是個老手,于是我們在短短的一周內就出色的完成了初步任務。
我可以給你看看我們的部分配合成果:
- src.main.scala.com.xxproject.xx |--handler |---FlinkODSHandler.scala |---FlinkDWHandler.scala |---FlinkADSHandler.scala ... |--service |---KafkaSchdulerService.scala |---SchdulerService.scala ... |--config/util/model |---KafkaUtils.scala |---XXDataModel.scala ...
春風得意馬蹄疾~ 此刻的心情舒服極了,我們仨簡直就是完美搭檔。。
可是好景不長。來到第二周,我漸漸的發現自己開始變慢了~
具體的表現為 :
運行開始時正常,到了后面就出現大量Task任務等待
少量Task任務開始報checkpoint超時問題
Kafka數據堆積,無法消費
我有點慌,去看了下自身的狀況,結果嚇了一大跳:
無論是輸入還是輸出,緩沖區內存都被占滿了。數據處理不過來,barrier流動極為緩慢,大量checkpoint生成時間變長。
我發生了背壓問題!!!
3 我的反壓機制
在默默的進行一段時間的自我調節后,問題依然沒有解決。
同時,我的周圍不斷拉響警報,內存頻繁告急。轉眼間我的Task執行頁面已被紅色High標識占滿~
沒有辦法,最終我還是向A君發出了告警~
A君收到消息,盯著我看了好一會,嘆了口氣。我覺得有點不好意思,感覺把事情搞砸了。。
他沒有多說什么,只是問起了我的反壓機制,說要從源頭解決問題。
下面是A君和我的對話
按照我以往的經驗,一般出現反壓就是下游數據的處理速度跟不上上游數據的產生速度。
可以細分兩種情況:
當前Task任務處理速度慢,比如task任務中調用算法處理等復雜邏輯,導致上游申請不到足夠內存。
下游Task任務處理速度慢,比如多次collect()輸出到下游,導致當前節點無法申請足夠的內存。
頻繁反壓會導致流處理作業數據延遲增加,同時還會影響到Checkpoint。
Checkpoint時需要進行Barrier對齊,此時若某個Task出現反壓,Barrier流動速度會下降,導致Checkpoint變慢甚至超時,任務整體也變慢。
長期或頻繁出現反壓才需要處理,如果由于網絡波動或者GC出現的偶爾反壓可以不必處理。
在我的Web界面,我會從Sink到Source逆向Task排查。逐個查看BackPressure詳情,找到第一個出現反壓的Task。
下面這是正常的狀況~
我的內部檢測原理
BackPressure界面定期采樣Task線程棧信息,統計線程請求內存Buffer的阻塞頻率,判斷節點是否處于反壓狀態。
默認情況下,頻率小于0.1顯示正常
(0.1,0.5) 為LOW,背壓輕微
超過0.5為 HIGH,需要注意反壓
此時,我指給A君看了目前項目的BackPressure頁面,這明顯是不正常的狀況。
A君頓了頓嗓子,提示我此處講的仔細一點。 我整理了下思路,決定先從限流開始說起:
數據流程
整體流程可類比為生產者->消費者體系。上游生產者發送數據(2M/s)至Send Buffer,途徑網絡傳輸(5M/s)到Receive Buffer, 最終下游Consumer消費(<1M/s)。
這明顯是不行的,下游速度慢于上游速度,數據久積成疾~ 需要做限流。
限流
這很好理解。既然上游處理較快,那么我添加一個限流機制將其速度降下來,讓上下游速度基本一致,這樣不就解決了嗎。。
其實不然,這里有幾個問題:
我無法提前預估下游實際速度(流速限制設置多少)
常碰到網絡波動等情況,上下游的流速是動態變化的
考慮到這些原因,我的內部提供一種強大的反壓機制:
上下游動態反饋,如果下游速度慢,則上游限速;否則上游提速。實現動態自動反壓的效果。
反壓機制示意
上游發送網絡數據前經過自身的Network Buffer層,之后往下傳輸到Channel Buffer層(Netty通道)。最終通過網絡傳輸,層層傳遞達到下游。
Network Buffer、Channel Buffer和Socket Buffer通俗理解就是用戶態和內核態的區別,處于不同的交換空間和操作系統。
有關內核態和用戶態原理,有興趣的小伙伴歡迎添加個人- youlong525 進行討論~
反壓機制原理
前面做了一些鋪墊,這里我給A君總結了我的反壓機制的運行流程:
每個TaskManager維護共享Network BufferPool(Task共享內存池),初始化時向Off-heap Memory中申請內存。
每個TaskManager維護共享Network BufferPool(Task共享內存池),初始化時向Off-heap Memory中申請內存。
每個Task創建自身的Local BufferPool(Task本地內存池),并和Network BufferPool交換內存。
每個Task創建自身的Local BufferPool(Task本地內存池),并和Network BufferPool交換內存。
上游Record Writer向 Local BufferPool申請buffer(內存)寫數據。如果Local BufferPool沒有足夠內存則向Network BufferPool申請,使用完之后將申請的內存返回Pool。
上游Record Writer向 Local BufferPool申請buffer(內存)寫數據。如果Local BufferPool沒有足夠內存則向Network BufferPool申請,使用完之后將申請的內存返回Pool。
Netty Buffer拷貝buffer并經過Socket Buffer發送到網絡,后續下游端按照相似機制處理。
Netty Buffer拷貝buffer并經過Socket Buffer發送到網絡,后續下游端按照相似機制處理。
當下游申請buffer失敗時,表示當前節點內存不夠,則逐層發送反壓信號給上游,上游慢慢停止數據發送,直到下游再次恢復。
當下游申請buffer失敗時,表示當前節點內存不夠,則逐層發送反壓信號給上游,上游慢慢停止數據發送,直到下游再次恢復。
所以,我的反壓機制類似于Java中的阻塞隊列,如下圖我的內存級的反壓工作原理示意。
Task任務通過與Local BufferPool和Network BufferPool協作進行內存申請和釋放,同時下游內存使用情況實時反饋給上游,實現動態反壓。
A君聽完我的回答,陷入了沉思~
4 我要減壓
其實我心里也很迷惑。我對自己的反壓機制很有信心,會不會是其他原因影響到了反壓處理?
這時,一旁的A君打開了我的WEB UI,口中喃喃的吐出幾個詞: 數據傾斜和并發。
4.1 第一次嘗試
我瞬間明白了過來,轉眼去看屏幕。
我分別查看了各個SubTask情況,發現在某個Checkpoint中對應的state size值存在個別異常,竟達到了10G左右大小!!
再看下分區內的其他值(如圖):
發生數據傾斜了~
我心里有了底,立馬和A君一起找出了這些特殊的Key,進行預聚合打散和數據拆分,再次運行。
感覺有那么一點效果,但是還是有蠻多的高峰值。。
4.2 第二次嘗試
此刻又陷入了僵局。
沒辦法,我加大了自身的一點內存。想了想,又加大了算子的并發度,畢竟增加線程數總歸會緩解一些計算壓力。
不甘心的調整了參數之后,結果依然沒有太多提升。
4.3 第三次嘗試
A君開始重新梳理我的整體計算流程,然后改動了一個參數。
我看了下,還是修改并發度。心中不以為然,我剛才可就試過了這個。。
好像有點不對勁。。
這就是我要的結果!!我不禁喊了出來。
他笑了笑,告訴我這是用到了我的算子鏈機制。
算子鏈
通過將下游算子和上游算子設置相同并發度,可自動形成算子鏈
這樣做的好處是:
有效減少線程間切換和數據緩存開銷
提高吞吐量且降低延遲
整個流程中形成多個算子鏈,導致線程開銷和內存使用率下降。我的反壓情況自然也變得緩和了。
我不禁大受震撼~~
5 一日看盡長安花
最終在A君的協助下,我的速度回來了。幾天高壓的日子徹底結束,此刻盡絲滑~
我緩緩吐出一口氣,有點欣慰的看著最后的結果:
不自覺地抬頭看了眼A君,他也露出了久違的微笑。
我是Flink,現在沒有壓力~
本文完。
》》》更多好文,請大家關注我的公眾號: 大數據兵工廠
Flink 大數據 實時流計算服務 CS
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。