從零開始開發IM(即時通訊)服務端(二)

  • 2019 年 10 月 3 日
  • 筆記

好消息:IM1.0.0版本已經上線啦,支援特性

  • 私聊發送文本/文件
  • 已發送/已送達/已讀回執
  • 支援使用ldap登錄
  • 支援接入外部的登錄認證系統
  • 提供客戶端jar包,方便客戶端開發

github鏈接: https://github.com/yuanrw/IM

本篇將帶大家從零開始搭建一個輕量級的IM服務端,IM的整體設計思路和架構在我的上篇部落格中已經講過了,沒看過的同學請點擊從零開始開發IM(即時通訊)服務端

這篇將給大家帶來更多的細節實現。我將從三個方面來闡述如何構建一個完整可靠的IM系統。

  1. 可靠性
  2. 安全性
  3. 存儲設計

可靠性

什麼是可靠性?對於一個IM系統來說,可靠的定義至少是不丟消息消息不重複不亂序,滿足這三點,才能說有一個好的聊天體驗。

不丟消息

我們先從不丟消息開始講起。

首先複習一下上一篇設計的服務端架構
im-structure.png

我們先從一個簡單例子開始思考:當Alice給Bob發送一條消息時,可能要經過這樣一條鏈路:
route

  1. client–>connecter
  2. connector–>transfer
  3. transfer–>connector
  4. connector–>client

在這整個鏈路中的每個環節都有可能出問題,雖然tcp協議是可靠的,但是它只能保證鏈路層的可靠,無法保證應用層的可靠。

例如在第一步中,connector收到了從client發出的消息,但是轉發給transfer失敗,那麼這條消息Bob就無法收到,而Alice也不會意識到消息發送失敗了。

如果Bob狀態是離線,那麼消息鏈路就是:

  1. client–>connector
  2. connector–>transfer
  3. transfer–>mq

如果在第三步中,transfer收到了來自connector的消息,但是離線消息入庫失敗,
那麼這個消息也是傳遞失敗了。
為了保證應用層的可靠,我們必須要有一個ack機制,使發送方能夠確認對方收到了這條消息。

具體的實現,我們模仿tcp協議做一個應用層的ack機制。

tcp的報文是以位元組(byte)為單位的,而我們以message單位。
ack
發送方每次發送一個消息,就要等待對方的ack回應,在ack確認消息中應該帶有收到的id以便發送方識別。

其次,發送方需要維護一個等待ack的隊列。 每次發送一個消息之後,就將消息和一個計時器入隊。

另外存在一個執行緒一直輪詢隊列,如果有超時未收到ack的,就取出消息重發。

超時未收到ack的消息有兩種處理方式:

  1. 和tcp一樣不斷發送直到收到ack為止。
  2. 設定一個最大重試次數,超過這個次數還沒收到ack,就使用失敗機制處理,節約資源。例如如果是connector長時間未收到client的ack,那麼可以主動斷開和客戶端的連接,剩下未發送的消息就作為離線消息入庫,客戶端斷連後嘗試重連伺服器即可。

不重複、不亂序

有的時候因為網路原因可能導致ack收到較慢,發送方就會重複發送,那麼接收方必須有一個去重機制。
去重的方式是給每個消息增加一個唯一id。這個唯一id並不一定是全局的,只需要在一個會話中唯一即可。例如某兩個人的會話,或者某一個群。如果網路斷連了,重新連接後,就是新的會話了,id會重新從0開始。

接收方需要在當前會話中維護收到的最後一個消息的id,叫做lastId
每次收到一個新消息, 就將id與lastId作比較看是否連續,如果不連續,就放入一個暫存隊列 queue中稍後處理。

例如:

  • 當前會話的lastId=1,接著伺服器收到了消息msg(id=2),可以判斷收到的消息是連續的,就處理消息,將lastId修改為2。

  • 但是如果伺服器收到消息msg(id=3),就說明消息亂序到達了,那麼就將這個消息入隊,等待lastId變為2後,(即伺服器收到消息msg(id=2)並處理完了),再取出這個消息處理。

因此,判斷消息是否重複只需要判斷msgId>lastId && !queue.contains(msgId)即可。如果收到重複的消息,可以判斷是ack未送達,就再發送一次ack。

接收方收到消息後完整的處理流程如下:
offer.png

偽程式碼如下:

class ProcessMsgNode{      /**       * 接收到的消息       */      private Message message;      /**       * 處理消息的方法       */      private Consumer<Message> consumer;  }    public CompletableFuture<Void> offer(Long id,Message     message,Consumer<Message> consumer) {      if (isRepeat(id)) {      //消息重複          sendAck(id);          return null;      }      if (!isConsist(id)) {      //消息不連續          notConsistMsgMap.put(id, new ProcessMsgNode(message, consumer));          return null;      }      //處理消息      return process(id, message, consumer);  }    private CompletableFuture<Void> process(Long id, Message message, Consumer<Message> consumer) {      return CompletableFuture          .runAsync(() -> consumer.accept(message))          .thenAccept(v -> sendAck(id))          .thenAccept(v -> lastId.set(id))          .thenComposeAsync(v -> {              Long nextId = nextId(id);              if (notConsistMsgMap.containsKey(nextId)) {                  //隊列中有下個消息                  ProcessMsgNode node = notConsistMsgMap.get(nextId);                  return process(nextId, node.getMessage(), consumer);              } else {                  //隊列中沒有下個消息                  CompletableFuture<Void> future = new CompletableFuture<>();                  future.complete(null);                  return future;              }          })          .exceptionally(e -> {              logger.error("[process received msg] has error", e);              return null;          });  }

安全性

無論是聊天記錄還是離線消息,肯定都會在服務端存儲備份,那麼消息的安全性,保護客戶的隱私也至關重要。
因此所有的消息都必須要加密處理。
在存儲模組里,維護用戶資訊和關係鏈有兩張基礎表,分別是im_user用戶表和im_relation關係鏈表。

  • im_user表用於存放用戶常規資訊,例如用戶名密碼等,結構比較簡單。
  • im_relation表用於記錄好友關係,結構如下:
CREATE TABLE `im_relation` (    `id` bigint(20) COMMENT '關係id',    `user_id1` varchar(100) COMMENT '用戶1id',    `user_id2` varchar(100) COMMENT '用戶2id',    `encrypt_key` char(33) COMMENT 'aes密鑰',    `gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP,    `gmt_update` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,    PRIMARY KEY (`id`),    UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`)  );
  • user_id1user_id2是互為好友的用戶id,為了避免重複,存儲時按照user_id1<user_id2的順序存,並且加上聯合索引。
  • encrypt_key是隨機生成的密鑰。當客戶端登錄時,就會從資料庫中獲取該用戶的所有的relation,存在記憶體中,以便後續加密解密。
  • 當客戶端給某個好友發送消息時,取出記憶體中該關係的密鑰,加密後發送。同樣,當收到一條消息時,取出相應的密鑰解密。

客戶端完整登錄流程如下:
login process

  1. client調用rest介面登錄。
  2. client調用rest介面獲取該用戶所有relation
  3. client向connector發送greet消息,通知上線。
  4. connector拉取離線消息推送給client。
  5. connector更新用戶session。

那為什麼connector要先推送離線消息再更新session呢?我們思考一下如果順序倒過來會發生什麼:

  1. 用戶Alice登錄伺服器
  2. connector更新session
  3. 推送離線消息
  4. 此時Bob發送了一條消息給Alice

如果離線消息還在推送的過程中,Bob發送了新消息給Alice,伺服器獲取到Alice的session,就會立刻推送。這時新消息就有可能夾在一堆離線消息當中推過去了,那這時,Alice收到的消息就亂序了。

而我們必須保證離線消息的順序在新消息之前。

那麼如果先推送離線消息,之後才更新session。在離線消息推送的過程中,Alice的狀態就是「未上線」,這時Bob新發送的消息只會入庫im_offlineim_offline表中的數據被讀完之後才會「上線」開始接受新消息。這也就避免了亂序。

存儲設計

存儲離線消息

當用戶不在線時,離線消息必然要存儲在服務端,等待用戶上線再推送。理解了上一個小節後,離線消息的存儲就非常容易了。增加一張離線消息表im_offline,表結構如下:

CREATE TABLE `im_offline` (    `id` int(11) COMMENT '主鍵',    `msg_id` bigint(20) COMMENT '消息id',    `msg_type` int(2) COMMENT '消息類型',    `content` varbinary(5000) COMMENT '消息內容',    `to_user_id` varchar(100) COMMENT '收件人id',    `has_read` tinyint(1) COMMENT '是否閱讀',    `gmt_create` timestamp COMMENT '創建時間',    PRIMARY KEY (`id`)  );

msg_type用於區分消息類型(chat,ack),content加密後的消息內容以byte數組的形式存儲。
用戶上線時按照條件to_user_id=用戶id拉取記錄即可。

防止離線消息重複推送

我們思考一下多端登錄的情況,Alice有兩台設備同時登陸,在這種並發的情況下,我們就需要某種機制來保證離線消息只被讀取一次。

這裡利用CAS機制來實現:

  1. 首先取出所有has_read=false的欄位。
  2. 檢查每條消息的has_read值是否為false,如果是,則改為true。這是原子操作。
update im_offline set has_read = true where id = ${msg_id} and has_read = false
  1. 修改成功則推送,失敗則不推送。

相信到這裡,同學們已經可以自己動手搭建一個完整可用的IM服務端了。更多問題歡迎評論區留言~~

IM1.0.0版本已上線,github鏈接:
https://github.com/yuanrw/IM
覺得對你有幫助請點個star吧~!