Flink狀態(tài)與容錯介紹
時間、窗口、狀態(tài)、容錯可以稱為是flink的四大基石,本文將介紹下flink中的狀態(tài)和容錯。
1. 狀態(tài)
flink中的計算分為有狀態(tài)和無狀態(tài)兩種,有狀態(tài)算子需要使用內(nèi)置狀態(tài)和流入的事件來計算結(jié)果,本文基于有狀態(tài)計算來講述狀態(tài)機制。
1.1 狀態(tài)類型
1.1.1 按鍵值區(qū)分
根據(jù)key值分區(qū),flink可以分為keyed state 和 operator state兩種。
對于flink數(shù)據(jù)流的每個key分區(qū),都會有一個keystate與其對應(yīng),鍵值相同的記錄訪問到的是一樣的狀態(tài)。
對于非keyedstream 與其對應(yīng)的是opeartor state, operator state只與算子的并行實例相關(guān),每個算子實例只持有一部分狀態(tài)數(shù)據(jù),其對應(yīng)的state類型只有一種,為listState。
flink中的狀態(tài)可以分為managed state(托管狀態(tài))、和raw state 原生狀態(tài),managedState是flink自己管理的state,rawstate需要用戶自己管理。使用byte[]來讀寫狀態(tài)。
1.1.3 廣播狀態(tài)
即broadcaststate,廣播至下游所有任務(wù),在本地存儲,broadcaststate必須是mapstate類型。對于dataset api中,廣播在每個計算節(jié)點存儲一份,對于datastream 而言,在每個并行度存在一份,算子的每個任務(wù)的狀態(tài)都相同。同時flink watermark和CheckpointBarrier向下游傳遞時也是基于廣播機制。
1.2 狀態(tài)后端
狀態(tài)后端是一種可插拔的機制,按照存儲介質(zhì),狀態(tài)后端分為MemoryStateBackend、FsStateBackend、RocksDBStateBackend三種。狀態(tài)后端主要負責(zé)本地狀態(tài)管理和將狀態(tài)checkpoint到遠程存儲。checkpoint機制將在下節(jié)講解。
1. memoryStateBackend
這種形式狀態(tài)存儲在堆內(nèi)存中,狀態(tài)過大可能導(dǎo)致oom問題,checkpoint時快照到j(luò)obmanager內(nèi)存中。
2. FsStateBackend
狀態(tài)保存在taskmanager內(nèi)存中,與memoryStateBackend不同的是,checkpoint會把state快照到外部文件系統(tǒng)中,相對memoryStateBackend可用性更高。
上述兩種狀態(tài)都依賴于heapkeyedstatebackend,使用statetable存儲數(shù)據(jù),statetable有兩種實現(xiàn)
CopyOnWriteStateTable 支持copy-on-write實現(xiàn),可以支持異步快照,
另外一種是NestedMapsStateTable,其使用一個雙層的hashmap和單層的hashmap作為狀態(tài)存儲。
3. RocksDBStateBackend
rocksdb是一種類似于hbase的kv存儲本地數(shù)據(jù)庫,依賴于lsm實現(xiàn),可以將數(shù)據(jù)保存到本地磁盤上,讀寫狀態(tài)時會涉及到序列化反序列化操作,與內(nèi)存相比,性能會偏低些。但其可以保存比較大的狀態(tài),受限于磁盤大小,但其key value依賴于byte數(shù)組,大小受byte[]限制。在一些與外部系統(tǒng)交互的場景可以適當(dāng)?shù)氖褂胷ocksdb減少依賴外部系統(tǒng)。同時rocksdb后端支持增量checkpoint。
1.3 算子擴縮容
有狀態(tài)算子并行度調(diào)整時,同時伴隨著狀態(tài)的重分布。
1. 對于broadcaststate,并行度改變時,狀態(tài)會發(fā)到新的task上,以確保所有的task狀態(tài)相同。
2. 對于liststate,并行度改變時,將每個list取出,合并成一個,根據(jù)元素均勻的分配給新的task。
3. 對于keyedState,flink會將所有的鍵值分為不同的鍵值組,每個鍵值組包含部分鍵值,每個鍵值隸屬于唯一的鍵值組,flink以鍵值為單位將鍵值分配給不同的任務(wù)。`鍵值組分配算法如下:
/** ?*?Assigns?the?given?key?to?a?key-group?index. ?* ?*?@param?key?the?key?to?assign ?*?@param?maxParallelism?the?maximum?supported?parallelism,?aka?the?number?of?key-groups. ?*?@return?the?key-group?to?which?the?given?key?is?assigned ?*/ public?static?int?assignToKeyGroup(Object?key,?int?maxParallelism)?{ ???Preconditions.checkNotNull(key,?"Assigned?key?must?not?be?null!"); ???return?computeKeyGroupForKeyHash(key.hashCode(),?maxParallelism); } /** ?*?Assigns?the?given?key?to?a?key-group?index. ?* ?*?@param?keyHash?the?hash?of?the?key?to?assign ?*?@param?maxParallelism?the?maximum?supported?parallelism,?aka?the?number?of?key-groups. ?*?@return?the?key-group?to?which?the?given?key?is?assigned ?*/ public?static?int?computeKeyGroupForKeyHash(int?keyHash,?int?maxParallelism)?{ ???return?MathUtils.murmurHash(keyHash)?%?maxParallelism; }
該圖展現(xiàn)了keyedState的擴縮容流程
1.4 持久化策略
heapsnapshotStrategy策略對應(yīng)heapKeyedStateBackend,rocksdbStateBackend支持rocksfullSnapshotStratey和rocksIncementalSnapshotStrategy全量持久化策略和增量持久化策略。
1. 全量持久化
全量持久化策略每次講全量的state寫到狀態(tài)存儲中,在執(zhí)行持久化策略時,使用異步的方式,后臺啟動一個線程去做持久化工作,基于memory的狀態(tài)后端會使用CopyOnWriteStateTable來保證線程安全,基于rocksdb的則使用rocksdb的快照機制來確保線程安全。
2. 增量持久化
只有rocksdb后端支持增量持久化,rocksdb基于lsm-tree實現(xiàn),
基于lsm-tree的memtable、sstable、以及Compaction來實現(xiàn)增量更新。
1.5 基于max算子講解managedState
會有同學(xué)有疑問,我沒看到狀態(tài),狀態(tài)在哪里被使用,下面基于max算子講下managedstate的使用流程。
StreamExecutionEnvironment?env?=?StreamExecutionEnvironment.getExecutionEnvironment();?????? ?????????env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args)); ????????env.fromElements(WORDS)?? ??????????.flatMap(new?FlatMapFunction
下面可以看下max的具體實現(xiàn)
public?SingleOutputStreamOperator
1.6 實現(xiàn)有狀態(tài)函數(shù)
1.用戶可以使用flink自帶的richfunction操作狀態(tài)。richfunction可以通過getRunctionContext獲取RuntimeContext對象,以此操作狀態(tài)。首先注冊一個狀態(tài)描述符,包含狀態(tài)名稱和類型,狀態(tài)名稱的作用域是整個算子,可以在算子中注冊多個狀態(tài)描述符來創(chuàng)建多個狀態(tài)對象。狀態(tài)引用通常在richfunction中的open方法中初始化。
2. 處理函數(shù) ProcessFunction,不過其也是繼承與richfunction體系。
3. 通過實現(xiàn)ListCheckpointed和實現(xiàn)CheckpointedFunction接口
public?interface?ListCheckpointed
snapshotState可以將算子狀態(tài)拆分成多個部分,restoreState可以對多個子狀態(tài)進行組裝,算子進行狀態(tài)恢復(fù)時,flink會將狀態(tài)的各個部分分發(fā)到相關(guān)并行實例上。
public?interface?CheckpointedFunction?{ ???/** ????*?This?method?is?called?when?a?snapshot?for?a?checkpoint?is?requested.?This?acts?as?a?hook?to?the?function?to ????*?ensure?that?all?state?is?exposed?by?means?previously?offered?through?{@link?FunctionInitializationContext}?when ????*?the?Function?was?initialized,?or?offered?now?by?{@link?FunctionSnapshotContext}?itself. ?????*/??生成檢查點時被調(diào)用 ???void?snapshotState(FunctionSnapshotContext?context)?throws?Exception; ???/** ????*?This?method?is?called?when?the?parallel?function?instance?is?created?during?distributed ????*?execution.?Functions?typically?set?up?their?state?storing?data?structures?in?this?method. ????*/??任務(wù)啟動或故障重啟時觸發(fā) ???void?initializeState(FunctionInitializationContext?context)?throws?Exception; }
checkpointedFunction 可使用列表狀態(tài)和鍵值分區(qū)狀態(tài)。
1.7 狀態(tài)過期
2. 容錯機制
流式系統(tǒng)通常被設(shè)計用來7 * 24 運行,這樣就要確保作業(yè)能夠從失敗中恢復(fù),失敗時,系統(tǒng)需要重啟任務(wù),并且恢復(fù)狀態(tài)。
2.1 容錯語義
1. at more once
至多一次
2. at least once
至少一次
3. exactly once
flink內(nèi)精確一次
4. end to end exactly once
端到端精確一次會關(guān)注三個問題,sources、shuffle、sinks間的exactly once。
How Dataflow guarantees that every source record is processed exactly once
How Dataflow guarantees that every record is shuffled exactly once
How Dataflow guarantees that every sink produces accurate output.
本文中的內(nèi)容將回答這三個問題。
2.2 組件容錯
對于flink這種分布式組件,每個組件失敗都會導(dǎo)致作業(yè)失敗,下文將基于jobmaster、taskmanager、resourcemanager三種角色來講解容錯,這三種組件通過akka進行網(wǎng)絡(luò)通信,保持心跳。故障檢測需要使用心跳的方式,故障恢復(fù)需要狀態(tài)快照和恢復(fù)。重要組件使用至少兩個的HA機制,一個leader和若干個standby。
2.2.1 jobmaster容錯
jobmaster在flink中負責(zé)應(yīng)用程序執(zhí)行調(diào)度和管理,將客戶端提交過來的jobGraph轉(zhuǎn)化為executionGraph,向resourcemanager申請資源,并將executionGraph的任務(wù)分發(fā)給taskmanager執(zhí)行,在執(zhí)行過程中,jm還會觸發(fā)checkpoint執(zhí)行,通常一個leader,一個standby。
jm失敗時,所有的task都會被自動取消,首先會從zk中取到j(luò)obgraph、jar、最近的checkpoint地址,向resourcemanager去申請資源重啟任務(wù),任務(wù)重啟后將從最近的一次checkpoint狀態(tài)恢復(fù)task。 如圖所示:
private?JobGraph?jobGraph; private?final?HighAvailabilityServices?highAvailabilityServices; //?心跳管理 ivate?final?HeartbeatManager
2.2.2 taskmanager容錯
taskmanager在flink中的工作進程,是flink處理資源單元,同時是checkpoint的執(zhí)行者,每個taskmanager有1到多個slot,tm啟動后,會向rm注冊它的處理槽,jm就可以像slot分配任務(wù)來執(zhí)行任務(wù),task在tm中執(zhí)行,tm負責(zé)task的啟動、執(zhí)行、取消,task異常時會向jm匯報,tm為task數(shù)據(jù)交換提供依托,如同一個tm內(nèi)不同線程間的通過緩沖區(qū)的數(shù)據(jù)交換,不同tm的線程間通過網(wǎng)絡(luò)傳輸機制進行數(shù)據(jù)交換并提供反壓機制。執(zhí)行期間如果一個taskmanager失敗了,jobmaster將向resourcemanager申請可用的slot,有足夠的slot后,這個taskmanager將會重啟。重啟策略分為三種,fixed-delay、failure-rate、no-restart。
/**?The?access?to?the?leader?election?and?retrieval?services.?*/ private?final?HighAvailabilityServices?haServices; /**?The?heartbeat?manager?for?job?manager?in?the?task?manager.?*/ private?final?HeartbeatManager
2.2.3 resourcemanager容錯
resourcemanager在flink中是一個可插拔組件,對于不用的環(huán)境有不同的實現(xiàn),如對用k8s 和 yarn 有對應(yīng)的KubernetesResourceManager和YarnResourceManager等。其負責(zé)資源的管理,同時和tm、jm保持心跳。rm中持有一個slotmanager組件,負責(zé)維護當(dāng)前有多少tm,各個slot的使用情況。 rm發(fā)生故障時,jm會通過leader選舉通知得到新的rm,并重新嘗試和新的rm建立連接。tm也會將自己的slot注冊到新的rm。
//?高可用管理 /**?High?availability?services?for?leader?retrieval?and?election.?*/ private?final?HighAvailabilityServices?highAvailabilityServices; //?心跳管理 /**?The?heartbeat?manager?with?task?managers.?*/ private?final?HeartbeatManager
2.3 checkpoint
flink的checkpoint機制是輕量級異步分布式快照,那它是如何做到它的名字中說的異步分布式的呢,又是如何做到at least once 和 exactly once的呢?
2.3.1 分布式快照
分布式是按算子將數(shù)據(jù)流切分,flink使用Chandy-Lamport分布式快照算法來生成檢查點,將生成檢查點的過程與處理過程分離。流計算計算的正確性語義需要依賴state + checkpoint + connector特性來支持。flink jm中使用CheckpointCoordinator周期性的以廣播的形式發(fā)送barrier到數(shù)據(jù)流中,每個barrier都有一個檢查點編號,用來從邏輯上切分數(shù)據(jù)流,barrier在流中的位置不會提前或延后,隨數(shù)據(jù)流鄉(xiāng)下流動,在數(shù)據(jù)源注入barrier后,觸發(fā)向狀態(tài)后端生成檢查點,barrier所在位置就是恢復(fù)數(shù)據(jù)時的起始位置。下游算子收齊barrier后,會執(zhí)行自己的算子state快照,并向下游廣播barrier,直至下游sink算子收集齊barrier后,對自己的state執(zhí)行快照,完成快照后,并通知CheckpointCoordinator快照地址,CheckpointCoordinator收集齊barriar確認信息后,確認本次快照完成。
2.3.2 at least once
對齊barrier時,早來的數(shù)據(jù)持續(xù)處理就是at least once。該數(shù)據(jù)依舊被包含在checkpoint備份的狀態(tài)之中,當(dāng)故障發(fā)生時,從備份的狀態(tài)恢復(fù)時,該數(shù)據(jù)依舊會被處理。
2.3.3 exactly once
對齊barrier時,早來的數(shù)據(jù)進行buffer就是flink引擎內(nèi)exectly once。就是對齊環(huán)節(jié),將數(shù)據(jù)收集緩存起來,對齊完成之后再處理。但只能保證flink引擎內(nèi)的exactly once 語義。
2.3.4 end to end exactly once
如果要做到end to end exactly once 語義,需要Source端和sink端的配合,
1. 數(shù)據(jù)源需要支持斷點讀取
2. sink端需要支持回滾機制或滿足冪等性。回滾--將部分寫入結(jié)果回滾到寫入之前的狀態(tài),冪等-- 多次寫入一致
源端和目的端支持的語義如下:
flink中采用的是2pc解決方案,即 two phase commit 兩階段提交,所謂的兩個階段是指:第一階段:準備階段(投票階段)和第二階段:提交階段(執(zhí)行階段)。我們將提議的節(jié)點稱為協(xié)調(diào)者(coordinator),其他參與決議節(jié)點稱為參與者(participants)。另還有3pc的方式。
sink算子收到上游所有的barrier后,執(zhí)行state快照,并預(yù)提交事務(wù),再通知CheckpointCoordinator,CheckpointCoordinator確認本次快照完成, sink算子提交事務(wù)。
flink提供了一個2pc的TwoPhaseCommitSinkFunction的抽象 類
public?abstract?class?TwoPhaseCommitSinkFunction
TwoPhaseCommitSinkFunction 繼承了checkpointfunction接口,可以在預(yù)提交階段,將檢查點寫到可靠性存儲。其又繼承了checkpointlistener接口,在提交階段接收jm的確認通知,觸發(fā)提交外部事務(wù)。
2.4 savepoint
savepoint和checkpoint都是一致性快照,checkpoint由flink自動創(chuàng)建,故障發(fā)生時會自動加載恢復(fù),flink根據(jù)配置自動刪除,而保存點需要用戶手動觸發(fā),不會被自動刪除,啟動時,從savepoint啟動即可。savepoint主要用于作業(yè)遷移和集群升級等場景中。
bin/flink/?savepoint?jobId?savepointpath? bin/flin/?run?-s?savepointpath?...
3。 后記
后續(xù)將更多的投向flink sql,so,上云就上華為云,流計算服務(wù)推薦選擇華為云EI 數(shù)據(jù)湖探索 DLI-FLINK serverless云服務(wù)。
參考:
https://ci.apache.org/projects/flink/flink-docs-master/
https://github.com/apache/flink/tree/blink
https://www.oreilly.com/library/view/stream-processing-with/9781491974285/
分布式 任務(wù)調(diào)度
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。