基於node+socket.io+redis的多房間多進程聊天室
- 2019 年 12 月 4 日
- 筆記
本文作者:IMWeb jaychen 原文出處:IMWeb社區 未經同意,禁止轉載

一、相關技術介紹:
消息實時推送,指的是將消息實時地推送到瀏覽器,用戶不需要刷新瀏覽器就可以實時獲取最新的消息,實時聊天室的技術原理也是如此。傳統的Web站點為了實現推送技術,所用的技術都是輪詢,這種傳統的模式帶來很明顯的缺點,即瀏覽器需要不斷的向服務器發出請求。 短輪詢(Polling)

客戶端需要定時往瀏覽器輪詢發送請求,且只有當服務有數據更新後,客戶端的下一次輪詢請求才能拿到更新後的數據,在數據更新前的多次請求相當於無效。這對帶寬資源造成了極大的浪費,若提高輪詢定時器時間,又會有數據更新不及時的煩惱。 commet 為了解決短輪詢的弊端,一種基於http長連接的"服務器推"方式被hack出來。其與短輪詢的區別主要是,採用commet時,客戶端與服務端保持一個長連接,當數據發生改變時,服務端主動將數據推送到客戶端。Comet 又可以被細分為兩種實現方式,一種是長輪詢機制,一種是流技術。
- 長輪詢

長輪詢跟短輪詢不同的地方是,客戶端往服務端發送請求後,服務端判斷是否有數據更新,若沒有,則將請求hold住,等待數據更新時,才返迴響應。這樣則避免了大量無效的http請求,但即使採用長輪詢方式,接受數據更新的最小時間間隔還是為2*RTT(往返時間)。
- 流技術

流技術(http stream)基於iframe實現。通過HTML標籤iframe src指向服務端,建立一個長連接。當有數據推送,則往客戶端返回,無須再請求。但流技術有個缺點就是,在瀏覽器頂部會一直出現頁面未加載完成的loading標示。
websocket

為了解決服務端如何更快地實時推送數據到客戶端以及以上推送方式技術的不足,HTML5中定義了Websocket協議,它是一種在單個TCP連接上進行全雙工通訊的協議。與http協議不同的請求/響應模式不同,Websocket在建立連接之前有一個Handshake(Opening Handshake)過程,建立連接之後,雙方即可雙向通信。當然,由於websocket是html5新特性,在部分瀏覽器(IE10以下)是不支持的。 我們來看下websocket的握手報文:

