Flink狀態(tài)管理Checkpoint實(shí)戰(zhàn)——模擬電商訂單計算過程中宕機(jī)的場景,探索宕機(jī)恢復(fù)時如何精準(zhǔn)繼續(xù)計算訂單

      網(wǎng)友投稿 1091 2022-05-29

      Flink的狀態(tài)與容錯是這個框架很核心的知識點(diǎn)。其中一致檢查點(diǎn)也就是Checkpoints也是Flink故障恢復(fù)機(jī)制的核心,這篇文章將詳細(xì)介紹Flink的狀態(tài)管理和Checkpoints的概念以及在生產(chǎn)環(huán)境中的參數(shù)設(shè)置。

      什么是State狀態(tài)?

      在使用Flink進(jìn)行窗口聚合統(tǒng)計,排序等操作的時候,數(shù)據(jù)流的處理離不開狀態(tài)管理

      是一個Operator的運(yùn)行的狀態(tài)/歷史值,在內(nèi)存中進(jìn)行維護(hù)

      流程:一個算子的子任務(wù)接收輸入流,獲取對應(yīng)的狀態(tài),計算新的結(jié)果,然后把結(jié)果更新到狀態(tài)里面

      有狀態(tài)和無狀態(tài)介紹

      無狀態(tài)計算: 同個數(shù)據(jù)進(jìn)到算子里面多少次,都是一樣的輸出,比如 filter

      有狀態(tài)計算:需要考慮歷史狀態(tài),同個輸入會有不同的輸出,比如sum、reduce聚合操作

      狀態(tài)管理分類

      ManagedState(用的多)

      Flink管理,自動存儲恢復(fù)

      細(xì)分兩類

      Keyed State 鍵控狀態(tài)(用的多)

      有KeyBy才用這個,僅限用在KeyStream中,每個key都有state ,是基于KeyedStream上的狀態(tài)

      一般是用richFlatFunction,或者其他richfunction里面,在open()聲明周期里面進(jìn)行初始化

      ValueState、ListState、MapState等數(shù)據(jù)結(jié)構(gòu)

      Operator State 算子狀態(tài)(用的少,部分source會用)

      ListState、UnionListState、BroadcastState等數(shù)據(jù)結(jié)構(gòu)

      RawState(用的少)

      用戶自己管理和維護(hù)

      存儲結(jié)構(gòu):二進(jìn)制數(shù)組

      State數(shù)據(jù)結(jié)構(gòu)(狀態(tài)值可能存在內(nèi)存、磁盤、DB或者其他分布式存儲中)

      ValueState 簡單的存儲一個值(ThreadLocal / String)

      ValueState.value()

      ValueState.update(T value)

      ListState 列表

      ListState.add(T value)

      ListState.get() //得到一個Iterator

      MapState 映射類型

      MapState.get(key)

      MapState.put(key, value)

      State狀態(tài)后端:存儲在哪里

      Flink 內(nèi)置了以下這些開箱即用的 state backends :

      (新版)HashMapStateBackend、EmbeddedRocksDBStateBackend

      如果沒有其他配置,系統(tǒng)將使用 HashMapStateBackend。

      (舊版)MemoryStateBackend、FsStateBackend、RocksDBStateBackend

      如果不設(shè)置,默認(rèn)使用 MemoryStateBackend。

      狀態(tài)詳解

      HashMapStateBackend 保存數(shù)據(jù)在內(nèi)部作為Java堆的對象。

      鍵/值狀態(tài)和窗口操作符持有哈希表,用于存儲值、觸發(fā)器等

      非???,因為每個狀態(tài)訪問和更新都對 Java 堆上的對象進(jìn)行操作

      但是狀態(tài)大小受集群內(nèi)可用內(nèi)存的限制

      場景:

      具有大狀態(tài)、長窗口、大鍵/值狀態(tài)的作業(yè)。

      所有高可用性設(shè)置。

      EmbeddedRocksDBStateBackend 在RocksDB數(shù)據(jù)庫中保存狀態(tài)數(shù)據(jù)

      該數(shù)據(jù)庫(默認(rèn))存儲在 TaskManager 本地數(shù)據(jù)目錄中

      與HashMapStateBackend在java存儲 對象不同,數(shù)據(jù)存儲為序列化的字節(jié)數(shù)組

      RocksDB可以根據(jù)可用磁盤空間進(jìn)行擴(kuò)展,并且是唯一支持增量快照的狀態(tài)后端。

      但是每個狀態(tài)訪問和更新都需要(反)序列化并可能從磁盤讀取,這導(dǎo)致平均性能比內(nèi)存狀態(tài)后端慢一個數(shù)量級

      場景

      具有非常大狀態(tài)、長窗口、大鍵/值狀態(tài)的作業(yè)。

      所有高可用性設(shè)置

      舊版

      MemoryStateBackend(內(nèi)存,不推薦在生產(chǎn)場景使用) FsStateBackend(文件系統(tǒng)上,本地文件系統(tǒng)、HDFS, 性能更好,常用) RocksDBStateBackend (無需擔(dān)心 OOM 風(fēng)險,是大部分時候的選擇) 代碼配置: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new EmbeddedRocksDBStateBackend()); env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir"); //或者 env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

      什么是Checkpoint檢查點(diǎn)

      Flink中所有的Operator的當(dāng)前State的全局快照

      默認(rèn)情況下 checkpoint 是禁用的

      Checkpoint是把State數(shù)據(jù)定時持久化存儲,防止丟失

      手工調(diào)用checkpoint,叫 savepoint,主要是用于flink集群維護(hù)升級等

      底層使用了Chandy-Lamport 分布式快照算法,保證數(shù)據(jù)在分布式環(huán)境下的一致性

      有狀態(tài)流應(yīng)用的一致檢查點(diǎn),其實(shí)就是所有任務(wù)的狀態(tài),在某個時間點(diǎn)的一份 拷貝(一份快照);這個時間點(diǎn),應(yīng)該是所有任務(wù)都恰好處理完一個相同的輸入數(shù)據(jù)的時候

      Flink 捆綁的些檢查點(diǎn)存儲類型:

      作業(yè)管理器檢查點(diǎn)存儲 JobManagerCheckpointStorage

      文件系統(tǒng)檢查點(diǎn)存儲 FileSystemCheckpointStorage

      端到端(end-to-end)狀態(tài)一致性

      數(shù)據(jù)一致性保證都是由流處理器實(shí)現(xiàn)的,也就是說都是在Flink流處理器內(nèi)部保證的 在真實(shí)應(yīng)用中,了流處理器以外還包含了數(shù)據(jù)源(例如Kafka、Mysql)和輸出到持久化系統(tǒng)(Kafka、Mysql、Hbase、CK) 端到端的一致性保證,是意味著結(jié)果的正確性貫穿了整個流處理應(yīng)用的各個環(huán)節(jié),每一個組件都要保證自己的一致性。

      Source

      需要外部數(shù)據(jù)源可以重置讀取位置,當(dāng)發(fā)生故障的時候重置偏移量到故障之前的位置

      內(nèi)部

      依賴Checkpoints機(jī)制,在發(fā)生故障的時可以恢復(fù)各個環(huán)節(jié)的數(shù)據(jù)

      Sink:

      當(dāng)故障恢復(fù)時,數(shù)據(jù)不會重復(fù)寫入外部系統(tǒng),常見的就是 冪等和事務(wù)寫入(和checkpoint配合)

      Flink狀態(tài)管理與Checkpoint實(shí)戰(zhàn)——模擬電商訂單計算過程中宕機(jī)的場景,探索宕機(jī)恢復(fù)時如何精準(zhǔn)繼續(xù)計算訂單

      有關(guān)檢查點(diǎn)配置的常用參數(shù)配置介紹

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //設(shè)置checkpoint的周期, 每隔1000 ms進(jìn)行啟動一個檢查點(diǎn) env.getCheckpointConfig().setCheckpointInterval(1000); // 設(shè)置狀態(tài)級別模式為exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //超時時間,可能是保存太耗費(fèi)時間或者是狀態(tài)后端的問題,任務(wù)同步執(zhí)行不能一直阻塞 env.getCheckpointConfig().setCheckpointTimeout(60000L); // 設(shè)置取消和故障時是否保留Checkpoint數(shù)據(jù),這個設(shè)置較為重要,沒有正確的選擇好可能會導(dǎo)致檢查點(diǎn)數(shù)據(jù)失效 //有兩個參數(shù)可以設(shè)置 //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作業(yè)時保留檢查點(diǎn)。必須在取消后手動清理檢查點(diǎn)狀態(tài)。 //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作業(yè)時刪除檢查點(diǎn)。只有在作業(yè)失敗時,檢查點(diǎn)狀態(tài)才可用。 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

      實(shí)戰(zhàn)部分:

      為了模擬生產(chǎn)環(huán)境中實(shí)時產(chǎn)生的訂單數(shù)據(jù),這里我們自己定義一個數(shù)據(jù)源來源源不斷的產(chǎn)生模擬訂單數(shù)據(jù)

      訂單類:

      @Data @AllArgsConstructor @NoArgsConstructor public class VideoOrder { private String tradeNo; private String title; private int money; private int userId; private Date createTime; @Override public String toString() { return "VideoOrder{" + "tradeNo='" + tradeNo + '\'' + ", title='" + title + '\'' + ", money=" + money + ", userId=" + userId + ", createTime=" + createTime + '}'; } }

      public class VideoOrderSourceV2 extends RichParallelSourceFunction { private volatile Boolean flag = true; private Random random = new Random(); private static List list = new ArrayList<>(); static { list.add(new VideoOrder("","java",10,0,null)); list.add(new VideoOrder("","spring boot",15,0,null)); } /** * run 方法調(diào)用前 用于初始化連接 * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { System.out.println("-----open-----"); } /** * 用于清理之前 * @throws Exception */ @Override public void close() throws Exception { System.out.println("-----close-----"); } /** * 產(chǎn)生數(shù)據(jù)的邏輯 * @param ctx * @throws Exception */ @Override public void run(SourceContext ctx) throws Exception { while (flag){ Thread.sleep(1000); String id = UUID.randomUUID().toString().substring(30); int userId = random.nextInt(10); int videoNum = random.nextInt(list.size()); VideoOrder videoOrder = list.get(videoNum); videoOrder.setUserId(userId); videoOrder.setCreateTime(new Date()); videoOrder.setTradeNo(id); System.out.println("產(chǎn)生:"+videoOrder.getTitle()+",價格:"+videoOrder.getMoney()+", 時間:"+ TimeUtil.format(videoOrder.getCreateTime())); ctx.collect(videoOrder); } } /** * 控制任務(wù)取消 */ @Override public void cancel() { flag = false; } }

      產(chǎn)生數(shù)據(jù)的格式如下:

      主程序:使用reduce算子對數(shù)據(jù)進(jìn)訂單價格進(jìn)行滾動計算,并設(shè)置Checkpoint保證數(shù)據(jù)狀態(tài)可以存取

      public class FlinkKeyByReduceApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動的入口, 存儲全局相關(guān)的參數(shù) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.enableCheckpointing(5000); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //這是我本機(jī)的ip地址 env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://192.168.192.100:8020/checkpoint")); DataStreamSource ds = env.addSource(new VideoOrderSourceV2()); KeyedStream videoOrderStringKeyedStream = ds.keyBy(new KeySelector() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); SingleOutputStreamOperator reduce = videoOrderStringKeyedStream.reduce(new ReduceFunction() { @Override public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception { VideoOrder videoOrder = new VideoOrder(); videoOrder.setTitle(value1.getTitle()); videoOrder.setMoney(value1.getMoney() + value2.getMoney()); return videoOrder; } }); reduce.print(); env.execute("job"); } }

      在本地測試運(yùn)行結(jié)果,可以看到數(shù)據(jù)根據(jù)訂單分組不斷的進(jìn)行滾動計算

      進(jìn)入服務(wù)器的HDFS查看檢查點(diǎn)數(shù)據(jù)是否存在

      之后將應(yīng)用進(jìn)行打包,上傳到服務(wù)器進(jìn)行測試,可以使用Flink的Web頁面進(jìn)行手動提交jar包運(yùn)行,也可以使用命令進(jìn)行提交,之后可以看到程序運(yùn)行過程中的相關(guān)日志輸出

      ./bin/flink run -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /xiaochan-flink.jar

      模擬宕機(jī)

      運(yùn)行程序的時候我們可以在Flink看到任務(wù)進(jìn)行的id號,這個時候我們手動的cancel掉或者是直接把服務(wù)kill掉,這個時候任務(wù)被強(qiáng)制暫停。

      進(jìn)入到HDFS可以看到我們設(shè)置的檢查點(diǎn)的數(shù)據(jù)依舊存在,我們使用如下命令,讓程序從上次宕機(jī)前的訂單計算狀態(tài)繼續(xù)往下計算。

      -s : 指定檢查點(diǎn)的元數(shù)據(jù)的位置,這個位置記錄著宕機(jī)前程序的計算狀態(tài) ./bin/flink run -s /checkpoint/id號/chk-23/_metadata -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /root/xdclass-flink.jar

      運(yùn)行命令,進(jìn)入WEB頁面進(jìn)行查看,是否成功。

      可以看到出現(xiàn)一次close的時候,代表我們的程序以及停止,服務(wù)器已經(jīng)宕機(jī),這個時候訂單的計算結(jié)果如上圖的紅色方框。在我們運(yùn)行了上面那條命令后再次查看日志的數(shù)據(jù),從open開始可以看到這次就不是從訂單最初的狀態(tài)開始進(jìn)行的了,而是從上一次宕機(jī)前計算的結(jié)果,繼續(xù)往下計算,到這里Checkponit的實(shí)戰(zhàn)應(yīng)用測試就完成了。

      Flink 自建電商

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:GaussDB Roach邏輯備份恢復(fù)
      下一篇:Android 開發(fā)小技巧(2)
      相關(guān)文章
      亚洲国产日韩在线成人蜜芽| 亚洲婷婷五月综合狠狠爱| 久久久久久a亚洲欧洲AV| 国产亚洲精久久久久久无码77777 国产亚洲精品成人AA片新蒲金 | 老司机亚洲精品影院无码| 亚洲国产精品无码专区影院| 国产亚洲成av片在线观看 | 久久亚洲中文字幕精品一区| 相泽亚洲一区中文字幕| 丁香五月亚洲综合深深爱| 久久久久亚洲精品男人的天堂| 亚洲偷自拍拍综合网| 亚洲综合国产一区二区三区| 亚洲综合伊人久久综合| 亚洲精品无码不卡在线播放HE| 亚洲Av综合色区无码专区桃色| 久久噜噜噜久久亚洲va久| 久久精品亚洲一区二区三区浴池| 亚洲视频在线观看视频| 精品日韩99亚洲的在线发布| 亚洲乱码在线观看| 亚洲av中文无码字幕色不卡| www.亚洲色图| 亚洲无线码一区二区三区| 久久精品国产亚洲AV网站| 亚洲经典在线观看| 久久久国产亚洲精品| 亚洲aⅴ无码专区在线观看春色 | 国产亚洲一区二区三区在线不卡| 亚洲毛片αv无线播放一区| 久久亚洲精品无码| 亚洲国产成人久久99精品| 亚洲中文字幕一二三四区| 色窝窝亚洲av网| 国产亚洲成人在线播放va| 久久精品亚洲综合一品| 亚洲欧洲综合在线| 亚洲精品无码av中文字幕| 亚洲国产成人久久精品99| 亚洲国产一成人久久精品| 亚洲色欲www综合网|