Spark Streaming 快速入門系列(6) | DStream的幾種保存方式
大家好,我是不溫卜火,是一名計算機學院大數據專業大二的學生,昵稱來源于成語—不溫不火,本意是希望自己性情溫和。作為一名互聯網行業的小白,博主寫博客一方面是為了記錄自己的學習過程,另一方面是總結自己所犯的錯誤希望能夠幫助到很多和自己一樣處于起步階段的萌新。但由于水平有限,博客中難免會有一些錯誤出現,有紕漏之處懇請各位大佬不吝賜教!暫時只有csdn這一個平臺,博客主頁:https://buwenbuhuo.blog.csdn.net/
本片博文為大家帶來的是DStream的幾種保存方式。
目錄
1. 保存到文本文件
2. 保存到Mysql (第一種寫法)
3. 保存到Mysql (第二種寫法)
關于這部分我們還可以通過查看官方文檔實現:
http://spark.apache.org/docs/2.1.1/streaming-programming-guide.html#transformations-on-dstreams
輸出操作指定了對流數據經轉化操作得到的數據所要執行的操作(例如把結果推入外部數據庫或輸出到屏幕上)。
與RDD中的惰性求值類似,如果一個DStream及其派生出的DStream都沒有被執行輸出操作,那么這些DStream就都不會被求值。如果StreamingContext中沒有設定輸出操作,整個context就都不會啟動。
下列為輸出操作的方法與解釋
注意:
連接不能寫在driver層面(序列化);
如果寫在foreach則每個RDD中的每一條數據都創建,得不償失;
增加foreachPartition,在分區創建(獲取)。
1. 保存到文本文件
1. 源碼
package com.buwenbuhuo.spark.streaming.day02.output import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * * @author 不溫卜火 * @create 2020-08-12 20:45 * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object OutDemo1 { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo1").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck1") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\W+")) .map((_,1)) .reduceByKey(_+_) .saveAsTextFiles("world","log") ssc.start() ssc.awaitTermination() } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2. 打開端口進行測試
nc -lk 9999
1
3. 運行結果
2. 保存到Mysql (第一種寫法)
1. 源碼
package com.buwenbuhuo.spark.streaming.day02.output import java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * * @author 不溫卜火 * @create 2020-08-12 21:45 * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object OutDemo2 { val props: Properties = new Properties() props.setProperty("user","root") props.setProperty("password","199712") def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo2").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck3") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\W+")) .map((_,1)) .reduceByKey(_+_) .foreachRDD(rdd =>{ // 把rdd轉成df // 1. 先創建sparkSession val spark: SparkSession = SparkSession.builder() .config(rdd.sparkContext.getConf) .getOrCreate() import spark.implicits._ // 2. 轉換 val df: DataFrame = rdd.toDF("word","count") // 3. 寫 df.write.mode("append").jdbc("jdbc:mysql://hadoop002:3306/rdd","window0812",props) }) ssc.start() ssc.awaitTermination() } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2. 運行與寫入數據
3. 查看結果
3. 保存到Mysql (第二種寫法)
1. 源碼
package com.buwenbuhuo.spark.streaming.day02.output import java.util.Properties import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * * @author 不溫卜火 * @create 2020-08-12 22:45 * MyCSDN : https://buwenbuhuo.blog.csdn.net/ * */ object OutDemo3 { val props: Properties = new Properties() props.setProperty("user","root") props.setProperty("password","199712") def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OutDemo3").setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint("ck3") ssc .socketTextStream("hadoop002",9999) .flatMap(_.split("\\W+")) .map((_,1)) .updateStateByKey((seq:Seq[Int],opt:Option[Int]) => Some(seq.sum + opt.getOrElse(0))) .foreachRDD(rdd =>{ // 把rdd轉成df // 1. 先創建sparkSession val spark: SparkSession = SparkSession.builder() .config(rdd.sparkContext.getConf) .getOrCreate() import spark.implicits._ // 2. 轉換 val df: DataFrame = rdd.toDF("word","count") // 3. 寫 df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop002:3306/rdd","window0813",props) }) ssc.start() ssc.awaitTermination() } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
2. 運行
3. 運行結果
本次的分享就到這里了,
好書不厭讀百回,熟讀課思子自知。而我想要成為全場最靚的仔,就必須堅持通過學習來獲取更多知識,用知識改變命運,用博客見證成長,用行動證明我在努力。
如果我的博客對你有幫助、如果你喜歡我的博客內容,請“” “評論”“”一鍵三連哦!聽說的人運氣不會太差,每一天都會元氣滿滿呦!如果實在要白嫖的話,那祝你開心每一天,歡迎常來我博客看看。
碼字不易,大家的支持就是我堅持下去的動力。后不要忘了關注我哦!
MySQL spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。