基于node+socket.io+redis的多房间多进程聊天室
- 2019 年 12 月 4 日
- 筆記
本文作者:IMWeb jaychen 原文出处:IMWeb社区 未经同意,禁止转载

一、相关技术介绍:
消息实时推送,指的是将消息实时地推送到浏览器,用户不需要刷新浏览器就可以实时获取最新的消息,实时聊天室的技术原理也是如此。传统的Web站点为了实现推送技术,所用的技术都是轮询,这种传统的模式带来很明显的缺点,即浏览器需要不断的向服务器发出请求。 短轮询(Polling)

客户端需要定时往浏览器轮询发送请求,且只有当服务有数据更新后,客户端的下一次轮询请求才能拿到更新后的数据,在数据更新前的多次请求相当于无效。这对带宽资源造成了极大的浪费,若提高轮询定时器时间,又会有数据更新不及时的烦恼。 commet 为了解决短轮询的弊端,一种基于http长连接的"服务器推"方式被hack出来。其与短轮询的区别主要是,采用commet时,客户端与服务端保持一个长连接,当数据发生改变时,服务端主动将数据推送到客户端。Comet 又可以被细分为两种实现方式,一种是长轮询机制,一种是流技术。
- 长轮询

长轮询跟短轮询不同的地方是,客户端往服务端发送请求后,服务端判断是否有数据更新,若没有,则将请求hold住,等待数据更新时,才返回响应。这样则避免了大量无效的http请求,但即使采用长轮询方式,接受数据更新的最小时间间隔还是为2*RTT(往返时间)。
- 流技术

流技术(http stream)基于iframe实现。通过HTML标签iframe src指向服务端,建立一个长连接。当有数据推送,则往客户端返回,无须再请求。但流技术有个缺点就是,在浏览器顶部会一直出现页面未加载完成的loading标示。
websocket

为了解决服务端如何更快地实时推送数据到客户端以及以上推送方式技术的不足,HTML5中定义了Websocket协议,它是一种在单个TCP连接上进行全双工通讯的协议。与http协议不同的请求/响应模式不同,Websocket在建立连接之前有一个Handshake(Opening Handshake)过程,建立连接之后,双方即可双向通信。当然,由于websocket是html5新特性,在部分浏览器(IE10以下)是不支持的。 我们来看下websocket的握手报文:

请求报文: GET /chat HTTP/1.1 Host: server.example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ== Sec-WebSocket-Protocol: chat Sec-WebSocket-Version: 13 Origin: http://example.com
- "Upgrade "、"Connection": 告诉服务器这个请求是一个websocket协议,需要区别处理
- "Upgrade: websocket": 表明这是一个 WebSocket 类型请求,意在告诉 server 需要将通信协议切换到 WebSocket
- "Sec-WebSocket-Key": 是 client 发送的一个 base64 编码的密文,要求 server 必须返回一个对应加密的 "Sec-WebSocket-Accept" 应答,否则 client 会抛出 "Error during WebSocket handshake" 错误,并关闭连接
- "Sec-WebSocket-Protocol":一个用户定义的字符串,用来区分同URL下,不同的服务所需要的协议
- "Sec-WebSocket-Version":Websocket Draft (协议版本)
响应报文: HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo= Sec-WebSocket-Protocol: chat
- "Sec-WebSocket-Accept": 这个则是经过服务器确认,并且加密过后的 Sec-WebSocket-Key。加密方式为将Sec-WebSocket-Key与一段固定的 GUID 字符串进行连接,然后进行SHA-1 hash,接着base64编码得到。
socket.io(http://socket.io) 是一个完全由JavaScript实现,基于Node.js、支持WebSocket的协议用于实时通信、跨平台的开源框架。Socket.IO除了支持WebSocket通讯协议外,还支持许多种轮询机制以及其它实时通信方式,并封装成了通用的接口,并能够根据浏览器对通讯机制的支持情况自动地选择最佳的方式来实现网络实时应用。
首先,我们创建一个socket.io server对象,指定监听80端口。并且指定收到message消息,以及socket端口的监听方法。接着,当socket建立连接后,通过socket.emit方法,可以往客户端发送消息。
var io = require('socket.io')(); io.on('connection', function(socket) { //接受消息 socket.on('message', function (msg) { console.log('receive messge : ' + msg ); }); //发送消息 socket.emit('message', 'hello'); //断开连接回调 socket.on('disconnect', function () { console.log('socket disconnect'); }); }); io.listen(80);
客户端的代码也非常简单,只要引入socket.io对应的客户端库。 在socket建立连接的回调中,使用socket.emit以及socket.on就可以分别做消息的发送以及监听了。
<script> var socket = io('http://localhost/'); socket.on('connect', function () { socket.emit('message', 'hi, i am client!'); socket.on('message', function (msg) { console.log('msg received from server'); }); }); </script>
二、多节点集群架构设计
若只是单机部署应用,单纯使用socket.io的消息事件监听处理即可满足我们的需求。但随着业务的扩大,我们需要考虑多机集群部署,客户端可以连接到任一节点,并发送消息。如何做到多节点的同时推送,我们需要建立一套多节点之间的消息分发/订阅架构。这时我们引入redis的pub/sub功能。
redis redis是一个key-value存储系统,在该项目中主要起到一个消息分发中心(publish/subscribe)的作用。用户通过socket.io namespace 订阅房间号后,socket.io server则往redis订阅(subscribe)该房间号channel。当在该房间中的某一用户发送消息时,则通过redis的publish功能往redis该房间号channel publish消息。这样所有订阅该房间号channel的websocket连接则会收到消息回调,然后推送给客户端。
nginx 由于采用了集群架构,则需要nginx来做反向代理。需要注意的是,websocket的支持需要nginx1.3以上版本。并且我们需要通过配置ip_hash做粘性会话(ip_hash)处理,避免在低版本浏览器socket.io使用兼容方案轮询请求,请求到不同机器,造成session异常。
三、架构设计图

客户端通过socket.io namespace 指定对应roomid,请求到nginx。nginx根据ip_hash反向代理到对应机器的某一端口的socket.io server 进程。建立websocket连接,并往redis订阅对应到房间(roomid)channel。到这个时候,一个订阅了某一房间的websocket通道建立完成。 当用户发送消息时,socket.io server捕获到该房间到消息后,即往redis对应房间id的channel publish消息。这时所有订阅了该房间id channel的socket.io server就会收到订阅响应,接着找到对应房间id的webscoket通道,并将消息推送到客户端。
四、代码示例(多房间实时聊天室):
nginx配置(nginx版本须>1.3): 在http{}里配置定义upstream,并设置ip_hash。使同一个ip的请求能够落在同一个机器同一个进程中。 如果改节点挂了,则自动重连到另外一个节点,该方案对于后期扩容也非常方便。
upstream io_nodes { ip_hash; server 127.0.0.1:6001; server 127.0.0.1:6002; server 127.0.0.1:6003; server 127.0.0.1:6004; server 127.0.0.1:6005; server 127.0.0.1:6006; server 127.0.0.1:6007; server 127.0.0.1:6008; server 10.x.x.x:6001; server 10.x.x.x:6002; server 10.x.x.x:6003; server 10.x.x.x:6004; server 10.x.x.x:6005; server 10.x.x.x:6006; server 10.x.x.x:6007; server 10.x.x.x:6008; }
在server中,配置location:
location / { proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header Host $host; proxy_http_version 1.1; proxy_pass http://io_nodes; proxy_redirect off; }
cluster.js 我们采用了多进程的设计,充分利用cpu多核优势。通过主进程统一管理维护子进程,每个进程监听一个端口。
var cupNum = require('os').cpus().length, workerArr = [], roomInfo = []; var connectNum = 0; for (var i = 0; i < cupNum; i++) { workerArr.push(fork('./fork_server.js', [6001 + i])); workerArr[i].on('message', function(msg) { if (msg.cmd && msg.cmd === 'client connect') { connectNum++; console.log('socket server connectnum:' + connectNum); } if (msg.cmd && msg.cmd === 'client disconnect') { connectNum--; console.log('socket server connectnum:' + connectNum); } });
fork_server.js
var process = require('process'); var io = require('socket.io')(); var num = 0; var redis = require('redis'); var redisClient = redis.createClient; //建立redis pub、sub连接 var pub = redisClient({port:13800, host: '127.0.0.1', password:'xxxx'}); var sub = redisClient({port: 13800, host:'127.0.0.1', password:'xxxx'}); var roomSet = {}; //获取父进程传递端口 var port = parseInt(process.argv[2]); //当websocket连接时 io.on('connection', function(socket) { //客户端请求ws URL: http://127.0.0.1:6001?roomid=k12_webcourse_room_1 var roomid = socket.handshake.query.roomid; console.log('worker pid: ' + process.pid + ' join roomid: '+ roomid); socket.on('join', function (data) { socket.join(roomid); //加入房间 // 往redis订阅房间id if(!roomSet[roomid]){ roomSet[roomid] = {}; console.log('sub channel ' + roomid); sub.subscribe(roomid); } roomSet[roomid][socket.id] = {}; reportConnect(); console.log(data.username + ' join, IP: ' + socket.client.conn.remoteAddress); roomSet[roomid][socket.id].username = data.username; // 往该房间id的reids channel publish用户进入房间消息 pub.publish(roomid, JSON.stringify({"event":'join',"data": data})); }); //用户发言 推送消息到redis socket.on('say', function (data) { console.log("Received Message: " + data.text); pub.publish(roomid, JSON.stringify({"event":'broadcast_say',"data": { username: roomSet[roomid][socket.id].username, text: data.text }})); }); socket.on('disconnect', function() { num--; console.log('worker pid: ' + process.pid + ' clien disconnection num:' + num); process.send({ cmd: 'client disconnect' }); if (roomSet[roomid] && roomSet[roomid][socket.id] && roomSet[roomid][socket.id].username) { console.log(roomSet[roomid][socket.id].username + ' quit'); pub.publish(roomid, JSON.stringify({"event":'broadcast_quit',"data": { username: roomSet[roomid][socket.id].username }})); } roomSet[roomid] && roomSet[roomid][socket.id] && (delete roomSet[roomid][socket.id]); }); }); /** * 订阅redis 回调 * @param {[type]} channel [频道] * @param {[type]} count [数量] * @return {[type]} [description] */ sub.on("subscribe", function (channel, count) { console.log('worker pid: ' + process.pid + ' subscribe: ' + channel); }); /** * 收到redis publish 对应channel的消息 * @param {[type]} channel [description] * @param {[type]} message * @return {[type]} [description] */ sub.on("message", function (channel, message) { console.log("message channel " + channel + ": " + message); //往对应房间广播消息 io.to(channel).emit('message', JSON.parse(message)); }); /** * 上报连接到master进程 * @return {[type]} [description] */ var reportConnect = function(){ num++; console.log('worker pid: ' + process.pid + ' client connect connection num:' + num); process.send({ cmd: 'client connect' }); }; io.listen(port); console.log('worker pid: ' + process.pid + ' listen port:' + port);
客户端:
<script src="static/socket.io.js"></script> <script> var roomid = (function () { return prompt('请输入房间号','') })(); var userInfo = { username: (function () { return prompt('请输入rtx昵称', ''); })() }; if(roomid != null && roomid != "") { var socket = io.connect('http://10.244.146.2?roomid='+ roomid); socket.emit('join', { username: userInfo.username }); socket.on('message', function(msg){ switch (msg.event) { case 'join': if (msg.data.username) { console.log(msg.data.username + '加入了聊天室'); var data = { text: msg.data.username + '加入了聊天室' }; showNotice(data); } break; /*收到消息广播后,显示消息*/ case 'broadcast_say': if(msg.data.username!==userInfo.username) { console.log(msg.data.username + '说: ' + msg.data.text); showMessage(msg.data); } break; /*离开聊天室广播后,显示消息*/ case 'broadcast_quit': if (msg.data.username) { console.log(msg.data.username + '离开了聊天室'); var data = { text: msg.data.username + '离开了聊天室' }; showNotice(data); } break; } }) } /*点击发送按钮*/ document.getElementById('send').onclick = function () { var keywords = document.getElementById('keywords'); if (keywords.value === '') { keywords.focus(); return false; } var data = { text: keywords.value, type: 0, username: userInfo.username }; /*向服务器提交一个say事件,发送消息*/ socket.emit('say', data); showMessage(data); keywords.value = ""; keywords.focus(); }; /*展示消息*/ function showMessage(data) { var itemArr = []; itemArr.push('<dd class="'+(data.type === 0 ? "me" : "other")+'">'); itemArr.push('<ul>'); itemArr.push('<li class="nick-name">' + data.username + '</li>'); itemArr.push('<li class="detail">'); itemArr.push('<div class="head-icon"></div>'); itemArr.push('<div class="text">' + data.text + '</div>'); itemArr.push('</li>'); itemArr.push('</ul>'); itemArr.push('</dd>'); document.getElementById('list') += itemArr.join(''); } /*展示通知*/ function showNotice(data) { var item = '<dd class="tc"><span>' + data.text + '</span><dd>'; document.getElementById('list') += item; } /*回车事件*/ document.onkeyup = function (e) { if (!e) e = window.event; if ((e.keyCode || e.which) == 13) { document.getElementById('send').click(); } } </script>