Netty入門與實戰教程總結分享

  前言:都說Netty是Java程式設計師必須要掌握的一項技能,帶著不止要知其然還要知其所以然的目的,在慕課上找了一個學習Netty源碼的教程,看了幾章後著實有點懵逼。雖然用過Netty,並且在自己的個人網站上實現了聊天室的功能。但是使用的還是Netty很少一部分功能,很多組件都沒涉及,很多API也似懂非懂。基礎都沒打牢,學習源碼肯定懵逼。正好在掘金小冊上找到了一個Netty入門的教程,鏈接放在最後【非廣告】。學習過後,整理出了這麼一篇部落格。寫這篇部落格的目的一個是為了分享,另一個目的就是為了做個筆記,之後自己也可以當資料回顧一下。

  在學習Netty前的必需知識:NIO。如果不了解可以看這篇:手動搭建I/O網路通訊框架3:NIO編程模型,升級改造聊天室。對於BIO和AIO可以只看文字了解一下,但是NIO編程模型最好還是動手實踐一下,畢竟NIO目前是使用最廣的。還有一篇Netty實戰SpringBoot+Netty+WebSocket實現實時通訊。這是我實現個人網站的聊天室時寫的一篇,文字內容很少,主要是程式碼,最好粗略看看程式碼,因為下面有幾個地方會和這篇程式碼做一些比較,下面統稱Netty實戰。如果現在看不懂,等你認真看到這篇部落格的pipeline那裡,應該都會看懂。Netty實戰中的客戶端是Web,配合一些前端IM框架,客戶端實現起來非常簡單。但是聊天通訊的功能在APP中多一些,所以下面會說到Netty在客戶端中的使用。下面所有程式碼都確保可以正確運行,如果哪裡有問題,請留言指出,謝謝。

Netty是什麼?

  官方定義:Netty 是一個非同步事件驅動的網路應用框架,用於快速開發可維護的高性能伺服器和客戶端。

  簡單地說Netty封裝了JDK的NIO,不用再寫一大堆複雜的程式碼。既然代替了原生的NIO,肯定有比它好的理由,主要有如下幾點:

  1.Netty底層IO模型可以隨意切換,比如可以從NIO切換到BIO,但一般很少會這麼做。

  2.Netty自帶拆包解包,從NIO各種繁複的細節中脫離出來,讓開發者重點關心業務邏輯。

  3.Netty解決了NIO中Selector空輪詢BUG,這個BUG應該很多人聽說過,雖然官方聲明jdk1.6的update18修復了該問題,只不過是降低了發生的概率。

  4.對Selector做了很多細小的優化,reactor執行緒模型能做到高效的並發處理。

 

服務端啟動類詳解

  精簡的服務端Demo,與上面那篇Netty實戰中的程式碼相比只有一個啟動類,少了業務程式碼和初始化器。

public class NettyServer {
    public static void main(String[] args) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                    }
                });

        serverBootstrap.bind(8000);
    }
}
  • 兩個NioEventLoopGroup對象,可以看作兩個執行緒組。bossGroup的作用是監聽客戶端請求。workerGroup的作用是處理每條連接的數據讀寫。

  • ServerBootstrap是一個引導類,其對象的作用是引導伺服器的啟動工作。

  • .group是配置上面兩個執行緒組的角色,也就是誰去監聽誰去處理讀寫。上面只是創建了兩個執行緒組,並沒有實際使用。

  • .channel是配置服務端的IO模型,上面程式碼配置的是NIO模型。也可以配置為BIO,如OioServerSocketChannel.class。

  • .childHandler用於配置每條連接的數據讀寫和業務邏輯等。上面程式碼用的是匿名內部類,並沒有什麼內容。實際使用中為了規範起見,一般會再寫一個單獨的類也就是初始化器,在裡面寫上需要的操作。就如Netty實戰那篇中的程式碼一樣。

  • 最後就是綁定監聽埠了。

  引導類最小化的參數配置就是如上四個:配置執行緒組、IO模型、處理邏輯、綁定埠。

引導類serverBootstrap 的其他方法:

  1.handler()方法:上面的cildHandler是處理連接的讀寫邏輯,這個是用於指定服務端啟動中的邏輯.

serverBootstrap.handler(new ChannelInitializer<NioServerSocketChannel>() {
    protected void initChannel(NioServerSocketChannel ch) {
        System.out.println("服務端啟動中");
    }
})

  2.attr()方法:給服務端的channel指定一些自定義屬性。然後通過channel.attr()取出這個屬性,其實就是給channel維護一個map。一般也用不上。

  3.childAttr()方法:作用和上面一樣,這個是針對客戶端的channel。

  4.option()方法:給服務端的channel設置屬性,如

serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024)

  上面程式碼表示系統用於臨時存放已完成三次握手的請求的隊列的最大長度,如果連接建立頻繁,伺服器處理創建新連接較慢,可以適當調大這個參數

  5.childOption()方法:大家肯定已經明白了Netty的命名規律,這個是給每條客戶端連接設置TCP相關的屬性,如

