FusionInsight HD Flink組件基本原理和常見問題解析
Flink是一個(gè)批處理和流處理結(jié)合的統(tǒng)一計(jì)算框架,其核心是一個(gè)提供數(shù)據(jù)分發(fā)以及并行化計(jì)算的流數(shù)據(jù)處理引擎。
它的最大亮點(diǎn)是流處理,是業(yè)界最頂級的開源流處理引擎。
Flink最適合的應(yīng)用場景是低時(shí)延的數(shù)據(jù)處理(Data Processing)場景:高并發(fā)pipeline處理數(shù)據(jù),時(shí)延毫秒級,且兼具可靠性。
本文主要介紹了FusionInsight Flink組件的基本原理、Flink任務(wù)提交的常見問題、以及最佳實(shí)踐FAQ。
基本概念
基本原理
Flink是一個(gè)批處理和流處理結(jié)合的統(tǒng)一計(jì)算框架,其核心是一個(gè)提供了數(shù)據(jù)分發(fā)以及并行化計(jì)算的流數(shù)據(jù)處理引擎。它的最大亮點(diǎn)是流處理,是業(yè)界最頂級的開源流處理引擎。
Flink最適合的應(yīng)用場景是低時(shí)延的數(shù)據(jù)處理(Data Processing)場景:高并發(fā)pipeline處理數(shù)據(jù),時(shí)延毫秒級,且兼具可靠性。
Flink技術(shù)棧如圖所示:
Flink在當(dāng)前版本中重點(diǎn)構(gòu)建如下特性,其他特性繼承開源社區(qū),不做增強(qiáng),具體請參考:https://ci.apache.org/projects/flink/flink-docs-release-1.4/
DataStream
Checkpoint
Stream SQL
窗口
Job Pipeline
配置表
Flink架構(gòu)如圖所示。
Flink整個(gè)系統(tǒng)包含三個(gè)部分:
Client
Flink Client主要給用戶提供向Flink系統(tǒng)提交用戶任務(wù)(流式作業(yè))的能力。
TaskManager
Flink系統(tǒng)的業(yè)務(wù)執(zhí)行節(jié)點(diǎn),執(zhí)行具體的用戶任務(wù)。TaskManager可以有多個(gè),各個(gè)TaskManager都平等。
JobManager
Flink系統(tǒng)的管理節(jié)點(diǎn),管理所有的TaskManager,并決策用戶任務(wù)在哪些Taskmanager執(zhí)行。JobManager在HA模式下可以有多個(gè),但只有一個(gè)主JobManager。
Flink系統(tǒng)提供的關(guān)鍵能力:
低時(shí)延
提供ms級時(shí)延的處理能力。
Exactly Once
提供異步快照機(jī)制,保證所有數(shù)據(jù)真正只處理一次。
HA
JobManager支持主備模式,保證無單點(diǎn)故障。
水平擴(kuò)展能力
TaskManager支持手動水平擴(kuò)展。
Stream & Transformation & Operator
用戶實(shí)現(xiàn)的Flink程序是由Stream和Transformation這兩個(gè)基本構(gòu)建塊組成。
Stream是一個(gè)中間結(jié)果數(shù)據(jù),而Transformation是一個(gè)操作,它對一個(gè)或多個(gè)輸入Stream進(jìn)行計(jì)算處理,輸出一個(gè)或多個(gè)結(jié)果Stream。
當(dāng)一個(gè)Flink程序被執(zhí)行的時(shí)候,它會被映射為Streaming Dataflow。一個(gè)Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似于一個(gè)DAG圖,在啟動的時(shí)候從一個(gè)或多個(gè)Source Operator開始,結(jié)束于一個(gè)或多個(gè)Sink Operator。
下圖為一個(gè)由Flink程序映射為Streaming Dataflow的示意圖。
上圖中“FlinkKafkaConsumer”是一個(gè)Source Operator,Map、KeyBy、TimeWindow、Apply是Transformation Operator,RollingSink是一個(gè)Sink Operator。
Pipeline Dataflow
在Flink中,程序是并行和分布式的方式運(yùn)行。一個(gè)Stream可以被分成多個(gè)Stream分區(qū)(Stream Partitions),一個(gè)Operator可以被分成多個(gè)Operator Subtask。
Flink內(nèi)部有一個(gè)優(yōu)化的功能,根據(jù)上下游算子的緊密程度來進(jìn)行優(yōu)化。
緊密度低的算子則不能進(jìn)行優(yōu)化,而是將每一個(gè)Operator Subtask放在不同的線程中獨(dú)立執(zhí)行。一個(gè)Operator的并行度,等于Operator Subtask的個(gè)數(shù),一個(gè)Stream的并行度(分區(qū)總數(shù))等于生成它的Operator的并行度。如下圖所示。
Operator
緊密度高的算子可以進(jìn)行優(yōu)化,優(yōu)化后可以將多個(gè)Operator Subtask串起來組成一個(gè)Operator Chain,實(shí)際上就是一個(gè)執(zhí)行鏈,每個(gè)執(zhí)行鏈會在TaskManager上一個(gè)獨(dú)立的線程中執(zhí)行,如下圖所示。
Operator chain
上圖中上半部分表示的是將Source和map兩個(gè)緊密度高的算子優(yōu)化后串成一個(gè)Operator Chain,實(shí)際上一個(gè)Operator Chain就是一個(gè)大的Operator的概念。圖中的Operator Chain表示一個(gè)Operator,keyBy表示一個(gè)Operator,Sink表示一個(gè)Operator,它們通過Stream連接,而每個(gè)Operator在運(yùn)行時(shí)對應(yīng)一個(gè)Task,也就是說圖中的上半部分有3個(gè)Operator對應(yīng)的是3個(gè)Task。
上圖中下半部分是上半部分的一個(gè)并行版本,對每一個(gè)Task都并行化為多個(gè)Subtask,這里只是演示了2個(gè)并行度,sink算子是1個(gè)并行度。
日志介紹
日志存儲路徑:
Executor運(yùn)行日志:“${BIGDATA_DATA_HOME}/hadoop/data${i}/nm/containerlogs/application_${appid}/container_{$contid}”
運(yùn)行中的任務(wù)日志存儲在以上路徑中,運(yùn)行結(jié)束后會基于Yarn的配置確定是否匯聚到HDFS目錄中。
其他日志:“/var/log/Bigdata/flink/flinkResource”
日志歸檔規(guī)則:
Executor日志默認(rèn)50MB滾動存儲一次,最多保留100個(gè)文件,不壓縮。
日志大小和壓縮文件保留個(gè)數(shù)可以在FusionInsight Manager界面中配置。
Flink日志列表
日志類型
日志文件名
描述
Flink服務(wù)日志
postinstall.log
服務(wù)安裝日志。
prestart.log
prestart腳本日志。
cleanup.log
安裝卸載實(shí)例時(shí)的清理日志。
Flink中提供了如下表所示的日志級別。日志級別優(yōu)先級從高到低分別是ERROR、WARN、INFO、DEBUG。程序會打印高于或等于所設(shè)置級別的日志,設(shè)置的日志等級越高,打印出來的日志就越少。
日志級別
級別
描述
ERROR
ERROR表示當(dāng)前時(shí)間處理存在錯(cuò)誤信息。
WARN
WARN表示當(dāng)前事件處理存在異常信息。
INFO
INFO表示記錄系統(tǒng)及各事件正常運(yùn)行狀態(tài)信息。
DEBUG
DEBUG表示記錄系統(tǒng)及系統(tǒng)的調(diào)試信息。
如果您需要修改日志級別,請執(zhí)行如下操作:
登錄FusionInsight Manager系統(tǒng)。
選擇“服務(wù)管理 > Flink > 服務(wù)配置”。
“參數(shù)類別”下拉框中選擇“全部”。
左邊菜單欄中選擇所需修改的角色所對應(yīng)的日志菜單。
選擇所需修改的日志級別。
單擊“保存配置”,在彈出窗口中單擊“確定”使配置生效。
配置完成后立即生效,不需要重啟服務(wù)。
日志類型
格式
示例
運(yùn)行日志
2017-06-27 21:30:31,778 | INFO | [flink-akka.actor.default-dispatcher-3] | TaskManager container_e10_1498290698388_0004_02_000007 has started. | org.apache.flink.yarn.YarnFlinkResourceManager (FlinkResourceManager.java:368)
常見故障
1. Flink對接kafka-寫入數(shù)據(jù)傾斜,部分分區(qū)沒有寫入數(shù)據(jù)
問題現(xiàn)象與背景
使用FlinkKafkaProducer進(jìn)行數(shù)據(jù)生產(chǎn),數(shù)據(jù)只寫到了kafka的部分分區(qū)中,其它的分區(qū)沒有數(shù)據(jù)寫入
原因分析
可能原因1:Flink寫kafka使用的機(jī)制與原生接口的寫入方式是有差別的,在默認(rèn)情況下,F(xiàn)link使用了”并行度編號+分區(qū)數(shù)量”取模計(jì)算的結(jié)果作為topic的分區(qū)編號。那么會有以下兩種場景:
并行度%分區(qū)數(shù)量=0,表示并行度是kafkatopic分區(qū)數(shù)的一倍或者多倍,數(shù)據(jù)的寫入每個(gè)分區(qū)數(shù)據(jù)量是均衡的。
并行度%分區(qū)數(shù)量≠0,那么數(shù)據(jù)量勢必會在個(gè)別分區(qū)上的數(shù)據(jù)量產(chǎn)生傾斜。
可能原因2:在業(yè)務(wù)代碼的部分算子中使用了keyby()方法,由于現(xiàn)網(wǎng)中的數(shù)據(jù)流中,每個(gè)key值所屬的數(shù)據(jù)量不一致(就是說某些key的數(shù)據(jù)量會非常大,有些又非常小)導(dǎo)致每個(gè)并行度中輸出的數(shù)據(jù)流量不一致。從而出現(xiàn)數(shù)據(jù)傾斜。
解決辦法
原因一:
方法1,調(diào)整kafka的分區(qū)數(shù)跟flink的并行度保持一致,即要求kafka的分區(qū)數(shù)與flink寫kafka的sink并行度保持強(qiáng)一致性。這種做法的優(yōu)勢在于每個(gè)并行度僅需要跟每個(gè)kafka分區(qū)所在的 broker保持一個(gè)常鏈接即可。能夠節(jié)省每個(gè)并發(fā)線程與分區(qū)之間調(diào)度的時(shí)間。
方法2,flink寫kafka的sink的分區(qū)策略寫成隨機(jī)寫入模式,如下圖,這樣數(shù)據(jù)會隨即寫入topic的分區(qū)中,但是會有一部分時(shí)間損耗在線程向?qū)ぶ罚扑]使用方法1。
原因二:
需要調(diào)整業(yè)務(wù)側(cè)對key值的選取,例如:可以將key調(diào)整為“key+隨機(jī)數(shù)”的方式,保證Flink的keyby()算子中每個(gè)處理并行度中的數(shù)據(jù)是均衡的。
2. Flink任務(wù)的日志目錄增長過快,導(dǎo)致磁盤寫滿
集群告警磁盤使用率超過閾值,經(jīng)過排查發(fā)現(xiàn)是taskmanager.out文件過大導(dǎo)致
原因分析
代碼中存在大量的print模塊,導(dǎo)致taskmanager.out文件被寫入大量的日志信息,taskmanager.out 一般是,業(yè)務(wù)代碼加入了 .print的代碼,需要在代碼中排查是否有類似于以下的代碼邏輯:
或者類似于這樣的打印:
如果包含,日志信息會持續(xù)打印到taskmanager.out里面。
解決方案
將上圖紅框中的代碼去掉,或者輸出到日志文件中。
3.?任務(wù)啟動失敗,報(bào)資源不足:Could not allocate all requires slots within timeout of xxxx ms
問題現(xiàn)象
任務(wù)啟動一段時(shí)間后報(bào)錯(cuò),例如如下日志,需要60個(gè)資源實(shí)際上只有54個(gè)。
原因分析
Flink任務(wù)在啟動過程中的資源使用是先增長在下降到當(dāng)前值的,實(shí)際在啟動過程中需要的資源量等于每個(gè)算子并行度之和。等到任務(wù)開始運(yùn)行后,F(xiàn)link會對資源進(jìn)行合并。
例如如下算子,在啟動過程中需要“1+6+6+5+3=21”個(gè)資源。
但是運(yùn)行穩(wěn)定后會降低到6。這個(gè)是Flink的機(jī)制。假如任務(wù)在啟動過程中不滿足21個(gè)資源的啟動資源量,任務(wù)就會出現(xiàn)NoResourceAvailableException的異常。
解決方案
減少任務(wù)的啟動并發(fā),或者將其它任務(wù)kill掉再啟動Flink任務(wù)。
4. 算子的部分節(jié)點(diǎn)產(chǎn)生背壓,其它節(jié)點(diǎn)正常
問題現(xiàn)象
業(yè)務(wù)運(yùn)行一段時(shí)間以后,算子的部分節(jié)點(diǎn)出現(xiàn)背壓。
原因分析
通過Flink原生頁面排查這個(gè)并發(fā)的算子所在的節(jié)點(diǎn),通過上圖我們能夠看出是異常算子的第44個(gè)并發(fā)。通過前臺頁面能夠查看并確認(rèn)第44并發(fā)所在的節(jié)點(diǎn),例如下圖:
通過查找這個(gè)節(jié)點(diǎn)在taskmanager列表,例如下圖位置:
整理taskmanager在每個(gè)nodemanager節(jié)點(diǎn)的數(shù)量發(fā)現(xiàn),背壓節(jié)點(diǎn)啟動的taskmanager數(shù)量過多。
經(jīng)過排查,該yarn集群資源相對比較緊張,每個(gè)節(jié)點(diǎn)啟動的taskmanager數(shù)量不一致,如果部分節(jié)點(diǎn)啟動的較多容易出現(xiàn)數(shù)據(jù)傾斜。
解決方案
建議一個(gè)節(jié)點(diǎn)啟動多個(gè)slot。避免多個(gè)taskmanager出現(xiàn)在一個(gè)nodemanager節(jié)點(diǎn)上。啟動方式見:slot優(yōu)化。
FAQ
Flink如何加載其它目錄的jar包
Flink業(yè)務(wù)一般在運(yùn)行過程中默認(rèn)加載的jar包路徑為:“xxx/Flink/flink/lib”的目錄下,如果添加其它路徑的jar包會報(bào)錯(cuò),如何添加其它外部依賴。
創(chuàng)建一個(gè)外部的lib目錄,將部分依賴包放到外部lib目錄下,如下圖:
修改啟動腳本的參數(shù)配置腳本,config.sh將jar包路徑傳給環(huán)境變量中。
此時(shí)正常啟動任務(wù)即可,不需要加其它參數(shù)。
HDFS上也能看到第三方j(luò)ar的目錄。
如何收集任務(wù)taskmanager的jstack和pstree信息
在任務(wù)運(yùn)行過程中我們通常需要對taskmanager的進(jìn)程進(jìn)行查詢和處理,例如:打jstack,jmap等操作,做這些操作的過程中需要獲取任務(wù)的taskmanager信息。
獲取一個(gè)nodemanager節(jié)點(diǎn)上面所有taskmanager的進(jìn)程信息的方法如下:
ps -ef | grep taskmanager | grep -v grep | grep -v "bash -c"
其中紅框中的內(nèi)容就是taskmanager的進(jìn)程號,如果一個(gè)節(jié)點(diǎn)上面存在多個(gè)taskmanager那么這個(gè)地方會有多個(gè)進(jìn)程號。獲取到進(jìn)程號后我們可以針對這個(gè)進(jìn)程號收集jstack或者pstree信息。
收集jstack
通過上面流程獲取到進(jìn)程信息,然后從中獲取進(jìn)程ID和application id,如上圖中進(jìn)程id為“30047 applicationid為application_1623239745860_0001”。
從FI前臺界面獲取這個(gè)進(jìn)程的啟動用戶。如下圖為flinkuser。
3.在對應(yīng)的nodemanager節(jié)點(diǎn)后臺切換到這個(gè)用戶,人機(jī)用戶機(jī)機(jī)用戶即可。
4. 進(jìn)入到節(jié)點(diǎn)所在的jdk目錄下
5. 給taskmanager進(jìn)程打jstack。
不同用戶提交的taskmanager只能由提交任務(wù)的用戶打jstack。
收集pstree信息
使用pstree –p PID 的方式能夠獲取taskmanager的pstree信息,這個(gè)地方提供一個(gè)收集腳本。內(nèi)容如下:
#!/bin/bash searchPID() { local pids=`ps -ef | grep taskmanager | grep -v grep | grep -v "bash -c" | grep -v taskmanagerSearch.sh | awk '{print $2}'`; time=$(date "+%Y-%m-%d %H:%M:%S") echo "checktime is --------------------- $time" >> /var/log/Bigdata/taskManagerTree.log for i in $pids do local treeNum=$(pstree -p $i | wc -l) echo "$i 's pstree num is $treeNum" >> /var/log/Bigdata/taskManagerTree.log done } searchPID
該腳本的功能為獲取節(jié)點(diǎn)上所有taskmanager pstree的數(shù)量,打印結(jié)果如下:
slot優(yōu)化
Slot可以認(rèn)為是taskmanager上面一塊獨(dú)立分配的資源,是taskmanager并行執(zhí)行的能力的體現(xiàn)。Taskmanager中有兩種使用slot的方法:
一個(gè)taskmanager中設(shè)置了一個(gè)slot。
一個(gè)taskmanager中設(shè)置了多個(gè)slot。
每個(gè)task slot 表示TaskManager 擁有資源的一個(gè)固定大小的子集。假如一個(gè)taskManager 有三個(gè)slot,那么它會將其管理的內(nèi)存分成三份給各個(gè)slot。資源slot化意味著一個(gè)subtask 將不需要跟來自其他job 的subtask 競爭被管理的內(nèi)存,取而代之的是它將擁有一定數(shù)量的內(nèi)存儲備。需要注意的是,這里不會涉及到CPU 的隔離,slot 目前僅用來隔離task 的受管理的內(nèi)存。通過調(diào)整task slot 的數(shù)量,允許用戶定義subtask 之間隔離的方式。如果一個(gè)TaskManager 一個(gè)slot,那將意味著每個(gè)task group運(yùn)行在獨(dú)立的JVM 中(該JVM可能是通過一個(gè)特定的容器啟動的),而一個(gè)TaskManager 多個(gè)slot 意味著更多的subtask 可以共享同一個(gè)JVM。而在同一個(gè)JVM 進(jìn)程中的task 將共享TCP 連接(基于多路復(fù)用)和心跳消息。它們也可能共享數(shù)據(jù)集和數(shù)據(jù)結(jié)構(gòu)。因此,對于資源密集型任務(wù)(尤其是對cpu使用較為密集的)不建議使用單個(gè)taskmanager中創(chuàng)建多個(gè)slot使用,否則容易導(dǎo)致taskmanager心跳超時(shí),出現(xiàn)任務(wù)失敗。如果需要設(shè)置單taskmanager多slot,參考如下操作。
方式一:在配置文件中配置taskmanager.numberOfTaskSlots,通過修改提交任務(wù)的客戶端配置文件中的配置flink-conf.yaml配置,如下圖:將該值設(shè)置為需要調(diào)整的數(shù)值即可。
方式二:啟動命令的過程中使用-ys命令傳入,例如以下命令:
./flink run -m yarn-cluster -p 1 -ys 3 ../examples/streaming/WindowJoin.jar
在啟動后在一個(gè)taskmanager中會啟動3個(gè)slot。
設(shè)置單taskmanager多slot需要優(yōu)化以下參數(shù)
參數(shù)名稱
默認(rèn)值
意義與調(diào)整建議
yarn.containers.vcores
1
每個(gè)taskmanager(也就是container)內(nèi)部需要啟動使用的vcore的數(shù)量
配置建議:與配置的slot數(shù)量相同一致
taskmanager.network.netty.server.numThreads
1(如果配置為-1,則默認(rèn)跟slot數(shù)量保持一致)
Taskmanager作為服務(wù)端,并行線程數(shù)通道數(shù)量
配置建議:與配置的slot數(shù)量相同一致
taskmanager.network.netty.client.numThreads
1(如果配置為-1,則默認(rèn)跟slot數(shù)量保持一致)
Taskmanager作為客戶端,并行線程數(shù)通道數(shù)量
配置建議:與配置的slot數(shù)量相同一致
taskmanager.network.netty.num-arenas
1(如果配置為-1,則默認(rèn)跟slot數(shù)量保持一致)
Netty域的數(shù)量
配置建議:與配置的slot數(shù)量相同一致
taskmanager.network.memory.max
1G
Netty使用的緩存數(shù)量的最大值,通常情況下隨著slot數(shù)量增加,netty線程數(shù)量增加,那么緩存數(shù)據(jù)的量會越來越多此時(shí)需要增加這個(gè)緩存的數(shù)量。
配置建議:2G以上,如果
增加taskmanager的啟動內(nèi)存,即-yjm
跟slot數(shù)量保持一致
一個(gè)taskmanager在啟動后,如果設(shè)置了多個(gè)slot,每個(gè)slot會均分啟動內(nèi)存
EI企業(yè)智能 Flink FusionInsight
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。
版權(quán)聲明:本文內(nèi)容由網(wǎng)絡(luò)用戶投稿,版權(quán)歸原作者所有,本站不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。如果您發(fā)現(xiàn)本站中有涉嫌抄襲或描述失實(shí)的內(nèi)容,請聯(lián)系我們jiasou666@gmail.com 處理,核實(shí)后本網(wǎng)站將在24小時(shí)內(nèi)刪除侵權(quán)內(nèi)容。