nodejs的TCP相關的一些筆記

  • TCP協議
  • 基於nodejs創建TCP服務端
  • TCP服務的事件
  • TCP報文解析與粘包解決方案

 一、TCP協議

1.1TCP協議原理部分參考: 無連接運輸的UDP、可靠數據傳輸原理、面向連接運輸的TCP

1.2圖解七層協議、TCP三次握手、TCP四次揮手:

 

 二、基於nodejs創建TCP服務端

 2.1創建nodejs的TCP服務實例(server.js):

 1 const net = require('net');
 2 //創建服務實例
 3 const server = net.createServer();
 4 const PORT = 12306;
 5 const HOST = 'localhost';
 6 //服務啟動對網絡資源的監聽
 7 server.listen(PORT, HOST);
 8 //當服務啟動時觸發的事件
 9 server.on('listening', ()=>{
10     console.log(`服務已開啟在 ${HOST}: ${PORT}`);
11 });
12 //接收消息,響應消息
13 server.on('connection', (socker) => {
14     //通過Socket上的data事件接收消息
15     socker.on('data', (chunk) => {//通過Socket上的writer方法回寫響應數據
16         const msg = chunk.toString();
17         console.log(msg);
18         //通過Socket上的writer方法回寫響應數據
19         socker.write('您好' + msg);
20     });
21 });
22 server.on('close', ()=>{
23     console.log('服務端關閉了');
24 });
25 server.on('error', (err) =>{
26     if(err.code === 'EADDRINUSE'){
27         console.log('地址正在被使用');
28     }else{
29         console.log(err);
30     }
31 });

2.2創建nodejs的TCP客戶端實例(client.js):

 1 const net = require('net');
 2 //創建客戶實例,並與服務端建立連接
 3 const client = net.createConnection({
 4     port:12306,
 5     host:'127.0.0.1'
 6 });
 7 //當套位元組與服務端連接成功時觸發connect事件
 8 client.on('connect', () =>{
 9     client.write('他鄉踏雪');//向服務端發送數據
10 });
11 //使用data事件監聽服務端響應過來的數據
12 client.on('data', (chunk) => {
13     console.log(chunk.toString());
14 });
15 client.on('error', (err)=>{
16     console.log(err);
17 });
18 client.on('close', ()=>{
19     console.log('客戶端斷開連接');
20 });

然後使用nodemon工具啟動服務(如果沒有安裝nodemon工具可以使用npm以管理員身份安裝),當然也可以直接使用node指令啟動,使用nodemon的好處就是當你修改代碼保存後它會監聽文件的變化自動重啟服務:

nodemon .\server.js

[nodemon] 2.0.15
[nodemon] to restart at any time, enter `rs`
[nodemon] watching path(s): *.*
[nodemon] watching extensions: js,mjs,json
[nodemon] starting `node .\server.js`
服務已開啟在 localhost: 12306

然後接着使用nodemon工具啟動客戶端程序,創建客戶端實例連接服務器並發送TCP消息:

nodemon .\client.js

[nodemon] 2.0.15
[nodemon] to restart at any time, enter `rs`
[nodemon] watching path(s): *.*
[nodemon] watching extensions: js,mjs,json
[nodemon] starting `node .\client.js`
您好他鄉踏雪

服務端接收到消息以後並在消息前添加「您好」後返回該消息,服務端的控制台會先打印一下內容:

他鄉踏雪

以上就是一個簡單的TCP服務與客戶端的交互示例,除了使用nodemon啟動服務和客戶端以外,還可以使用系統的telnet工具在控制台上測試連接服務,你的系統可能默認沒有啟動這個程序,telnet相關使用可以參考這裡://baijiahao.baidu.com/s?id=1723367561977342393&wfr=spider&for=pc

2.3TCP服務的事件

在前面的示例代碼中已經有了TCP事件相關API的應用代碼,這裡針對這些事件做一些概念性的介紹:

