Spark Streaming 進階實戰五個例子

      網友投稿 1050 2022-05-30

      一、帶狀態的算子:UpdateStateByKey

      實現 計算 過去一段時間到當前時間 單詞 出現的 頻次

      object StatefulWordCount {

      def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")

      val ssc = new streamingContext(sparkConf, Seconds(5))

      //如果使用了 stateful 的算子,必須要設置 checkpoint,

      //因為老的值 必須要 存在 某個 目錄下面,新的值 才能去更新老的值

      //在生產環境中,建議把checkpoint 設置到 hdfs 的某個文件夾中

      ssc.checkpoint(".")

      val lines = ssc.socketTextStream("localhost", 6789)

      val result = lines.flatMap(_.split(" ").map((_, 1)))

      val state = result.updateStateByKey[Int](updateFunction _)

      state.print()

      ssc.start()

      ssc.awaitTermination()

      }

      /**

      * 把當前的數據去更新已有的或者老的數據

      * @param currentValues 當前的

      * @param preValues 老的

      * @return

      */

      def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {

      val current = currentValues.sum

      val pre = preValues.getOrElse(0)

      Some(current + pre)

      }

      }

      二、實戰:計算到目前為止累積出現的單詞的個數寫入到mysql中

      /**

      * 使用 spark streaming 完成 詞頻統計,并輸出到 mysql 數據庫

      * 創建 數據庫

      *

      * 創建數據表

      * create table wordcount (

      * word varchar(50) default null,

      * wordcount int(10) default null

      * )

      */

      object ForeachRDDApp {

      def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")

      val ssc = new StreamingContext(sparkConf, Seconds(5))

      val lines = ssc.socketTextStream("localhost", 6789)

      val result = lines.flatMap(_.split(" ").map((_, 1))).reduceByKey(_ + _)

      //result.print()

      //將結果寫入到mysql

      //1、錯誤的方式

      // result.foreachRDD(rdd =>{

      // val connection = createConnection()

      // rdd.foreach {

      // record =>

      // val sql = "insert into wordcount (word,wordcount)" +

      // "values ('"+record._1+"','"+record._2+"')"

      // connection.createStatement().execute(sql)

      // }

      // })

      //2、正確的方式

      result.foreachRDD(rdd => {

      rdd.foreachPartition(partitionOfRecords => {

      if (partitionOfRecords.size > 0) {

      val connection = createConnection()

      partitionOfRecords.foreach(pair => {

      val sql = "insert into wordcount (word,wordcount)" +

      "values ('" + pair._1 + "','" + pair._2 + "')"

      connection.createStatement().execute(sql)

      })

      connection.close()

      }

      })

      })

      //3、更好的方式,查閱官方文檔,使用 連接池的方式

      //存在的問題,這樣每次都會插入新的數據,同樣的單詞頻次字段不會去累加更新

      //解決方案 :每次 insert 之前,判斷一下,該單詞是否已經存在數據庫中,如果已經存在則update

      //或者 存放在 hbase /redis 中,調用相應的api ,直接 插入和更新。

      ssc.start()

      ssc.awaitTermination()

      }

      def createConnection() = {

      Class.forName("com.mysql.jdbc.Driver")

      DriverManager.getConnection("jdbc://mysql://localhost:3306/dzx_spark", "root", "1234")

      }

      }

      三、基于window 的統計

      window :定時的進行一個時間段內的數據處理

      window length : 窗口的長度

      sliding interval : 窗口的間隔

      這2個參數和我們的batch size? 成倍數關系。如果不是倍數關系運行直接報錯

      每隔多久計算某個范圍內的數據:每隔10秒計算前10分鐘的wc ==>每隔 sliding interval? 統計 window length 的值

      pair.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(10))

      四、黑名單過濾

      /**

      * 黑名單過濾

      *

      * 訪問日志 ==> DStream

      *

      * 20180808,zs

      * 20180808,ls

      * 20180808,ww

      *

      * ==> (zs:20180808,zs) (ls:20180808,ls)(ww:20180808,ww)

      *

      * 黑名單列表 ==》 RDD

      * zs ls

      * ==>(zs:true) (ls:true)

      *

      *

      * leftjoin

      * (zs:[<20180808,zs>,]) pass ...

      * (ls:[<20180808,ls>,]) pass ...

      * (ww:[<20180808,ww>,]) ==> tuple1 ok...

      *

      *

      */

      object BlackNameListApp {

      def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")

      val ssc = new StreamingContext(sparkConf, Seconds(5))

      /**

      * 構建黑名單

      */

      val blacks = List("zs", "ls")

      val blacksRDD = ssc.sparkContext.parallelize(blacks)

      .map(x => (x, true))

      val lines = ssc.socketTextStream("localhost", 6789)

      val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {

      rdd.leftOuterJoin(blacksRDD).filter(x => x._2._2.getOrElse(false) != true).map(x => x._2._1)

      })

      clicklog.print()

      ssc.start()

      ssc.awaitTermination()

      }

      }

      五、 spark? streaming 整合 spark? sql? 實戰

      object SqlNetworkWordCount {

      Spark Streaming 進階實戰五個例子

      def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SqlNetworkWordCount")

      val ssc = new StreamingContext(sparkConf, Seconds(5))

      val lines = ssc.socketTextStream("192.168.42.85", 6789)

      val words = lines.flatMap(_.split(" "))

      // Convert RDDs of the words DStream to DataFrame and run SQL query

      words.foreachRDD { (rdd: RDD[String], time: Time) =>

      // Get the singleton instance of SparkSession

      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)

      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame

      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame

      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it

      val wordCountsDataFrame =

      spark.sql("select word, count(*) as total from words group by word")

      println(s"========= $time =========")

      wordCountsDataFrame.show()

      }

      ssc.start()

      ssc.awaitTermination()

      }

      }

      /** Case class for converting RDD to DataFrame */

      case class Record(word: String)

      /** Lazily instantiated singleton instance of SparkSession */

      object SparkSessionSingleton {

      @transient private var instance: SparkSession = _

      def getInstance(sparkConf: SparkConf): SparkSession = {

      if (instance == null) {

      instance = SparkSession

      .builder

      .config(sparkConf)

      .getOrCreate()

      }

      instance

      }

      }

      MySQL spark

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:代碼重構:內幕交易(Insider Trading)
      下一篇:醫院智能電話客服系統應用場景解決方案
      相關文章
      亚洲中文字幕无码亚洲成A人片| 亚洲成人免费网站| 亚洲精品在线网站| 亚洲免费精彩视频在线观看| 亚洲综合另类小说色区| www.亚洲色图.com| 国产产在线精品亚洲AAVV| 日本亚洲高清乱码中文在线观看| 中文字幕亚洲精品无码| 亚洲精品国产国语| 亚洲日韩一中文字暮| 亚洲日韩国产二区无码| 亚洲精品无码久久久久A片苍井空| 亚洲国产日韩视频观看| 亚洲中文字幕一二三四区| 亚洲精品国产综合久久久久紧| 亚洲AV无码AV男人的天堂不卡| 亚洲国产av玩弄放荡人妇| 亚洲av无一区二区三区| 色欲aⅴ亚洲情无码AV| 亚洲av高清在线观看一区二区| 亚洲成a人一区二区三区| 亚洲精品无码永久在线观看| 国产成人亚洲综合| 亚洲VA中文字幕无码一二三区| 亚洲国产香蕉碰碰人人| 亚洲熟妇av一区| 国产精品亚洲片夜色在线| 亚洲日本成本人观看| 国产亚洲综合久久| 国产亚洲自拍一区| 亚洲日本精品一区二区| 亚洲国产成人精品电影| 亚洲熟妇成人精品一区| 亚洲高清无码在线观看| 国产精品亚洲аv无码播放| 亚洲一区免费观看| 色噜噜亚洲男人的天堂| 色窝窝亚洲av网| 亚洲乱码精品久久久久..| 亚洲国产精品第一区二区|