【Go語言實戰】基于 WebSocket + MongoDB 的IM即時聊天Demo

      網友投稿 1125 2025-04-02

      文章目錄


      寫在前面

      1. WebSocket原理

      2. 具體流程

      2.1 定義類型

      2.2 進行連接

      2.2.1 服務器監聽連接

      2.2.2 服務器監聽斷開連接

      2.2.3 用戶連接服務器

      2.3 寫入

      2.3.1 定義類型

      2.3.2 讀取數據

      2.3.3 接受消息

      2.3.3 獲取歷史消息

      2.4 讀取

      2.5 插入與查詢

      2.5.1 插入數據

      2.5.2 查詢數據

      2.6 對方不在線

      3. 演示

      4. 源碼地址

      寫在前面

      這個項目是基于WebSocket + MongoDB + MySQL + Redis。

      業務邏輯很簡單,只是兩人的聊天。

      MySQL 用來存儲用戶基本信息

      MongoDB 用來存放用戶聊天信息

      Redis 用來存儲處理過期信息

      github地址

      https://github.com/CocaineCong/gin-chat-demo

      1. WebSocket原理

      WebSocket是應用層第七層上的一個應用層協議,它必須依賴 HTTP 協議進行一次握手。

      握手成功后,數據就直接從TCP通道傳輸,與HTTP無關了。即:WebSocket分為握手和數據傳輸階段。

      即進行了HTTP握手 + 雙工的TCP連接。

      WebSocket 是一種在單個TCP連接上進行全雙工通信的協議。

      WebSocket 使得客戶端和服務器之間的數據交換變得更加簡單,允許服務端主動向客戶端推送數據。

      如果只是想左圖這樣的不斷發送http請求,輪詢的效率是非常低,非常浪費資源,所以就有了websocket協議了,建立在 TCP 協議之上,服務器端的實現比較容易。

      WebSocket協議一旦建立之后,互相溝通所消耗的請求頭是很小的,服務器向客戶端推送消息的功耗就小了。

      2. 具體流程

      2.1 定義類型

      發送消息的結構體

      type SendMsg struct { Type int `json:"type"` Content string `json:"content"` }

      1

      2

      3

      4

      回復消息的結構體

      type ReplyMsg struct { From string `json:"from"` Code int `json:"code"` Content string `json:"content"` }

      1

      2

      3

      4

      5

      用戶結構體

      type Client struct { ID string SendID string Socket *websocket.Conn Send chan []byte }

      1

      2

      3

      4

      5

      6

      廣播類(包括廣播內容和源用戶)

      type Broadcast struct { Client *Client Message []byte Type int }

      1

      2

      3

      4

      5

      用戶管理

      type ClientManager struct { Clients map[string]*Client Broadcast chan *Broadcast Reply chan *Client Register chan *Client Unregister chan *Client }

      1

      2

      3

      4

      5

      6

      7

      信息轉JSON (包括:發送者、接收者、內容)

      type Message struct { Sender string `json:"sender,omitempty"` Recipient string `json:"recipient,omitempty"` Content string `json:"content,omitempty"` }

      1

      2

      3

      4

      5

      2.2 進行連接

      定義一個管理Manager

      var Manager = ClientManager{ Clients : make(map[string]*Client), // 參與連接的用戶,出于性能的考慮,需要設置最大連接數 Broadcast: make(chan *Broadcast), Register : make(chan *Client), Reply : make(chan *Client), Unregister: make(chan *Client), }

      1

      2

      3

      4

      5

      6

      7

      【Go語言實戰】基于 WebSocket + MongoDB 的IM即時聊天Demo

      2.2.1 服務器監聽連接

      用 for 不斷進行監聽查看哪個用戶進入通道通信,對用戶一旦有用戶進來,就 Register 進行注冊

      for { case conn := <- Manager.Register: log.Printf("建立新連接: %v", conn.ID) Manager.Clients[conn.ID] = conn replyMsg := &ReplyMsg{ Code: e.WebsocketSuccess, Content: "已連接至服務器", } msg , _ := json.Marshal(replyMsg) _ = conn.Socket.WriteMessage(websocket.TextMessage, msg) }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      2.2.2 服務器監聽斷開連接

      同樣的,也可以用來對服務器和用戶之間連接的斷開。

      case conn := <-Manager.Unregister: // 斷開連接 log.Printf("連接失敗:%v", conn.ID) if _, ok := Manager.Clients[conn.ID]; ok { replyMsg := &ReplyMsg{ Code: e.WebsocketEnd, Content: "連接已斷開", } msg , _ := json.Marshal(replyMsg) _ = conn.Socket.WriteMessage(websocket.TextMessage, msg) close(conn.Send) delete(Manager.Clients, conn.ID) }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      2.2.3 用戶連接服務器

      我們采用的是gin框架,所以這里我們可以先引入路由

      r := gin.Default() r.Use(gin.Recovery(),gin.Logger()) v1 := r.Group("/") { v1.GET("ping", func(c *gin.Context) { c.JSON(200,"SUCCESS") }) v1.GET("ws",service.WsHandler) }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      再在service層創建一個handler處理

      讀取兩人的id

      uid:=c.Query("uid") // 自己的id toUid:=c.Query("toUid") // 對方的id

      1

      2

      升級ws協議

      conn, err := (&websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { // CheckOrigin解決跨域問題 return true }}).Upgrade(c.Writer, c.Request, nil) // 升級成ws協議

      1

      2

      3

      4

      創建用戶實例

      client := &Client{ ID : createId(uid,toUid), SendID : createId(toUid,uid), Socket : conn, Send : make(chan []byte), }

      1

      2

      3

      4

      5

      6

      用戶注冊到用戶管理上面

      Manager.Register <- client

      1

      開通兩個協程, 一個讀,一個寫

      go client.Read() go client.Write()

      1

      2

      2.3 寫入

      2.3.1 定義類型

      我們定義的接受類型是json形式,結構體如下

      我們這里設計了幾個type

      type = 1 接受消息

      type = 2 獲取歷史消息

      type SendMsg struct { Type int `json:"type"` Content string `json:"content"` }

      1

      2

      3

      4

      2.3.2 讀取數據

      先用 PongHandler 返回當前的 socket 對象

      c.Socket.PongHandler() sendMsg := new(SendMsg) // _,msg,_:=c.Socket.ReadMessage() // 不是json格式用這個 err := c.Socket.ReadJSON(&sendMsg) // json格式就用這個

      1

      2

      3

      4

      2.3.3 接受消息

      如果傳過來的type=1的話,那么我們就可以先去redis上面查詢一下當前有多少人進行了連接。

      r1 ,_ := cache.RedisClient.Get(c.ID).Result() r2 ,_ := cache.RedisClient.Get(c.SendID).Result()

      1

      2

      如果有三個人在線上,并且沒有接受消息的話,就拒絕訪問。

      replyMsg := &ReplyMsg{ Code: e.WebsocketLimit, Content: "達到限制", } msg , _ := json.Marshal(replyMsg) _ = c.Socket.WriteMessage(websocket.TextMessage, msg)

      1

      2

      3

      4

      5

      6

      如果沒有的話,就先記錄到redis中進行緩存

      cache.RedisClient.Incr(c.ID) _ , _ =cache.RedisClient.Expire(c.ID,time.Hour*24*30*3).Result()

      1

      2

      之后,我們再進行廣播消息

      Manager.Broadcast <- &Broadcast{ Client:c, Message:[]byte(sendMsg.Content), }

      1

      2

      3

      4

      2.3.3 獲取歷史消息

      那這個時候我們傳來的 type 就等于 2,Content就是時間戳了

      我們設置的話,是只保存三個月的,三個月過后我們就可以刪除了。

      timeT, err := strconv.Atoi(sendMsg.Content) // 傳送來時間 if err != nil { timeT = 9999999 } results, _ := FindManyMsg(conf.MongoDBName,c.SendID,c.ID,int64(timeT),10)

      1

      2

      3

      4

      5

      這個FindManyMsg后面再說

      返回前十條

      if len(results) > 10 { results = results[:10] }else if len(results) == 0{ replyMsg := &ReplyMsg{ Code:e.WebsocketEnd, Content:"到底了", }

      1

      2

      3

      4

      5

      6

      7

      寫入返回

      msg , _ := json.Marshal(replyMsg) _ = c.Socket.WriteMessage(websocket.TextMessage,msg)

      1

      2

      2.4 讀取

      我們用一個for循環進行消息的讀取。

      如果有消息的話,就WriteMessage寫下來。發送出去。

      for{ select { case message, ok := <-c.Send : if !ok { _=c.Socket.WriteMessage(websocket.CloseMessage,[]byte{}) return } log.Println(c.ID,"接受消息:",string(message)) replyMsg := &ReplyMsg{ Code:e.WebsocketSuccessMessage, Content:fmt.Sprintf("%s",string(message)), } msg , _ := json.Marshal(replyMsg) _ = c.Socket.WriteMessage(websocket.TextMessage, msg) } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      14

      15

      16

      2.5 插入與查詢

      2.5.1 插入數據

      我們使用的是mongoDB進行消息的存儲,MongoDB的插入非常簡單,文檔數據庫,插入json格式即可。

      定義一個存儲的數據類型

      type Trainer struct { Content string `bson:"content"` // 內容 StartTime int64 `bson:"startTime"` // 創建時間 EndTime int64 `bson:"endTime"` // 過期時間 Read uint `bson:"read"` // 已讀 }

      1

      2

      3

      4

      5

      6

      傳入數據庫,用戶ID,內容,是否已讀,過期時間

      func InsertMsg(database string, id string, content string, read uint, expire int64) (err error) { collection := conf.MongoDBClient.Database(database).Collection(id) comment := ws.Trainer{ Content: content, StartTime: time.Now().Unix(), EndTime: time.Now().Unix() + expire, Read: read, } _, err = collection.InsertOne(context.TODO(),comment) return }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      2.5.2 查詢數據

      MongoDB的查詢也非常容易,按照json格式進行查詢。

      定義一個存儲對象的切片

      var resultsMe []ws.Trainer

      1

      通過用戶id查詢所有的用戶消息

      idCollection := conf.MongoDBClient.Database(database).Collection(id)

      1

      根據傳入的time 定義一個過濾器,進行這個時間內的查詢。

      filter := bson.M{"startTime": bson.M{"$lt": time}}

      1

      根據filter進行查詢,然后再通過時間進行倒序排序,并且限定頁數。

      sendIdTimeCursor, err := sendIdCollection.Find(context.TODO(), filter, options.Find().SetSort(bson.D{{"StartTime", -1}}), options.Find(). SetLimit(int64(pageSize)))

      1

      2

      3

      把數據查詢數據傳入到resultsMe中

      err = idTimeCurcor.All(context.TODO(), &resultsMe)

      1

      2.6 對方不在線

      廣播信息

      case broadcast := <-Manager.Broadcast: message := broadcast.Message sendId := broadcast.Client.SendID flag := false // 默認對方不在線

      1

      2

      3

      4

      如果沒有這個人的話就一直找就可以了

      for id, conn := range Manager.Clients { if id != sendId { continue } select { case conn.Send <- message: flag = true default: close(conn.Send) delete(Manager.Clients, conn.ID) } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      還是找到的話

      就可以當作已讀信息,存儲

      if flag { log.Println("對方在線應答") replyMsg := &ReplyMsg{ Code: e.WebsocketOnlineReply, Content: "對方在線應答", } msg , err := json.Marshal(replyMsg) _ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg) err = InsertMsg(conf.MongoDBName, id, string(message), 1, int64(3*month)) if err != nil { fmt.Println("InsertOneMsg Err", err) } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      如果沒有找到的話,就是未讀消息了。

      else { log.Println("對方不在線") replyMsg := ReplyMsg{ Code: e.WebsocketOfflineReply, Content: "對方不在線應答", } msg , err := json.Marshal(replyMsg) _ = broadcast.Client.Socket.WriteMessage(websocket.TextMessage, msg) err = InsertMsg(conf.MongoDBName, id, string(message), 0, int64(3*month)) if err != nil { fmt.Println("InsertOneMsg Err", err) } }

      1

      2

      3

      4

      5

      6

      7

      8

      9

      10

      11

      12

      13

      3. 演示

      測試http連接

      進行ws連接,連接服務器

      當id=1上線,但是id=2沒上線的時候發送消息

      當id=2上線之后

      再次發消息,就是在線應答了

      這邊就實時接受到消息了

      獲取歷史信息

      4. 源碼地址

      github地址

      https://github.com/CocaineCong/gin-chat-demo

      Go MongoDB websocket

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:怎么設置公式自動填充?(excel中如何自動填充公式)
      下一篇:ppt做到一半,閃退了,怎么辦(ppt總是閃退怎么辦)
      相關文章
      亚洲高清无码专区视频| 国产精品亚洲综合一区| 国产av天堂亚洲国产av天堂| 无码不卡亚洲成?人片| 日韩欧美亚洲国产精品字幕久久久| 国产精品亚洲四区在线观看| 亚洲一级在线观看| 四虎必出精品亚洲高清| 精品亚洲成在人线AV无码| 亚洲a级在线观看| 亚洲精品国产日韩| 亚洲精品又粗又大又爽A片| 亚洲精品无码久久久久A片苍井空 亚洲精品无码久久久久YW | 精品亚洲视频在线| 成人精品国产亚洲欧洲| 国产亚洲情侣久久精品| 亚洲另类激情专区小说图片| 亚洲综合国产精品第一页| 亚洲综合无码精品一区二区三区 | 中文字幕人成人乱码亚洲电影 | 亚洲最大激情中文字幕| 亚洲国产精品无码久久久蜜芽| 精品国产亚洲一区二区三区 | 亚洲第一AAAAA片| 亚洲一区二区三区四区在线观看| 久久久久亚洲AV无码网站| 亚洲狠狠ady亚洲精品大秀| 亚洲国产精品一区二区久| 亚洲欧洲日本在线观看| 亚洲JLZZJLZZ少妇| 亚洲欧洲日本在线| 国产亚洲人成网站观看| 久久亚洲精品无码AV红樱桃| 亚洲国产av一区二区三区丶| 亚洲色偷精品一区二区三区| 亚洲成A∨人片天堂网无码| 亚洲色偷偷综合亚洲AVYP| 亚洲综合视频在线| 亚洲AV无码久久久久网站蜜桃 | 国产亚洲综合视频| 亚洲精品无码mv在线观看网站|