在使用TCP協議進行消息發送時,對消息分幀
- 2021 年 7 月 25 日
- 筆記
- java socket編程, Java web, Java基礎
成幀與解析
閱讀 《java TCP/IP Socket 編程》第三章筆記
成幀技術(frame)是解決如何在接收端定位消息的首尾位置的問題。在進行數據收發時,必須指定消息接收者如何確定何時消息已經接收完整。
在TCP協議中,消息是按照位元組來傳輸的,而且TCP協議中是沒有消息邊界的概念的。因為當client和server雙方建立TCP連接後,雙方可以自由發送位元組數據。
為了能夠在消息傳輸中確定消息的邊界,需要引入額外的信息來標示消息邊界。常用的辦法有兩種:
基於定界符與基於顯式消息長度。
基於定界符
我們在消息的末尾添加一個唯一標記作為消息結束符,這個唯一的標記一般是一個位元組或者一組位元組序列,並且在消息中不能出現這個標記。
基於定界符的方法一般用於以文本方式編碼的消息中,定義一個特殊的字符作為分隔符來表示消息結束。但是這個分隔符也有可能作為普通字符可能會出現在消息中,導致消息解析出現錯誤。為了讓消息中不出現分隔符,需要引入填充(stuff)技術,在發送端對消息進行掃描,如果碰到分隔符,就將這個分隔符用一個替換符和其他符號(比如將原始字符二進制中的第三位取反得到一個新的位元組作為)替換,同樣的,如果掃描中遇到替換符,將替換符也用一個替換付和其他符號替換。在消息的接收端,同樣也對接收到的消息進行掃描,當碰到替換符時,說明該字符不是消息中的,要將後面一個字符進行還原得到相應的原始字符,這個才是消息中真正的字符。當遇到分隔符時,說明該消息已經結束
顯式消息長度
在消息前面添加一個固定大小的字段(一個位元組或者兩個位元組長度),用於表示消息包含的位元組個數(也就是消息的長度)。在消息發送時,計算消息的長度(位元組數),作為消息的前綴。如果使用一個位元組保存長度,則消息長度最大為\(2^8=256\)個位元組,如果是兩個位元組保存長度,則消息長度最大為\(2^{16}=65536\)個位元組
消息成幀與解析的實現
在java中,當client和server之間建立tcp連接後,就可以通過輸入輸出流(I/O stream)來進行消息傳輸。發送消息時,將待發送的消息寫入OutputStream流中,然後發送到接收端InputStream流;接收端則從InputStream流中讀取出消息。如何實現將消息按幀發送與接收,就需要要利用我們上面提到的方法。
我們先定義一個Framer接口,來聲明兩個方法,消息成幀frameMsg()和消息抽取nextMsg()
package chapter_3.frame;
import java.io.IOException;
import java.io.OutputStream;
/**
* @author fulv
* Framer接口聲明了兩個方法,用於消息成幀和解析將待發送消息封裝成幀並輸出到指定流
*/
public interface Framer {
/**
* 將輸入的消息msg封裝成幀,然後輸出到out流
*
* @param msg 輸入的消息
* @param out 消息輸出流
*/
void frameMsg(byte[] msg, OutputStream out);
/**
* 從指定流中讀取下一個消息幀
*
* @return byte[]
*/
byte[] nextMsg() throws IOException;
}
然後分別使用基於分隔符和基於顯式消息長度兩種方法來實現Framer接口
基於分隔符:
在這裡,我們使用字符’\n’作為消息分隔符,它對應的位元組為0x0A;使用的替換符為0x7D。替換的策略是:當掃描到待發送的消息byte數組中有0x0A時,將其替換為(0x7D,0x2A),如果遇到0x7D,將其替換為(0x7D,0x5D)。這裏面第二個字符通過將待替換字符從左向右數第三位取反獲得。
在 接收端,從輸入流中讀取位元組流數據,遇到0x7D時,說明後面一個位元組對應的是特殊位元組,需要轉換得到原始位元組。如果遇到0x0A說明到達消息幀末尾,完成了一個消息幀的讀取。
package chapter_3.frame;
import java.io.*;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
/**
* 採用界定符的方式來實現消息的封裝成幀以及消息幀的解析
*
* @author fulv
*/
public class DelimitFramer implements Framer {
/**
* 數據輸入源,從中解析出消息幀
*/
private InputStream in;
/**
* 消息幀的定界符
*/
private static final byte DELIMITER = '\n';
/**
* 替換字符,用於將出現在消息內部的'\n'進行替換,避免出現解析錯誤
*/
private static final byte REPLACE_CHAR = (byte) 0x7d;
private static final byte MASK = (byte) 0x20;
public DelimitFramer(InputStream in) {
this.in = in;
}
@Override
public void frameMsg(byte[] msg, OutputStream out) {
//向判斷傳入的消息中是否包含界定符與替換符,如果存在,執行相關位元組填充操作
//將對應的界定符和替換符換成兩個字符,其中第一個為替換符,第二個為將要替換的字符的從左到右的第二位取反形成的字符
int count = 0;
for (byte b : msg) {
if (DELIMITER == b || REPLACE_CHAR == b) {
count++;
}
}
byte[] extendMsg = new byte[msg.length + count];
for (int i = 0, j = 0; i < msg.length; i++) {
if (DELIMITER == msg[i] || REPLACE_CHAR == msg[i]) {
extendMsg[j++] = REPLACE_CHAR;
extendMsg[j++] = byteStuff(msg[i]);
} else {
extendMsg[j++] = msg[i];
}
}
try {
out.write(extendMsg);
out.write(DELIMITER);
out.flush();
} catch (IOException e) {
e.printStackTrace();
System.out.println("消息寫入流失敗");
}
}
/**
* 從消息輸入流in中,取出下一個消息幀(以分隔符劃分一個消息幀)
*
* @return
*/
@Override
public byte[] nextMsg() throws IOException {
ByteArrayOutputStream msgBuffer = new ByteArrayOutputStream();
int nextByte;
while ((nextByte = in.read()) != DELIMITER) {
//已經讀完了輸入流,這裡分兩種情況
if (-1 == nextByte) {
//輸入流中的位元組已經全部讀完
if (msgBuffer.size() == 0) {
return null;
} else {
//讀取了部分位元組,但卻沒有遇到分隔符,說明輸入的消息幀是不完整或者錯誤的,返回異常
throw new EOFException("讀取到了不正確的消息幀");
}
}
//當前字符為替換字符,需要讀取下一個字符並轉換(將第三位取反)得到正確的字符
if (REPLACE_CHAR == nextByte) {
nextByte = in.read() & 0xFF;
nextByte = byteStuff((byte) nextByte);
}
msgBuffer.write(nextByte);
}
return msgBuffer.toByteArray();
}
/**
* 位元組填充函數,將傳入位元組的從左到右數的第二位取反
*
* @param originByte
* @return
*/
private static byte byteStuff(byte originByte) {
return (byte) ((originByte | MASK) & ~(originByte & MASK));
}
}
基於顯式消息長度方法:
使用兩個位元組無符號整型來表示待發送消息的長度,最長為65536。將消息長度按照位元組大端序寫入待發送的消息前,表示消息長度。
接收端,首先從輸入流中讀出消息長度,然後堵塞的從輸入流中讀取數據,直到讀取出的數據量達到消息長度,整個消息幀才讀取結束。
package chapter_3.frame;
import java.io.*;
/**
* 基於顯式長度的方法來將實現消息成幀
*
* @author fulv
*/
public class LengthFramer implements Framer {
private static final int MESSAGEMAXLENGTH = 65536;
private DataInputStream in;
public LengthFramer(DataInputStream in) {
this.in = in;
}
@Override
public void frameMsg(byte[] msg, OutputStream out) throws IOException {
if (msg.length > MESSAGEMAXLENGTH) {
throw new IOException("傳入的消息超出最大長度");
}
int msgLength = msg.length;
//將消息長度按照位元組大端序寫入輸出流中
out.write((msgLength >> 8) & 0xFF);
out.write(msgLength & 0xFF);
//將消息寫入輸出流
out.write(msg);
out.flush();
}
@Override
public byte[] nextMsg() throws IOException {
int length;
byte[] msg = null;
try {
//從輸入流中讀取兩個位元組,作為大端序的整型值解釋,表示消息長度
length = in.readUnsignedShort();
} catch (EOFException e) {
return null;
}
//存放從輸入流中讀取出的消息位元組數組
msg = new byte[length];
//readFully多次調用read方法直到讀取到指定長度的數組消息或者讀取到-1返回
in.readFully(msg);
return msg;
}
}
測試
對兩種消息分幀方式進行測試,開啟兩個線程分別表示client與server,測試消息的發送與接收。
package chapter_3.frame;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
public class TestFramer {
private static final String[] messages = {"Hello World!", "Hello China, 你好 中國", "世界人民大團結萬歲",
"在消息中發送分隔符\n和替換符}的情況"};
public static void main(String[] args) throws InterruptedException {
Thread clientThread = new Thread(() -> {
Socket socket = null;
try {
socket = new Socket(InetAddress.getLocalHost(), 8888);
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
//Framer framer = new DelimitFramer(in);
DataInputStream dataInputStream = new DataInputStream(in);
Framer framer = new LengthFramer(dataInputStream);
for (String msg : messages) {
byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
framer.frameMsg(msgBytes, out);
System.out.println(Thread.currentThread().getName() + " 發送消息: " + msg);
}
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
});
Thread serverThread = new Thread(() -> {
Socket socket = null;
try (ServerSocket serverSocket = new ServerSocket(8888)) {
while (true) {
socket = serverSocket.accept();
System.out.println("獲取到來自" + socket.getRemoteSocketAddress() + "的tcp連接");
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
//Framer framer = new DelimitFramer(in);
DataInputStream dataInputStream = new DataInputStream(in);
Framer framer = new LengthFramer(dataInputStream);
byte[] recvMsgBytes = null;
do {
recvMsgBytes = framer.nextMsg();
//System.out.println(Arrays.toString(recvMsgBytes));
if (recvMsgBytes != null) {
System.out.println(Thread.currentThread().getName() + " 接收到的消息: " + new String(recvMsgBytes, StandardCharsets.UTF_8));
}
} while (recvMsgBytes != null);
}
} catch (IOException e) {
e.printStackTrace();
}
});
serverThread.setName("server");
clientThread.setName("client");
serverThread.start();
Thread.sleep(3000);
clientThread.start();
}
}
輸出結果:
獲取到來自/10.0.75.1:2462的tcp連接
server 接收到的消息: Hello World!
client 發送消息: Hello World!
client 發送消息: Hello China, 你好 中國
server 接收到的消息: Hello China, 你好 中國
client 發送消息: 世界人民大團結萬歲
server 接收到的消息: 世界人民大團結萬歲
client 發送消息: 在消息中發送分隔符
和替換符}的情況
server 接收到的消息: 在消息中發送分隔符
和替換符}的情況