一網打盡Flink算子大全(面試問的都在這里)

      網友投稿 973 2022-05-30

      大家好,我是老兵。

      Flink基于流編程模型,內置了很多強大功能的算子,可以幫助我們快速開發應用程序。

      作為Flink開發老手,大多算子的寫法和場景想來已是了然于胸,但是使用過程常常會有一些小小的問題:

      部分算子長時間未用,忘了用法。。

      某些場景選擇什么算子?如何選擇?含糊不清。。

      工欲善其事,必先利其器!快速高效的使用合適的算子開發程序,往往可以達到事半功倍的效果。

      想著好記性不如爛筆頭這個道理,特此整理一份常見的Flink算子開發手冊!!也作為自己的工作筆記。歡迎大家~

      1 DataStream API

      Flink DataStream API讓用戶靈活且高效編寫Flink流式程序。主要分為DataSource模塊、Transformation模塊以及DataSink模塊。

      Flink編程模型流程示意

      Source模塊定義數據接入功能,包括內置數據源和外部數據源。

      Transformation模塊定義DataStream數據流各種轉換操作。

      Sink模塊定義數據輸出功能,存儲結果到外部存儲介質中。

      大家好,我是老兵。

      Flink基于流編程模型,內置了很多強大功能的算子,可以幫助我們快速開發應用程序。

      作為Flink開發老手,大多算子的寫法和場景想來已是了然于胸,但是使用過程常常會有一些小小的問題:

      部分算子長時間未用,忘了用法。。

      某些場景選擇什么算子?如何選擇?含糊不清。。

      工欲善其事,必先利其器!快速高效的使用合適的算子開發程序,往往可以達到事半功倍的效果。

      想著好記性不如爛筆頭這個道理,特此整理一份常見的Flink算子開發手冊!!也作為自己的工作筆記。歡迎大家~

      1 DataStream API

      Flink DataStream API讓用戶靈活且高效編寫Flink流式程序。主要分為DataSource模塊、Transformation模塊以及DataSink模塊。

      Source模塊定義數據接入功能,包括內置數據源和外部數據源。

      Transformation模塊定義DataStream數據流各種轉換操作。

      Sink模塊定義數據輸出功能,存儲結果到外部存儲介質中。

      執行環境: StreamExecutionEnvironment

      系統模塊 :

      DataSouce、Transformation和DataSink

      2 DataSource 輸入

      DataSource輸入模塊定義了DataStream API中的數據輸入操作,Flink輸入數據源分為內置數據源和第三方數據源兩種類型。

      內置數據源包括文件、Socket網絡端口以及集合類型數據,不需要引入其他依賴庫,在Flink系統內部已經實現。

      第三方數據源定義了Flink和外部系統數據交互邏輯,例如Apache Kafka Connector、Elastic Search Connector等。

      同時用戶可以自定義數據源。

      2.1 readTextFile、readFile算子

      支持讀取文本文件到Flink系統,轉換成DataStream數據集。

      readTextFile算子直接讀取系統文本文件(.log|.txt ...)

      readFile算子可以指定InputFormat讀取特定數據類型的文件(包括CSV、JSON或者自定義InputFormat)

      // 讀取文本文件 val textInputStream = env.readTextFile( "/data/example.log") // 指定InputFormat,讀取CSV文件 val csvInputStream = env.readFile( // 可以自定義類型(InputFormat) new CsvInputFormat[String] ( new Path("/data/example.csv") ) { override def fillRecord(out: String, onbjects: Array[AnyRef]: String) = { return null } }, "/data/example.csv" )

      2.2 Socket算子

      支持從Socket端口讀取數據,轉換成DataStream算子。

      算子參數:Ip地址、端口、delimiter字符串切割符、最大重試次數maxRetry

      maxRetry主要提供任務失敗重連機制。當設定為0時,Flink任務直接停止。

      Unix環境下,執行nc -lk [:port] 啟動網絡服務

      // Flink程序讀取Socket端口(9999)數據 val socketDataStream = env.socketTextStream("localhost", 9999)

      2.3 集合算子

      支持操作Flink內置集合類(Collection),轉換成DataStream。

      支持Java、Scala算子常見集合類

      本質是將本地集合數據分發到遠程執行;適用于本地測試,注意數據結構類型的一致性

      // fromElements元素集合轉換 val elementDataStream = env.fromElements( Tuple2('aa', 1L),Tuple2('bb', 2L) ) // fromCollection數組轉換(Java) String[] collections = new String[] { "aa", "bb" }; DataStream collectionDatastream = env.fromCollection( Arrays.asList(collections) ); // List列表轉換(Java) List arrays = new ArrayList<>(); arrays.add("aa") arrays.add("bb") DataStream arrayDataStream = env.fromCollection(arrays)

      2.4 外部數據源算子

      支持從第三方數據源系統讀取數據,轉換成DataStreams算子。

      常見外部數據源算子: Hadoop FileSystem、ElasticSearch、 Apache Kafka、RabbitMQ等

      使用時需要在Maven環境中添加jar包依賴(pom)

      // Maven配置 org.apache.flink flink-connector-kafka-1.2_2.12 1.9.1 // 讀取Kafka數據源(Java) Properties prop = new Properties(); prop.setProperty("bootstrap.servers", "localhost:9092"); ... DataStream kafkaStream = env.addSource( new FlinkKafkaStream010<> ( "topic-1", new SimpleStringSchema(), properties ) )

      2.5 自定義數據源算子

      支持實現內置的Function相關接口,自定義數據源。

      具體的內置方法包含但不限于:

      SourceFunction接口

      ParallelSourceFunction接口

      RichParallelSourceFunction類

      后續再通過env的addSource()方法添加,具體實現不展開。

      3 DataStream轉換

      Flink對若干個DataStream操作生成新的DataStream,該過程被稱為Transformation。

      Flink程序中大多數邏輯均在Transformation過程中完成,包含轉換、過濾、排序、連接、關聯、選擇和聚合等操作。

      注意和Spark中transformation的區別。

      Flink中DataStream轉換可以分為幾種類型:

      Single DataStream: 單個DataStream數據集元素處理邏輯

      Multi DataStream: 多個DataStream數據集元素處理邏輯

      物理分區:數據集并行度和數據分區處理

      3.1 Map算子(#Single)

      對數據集中每個元素進行轉換操作,生成新DataStream。

      底層為MapFunction算子。通過調用map函數,對每個元素執行操作。

      常用于數據清洗、計算和轉換等。

      val inputStream = env.fromElements( ("aa", 1), ("bb", 2), ("cc", 3) ) // 第一種寫法: map操作,完成每個元素 + 1 val mapStream1 = inputStream.map( t => (t._1, t.2 + 1) ) // 第二種寫法: 指定MapFunction val mapStream2 = inputStream.map( new MapFunction[(String, Int), (String, Int)] { override def map(t: (String, Int)): (String, Int) = { (t._1, t._2 + 1)} } )

      一網打盡Flink算子大全(面試問的都在這里)

      3.2 FlatMap算子(#Single)

      支持對數據集中所有元素轉換成多個元素,生成新DataStream。

      val flatDataStream = env.fromCollections() val resultStream = flatDataStream.flatMap{ line => line.split(",") }

      3.3 filter算子(#Single)

      支持對數據集進行過濾篩選,生成新的DataStream

      // 通配符寫法 val filterDataStream = dataStream.fliter { _ % 2 == 0 } // 指定運算符表達式 val filterDS = dataStream.filter( x => x % 2 == 0 )

      3.4 keyBy算子(#Single)

      根據指定Key對DataStream數據集分區,生成新的KeyedStream

      相同Key值的數據歸并到同一分區

      類似于Spark中的groupByKey

      val inputStream = env.fromElements( ("aa", 11), ("aa", 22), ("bb", 33) ) // 根據第一個字段作為key分區 // 轉換為KeyedStream[(String, String), Tuple] val keyedStream: inputStream.keyBy(0)

      3.5 reduce算子(#Single)

      支持對輸入KeyedSteam根據reduce()聚合,生成新的DataStream

      根據key分區聚合形成KeyedStream

      支持運算符和自定義reduceFunc函數

      val inputStream = env.fromElements( ("aa", 11), ("bb", 33), ("cc", 22), ("aa", 21) ) // 指定第一個字段分區key val keyedStream = inputStream.keyBy(0) // 對第二個字段進行累加求和 val reduceDataStream = keyedStream.reduce { (t1, t2) => (t1._1, t1._2 + t2._2) }

      自定義Reduce函數,需要實現匿名類。

      val reduceDataStream = keyedStream.reeduce( new ReduceFunction[(String, Int)] { override def reduce(t1: (String,Int), t2: (String, Int)): (String, Int) = { (t1._1, t1._2 + t2._2) } } )

      3.6 aggregations算子(#Single)

      DataStream基礎聚合算子,通過輸入KeyedStream進行聚合生成新的DataStream

      根據指定字段聚合,可自定義聚合邏輯

      底層封裝了sum、min、max等函數

      val inputStream = env.fromElements( (1, 7), (2, 8), (3, 11), (2, 3) ) // 指定第一個字段分區key val keyedStream: [(Int, Int), Tuple] = inputStream.keyBy(0) // 第二個字段sum統計 val sumStream = keyedStream.sum(1) // 最后輸出結果 sumStream.print()

      3.7 Connect合并算子(#Multi)

      合并多種類型數據集,并保留原數據集的數據類型,生成ConnectedStream

      共享狀態數據,可互相獲取數據集狀態

      某些場景下可替代join算子,變相實現flink雙流join功能

      // 創建不同數據類型數據集 val stream1 = env.fromElements( ("aa", 3), ("bb", 4), ("cc", 11), ("dd", 22) ) val stream2 = env.fromElements( (1, 2, 11, 8) ) // 連接數據集 // 返回[(String, Int), Int] // 類似: [("aa", 3),1] val connectedStream = stream1.connect(stream2)

      3.8 Connect算子—CoMap(#Multi)

      ConnectedStream數據流的Map功能算子,操作合并數據集所有元素

      定義CoMapFunction對象,參數為輸入數據類型、輸出數據類型和mapFunc

      子map函數多線程交替執行,生成最終的合并目標數據集

      // 上文Connected操作后形成的數據流 // 參數: 第1個為stream1類型;第2個為stream2類型;第3個為stream3類型 val resultStream = connnectedStream.map( new CoMapFunction[(String, Int), Int, (Int, String)] { // 定義第一個數據集處理邏輯,輸入值為stream1 override def map1(in1: (String, Int)): (Int, String) = { (in1._2, in1._1) } // 定義第二個數據集處理邏輯,輸入值為stream2 override def map2(in2: Int): (Int,String)={ (in2, "default") } )

      3.9 Connect算子—CoFlatMap(#Multi)

      ConnectedStream數據流的flatmap功能算子

      在flatmap()方法中指定CoFlatMapFunction,并分別實現flatmap1()和flatmap2()函數。

      val resultStream2 = connectedStream.flatMap( new CoFlatMapFunction[(String, Int), Int, (String, Int, Int)] { // 舉例: 函數中共享變量,完成兩個數據集合并 var value = 0 // 定義第1個數據集處理函數 override def flatMap1(in1: (String, Int), collect: Collector[(String, Int, Int)]): Unit = { collect.collect((in1._1, in1._2, value)) } } // 定義第2個數據集處理函數 override def flatMap2(in2: Int, collect: Collector[(String, Int, Int)]): Unit = { value = in2 } )

      3.10 Union算子(#Multi)

      將兩個或者多個數據集合并,生成與輸入數據集類型一致的DataStream

      輸入數據集的數據類型要求一致

      輸出數據集的數據類型和輸入數據一致

      注意和connect算子的區別

      val stream1 = env.fromElements( ("aa", 3), ("bb", 22), ("cc", 45) ) val stream2 = env.fromElements( ("dd", 23), ("ff", 21), ("gg", 89) ) val stream3 = .... // 合并數據集 val unionStream = stream1.union(stream2) val unionStream2 = stream1.union( stream2, stream3 )

      3.11 Split算子(#Multi)

      將DataStream數據集按照條件拆分,轉換成兩個數據集的DataStream算子

      將接入的數據路由到多個輸出數據集,在split函數中定義拆分邏輯

      可以被看作是union的逆向實現

      val stream1 = env.fromElements( ("aa", 3), ("bb", 33), ("cc", 56),("aa", 23), ("cc", 67) ) // 根據第二個字段的奇偶性標記數據(切分) val splitStream = stream1.split( v => if (v._2 % 2 == 1 Seq("even") else Seq("odd")) )

      3.12 Select算子(#Multi)

      Select篩選算子,通過條件選擇數據集中元素,生成新的DataStream

      // 篩選偶數數據 val evenStream = splitedStream.select("even") //篩選所有數據 val allStream = splitedStream.select("even", "odd")

      3.13 window窗口算子(時間機制)

      Flink的窗口算子是實時計算的核心算子,常用于某固定時間內指標統計

      1)窗口API

      Flink提供了高級窗口API算子,封裝底層窗口操作,包括窗口類型、觸發器、側輸出等。同時根據上游輸入Stream流分為Non-Keyed和Keyed兩種類型。

      Non-keyed(上游為Non-KeyedStream) 直接調用windowAll(),獲取全局統計

      val inputStream: DataStream = ... // 當傳入為KeyedStream時,調用window()函數 inputStream.keyBy(0).window(new WindowFunc(...)) // 當傳入為不做處理的Non-Keyed輸入Stream流 // 直接使用windowAll()全局統計 inputStream.windowAll(new WindowFunc(...))

      keyed(上游為KeyedStream類型)

      調用DataStream的內置window()

      stream.keyBy(..//keyed輸入流.) .window(..//窗口類型.) .trigger(.//觸發器<可選>..) .evictor(.//剔除器<可選>.) .allowdedLateness(.//延遲處理機制.) .sideOutputLateDate(.//側輸出.) .reduce/fold.aggregate/apply(.//計算函數.)

      2)窗口類型

      根據窗口的分配方式分為: 滾動、滑動、會話和全局等,分別支持不同窗口流動方式和范圍。

      同時支持事件時間和處理時間數據流。

      Tumbling Window Join (滾動窗口)

      Tumbling Window Join (滾動窗口)

      Sliding Window Join (滑動窗口)

      Sliding Window Join (滑動窗口)

      Session Widnow Join(會話窗口)

      Session Widnow Join(會話窗口)

      以十分鐘時間滑動窗口統計案例說明:

      val tumblingStream = inputStream .keyBy(0) .window( TumblingEventTimeWindows.of( Time.seconds(10)) ).process(...)

      4 DataSink輸出

      Flink讀取數據源,經過系列Transform操作后,結果一般轉存至外部存儲介質或者下游,即Flink的DataSink過程。

      Flink將外部存儲的連接邏輯封裝在Connector連接器中,常見的有:

      Apache Kafka

      ElasticSearch

      Hadoop FileSystem

      Redis

      文件系統、端口

      4.1 文件|端口

      支持文件、客戶端、Socket網絡輸出,為Flink內置算子,不需要依賴三方庫

      常見有writeAsCSV(本地文件)、writeToSocket(Socket網絡)

      // 本地csv inputStream.writeAsCsv( "file://path/xx.csv", WriteMode.OVERWRITE ) // Socket網絡 inputStream.writeToSocket( host, post, new SimpleStringSchema() )

      4.2 外部第三方

      基于SinkFunction定義,需要引入外部三方依賴庫,設置三方系統參數

      val dataStream = ... // 定義FlinkKafkaProducer val kafkaProducer = new FlinkKafkaProducer011[Sting] ( "localhost:9092", //kafka broker list連接 "xxx-topic", // kafka topic new SimpleStringSchema() //序列化 ) // 添加SinkFunc dataStream.addSink(kafkaProducer())

      5 總結

      Flink內置的算子庫種類全、功能強大,熟練掌握算子的使用方式和場景應用,是實時計算的必備技能。

      后面還會繼續更新此系列,歡迎添加我的個人- youlong525,一起學習交流~

      未完待續。。

      》》更多好文,歡迎關注公眾號: 大數據兵工廠

      Flink Scala 大數據 實時流計算服務 CS

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:ROS機器人操作系統資料與資訊(2018年5月)
      下一篇:Make Apache Spark better with CarbonData
      相關文章
      亚洲成?Ⅴ人在线观看无码| 亚洲天堂一区二区三区四区| 亚洲成人黄色网址| 午夜亚洲国产理论秋霞| 亚洲国产日韩在线视频| 伊人久久精品亚洲午夜| 久久久久久亚洲精品不卡| 精品亚洲成α人无码成α在线观看 | 亚洲国产成人精品激情| 亚洲国产精品网站久久| 亚洲三级视频在线观看| 亚洲人成在线免费观看| 色偷偷女男人的天堂亚洲网| 精品国产成人亚洲午夜福利| 在线精品亚洲一区二区| 亚洲中文字幕乱码AV波多JI| 亚洲另类无码专区首页| 亚洲av无码专区在线观看亚| 精品亚洲国产成人av| 亚洲AV无码专区日韩| 亚洲精品WWW久久久久久| 亚洲一区日韩高清中文字幕亚洲| 国产精品亚洲mnbav网站| 亚洲愉拍99热成人精品热久久| 亚洲中文字幕无码爆乳AV| 国产成人亚洲精品狼色在线| 亚洲色婷婷六月亚洲婷婷6月| 亚洲国产无套无码av电影| 一区二区三区亚洲| 亚洲午夜精品在线| 亚洲人成色4444在线观看| 在线亚洲v日韩v| 综合亚洲伊人午夜网| 久久精品亚洲综合专区| 亚洲综合激情另类小说区| 亚洲AV成人噜噜无码网站| 亚洲欧洲专线一区| 亚洲成av人片在线观看天堂无码 | 亚洲综合视频在线| 亚洲a视频在线观看| 亚洲暴爽av人人爽日日碰|