Apache Flink On Yarn模式高可用(HA)集群部署

      網友投稿 1802 2025-04-02

      本文介紹如何部署Apache Flink On YARN(也就是如何在YARN上運行Flink作業),采用HDP 2.6.3以及Apache Flink 1.7.1。

      Yarn在Hadoop的生態系統中擔任了資源管理和任務調度的角色,可以更好對集群資源進行調度和控制。

      此處不對HDP安裝做講述,需要安裝HDP的可以通過HDP官網安裝指南進行安裝。

      官方文檔QuickStart中包含兩種Flink啟動方式:

      啟動一個YARN session(Start a long-running Flink cluster on YARN)

      本文介紹如何部署Apache Flink On YARN(也就是如何在YARN上運行Flink作業),采用HDP 2.6.3以及Apache Flink 1.7.1。

      Yarn在Hadoop的生態系統中擔任了資源管理和任務調度的角色,可以更好對集群資源進行調度和控制。

      Apache Flink On Yarn模式高可用(HA)集群部署

      此處不對HDP安裝做講述,需要安裝HDP的可以通過HDP官網安裝指南進行安裝。

      官方文檔QuickStart中包含兩種Flink啟動方式:

      啟動一個YARN session(Start a long-running Flink cluster on YARN)

      直接在YARN上提交運行Flink作業(Run a Flink job on YARN)。

      在講解運行方式之前,我們先來講解Flink基于HDP之上的On Yarn安裝。

      安裝

      從Apache Flink官網-(http://flink.apache.org/downloads.html)下載對應版本的安裝包并解壓

      curl?-O?

      tar?xvzf?flink-1.8-SNAPSHOT-bin-hadoop2.tgz

      Flink On Yarn模式需要用戶配置與Hadoop集群,設置HADOOP_CONF_DIR以及HADOOP_CLASSPATH。

      將如下代碼添加到~/.bash_profile配置文件中

      $?vi?~/.bash_profile

      export?HADOOP_CONF_DIR="/etc/hadoop/conf"

      export?HADOOP_CLASSPATH="/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/h配置yarn啟動前環境變量

      oop-yarn-client/lib/*"

      source .bash_profile文件引入環境變量并檢查變量是否設置正確

      source?~/.bash_profile

      echo?$HADOOP_CONFIG_DIR

      echo?$HADOOP_CLASSPATH

      配置

      由于HDP是運行Hadoop任務以及訪問HDFS都是使用hdfs用戶,我們需要在yarn啟動前指定HADOOP_USER_NAME變量,flink才不會因為權限問題而無法啟動。

      $?vi?/usr/local/flink-1.3.3/bin/yarn-session.sh

      #!/usr/bin/env?bash

      ...

      bin=`dirname?"$0"`

      bin=`cd?"$bin";?pwd`

      #?get?Flink?config

      .?"$bin"/config.sh

      if?[?"$FLINK_IDENT_STRING"?=?""?];?then

      FLINK_IDENT_STRING="$USER"

      fi

      export?HADOOP_USER_NAME=hdfs

      JVM_ARGS="$JVM_ARGS?-Xmx512m"

      CC_CLASSPATH=`manglePathList?$(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`

      log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log

      log_setting="-Dlog.file="$log"?-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties?-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

      export?FLINK_CONF_DIR

      $JAVA_RUN?$JVM_ARGS?-classpath?"$CC_CLASSPATH"?$log_setting?org.apache.flink.yarn.cli.FlinkYarnSessionCli?-j?"$FLINK_LIB_DIR"/flink-dist*.jar?"$@"

      注意:HADOOP_USER_NAME參數必須在JAVA_RUN之前配置,否則程序運行之后無法讀取到該環境變量

      要啟動HA群集,需要在conf/flink-conf.yaml添加以下配置:

      高可用性模式(必需):必須在conf/flink-conf.yaml中將高可用模式設置為zookeeper才能啟用高可用模式。 或者,此選項可以設置為Flink應該用于創建HighAvailabilityServices實例的工廠類的FQN。

      high-availability:?zookeeper

      ZooKeeper quorum(必需):ZooKeeper quorum是ZooKeeper服務器的復制組,它提供分布式協調服務。

      high-availability.zookeeper.quorum:?address1:2181[,...],addressX:2181

      每個addressX:port指的是一個ZooKeeper服務器,Flink可以在給定的地址和端口訪問它。

      Zookeeper root(推薦):

      ZooKeeper root節點,在該節點下放置所有集群節點。

      high-availability.zookeeper.path.root:?/flink

      Zookeeper Cluster-id(推薦):

      cluster-id ZooKeeper節點,在該節點下放置集群的所有必需的協調數據。

      high-availability.cluster-id:?/default_ns?#?important:?customize?per?cluster

      存儲目錄(必需):JobManager元數據保存在文件系統storageDir中,只有一個指向該狀態的指針存儲在ZooKeeper中。

      high-availability.storageDir:?hdfs:///flink/recovery

      storageDir存儲JobManager故障恢復所需的所有元數據。

      配置主服務器和ZooKeeper quorum后,您可以像往常一樣使用提供的集群啟動腳本。他們將啟動HA群集。請記住,調用腳本時必須運行ZooKeeper quorum,并確保為要啟動的每個HA群集配置單獨的ZooKeeper根路徑。

      除HA配置外,還需要配置最大嘗試次數conf/flink-conf.yaml:

      yarn.application-attempts:?10

      這意味著在Yarn應用程序失敗之前,應用程序可以重新啟動9次(9次重試+ 1次初始嘗試)。

      由于我們是基于HDP創建的Hadoop集群,已有現成的zookeeper集群,所以這里我們使用現有的zookeeper進行HA配置,配置如下:

      high-availability:?zookeeper

      high-availability.zookeeper.quorum:?flink-dc-01:2181,flink-dc-02:2181,flink-dc-03:2181

      high-availability.zookeeper.path.root:?/flink

      high-availability.storageDir:?hdfs://ns1/flink/recovery

      yarn.application-attempts:?10

      配置Application最大的嘗試次數

      yarn.resourcemanager.am.max-attempts

      4

      The?maximum?number?of?application?master?execution?attempts.

      當前YARN版本的默認值為2(表示允許單個JobManager失敗)。

      hdp平臺需要去掉uber shaded hadoop的包,同時添加mapreduce的包到yarn應用classpath,否則會出現如下問題:

      Exception?in?thread?"main"?java.lang.IllegalAccessError:?tried?to?access?method?org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider.getProxyInternal()Ljava/lang/Object;?from?class?org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider

      rm?-f?/root/flink-1.7.1/lib/flink-shaded-hadoop2-uber-1.7.1.jar

      進入ambari界面,service->yarn->config->advanced->Advanced yarn-site->yarn.application.classpath添加

      /usr/hdp/current/hadoop-mapreduce-client/*,/usr/hdp/current/hadoop-mapreduce-client/lib/*

      修改后,需要重啟yarn相關組件,ambari界面會有指示如何重啟,一鍵搞定.

      Flink默認包含兩種配置方式:log4j以及logback

      不配置的情況下運行flink集群或者運行flink job會提示建議移除其中一種。

      org.apache.flink.yarn.AbstractYarnClusterDescriptor???????????-?The?configuration?directory?('/root/flink-1.7.1/conf')?contains?both?LOG4J?and?Logback?configuration?files.?Please?delete?or?rename?one?of?them.

      直接移除或者重命名都可行。

      例如:

      $?mv?logback.xml?logback.xml_bak

      示例配置:

      vi?/usr/local/flink-1.3.3/conf/log4j.properties

      log4j.appender.file.append=true

      log4j.appender.file.MaxFileSize=100M??#最大文件大小

      log4j.appender.file.MaxBackupIndex=10??#?最大備份索引大小

      啟動Flink

      本節主要介紹Flink的兩種啟動方式。

      啟動一個長期運行的flink集群通過yarn-session.sh執行部署。

      $?./bin/yarn-session.sh

      Usage:

      Required

      -n,--container????Number?of?YARN?container?to?allocate?(=Number?of?Task?Managers)

      Optional

      -D??????????????use?value?for?given?property

      -d,--detached???????????????????If?present,?runs?the?job?in?detached?mode

      -h,--help???????????????????????Help?for?the?Yarn?session?CLI.

      -id,--applicationId????????Attach?to?running?YARN?session

      -j,--jar???????????????????Path?to?Flink?jar?file

      -jm,--jobManagerMemory?????Memory?for?JobManager?Container?with?optional?unit?(default:?MB)

      -m,--jobmanager????????????Address?of?the?JobManager?(master)?to?which?to?connect.?Use?this?flag?to?connect?to?a?different?JobManager?than?the?one?specified?in?the?configuration.

      -n,--container?????????????Number?of?YARN?container?to?allocate?(=Number?of?Task?Managers)

      -nl,--nodeLabel????????????Specify?YARN?node?label?for?the?YARN?application

      -nm,--name?????????????????Set?a?custom?name?for?the?application?on?YARN

      -q,--query??????????????????????Display?available?YARN?resources?(memory,?cores)

      -qu,--queue????????????????Specify?YARN?queue.

      -s,--slots?????????????????Number?of?slots?per?TaskManager

      -sae,--shutdownOnAttachedExit???If?the?job?is?submitted?in?attached?mode,?perform?a?best-effort?cluster?shutdown?when?the?CLI?is?terminated?abruptly,?e.g.,?in?response?to?a?user?interrupt,?such

      as?typing?Ctrl?+?C.

      -st,--streaming?????????????????Start?Flink?in?streaming?mode

      -t,--ship??????????????????Ship?files?in?the?specified?directory?(t?for?transfer)

      -tm,--taskManagerMemory????Memory?per?TaskManager?Container?with?optional?unit?(default:?MB)

      -yd,--yarndetached??????????????If?present,?runs?the?job?in?detached?mode?(deprecated;?use?non-YARN?specific?option?instead)

      -z,--zookeeperNamespace????Namespace?to?create?the?Zookeeper?sub-paths?for?high?availability?mode

      主要參數講解:

      1、-n 指定TaskManager數量

      2、-jm 指定JobManager使用內存

      3、-m 指定JobManager地址

      4、-tm 指定TaskManager使用內存

      5、-D 指定動態參數

      6、-d 客戶端分離,指定后YarnSession部署到yarn之后,客戶端會自行關閉。

      7、-j 指定執行jar包

      bin/yarn-session.sh?-n?8?-s?5?-jm?2048?-tm?4096?-nm?pinpoint-flink-job

      實例說明:

      8個TaskManager

      每個TaskManager5個slot

      每個TaskManager內存4g,

      指定application名稱為pinpoint-flink-job

      注意:部署長期運行的flink on yarn實例后,在flink web上看到的TaskManager以及Slots都為0。只有在提交任務的時候,才會依據分配資源給對應的任務執行。

      執行任務提交命令:

      $?bin/flink?run?./examples/batch/WordCount.jar?--input?hdfs://xdata2/tmp/LICENSE-2.0.txt?--output?hdfs://xdata2/tmp/wordcount_result.txt

      指定輸入文件:hdfs://xdata2/tmp/LICENSE-2.0.txt

      指定輸出文件:hdfs://xdata2/tmp/wordcount_result.txt

      命令運行日志如下:

      SLF4J:?Class?path?contains?multiple?SLF4J?bindings.

      SLF4J:?Found?binding?in?[jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

      SLF4J:?Found?binding?in?[jar:file:/root/flink-1.7.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]

      SLF4J:?Found?binding?in?[jar:file:/usr/hdp/2.6.2.0-205/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]

      SLF4J:?See?http://www.slf4j.org/codes.html#multiple_bindings?for?an?explanation.

      SLF4J:?Actual?binding?is?of?type?[org.slf4j.impl.Log4jLoggerFactory]

      2019-01-24?16:05:26,059?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?Found?Yarn?properties?file?under?/tmp/.yarn-properties-root.

      2019-01-24?16:05:26,059?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?Found?Yarn?properties?file?under?/tmp/.yarn-properties-root.

      2019-01-24?16:05:26,358?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?YARN?properties?set?default?parallelism?to?40

      2019-01-24?16:05:26,358?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?YARN?properties?set?default?parallelism?to?40

      YARN?properties?set?default?parallelism?to?40

      2019-01-24?16:05:26,618?INFO??org.apache.hadoop.yarn.client.AHSProxy????????????????????????-?Connecting?to?Application?History?server?at?vigor-dc-38/192.168.2.38:10200

      2019-01-24?16:05:26,628?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?No?path?for?the?flink?jar?passed.?Using?the?location?of?class?org.apache.flink.yarn.YarnClusterDescriptor?to?locate?the?jar

      2019-01-24?16:05:26,628?INFO??org.apache.flink.yarn.cli.FlinkYarnSessionCli?????????????????-?No?path?for?the?flink?jar?passed.?Using?the?location?of?class?org.apache.flink.yarn.YarnClusterDescriptor?to?locate?the?jar

      2019-01-24?16:05:26,638?INFO??org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider??-?Looking?for?the?active?RM?in?[rm1,?rm2]...

      2019-01-24?16:05:26,773?INFO??org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider??-?Found?active?RM?[rm1]

      2019-01-24?16:05:26,779?INFO??org.apache.flink.yarn.AbstractYarnClusterDescriptor???????????-?Found?application?JobManager?host?name?'vigor-dc-41'?and?port?'39925'?from?supplied?application?id?'application_1548213441093_0011'

      2019-01-24?16:05:27,186?WARN??org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory???????-?The?short-circuit?local?reads?feature?cannot?be?used?because?libhadoop?cannot?be?loaded.

      Starting?execution?of?program

      Program?execution?finished

      Job?with?JobID?7ab3cf90748c8d05c7aa2e7cbce85730?has?finished.

      Job?Runtime:?8979?ms

      提交后可以在Flink web頁面上看到提交的任務信息及執行情況。

      使用hadoop命令查詢執行結果信息

      [root@vigor-dc-38?flink-1.7.1]#?hadoop?fs?-cat?/tmp/wordcount_result.txt

      ...

      above?1

      acceptance?1

      accepting?3

      act?1

      acting?1

      acts?1

      add?2

      addendum?1

      additional?5

      additions?1

      advised?1

      against?2

      agree?1

      agreed?3

      agreement?1

      all?3

      ...

      若你想在Yarn上啟動Flink用于單獨任務執行,可以直接通過bin/flink run的方式來實現。

      示例:

      ./bin/flink?run?-m?yarn-cluster?-yn?2?./examples/batch/WordCount.jar

      Yarn會話的命令行選項也可以用于./bin/flink。使用y或yarn(對于長參數選項)作為前綴。

      命令執行后,yarn會為任務單獨啟動一個flink on yarn實例,用于運行flink任務,在flink web界面上可以看到該任務。

      查看后段執行結果:

      Printing?result?to?stdout.?Use?--output?to?specify?output?path.

      (a,5)

      (action,1)

      (after,1)

      (against,1)

      (all,2)

      (and,12)

      (arms,1)

      (arrows,1)

      ...

      總結

      Flink on Yarn兩種部署方式可以根據自身的需求自行選擇。可選擇單獨一種,也可以兩種結合使用。

      重要任務建議單獨運行一個實例,其他的任務可以使用長時間運行方式,將多個任務部署到上面,不用到時候資源也會得到釋放。

      Standalone模式在后續的文章補上。

      參考鏈接

      https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/hadoop.html

      https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/deployment/yarn_setup.html

      https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html

      關注公眾號

      Apache Yarn

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:Excel中進行表格設置每頁都能打印表格標題行的操作方法(excel表格中每頁打印標題行怎么弄)
      下一篇:Excel2007基礎教程:選擇不連續的區域
      相關文章
      久久精品国产精品亚洲艾| 成人亚洲网站www在线观看| 亚洲AV中文无码乱人伦| 亚洲国产系列一区二区三区 | 亚洲色欲色欲综合网站| 亚洲免费在线观看| 国产精品亚洲av色欲三区| 国产成人亚洲综合一区| 亚洲高清有码中文字| 亚洲一区在线视频| 亚洲国产精品成人久久久 | 色窝窝亚洲av网| 精品国产日韩亚洲一区91| 亚洲AV无码一区二区三区性色| 亚洲七久久之综合七久久| 99亚偷拍自图区亚洲| 99亚偷拍自图区亚洲| 亚洲精品无码专区在线| 亚洲国产欧洲综合997久久| 亚洲GV天堂无码男同在线观看| 亚洲女子高潮不断爆白浆| 亚洲av乱码一区二区三区按摩| 青草久久精品亚洲综合专区| 亚洲成人一区二区| 久久亚洲精品无码播放| 久久国产成人亚洲精品影院| 亚洲情综合五月天| 久久精品国产精品亚洲蜜月| 亚洲精品视频在线观看免费| 亚洲成人黄色在线观看| 亚洲自国产拍揄拍| 亚洲AV无码国产精品永久一区| 亚洲av成人无码网站…| 亚洲精品无码av天堂| 亚洲开心婷婷中文字幕| 亚洲国产精品不卡在线电影| 亚洲第一成年人网站| 亚洲中文字幕乱码熟女在线| 亚洲avav天堂av在线网毛片| 亚洲精品一级无码中文字幕| 亚洲狠狠婷婷综合久久久久|