在使用TCP協議進行消息發送時,對消息分幀

成幀與解析

閱讀 《java TCP/IP Socket 編程》第三章筆記

成幀技術(frame)是解決如何在接收端定位消息的首尾位置的問題。在進行數據收發時,必須指定消息接收者如何確定何時消息已經接收完整。

在TCP協議中,消息是按照位元組來傳輸的,而且TCP協議中是沒有消息邊界的概念的。因為當client和server雙方建立TCP連接後,雙方可以自由發送位元組數據。

為了能夠在消息傳輸中確定消息的邊界,需要引入額外的信息來標示消息邊界。常用的辦法有兩種:

基於定界符與基於顯式消息長度

基於定界符

我們在消息的末尾添加一個唯一標記作為消息結束符,這個唯一的標記一般是一個位元組或者一組位元組序列,並且在消息中不能出現這個標記。

基於定界符的方法一般用於以文本方式編碼的消息中,定義一個特殊的字符作為分隔符來表示消息結束。但是這個分隔符也有可能作為普通字符可能會出現在消息中,導致消息解析出現錯誤。為了讓消息中不出現分隔符,需要引入填充(stuff)技術,在發送端對消息進行掃描,如果碰到分隔符,就將這個分隔符用一個替換符和其他符號(比如將原始字符二進制中的第三位取反得到一個新的位元組作為)替換,同樣的,如果掃描中遇到替換符,將替換符也用一個替換付和其他符號替換。在消息的接收端,同樣也對接收到的消息進行掃描,當碰到替換符時,說明該字符不是消息中的,要將後面一個字符進行還原得到相應的原始字符,這個才是消息中真正的字符。當遇到分隔符時,說明該消息已經結束

顯式消息長度

在消息前面添加一個固定大小的字段(一個位元組或者兩個位元組長度),用於表示消息包含的位元組個數(也就是消息的長度)。在消息發送時,計算消息的長度(位元組數),作為消息的前綴。如果使用一個位元組保存長度,則消息長度最大為\(2^8=256\)個位元組,如果是兩個位元組保存長度,則消息長度最大為\(2^{16}=65536\)個位元組

消息成幀與解析的實現

在java中,當client和server之間建立tcp連接後,就可以通過輸入輸出流(I/O stream)來進行消息傳輸。發送消息時,將待發送的消息寫入OutputStream流中,然後發送到接收端InputStream流;接收端則從InputStream流中讀取出消息。如何實現將消息按幀發送與接收,就需要要利用我們上面提到的方法。

我們先定義一個Framer接口,來聲明兩個方法,消息成幀frameMsg()和消息抽取nextMsg()

package chapter_3.frame;

import java.io.IOException;
import java.io.OutputStream;

/**
 * @author fulv
 * Framer接口聲明了兩個方法,用於消息成幀和解析將待發送消息封裝成幀並輸出到指定流
 */
public interface Framer {

    /**
     * 將輸入的消息msg封裝成幀,然後輸出到out流
     *
     * @param msg 輸入的消息
     * @param out 消息輸出流
     */
    void frameMsg(byte[] msg, OutputStream out);

    /**
     * 從指定流中讀取下一個消息幀
     *
     * @return byte[]
     */
    byte[] nextMsg() throws IOException;
}

然後分別使用基於分隔符和基於顯式消息長度兩種方法來實現Framer接口

基於分隔符:

在這裡,我們使用字符’\n’作為消息分隔符,它對應的位元組為0x0A;使用的替換符為0x7D。替換的策略是:當掃描到待發送的消息byte數組中有0x0A時,將其替換為(0x7D,0x2A),如果遇到0x7D,將其替換為(0x7D,0x5D)。這裏面第二個字符通過將待替換字符從左向右數第三位取反獲得。

在 接收端,從輸入流中讀取位元組流數據,遇到0x7D時,說明後面一個位元組對應的是特殊位元組,需要轉換得到原始位元組。如果遇到0x0A說明到達消息幀末尾,完成了一個消息幀的讀取。

package chapter_3.frame;

import java.io.*;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;


/**
 * 採用界定符的方式來實現消息的封裝成幀以及消息幀的解析
 *
 * @author fulv
 */
public class DelimitFramer implements Framer {

    /**
     * 數據輸入源,從中解析出消息幀
     */
    private InputStream in;

    /**
     * 消息幀的定界符
     */
    private static final byte DELIMITER = '\n';
    /**
     * 替換字符,用於將出現在消息內部的'\n'進行替換,避免出現解析錯誤
     */
    private static final byte REPLACE_CHAR = (byte) 0x7d;

    private static final byte MASK = (byte) 0x20;

    public DelimitFramer(InputStream in) {
        this.in = in;
    }

    @Override
    public void frameMsg(byte[] msg, OutputStream out) {
        //向判斷傳入的消息中是否包含界定符與替換符,如果存在,執行相關位元組填充操作
        //將對應的界定符和替換符換成兩個字符,其中第一個為替換符,第二個為將要替換的字符的從左到右的第二位取反形成的字符
        int count = 0;
        for (byte b : msg) {
            if (DELIMITER == b || REPLACE_CHAR == b) {
                count++;
            }
        }
        byte[] extendMsg = new byte[msg.length + count];
        for (int i = 0, j = 0; i < msg.length; i++) {
            if (DELIMITER == msg[i] || REPLACE_CHAR == msg[i]) {
                extendMsg[j++] = REPLACE_CHAR;
                extendMsg[j++] = byteStuff(msg[i]);
            } else {
                extendMsg[j++] = msg[i];
            }
        }
        try {
            out.write(extendMsg);
            out.write(DELIMITER);
            out.flush();
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("消息寫入流失敗");
        }


    }

    /**
     * 從消息輸入流in中,取出下一個消息幀(以分隔符劃分一個消息幀)
     *
     * @return
     */
    @Override
    public byte[] nextMsg() throws IOException {
        ByteArrayOutputStream msgBuffer = new ByteArrayOutputStream();
        int nextByte;

        while ((nextByte = in.read()) != DELIMITER) {
            //已經讀完了輸入流,這裡分兩種情況
            if (-1 == nextByte) {
                //輸入流中的位元組已經全部讀完
                if (msgBuffer.size() == 0) {
                    return null;
                } else {
                    //讀取了部分位元組,但卻沒有遇到分隔符,說明輸入的消息幀是不完整或者錯誤的,返回異常
                    throw new EOFException("讀取到了不正確的消息幀");
                }
            }

            //當前字符為替換字符,需要讀取下一個字符並轉換(將第三位取反)得到正確的字符
            if (REPLACE_CHAR == nextByte) {
                nextByte = in.read() & 0xFF;
                nextByte = byteStuff((byte) nextByte);
            }
            msgBuffer.write(nextByte);
        }
        return msgBuffer.toByteArray();
    }

    /**
     * 位元組填充函數,將傳入位元組的從左到右數的第二位取反
     *
     * @param originByte
     * @return
     */
    private static byte byteStuff(byte originByte) {
        return (byte) ((originByte | MASK) & ~(originByte & MASK));
    }
}

基於顯式消息長度方法:

使用兩個位元組無符號整型來表示待發送消息的長度,最長為65536。將消息長度按照位元組大端序寫入待發送的消息前,表示消息長度。

接收端,首先從輸入流中讀出消息長度,然後堵塞的從輸入流中讀取數據,直到讀取出的數據量達到消息長度,整個消息幀才讀取結束。

package chapter_3.frame;

import java.io.*;

/**
 * 基於顯式長度的方法來將實現消息成幀
 *
 * @author fulv
 */
public class LengthFramer implements Framer {

    private static final int MESSAGEMAXLENGTH = 65536;

    private DataInputStream in;

    public LengthFramer(DataInputStream in) {
        this.in = in;
    }

    @Override
    public void frameMsg(byte[] msg, OutputStream out) throws IOException {
        if (msg.length > MESSAGEMAXLENGTH) {
            throw new IOException("傳入的消息超出最大長度");
        }
        int msgLength = msg.length;
        //將消息長度按照位元組大端序寫入輸出流中
        out.write((msgLength >> 8) & 0xFF);
        out.write(msgLength & 0xFF);
        //將消息寫入輸出流
        out.write(msg);
        out.flush();
    }


    @Override
    public byte[] nextMsg() throws IOException {
        int length;
        byte[] msg = null;
        try {
            //從輸入流中讀取兩個位元組,作為大端序的整型值解釋,表示消息長度
            length = in.readUnsignedShort();
        } catch (EOFException e) {
            return null;
        }
        //存放從輸入流中讀取出的消息位元組數組
        msg = new byte[length];
        //readFully多次調用read方法直到讀取到指定長度的數組消息或者讀取到-1返回
        in.readFully(msg);
        return msg;
    }
}

測試

對兩種消息分幀方式進行測試,開啟兩個線程分別表示client與server,測試消息的發送與接收。

package chapter_3.frame;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class TestFramer {

    private static final String[] messages = {"Hello World!", "Hello China, 你好 中國", "世界人民大團結萬歲",
            "在消息中發送分隔符\n和替換符}的情況"};

    public static void main(String[] args) throws InterruptedException {
        Thread clientThread = new Thread(() -> {
            Socket socket = null;
            try {
                socket = new Socket(InetAddress.getLocalHost(), 8888);
                InputStream in = socket.getInputStream();
                OutputStream out = socket.getOutputStream();
                //Framer framer = new DelimitFramer(in);
                DataInputStream dataInputStream = new DataInputStream(in);
                Framer framer = new LengthFramer(dataInputStream);
                for (String msg : messages) {
                    byte[] msgBytes = msg.getBytes(StandardCharsets.UTF_8);
                    framer.frameMsg(msgBytes, out);
                    System.out.println(Thread.currentThread().getName() + " 發送消息: " + msg);
                }
                socket.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        Thread serverThread = new Thread(() -> {
            Socket socket = null;
            try (ServerSocket serverSocket = new ServerSocket(8888)) {
                while (true) {
                    socket = serverSocket.accept();
                    System.out.println("獲取到來自" + socket.getRemoteSocketAddress() + "的tcp連接");
                    InputStream in = socket.getInputStream();
                    OutputStream out = socket.getOutputStream();
                    //Framer framer = new DelimitFramer(in);
                    DataInputStream dataInputStream = new DataInputStream(in);
                    Framer framer = new LengthFramer(dataInputStream);
                    byte[] recvMsgBytes = null;
                    do {
                        recvMsgBytes = framer.nextMsg();
                        //System.out.println(Arrays.toString(recvMsgBytes));
                        if (recvMsgBytes != null) {
                            System.out.println(Thread.currentThread().getName() + " 接收到的消息: " + new String(recvMsgBytes, StandardCharsets.UTF_8));
                        }
                    } while (recvMsgBytes != null);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        serverThread.setName("server");
        clientThread.setName("client");
        serverThread.start();
        Thread.sleep(3000);
        clientThread.start();
    }
}

輸出結果:

獲取到來自/10.0.75.1:2462的tcp連接
server 接收到的消息: Hello World!
client 發送消息: Hello World!
client 發送消息: Hello China, 你好 中國
server 接收到的消息: Hello China, 你好 中國
client 發送消息: 世界人民大團結萬歲
server 接收到的消息: 世界人民大團結萬歲
client 發送消息: 在消息中發送分隔符
和替換符}的情況
server 接收到的消息: 在消息中發送分隔符
和替換符}的情況