Netty 學習(二):服務端與客戶端通訊

Netty 學習(二):服務端與客戶端通訊

作者: Grey

原文地址:

部落格園:Netty 學習(二):服務端與客戶端通訊

CSDN:Netty 學習(二):服務端與客戶端通訊

說明

Netty 中服務端和客戶端通訊,包括兩個方面,一個是 IO 處理邏輯的配置,一個是通訊載體的設置。

IO 處理邏輯

無論是客戶端,還是服務端,都是通過 Bootstrap 的 handler()方法指定的。我們通過模擬一個簡單的客戶端發送消息給服務端,服務端回寫消息給客戶端的示常式序來說明

服務端程式碼如下(每個配置見注釋說明)

package netty.v3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
 * Netty 自動綁定遞增埠,增加了IO處理邏輯
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2022/9/12
 * @since
 */
public class NettyServer {

    public static void main(String[] args) {
        // 引導服務端的啟動
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 用於監聽埠,接收新連接的執行緒組
        NioEventLoopGroup boss = new NioEventLoopGroup();
        // 表示處理每一個連接的數據讀寫的執行緒組
        NioEventLoopGroup worker = new NioEventLoopGroup();
        serverBootstrap.group(boss, worker)
                // 指定IO模型為NIO
                .channel(NioServerSocketChannel.class)
                // 可以給服務端的Channel指定一些屬性,非必須
                .attr(AttributeKey.newInstance("serverName"), "nettyServer")
                // 可以給每一個連接都指定自定義屬性,非必須
                .childAttr(AttributeKey.newInstance("clientKey"), "clientValue")
                // 使用option方法可以定義服務端的一些TCP參數
                // 這個設置表示系統用於臨時存放已經完成三次握手的請求的隊列的最大長度,
                // 如果連接建立頻繁,伺服器創建新的連接比較慢,則可以適當調大這個參數
                .option(ChannelOption.SO_BACKLOG, 1024)
                // 以下兩個配置用於設置每個連接的TCP參數
                // SO_KEEPALIVE: 表示是否開啟TCP底層心跳機制,true表示開啟
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // TCP_NODELAY:表示是否開啟Nagle演算法,true表示關閉,false表示開啟
                // 如果要求高實時性,有數據發送時就馬上發送,就設置為關閉;
                // 如果需要減少發送次數,減少網路交互,就設置為開啟。
                .childOption(ChannelOption.TCP_NODELAY, true)
                // 定義後面每一個連接的數據讀寫
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            // 不管服務端還是客戶端,收到數據後都會調用channelRead()方法
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                System.out.println(new Date() + ": 服務端讀到數據 -> " + byteBuf.toString(StandardCharsets.UTF_8));
                                // 服務端將讀到的數據返回客戶端
                                System.out.println(new Date() + ": 服務端寫出數據");
                                ctx.channel().writeAndFlush(getByteBuf(ctx));
                            }
                            private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
                                byte[] bytes = "hello world from server!".getBytes(StandardCharsets.UTF_8);
                                ByteBuf buffer = ctx.alloc().buffer();
                                buffer.writeBytes(bytes);
                                return buffer;
                            }
                        });
                    }
                });
        // 本地綁定一個8000埠啟動服務
        bind(serverBootstrap, 8000);
    }

    public static void bind(final ServerBootstrap serverBootstrap, final int port) {
        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("埠[" + port + "]綁定成功");
            } else {
                System.err.println("埠[" + port + "]綁定失敗");
                bind(serverBootstrap, port + 1);
            }
        });
    }
}

客戶端程式碼如下(關於每個配置的說明見注釋)

package netty.v3;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * Netty 可自動重連的客戶端,增加了IO處理邏輯
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2022/9/12
 * @since
 */
public class NettyClient {
    static final int MAX_RETRY = 6;
    static final String HOST = "localhost";
    static final int PORT = 8000;

    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();

        NioEventLoopGroup group = new NioEventLoopGroup();
        bootstrap
                // 指定執行緒模型
                .group(group)
                // 指定IO類型為NIO
                .channel(NioSocketChannel.class)
                // attr可以為客戶端Channel綁定自定義屬性
                .attr(AttributeKey.newInstance("clientName"), "nettyClient")
                // 連接的超時時間,如果超過這個時間,仍未連接到服務端,則表示連接失敗
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                // 表示是否開啟TCP底層心跳機制,true表示開啟
                .option(ChannelOption.SO_KEEPALIVE, true)
                // 是否開啟Nagle演算法,如果要求高實時性,有數據就馬上發送,則為true
                // 如果需要減少發送次數,減少網路交互,就設置為false
                .option(ChannelOption.TCP_NODELAY, true)
                // IO處理邏輯
                .handler(new ChannelInitializer<>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            // 這個方法會在客戶端連接建立成功之後被調用
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) {
                                System.out.println(new Date() + ": 客戶端寫出數據");
                                // 包裝成ByteBuf並發送到服務端
                                // 註:Netty中的數據是以 ByteBuf 為單位的。
                                ctx.channel().writeAndFlush(getByteBuf(ctx));
                            }

                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                System.out.println(new Date() + ": 客戶端讀取到的數據 -> " + byteBuf.toString(StandardCharsets.UTF_8));
                            }

                            private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
                                ByteBuf buffer = ctx.alloc().buffer();
                                byte[] bytes = "hello world".getBytes(StandardCharsets.UTF_8);
                                buffer.writeBytes(bytes);
                                return buffer;
                            }
                        });
                    }
                });
        connect(bootstrap, HOST, PORT, MAX_RETRY);
    }

    private static void connect(final Bootstrap bootstrap, final String host, final int port, int retry) {
        bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("連接成功!");
            } else if (retry == 0) {
                System.err.println("重試次數已經使用完畢");
            } else {
                // 第幾次重試
                int order = (MAX_RETRY - retry) + 1;
                // 本次的重試間隔
                int delay = 1 << order;
                System.out.println(new Date() + ": 連接失敗,第" + order + "次重連...");
                bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
            }
        });
    }

}

