FusionInsight HD Flink組件基本原理常見問題解析

      網(wǎng)友投稿 617 2025-03-31

      Flink是一個(gè)批處理和流處理結(jié)合的統(tǒng)一計(jì)算框架,其核心是一個(gè)提供數(shù)據(jù)分發(fā)以及并行化計(jì)算的流數(shù)據(jù)處理引擎。

      它的最大亮點(diǎn)是流處理,是業(yè)界最頂級的開源流處理引擎。

      Flink最適合的應(yīng)用場景是低時(shí)延的數(shù)據(jù)處理(Data Processing)場景:高并發(fā)pipeline處理數(shù)據(jù),時(shí)延毫秒級,且兼具可靠性。

      本文主要介紹了FusionInsight Flink組件的基本原理、Flink任務(wù)提交的常見問題、以及最佳實(shí)踐FAQ。

      基本概念

      基本原理

      Flink是一個(gè)批處理和流處理結(jié)合的統(tǒng)一計(jì)算框架,其核心是一個(gè)提供了數(shù)據(jù)分發(fā)以及并行化計(jì)算的流數(shù)據(jù)處理引擎。它的最大亮點(diǎn)是流處理,是業(yè)界最頂級的開源流處理引擎。

      Flink最適合的應(yīng)用場景是低時(shí)延的數(shù)據(jù)處理(Data Processing)場景:高并發(fā)pipeline處理數(shù)據(jù),時(shí)延毫秒級,且兼具可靠性。

      Flink技術(shù)棧如圖所示:

      Flink在當(dāng)前版本中重點(diǎn)構(gòu)建如下特性,其他特性繼承開源社區(qū),不做增強(qiáng),具體請參考:https://ci.apache.org/projects/flink/flink-docs-release-1.4/

      DataStream

      Checkpoint

      Stream SQL

      窗口

      Job Pipeline

      配置表

      Flink架構(gòu)如圖所示。

      Flink整個(gè)系統(tǒng)包含三個(gè)部分:

      Client

      Flink Client主要給用戶提供向Flink系統(tǒng)提交用戶任務(wù)(流式作業(yè))的能力。

      TaskManager

      Flink系統(tǒng)的業(yè)務(wù)執(zhí)行節(jié)點(diǎn),執(zhí)行具體的用戶任務(wù)。TaskManager可以有多個(gè),各個(gè)TaskManager都平等。

      JobManager

      Flink系統(tǒng)的管理節(jié)點(diǎn),管理所有的TaskManager,并決策用戶任務(wù)在哪些Taskmanager執(zhí)行。JobManager在HA模式下可以有多個(gè),但只有一個(gè)主JobManager。

      Flink系統(tǒng)提供的關(guān)鍵能力:

      低時(shí)延

      提供ms級時(shí)延的處理能力。

      Exactly Once

      提供異步快照機(jī)制,保證所有數(shù)據(jù)真正只處理一次。

      HA

      JobManager支持主備模式,保證無單點(diǎn)故障。

      水平擴(kuò)展能力

      TaskManager支持手動水平擴(kuò)展。

      Stream & Transformation & Operator

      用戶實(shí)現(xiàn)的Flink程序是由Stream和Transformation這兩個(gè)基本構(gòu)建塊組成。

      Stream是一個(gè)中間結(jié)果數(shù)據(jù),而Transformation是一個(gè)操作,它對一個(gè)或多個(gè)輸入Stream進(jìn)行計(jì)算處理,輸出一個(gè)或多個(gè)結(jié)果Stream。

      當(dāng)一個(gè)Flink程序被執(zhí)行的時(shí)候,它會被映射為Streaming Dataflow。一個(gè)Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似于一個(gè)DAG圖,在啟動的時(shí)候從一個(gè)或多個(gè)Source Operator開始,結(jié)束于一個(gè)或多個(gè)Sink Operator。

      下圖為一個(gè)由Flink程序映射為Streaming Dataflow的示意圖。

      上圖中“FlinkKafkaConsumer”是一個(gè)Source Operator,Map、KeyBy、TimeWindow、Apply是Transformation Operator,RollingSink是一個(gè)Sink Operator。

      Pipeline Dataflow

      在Flink中,程序是并行和分布式的方式運(yùn)行。一個(gè)Stream可以被分成多個(gè)Stream分區(qū)(Stream Partitions),一個(gè)Operator可以被分成多個(gè)Operator Subtask。

      Flink內(nèi)部有一個(gè)優(yōu)化的功能,根據(jù)上下游算子的緊密程度來進(jìn)行優(yōu)化。

      緊密度低的算子則不能進(jìn)行優(yōu)化,而是將每一個(gè)Operator Subtask放在不同的線程中獨(dú)立執(zhí)行。一個(gè)Operator的并行度,等于Operator Subtask的個(gè)數(shù),一個(gè)Stream的并行度(分區(qū)總數(shù))等于生成它的Operator的并行度。如下圖所示。

      Operator

      緊密度高的算子可以進(jìn)行優(yōu)化,優(yōu)化后可以將多個(gè)Operator Subtask串起來組成一個(gè)Operator Chain,實(shí)際上就是一個(gè)執(zhí)行鏈,每個(gè)執(zhí)行鏈會在TaskManager上一個(gè)獨(dú)立的線程中執(zhí)行,如下圖所示。

      Operator chain

      上圖中上半部分表示的是將Source和map兩個(gè)緊密度高的算子優(yōu)化后串成一個(gè)Operator Chain,實(shí)際上一個(gè)Operator Chain就是一個(gè)大的Operator的概念。圖中的Operator Chain表示一個(gè)Operator,keyBy表示一個(gè)Operator,Sink表示一個(gè)Operator,它們通過Stream連接,而每個(gè)Operator在運(yùn)行時(shí)對應(yīng)一個(gè)Task,也就是說圖中的上半部分有3個(gè)Operator對應(yīng)的是3個(gè)Task。

      上圖中下半部分是上半部分的一個(gè)并行版本,對每一個(gè)Task都并行化為多個(gè)Subtask,這里只是演示了2個(gè)并行度,sink算子是1個(gè)并行度。

      日志介紹

      日志存儲路徑:

      Executor運(yùn)行日志:“${BIGDATA_DATA_HOME}/hadoop/data${i}/nm/containerlogs/application_${appid}/container_{$contid}”

      運(yùn)行中的任務(wù)日志存儲在以上路徑中,運(yùn)行結(jié)束后會基于Yarn的配置確定是否匯聚到HDFS目錄中。

      其他日志:“/var/log/Bigdata/flink/flinkResource”

      日志歸檔規(guī)則:

      Executor日志默認(rèn)50MB滾動存儲一次,最多保留100個(gè)文件,不壓縮。

      日志大小和壓縮文件保留個(gè)數(shù)可以在FusionInsight Manager界面中配置。

      Flink日志列表

      日志類型

      日志文件名

      描述

      Flink服務(wù)日志

      postinstall.log

      服務(wù)安裝日志。

      prestart.log

      prestart腳本日志。

      cleanup.log

      安裝卸載實(shí)例時(shí)的清理日志。

      Flink中提供了如下表所示的日志級別。日志級別優(yōu)先級從高到低分別是ERROR、WARN、INFO、DEBUG。程序會打印高于或等于所設(shè)置級別的日志,設(shè)置的日志等級越高,打印出來的日志就越少。

      日志級別

      級別

      描述

      ERROR

      ERROR表示當(dāng)前時(shí)間處理存在錯(cuò)誤信息。

      WARN

      WARN表示當(dāng)前事件處理存在異常信息。

      INFO

      INFO表示記錄系統(tǒng)及各事件正常運(yùn)行狀態(tài)信息。

      DEBUG

      DEBUG表示記錄系統(tǒng)及系統(tǒng)的調(diào)試信息。

      如果您需要修改日志級別,請執(zhí)行如下操作:

      登錄FusionInsight Manager系統(tǒng)。

      選擇“服務(wù)管理 > Flink > 服務(wù)配置”。

      “參數(shù)類別”下拉框中選擇“全部”。

      左邊菜單欄中選擇所需修改的角色所對應(yīng)的日志菜單。

      選擇所需修改的日志級別。

      單擊“保存配置”,在彈出窗口中單擊“確定”使配置生效。

      配置完成后立即生效,不需要重啟服務(wù)。

      日志類型

      格式

      示例

      運(yùn)行日志

      ||<產(chǎn)生該日志的線程名字>||<日志事件的發(fā)生位置>

      2017-06-27 21:30:31,778 | INFO | [flink-akka.actor.default-dispatcher-3] | TaskManager container_e10_1498290698388_0004_02_000007 has started. | org.apache.flink.yarn.YarnFlinkResourceManager (FlinkResourceManager.java:368)

      常見故障

      1. Flink對接kafka-寫入數(shù)據(jù)傾斜,部分分區(qū)沒有寫入數(shù)據(jù)

      問題現(xiàn)象與背景

      使用FlinkKafkaProducer進(jìn)行數(shù)據(jù)生產(chǎn),數(shù)據(jù)只寫到了kafka的部分分區(qū)中,其它的分區(qū)沒有數(shù)據(jù)寫入

      原因分析

      可能原因1:Flink寫kafka使用的機(jī)制與原生接口的寫入方式是有差別的,在默認(rèn)情況下,F(xiàn)link使用了”并行度編號+分區(qū)數(shù)量”取模計(jì)算的結(jié)果作為topic的分區(qū)編號。那么會有以下兩種場景:

      并行度%分區(qū)數(shù)量=0,表示并行度是kafkatopic分區(qū)數(shù)的一倍或者多倍,數(shù)據(jù)的寫入每個(gè)分區(qū)數(shù)據(jù)量是均衡的。

      并行度%分區(qū)數(shù)量≠0,那么數(shù)據(jù)量勢必會在個(gè)別分區(qū)上的數(shù)據(jù)量產(chǎn)生傾斜。

      可能原因2:在業(yè)務(wù)代碼的部分算子中使用了keyby()方法,由于現(xiàn)網(wǎng)中的數(shù)據(jù)流中,每個(gè)key值所屬的數(shù)據(jù)量不一致(就是說某些key的數(shù)據(jù)量會非常大,有些又非常小)導(dǎo)致每個(gè)并行度中輸出的數(shù)據(jù)流量不一致。從而出現(xiàn)數(shù)據(jù)傾斜。

      解決辦法

      原因一:

      方法1,調(diào)整kafka的分區(qū)數(shù)跟flink的并行度保持一致,即要求kafka的分區(qū)數(shù)與flink寫kafka的sink并行度保持強(qiáng)一致性。這種做法的優(yōu)勢在于每個(gè)并行度僅需要跟每個(gè)kafka分區(qū)所在的 broker保持一個(gè)常鏈接即可。能夠節(jié)省每個(gè)并發(fā)線程與分區(qū)之間調(diào)度的時(shí)間。

      方法2,flink寫kafka的sink的分區(qū)策略寫成隨機(jī)寫入模式,如下圖,這樣數(shù)據(jù)會隨即寫入topic的分區(qū)中,但是會有一部分時(shí)間損耗在線程向?qū)ぶ罚扑]使用方法1。

      原因二:

      需要調(diào)整業(yè)務(wù)側(cè)對key值的選取,例如:可以將key調(diào)整為“key+隨機(jī)數(shù)”的方式,保證Flink的keyby()算子中每個(gè)處理并行度中的數(shù)據(jù)是均衡的。

      2. Flink任務(wù)的日志目錄增長過快,導(dǎo)致磁盤寫滿

      集群告警磁盤使用率超過閾值,經(jīng)過排查發(fā)現(xiàn)是taskmanager.out文件過大導(dǎo)致

      原因分析

      代碼中存在大量的print模塊,導(dǎo)致taskmanager.out文件被寫入大量的日志信息,taskmanager.out 一般是,業(yè)務(wù)代碼加入了 .print的代碼,需要在代碼中排查是否有類似于以下的代碼邏輯:

      或者類似于這樣的打印:

      如果包含,日志信息會持續(xù)打印到taskmanager.out里面。

      解決方案

      將上圖紅框中的代碼去掉,或者輸出到日志文件中。

      3.?任務(wù)啟動失敗,報(bào)資源不足:Could not allocate all requires slots within timeout of xxxx ms

      問題現(xiàn)象

      任務(wù)啟動一段時(shí)間后報(bào)錯(cuò),例如如下日志,需要60個(gè)資源實(shí)際上只有54個(gè)。

      原因分析

      Flink任務(wù)在啟動過程中的資源使用是先增長在下降到當(dāng)前值的,實(shí)際在啟動過程中需要的資源量等于每個(gè)算子并行度之和。等到任務(wù)開始運(yùn)行后,F(xiàn)link會對資源進(jìn)行合并。

      例如如下算子,在啟動過程中需要“1+6+6+5+3=21”個(gè)資源。

      FusionInsight HD Flink組件基本原理和常見問題解析

      但是運(yùn)行穩(wěn)定后會降低到6。這個(gè)是Flink的機(jī)制。假如任務(wù)在啟動過程中不滿足21個(gè)資源的啟動資源量,任務(wù)就會出現(xiàn)NoResourceAvailableException的異常。

      解決方案

      減少任務(wù)的啟動并發(fā),或者將其它任務(wù)kill掉再啟動Flink任務(wù)。

      4. 算子的部分節(jié)點(diǎn)產(chǎn)生背壓,其它節(jié)點(diǎn)正常

      問題現(xiàn)象

      業(yè)務(wù)運(yùn)行一段時(shí)間以后,算子的部分節(jié)點(diǎn)出現(xiàn)背壓。

      原因分析

      通過Flink原生頁面排查這個(gè)并發(fā)的算子所在的節(jié)點(diǎn),通過上圖我們能夠看出是異常算子的第44個(gè)并發(fā)。通過前臺頁面能夠查看并確認(rèn)第44并發(fā)所在的節(jié)點(diǎn),例如下圖:

      通過查找這個(gè)節(jié)點(diǎn)在taskmanager列表,例如下圖位置:

      整理taskmanager在每個(gè)nodemanager節(jié)點(diǎn)的數(shù)量發(fā)現(xiàn),背壓節(jié)點(diǎn)啟動的taskmanager數(shù)量過多。

      經(jīng)過排查,該yarn集群資源相對比較緊張,每個(gè)節(jié)點(diǎn)啟動的taskmanager數(shù)量不一致,如果部分節(jié)點(diǎn)啟動的較多容易出現(xiàn)數(shù)據(jù)傾斜。

      解決方案

      建議一個(gè)節(jié)點(diǎn)啟動多個(gè)slot。避免多個(gè)taskmanager出現(xiàn)在一個(gè)nodemanager節(jié)點(diǎn)上。啟動方式見:slot優(yōu)化。

      FAQ

      Flink如何加載其它目錄的jar包

      Flink業(yè)務(wù)一般在運(yùn)行過程中默認(rèn)加載的jar包路徑為:“xxx/Flink/flink/lib”的目錄下,如果添加其它路徑的jar包會報(bào)錯(cuò),如何添加其它外部依賴。

      創(chuàng)建一個(gè)外部的lib目錄,將部分依賴包放到外部lib目錄下,如下圖:

      修改啟動腳本的參數(shù)配置腳本,config.sh將jar包路徑傳給環(huán)境變量中。

      此時(shí)正常啟動任務(wù)即可,不需要加其它參數(shù)。

      HDFS上也能看到第三方j(luò)ar的目錄。

      如何收集任務(wù)taskmanager的jstack和pstree信息

      在任務(wù)運(yùn)行過程中我們通常需要對taskmanager的進(jìn)程進(jìn)行查詢和處理,例如:打jstack,jmap等操作,做這些操作的過程中需要獲取任務(wù)的taskmanager信息。

      獲取一個(gè)nodemanager節(jié)點(diǎn)上面所有taskmanager的進(jìn)程信息的方法如下:

      ps -ef | grep taskmanager | grep -v grep | grep -v "bash -c"

      其中紅框中的內(nèi)容就是taskmanager的進(jìn)程號,如果一個(gè)節(jié)點(diǎn)上面存在多個(gè)taskmanager那么這個(gè)地方會有多個(gè)進(jìn)程號。獲取到進(jìn)程號后我們可以針對這個(gè)進(jìn)程號收集jstack或者pstree信息。

      收集jstack

      通過上面流程獲取到進(jìn)程信息,然后從中獲取進(jìn)程ID和application id,如上圖中進(jìn)程id為“30047 applicationid為application_1623239745860_0001”。

      從FI前臺界面獲取這個(gè)進(jìn)程的啟動用戶。如下圖為flinkuser。

      3.在對應(yīng)的nodemanager節(jié)點(diǎn)后臺切換到這個(gè)用戶,人機(jī)用戶機(jī)機(jī)用戶即可。

      4. 進(jìn)入到節(jié)點(diǎn)所在的jdk目錄下

      5. 給taskmanager進(jìn)程打jstack。

      不同用戶提交的taskmanager只能由提交任務(wù)的用戶打jstack。

      收集pstree信息

      使用pstree –p PID 的方式能夠獲取taskmanager的pstree信息,這個(gè)地方提供一個(gè)收集腳本。內(nèi)容如下:

      #!/bin/bash searchPID() { local pids=`ps -ef | grep taskmanager | grep -v grep | grep -v "bash -c" | grep -v taskmanagerSearch.sh | awk '{print $2}'`; time=$(date "+%Y-%m-%d %H:%M:%S") echo "checktime is --------------------- $time" >> /var/log/Bigdata/taskManagerTree.log for i in $pids do local treeNum=$(pstree -p $i | wc -l) echo "$i 's pstree num is $treeNum" >> /var/log/Bigdata/taskManagerTree.log done } searchPID

      該腳本的功能為獲取節(jié)點(diǎn)上所有taskmanager pstree的數(shù)量,打印結(jié)果如下:

      slot優(yōu)化

      Slot可以認(rèn)為是taskmanager上面一塊獨(dú)立分配的資源,是taskmanager并行執(zhí)行的能力的體現(xiàn)。Taskmanager中有兩種使用slot的方法:

      一個(gè)taskmanager中設(shè)置了一個(gè)slot。

      一個(gè)taskmanager中設(shè)置了多個(gè)slot。

      每個(gè)task slot 表示TaskManager 擁有資源的一個(gè)固定大小的子集。假如一個(gè)taskManager 有三個(gè)slot,那么它會將其管理的內(nèi)存分成三份給各個(gè)slot。資源slot化意味著一個(gè)subtask 將不需要跟來自其他job 的subtask 競爭被管理的內(nèi)存,取而代之的是它將擁有一定數(shù)量的內(nèi)存儲備。需要注意的是,這里不會涉及到CPU 的隔離,slot 目前僅用來隔離task 的受管理的內(nèi)存。通過調(diào)整task slot 的數(shù)量,允許用戶定義subtask 之間隔離的方式。如果一個(gè)TaskManager 一個(gè)slot,那將意味著每個(gè)task group運(yùn)行在獨(dú)立的JVM 中(該JVM可能是通過一個(gè)特定的容器啟動的),而一個(gè)TaskManager 多個(gè)slot 意味著更多的subtask 可以共享同一個(gè)JVM。而在同一個(gè)JVM 進(jìn)程中的task 將共享TCP 連接(基于多路復(fù)用)和心跳消息。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu)。因此,對于資源密集型任務(wù)(尤其是對cpu使用較為密集的)不建議使用單個(gè)taskmanager中創(chuàng)建多個(gè)slot使用,否則容易導(dǎo)致taskmanager心跳超時(shí),出現(xiàn)任務(wù)失敗。如果需要設(shè)置單taskmanager多slot,參考如下操作。

      方式一:在配置文件中配置taskmanager.numberOfTaskSlots,通過修改提交任務(wù)的客戶端配置文件中的配置flink-conf.yaml配置,如下圖:將該值設(shè)置為需要調(diào)整的數(shù)值即可。

      方式二:啟動命令的過程中使用-ys命令傳入,例如以下命令:

      ./flink run -m yarn-cluster -p 1 -ys 3 ../examples/streaming/WindowJoin.jar

      在啟動后在一個(gè)taskmanager中會啟動3個(gè)slot。

      設(shè)置單taskmanager多slot需要優(yōu)化以下參數(shù)

      參數(shù)名稱

      默認(rèn)值

      意義與調(diào)整建議

      yarn.containers.vcores

      1

      每個(gè)taskmanager(也就是container)內(nèi)部需要啟動使用的vcore的數(shù)量

      配置建議:與配置的slot數(shù)量相同一致

      taskmanager.network.netty.server.numThreads

      1(如果配置為-1,則默認(rèn)跟slot數(shù)量保持一致)

      Taskmanager作為服務(wù)端,并行線程數(shù)通道數(shù)量

      配置建議:與配置的slot數(shù)量相同一致

      taskmanager.network.netty.client.numThreads

      1(如果配置為-1,則默認(rèn)跟slot數(shù)量保持一致)

      Taskmanager作為客戶端,并行線程數(shù)通道數(shù)量

      配置建議:與配置的slot數(shù)量相同一致

      taskmanager.network.netty.num-arenas

      1(如果配置為-1,則默認(rèn)跟slot數(shù)量保持一致)

      Netty域的數(shù)量

      配置建議:與配置的slot數(shù)量相同一致

      taskmanager.network.memory.max

      1G

      Netty使用的緩存數(shù)量的最大值,通常情況下隨著slot數(shù)量增加,netty線程數(shù)量增加,那么緩存數(shù)據(jù)的量會越來越多此時(shí)需要增加這個(gè)緩存的數(shù)量。

      配置建議:2G以上,如果

      增加taskmanager的啟動內(nèi)存,即-yjm

      跟slot數(shù)量保持一致

      一個(gè)taskmanager在啟動后,如果設(shè)置了多個(gè)slot,每個(gè)slot會均分啟動內(nèi)存

      EI企業(yè)智能 Flink FusionInsight

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。

      上一篇:如何改變文件KB的大小(怎么改變kb大小)
      下一篇:WPS表格文字粘貼為圖片的方法教程(wps表格如何粘貼為圖片)
      相關(guān)文章
      亚洲精品国产日韩| 亚洲国产精品久久久久网站| 亚洲视频在线观看网址| 日韩亚洲欧洲在线com91tv| 精品国产亚洲男女在线线电影| 夜色阁亚洲一区二区三区| 国产精品无码亚洲精品2021| 成人区精品一区二区不卡亚洲| 亚洲一级片在线观看| 亚洲综合无码一区二区三区| 亚洲一区综合在线播放| 夜夜亚洲天天久久| 亚洲色欲或者高潮影院| 亚洲精品影院久久久久久| 亚洲第一区视频在线观看| 亚洲成a人片在线观看中文!!! | 亚洲av无码乱码在线观看野外 | 久久久久久亚洲精品成人| 亚洲黄色在线播放| 亚洲精彩视频在线观看| 亚洲国产成人久久| 中文字幕在线观看亚洲日韩| 亚洲日本在线电影| 激情婷婷成人亚洲综合| 亚洲精品成人区在线观看| 国产精品xxxx国产喷水亚洲国产精品无码久久一区 | 色偷偷尼玛图亚洲综合| 国产精品自拍亚洲| 亚洲精品tv久久久久| 亚洲精品少妇30p| 亚洲短视频男人的影院| 亚洲视频在线一区二区三区| 亚洲国产精品专区| 中文字幕在线观看亚洲日韩| 日韩精品亚洲专区在线影视| 国产成人毛片亚洲精品| 亚洲AV无码乱码国产麻豆| 亚洲精品自在线拍| 中文字幕无码亚洲欧洲日韩| 成a人片亚洲日本久久| 国产精品V亚洲精品V日韩精品 |