基于FusionInsight開發智能搜車系統
一、背景
1.1、業務背景
XX市交警智能搜車系統,通過各卡口攝像頭采集每天過往的車輛信息,存入數據庫。交警人員在頁面上可根據靈活條件進行車輛信息查詢、違章查詢、或者描繪車輛軌跡。
卡口數據包括兩種:1、以csv格式保存的歷史文本數據;2、實時卡口數據
項目需求:
1.?????????根據精確車牌號查出車輛信息
2.?????????根據模糊車牌號查出車輛信息
3.?????????查出沒有系安全帶的違法車輛信息
4.?????????查處符合某時間規則的車輛信息
5.?????????根據車輛顏色和品牌查出車輛信息
1.2、平臺選型
FI選型使用 HD8.0版本,對應Spark版本2.4.5,ES版本7.6.0,HBase版本2.2.3
二、方案設計
2.1、數據模型
車輛詳細信息字段有上百個,全量保存在HBase表中,如下是抽取出的部分需做索引的字段,存儲到ES中
2.2、數據流向
2.3、HBase表設計
1、避免單表數據量過大,以月為單位建表
2、以單條記錄唯一主鍵 id?作為rowkey
三、歷史數據處理
1、歷史數據通過bulkload方式導入HBase,具體步驟可參考帖子??https://bbs.huaweicloud.com/forum/thread-66116-1-1.html
2、導入HBase的數據再導入ES,可通過HBase2ES遷移工具,具體可參考?https://bbs.huaweicloud.com/forum/thread-71359-1-1.html
四、實時數據流
4.1、模擬數據流生成
我們通過python腳本(見附件)定時生成數據文件,定時放到指定目錄,例如/var/log/realtimeLog
腳本data_gen.py存放于部署有flume客戶端的節點的/opt/test目錄下,執行命令為
nohup python data_gen.py? -i? /opt/test? -o /var/log/realtimeLog &
生成的樣例數據如圖:
4.2、配置Flume
我們這里直接將數據文件通過Flume發往Kafka,使用Kafka普通模式端口,檢查kafka配置?allow.everyone.if.no.acl.found為true
Flume配置文件通過Manager頁面上的Flume配置工具生成,如圖:
Flume客戶端下載后,解壓縮,得到安裝腳本,路徑為:
/tmp/flume-Client/FusionInsight_Cluster_1_Flume_ClientConfig/Flume/FlumeClient
通過該路徑下的install.sh 安裝flume客戶端
./install.sh -d /opt/realtimeFlume -c /opt/test/properties.properties -f 10.244.230.213 -n realtimeTest
其中-d為安裝目錄,-c為指定的上面生成的配置文件,-f為Flume Monitor實例的一個IP,-n為客戶端指定的名稱
安裝完畢后,啟動測試數據生成腳本
通過kafka腳本驗證數據是否有生成:
4.3、SparkStreaming讀取kafka數據
該部分詳細可參考產品文檔《應用開發指南》-《安全模式》-《Spark2x開發指南》-《開發程序》-《SparkStreaming對接Kafka0-10程序》-《Java樣例代碼》部分
4.4、SparkStreaming寫入HBase和ES
獲取到?微批?JavaStreamingContext,通過foreachRDD寫入HBase和ES
HBase主要調用HBase的API,ES調用的是low level rest Client接口,相關代碼可參考附件
五、打包
建議不要將依賴打成一個包,如果有版本更改,可方便后面替換依賴包
通過 Artifacts?打包如圖
可能碰到如下錯誤,是因為沒有引入scala的SDK
在Global Libraries中引入scala的SDK即可
六、配置客戶端
因為Kafka的認證信息是通過JAAS認證機制,通過Spark自帶的—keytab自己無法解決認證對接的問題,需要單獨處理。
Spark中driver和executor默認分別使用客戶端/Spark2x/spark/conf目錄下的jaas.conf(driver端)和jaas-zk.conf(executor端)進行認證。
1、jaas.conf文件內容參考,注意keyTab指定的是提交任務的節點上存放user.keytab的絕對路徑
Client?{ com.sun.security.auth.module.Krb5LoginModule?required useKeyTab=true keyTab="/opt/client/Spark2x/spark/user.keytab" principal="fwc" useTicketCache=false storeKey=true debug=true; };
2、jaas-zk.conf文件內容參考,這里keyTab是上傳到executor后的user.keytab的路徑,使用相對路徑
Client?{ com.sun.security.auth.module.Krb5LoginModule?required useKeyTab=true keyTab="./user.keytab" principal="fwc" useTicketCache=false storeKey=true debug=true; }; KafkaClient?{ com.sun.security.auth.module.Krb5LoginModule?required useKeyTab=true keyTab="./user.keytab" principal="fwc" useTicketCache=false storeKey=true debug=true; };
3、將連接HBase的依賴配置文件拷貝到客戶端/Spark2x/spark/conf目錄下
包括HBase客戶端下的 core-site.xml、hdfs-site.xml和hbase-site.xml
4、創建提交任務的臨時目錄,例如/opt/sparkTest,如圖,將依賴的jar包放到該目錄下的lib目錄下,創建啟動任務的submit.sh
該目錄下的連接ES的配置文件為 esParams.properties,該文件參數參考附件
七、提交任務查看執行結果
7.1、提交spark任務
執行上面的 /opt/sparkTest/submit.sh腳本即可
7.2、查看hbase數據
通過hbase?shell命令行可查詢數據已經導入?vehicle_table_202008?表,后綴是當前的年月
7.3、驗證ES中數據已有數據
附件: sparkStreamingDemo.zip 7.41KB 下載次數:3次
FusionInsight 大數據
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。