Apache Flink On Yarn模式高可用(HA)集群部署
本文介紹如何部署Apache Flink On YARN(也就是如何在YARN上運行Flink作業),采用HDP 2.6.3以及Apache Flink 1.7.1。
Yarn在Hadoop的生態系統中擔任了資源管理和任務調度的角色,可以更好對集群資源進行調度和控制。
此處不對HDP安裝做講述,需要安裝HDP的可以通過HDP官網安裝指南進行安裝。
官方文檔QuickStart中包含兩種Flink啟動方式:
啟動一個YARN session(Start a long-running Flink cluster on YARN)
本文介紹如何部署Apache Flink On YARN(也就是如何在YARN上運行Flink作業),采用HDP 2.6.3以及Apache Flink 1.7.1。
Yarn在Hadoop的生態系統中擔任了資源管理和任務調度的角色,可以更好對集群資源進行調度和控制。
此處不對HDP安裝做講述,需要安裝HDP的可以通過HDP官網安裝指南進行安裝。
官方文檔QuickStart中包含兩種Flink啟動方式:
啟動一個YARN session(Start a long-running Flink cluster on YARN)
直接在YARN上提交運行Flink作業(Run a Flink job on YARN)。
在講解運行方式之前,我們先來講解Flink基于HDP之上的On Yarn安裝。
安裝
從Apache Flink官網-(http://flink.apache.org/downloads.html)下載對應版本的安裝包并解壓
curl?-O?
tar?xvzf?flink-1.8-SNAPSHOT-bin-hadoop2.tgz
Flink On Yarn模式需要用戶配置與Hadoop集群,設置HADOOP_CONF_DIR以及HADOOP_CLASSPATH。
將如下代碼添加到~/.bash_profile配置文件中
$?vi?~/.bash_profile
export?HADOOP_CONF_DIR="/etc/hadoop/conf"
export?HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/h配置yarn啟動前環境變量
oop-yarn-client/lib/*"
source .bash_profile文件引入環境變量并檢查變量是否設置正確
source?~/.bash_profile
echo?$HADOOP_CONFIG_DIR
echo?$HADOOP_CLASSPATH
配置
由于HDP是運行Hadoop任務以及訪問HDFS都是使用hdfs用戶,我們需要在yarn啟動前指定HADOOP_USER_NAME變量,flink才不會因為權限問題而無法啟動。
$?vi?/usr/local/flink-1.3.3/bin/yarn-session.sh
#!/usr/bin/env?bash
...
bin=`dirname?"$0"`
bin=`cd?"$bin";?pwd`
#?get?Flink?config
.?"$bin"/config.sh
if?[?"$FLINK_IDENT_STRING"?=?""?];?then
FLINK_IDENT_STRING="$USER"
fi
export?HADOOP_USER_NAME=hdfs
JVM_ARGS="$JVM_ARGS?-Xmx512m"
CC_CLASSPATH=`manglePathList?$(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
log_setting="-Dlog.file="$log"?-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties?-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"
export?FLINK_CONF_DIR
$JAVA_RUN?$JVM_ARGS?-classpath?"$CC_CLASSPATH"?$log_setting?org.apache.flink.yarn.cli.FlinkYarnSessionCli?-j?"$FLINK_LIB_DIR"/flink-dist*.jar?"$@"
注意:HADOOP_USER_NAME參數必須在JAVA_RUN之前配置,否則程序運行之后無法讀取到該環境變量
要啟動HA群集,需要在conf/flink-conf.yaml添加以下配置:
高可用性模式(必需):必須在conf/flink-conf.yaml中將高可用模式設置為zookeeper才能啟用高可用模式。 或者,此選項可以設置為Flink應該用于創建HighAvailabilityServices實例的工廠類的FQN。
high-availability:?zookeeper
ZooKeeper quorum(必需):ZooKeeper quorum是ZooKeeper服務器的復制組,它提供分布式協調服務。
high-availability.zookeeper.quorum:?address1:2181[,...],addressX:2181
每個addressX:port指的是一個ZooKeeper服務器,Flink可以在給定的地址和端口訪問它。
Zookeeper root(推薦):
ZooKeeper root節點,在該節點下放置所有集群節點。
high-availability.zookeeper.path.root:?/flink
Zookeeper Cluster-id(推薦):
cluster-id ZooKeeper節點,在該節點下放置集群的所有必需的協調數據。
high-availability.cluster-id:?/default_ns?#?important:?customize?per?cluster
存儲目錄(必需):JobManager元數據保存在文件系統storageDir中,只有一個指向該狀態的指針存儲在ZooKeeper中。
high-availability.storageDir:?hdfs:///flink/recovery
storageDir存儲JobManager故障恢復所需的所有元數據。
配置主服務器和ZooKeeper quorum后,您可以像往常一樣使用提供的集群啟動腳本。他們將啟動HA群集。請記住,調用腳本時必須運行ZooKeeper quorum,并確保為要啟動的每個HA群集配置單獨的ZooKeeper根路徑。
除HA配置外,還需要配置最大嘗試次數conf/flink-conf.yaml:
yarn.application-attempts:?10
這意味著在Yarn應用程序失敗之前,應用程序可以重新啟動9次(9次重試+ 1次初始嘗試)。
由于我們是基于HDP創建的Hadoop集群,已有現成的zookeeper集群,所以這里我們使用現有的zookeeper進行HA配置,配置如下:
high-availability:?zookeeper
high-availability.zookeeper.quorum:?flink-dc-01:2181,flink-dc-02:2181,flink-dc-03:2181
high-availability.zookeeper.path.root:?/flink
high-availability.storageDir:?hdfs://ns1/flink/recovery
yarn.application-attempts:?10
配置Application最大的嘗試次數
The?maximum?number?of?application?master?execution?attempts.
當前YARN版本的默認值為2(表示允許單個JobManager失敗)。
hdp平臺需要去掉uber shaded hadoop的包,同時添加mapreduce的包到yarn應用classpath,否則會出現如下問題:
Exception?in?thread?"main"?java.lang.IllegalAccessError:?tried?to?access?method?org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;?from?class?org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider
rm?-f?/root/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar
進入ambari界面,service->yarn->config->advanced->Advanced yarn-site->yarn.application.classpath添加
/usr/hdp/current/hadoop-mapreduce-client/*,/usr/hdp/current/hadoop-mapreduce-client/lib/*
修改后,需要重啟yarn相關組件,ambari界面會有指示如何重啟,一鍵搞定.
Flink默認包含兩種配置方式:log4j以及logback
不配置的情況下運行flink集群或者運行flink job會提示建議移除其中一種。
org.apache.flink.yarn.AbstractYarnClusterDescriptor???????????-?The?configuration?directory?('/root/flink-1.7.1/conf')?contains?both?LOG4J?and?Logback?configuration?files.?Please?delete?or?rename?one?of?them.
直接移除或者重命名都可行。
例如:
$?mv?logback.xml?logback.xml_bak
示例配置:
vi?/usr/local/flink-1.3.3/conf/log4j.properties
log4j.appender.file.append=true
log4j.appender.file.MaxFileSize=100M??#最大文件大小
log4j.appender.file.MaxBackupIndex=10??#?最大備份索引大小
啟動Flink
本節主要介紹Flink的兩種啟動方式。
啟動一個長期運行的flink集群通過yarn-session.sh執行部署。
$?./bin/yarn-session.sh
Usage:
Required
-n,--container?
Optional
-D?
-d,--detached???????????????????If?present,?runs?the?job?in?detached?mode
-h,--help???????????????????????Help?for?the?Yarn?session?CLI.
-id,--applicationId?
-j,--jar?
-jm,--jobManagerMemory?
-m,--jobmanager?
-n,--container?
-nl,--nodeLabel?
-nm,--name?
-q,--query??????????????????????Display?available?YARN?resources?(memory,?cores)
-qu,--queue?
-s,--slots?
-sae,--shutdownOnAttachedExit???If?the?job?is?submitted?in?attached?mode,?perform?a?best-effort?cluster?shutdown?when?the?CLI?is?terminated?abruptly,?e.g.,?in?response?to?a?user?interrupt,?such
as?typing?Ctrl?+?C.
-st,--streaming?????????????????Start?Flink?in?streaming?mode
-t,--ship?
-tm,--taskManagerMemory?
-yd,--yarndetached??????????????If?present,?runs?the?job?in?detached?mode?(deprecated;?use?non-YARN?specific?option?instead)
-z,--zookeeperNamespace?
主要參數講解:
1、-n 指定TaskManager數量
2、-jm 指定JobManager使用內存
3、-m 指定JobManager地址
4、-tm 指定TaskManager使用內存
5、-D 指定動態參數
6、-d 客戶端分離,指定后YarnSession部署到yarn之后,客戶端會自行關閉。
7、-j 指定執行jar包
bin/yarn-session.sh?-n?8?-s?5?-jm?2048?-tm?4096?-nm?pinpoint-flink-job
實例說明:
8個TaskManager
每個TaskManager5個slot
每個TaskManager內存4g,
指定application名稱為pinpoint-flink-job
注意:部署長期運行的flink on yarn實例后,在flink web上看到的TaskManager以及Slots都為0。只有在提交任務的時候,才會依據分配資源給對應的任務執行。
執行任務提交命令:
$?bin/flink?run?./examples/batch/WordCount.jar?--input?hdfs://xdata2/tmp/LICENSE-2.0.txt?--output?hdfs://xdata2/tmp/wordcount_result.txt
指定輸入文件:hdfs://xdata2/tmp/LICENSE-2.0.txt
指定輸出文件:hdfs://xdata2/tmp/wordcount_result.txt
命令運行日志如下:
SLF4J:?Class?path?contains?multiple?SLF4J?bindings.
SLF4J:?Found?binding?in?[jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J:?Found?binding?in?[jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J:?Found?binding?in?[jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J:?See?http://www.slf4j.org/codes.html#multiple_bindings?for?an?explanation.
SLF4J:?Actual?binding?is?of?type?[org.slf4j.impl.Log4jLoggerFactory]
2019-01-24?16:05:26,059?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?Found?Yarn?properties?file?under?/tmp/.yarn-properties-root.
2019-01-24?16:05:26,059?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?Found?Yarn?properties?file?under?/tmp/.yarn-properties-root.
2019-01-24?16:05:26,358?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?YARN?properties?set?default?parallelism?to?40
2019-01-24?16:05:26,358?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?YARN?properties?set?default?parallelism?to?40
YARN?properties?set?default?parallelism?to?40
2019-01-24?16:05:26,618?INFO??org.apache.hadoop.yarn.client.AHSProxy????????????????????????-?Connecting?to?Application?History?server?at?vigor-dc-38/192.168.2.38:10200
2019-01-24?16:05:26,628?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?No?path?for?the?flink?jar?passed.?Using?the?location?of?class?org.apache.flink.yarn.YarnClusterDescriptor?to?locate?the?jar
2019-01-24?16:05:26,628?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?No?path?for?the?flink?jar?passed.?Using?the?location?of?class?org.apache.flink.yarn.YarnClusterDescriptor?to?locate?the?jar
2019-01-24?16:05:26,638?INFO??org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider??-?Looking?for?the?active?RM?in?[rm1,?rm2]...
2019-01-24?16:05:26,773?INFO??org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider??-?Found?active?RM?[rm1]
2019-01-24?16:05:26,779?INFO??org.apache.flink.yarn.AbstractYarnClusterDescriptor???????????-?Found?application?JobManager?host?name?'vigor-dc-41'?and?port?'39925'?from?supplied?application?id?'application_1548213441093_0011'
2019-01-24?16:05:27,186?WARN??org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory???????-?The?short-circuit?local?reads?feature?cannot?be?used?because?libhadoop?cannot?be?loaded.
Starting?execution?of?program
Program?execution?finished
Job?with?JobID?7ab3cf90748c8d05c7aa2e7cbce85730?has?finished.
Job?Runtime:?8979?ms
提交后可以在Flink web頁面上看到提交的任務信息及執行情況。
使用hadoop命令查詢執行結果信息
[root@vigor-dc-38?flink-1.7.1]#?hadoop?fs?-cat?/tmp/wordcount_result.txt
...
above?1
acceptance?1
accepting?3
act?1
acting?1
acts?1
add?2
addendum?1
additional?5
additions?1
advised?1
against?2
agree?1
agreed?3
agreement?1
all?3
...
若你想在Yarn上啟動Flink用于單獨任務執行,可以直接通過bin/flink run的方式來實現。
示例:
./bin/flink?run?-m?yarn-cluster?-yn?2?./examples/batch/WordCount.jar
Yarn會話的命令行選項也可以用于./bin/flink。使用y或yarn(對于長參數選項)作為前綴。
命令執行后,yarn會為任務單獨啟動一個flink on yarn實例,用于運行flink任務,在flink web界面上可以看到該任務。
查看后段執行結果:
Printing?result?to?stdout.?Use?--output?to?specify?output?path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
(arms,1)
(arrows,1)
...
總結
Flink on Yarn兩種部署方式可以根據自身的需求自行選擇。可選擇單獨一種,也可以兩種結合使用。
重要任務建議單獨運行一個實例,其他的任務可以使用長時間運行方式,將多個任務部署到上面,不用到時候資源也會得到釋放。
Standalone模式在后續的文章補上。
參考鏈接
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html
關注公眾號
Apache Yarn
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。