Spark Streaming 進階實戰五個例子
一、帶狀態的算子:UpdateStateByKey
實現 計算 過去一段時間到當前時間 單詞 出現的 頻次
object StatefulWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StatefulWordCount")
val ssc = new streamingContext(sparkConf, Seconds(5))
//如果使用了 stateful 的算子,必須要設置 checkpoint,
//因為老的值 必須要 存在 某個 目錄下面,新的值 才能去更新老的值
//在生產環境中,建議把checkpoint 設置到 hdfs 的某個文件夾中
ssc.checkpoint(".")
val lines = ssc.socketTextStream("localhost", 6789)
val result = lines.flatMap(_.split(" ").map((_, 1)))
val state = result.updateStateByKey[Int](updateFunction _)
state.print()
ssc.start()
ssc.awaitTermination()
}
/**
* 把當前的數據去更新已有的或者老的數據
* @param currentValues 當前的
* @param preValues 老的
* @return
*/
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
val current = currentValues.sum
val pre = preValues.getOrElse(0)
Some(current + pre)
}
}
二、實戰:計算到目前為止累積出現的單詞的個數寫入到mysql中
/**
* 使用 spark streaming 完成 詞頻統計,并輸出到 mysql 數據庫
* 創建 數據庫
*
* 創建數據表
* create table wordcount (
* word varchar(50) default null,
* wordcount int(10) default null
* )
*/
object ForeachRDDApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("localhost", 6789)
val result = lines.flatMap(_.split(" ").map((_, 1))).reduceByKey(_ + _)
//result.print()
//將結果寫入到mysql
//1、錯誤的方式
// result.foreachRDD(rdd =>{
// val connection = createConnection()
// rdd.foreach {
// record =>
// val sql = "insert into wordcount (word,wordcount)" +
// "values ('"+record._1+"','"+record._2+"')"
// connection.createStatement().execute(sql)
// }
// })
//2、正確的方式
result.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
if (partitionOfRecords.size > 0) {
val connection = createConnection()
partitionOfRecords.foreach(pair => {
val sql = "insert into wordcount (word,wordcount)" +
"values ('" + pair._1 + "','" + pair._2 + "')"
connection.createStatement().execute(sql)
})
connection.close()
}
})
})
//3、更好的方式,查閱官方文檔,使用 連接池的方式
//存在的問題,這樣每次都會插入新的數據,同樣的單詞頻次字段不會去累加更新
//解決方案 :每次 insert 之前,判斷一下,該單詞是否已經存在數據庫中,如果已經存在則update
//或者 存放在 hbase /redis 中,調用相應的api ,直接 插入和更新。
ssc.start()
ssc.awaitTermination()
}
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc://mysql://localhost:3306/dzx_spark", "root", "1234")
}
}
三、基于window 的統計
window :定時的進行一個時間段內的數據處理
window length : 窗口的長度
sliding interval : 窗口的間隔
這2個參數和我們的batch size? 成倍數關系。如果不是倍數關系運行直接報錯
每隔多久計算某個范圍內的數據:每隔10秒計算前10分鐘的wc ==>每隔 sliding interval? 統計 window length 的值
pair.reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(10))
四、黑名單過濾
/**
* 黑名單過濾
*
* 訪問日志 ==> DStream
*
* 20180808,zs
* 20180808,ls
* 20180808,ww
*
* ==> (zs:20180808,zs) (ls:20180808,ls)(ww:20180808,ww)
*
* 黑名單列表 ==》 RDD
* zs ls
* ==>(zs:true) (ls:true)
*
*
* leftjoin
* (zs:[<20180808,zs>,
* (ls:[<20180808,ls>,
* (ww:[<20180808,ww>,
*
*
*/
object BlackNameListApp {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
/**
* 構建黑名單
*/
val blacks = List("zs", "ls")
val blacksRDD = ssc.sparkContext.parallelize(blacks)
.map(x => (x, true))
val lines = ssc.socketTextStream("localhost", 6789)
val clicklog = lines.map(x => (x.split(",")(1), x)).transform(rdd => {
rdd.leftOuterJoin(blacksRDD).filter(x => x._2._2.getOrElse(false) != true).map(x => x._2._1)
})
clicklog.print()
ssc.start()
ssc.awaitTermination()
}
}
五、 spark? streaming 整合 spark? sql? 實戰
object SqlNetworkWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SqlNetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("192.168.42.85", 6789)
val words = lines.flatMap(_.split(" "))
// Convert RDDs of the words DStream to DataFrame and run SQL query
words.foreachRDD { (rdd: RDD[String], time: Time) =>
// Get the singleton instance of SparkSession
val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
import spark.implicits._
// Convert RDD[String] to RDD[case class] to DataFrame
val wordsDataFrame = rdd.map(w => Record(w)).toDF()
// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on table using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
println(s"========= $time =========")
wordCountsDataFrame.show()
}
ssc.start()
ssc.awaitTermination()
}
}
/** Case class for converting RDD to DataFrame */
case class Record(word: String)
/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
@transient private var instance: SparkSession = _
def getInstance(sparkConf: SparkConf): SparkSession = {
if (instance == null) {
instance = SparkSession
.builder
.config(sparkConf)
.getOrCreate()
}
instance
}
}
MySQL spark
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。