Netty 学习(七):NioEventLoop 对应线程的创建和启动源码说明

Netty 学习(七):NioEventLoop 对应线程的创建和启动源码说明

作者: Grey

原文地址:

博客园:Netty 学习(七):NioEventLoop 对应线程的创建和启动源码说明

CSDN:Netty 学习(七):NioEventLoop 对应线程的创建和启动源码说明

说明

在 Netty 服务端代码中,我们一般会创建了两个 NioEventLoopGroup:bossGroup 和 workerGroup

其中: bossGroup用于监听端口,接收新连接的线程组;workerGroup 用于处理每一个连接的数据读写的线程组。

bossGroup 创建第一个 NioEventLoop 线程

NioEventLoop 的启动入口在AbstractUnsafe

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ......
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

其中inEventLoop()方法调用的是AbstractEventExecutor的实现

    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }

而这个实现又调用了子类SingleThreadEventExecutor的如下方法

    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }

在服务端刚启动的时候,Thread.currentThread()就是当前 main 方法对应的主线程,而this.thread还没有开始赋值,所以此时为null,

所以eventLoop.inEventLoop()在一开始调用的时候,返回的是 false,进入AbstractUnsafe的如下else逻辑中

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ......
            AbstractChannel.this.eventLoop = eventLoop;
            // 首次执行的时候 eventLoop.inEventLoop() 返回 false,执行 else 逻辑
            if (eventLoop.inEventLoop()) {
                ......
            } else {
               ......
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
               ......
            }
        }

其中executor方法对应的是SingleThreadEventExecutorexecute方法

    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown()) {
                ......
            }
        }

        if (!addTaskWakesUp && immediate) {
           ......
        }
    }

inEventLoop()经过上述分析,为false,所以执行startThread()方法

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

这里主要的逻辑就是判断线程是否启动,如果没有启动,就调用doStartThread()启动。doStartThread()的逻辑是

private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                ...
                SingleThreadEventExecutor.this.run();
                ......
            }
        });
    }

通过一个成员变量thread来保存ThreadPerTaskExecutor创建出来的线程(即:FastThreadLocalThread),NioEventLoop 保存完线程的引用之后,随即调用 run 方法。

workGroup 对应的 NioEventLoop 创建线程和启动

workGroup 对应的 NioEventLoop 创建的线程主要做如下事情

  1. 执行一次事件轮询。首先轮询注册到 Reactor 线程对应的 Selector 上的所有 Channel 的 IO 事件。

  2. 处理产生 IO 事件的 Channel。如果有读写或者新连接接入事件,则处理:

  3. 处理任务队列。

以上三个步骤分别对应了下述三个方法

事件轮询

事件轮询调用了NioEventLoop的如下方法

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            return selector.select();
        }
        // Timeout will only be 0 if deadline is within 5 microsecs
        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

处理 IO 事件的 Channel

调用的是NioEventLoop的如下方法

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            // 处理优化过的 SelectedKeys
            processSelectedKeysOptimized();
        } else {
            // 处理正常的 SelectedKeys
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

上述两个分支分别处理了不同类型的 key:重点关注优化过的 SelectedKeys,selectedKeys 在 NioEventLoop 中是一个SelectedSelectionKeySet对象,这个对象虽然叫Set,但是底层使用了数组

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }

    ......
}

add 方法的主要流程是:

  1. 将SelectionKey塞到该数组的尾部;

  2. 更新该数组的逻辑长度+1;

  3. 如果该数组的逻辑长度等于数组的物理长度,就将该数组扩容。

待程序运行一段时间后,等数组的长度足够长,每次在轮询到 NIO 事件的时候,Netty 只需要O(1)的时间复杂度就能将SelectionKey塞到set中去,而 JDK 底层使用的HashSet,put的时间复杂度最少是O(1),最差是O(n)。

进入processSelectedKeysOptimized方法

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See //github.com/netty/netty/issues/2363
            selectedKeys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See //github.com/netty/netty/issues/2363
                for (;;) {
                    i++;
                    if (selectedKeys[i] == null) {
                        break;
                    }
                    selectedKeys[i] = null;
                }

                selectAgain();
                // Need to flip the optimized selectedKeys to get the right reference to the array
                // and reset the index to -1 which will then set to 0 on the for loop
                // to start over again.
                //
                // See //github.com/netty/netty/issues/1523
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }

主要是三个步骤:

第一步,取出 IO 事件及对应的 Channel。其中selectedKeys[i] = null;的目的是防止内存泄漏

第二步,处理 Channel

if (a instanceof AbstractNioChannel) {
    processSelectedKey(k, (AbstractNioChannel) a);
} else {
    @SuppressWarnings("unchecked")
    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
    processSelectedKey(k, task);
}

Netty 的轮询注册机制其实是将 AbstractNioChannel 内部的 JDK 类 SelectableChannel 对象注册到 JDK 类 Selector 对象上,并且将 AbstractNioChannel 作为SelectableChannel 对象的一个 attachment 附属上,这样在 JDK 轮询出某条 SelectableChannel 有 IO 事件发生时,就可以直接取出 AbstractNioChannel 进行后续操作。

在Netty的Channel中,有两大类型的Channel,

一个是NioServerSocketChannel,由boss NioEventLoopGroup负责处理;

一个是NioSocketChannel,由worker NioEventLoop负责处理,

所以:

(1)对于boss NioEventLoop来说,轮询到的是连接事件,后续通过NioServerSocketChannel的Pipeline将连接交给一个worker NioEventLoop处理;

(2)对于worker NioEventLoop来说,轮询到的是读写事件,后续通过NioSocketChannel的Pipeline将读取到的数据传递给每个ChannelHandler来处理。

第三步,判断是否需要再一次轮询

needsToSelectAgain变量控制,needsToSelectAgain变量在如下方法中被调用,在NioEventLoop

private static final int CLEANUP_INTERVAL = 256;
    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }

cancel方法是用于将key取消,并且在被取消的key到达CLEANUP_INTERVAL的时候,设置needsToSelectAgain为 true,CLEANUP_INTERVAL默认值为256。

也就是说,对于每个NioEventLoop而言,每隔256个Channel从Selector上移除的时候,就标记needsToSelectAgain为true,然后将SelectedKeys的内部数组全部清空,方便JVM垃圾回收,然后调用selectAgain重新填装SelectionKeys数组。

处理任务队列

调用的是如下方法

    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        if (ranAtLeastOne) {
            lastExecutionTime = getCurrentTimeNanos();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

一路追踪下去,进入SingleThreadEventExecutorofferTask()方法

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }

Netty 内部使用一个 taskQueue 将Task保存起来。这个 taskQueue 其实是一个 MPSC Queue,每一个 NioEventLoop 都与它一一对应。

接下来执行SingleThreadEventExecutorrunAllTasks()方法

   protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = getCurrentTimeNanos();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = getCurrentTimeNanos();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

主要流程如下:

1.NioEventLoop在执行过程中不断检测是否有事件发生,如果有事件发生就处理,处理完事件之后再处理外部线程提交过来的异步任务。

2.在检测是否有事件发生的时候,为了保证异步任务的及时处理,只要有任务要处理,就立即停止事件检测,随即处理任务。

3.外部线程异步执行的任务分为两种:定时任务和普通任务,分别落地到 MpscQueue 和 PriorityQueue ,而 PriorityQueue 中的任务最终都会填充到 MpscQueue 中处理。

4.Netty每隔64个任务检查一次是否该退出任务循环。

完整代码见:hello-netty

本文所有图例见:processon: Netty学习笔记

更多内容见:Netty专栏

参考资料

跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

深度解析Netty源码

Tags: