從零開始開發IM(即時通訊)服務端(二)
- 2019 年 10 月 3 日
- 筆記
好消息:IM1.0.0版本已經上線啦,支援特性:
- 私聊發送文本/文件
- 已發送/已送達/已讀回執
- 支援使用ldap登錄
- 支援接入外部的登錄認證系統
- 提供客戶端jar包,方便客戶端開發
github鏈接: https://github.com/yuanrw/IM
本篇將帶大家從零開始搭建一個輕量級的IM服務端,IM的整體設計思路和架構在我的上篇部落格中已經講過了,沒看過的同學請點擊從零開始開發IM(即時通訊)服務端 。
這篇將給大家帶來更多的細節實現。我將從三個方面來闡述如何構建一個完整可靠的IM系統。
- 可靠性
- 安全性
- 存儲設計
可靠性
什麼是可靠性?對於一個IM系統來說,可靠的定義至少是不丟消息、消息不重複、不亂序,滿足這三點,才能說有一個好的聊天體驗。
不丟消息
我們先從不丟消息開始講起。
首先複習一下上一篇設計的服務端架構:
我們先從一個簡單例子開始思考:當Alice給Bob發送一條消息時,可能要經過這樣一條鏈路:
- client–>connecter
- connector–>transfer
- transfer–>connector
- connector–>client
在這整個鏈路中的每個環節都有可能出問題,雖然tcp協議是可靠的,但是它只能保證鏈路層的可靠,無法保證應用層的可靠。
例如在第一步中,connector
收到了從client
發出的消息,但是轉發給transfer
失敗,那麼這條消息Bob就無法收到,而Alice也不會意識到消息發送失敗了。
如果Bob狀態是離線,那麼消息鏈路就是:
- client–>connector
- connector–>transfer
- transfer–>mq
如果在第三步中,transfer
收到了來自connector
的消息,但是離線消息入庫失敗,
那麼這個消息也是傳遞失敗了。
為了保證應用層的可靠,我們必須要有一個ack機制,使發送方能夠確認對方收到了這條消息。
具體的實現,我們模仿tcp協議做一個應用層的ack機制。
tcp的報文是以位元組(byte)
為單位的,而我們以message
單位。
發送方每次發送一個消息,就要等待對方的ack回應,在ack確認消息中應該帶有收到的id以便發送方識別。
其次,發送方需要維護一個等待ack的隊列。 每次發送一個消息之後,就將消息和一個計時器入隊。
另外存在一個執行緒一直輪詢隊列,如果有超時未收到ack的,就取出消息重發。
超時未收到ack的消息有兩種處理方式:
- 和tcp一樣不斷發送直到收到ack為止。
- 設定一個最大重試次數,超過這個次數還沒收到ack,就使用失敗機制處理,節約資源。例如如果是
connector
長時間未收到client
的ack,那麼可以主動斷開和客戶端的連接,剩下未發送的消息就作為離線消息入庫,客戶端斷連後嘗試重連伺服器即可。
不重複、不亂序
有的時候因為網路原因可能導致ack收到較慢,發送方就會重複發送,那麼接收方必須有一個去重機制。
去重的方式是給每個消息增加一個唯一id。這個唯一id並不一定是全局的,只需要在一個會話中唯一即可。例如某兩個人的會話,或者某一個群。如果網路斷連了,重新連接後,就是新的會話了,id會重新從0開始。
接收方需要在當前會話中維護收到的最後一個消息的id,叫做lastId
。
每次收到一個新消息, 就將id與lastId
作比較看是否連續,如果不連續,就放入一個暫存隊列 queue中稍後處理。
例如:
-
當前會話的
lastId
=1,接著伺服器收到了消息msg(id=2)
,可以判斷收到的消息是連續的,就處理消息,將lastId
修改為2。 -
但是如果伺服器收到消息
msg(id=3)
,就說明消息亂序到達了,那麼就將這個消息入隊,等待lastId
變為2後,(即伺服器收到消息msg(id=2)
並處理完了),再取出這個消息處理。
因此,判斷消息是否重複只需要判斷msgId>lastId && !queue.contains(msgId)
即可。如果收到重複的消息,可以判斷是ack未送達,就再發送一次ack。
接收方收到消息後完整的處理流程如下:
偽程式碼如下:
class ProcessMsgNode{ /** * 接收到的消息 */ private Message message; /** * 處理消息的方法 */ private Consumer<Message> consumer; } public CompletableFuture<Void> offer(Long id,Message message,Consumer<Message> consumer) { if (isRepeat(id)) { //消息重複 sendAck(id); return null; } if (!isConsist(id)) { //消息不連續 notConsistMsgMap.put(id, new ProcessMsgNode(message, consumer)); return null; } //處理消息 return process(id, message, consumer); } private CompletableFuture<Void> process(Long id, Message message, Consumer<Message> consumer) { return CompletableFuture .runAsync(() -> consumer.accept(message)) .thenAccept(v -> sendAck(id)) .thenAccept(v -> lastId.set(id)) .thenComposeAsync(v -> { Long nextId = nextId(id); if (notConsistMsgMap.containsKey(nextId)) { //隊列中有下個消息 ProcessMsgNode node = notConsistMsgMap.get(nextId); return process(nextId, node.getMessage(), consumer); } else { //隊列中沒有下個消息 CompletableFuture<Void> future = new CompletableFuture<>(); future.complete(null); return future; } }) .exceptionally(e -> { logger.error("[process received msg] has error", e); return null; }); }
安全性
無論是聊天記錄還是離線消息,肯定都會在服務端存儲備份,那麼消息的安全性,保護客戶的隱私也至關重要。
因此所有的消息都必須要加密處理。
在存儲模組里,維護用戶資訊和關係鏈有兩張基礎表,分別是im_user
用戶表和im_relation
關係鏈表。
im_user
表用於存放用戶常規資訊,例如用戶名密碼等,結構比較簡單。im_relation
表用於記錄好友關係,結構如下:
CREATE TABLE `im_relation` ( `id` bigint(20) COMMENT '關係id', `user_id1` varchar(100) COMMENT '用戶1id', `user_id2` varchar(100) COMMENT '用戶2id', `encrypt_key` char(33) COMMENT 'aes密鑰', `gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP, `gmt_update` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`) );
user_id1
和user_id2
是互為好友的用戶id,為了避免重複,存儲時按照user_id1
<user_id2
的順序存,並且加上聯合索引。encrypt_key
是隨機生成的密鑰。當客戶端登錄時,就會從資料庫中獲取該用戶的所有的relation
,存在記憶體中,以便後續加密解密。- 當客戶端給某個好友發送消息時,取出記憶體中該關係的密鑰,加密後發送。同樣,當收到一條消息時,取出相應的密鑰解密。
客戶端完整登錄流程如下:
- client調用rest介面登錄。
- client調用rest介面獲取該用戶所有
relation
。 - client向connector發送greet消息,通知上線。
- connector拉取離線消息推送給client。
- connector更新用戶session。
那為什麼connector要先推送離線消息再更新session呢?我們思考一下如果順序倒過來會發生什麼:
- 用戶
Alice
登錄伺服器 connector
更新session- 推送離線消息
- 此時Bob發送了一條消息給Alice
如果離線消息還在推送的過程中,Bob發送了新消息給Alice,伺服器獲取到Alice的session,就會立刻推送。這時新消息就有可能夾在一堆離線消息當中推過去了,那這時,Alice收到的消息就亂序了。
而我們必須保證離線消息的順序在新消息之前。
那麼如果先推送離線消息,之後才更新session。在離線消息推送的過程中,Alice的狀態就是「未上線」,這時Bob新發送的消息只會入庫im_offline
,im_offline
表中的數據被讀完之後才會「上線」開始接受新消息。這也就避免了亂序。
存儲設計
存儲離線消息
當用戶不在線時,離線消息必然要存儲在服務端,等待用戶上線再推送。理解了上一個小節後,離線消息的存儲就非常容易了。增加一張離線消息表im_offline
,表結構如下:
CREATE TABLE `im_offline` ( `id` int(11) COMMENT '主鍵', `msg_id` bigint(20) COMMENT '消息id', `msg_type` int(2) COMMENT '消息類型', `content` varbinary(5000) COMMENT '消息內容', `to_user_id` varchar(100) COMMENT '收件人id', `has_read` tinyint(1) COMMENT '是否閱讀', `gmt_create` timestamp COMMENT '創建時間', PRIMARY KEY (`id`) );
msg_type
用於區分消息類型(chat
,ack
),content
加密後的消息內容以byte數組的形式存儲。
用戶上線時按照條件to_user_id=用戶id
拉取記錄即可。
防止離線消息重複推送
我們思考一下多端登錄的情況,Alice有兩台設備同時登陸,在這種並發的情況下,我們就需要某種機制來保證離線消息只被讀取一次。
這裡利用CAS機制來實現:
- 首先取出所有
has_read=false
的欄位。 - 檢查每條消息的
has_read
值是否為false,如果是,則改為true。這是原子操作。
update im_offline set has_read = true where id = ${msg_id} and has_read = false
- 修改成功則推送,失敗則不推送。
相信到這裡,同學們已經可以自己動手搭建一個完整可用的IM服務端了。更多問題歡迎評論區留言~~
IM1.0.0版本已上線,github鏈接:
https://github.com/yuanrw/IM
覺得對你有幫助請點個star吧~!