通過net.createServer()創建的服務,它是一個EventEmitter實例,這個示例負責啟動nodejs底層TCP模塊對網絡資源的監聽。在這個實例內部還會管理一個Socket實例,它是一個雙工流(關於雙工流點擊參考)實例,Socket實例負責接收和響應具體的TCP數據。

net.createServer()上的事件:

listening:調用server.listen()綁定端口或者Domain Socket後觸發。
connection:每個客戶端套位元組連接到服務器時觸發,其回調函數會接收到一個Socket實例作為參數。
close:當服務器關閉時觸發,在調用server.close()後,服務器將停止接收新的套位元組連接,但保持當前存在的連接,等待所有連接斷開後,會觸發該事件。
error:當服務器發生異常時,將會觸發該事件。

連接事件,也就是Socket實例上的事件,這個事件對應tream實例上的事件,因為Socket本身就是基於雙工流構造的。

data:當一端調用write()方法發送數據時,另一端會觸發data事件,事件傳遞的數據即是write()發送的數據。
end:當連接中的任意一段發送了FIN數據時,將會觸發該事件。
connect:該事件用於客戶端,當套位元組與服務器連接成功後會觸發。
drain:當任意一端調用write()發送數據時,當前端會觸發該事件。
error:當異常發生時,觸發該事件。
close:當套位元組完全關閉時,觸發該事件。
timeout:當一定事件後連接不在活躍時,該事件將會觸發,通知用戶當前連接已經被閑置。

 三、TCP報文解析與粘包解決方案

由於TCP針對網絡中小數據包有一定的優化策略:Nagle算法。

如果每次發送一個很小的數據包,比如一個位元組內容的數據包而不優化,就會導致網絡中只有極少數有效數據的數據包,這會導致浪費大量的網絡資源。Nagle算法針對這種情況,要求緩存區的數據達到一定數據量或者一定時間後才將其發出,所以數據包將會被Nagle算法合併,以此來優化網絡。這種優化雖然提高了網絡帶寬的效率,但有的數據可能會被延遲發送。

在Nodejs中,由於TCP默認啟動Nagle算法,可以調用socket.setNoDelay(ture)去掉Nagle算法,使得write()可以立即發送數據到網絡中。但需要注意的是,儘管在網絡的一端調用write()會觸發另一端的data事件,但是並不是每次write()都會觸發另一端的data事件,再關閉Nagle算法後,接收端可能會將接收到的多個小數據包合併,然後只觸發一次data事件。也就是說socket.setNoDelay(ture)只能解決一端的數據粘包問題。

使用第二節中的client.js示例代碼來測試數據粘包問題:

//在客戶端的connect事件回調中通過多個write()發送數據,它可能會將多次write()寫入的數據一次發出
client.on('connect', () =>{
    client.write('他鄉踏雪');//向服務端發送數據
    client.write('他鄉踏雪1');
    client.write('他鄉踏雪2');
    client.write('他鄉踏雪3');
});

我的測試結果是在服務端和客戶端都出現了粘包問題:

3.1解決粘包問題的簡單粗暴的方案

將多次write()發送的數據,通過定時器延時發送,這個延時超過Nagle算法優化合併的時間就可以解決粘包的問題。比如上面的示例代碼可以修改成下面這樣:

 1 let dataArr = ["他鄉踏雪","他鄉踏雪","他鄉踏雪"];
 2 //當套位元組與服務端連接成功時觸發connect事件
 3 client.on('connect', () =>{
 4     client.write('他鄉踏雪');//向服務端發送數據
 5     for(let i = 0; i< dataArr.length; i++){
 6         (function(data, index){
 7             setTimeout(()=>{
 8                 client.write(data);
 9             },1000 * i);
10         })(dataArr[i], i);
11     }
12 });

上面這種方案會導致網絡連接的資源長時間被佔用,用戶體驗上也會大打折扣,這顯然不是一個合理的方案。

3.2通過拆包封包的方式解決數據粘包的問題分析