程式碼說明:

無論是服務端還是客戶端

channelActive方法會在客戶端連接建立成功之後被調用。

channelRead方法在收到數據後都會調用

先運行服務端,控制台輸出

埠[8000]綁定成功

然後運行客戶端,客戶端列印

連接成功!
Wed Sep 14 19:58:50 CST 2022: 客戶端寫出數據
Wed Sep 14 19:58:50 CST 2022: 客戶端讀取到的數據 -> hello world from server!

服務端列印

埠[8000]綁定成功
Wed Sep 14 19:58:50 CST 2022: 服務端讀到數據 -> hello world
Wed Sep 14 19:58:50 CST 2022: 服務端寫出數據

數據載體

Netty 中的數據載體是 ByteBuf,ByteBuf 的結構如下

image

ByteBuf 和 java.nio.ByteBuffer 類似,但是提供了比 java.nio.ByteBuffer更方便使用的 API。

關於 Java 的java.nio.ByteBuffer的使用,參考:Java IO學習筆記二:DirectByteBuffer與HeapByteBuffer

接下來使用一個示例來說明 ByteBuf 的使用

程式碼如下:

package bytebuf;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
/**
 * ByteBuf 示例
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2022/9/14
 * @since
 */
public class ByteBufTest {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);

        print("allocate ByteBuf(9, 100)", buffer);

        // write 方法改變寫指針,寫完之後寫指針未到 capacity 的時候,buffer 仍然可寫
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        print("writeBytes(1,2,3,4)", buffer);

        // write 方法改變寫指針,寫完之後寫指針未到 capacity 的時候,buffer 仍然可寫, 寫完 int 類型之後,寫指針增加4
        buffer.writeInt(12);
        print("writeInt(12)", buffer);

        // write 方法改變寫指針, 寫完之後寫指針等於 capacity 的時候,buffer 不可寫
        buffer.writeBytes(new byte[]{5});
        print("writeBytes(5)", buffer);

        // write 方法改變寫指針,寫的時候發現 buffer 不可寫則開始擴容,擴容之後 capacity 隨即改變
        buffer.writeBytes(new byte[]{6});
        print("writeBytes(6)", buffer);

        // get 方法不改變讀寫指針
        System.out.println("getByte(3) return: " + buffer.getByte(3));
        System.out.println("getShort(3) return: " + buffer.getShort(3));
        System.out.println("getInt(3) return: " + buffer.getInt(3));
        print("getByte()", buffer);


        // set 方法不改變讀寫指針
        buffer.setByte(buffer.readableBytes() + 1, 0);
        print("setByte()", buffer);

        // read 方法改變讀指針
        byte[] dst = new byte[buffer.readableBytes()];
        buffer.readBytes(dst);
        print("readBytes(" + dst.length + ")", buffer);

    }

    private static void print(String action, ByteBuf buffer) {
        System.out.println("after ===========" + action + "============");
        System.out.println("capacity(): " + buffer.capacity());
        System.out.println("maxCapacity(): " + buffer.maxCapacity());
        System.out.println("readerIndex(): " + buffer.readerIndex());
        System.out.println("readableBytes(): " + buffer.readableBytes());
        System.out.println("isReadable(): " + buffer.isReadable());
        System.out.println("writerIndex(): " + buffer.writerIndex());
        System.out.println("writableBytes(): " + buffer.writableBytes());
        System.out.println("isWritable(): " + buffer.isWritable());
        System.out.println("maxWritableBytes(): " + buffer.maxWritableBytes());
        System.out.println();
    }
}
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);

表示分配了一塊ByteBuf。相關的參數值如下圖

image

buffer.writeBytes(new byte[]{1, 2, 3, 4});

寫入四個 byte 類型的數據,ByteBuf變化如下

image

buffer.writeInt(12);

由於int類型是4個Byte,所以,寫入後,buffer變化如下

image

buffer.writeBytes(new byte[]{5});

寫入一個byte類型的數據,此時,寫空間已經滿了。如下圖

image

buffer.writeBytes(new byte[]{6});

繼續寫入,由於寫空間已經滿了,所以要進行擴容,擴容後的結構如下

image

buffer.getByte(3);
buffer.getShort(3);
buffer.getInt(3);
buffer.setByte(buffer.readableBytes() + 1, 0);

由於get/set操作不改變讀寫指針,所以buffer還是保持原樣

image

byte[] dst = new byte[buffer.readableBytes()];
buffer.readBytes(dst);

read方法會改變讀指針,改變後結構如下

image

圖例

本文所有圖例見:processon: Netty學習筆記

程式碼

hello-netty

更多內容見:Netty專欄

參考資料

跟閃電俠學 Netty:Netty 即時聊天實戰與底層原理

深度解析Netty源碼

Tags: