如何處理Flink的反壓問題
一、什么是反壓
反壓是流處理系統中用來保障應用可靠性的一個重要機制。由于流應用是7*24小時運行,數據輸入速率也不是一成不變,可能隨時間產生波峰波谷,當某個處理單元由于到來的數據忽然增加,暫時性超出其處理能力時,就會出現數據在接收隊列上累積,當數據的累積量超出處理單元的容量時,會出現數據丟失現象甚至因為系統資源耗盡而導致應用崩潰。為此,需要一種反壓機制來告知上游處理單元降低數據發送的速率,以緩解下游處理單元的壓力。
二、Flink中如何實現反壓機制
Flink由于是天然的流計算架構,算子之間的數據傳輸采取類似阻塞隊列的方式,當接收者隊列滿了后,發送者就會被阻塞,從而產生反壓。先來看下flink的網絡棧。
邏輯視圖:
上游數據輸出到ResultPartition,下游任務從InputGate收取數據。
物理視圖:
不同任務之間的每個(遠程)網絡連接將在 Flink 的網絡棧中獲得自己的 TCP 通道。但是,如果同一任務的不同子任務被安排到了同一個 TaskManager,則它們與同一個 TaskManager 的網絡連接將被多路復用,并共享一個 TCP 信道以減少資源占用。如上示例中是 A.1→B.3、A.1→B.4 以及 A.2→B.3 和 A.2→B.4就是復用同一個通道。
每個子任務的結果稱為結果分區,每個結果拆分到單獨的子結果分區(?ResultSubpartitions?)中——每個邏輯通道有一個。在堆棧的這一部分中,Flink 不再處理單個記錄,而是將一組序列化記錄組裝到網絡緩沖區中。每當子任務的發送緩沖池耗盡時——也就是緩存駐留在結果子分區的緩存隊列中或更底層的基于 Netty 的網絡棧中時——生產者就被阻塞了,無法繼續工作,并承受背壓。接收器也是類似:較底層網絡棧中傳入的 Netty 緩存需要通過網絡緩沖區提供給 Flink。如果相應子任務的緩沖池中沒有可用的網絡緩存,Flink 將在緩存可用前停止從該通道讀取。
在1.5之前,這種機制會導致單個子任務的背壓會影響同一個通道的所有子任務,因此1.5之后Flink引入了基于信用的流量控制。
基于信用的流量控制,是在Flink原有反壓機制上,在ResultPartition和InputGate中間又加了一層信用反饋。每一次 ResultPartition 向 InputChannel 發送消息的時候都會發送一個 backlog size 告訴下游準備發送多少消息,下游就會去計算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就會返還給上游一個 Credit 告知他可以發送消息。當Credit未0時,上游就知道下游無法在接收數據,就會停止發送,這樣就不會把底層鏈路堵住。
三、如何定位產生反壓的節點
一看反壓:
FlinkUI上可以直觀的看到每個算子的背壓狀態。背壓狀態顯示的是請求的block率。
二看監控:
通過反壓狀態可以大致鎖定反壓可能存在的算子,但具體反壓是由于自身處理速度慢還是由于下游處理慢導致的,需要通過metric監控進一步監控。
Flink 1.9版本以上可以通過outPoolUsage,inPoolUsage,floatingBuffersUsage,exclusiveBuffersUsage來確認。其中inPoolUsage是floatingBUffersUsage和exclusiveBuffersUsagee的總和。
三看檢查點
通過查看檢查點歷史情況,可以看到檢查點在哪個task耗時最長,以及每個subtask的耗時時間,時間長的一般有兩種可能,狀態較大或者barrier被阻塞。
反壓可能產生的原因包括:
1)資源不足:如果CPU/內存或者IO使用率較高,可通過增加并發、資源或優化代碼等方式調整。
2)GC頻繁:分析GC日志或Heap Dump,確認是否有內存泄露,可適當提高內存緩解。
3)數據傾斜:觀察subtask的數據處理是否分布均勻,可通過對熱點key進行二次分發或者使用local/global aggregation解決。
注:部分截圖來自https://flink.apache.org/2019/06/05/flink-network-stack.html
DLI Apache
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。