通過前面的示例和對TCP數據傳輸機制雙工流的可以了解,TCP粘包的問題就是數據的可寫流因為Nagle算法的優化,不會按照發送端的write()的寫入對應觸發接收端的data事件,它可能導致數據傳輸出現以下兩種情況:

發送端多次write()的數據可能被打包成一個數據包發送到接收端。
發送端通過write()一次寫入的數據可能因為Nagle算法的優化被截斷到兩個數據包中。

TCP的數據傳輸雖然可能會出現以上兩種問題,但由於它是基於流的傳輸機制,那麼它的數據順序在傳輸過程中是確定的先進先出原則。所以,可以通過在每次write()在數據頭部添加一些標識,將每次write()傳輸的數據間隔開,然後在接收端基於這些間隔數據的標識將數據拆分或合併。

基於定長的消息頭頭和不定長的消息體,封包拆包實現數據在流中的標識:

消息頭:也就是間隔數據的標識,採用定長的方式就可以實現有規律的獲取這些數據標識。消息頭中包括消息系列號、消息長度。
消息體:要傳輸的數據本身。

封包與拆包的工具模塊具體實現(MyTransform.js):

 1 class MyTransformCode{
 2     constructor(){
 3         this.packageHeaderLen = 4;  //設置定長的消息頭位元組長度
 4         this.serialNum = 0;         //消息序列號
 5         this.serialLen = 2;         //消息頭中每個數據佔用的位元組長度(序列號、消息長度值)
 6     }
 7     //編碼
 8     encode(data, serialNum){    //data:當前write()實際要傳輸的數據; serialNum:當前消息的編號
 9         const body = Buffer.from(data);//將要傳輸的數據轉換成二進制
10         //01 先按照指定的長度來申請一片內存空間作為消息頭header來使用
11         const headerBuf = Buffer.alloc(this.packageHeaderLen);
12         //02寫入包的頭部數據
13         headerBuf.writeInt16BE(serialNum || this.serialNum);//將當前消息編號以16進制寫入
14         headerBuf.writeInt16BE(body.length, this.serialLen);//將當前write()寫入的數據的二進制長度作為消息的長度寫入
15         if(serialNum === undefined){
16             this.serialNum ++;  //如果沒有傳入指定的序列號,表示在最佳寫入,消息序列號+1
17         }
18         return Buffer.concat([headerBuf, body]);//將消息頭和消息體合併成一個Buffer返回,交給TCP發送端
19     }
20     //解碼
21     decode(buffer){
22         const headerBuf = buffer.slice(0, this.packageHeaderLen);   //獲取消息頭的二進制數據
23         const bodyBuf = buffer.slice(this.packageHeaderLen);        //獲取消息體的二進制數據
24         return {
25             serialNum:headerBuf.readInt16BE(),
26             bodyLength:headerBuf.readInt16BE(this.serialLen),
27             body:bodyBuf.toString()
28         };
29     }
30     //獲取數據包長度的方法
31     getPackageLen(buffer){
32         if(buffer.length < this.packageHeaderLen){
33             return 0;   //當數據長度小於數據包頭部的長度時,說明它的數據是不完整的,返回0表示數據還沒有完全傳輸到接收端
34         }else{
35             return this.packageHeaderLen + buffer.readInt16BE(this.serialLen);  //數據包頭部長度+加上數據包消息體的長度(從數據包的頭部數據中獲取),就是數據包的實際長度
36         }
37     }
38 }
39 module.exports = MyTransformCode;

測試自定義封包工具的編碼、解碼:

let tf = new MyTransformCode();
let str = "他鄉踏雪";
let buf = tf.encode(str);       //編碼
console.log(tf.decode(buf));    //解碼
console.log(tf.getPackageLen(buf)); //獲取數據包位元組長度
//測試結果
{ serialNum: 0, bodyLength: 12, body: '他鄉踏雪' }
16

3.3應用封包拆包工具MyTransform實現解決TCP的粘包問題

