用 Golang 實現基于 Redis 的安全高效 RPC 通信(用一生去愛你)

      網友投稿 1225 2025-04-01

      前言


      RPC(Remote Procedure Call),翻譯過來為“遠程過程調用”,是一種分布式系統中服務或節點之間的有效通信機制。通過 RPC,某個節點(或客戶端)可以很輕松的調用遠端(或服務端)的方法或服務,就像在本地調用一樣簡單。現有的很多 RPC 框架都要求暴露服務端地址,也就是需要知道服務器的 IP 和 RPC 端口。而本篇文章將介紹一種不需要暴露 IP 地址和端口的 RPC 通信方式。這種方式是基于 Redis BRPOP/BLPOP 操作實現的延遲隊列,以及 golang 中的 goroutine 協程異步機制,整個框架非常簡單和易于理解,同時也很高效、穩定和安全。這種方式已經應用到了 Crawlab 中的節點通信當中,成為了各節點即時傳輸信息的主要方式。下面我們將從 Crawlab 早期節點通信方案 PubSub 開始,介紹當時遇到的問題和解決方案,然后如何過渡到現在的 RPC 解決方案,以及它是如何在 Crawlab 中發揮作用的。

      基于 PubSub 的方案

      早期的 Crawlab 是基于 Redis 的 PubSub,也就是發布訂閱模式。這是 Redis 中主要用于一對多的單向通信的方案。其用法非常簡單:

      訂閱者利用SUBSCRIBE channel1 channel2 ...來訂閱一個或多個頻道;

      發布者利用PUBLISH channelx message來發布消息給該頻道的訂閱者。

      Redis的PubSub可以用作廣播模式,即一個發布者對應多個訂閱者。而在Crawlab中,我們只有一個訂閱者對應一個發布者的情況(主節點->工作節點:nodes:)或一個訂閱者對應多個發布者的情況(工作節點->主節點:nodes:master>)。這是為了方便雙向通信。

      以下為節點通信原理示意圖。

      各個節點會通過Redis的PubSub功能來做相互通信。

      所謂PubSub,簡單來說是一個發布訂閱模式。訂閱者(Subscriber)會在Redis上訂閱(Subscribe)一個通道,其他任何一個節點都可以作為發布者(Publisher)在該通道上發布(Publish)消息。

      在 Crawlab 中,主節點會訂閱 nodes:master 通道,其他節點如果需要向主節點發送消息,只需要向 nodes:master 發布消息就可以了。同理,各工作節點會各自訂閱一個屬于自己的通道 nodes:(node_id 是MongoDB里的節點ID,是MongoDB ObjectId),如果需要給工作節點發送消息,只需要發布消息到該通道就可以了。

      一個網絡請求的簡單過程如下:

      客戶端(前端應用)發送請求給主節點(API);

      主節點通過Redis PubSub 的 通道發布消息給相應的工作節點;

      工作節點收到消息之后,執行一些操作,并將相應的消息通過 通道發布給主節點;

      主節點收到消息之后,將消息返回給客戶端。

      不是所有節點通信都是雙向的,也就是說,主節點只會單方面對工作節點通信,工作節點并不會返回響應給主節點,所謂的單向通信。以下是Crawlab的通信類型。

      如果您在閱讀 Crawlab 源碼,會發現節點通信中有大量的 chan 語法,這是 golang 的一個并發特性。

      chan 表示為一個通道,在 Golang 中分為無緩沖和有緩沖的通道,我們用了無緩沖通道來阻塞協程,只有當 chan 接收到信號(chan <- "some signal"),該阻塞才會釋放,協程進行下一步操作)。在請求響應模式中,如果為雙向通信,主節點收到請求后會起生成一個無緩沖通道來阻塞該請求,當收到來自工作節點的消息后,向該無緩沖通道賦值,阻塞釋放,返回響應給客戶端。

      go 命令會起一個 goroutine(協程)來完成并發,配合 chan,該協程可以利用無緩沖通道掛起,等待信號執行接下來的操作。

      基于延遲隊列的方案

      PubSub 這種消息訂閱-發布設計模式是一種有效的實現節點通信的方式,但是它有兩個問題:

      PubSub 的數據是即時的,會隨著 Redis 宕機而丟失;

      寫基于 PubSub 的通信服務會要求用到 goroutine 和 channel,這加大了開發難度,降低了可維護性。

      其中,第二個問題是比較棘手的。如果我們希望加入更多的功能,需要寫大量的異步代碼,這會加大系統模塊間的耦合度,造成擴展性很差,而且代碼閱讀起來很痛苦。

      因此,為了解決這個問題,我們采用了基于 Redis 延遲消息隊列的 RPC 服務。

      下圖是基于延遲隊列架構的 RPC 實現示意圖。

      每一個節點都有一個客戶端(Client)和服務端(Server)。客戶端用于發送消息到目標節點(Target Node)并接收其返回的消息,服務端用于接收、處理源節點(Source Node)的消息并返回消息給源節點的客戶端。

      整個 RPC 通信的流程如下:

      源節點的客戶端通過 LPUSH 將消息推送到 Redis 的 nodes: 中,并執行 BRPOP nodes:: 阻塞并監聽這個消息隊列;

      目標節點的服務端通過 BRPOP 一直在監聽 nodes:,收到消息后,通過消息中的 Method 字段執行對應的程序;

      目標節點執行完畢后,服務端通過 LPUSH 將消息推送到 Redis 的 nodes:: 中;

      由于源節點客戶端一直在監聽 nodes:: 這個消息隊列,當目標節點服務端推送消息到這個隊列后,源節點客戶端將立即收到返回的消息,再做后續處理。

      這樣,整個節點的通信流程就通過 Redis 完成了。這樣做的好處在于不用暴露 HTTP 的 IP 地址和端口,只需要知道節點 ID 即可完成 RPC 通信。

      這樣設計后的 RPC 代碼比較容易理解和維護。每次需要擴展新的通信類別時,只需要繼承 rpc.Service 類,實現 ClientHandle(客戶端處理方法)和 ServerHandle(服務端處理方法)方法就可以了。

      這里多說一下 BRPOP。它將移出并獲取消息隊列的最后一個元素, 如果消息隊列沒有元素會阻塞隊列直到等待超時或發現可彈出元素為止。因此,使用 BRPOP 命令相對于輪訓或其他方式,可以避免不間斷的請求 Redis,避免浪費網絡和計算資源。

      如果對 Redis 的操作命令不熟悉的,可以參考一下掘金小冊《Redis 深度歷險:核心原理與應用實踐》,這本小冊深入介紹了 Redis 的原理以及工程實踐,對于應用 Redis 到實際開發中非常實用。

      代碼實踐

      講了這么多理論知識,我們還是需要看看代碼的。老師常教育我們:“Talk is cheap. Show me the code.”

      由于 Crawlab 后端是 Golang 開發的,要理解以下代碼需要一些 Golang 的基礎知識。

      首先我們需要定一個傳輸消息的數據結構。代碼如下。

      package?entity

      type?RpcMessage?struct?{

      Id??????string????????????`json:"id"`??????//?消息ID

      Method??string????????????`json:"method"`??//?消息方法

      NodeId??string????????????`json:"node_id"`?//?節點ID

      Params??map[string]string?`json:"params"`??//?參數

      Timeout?int???????????????`json:"timeout"`?//?超時

      Result??string????????????`json:"result"`??//?結果

      Error???string????????????`json:"error"`???//?錯誤

      }

      這里,我們定義了消息 ID、方法、節點 ID、參數等字段。消息 ID 是 UUID,保證了消息 ID 的唯一性。

      首先,我們定義一個抽象基礎接口,方便讓實際業務邏輯模塊繼承。服務端的處理邏輯在 ServerHandle 中,返回 entity 里的 RpcMessage,而客戶端的邏輯在 ClientHandle 中。

      //?RPC服務基礎類

      type?Service?interface?{

      ServerHandle()?(entity.RpcMessage,?error)

      ClientHandle()?(interface{},?error)

      }

      當我們調用客戶端的通用方法的時候,需要實現兩個邏輯:

      發送消息:生成消息 ID,將消息序列化為 JSON,LPUSH 推入 Redis 消息隊列;

      通過 BRPOP 延遲獲取返回的消息,返回給調用方。

      以下是實現的代碼。

      //?客戶端處理消息函數

      func?ClientFunc(msg?entity.RpcMessage)?func()?(entity.RpcMessage,?error)?{

      return?func()?(replyMsg?entity.RpcMessage,?err?error)?{

      //?請求ID

      msg.Id?=?uuid.NewV4().String()

      //?發送RPC消息

      msgStr?:=?utils.ObjectToString(msg)

      if?err?:=?database.RedisClient.LPush(fmt.Sprintf("rpc:%s",?msg.NodeId),?msgStr);?err?!=?nil?{

      log.Errorf("RpcClientFunc?error:?"?+?err.Error())

      debug.PrintStack()

      return?replyMsg,?err

      }

      //?獲取RPC回復消息

      dataStr,?err?:=?database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s",?msg.NodeId,?msg.Id),?msg.Timeout)

      if?err?!=?nil?{

      log.Errorf("RpcClientFunc?error:?"?+?err.Error())

      debug.PrintStack()

      return?replyMsg,?err

      }

      //?反序列化消息

      if?err?:=?json.Unmarshal([]byte(dataStr),?&replyMsg);?err?!=?nil?{

      log.Errorf("RpcClientFunc?error:?"?+?err.Error())

      debug.PrintStack()

      return?replyMsg,?err

      }

      //?如果返回消息有錯誤,返回錯誤

      if?replyMsg.Error?!=?""?{

      return?replyMsg,?errors.New(replyMsg.Error)

      }

      return

      }

      }

      服務端處理的邏輯如下,大致的邏輯是:

      在一個循環中,通過 BRPOP 獲取該節點對應的消息;

      當獲取到消息時,生成一個 goroutine 異步處理該消息;

      繼續等待。

      您可以在 InitRpcService 這個方法中看到上述邏輯。私有方法 handleMsg 實現了序列化、調用服務端 RPC 服務方法、發送返回消息的邏輯。如果需要拓展 RPC 方法類型,在工廠類方法 GetService 里添加就可以了。

      //?獲取RPC服務

      func?GetService(msg?entity.RpcMessage)?Service?{

      switch?msg.Method?{

      case?constants.RpcInstallLang:

      return?&InstallLangService{msg:?msg}

      case?constants.RpcInstallDep:

      return?&InstallDepService{msg:?msg}

      case?constants.RpcUninstallDep:

      return?&UninstallDepService{msg:?msg}

      case?constants.RpcGetLang:

      return?&GetLangService{msg:?msg}

      case?constants.RpcGetInstalledDepList:

      return?&GetInstalledDepsService{msg:?msg}

      }

      return?nil

      }

      //?處理RPC消息

      func?handleMsg(msgStr?string,?node?model.Node)?{

      //?反序列化消息

      var?msg?entity.RpcMessage

      if?err?:=?json.Unmarshal([]byte(msgStr),?&msg);?err?!=?nil?{

      log.Errorf(err.Error())

      debug.PrintStack()

      }

      //?獲取service

      service?:=?GetService(msg)

      //?根據Method調用本地方法

      replyMsg,?err?:=?service.ServerHandle()

      if?err?!=?nil?{

      log.Errorf(err.Error())

      debug.PrintStack()

      }

      //?發送返回消息

      if?err?:=?database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s",?node.Id.Hex(),?replyMsg.Id),?utils.ObjectToString(replyMsg));?err?!=?nil?{

      log.Errorf(err.Error())

      debug.PrintStack()

      }

      }

      //?初始化服務端RPC服務

      func?InitRpcService()?error?{

      go?func()?{

      for?{

      //?獲取當前節點

      node,?err?:=?model.GetCurrentNode()

      if?err?!=?nil?{

      log.Errorf(err.Error())

      用 Golang 實現基于 Redis 的安全高效 RPC 通信(用一生去愛你)

      debug.PrintStack()

      continue

      }

      //?獲取獲取消息隊列信息

      msgStr,?err?:=?database.RedisClient.BRPop(fmt.Sprintf("rpc:%s",?node.Id.Hex()),?0)

      if?err?!=?nil?{

      if?err?!=?redis.ErrNil?{

      log.Errorf(err.Error())

      debug.PrintStack()

      }

      continue

      }

      //?處理消息

      go?handleMsg(msgStr,?node)

      }

      }()

      return?nil

      }

      Crawlab 的節點上經常需要為爬蟲安裝一些第三方依賴,例如 pymongo、requests 等。而其中,我們也需要直到某個節點上是否已經安裝了某個依賴,這需要跨服務器通信,也就是需要在分布式網絡中進行雙向通信。而這個邏輯是通過 RPC 實現的。主節點向目標節點發起 RPC 調用,目標節點運行被調用方法,將運行結果也就是安裝的依賴列表返回給客戶端,客戶端再返回給調用者。

      下面的代碼實現了獲取目標節點上已安裝的依賴的 RPC 方法。

      //?獲取已安裝依賴服務

      //?繼承Service基礎類

      type?GetInstalledDepsService?struct?{

      msg?entity.RpcMessage

      }

      //?服務端處理方法

      //?重載ServerHandle

      func?(s?*GetInstalledDepsService)?ServerHandle()?(entity.RpcMessage,?error)?{

      lang?:=?utils.GetRpcParam("lang",?s.msg.Params)

      deps,?err?:=?GetInstalledDepsLocal(lang)

      if?err?!=?nil?{

      s.msg.Error?=?err.Error()

      return?s.msg,?err

      }

      resultStr,?_?:=?json.Marshal(deps)

      s.msg.Result?=?string(resultStr)

      return?s.msg,?nil

      }

      //?客戶端處理方法

      //?重載ClientHandle

      func?(s?*GetInstalledDepsService)?ClientHandle()?(o?interface{},?err?error)?{

      //?發起?RPC?請求,獲取服務端數據

      s.msg,?err?=?ClientFunc(s.msg)()

      if?err?!=?nil?{

      return?o,?err

      }

      //?反序列化

      var?output?[]entity.Dependency

      if?err?:=?json.Unmarshal([]byte(s.msg.Result),?&output);?err?!=?nil?{

      return?o,?err

      }

      o?=?output

      return

      }

      寫好了 RPC 服務端和客戶端處理方法,就可以輕松編寫調用邏輯了。以下是調用獲取遠端已安裝依賴列表的方法。首先由 GetService 工廠類獲取之前定義好的 GetInstalledDepsService,再調用其客戶端處理方法 ClientHandle,然后返回結果。這就像在本地調用方法一樣。是不是很簡單?

      //?獲取遠端已安裝依賴

      func?GetInstalledDepsRemote(nodeId?string,?lang?string)?(deps?[]entity.Dependency,?err?error)?{

      params?:=?make(map[string]string)

      params["lang"]?=?lang

      s?:=?GetService(entity.RpcMessage{

      NodeId:??nodeId,

      Method:??constants.RpcGetInstalledDepList,

      Params:??params,

      Timeout:?60,

      })

      o,?err?:=?s.ClientHandle()

      if?err?!=?nil?{

      return

      }

      deps?=?o.([]entity.Dependency)

      return

      }

      結語

      本篇文章主要介紹了一種基于 Redis 延遲隊列的 RPC 通信方式,這種方式不用暴露各個節點或服務的 IP 地址或端口,是一種非常安全的方式。而且,這種方式已經用 Golang 在 Crawlab 中實現了雙向通信,特別是 Golang 中的天生支持異步的 goroutine,讓這種方式的實現變得簡單。實際上,這種方式理論上是非常高效的,能夠支持高并發數據傳輸。

      雖然如此,這種方式對于 Crawlab 的低并發遠程通信來說是足夠的了,在實際使用中也沒有出現問題,非常穩定。對于隱秘性有要求、希望不暴露地址信息的開發者,我們也推薦將該種方式在實際應用中嘗試。

      參考

      Crawlab 主頁

      Crawlab Demo

      Crawlab 文檔

      通過 Redis 實現 RPC 遠程方法調用

      Go Redis RPC

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:為Excel圖表添加趨勢線的方法(excel2010圖表如何添加趨勢線)
      下一篇:Excel表格亂碼怎么辦(excel表格亂碼怎么弄)
      相關文章
      亚洲va无码专区国产乱码| 亚洲国产黄在线观看| 亚洲综合精品网站| 亚洲精品无码久久久久APP| 亚洲国产美女精品久久久久| 亚洲国产一区二区三区青草影视 | 亚洲AV永久青草无码精品| 亚洲中文字幕无码不卡电影| 亚洲欧洲中文日韩久久AV乱码| 国产亚洲精品国产福利在线观看| 久久亚洲AV成人无码国产最大| 亚洲国产精品成人AV在线| 亚洲精品久久无码| 亚洲大尺度无码无码专线一区| 亚洲精品无码国产片| 亚洲国产精品99久久久久久| 亚洲精品美女久久久久久久| 亚洲av日韩av永久在线观看| 国产99久久亚洲综合精品| 偷自拍亚洲视频在线观看99| 亚洲国产精品视频| 伊人婷婷综合缴情亚洲五月| 亚洲老妈激情一区二区三区| 亚洲av无码国产精品色午夜字幕 | 伊人久久五月丁香综合中文亚洲| 亚洲无mate20pro麻豆| 最新亚洲精品国偷自产在线| 亚洲国产欧洲综合997久久| 国产精品亚洲精品日韩电影| 亚洲一区二区高清| 日本亚洲成高清一区二区三区| 久久久久久a亚洲欧洲AV| 久久国产亚洲高清观看| 亚洲国产成人手机在线电影bd| 亚洲男人的天堂久久精品| 亚洲日韩av无码中文| 在线亚洲精品视频| 国产亚洲精品自在线观看| 亚洲国产精品无码久久SM| 久久亚洲精品无码VA大香大香| 亚洲乱码中文字幕小综合|