Spark Streaming 快速入門系列(6) | DStream的幾種保存方式

      網友投稿 884 2022-05-30

      大家好,我是不溫卜火,是一名計算機學院大數據專業大二的學生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯網行業的小白,博主寫博客一方面是為了記錄自己的學習過程,另一方面是總結自己所犯的錯誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會有一些錯誤出現,有紕漏之處懇請各位大佬不吝賜教!暫時只有csdn這一個平臺,博客主頁:https://buwenbuhuo.blog.csdn.net/

      本片博文為大家帶來的是DStream的幾種保存方式。

      目錄

      1. 保存到文本文件

      2. 保存到Mysql (第一種寫法)

      3. 保存到Mysql (第二種寫法)

      關于這部分我們還可以通過查看官方文檔實現:

      http://spark.apache.org/docs/2.1.1/streaming-programming-guide.html#transformations-on-dstreams

      輸出操作指定了對流數據經轉化操作得到的數據所要執行的操作(例如把結果推入外部數據庫或輸出到屏幕上)。

      與RDD中的惰性求值類似,如果一個DStream及其派生出的DStream都沒有被執行輸出操作,那么這些DStream就都不會被求值。如果StreamingContext中沒有設定輸出操作,整個context就都不會啟動。

      下列為輸出操作的方法與解釋

      注意:

      連接不能寫在driver層面(序列化);

      如果寫在foreach則每個RDD中的每一條數據都創建,得不償失;

      增加foreachPartition,在分區創建(獲取)。

      1. 保存到文本文件

      1. 源碼

      package com.buwenbuhuo.spark.streaming.day02.output import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * * @author 不溫卜火 * @create 2020-08-12 20:45 * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object OutDemo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo1").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck1") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\W+")) .map((_,1)) .reduceByKey(_+_) .saveAsTextFiles("world","log") ssc.start() ssc.awaitTermination() } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      2. 打開端口進行測試

      nc -lk 9999

      1

      3. 運行結果

      2. 保存到Mysql (第一種寫法)

      1. 源碼

      package com.buwenbuhuo.spark.streaming.day02.output import java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * * @author 不溫卜火 * @create 2020-08-12 21:45 * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object OutDemo2 { val props: Properties = new Properties() props.setProperty("user","root") props.setProperty("password","199712") def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo2").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck3") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\W+")) .map((_,1)) .reduceByKey(_+_) .foreachRDD(rdd =>{ // 把rdd轉成df // 1. 先創建sparkSession val spark: SparkSession = SparkSession.builder() .config(rdd.sparkContext.getConf) .getOrCreate() import spark.implicits._ // 2. 轉換 val df: DataFrame = rdd.toDF("word","count") // 3. 寫 df.write.mode("append").jdbc("jdbc:mysql://hadoop002:3306/rdd","window0812",props) }) ssc.start() ssc.awaitTermination() } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      Spark Streaming 快速入門系列(6) | DStream的幾種保存方式

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      48

      49

      50

      2. 運行與寫入數據

      3. 查看結果

      3. 保存到Mysql (第二種寫法)

      1. 源碼

      package com.buwenbuhuo.spark.streaming.day02.output import java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * * @author 不溫卜火 * @create 2020-08-12 22:45 * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object OutDemo3 { val props: Properties = new Properties() props.setProperty("user","root") props.setProperty("password","199712") def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo3").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck3") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\\W+")) .map((_,1)) .updateStateByKey((seq:Seq[Int],opt:Option[Int]) => Some(seq.sum + opt.getOrElse(0))) .foreachRDD(rdd =>{ // 把rdd轉成df // 1. 先創建sparkSession val spark: SparkSession = SparkSession.builder() .config(rdd.sparkContext.getConf) .getOrCreate() import spark.implicits._ // 2. 轉換 val df: DataFrame = rdd.toDF("word","count") // 3. 寫 df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop002:3306/rdd","window0813",props) }) ssc.start() ssc.awaitTermination() } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      17

      18

      19

      20

      21

      22

      23

      24

      25

      26

      27

      28

      29

      30

      31

      32

      33

      34

      35

      36

      37

      38

      39

      40

      41

      42

      43

      44

      45

      46

      47

      48

      49

      50

      2. 運行

      3. 運行結果

      本次的分享就到這里了,

      好書不厭讀百回,熟讀課思子自知。而我想要成為全場最靚的仔,就必須堅持通過學習來獲取更多知識,用知識改變命運,用博客見證成長,用行動證明我在努力。

      如果我的博客對你有幫助、如果你喜歡我的博客內容,請“” “評論”“”一鍵三連哦!聽說的人運氣不會太差,每一天都會元氣滿滿呦!如果實在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。

      碼字不易,大家的支持就是我堅持下去的動力。后不要忘了關注我哦!

      MySQL spark

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:機器人體驗營筆記(一)概要
      下一篇:基于MindSpore框架wide&deep模型的實戰CTR體驗
      相關文章
      亚洲一区二区三区四区在线观看| 亚洲色成人WWW永久网站| 亚洲欧洲日产国码av系列天堂| 亚洲AV无码乱码在线观看性色扶 | 亚洲专区一路线二| 亚洲精品网站在线观看你懂的| 亚洲欧洲日产国码久在线观看 | 亚洲午夜精品久久久久久app| 国产午夜亚洲精品国产| 中文字幕在线日亚洲9| 亚洲中文字幕久久精品无码2021| 亚洲人成毛片线播放| 亚洲冬月枫中文字幕在线看| 亚洲jjzzjjzz在线观看| 亚洲精品456人成在线| 亚洲中文字幕无码久久2020 | 亚洲AV无码成人精品区在线观看| 久久精品国产亚洲av成人| 亚洲韩国—中文字幕| 亚洲伊人久久大香线蕉苏妲己| 久久综合亚洲色一区二区三区 | 久久久无码精品亚洲日韩软件 | 亚洲人成网站18禁止| 爱爱帝国亚洲一区二区三区| 亚洲男人在线无码视频| 亚洲深深色噜噜狠狠爱网站| 久久精品亚洲综合专区| 亚洲系列国产精品制服丝袜第| 亚洲欧洲国产综合| 亚洲最大的成人网站| 蜜桃传媒一区二区亚洲AV| 亚洲国产精品综合久久一线| 亚洲精品你懂的在线观看| 亚洲欧洲国产精品你懂的| 亚洲一卡2卡4卡5卡6卡在线99 | 亚洲今日精彩视频| 亚洲一区在线视频| 苍井空亚洲精品AA片在线播放| 国产成人精品亚洲精品| 亚洲精品午夜无码电影网| 亚洲午夜国产精品无码|