Netty-入门

  • 2021 年 5 月 20 日
  • 筆記

Hello World

目标

开发一个简单的服务器端和客户端

  • 客户端向服务器端发送 hello, world
  • 服务器仅接收,不返回

依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>

服务器端

@Slf4j
public class HelloServer {

    public static void main(String[] args) {
        new ServerBootstrap()
                // 1. 创建 NioEventLoopGroup
                .group(new NioEventLoopGroup())
                // 2. 选择 Socket 实现类,NioServerSocketChannel 是基于 nio 实现的
                .channel(NioServerSocketChannel.class)
                // 3. 子线程处理类 handler
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 5. socketChannel 解码器
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        // 6. SocketChannel 的业务处理,使用上一个处理器的处理结果
                        nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
                                log.info("msg = " + msg);
                            }
                        });
                    }
                })
                // 4. 绑定监听端口
                .bind(8080);
    }
}

代码解读

  • 1 处,创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector 后面会详细展开
  • 2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现
  • 3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
  • 4 处,ServerSocketChannel 绑定的监听端口
  • 5 处,SocketChannel 的处理器,解码 ByteBuf => String
  • 6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果

客户端

@Slf4j
public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        // 1. 启动类
        new Bootstrap()
                // 2. 添加 EventLoop
                .group(new NioEventLoopGroup())
                // 3. 选择客户端 channel 实现
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 8
                        nioSocketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 4. 连接到服务器
                .connect(new InetSocketAddress("localhost",8080))
                // 5
                .sync()
                // 6
                .channel()
                // 7. 向服务器发送数据
                .writeAndFlush("hello word");
    }
}

代码解读

  • 1 处,创建 NioEventLoopGroup,同 Server
  • 2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现
  • 3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
  • 4 处,指定要连接的服务器和端口
  • 5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕
  • 6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作
  • 7 处,写入消息并清空缓冲区
  • 8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出
  • 数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程

组件

EventLoop

事件循环对象

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,
    • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

事件循环组

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

  • 继承自 netty 自己的 EventExecutorGroup
    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop
@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        /**
         * 细分1:
         * boss 只负责 ServerSocketChannel 上的 accept 事件
         * worker 只负责 SocketChannel 上的读写事件
         */
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        /**
         * 细分2:
         * 创建一个独立的 EventLoopGroup
         */
        EventLoopGroup dfGroup = new DefaultEventLoopGroup();
        new ServerBootstrap()
                .group(boss,worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel sc) throws Exception {
                        sc.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.info(buf.toString(Charset.defaultCharset()));
                                ctx.fireChannelRead(msg); // 把消息传递给下一个 handler
                            }
                        }).addLast(dfGroup,"handler2",new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf)msg;
                                log.info(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8085);
    }
}
@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {

        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8085))
                .sync()
                .channel();

        log.info(channel.toString());
        System.out.println();
    }
}

优雅关闭

优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的

Channel

channel 的主要作用

  • close() 可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入
  • writeAndFlush() 方法将数据写入并刷出

ChannelFuture

@Slf4j
public class EventLoopServer {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        new ServerBootstrap()
                .group(boss,worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                log.info(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                })
                .bind(8086);
    }
}
@Slf4j
public class EventLoopClient {
    public static void main(String[] args) throws InterruptedException {

        // 2. 带有 Future ,Promise 的类型都是和异步方法配套使用,用来处理结果
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel sc) throws Exception {
                        sc.pipeline().addLast(new StringEncoder());
                    }
                })
                // 1. 连接到服务器
                // 异步阻塞,main 线程发起了调用,真正执行 connect 连接的是 nio 线程
                .connect(new InetSocketAddress("localhost", 8086));

        // 2.1 使用 sync 方法同步处理结果
//        channelFuture.sync();  // 阻塞住当前线程,直到 nio 线程连接建立完毕
//        Channel channel = channelFuture.channel();
//        log.info(channel.toString());
//        channel.writeAndFlush("hello word;");

        // 2.2 使用 addListener(回调对象)方法异步处理结果
        channelFuture.addListener(new ChannelFutureListener() {
            // 在 nio 线程连接建立好之后,会调用 operationComplete
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                Channel channel = channelFuture.channel();
                log.info(channel.toString());
                channel.writeAndFlush("hello word");
            }
        });
    }
}

CloseFuture

@Slf4j
public class EventLoopClient2 {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8086));
        Channel channel = channelFuture.sync().channel();
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String str = scanner.nextLine();
                if("q".equals(str)){
                    channel.close();
                    break;
                }
                channel.writeAndFlush(str);
            }
        },"input").start();

        // 1. 获取 CloseFuture 对象同步处理关闭
        ChannelFuture closeFuture = channel.closeFuture();
//        closeFuture.sync();  // 阻塞
//        log.info("处理关闭后的操作...");

        // 2. 异步处理关闭
        closeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                log.info("处理关闭之后的操作...");
                // 优雅的停止接受任务
                group.shutdownGracefully();
            }
        });
    }
}