請求報文: GET /chat HTTP/1.1 Host: server.example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== Sec-WebSocket-Protocol: chat Sec-WebSocket-Version: 13 Origin: http://example.com
- "Upgrade "、"Connection": 告訴服務器這個請求是一個websocket協議,需要區別處理
- "Upgrade: websocket": 表明這是一個 WebSocket 類型請求,意在告訴 server 需要將通信協議切換到 WebSocket
- "Sec-WebSocket-Key": 是 client 發送的一個 base64 編碼的密文,要求 server 必須返回一個對應加密的 "Sec-WebSocket-Accept" 應答,否則 client 會拋出 "Error during WebSocket handshake" 錯誤,並關閉連接
- "Sec-WebSocket-Protocol":一個用戶定義的字符串,用來區分同URL下,不同的服務所需要的協議
- "Sec-WebSocket-Version":Websocket Draft (協議版本)
響應報文: HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= Sec-WebSocket-Protocol: chat
- "Sec-WebSocket-Accept": 這個則是經過服務器確認,並且加密過後的 Sec-WebSocket-Key。加密方式為將Sec-WebSocket-Key與一段固定的 GUID 字符串進行連接,然後進行SHA-1 hash,接着base64編碼得到。
socket.io(http://socket.io) 是一個完全由JavaScript實現,基於Node.js、支持WebSocket的協議用於實時通信、跨平台的開源框架。Socket.IO除了支持WebSocket通訊協議外,還支持許多種輪詢機制以及其它實時通信方式,並封裝成了通用的接口,並能夠根據瀏覽器對通訊機制的支持情況自動地選擇最佳的方式來實現網絡實時應用。
首先,我們創建一個socket.io server對象,指定監聽80端口。並且指定收到message消息,以及socket端口的監聽方法。接着,當socket建立連接後,通過socket.emit方法,可以往客戶端發送消息。
var io = require('socket.io')(); io.on('connection', function(socket) { //接受消息 socket.on('message', function (msg) { console.log('receive messge : ' + msg ); }); //發送消息 socket.emit('message', 'hello'); //斷開連接回調 socket.on('disconnect', function () { console.log('socket disconnect'); }); }); io.listen(80);
客戶端的代碼也非常簡單,只要引入socket.io對應的客戶端庫。 在socket建立連接的回調中,使用socket.emit以及socket.on就可以分別做消息的發送以及監聽了。
<script> var socket = io('http://localhost/'); socket.on('connect', function () { socket.emit('message', 'hi, i am client!'); socket.on('message', function (msg) { console.log('msg received from server'); }); }); </script>
二、多節點集群架構設計
若只是單機部署應用,單純使用socket.io的消息事件監聽處理即可滿足我們的需求。但隨着業務的擴大,我們需要考慮多機集群部署,客戶端可以連接到任一節點,並發送消息。如何做到多節點的同時推送,我們需要建立一套多節點之間的消息分發/訂閱架構。這時我們引入redis的pub/sub功能。
redis redis是一個key-value存儲系統,在該項目中主要起到一個消息分發中心(publish/subscribe)的作用。用戶通過socket.io namespace 訂閱房間號後,socket.io server則往redis訂閱(subscribe)該房間號channel。當在該房間中的某一用戶發送消息時,則通過redis的publish功能往redis該房間號channel publish消息。這樣所有訂閱該房間號channel的websocket連接則會收到消息回調,然後推送給客戶端。
nginx 由於採用了集群架構,則需要nginx來做反向代理。需要注意的是,websocket的支持需要nginx1.3以上版本。並且我們需要通過配置ip_hash做粘性會話(ip_hash)處理,避免在低版本瀏覽器socket.io使用兼容方案輪詢請求,請求到不同機器,造成session異常。
三、架構設計圖

客戶端通過socket.io namespace 指定對應roomid,請求到nginx。nginx根據ip_hash反向代理到對應機器的某一端口的socket.io server 進程。建立websocket連接,並往redis訂閱對應到房間(roomid)channel。到這個時候,一個訂閱了某一房間的websocket通道建立完成。 當用戶發送消息時,socket.io server捕獲到該房間到消息後,即往redis對應房間id的channel publish消息。這時所有訂閱了該房間id channel的socket.io server就會收到訂閱響應,接着找到對應房間id的webscoket通道,並將消息推送到客戶端。
四、代碼示例(多房間實時聊天室):
nginx配置(nginx版本須>1.3): 在http{}里配置定義upstream,並設置ip_hash。使同一個ip的請求能夠落在同一個機器同一個進程中。 如果改節點掛了,則自動重連到另外一個節點,該方案對於後期擴容也非常方便。
upstream io_nodes { ip_hash; server 127.0.0.1:6001; server 127.0.0.1:6002; server 127.0.0.1:6003; server 127.0.0.1:6004; server 127.0.0.1:6005; server 127.0.0.1:6006; server 127.0.0.1:6007; server 127.0.0.1:6008; server 10.x.x.x:6001; server 10.x.x.x:6002; server 10.x.x.x:6003; server 10.x.x.x:6004; server 10.x.x.x:6005; server 10.x.x.x:6006; server 10.x.x.x:6007; server 10.x.x.x:6008; }
在server中,配置location:
location / { proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Host $host; proxy_http_version 1.1; proxy_pass http://io_nodes; proxy_redirect off; }
cluster.js 我們採用了多進程的設計,充分利用cpu多核優勢。通過主進程統一管理維護子進程,每個進程監聽一個端口。
var cupNum = require('os').cpus().length, workerArr = [], roomInfo = []; var connectNum = 0; for (var i = 0; i < cupNum; i++) { workerArr.push(fork('./fork_server.js', [6001 + i])); workerArr[i].on('message', function(msg) { if (msg.cmd && msg.cmd === 'client connect') { connectNum++; console.log('socket server connectnum:' + connectNum); } if (msg.cmd && msg.cmd === 'client disconnect') { connectNum--; console.log('socket server connectnum:' + connectNum); } });
fork_server.js
var process = require('process'); var io = require('socket.io')(); var num = 0; var redis = require('redis'); var redisClient = redis.createClient; //建立redis pub、sub連接 var pub = redisClient({port:13800, host: '127.0.0.1', password:'xxxx'}); var sub = redisClient({port: 13800, host:'127.0.0.1', password:'xxxx'}); var roomSet = {}; //獲取父進程傳遞端口 var port = parseInt(process.argv[2]); //當websocket連接時 io.on('connection', function(socket) { //客戶端請求ws URL: http://127.0.0.1:6001?roomid=k12_webcourse_room_1 var roomid = socket.handshake.query.roomid; console.log('worker pid: ' + process.pid + ' join roomid: '+ roomid); socket.on('join', function (data) { socket.join(roomid); //加入房間 // 往redis訂閱房間id if(!roomSet[roomid]){ roomSet[roomid] = {}; console.log('sub channel ' + roomid); sub.subscribe(roomid); } roomSet[roomid][socket.id] = {}; reportConnect(); console.log(data.username + ' join, IP: ' + socket.client.conn.remoteAddress); roomSet[roomid][socket.id].username = data.username; // 往該房間id的reids channel publish用戶進入房間消息 pub.publish(roomid, JSON.stringify({"event":'join',"data": data})); }); //用戶發言 推送消息到redis socket.on('say', function (data) { console.log("Received Message: " + data.text); pub.publish(roomid, JSON.stringify({"event":'broadcast_say',"data": { username: roomSet[roomid][socket.id].username, text: data.text }})); }); socket.on('disconnect', function() { num--; console.log('worker pid: ' + process.pid + ' clien disconnection num:' + num); process.send({ cmd: 'client disconnect' }); if (roomSet[roomid] && roomSet[roomid][socket.id] && roomSet[roomid][socket.id].username) { console.log(roomSet[roomid][socket.id].username + ' quit'); pub.publish(roomid, JSON.stringify({"event":'broadcast_quit',"data": { username: roomSet[roomid][socket.id].username }})); } roomSet[roomid] && roomSet[roomid][socket.id] && (delete roomSet[roomid][socket.id]); }); }); /** * 訂閱redis 回調 * @param {[type]} channel [頻道] * @param {[type]} count [數量] * @return {[type]} [description] */ sub.on("subscribe", function (channel, count) { console.log('worker pid: ' + process.pid + ' subscribe: ' + channel); }); /** * 收到redis publish 對應channel的消息 * @param {[type]} channel [description] * @param {[type]} message * @return {[type]} [description] */ sub.on("message", function (channel, message) { console.log("message channel " + channel + ": " + message); //往對應房間廣播消息 io.to(channel).emit('message', JSON.parse(message)); }); /** * 上報連接到master進程 * @return {[type]} [description] */ var reportConnect = function(){ num++; console.log('worker pid: ' + process.pid + ' client connect connection num:' + num); process.send({ cmd: 'client connect' }); }; io.listen(port); console.log('worker pid: ' + process.pid + ' listen port:' + port);
客戶端:
<script src="static/socket.io.js"></script> <script> var roomid = (function () { return prompt('請輸入房間號','') })(); var userInfo = { username: (function () { return prompt('請輸入rtx昵稱', ''); })() }; if(roomid != null && roomid != "") { var socket = io.connect('http://10.244.146.2?roomid='+ roomid); socket.emit('join', { username: userInfo.username }); socket.on('message', function(msg){ switch (msg.event) { case 'join': if (msg.data.username) { console.log(msg.data.username + '加入了聊天室'); var data = { text: msg.data.username + '加入了聊天室' }; showNotice(data); } break; /*收到消息廣播後,顯示消息*/ case 'broadcast_say': if(msg.data.username!==userInfo.username) { console.log(msg.data.username + '說: ' + msg.data.text); showMessage(msg.data); } break; /*離開聊天室廣播後,顯示消息*/ case 'broadcast_quit': if (msg.data.username) { console.log(msg.data.username + '離開了聊天室'); var data = { text: msg.data.username + '離開了聊天室' }; showNotice(data); } break; } }) } /*點擊發送按鈕*/ document.getElementById('send').onclick = function () { var keywords = document.getElementById('keywords'); if (keywords.value === '') { keywords.focus(); return false; } var data = { text: keywords.value, type: 0, username: userInfo.username }; /*向服務器提交一個say事件,發送消息*/ socket.emit('say', data); showMessage(data); keywords.value = ""; keywords.focus(); }; /*展示消息*/ function showMessage(data) { var itemArr = []; itemArr.push('<dd class="'+(data.type === 0 ? "me" : "other")+'">'); itemArr.push('<ul>'); itemArr.push('<li class="nick-name">' + data.username + '</li>'); itemArr.push('<li class="detail">'); itemArr.push('<div class="head-icon"></div>'); itemArr.push('<div class="text">' + data.text + '</div>'); itemArr.push('</li>'); itemArr.push('</ul>'); itemArr.push('</dd>'); document.getElementById('list') += itemArr.join(''); } /*展示通知*/ function showNotice(data) { var item = '<dd class="tc"><span>' + data.text + '</span><dd>'; document.getElementById('list') += item; } /*回車事件*/ document.onkeyup = function (e) { if (!e) e = window.event; if ((e.keyCode || e.which) == 13) { document.getElementById('send').click(); } } </script>