服務端示例代碼:

 1 //應用封包解決TCP粘包問題服務端
 2 const net = require('net');
 3 const MyTransform = require('./myTransform.js');
 4 const server = net.createServer();  //創建服務實例
 5 let overageBuffer = null;           //緩存每一次data傳輸過來不完整的數據包,等待一下次data事件觸發時與chunk合併處理
 6 let tsf = new MyTransform();
 7 server.listen('12306', 'localhost');
 8 server.on('listening',()=>{
 9     console.log('服務端運行在 localhost:12306');
10 });
11 server.on('connection', (socket)=>{
12     socket.on('data', (chunk)=>{
13         if(overageBuffer && overageBuffer.length > 0){
14             chunk = Buffer.concat([overageBuffer, chunk]);  //如果上一次data有未不完成的數據包的數據片段,合併到這次chunk前面一起處理
15         }
16         while(tsf.getPackageLen(chunk) && tsf.getPackageLen(chunk) <= chunk.length){   //如果接收到的數據中第一個數據包是完整的,進入循環體對數據進行拆包處理
17             let packageLen = tsf.getPackageLen(chunk);  //用於緩存接收到的數據中第一個包的位元組長度
18             const packageCon = chunk.slice(0, packageLen);  //截取接收到的數據的第一個數據包的數據
19             chunk = chunk.slice(packageLen);    //截取除第一個數據包剩餘的數據,用於下一輪循環或下一次data事件處理
20             const ret = tsf.decode(packageCon); //解碼當前數據中第一個數據包
21             console.log(ret);
22             socket.write(tsf.encode(ret.body, ret.serialNum));  //講解碼的數據報再次封包發送回客戶端
23         };
24         overageBuffer = chunk;  //緩存不完整的數據包,等待下一次data事件接收到數據後一起處理
25     });
26 });

客戶端示例代碼:

 1 //應用封包解決TCP粘包問題客戶端
 2 const net = require('net');
 3 const MyTransform = require('./myTransform.js');
 4 let overageBuffer = null;
 5 let tsf = new MyTransform();
 6 const client = net.createConnection({
 7     host:'localhost',
 8     port:12306
 9 });
10 client.write(tsf.encode("他鄉踏雪1"));
11 client.write(tsf.encode("他鄉踏雪2"));
12 client.write(tsf.encode("他鄉踏雪3"));
13 client.write(tsf.encode("他鄉踏雪4"));
14 client.on('data', (chunk)=>{
15     if(overageBuffer && overageBuffer.length > 0){
16         chunk = Buffer.concat([overageBuffer, chunk]);  ////如果上一次data有未不完成的數據包的數據片段,合併到這次chunk前面一起處理
17     }
18     while(tsf.getPackageLen(chunk) && tsf.getPackageLen(chunk) <= chunk.length){    //如果接收到的數據中第一個數據包是完整的,進入循環體對數據進行拆包處理
19         let packageLen = tsf.getPackageLen(chunk);  //用於緩存接收到的數據中第一個包的位元組長度
20         const packageCon = chunk.slice(0, packageLen); //截取接收到的數據的第一個數據包的數據
21         chunk = chunk.slice(packageLen);    //截取除第一個數據包剩餘的數據,用於下一輪循環或下一次data事件處理
22         const ret = tsf.decode(packageCon); //解碼當前數據中第一個數據包
23         console.log(ret);
24     };
25     overageBuffer = chunk;  //緩存不完整的數據包,等待下一次data事件接收到數據後一起處理
26 });

測試效果:

基於流的數據傳輸總是先進先出的隊列傳輸原則,所以每一次數據的前面固定幾個位元組的數據都是數據中的第一個包的頭部數據,所以就可以通過MyTransform工具中的getPackageLen(buffer)獲取到第一個數據包的數據長度,基於這樣一個原則就可以準確的判斷出當前的數據中是否有完整的數據包,如果有就將這個數據包拆分出來,循環這一操作就可以將所有數據全部完整的實現數據拆分,解決TCP的Nagle算法導致到粘包和不完整數據包的問題。