serverBootstrap
        //開啟TCP底層心跳機制
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        //開啟Nagle演算法,如果要求高實時性,有數據發送時就馬上發送,就關閉,如果需要減少發送次數減少網路交互,就開啟。
        .childOption(ChannelOption.TCP_NODELAY, true)

 

客戶端啟動類

  還是說說那篇Netty實戰,裡面的客戶端是Web,所以用到了WebSocket。主要的重點還是在服務端上,客戶端實現起來相對容易,因為它只用發送消息和接收消息。下面依舊寫出一個精簡的客戶端Demo,可以根據自己的項目類型還選擇客戶端的實現。

public class NettyClient {
    public static void main(String[] args) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                // 1.指定執行緒模型
                .group(workerGroup)
                // 2.指定 IO 類型為 NIO
                .channel(NioSocketChannel.class)
                // 3.IO 處理邏輯
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                    }
                });
        // 4.建立連接
        bootstrap.connect("127.0.0.1", 8000).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("連接成功!");
            } else {
                System.err.println("連接失敗!");
                //重新連接
            }
        });
    }
}

  弄懂服務端程式碼後,客戶端就很好理解了,就不再一一說明了。主要作用就在注釋中。

重新連接

  網路環境差的情況下,客戶端第一次連接可能會失敗,所以我們需要嘗試重新連接。可以把連接connect上面的程式碼封裝起來,然後傳入一個Bootstrap類型的對象,通過這個對象循環連接。但是一般情況下,連接失敗後不會馬上重連,而是會通過一個指數退避的方式,比如每隔1s、2s、4s、8s….重新連接。

int MAX_RETRY=5;
connect(bootstrap, "127.0.0.1", 8000, MAX_RETRY);
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
    bootstrap.connect(host, port).addListener(future -> {
        if (future.isSuccess()) {
            System.out.println("連接成功!");
        } else if (retry == 0) {
            System.err.println("重試次數已用完,放棄連接!");
        } else {
            // 第幾次重連
            int order = (MAX_RETRY - retry) + 1;
            // 本次重連的間隔,1<<order相當於1乘以2的order次方
            int delay = 1 << order;
            System.err.println(new Date() + ": 連接失敗,第" + order + "次重連……");
            bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
                    .SECONDS);
        }
    });
}

  在上面的程式碼中,我們看到,我們定時任務是調用 bootstrap.config().group().schedule(), 其中 bootstrap.config() 這個方法返回的是 BootstrapConfig,他是對 Bootstrap 配置參數的抽象,然後 bootstrap.config().group() 返回的就是我們在一開始的時候配置的執行緒模型 workerGroup,調 workerGroup 的 schedule 方法即可實現定時任務邏輯。

  在 schedule 方法塊裡面,前面四個參數我們原封不動地傳遞,最後一個重試次數參數減掉一,就是下一次建立連接時候的上下文資訊。我們可以自行修改程式碼,更改到一個連接不上的服務端 Host 或者 Port,查看控制台日誌就可以看到5次重連日誌。

  bootstrap的其他方法,attr()和option()。作用和服務端的方法一樣。attr設置自定義屬性,option設置TCP相關的屬性。

 

數據傳輸的載體:ByteBuf

  NIO中經常使用的ByteBuffer,但它還有一些缺陷:

  1.長度固定。2.讀寫狀態需要通過filp()和rewind()手動轉換。3.功能有限。

  如果看過上面NIO那篇部落格中聊天室Demo,其實都會發現這些問題。長度設置固定的1024個位元組,讀寫也要用filp()轉換。所以Netty為了解決ByteBuffer的這些缺陷,設計了ByteBuf。其結構如下:

  1. ByteBuf 是一個位元組容器,容器裡面的的數據分為三個部分,第一個部分是已經丟棄的位元組,這部分數據是無效的;第二部分是可讀位元組,這部分數據是 ByteBuf 的主體數據, 從 ByteBuf 裡面讀取的數據都來自這一部分;最後一部分的數據是可寫位元組,所有寫到 ByteBuf 的數據都會寫到這一段。最後一部分虛線表示的是該 ByteBuf 最多還能擴容多少容量。

  2. 以上三段內容是被兩個指針給劃分出來的,從左到右,依次是讀指針(readerIndex)、寫指針(writerIndex),然後還有一個變數 capacity,表示 ByteBuf 底層記憶體的總容量。

  3. 從 ByteBuf 中每讀取一個位元組,readerIndex 自增1,ByteBuf 裡面總共有 writerIndex-readerIndex 個位元組可讀, 由此可以推論出當 readerIndex 與 writerIndex 相等的時候,ByteBuf 不可讀。

  4. 寫數據是從 writerIndex 指向的部分開始寫,每寫一個位元組,writerIndex 自增1,直到增到 capacity,這個時候,表示 ByteBuf 已經不可寫了。

  5. ByteBuf 裡面其實還有一個參數 maxCapacity,當向 ByteBuf 寫數據的時候,如果容量不足,那麼這個時候可以進行擴容,直到 capacity 擴容到 maxCapacity,超過 maxCapacity 就會報錯。

ByteBuf的API

  capacity():表示ByteBuf底層佔用了多少位元組,包括丟棄位元組、可讀位元組、可寫位元組。

  maxCapacity():表示ByteBuf最大能佔用多少位元組,也就是包括後面的可擴容的記憶體。

  readableBytes() 與 isReadable():前者表示當前可讀位元組數,也就是寫指針-讀指針。後者表示是否可讀。

  writableBytes()、 isWritable() 與 maxWritableBytes():第一個表示可寫位元組數。第二個表示是否可寫。第三個表示最大可寫位元組數。

  readerIndex() 與 readerIndex(int):前者返回當前的讀指針。後者可以設置讀指針。

  writeIndex() 與 writeIndex(int):和上面一樣,只是讀指針變成了寫指針。

  markReaderIndex() 與 resetReaderIndex():前者表示把當前讀指針保存起來。後者表示把當前的讀指針恢復到保存時的值。他們的功能其實readerIndex() 與 readerIndex(int)一樣可以實現,但一般會選擇下面兩句,因為不用定義一個變數。

int readerIndex = buffer.readerIndex();
buffer.readerIndex(readerIndex);

//和上面兩句等價
buffer.markReaderIndex();
buffer.resetReaderIndex();

  writeBytes(byte[] src) 與 buffer.readBytes(byte[] dst):前者表示把src寫到ByteBuf。後者表示把ByteBuf全部數據讀取到dst。

  writeByte(byte b) 與 buffer.readByte():前者表示把位元組b寫道ByteBuf。後者表示從ByteBuf讀取一個位元組。類似的 API 還有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 與 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble() 這裡就不一一贅述了。

  release() 與 retain():由於 Netty 使用了堆外記憶體,而堆外記憶體是不被 jvm 直接管理的,也就是說申請到的記憶體無法被垃圾回收器直接回收,所以需要我們手動回收。Netty 的 ByteBuf 是通過引用計數的方式管理的,如果一個 ByteBuf 沒有地方被引用到,需要回收底層記憶體。默認情況下,當創建完一個 ByteBuf,它的引用為1,然後每次調用 retain() 方法, 它的引用就加一, release() 方法原理是將引用計數減一,減完之後如果發現引用計數為0,則直接回收 ByteBuf 底層的記憶體。

  slice()、duplicate()、copy():這三個都會返回一個新的ByteBuf。第一個是截取讀指針到寫指針範圍內的一段內容。第二個是截取整個ByteBuf,包括數據和指針資訊。第三個是拷貝所有資訊,除了第二個API的內容還包括底層資訊,因此拷貝後的新ByteBuf任何操作不會影響原始的ByteBuf。

 

 

實戰:服務端和客戶端雙向通訊

  了解客戶端、服務端的啟動類和ByteBuf以後,可以進行一個簡單的實戰了。

  首先看看前面的客戶端程式碼,.handler里重寫的initChannel方法並沒實際內容。現在加上邏輯處理器,其實就是一個執行邏輯程式碼的類,怎麼叫無所謂,明白它的意思就行。

  客戶端

.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new FirstClientHandler());
    }
});

  ch.pipeline().addLast()就是添加一個邏輯處理器。我們在FirstClientHandler里添加對應的邏輯程式碼就行。

public class FirstClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("客戶端發送消息...");
        // 1. 獲取數據
        ByteBuf buffer = getByteBuf(ctx);
        // 2. 寫數據
        ctx.channel().writeAndFlush(buffer);
    }
    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        // 1. 獲取二進位抽象 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        // 2. 準備數據,指定字元串的字符集為 utf-8
        byte[] bytes = ("【客戶端】:這是客戶端發送的消息:"+new Date()).getBytes(Charset.forName("utf-8"));
        // 3. 填充數據到 ByteBuf
        buffer.writeBytes(bytes);
        return buffer;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        //接收服務端的消息並列印
        System.out.println(byteBuf.toString(Charset.forName("utf-8")));
    }
}
  • channelActive()方法會在客戶端與伺服器建立連接後調用。所以我們可以在這裡面編寫邏輯程式碼

  • .alloc().buffer()的作用是把字元串的二進位數據填充到ByteBuf。

  • .writeBytes()的作用是把數據寫到伺服器。

  • channelRead()在接受到服務端的消息後調用。

  服務端

  同樣的我們需要在initChannel()里添加一個邏輯處理器。

.childHandler(new ChannelInitializer<NioSocketChannel>() {
    protected void initChannel(NioSocketChannel ch) {
        ch.pipeline().addLast(new FirstServerHandler());
    }
});

  邏輯處理器里的程式碼

public class FirstServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println(new Date() + ": 服務端讀到數據 -> " + byteBuf.toString(Charset.forName("utf-8")));
        //接收到客戶端的消息後我們再回復客戶端
        ByteBuf out = getByteBuf(ctx);
        ctx.channel().writeAndFlush(out);
    }
    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        byte[] bytes = "【伺服器】:我是伺服器,我收到你的消息了!".getBytes(Charset.forName("utf-8"));
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes(bytes);
        return buffer;
    }
}

  channelRead()方法在接收到客戶端發來的數據後調用。

  最後我們啟動客戶端和服務端的啟動類,效果如下:

 

 

自定義客戶端與服務端的通訊協議

什麼是通訊協議?

  TCP通訊的數據包格式均為二進位,協議指的就是客戶端與服務端制定一個規則,這個規則規定了每個二進位數據包中每一段的位元組代表什麼含義。客戶端與服務端通訊的過程如下。

 

  客戶端首先把一個Java對象轉換成二進位數據,然後通過網路傳輸給服務端。這裡的傳輸過程屬於TCP/IP協議負責,與我們應用層無關。

通訊協議的設計

  1.第一個欄位是魔數,可以理解為識別這條二進位數據類型的欄位。在《深入理解Java虛擬機》中這麼解釋:使用魔數而不是擴展名來進行識別主要是基於安全方面的考慮,因為文件擴展名可以隨意地改動。文件格式的制定者可以自由地選擇魔數值,只要這個魔數值還沒有被廣泛採用過同時又不會引起混淆即可。

  2.第二個是版本號,就像IPV4和IPV6一樣。能夠支援協議的升級。

  3.第三個表示如何把Java對象轉換成二進位數據和把二進位數據轉回Java對象。

  4.第四個用於區分這條數據是幹嘛的或者說叫數據類型,如:這是發送的消息,還是登錄的請求等。服務端就可以根據這個指令執行不同的邏輯程式碼。

  5.第五個代表後面的數據長度。

  6.第六個代表發送的數據,如果指令表明這是個登錄數據,裡面存儲的就是帳號密碼。

通訊協議的實現

  以實現登錄為例,下面介面和類有點多,建議先把程式碼拷貝到IDE里,分好包寫好注釋,助於理解它們的關係。

  1.首先創建一個Java對象,這裡以登錄時的請求響應為例

@Data
public abstract class Packet {
    //協議版本
    private Byte version = 1;
    //獲取數據類型
    public abstract Byte getCommand();
}

  @Date註解由lombok提供,不了解的可以看看這個//www.cnblogs.com/lbhym/p/12551021.html

public interface Command {
    //定義登錄請求指令和響應指令為1和2,其他的指令同理如MESSAGE_REQUEST等
    Byte LOGIN_REQUEST = 1;
    Byte LOGIN_RESPONSE = 2;
}
//這個是登錄請求數據包的Java對象,所以調用的是上面介面的登錄請求指令,其他類型的數據包同理
@Data
public class LoginRequestPacket extends Packet {
    //定義用戶資訊
    private Integer userId;
    private String username;
    private String password;
    @Override
    public Byte getCommand() {
        return LOGIN_REQUEST;
    }
}
@Data
public class LoginResponsePacket extends Packet {
    //是否登錄成功
    private boolean success;
    //如果失敗,返回的資訊
    private String reason;
    @Override
    public Byte getCommand() {
        return LOGIN_RESPONSE;
    }
}

  2.Java對象創建完了,再定義Java對象轉換的規則

//序列化介面
public interface Serializer {
    Serializer DEFAULT = new JSONSerializer();
    //序列化演算法
    byte getSerializerAlogrithm();
    //java 對象轉換成二進位
    byte[] serialize(Object object);
    //二進位轉換成 java 對象
    <T> T deserialize(Class<T> clazz, byte[] bytes);
}

  介面定義完後開始實現介面。這裡的序列化演算法使用的是fastjson裡面的。需要在pom.xml里導入。

public interface SerializerAlgorithm {
    //json序列化標識,如果你有其他的序列化方式可以在這註明標識,類似上面的登錄指令
    byte JSON = 1;
}
//實現上面定義的序列化介面
public class JSONSerializer implements Serializer {
    @Override
    public byte getSerializerAlgorithm() {
        //獲取上面的序列化標識
        return SerializerAlgorithm.JSON;
    } 
    @Override
    public byte[] serialize(Object object) {
        return JSON.toJSONBytes(object);
    }
    @Override
    public <T> T deserialize(Class<T> clazz, byte[] bytes) {
        return JSON.parseObject(bytes, clazz);
    }
}

  3.創建一個類PacketCodeC,裡面寫上編解碼的方法。這裡再說一點,因為使用了@Data註解,所以有的get方法在語法檢測階段會報錯,可以在IDEA裡面下載Lombok插件。

public class PacketCodeC {
    //自定義一個魔數
    private static final int MAGIC_NUMBER = 0x12345678;
    //創建一個靜態實例供外部調用
    public static final PacketCodeC INSTANCE=new PacketCodeC();
    //創建兩個map,一個存儲數據類型,如:是登錄數據還是普通消息等。第二個是存儲序列化類型。
    //這樣在解碼時就可以把數據轉換為對應的類型。如:這個byte數組是LOGIN_REQUEST類型,就把它轉換成LoginRequestPacket類型的Java對象
    private  final Map<Byte, Class<? extends Packet>> packetTypeMap;
    private  final Map<Byte, Serializer> serializerMap;

    private PacketCodeC() {
        //初始化map並添加數據類型和序列化類型,如果有其他數據類型,記得在這裡添加
        packetTypeMap = new HashMap<>();
        packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class);
        packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class);
        serializerMap = new HashMap<>();
        Serializer serializer = new JSONSerializer();
        serializerMap.put(serializer.getSerializerAlogrithm(), serializer);
    }
    //編碼
    public ByteBuf encode(ByteBufAllocator bufAllocator,Packet packet) {
        // 1. 創建 ByteBuf 對象
        ByteBuf byteBuf = bufAllocator.ioBuffer();
        // 2. 序列化 Java 對象
        byte[] bytes = Serializer.DEFAULT.serialize(packet);
        // 3. 實際編碼過程,把通訊協議幾個部分,一一編碼
        byteBuf.writeInt(MAGIC_NUMBER);
        byteBuf.writeByte(packet.getVersion());
        byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlogrithm());
        byteBuf.writeByte(packet.getCommand());
        byteBuf.writeInt(bytes.length);
        byteBuf.writeBytes(bytes);
        return byteBuf;
    }
    //解碼
    public Packet decode(ByteBuf byteBuf) {
        // 跳過魔數
        byteBuf.skipBytes(4);
        // 跳過版本號
        byteBuf.skipBytes(1);
        // 序列化演算法標識
        byte serializeAlgorithm = byteBuf.readByte();
        // 指令
        byte command = byteBuf.readByte();
        // 數據包長度
        int length = byteBuf.readInt();
        byte[] bytes = new byte[length];
        byteBuf.readBytes(bytes);
        Class<? extends Packet> requestType = getRequestType(command);
        Serializer serializer = getSerializer(serializeAlgorithm);
        if (requestType != null && serializer != null) {
            return serializer.deserialize(requestType, bytes);
        }
        return null;
    }
    //獲取序列化類型
    private Serializer getSerializer(byte serializeAlgorithm) {
        return serializerMap.get(serializeAlgorithm);
    }
    //獲取數據類型
    private Class<? extends Packet> getRequestType(byte command) {
        return packetTypeMap.get(command);
    }
}

使用自定義通訊協議

  最後通過一個登錄示例,來使用一下上面自定義的通訊協議。

  基於上面的程式碼,首先更換一下客戶端和服務端的邏輯處理器,直接在原來的邏輯處理器裡面修改邏輯程式碼也行。

.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new ClientHandler());
            }
        });
.childHandler(new ChannelInitializer<NioSocketChannel>() {
            protected void initChannel(NioSocketChannel ch) {
                ch.pipeline().addLast(new ServerHandler());
            }
        }

  客戶端在連接上服務端後立即登錄,下面為客戶端登錄程式碼

public class ClientHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println(new Date() + ": 客戶端開始登錄");
        // 創建登錄對象
        LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
        loginRequestPacket.setUserId(new Random().nextInt(10000));
        loginRequestPacket.setUsername("username");
        loginRequestPacket.setPassword("pwd");
        // 編碼
        ByteBuf buffer = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginRequestPacket);
        // 寫數據
        ctx.channel().writeAndFlush(buffer);
    }
    //接收服務端資訊
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);
        //如果數據類型是登錄,就進行登錄判斷
        //同理可以判斷數據是否是普通消息,還是其他類型的數據
        if (packet instanceof LoginResponsePacket) {
            LoginResponsePacket loginResponsePacket = (LoginResponsePacket) packet;
            if (loginResponsePacket.isSuccess()) {
                System.out.println(new Date() + ": 客戶端登錄成功");
            } else {
                System.out.println(new Date() + ": 客戶端登錄失敗,原因:" + loginResponsePacket.getReason());
            }
        }
    }
}

  下面是服務端程式碼

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf requestByteBuf = (ByteBuf) msg;
        // 解碼
        Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);
        // 判斷是否是登錄請求數據包
        if (packet instanceof LoginRequestPacket) {
            LoginRequestPacket loginRequestPacket = (LoginRequestPacket) packet;
            LoginResponsePacket loginResponsePacket=new LoginResponsePacket();
            loginResponsePacket.setVersion(packet.getVersion());
            // 登錄校驗
            if (valid(loginRequestPacket)) {
                // 校驗成功
                loginResponsePacket.setSuccess(true);
                System.out.println("客戶端登錄成功!");
            } else {
                // 校驗失敗
                loginResponsePacket.setReason("帳號或密碼錯誤");
                loginResponsePacket.setSuccess(false);
                System.out.println("客戶端登錄失敗!");
            }
            // 編碼,結果發送給客戶端
            ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginResponsePacket);
            ctx.channel().writeAndFlush(responseByteBuf);
        }
    }
    private boolean valid(LoginRequestPacket loginRequestPacket) {
        //這裡可以查詢資料庫,驗證用戶的帳號密碼是否正確
        return true;
    }
}

  最後演示效果如圖所示:

自己實現收發消息

  按照上面的登錄功能,可以自己嘗試實現一下收發消息的功能。首先還是要定義一個收發消息的Java對象。還需要在Command裡面加上收發消息的指令,在編解碼類裡面的map添加Java對象。收發消息前,肯定需要登錄。那麼怎麼判斷一個客戶端是否登錄呢?可以通過ctx獲取channel,然後通過channel的attr方法設置屬性。如果登錄成功這個屬性就設為true。然後在客戶端的啟動類里連接成功後,新建一個執行緒專門監聽用戶的輸入,新建輸入執行緒前判斷一下登錄屬性是否為true就行了。其他的地方跟登錄就沒有太大差別了。

 

pipeline與channelHandler

  通過上面的一些實戰,可以發現我們所有的邏輯程式碼都寫在了一個Handler類裡面,幸好現在需要處理的業務不是很多。如果以後功能拓展,這個類會變得非常臃腫。Netty中的pipeline和channelHandler就是解決這個問題的,它們通過責任鏈設計模式來組織程式碼邏輯,並且能夠支援邏輯的添加和刪除,能夠支援各類協議拓展,如HTTP、Websocket等。可以看看Netty實戰部落格中的初始化器類,裡面就是通過pipeline添加了各類協議和一些邏輯程式碼。

pipeline與channelHandler的構成

  

  我們知道一個連接對應一個channel,這個channel的所有處理邏輯在一個ChannelPipeline對象里,就是上圖中的pipeline,這是它的對象名。然後這個對象裡面是一個雙向鏈表結構,每個節點是一個ChannelHandlerContext對象。這個對象能拿到與channel相關的所有上下文資訊,這個對象還包含一個重要的對象:ChannelHandler,它的分類如下。

  簡單地說,它包含兩個介面和這兩個介面的實現類,圖中左邊的實現類是不是很熟悉,就是我們自己寫的邏輯處理器里的繼承的類。從名字就可以看出,它們的作用分別是讀數據和寫數據,或理解為入站和出戰。最重要的兩個方法分別為channelRead():消息入站。和write():消息出戰。

構建客戶端與服務端的pipeline

  下面的程式碼基於上面的登錄示例改造。

  我們先了解一下ByteToMessageDecoder這個類。不論客戶端還是服務端收到數據後,都會先進行解碼,這個類就是Netty提供的專門做這個事情的。使用如下:

public class PacketDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {
        out.add(PacketCodeC.INSTANCE.decode(in));
    }
}

  這個類有一個好處就是,ByteBuf默認使用的是堆外記憶體,而它會幫你自動釋放記憶體,無需我們關心。上面是解碼,對應的Netty也準備了一個類來專門編碼:MessageToByteEncoder.

public class PacketEncoder extends MessageToByteEncoder<Packet> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) {
        PacketCodeC.INSTANCE.encode(out, packet);
    }
}

  注意encode傳入的參數,第一個參數變成了ByteBuf的類型,所以我們需要把PacketCodeC里的encode方法的參數改過來,也不需要第一行創建一個ByteBuf對象了。

  如果不明白為什麼要用到這兩個類的話,我先展示一段Netty實戰部落格裡面的程式碼:

public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline= socketChannel.pipeline();
        //http解碼器
        pipeline.addLast(new HttpServerCodec());
        //....
        //websocket支援,設置路由
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        //添加自定義的助手類(邏輯處理器)
        pipeline.addLast(new NettyHandler());
    }
}

  在這篇實戰中我沒有實現自己的編解碼器,而是直接使用了http的解碼器。類似的,我們可以把自己的編解碼器也通過pepeline添加到邏輯鏈中。就像前面說的,可以添加刪除邏輯程式碼,每個功能各司其職,而不是一股腦的全在一個地方。用這兩個類還有一個好處就是Netty會自動識別這兩個類,從而自動編解碼而不需要我們自己去調用

  編解碼的問題解決了,再看看邏輯處理器類。看看登錄的程式碼,如果我們不止實現登錄功能,還有收發等其他功能,是不是要用大量的if else把各個消息類型分開,然後執行不同的邏輯。不同的邏輯都擠在一個方法中,顯然也太擁擠了。因此Netty基於這種考慮,抽象出了SimpleChannelInboundHandler。下面看看它是如何解決這個問題的:

public class ClientLoginHandler extends SimpleChannelInboundHandler<LoginResponsePacket>{
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("客戶端開始登錄....");
        // 創建登錄對象
        LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
        loginRequestPacket.setUserId(new Random().nextInt(10000));
        loginRequestPacket.setUsername("username");
        loginRequestPacket.setPassword("pwd");
        // 寫數據
        ctx.channel().writeAndFlush(loginRequestPacket);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) {
        if (loginResponsePacket.isSuccess()) {
            System.out.println(new Date() + ": 客戶端登錄成功");
        } else {
            System.out.println(new Date() + ": 客戶端登錄失敗,原因:" + loginResponsePacket.getReason());
        }
    }
}
public class ServerLoginHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
        //服務端登錄邏輯
        ctx.channel().writeAndFlush(login(loginRequestPacket));
    }
    private LoginResponsePacket login(LoginRequestPacket loginRequestPacket) {
        LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
        loginResponsePacket.setVersion(loginRequestPacket.getVersion());
        // 登錄校驗(自行判斷用戶資訊是否正確)
        if (true) {
            // 校驗成功
            loginResponsePacket.setSuccess(true);
            System.out.println("客戶端登錄成功!");
            return loginResponsePacket;
        } else {
            // 校驗失敗
            loginResponsePacket.setReason("帳號或密碼錯誤");
            loginResponsePacket.setSuccess(false);
            System.out.println("客戶端登錄失敗!");
            return loginResponsePacket;
        }
    }
}

  類似的,收發消息也可以這麼做。Netty會自動根據抽象類後面的泛型來區分它要調用哪個類。比如我們發送的是一個SendMessage類型的Java對象,它就會在繼承了SimpleChannelInboundHandler的類中找到泛型為SendMessage的類去執行。

  最後我們要把這些邏輯程式碼根據服務端和客戶端不同的需求添加到它們的pipeline中。

  客戶端

.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        //解碼一定要放在第一個,在這裡pipeline按順序執行,不然接收消息無法正常使用
                        ch.pipeline().addLast(new PacketDecoder());
                        ch.pipeline().addLast(new LoginResponseHandler());
                        ch.pipeline().addLast(new PacketEncoder());
                    }
                });

  服務端

.childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new PacketDecoder());
                        ch.pipeline().addLast(new LoginRequestHandler());
                        ch.pipeline().addLast(new PacketEncoder());
                    }
                });

  最後的運行結果和登錄示例一樣。看到這裡,Netty實戰中的程式碼,應該可以全部看懂了。

channelHandler的生命周期

  我們在重寫ChannelInboundHandlerAdapter或者SimpleChannelInboundHandler里的方法的時候,只用到了讀和Active,其他一大堆沒用上的方法是幹嘛的?現在就一一說明一下,這些方法運作的整個過程,我們可以理解為這個channelHandler的生命周期。以ChannelInboundHandlerAdapter為例,SimpleChannelInboundHandler繼承於ChannelInboundHandlerAdapter,所以也差不多,個別方法名不一樣而已。下面的API,從上到下,就是觸發的順序。

  handlerAdded():邏輯處理器被添加後觸發。

  channelRegistered():channel綁定到執行緒組後觸發。

  channelActive():channel準備就緒,或者說連接完成。

  channelRead():channel有數據可讀。

  channelReadComplete():channel某次數據讀完了。

  channelInactive():channel被關閉

  channelUnregistered():channel取消執行緒的綁定

  handlerRemoved():邏輯處理器被移除。

 

拆包和粘包

  以上面客戶端和服務端雙向通訊的程式碼為例。簡單修改一下,在建立連接後,客戶端用一個循環向伺服器發送消息。然後服務端列印這些消息。

等次數多了以後,服務端列印時會發現一些問題,比如我們發送的字元串為「123456789」,大部分列印的是123456789;有一部分變成了123456789123,這就是粘包;有一部分變為了1234,這就是拆包。

為什麼會有這種現象?

  雖然在我們程式碼層面,傳輸的數據單位是ByteBuf。但是到了更底層,用到了TCP協議,終究會按照位元組流發送數據。而底層並不知道應用層數據的具體含義,它會根據TCP緩衝區的實際情況進行數據包的劃分。所以最終到達服務端的數據產生上面的現象。

如何解決?

  Netty為我們提供了4種解決方法:

  1.FixedLengthFrameDecoder:固定長度拆包器,每個數據包長度都是固定的。

  2.LineBasedFrameDecoder:行拆包器,每個數據包之間以換行符作為分隔。

  3.DelimiterBasedFrameDecoder:類似行拆包器,不過我們可以自定義分隔符。

  4.LengthFieldBasedFrameDecoder:基於長度域拆包器,最常用的,只要你的自定義協議中包含數據長度這個部分,就可以使用。它需要三個參數,第一個是數據包最大長度、第二個是參數長度域偏移量、第三個是長度域長度。

  

  看看前面通訊協議的圖,所謂長度域就是數據長度就是數據長度佔用的位元組,這裡是4。長度域偏移量就是數據長度這個部分在通訊協議組成部分中的位置,前面幾個部分加起來是7,所以它的偏移量就是7。

使用LengthFieldBasedFrameDecoder

  添加到客戶端和服務端pipeline中就行了,注意要放在第一個。以服務端為例。

ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4));
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new PacketEncoder());

  如果和我一樣,客戶端用到的是WebSocket,沒有自定義協議,就不用添加拆包器了,Netty已經幫我們做好了。

 

 

心跳與空閑檢測

 

  網路應用程式普遍會遇到的一個問題就是連接假死。簡單地說就是因為網路或其他問題導致某一端實際上(TCP)已經斷開連接,但是應用程式沒有檢測到,以為還連接著。對服務端來說,這會浪費系統資源,導致性能下降。對於客戶端來說,假死會造成數據發送超時,影響體驗。

 

如何解決這個問題?

 

  只需要客戶端每隔一段時間打個招呼,表示它還活著就行了,就是所謂的心跳。Netty自帶的IdleStateHandler 就可以實現這個功能。下面就來實現它。

 

  服務端

//心跳檢測類
public class IMIdleStateHandler extends IdleStateHandler {
    //讀空閑時間,也就是多久沒讀到數據了
    private static final int READER_IDLE_TIME = 15;
    public IMIdleStateHandler() {
        //調用父類構造函數,四個參數分別為:
        //讀空閑時間、寫空閑時間、讀寫空閑時間、時間單位
        super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }
    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {
        System.out.println(READER_IDLE_TIME + "秒內未讀到數據,關閉連接");
        ctx.channel().close();
    }
}
//回復客戶端發送的心跳數據包
public class HeartBeatRequestHandler extends SimpleChannelInboundHandler<HeartBeatRequestPacket> {
    public static final HeartBeatRequestHandler INSTANCE = new HeartBeatRequestHandler();
    private HeartBeatRequestHandler() {
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HeartBeatRequestPacket requestPacket) {
        ctx.writeAndFlush(new HeartBeatResponsePacket());
    }
}
ch.pipeline().addLast(new IMIdleStateHandler());//添加到最前面
  ch.pipeline().addLast(HeartBeatRequestHandler.INSTANCE);//添加到解碼和登錄請求之後

  客戶端

  服務端實現了檢測讀空閑,客戶端肯定就需要發送一個數據。

public class HeartBeatTimerHandler extends ChannelInboundHandlerAdapter {
    //心跳數據包發送時間間隔,這裡設為5秒,實際使用時建議服務端和客戶端都設成分鐘級別
    private static final int HEARTBEAT_INTERVAL = 5;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        scheduleSendHeartBeat(ctx);
        super.channelActive(ctx);
    }
    private void scheduleSendHeartBeat(ChannelHandlerContext ctx) {
        //schedule類似延時任務,每隔5秒發送心跳數據包
        ctx.executor().schedule(() -> {
            if (ctx.channel().isActive()) {
                ctx.writeAndFlush(new HeartBeatRequestPacket());
                scheduleSendHeartBeat(ctx);
            }
        }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
    }
}

  客戶端發送心跳數據包後,也需要檢測服務端是否回復了自己,所以也需要個檢測類,與服務端的程式碼一樣,就不寫了。也需要和服務端一樣,pipeline添加到相同的位置。

 

  最後:如果服務端在讀到數據後,不要再read方法裡面直接訪問資料庫或者其他比較複雜的操作,可以把這些耗時的操作放進我們的業務執行緒池中去執行。如:

ThreadPool threadPool = xxx;
protected void channelRead0(ChannelHandlerContext ctx, T packet) {
    threadPool.submit(new Runnable() {
        // 1. balabala 一些邏輯
        // 2. 資料庫或者網路等一些耗時的操作
        // 3. writeAndFlush()
    })
}

  如果我們想統計某個操作的響應時間,直接用System.currentTimeMillis()其實是不準確的,因為有些操作是非同步的,它馬上就返回了,所以我們要判斷非同步結果是否完成再計算結束時間。

protected void channelRead0(ChannelHandlerContext ctx, T packet) {
    threadPool.submit(new Runnable() {
        long begin = System.currentTimeMillis();
        // 1. balabala 一些邏輯
        // 2. 資料庫或者網路等一些耗時的操作
        // 3. writeAndFlush
        xxx.writeAndFlush().addListener(future -> {
            if (future.isDone()) {
                long time =  System.currentTimeMillis() - begin;
            }
        });
    })
}

 

資料://juejin.im/book/5b4bc28bf265da0f60130116