物聯網網關開發:基于MQTT消息總線的設計過程丨【拜托了,物聯網!】
一、前言
二、網關的作用
2.1 指令轉發
2.2 外網通信
2.3 協議轉換
2.4 設備管理
2.5 邊沿計算(自動化控制)
一、前言
二、網關的作用
2.1 指令轉發
2.2 外網通信
2.3 協議轉換
2.4 設備管理
2.5 邊沿計算(自動化控制)
三、網關內部進程之間的通信
3.1 網關中需要哪些進程
3.2 mqtt消息總線
3.3 Topic 的設計
3.4 與 DBUS 總線的對比
四、網關與云平臺之間的通信
4.1 與云平臺之間的 MQTT 連接
4.2 Proc_Bridge 進程:外部和內部消息總線之間的橋接器
1. mosquitto 的 API 接口
2. 利用 UserData 指針,實現多個 MQTT 連接
五、總結
一、前言
在一個嵌入式系統中,利用
MQTT消息總線
在各進程之間進行通信,是一個很常見的進行通信方式。
這樣的通信模型,對于非工控產品來說,
通信速度完全足夠
。以前做過測試,在x86平臺和ARM平臺,一條數據從本地到云端繞一下,然后再回到本地,可以控制在
毫秒級別
。
這篇文章,我們來具體的聊一聊物聯網系統中的
網關內部程序
可以利用 MQTT 消息總線如何進行設計。
閱讀這篇文章,你可以有如下收獲:
物聯網系統中,設備之間是如何通信的;
網關中的進程之間消息總線通信模型;
網關內部消息總線上的數據如何與服務器進行通信;
作為消遣,了解一下物聯網系統中的一些基本知識;
二、網關的作用
物聯網這個詞語的范疇太廣,似乎所有的硬件設備,只要能夠接入網絡,就可以稱之為物聯網產品,似乎物聯網這個詞可以把一切都納入到其中。這么空洞的詞語不利于我們的講解,因此我們就用一個可以
感知、想象
的場景來代替,那就是
智能家居系統
,這是最能代表物聯網時代的典型產品了。
在一個智能家居系統中,假設有這么幾個設備:
這些設備的通信模塊,如果是
WiFi
或者是
藍牙
,那么一般都可以直接通過手機來控制(當然,需要廠家提供相應的手機 APP),手機就相當于一個
中心節點
,控制著所有的設備。目前市面上的一些智能設備
單品
都是這樣的通信方式,例如:空調、吸塵器、空氣凈化器、冰箱等等。只要在這些設備中加一個
無線通信模塊
即可(例如:ESP8266模塊)。
如果通信模塊是其它的通信模塊,例如:
RF433、ZigBee、ZWave
等,由于手機沒有這些通信模塊,因此就需要一個
網關
來“轉發”指令。手機和網關都連接到家中的路由器,
處于同一個局域網中
,手機把控制指令發送給網關,網關再把指令轉發給相應的設備。通信模型如下:
在上面的通信模型中,手機和網關由于處于同一個局域網中,因此可以
直接通信
。如果手機不在局域網中呢?那么就要通過云端的服務器來轉發了,通信模型如下:
手機把指令發到服務器;
服務器把指令轉發給網關;
網關把指令發給指定的設備;
以上描述的是控制指令的流程,如果是設備發出的報警信息呢,數據的流向就是
倒過來進行的
。
可以看出,
網關是所有設備之間通信的中心節點,也是內網與外網之間通信的中轉節點,也就是把各種智能設備連接到互聯網的中轉器。
上面已經提到,硬件設備上的通信模塊都是
確定的(RF,ZigBee,ZWave等等)
,一般來說,可以把這些通信模塊稱呼為
無線通信協議
。在一套智能家居系統中,所有設備的無線通信協議大部分都是相同的。
那么,
不同類型的無線通信協議設備
是否可以共存在同一個系統中呢?
答案是:
可以
。只要在網關中,集成了相應的
無線通信協議模塊
就可以達到這個目的!如下圖所示:
從手機APP上看,所有的設備都是相同的,不會關心設備的無線通信協議是什么,因此,發出的控制指令都是
協議無關
的。
當網關接收到控制指令時,首先根據指令內容
查找出目標設備
,然后確定目標設備的
無線通信協議
,最后把指令發送給對應的
硬件通信模塊
,由該通信模塊通過無線電信號把控制指令發送到設備。
從這個指令的傳輸過程來看,網關就充當著
協議轉換的角色
。
另外還有一種通信場景:當系統中的一個“輸入”設備與一個“輸出”設備進行
綁定/關聯
時,例如:
紅外感應器與聲光報警器綁定:當紅外感應器監測到人體時,發出信號,然后控制聲光報警器發出報警;
門磁與燈綁定:當開門時,門磁發出信號,自動打開燈光;
如果“輸入”設備與“輸出”設備是
不同類型的
無線通信協議,也需要
網關來進行協議轉換
。
在一個智能家居系統中,設備可多可少,對這些設備進行管理也是很重要的事情。網關作為系統的中心節點,對設備進行管理的重任理所當然就由網關來承擔。
設備管理功能包括:
設備的添加和刪除;
設備狀態的管理(電量、設備斷網、失聯等等);
設備樹的管理;
在正常的情況下,網關是可以通過路由器,與服務器保持著
長連接
的。如果服務器的處理能力比較強大,智能家居系統中所有需要處理的事情
都可以丟給服務器
來計算、處理,服務器在計算之后把處理結果再發送給網關。看起來想法很完美!
但是,考慮下面這 2 種情況:
路由器出現問題了,網關無法連接到服務器,因此就無法把本地數據及時上報;
系統中出現了異常情況,需要緊急處理,如果把信息上報到服務器,由服務器計算之后再回傳給網關,耗費的時間可能超過了可容忍時間,該如何處理?(可以用車聯網系統來腦補一下這個場景:自動駕駛中的汽車遇到緊急情況,如果把所有信息上傳給服務器,然后等待服務器的下一步指令?)
對于上面的這些場景,把一些計算、處理操作
放在網關這一端來處理
也許更合適!這也是近幾年比較流行的
邊沿計算
。
1. 邊緣計算,是指在靠近物或數據源頭的一側,采用網絡、計算、存儲、應用核心能力為一體的開放平臺,就近提供最近端服務。 2. 其應用程序在邊緣側發起,產生更快的網絡服務響應,滿足行業在實時業務、應用智能、安全與隱私保護等方面的基本需求。 3. 邊緣計算處于物理實體和工業連接之間,或處于物理實體的頂端。而云端計算,仍然可以訪問邊緣計算的歷史數據
三、網關內部進程之間的通信
在設計一個應用程序的架構時,可以通過
多線程
來實現,也可以通過多進程來實現,每個人的習慣都不一樣,各有各的好處。我們這里不去討論孰優孰劣,因為我對多進程這樣的設計思想比較偏愛,所以就直接按照多進程的程序架構來討論。
網關中需要執行的所有進程,是根據網關的功能來決定的,假設包括如下的功能:
(1)連接外網的進程 Proc_Bridge
網關需要連接到云端的服務器,需要一個進程與服務器之間保持長連接,這樣就可以
及時接收到
服務器發來的控制指令,以及把系統內部數據
及時上報
給服務器。
這個進程需要把從服務器接收到的指令
轉發
到網關系統內部,把從系統內部接收到的信息
轉發
給服務器,類似于橋接的功能,因此命名為 Proc_Bridge。
(2)設備管理進程 Proc_DevMgr
這個進程用來執行設備管理功能,設備的添加(入網)、刪除(退網),都由此進程來管理。
(3)協議轉換進程 Proc_Protocol
下行
:把應用層的統一通信協議,轉換成不同類型無線通信協議,發送給相應的無線模塊。
上行
:把設備上報的、不同類型的無線通信協議,轉換成應用層的統一通信協議。
(4)邊沿計算進程(自動化控制) Proc_Auto
很明顯,這需要一個獨立的進程來處理各種計算,這個進程就相當于
系統的大腦
。
(5)無線通信協議相關的進程 Proc_ZigBee, Proc_RF, Proc_ZWave
在硬件上,每一種無線通信模塊通過
串口或其他硬件連接方式
與到網關的 CPU 進行通信,因此,每一種無線通信模塊都需要一個相應的進程來處理。
(6)其他“軟設備”進程 Proc_Xxx
在之前的項目中,還遇到一些硬件設備,它們與門磁、插座等設備
在邏輯上處于同一個層次
,但是與網關之間是通過
TCP
來連接。對于這樣的設備,也可以使用一個獨立的進程來進行管理。
上面的這些進程,在網關中的運行模型如下:
以上這些進程之間需要相互通信,
不是簡單的點對點通信,而是一個網狀的通信模型
。比如:
設備管理進程 Proc_DevMgr:當任何一種設備被添加到系統中時,都需要處進行處理,因此它需要與 Proc_ZigBee, Proc_RF, Proc_ZWave 這些進程進行通信;
當某個設備上報數據時(例如:Proc_ZigBee),Proc_Protocol 進程需要把數據進行協議轉換,然后 Proc_Bridge 進程把轉換后的數據上報給服務器,同時 Proc_Auto 進程需要檢查這個設備上報的數據是否觸發了其他相關聯的設備;
也就是說,這些進程中間的通信是
相互交叉
的,如果通過傳統的 IPC 方式(共享內存、命名管道、消息隊列、Socket)等,處理起來比較復雜。
引入了
MQTT 消息總線
之后,每個進程只需要掛載到總線上。每個進程只需要
監聽自己感興趣的 topic
,就可以接收到相應的數據。
既然這些進程之間的通信關系比較復雜,那么一個良好的 topic 設計規范就顯得很重要了!
MQTT 的通信模型是基于
訂閱/發布
的模式,一個客戶端(進程)接入到消息總線之后,需要
注冊
自己感興趣的
主題 topic
,其他客戶端(進程)往這個 topic 發送消息,即可被訂閱者接收到。
主題 topic 是一個以
反斜線(/)
分割的字符串,用來表示
多層的分級結構
,例如下面的這 2 個 topic,是亞馬遜 AWS 平臺中在線升級(OTA)相關的 topic:
/aws/things/MyThing/jobs/get/accepted
/aws/things/MyThing/jobs/get/rejected
在我們的示例場景中,可以按照下面這樣來設計主題 topic:
(1) Proc_DevMgr
訂閱主題:
/iot/v1/ZigBee/Register
/iot/v1/ZigBee/UnRegister
/iot/v1/RF/Register
/iot/v1/RF/UnRegister
/iot/v1/ZWave/Register
/iot/v1/ZWave/UnRegister
(2) Proc_Bridge
訂閱主題:
/iot/v1/Device/Report
發出數據的主題:
/iot/v1/Device/Control
/iot/v1/Device/Remove
/iot/v1/Auto/AddRule
/iot/v1/Auto/RemoveRule
(3) Proc_Protocol
訂閱主題:
/iot/v1/Device/Control
/iot/v1/Device/Remove
/iot/v1/ZigBee/Report
/iot/v1/RF/Report
/iot/v1/ZWave/Report
發送數據主題:
/iot/v1/Device/Report
/iot/v1/ZigBee/Control
/iot/v1/ZigBee/Remove
/iot/v1/RF/Control
/iot/v1/RF/Remove
/iot/v1/ZWave/Control
/iot/v1/ZWave/Remove
(4) Proc_Auto
訂閱主題:
/iot/v1/Auto/AddRule
/iot/v1/Auto/RemoveRule
/iot/v1/Device/Report
發送數據主題:
/iot/v1/Device/Control
(5) Proc_ZigBee
訂閱主題:
/iot/v1/ZigBee/Control
/iot/v1/ZigBee/Remove
發送數據主題:
/iot/v1/ZigBee/Register
/iot/v1/ZigBee/UnRegister
/iot/v1/ZigBee/Report
(6) Proc_RF
訂閱主題:
/iot/v1/RF/Control
/iot/v1/RF/Remove
發送數據主題:
/iot/v1/RF/Register
/iot/v1/RF/UnRegister
/iot/v1/RF/Report
(7) Proc_ZWave
訂閱主題:
/iot/v1/ZWave/Control
/iot/v1/ZWave/Remove
發送數據主題:
/iot/v1/ZWave/Register
/iot/v1/ZWave/UnRegister
/iot/v1/ZWave/Report
以上這些主題 topic 的設計,還是有些
粗略
的。如果借助
通配符(#, +, $)
,可以設計出更靈活的層次結構。
多層通配符: “#”是用于匹配主題中任意層級的通配符,多層通配符表示它的父級和任意數量的子層級。
單層通配符:“+”加號是只能用于單個主題層級匹配的通配符,在主題過濾器的任意層級都可以使用單層通配符,包括第一個和最后一個層級。
通配符:“$”表示匹配一個字符,只要不是放在主題的最開頭,其它情況下都表示匹配一個字符。
我們以一個
控制指令
為例,來梳理一下數據是如何通過 topic 進行流動:
Proc_Bridge 進程從服務器接收到控制指令后,發送到消息總線上的 topic: /iot/v1/Device/Control。
由于 Proc_Protocol 進程訂閱了這個 topic,所以立刻接收到指令。
Proc_Protocol 分析指令內容,發現是一個 ZigBee 設備,于是進行協議轉換,發送一個 ZigBee 控制指令到消息總線上的 topic: /iot/v1/ZigBee/Control。
由于 Proc_ZigBee 進程訂閱了這個 topic,因此它接收到這個控制指令。
Proc_ZigBee 把控制指令轉換成 ZigBee 無線通信模塊要求的格式,通過硬件發送給設備燈泡。
我們再分析一下
設備數據上報
的場景:
先關注圖中
紅色箭頭
,忽略藍色箭頭:
門磁打開后,通過無線通信把信息上報給進程 Proc_CF。
Proc_RF 進程接收到 RF433 通信模塊上報的數據,把“門磁打開”這個信息發送到消息總線上的 topic:/iot/v1/RF/Report。
由于 Proc_Protocol 進程訂閱了這個 topic,因此接收到上報的門磁數據。
Proc_Protocol 分析數據,把 RF433 協議的數據轉成統一的應用層協議的數據,發送到消息總線上的 topic:/iot/v1/Device/Report。
由于 Proc_Bridge 進程訂閱了這個 topic,因此就接收到了這次上報的數據。
Proc_Bridge 進程把數據上報給服務器。
再來看一下
藍色箭頭
流程:
在上面的第 4 步:Proc_Protocol 進程把 RF433 協議數據轉成應用層統一協議之后,把數據發送到消息總線上的 topic:
/iot/v1/Device/Report
之后,
Proc_Auto
進程同時進行如下操作:
由于 Proc_Auto 也訂閱了這個 topic,因此它也接收到了門磁上報的這個應用層協議的數據。
Proc_Auto 查找自己的配置信息(假設用戶已經提前配置好了一條規則:當門磁打開的時候,就觸發聲光報警器),發現匹配到了“門磁->報警器”這條規則,于是發出一條控制報警器的指令,發送到消息總線上的 topic: /iot/v1/Device/Control。
后面的 7,8,9,10 這四個步驟就與上面的
控制指令流程完全一樣了
。
從上面描述的 3 個數據流向的場景中,是不是感覺到使用 topic 為
“數據管道”
的這種通信方式,與
Linux 系統中的 DBUS
總線特別的相似?
DBUS 總線也是用于
進程之間的通信
,按照我個人的理解,DBUS中其實是把進程之間的兩種通信組織在一起了:
基于信號的數據傳輸;
基于方法的 RPC 遠程調用;
DBUS 總線包含的概念更復雜一些,包括:
路徑、對象、接口、方法
等等,這些概念組織在一起
共同定位到
一個具體的服務提供者了。
相比較而言,我感覺 MQTT 這樣的方式
更簡潔
一些。
所謂的 RPC 遠程調用,就是調用位于遠程機器上的一個函數,主要解決兩個問題:
網絡連接;
數據的序列化和反序列化;
后面我會專門寫一篇文章,利用
protobuf 框架
來實現 RPC 調用。
四、網關與云平臺之間的通信
上面講解的設計過程,是
網關內部
的各功能模塊之間通信方式,這也是我們作為嵌入式開發者能
充分發揮的部分
。
網關與云平臺之間的通信方式一般都是客戶指定的,就那么幾種(阿里云、華為云、騰訊云、亞馬遜AWS平臺)。一般都要求網關與云平臺之間處于
長連接
的狀態,這樣云端的各種指令就可以隨時發送到網關。
當然了,這些云平臺都會提供相應的 SDK 開發包,一般使用 MQTT 協議來連接云平臺的更多一些。在一些文檔中,會把位于云端的 MQTT 服務器稱作
Broker
,其實就是一個服務器。
進程
Proc_Bridge
的功能主要有 2 點:
與云平臺的數據傳輸通道;
協議轉換:把云平臺相關的協議轉換成網關內部的協議,以及相反的轉換。
也就是說:Proc_Bridge 進程需要
同時連接到云平臺的 MQTT Broker 和網關內部的 MQTT 消息總線
。
目前的幾大物聯網云平臺,都提供了不同的接入方式。對于網關來說,應用最多的就是
MQTT
接入。
我們知道,MQTT 只是一個
協議
而已,不同的編程語言中都有實現,在 C 語言中也有好幾個實現。
在網關內部,運行著一個后臺 deamon:
MQTT Broker
,其實就是 mosquitto 這個可執行程序,它充當著消息總線的功能。這里請大家注意:因為這個消息總線是
運行在嵌入式系統的內部
,接入總線的客戶端就是需要相互通信的那些
進程
。這些進程的數量是有限的,即使是一個比較復雜的系統,最多十幾個進程也就差不多了。因此,mosquitto 這個實現是
完全可以支撐系統負載的
。
那么,如果在
云端
部署一個 MQTT Broker,理論上是可以直接使用 mosquitto 這個實現來作為消息總線的,但是你要評估接入的
客戶端(也就是網關)在一個什么樣的數量級,考慮到并發的問題,一定要做壓力測試。
對于后臺開發,我的經驗不多,不敢(也不能)多言,誤導大家就罪過了。不過,對于一般的學習和測試來說,在云端直接部署 mosquitto 作為消息總線,是沒有問題的。
下面這張圖,說明了
Proc_Bridge 進程
在這個模型中的作用:
從云平臺消息總線接收到的消息,需要轉發到內部的消息總線;
從內部消息總線接收到的消息,需要轉發到云平臺的消息總線;
如果用 mosquitto 來實現,應該如何來實現呢?
mosquitto 這個實現是基于
回調函數
的機制來運行的,例如:
// 連接成功時的回調函數 void my_connect_callback(struct mosquitto *mosq, void *obj, int rc) { // ... } // 連接失敗時的回調函數 void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result) { // ... } // 接收到消息時的回調函數 void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { // .. } int main() { // 其他代碼 // ... // 創建一個 mosquitto 對象 struct mosquitto g_mosq = mosquitto_new("client_name", true, NULL); // 注冊回調函數 mosquitto_connect_callback_set(g_mosq, my_connect_callback); mosquitto_disconnect_callback_set(g_mosq, my_disconnect_callback); mosquitto_message_callback_set(g_mosq, my_message_callback); // 這里還有其他的回調函數設置 // 開始連接到消息總線 mosquitto_connect(g_mosq, "127.0.0.1", 1883, 60); while(1) { int rc = mosquitto_loop(g_mosq, -1, 1); if (rc) { printf("mqtt_portal: mosquitto_loop rc = %d \n", rc); sleep(1); mosquitto_reconnect(g_mosq); } } mosquitto_destroy(g_mosq); mosquitto_lib_cleanup(); return 0; }
以上代碼就是一個 mosquitto 客戶端的
最簡代碼
了,使用回調函數的機制,讓程序的開發非常簡單。
mosquitto 把底層的細節問題都幫助我們處理了,只要我們注冊的函數
被調用
了,就說明
發生了我們感興趣的事件
。
這樣的回調機制在各種開源軟件中使用的比較多,比如:
glib 里的定時器、libevent通訊處理、libmodbus 里的數據處理、linux 內核中的驅動開發和定時器
,都是這個套路,一通百通!
在網關中的每個進程,只需要添加上面這部分代碼,就可以
掛載到消息總線上
,從而可以與其它進程進行收發數據了。
上面的實例僅僅是連接到
一個
消息總線上,對于一個普通的進程來說,達到了通信的目的。
但是對于
Proc_Bridge 進程
來說,還沒有達到目的,因為這個進程處于
橋接
的位置,需要
同時連接到遠程和本地這兩個消息總線上
。那么應該如何實現呢?
看一下
mosquitto_new
這個函數的簽名:
/* * obj - A user pointer that will be passed as an argument to any * callbacks that are specified. */ libmosq_EXPORT struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);
最后一個參數的作用是:可以設置一個
用戶自己的數據
(作為指針傳入),那么 mosquitto 在
回調
我們的注冊的任何一個函數時,
都會把這個指針傳入
。因此,我們可以利用這個參數來區分這個連接是遠程連接?還是本地連接。
所以,我們可以定義一個結構體變量,把一個 MQTT 連接的
所有信息
都記錄在這里,然后注冊給 mosquitto。當 mosquitto 回調函數時,把這個結構體變量的指針
回傳
給我們,這樣就拿到了這個連接的所有數據,在某種程度上來說,這也是一種面向對象的思想。
// 從來表示一個 MQTT 連接的結構體 typedef struct{ char *id; char *name; char *pw; char *host; int port; pthread_t tHandle; struct mosquitto *mosq; int mqtt_num; }MQData;
完整的代碼已經放到網盤里了,為了讓你
先從原理上看明白
,我把關鍵幾個地方的代碼貼在這里:
// 分配結構體變量 MQData userData = (MQData *)malloc(sizeof(MQData)); // 設置屬于這里連接的參數: id, name 等等 // 創建 mosquitto 對象時,傳入 userData。 struct mosquitto *mosq = mosquitto_new(userData->id, true, userData); // 在回調函數中,把 obj 指針前轉成 MQData 指針 static void messageCB(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { MQData *userData = (MQData *)obj; // 此時就可以根據 userData 指針中的內容分辨出這是哪一個鏈接了 }
另外一個問題:不知道你是否注意到示例中的
mosquitto_loop()
這個函數?這個函數需要
放在 while 死循環中不停的調用,才能出發 mosuiqtto 內部的事件
。(其實在 mosuiqtto 中,還提供了另一個簡化的函數
mosquitto_loop_forever
)。
也就是說:在每個連接中,需要
持續的觸發
mosquitto 底層的事件,才能讓消息系統順利的收發。因此,在示例代碼中,使用
兩個線程
分別連接到云平臺的總線和內部的總線。
五、總結
這篇文章,基本上把一個
物聯網系統
的網關中,
最基本的通信模型
聊完了,相當于是一個程序的骨架吧,剩下的事情就是處理業務層的細節問題了。
萬里長征,這才是第一步!
對于一個網關來說,還有其他更多的問題需要處理,比如:MQTT 連接的鑒權(用戶名+密碼,證書)、通信數據的序列化和反序列化、加密和解密等等,以后慢慢聊吧,希望我們一路前行!
【拜托了,物聯網!】有獎征文火熱進行中:https://bbs.huaweicloud.com/blogs/296704
IoT MQTT NAT 嵌入式
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。