亞寵展、全球?qū)櫸锂a(chǎn)業(yè)風(fēng)向標(biāo)——亞洲寵物展覽會深度解析
1091
2022-05-29
Flink的狀態(tài)與容錯是這個框架很核心的知識點(diǎn)。其中一致檢查點(diǎn)也就是Checkpoints也是Flink故障恢復(fù)機(jī)制的核心,這篇文章將詳細(xì)介紹Flink的狀態(tài)管理和Checkpoints的概念以及在生產(chǎn)環(huán)境中的參數(shù)設(shè)置。
什么是State狀態(tài)?
在使用Flink進(jìn)行窗口聚合統(tǒng)計,排序等操作的時候,數(shù)據(jù)流的處理離不開狀態(tài)管理
是一個Operator的運(yùn)行的狀態(tài)/歷史值,在內(nèi)存中進(jìn)行維護(hù)
流程:一個算子的子任務(wù)接收輸入流,獲取對應(yīng)的狀態(tài),計算新的結(jié)果,然后把結(jié)果更新到狀態(tài)里面
有狀態(tài)和無狀態(tài)介紹
無狀態(tài)計算: 同個數(shù)據(jù)進(jìn)到算子里面多少次,都是一樣的輸出,比如 filter
有狀態(tài)計算:需要考慮歷史狀態(tài),同個輸入會有不同的輸出,比如sum、reduce聚合操作
狀態(tài)管理分類
ManagedState(用的多)
Flink管理,自動存儲恢復(fù)
細(xì)分兩類
Keyed State 鍵控狀態(tài)(用的多)
有KeyBy才用這個,僅限用在KeyStream中,每個key都有state ,是基于KeyedStream上的狀態(tài)
一般是用richFlatFunction,或者其他richfunction里面,在open()聲明周期里面進(jìn)行初始化
ValueState、ListState、MapState等數(shù)據(jù)結(jié)構(gòu)
Operator State 算子狀態(tài)(用的少,部分source會用)
ListState、UnionListState、BroadcastState等數(shù)據(jù)結(jié)構(gòu)
RawState(用的少)
用戶自己管理和維護(hù)
存儲結(jié)構(gòu):二進(jìn)制數(shù)組
State數(shù)據(jù)結(jié)構(gòu)(狀態(tài)值可能存在內(nèi)存、磁盤、DB或者其他分布式存儲中)
ValueState 簡單的存儲一個值(ThreadLocal / String)
ValueState.value()
ValueState.update(T value)
ListState 列表
ListState.add(T value)
ListState.get() //得到一個Iterator
MapState 映射類型
MapState.get(key)
MapState.put(key, value)
State狀態(tài)后端:存儲在哪里
Flink 內(nèi)置了以下這些開箱即用的 state backends :
(新版)HashMapStateBackend、EmbeddedRocksDBStateBackend
如果沒有其他配置,系統(tǒng)將使用 HashMapStateBackend。
(舊版)MemoryStateBackend、FsStateBackend、RocksDBStateBackend
如果不設(shè)置,默認(rèn)使用 MemoryStateBackend。
狀態(tài)詳解
HashMapStateBackend 保存數(shù)據(jù)在內(nèi)部作為Java堆的對象。
鍵/值狀態(tài)和窗口操作符持有哈希表,用于存儲值、觸發(fā)器等
非???,因為每個狀態(tài)訪問和更新都對 Java 堆上的對象進(jìn)行操作
但是狀態(tài)大小受集群內(nèi)可用內(nèi)存的限制
場景:
具有大狀態(tài)、長窗口、大鍵/值狀態(tài)的作業(yè)。
所有高可用性設(shè)置。
EmbeddedRocksDBStateBackend 在RocksDB數(shù)據(jù)庫中保存狀態(tài)數(shù)據(jù)
該數(shù)據(jù)庫(默認(rèn))存儲在 TaskManager 本地數(shù)據(jù)目錄中
與HashMapStateBackend在java存儲 對象不同,數(shù)據(jù)存儲為序列化的字節(jié)數(shù)組
RocksDB可以根據(jù)可用磁盤空間進(jìn)行擴(kuò)展,并且是唯一支持增量快照的狀態(tài)后端。
但是每個狀態(tài)訪問和更新都需要(反)序列化并可能從磁盤讀取,這導(dǎo)致平均性能比內(nèi)存狀態(tài)后端慢一個數(shù)量級
場景
具有非常大狀態(tài)、長窗口、大鍵/值狀態(tài)的作業(yè)。
所有高可用性設(shè)置
舊版
MemoryStateBackend(內(nèi)存,不推薦在生產(chǎn)場景使用) FsStateBackend(文件系統(tǒng)上,本地文件系統(tǒng)、HDFS, 性能更好,常用) RocksDBStateBackend (無需擔(dān)心 OOM 風(fēng)險,是大部分時候的選擇) 代碼配置: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new EmbeddedRocksDBStateBackend()); env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir"); //或者 env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
什么是Checkpoint檢查點(diǎn)
Flink中所有的Operator的當(dāng)前State的全局快照
默認(rèn)情況下 checkpoint 是禁用的
Checkpoint是把State數(shù)據(jù)定時持久化存儲,防止丟失
手工調(diào)用checkpoint,叫 savepoint,主要是用于flink集群維護(hù)升級等
底層使用了Chandy-Lamport 分布式快照算法,保證數(shù)據(jù)在分布式環(huán)境下的一致性
有狀態(tài)流應(yīng)用的一致檢查點(diǎn),其實(shí)就是所有任務(wù)的狀態(tài),在某個時間點(diǎn)的一份 拷貝(一份快照);這個時間點(diǎn),應(yīng)該是所有任務(wù)都恰好處理完一個相同的輸入數(shù)據(jù)的時候
Flink 捆綁的些檢查點(diǎn)存儲類型:
作業(yè)管理器檢查點(diǎn)存儲 JobManagerCheckpointStorage
文件系統(tǒng)檢查點(diǎn)存儲 FileSystemCheckpointStorage
端到端(end-to-end)狀態(tài)一致性
數(shù)據(jù)一致性保證都是由流處理器實(shí)現(xiàn)的,也就是說都是在Flink流處理器內(nèi)部保證的 在真實(shí)應(yīng)用中,了流處理器以外還包含了數(shù)據(jù)源(例如Kafka、Mysql)和輸出到持久化系統(tǒng)(Kafka、Mysql、Hbase、CK) 端到端的一致性保證,是意味著結(jié)果的正確性貫穿了整個流處理應(yīng)用的各個環(huán)節(jié),每一個組件都要保證自己的一致性。
Source
需要外部數(shù)據(jù)源可以重置讀取位置,當(dāng)發(fā)生故障的時候重置偏移量到故障之前的位置
內(nèi)部
依賴Checkpoints機(jī)制,在發(fā)生故障的時可以恢復(fù)各個環(huán)節(jié)的數(shù)據(jù)
Sink:
當(dāng)故障恢復(fù)時,數(shù)據(jù)不會重復(fù)寫入外部系統(tǒng),常見的就是 冪等和事務(wù)寫入(和checkpoint配合)
有關(guān)檢查點(diǎn)配置的常用參數(shù)配置介紹
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //設(shè)置checkpoint的周期, 每隔1000 ms進(jìn)行啟動一個檢查點(diǎn) env.getCheckpointConfig().setCheckpointInterval(1000); // 設(shè)置狀態(tài)級別模式為exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //超時時間,可能是保存太耗費(fèi)時間或者是狀態(tài)后端的問題,任務(wù)同步執(zhí)行不能一直阻塞 env.getCheckpointConfig().setCheckpointTimeout(60000L); // 設(shè)置取消和故障時是否保留Checkpoint數(shù)據(jù),這個設(shè)置較為重要,沒有正確的選擇好可能會導(dǎo)致檢查點(diǎn)數(shù)據(jù)失效 //有兩個參數(shù)可以設(shè)置 //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作業(yè)時保留檢查點(diǎn)。必須在取消后手動清理檢查點(diǎn)狀態(tài)。 //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作業(yè)時刪除檢查點(diǎn)。只有在作業(yè)失敗時,檢查點(diǎn)狀態(tài)才可用。 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
實(shí)戰(zhàn)部分:
為了模擬生產(chǎn)環(huán)境中實(shí)時產(chǎn)生的訂單數(shù)據(jù),這里我們自己定義一個數(shù)據(jù)源來源源不斷的產(chǎn)生模擬訂單數(shù)據(jù)
訂單類:
@Data @AllArgsConstructor @NoArgsConstructor public class VideoOrder { private String tradeNo; private String title; private int money; private int userId; private Date createTime; @Override public String toString() { return "VideoOrder{" + "tradeNo='" + tradeNo + '\'' + ", title='" + title + '\'' + ", money=" + money + ", userId=" + userId + ", createTime=" + createTime + '}'; } }
public class VideoOrderSourceV2 extends RichParallelSourceFunction
產(chǎn)生數(shù)據(jù)的格式如下:
主程序:使用reduce算子對數(shù)據(jù)進(jìn)訂單價格進(jìn)行滾動計算,并設(shè)置Checkpoint保證數(shù)據(jù)狀態(tài)可以存取
public class FlinkKeyByReduceApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.enableCheckpointing(5000); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //這是我本機(jī)的ip地址 env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://192.168.192.100:8020/checkpoint")); DataStreamSource
在本地測試運(yùn)行結(jié)果,可以看到數(shù)據(jù)根據(jù)訂單分組不斷的進(jìn)行滾動計算
進(jìn)入服務(wù)器的HDFS查看檢查點(diǎn)數(shù)據(jù)是否存在
之后將應(yīng)用進(jìn)行打包,上傳到服務(wù)器進(jìn)行測試,可以使用Flink的Web頁面進(jìn)行手動提交jar包運(yùn)行,也可以使用命令進(jìn)行提交,之后可以看到程序運(yùn)行過程中的相關(guān)日志輸出
./bin/flink run -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /xiaochan-flink.jar
模擬宕機(jī)
運(yùn)行程序的時候我們可以在Flink看到任務(wù)進(jìn)行的id號,這個時候我們手動的cancel掉或者是直接把服務(wù)kill掉,這個時候任務(wù)被強(qiáng)制暫停。
進(jìn)入到HDFS可以看到我們設(shè)置的檢查點(diǎn)的數(shù)據(jù)依舊存在,我們使用如下命令,讓程序從上次宕機(jī)前的訂單計算狀態(tài)繼續(xù)往下計算。
-s : 指定檢查點(diǎn)的元數(shù)據(jù)的位置,這個位置記錄著宕機(jī)前程序的計算狀態(tài) ./bin/flink run -s /checkpoint/id號/chk-23/_metadata -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /root/xdclass-flink.jar
運(yùn)行命令,進(jìn)入WEB頁面進(jìn)行查看,是否成功。
可以看到出現(xiàn)一次close的時候,代表我們的程序以及停止,服務(wù)器已經(jīng)宕機(jī),這個時候訂單的計算結(jié)果如上圖的紅色方框。在我們運(yùn)行了上面那條命令后再次查看日志的數(shù)據(jù),從open開始可以看到這次就不是從訂單最初的狀態(tài)開始進(jìn)行的了,而是從上一次宕機(jī)前計算的結(jié)果,繼續(xù)往下計算,到這里Checkponit的實(shí)戰(zhàn)應(yīng)用測試就完成了。
Flink 自建電商
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。