netty源碼分析之EventLoop中的執行緒FastThreadLocalThread和隊列

每個NioEventLoop有著自己的任務隊列(taskQueue=mpscQueue和延遲隊列PriorityQueue)和自己的處理執行緒(FastThreadLocalThread),同時也維護著自己的Selector。如果是bossGroup,在ServerBootstrap初始化時會去Selector上註冊ServerSocketChannel,同時在pipeline中添加ServerBootstrapAcceptor。io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)方法中會在(readyOps & (SelectionKey.OPREAD | SelectionKey.OPACCEPT)) != 0 || readyOps == 0成立時進入unsafe.read(),此時也就是表明有連接進入,在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read方法中會調用doReadMessages(readBuf)方法,在io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages方法中會通過NioSocketChannel nioSocketChannel = new NioSocketChannel(this, ch);buf.add(nioSocketChannel);的方式將NioSocketChannel放入List類型的readBuf中。此時回到io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read方法中,會遍歷readBuf列表,調用pipeline.fireChannelRead(readBuf.get(i))方法,這樣以來就會觸發ServerBootstrapAcceptor中的channelRead方法,在ServerBootstrapAcceptor的channelRead方法中調用childGroup.register(child)方法,這個childGroup就是創建ServerBootstrap時傳入的workerGroup,這個child就是NioSocketChannel類型的對象。在register之後,每個NioEventLoop執行緒都會在維護自身的task隊列(普通任務隊列與定時任務)的同時,在它的run方法中還會不停地執行select,在doRegister方法中會調用pipeline.fireChannelActive();方法,在方法里會在pipeline中進行傳播,調用io.netty.channel.DefaultChannelPipeline.HeadContext#read方法,繼而調用unsafe.beginRead()方法,在io.netty.channel.nio.AbstractNioChannel#doBeginRead方法中會進行selectionKey.interestOps(interestOps | readInterestOp)方法的調用。關於接入流程可以參考:https://www.cnblogs.com/ZhuChangwu/p/11210437.html,這些處理的主要是網路IO事件,它的任務事件是放入隊列中來進行處理的。

io.netty.channel.nio.NioEventLoop

類繼承關係:

它繼承自SingleThreadEventLoop,它的超類是SingleThreadEventExecutor。而在下面你會發現NioEventLoopGroup中維護著多個SingleThreadEventExecutor。先來看下NioEventLoop和SingleThreadEventLoop、SingleThreadEventExecutor的程式碼。

SingleThreadEventExecutor

因為這個Executor的主要作用是維護其中的FastThreadLocalThread的生命周期,我們來依照這條線進行分析:

  • 執行緒創建:
 protected SingleThreadEventExecutor(     EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {     ----------------省略部分程式碼--------------------     this.parent = parent;     this.addTaskWakesUp = addTaskWakesUp;     // 創建執行緒              thread = threadFactory.newThread(new Runnable() {     @Override     public void run() {     boolean success = false;                      updateLastExecutionTime();     try {     // 回調executor的run方法     SingleThreadEventExecutor.this.run();                          success = true;     } catch (Throwable t) {                          logger.warn("Unexpected exception from an event executor: ", t);     } finally {     for (;;) {     int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);   

使用threadFactory來創建執行緒,創建的是FastThreadLocalThread,這個在下文中會詳細分析。在創建的執行緒的run方法中會回調SingleThreadEventExecutor的run方法。

  • 執行緒狀態:
 private static final int ST_NOT_STARTED = 1;     private static final int ST_STARTED = 2;     private static final int ST_SHUTTING_DOWN = 3;     private static final int ST_SHUTDOWN = 4;     private static final int ST_TERMINATED = 5;           private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;     static {     AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater = PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");     if (updater == null) {                  updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");     }              STATE_UPDATER = updater;     }   

通過一個AtomicIntegerFieldUpdater變數來維護著執行緒的狀態變化。

  • 執行緒喚醒,為什麼要有執行緒喚醒呢,我們來看下這個SingleThreadEventExecutor的實現類NioEventLoop中對run方法的實現:
 @Override     protected void run() {     for (;;) {     boolean oldWakenUp = wakenUp.getAndSet(false);     try {     if (hasTasks()) {                          selectNow();     } else {     // wakeup是要在調用selector.wakeup()之前來校檢來減少wake-up發生,這是因為selector.wakeup()是一個昂貴的操作                          select(oldWakenUp);     ------------省略部分------     if (wakenUp.get()) {                              selector.wakeup();     }     }                      cancelledKeys = 0;                      needsToSelectAgain = false;     final int ioRatio = this.ioRatio;     if (ioRatio == 100) {                          processSelectedKeys();                          runAllTasks();     } else {     final long ioStartTime = System.nanoTime();                          processSelectedKeys();     final long ioTime = System.nanoTime() - ioStartTime;                          runAllTasks(ioTime * (100 - ioRatio) / ioRatio);     }     if (isShuttingDown()) {                          closeAll();     if (confirmShutdown()) {     break;     }     }   

NIO中的Selector封裝了底層的系統調用,其中wakeup用於喚醒阻塞在select方法上的執行緒,它的實現很簡單,在linux上就是創建一 個管道並加入poll的fd集合,wakeup就是往管道里寫一個位元組,那麼阻塞的poll方法有數據可讀就立即返回。

這裡有必要再提一下ioRatio,這個參數提供了一個粗略的機制,用來大致控制處理IO相關(socket 讀,鏈接,關閉,掛起等)和非IO相關任務的時間分配比.非IO任務是,由於使用Executor介面,例如Executor#execute(..),而在EventLoopGroup隊列中的Runnable對象.參數值越小,越多的時間將消耗在非IO任務上.當前,100將禁止所有超時時間(詳見源碼runAllTasks(long timeoutNanos))並運行所有等待著的非IO任務.詳情參考官方issue:https://github.com/netty/netty/issues/6058。

控制wakeup的屬性為:

 private final boolean addTaskWakesUp;   

這裡關注下io.netty.util.concurrent.SingleThreadEventExecutor#execute方法:

 @Override     public void execute(Runnable task) {     ----------省略部分-------     boolean inEventLoop = inEventLoop();     if (inEventLoop) {//當前處理執行緒為EventLoop綁定執行緒時,放入隊列                  addTask(task);     } else {                  startThread();// 啟動新的eventLoop執行緒                  addTask(task);//添加任務入隊     if (isShutdown() && removeTask(task)) {                      reject();     }     }     // addTaskWakesUp為true就代表只有在觸發addTask(Runnable)方法時才會喚醒executor執行緒,默認為false     if (!addTaskWakesUp && wakesUpForTask(task)) {                  wakeup(inEventLoop);     }     }     // io.netty.channel.SingleThreadEventLoop#wakesUpForTask:     @Override     protected boolean wakesUpForTask(Runnable task) {     return !(task instanceof NonWakeupRunnable);     }     // 添加wakeup任務     protected void wakeup(boolean inEventLoop) {     if (!inEventLoop || STATE_UPDATER.get(this) == ST_SHUTTING_DOWN) {                  taskQueue.add(WAKEUP_TASK);     }     }   

該方法在之前的netty源碼分析中詳細地分析過,主要用於查看netty的IO執行緒的狀態,當前處理執行緒為EventLoop綁定執行緒時,放入隊列,否則啟動新的EventLoop執行緒並將任務入隊,並在執行緒處於shutdown狀態時將任務出列並執行拒絕策略。如果上面添加的不是NonWakeupRunnable類型的task,並且當前執行執行緒不是EventLoop執行緒或者當前執行緒的狀態為shutdown狀態時,添加一個WAKEUPTASK,會在io.netty.util.concurrent.SingleThreadEventExecutor#takeTask方法從隊列中取task時喚醒阻塞執行緒。

  • 關閉執行緒,在SingleThreadEventExecutor中有一個threadLock屬性:
 private final Semaphore threadLock = new Semaphore(0);

它的主要調用位於:

 @Override     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {     ------------省略部分程式碼--------------     if (threadLock.tryAcquire(timeout, unit)) {              threadLock.release();     }     return isTerminated();     }

threadLock是一個初始值為0的訊號量。一個初值為0的訊號量,當執行緒請求鎖時只會阻塞,這有什麼用呢?awaitTermination()方法揭曉答案,用來使其他執行緒阻塞等待原生執行緒關閉 。

那麼EventLoop執行緒的作用又是什麼呢?

處理IO事件

對於boss NioEventLoop也就是reactor執行緒來說,輪詢到的是基本上是連接事件(OP_ACCEPT):

源碼調用鏈:

1. io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)     public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {     super.group(parentGroup);     ----------     this.childGroup = childGroup;}    2. 上面將parentGroup傳入了super的group方法io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup):    public B group(EventLoopGroup group) {     -----------------------     this.group = group;     return (B) this;     }    傳給了AbstractBootstrap的group屬性。在AbstractBootstrap內部的io.netty.bootstrap.AbstractBootstrap#bind():    public ChannelFuture bind() {     -----------------------     return doBind(localAddress);     }     doBind方法:     private ChannelFuture doBind(final SocketAddress localAddress) {     final ChannelFuture regFuture = initAndRegister();     final Channel channel = regFuture.channel();     ---------------      io.netty.bootstrap.AbstractBootstrap#initAndRegister方法:     final ChannelFuture initAndRegister() {     final Channel channel = channelFactory().newChannel();     try {                init(channel);     ------------------------------     }     ChannelFuture regFuture = group().register(channel);//這裡是parentGroup   

這裡的group調用的是初始時傳入的parentGroup,緊接著進入io.netty.channel.EventLoopGroup#register(io.netty.channel.Channel)方法,該方法會根據傳入的Channel為ServerSocketChannel和SocketChannel來決定註冊不同的事件到Selector上,這裡主要是accept事件。執行register方法時會從boosGroup的執行緒組中使用EventExecutorChooser選擇出一個NioEventLoop來進行register操作,所以一般boosGroup中的執行緒數量都是一個。詳細分析參考之前的關於netty源碼分析的公眾號文章。這裡還有一點需要注意的是io.netty.bootstrap.ServerBootstrap#init方法:

 @Override     void init(Channel channel) throws Exception {     -------------------------------            p.addLast(new ChannelInitializer<Channel>() {     @Override     public void initChannel(Channel ch) throws Exception {                    ch.pipeline().addLast(new ServerBootstrapAcceptor(                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));     }     });     }   

這裡的ServerBootstrapAcceptor就是worker NioEventLoop工作的關鍵點了。

對於worker NioEventLoop來說,輪詢到的基本上是IO讀寫事件(以OP_READ為例):

這裡簡要地過一遍它的源碼流程:

//io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)     public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {     super.group(parentGroup);     ----------     this.childGroup = childGroup;}    //  這裡賦值給了childGroup屬性。    //接著看io.netty.bootstrap.ServerBootstrap#init方法,上面已經列出,主要是將childGroup傳給了ServerBootstrapAcceptor的childGroup屬性。我們來看下具體作用,在io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead方法:     public void channelRead(ChannelHandlerContext ctx, Object msg) {     final Channel child = (Channel) msg;                child.pipeline().addLast(childHandler);    --------------------------------------------------------     try {                    childGroup.register(child).addListener(new ChannelFutureListener() {     -----------------------------------------     });    在childGroup.register(child)中child對應的就是每個SocketChannel   

ServerBootstrapAcceptor就是大名鼎鼎的reactor模型中的acceptor,這裡childGroup.register(child)中child對應的就是每個SocketChannel,執行register方法時會從workerGroup的執行緒組中使用EventExecutorChooser選擇出一個NioEventLoop來進行register操作,主要執行Selector的事件註冊,這裡主要是讀寫事件。關於EventExecutorChooser和register的介紹之前的文章中有過詳細分析,這裡不再贅述。

任務處理

處理用戶產生的普通任務:

NioEventLoop中的Queue taskQueue被用來承載用戶產生的普通Task。任務入列方法io.netty.util.concurrent.SingleThreadEventExecutor#addTask:

protected void addTask(Runnable task) {     if (task == null) {     throw new NullPointerException("task");     }     if (isShutdown()) {                reject();     }            taskQueue.add(task);     }   

taskQueue的創建是io.netty.channel.nio.NioEventLoop#newTaskQueue方法:

 @Override     protected Queue<Runnable> newTaskQueue() {     // This event loop never calls takeTask()     return PlatformDependent.newMpscQueue();     }   

使用的是mpsc隊列,也就是多生產者單消費者隊列。

taskQueue被實現為netty的mpscQueue,即多生產者單消費者隊列。netty使用該隊列將外部用戶執行緒產生的Task聚集,並在reactor執行緒內部用單執行緒的方式串列執行隊列中的Task。

當用戶在非IO執行緒調用Channel的各種方法執行Channel相關的操作時,比如channel.write()、channel.flush()等,netty會將相關操作封裝成一個Task並放入taskQueue中,保證相關操作在IO執行緒中串列執行。

處理用戶產生的定時任務:

關於定時任務就需要看下io.netty.util.concurrent.SingleThreadEventExecutor#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit)方法程式碼:

 @Override     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {     ----------省略部分程式碼---------------------     return schedule(new ScheduledFutureTask<Void>(     this, delayedTaskQueue, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));     }   

ScheduledFutureTask中傳入的隊列為delayedTaskQueue:

final Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();

NioEventLoop中的Queue> delayedTaskQueue = new PriorityQueue被用來承載用戶產生的定時Task。它是一個優先隊列。

當用戶在非IO執行緒需要產生定時操作時,netty將用戶的定時操作封裝成ScheduledFutureTask,即一個netty內部的定時Task,並將定時Task放入delayedTaskQueue中等待對應Channel的IO執行緒串列執行。

為了解決多執行緒並發寫入delayedTaskQueue的問題,netty將添加ScheduledFutureTask到delayedTaskQueue中的操作封裝成普通Task,放入taskQueue中,通過NioEventLoop的IO執行緒對delayedTaskQueue進行單執行緒寫操作。

處理任務隊列的邏輯:

  1. 將已到期的定時Task從delayedTaskQueue中轉移到taskQueue中
  2. 計算本次循環執行的截止時間
  3. 循環執行taskQueue中的任務,每隔64個任務檢查一下是否已過截止時間,直到taskQueue中任務全部執行完或者超過執行截止時間。

io.netty.util.concurrent.SingleThreadEventExecutor#takeTask方法:

在io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromDelayedQueue方法內部進行任務遷移的操作:

 private void fetchFromDelayedQueue() {     long nanoTime = 0L;     for (;;) {     ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();     if (delayedTask == null) {     break;     }           if (nanoTime == 0L) {                    nanoTime = ScheduledFutureTask.nanoTime();     }           if (delayedTask.deadlineNanos() <= nanoTime) {                    delayedTaskQueue.remove();// 從delayedTaskQueue中移除                    taskQueue.add(delayedTask);// 添加到任務隊列中     } else {     break;     }     }     }   

io.netty.channel.nio.NioEventLoopGroup

NioEventLoopGroup中主要維護一組EventLoop,EventLoop實現了Executor介面,裡面維護著executor執行緒和方法。

NioEventLoopGroup的類繼承關係:

MultithreadEventLoopGroup

在這個類的靜態程式碼塊中對EventLoopGroup的默認執行緒數進行了初始化:

 static {            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(     "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));     }   

並且對ThreadFactory進行的初始化:

 @Override     protected ThreadFactory newDefaultThreadFactory() {     return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);     }   

io.netty.util.concurrent.DefaultThreadFactory#DefaultThreadFactory(java.lang.Class, int):

public DefaultThreadFactory(String poolName, boolean daemon, int priority) {     if (poolName == null) {     throw new NullPointerException("poolName");     }     if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {     throw new IllegalArgumentException(     "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");     }                  prefix = poolName + '-' + poolId.incrementAndGet() + '-';     this.daemon = daemon;     this.priority = priority;     }   

這裡我們主要關注一下它的newThread方法:

 @Override     public Thread newThread(Runnable r) {     Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());     --------------------------     }           protected Thread newThread(Runnable r, String name) {     return new FastThreadLocalThread(r, name);     }   

這裡創建的執行緒為FastThreadLocalThread,接著順便來分析下FastThreadLocalThread,先來看下它與普通執行緒不一樣的屬性和方法:

 private InternalThreadLocalMap threadLocalMap;     /**         * Returns the internal data structure that keeps the thread-local variables bound to this thread.         * Note that this method is for internal use only, and thus is subject to change at any time.         */     public final InternalThreadLocalMap threadLocalMap() {     return threadLocalMap;     }           /**         * Sets the internal data structure that keeps the thread-local variables bound to this thread.         * Note that this method is for internal use only, and thus is subject to change at any time.         */     public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {     this.threadLocalMap = threadLocalMap;     }   

JDK 中自帶的 ThreadLocal 在執行緒池使用環境中,有記憶體泄漏的風險,很明顯,Netty 為了避免這個 bug,重新進行了封裝。它主要用於與io.netty.util.concurrent.FastThreadLocal合用,就如同Thread與ThreadLocal合用一樣(關於ThreadLocal、InheritThreadLocal和TransmitableThreadLocal的源碼之前有一系列的文章分別分析過,需要詳細了解的請翻閱歷史文章)。我們知道解決hash衝突的辦法主要有以下幾種:

  1. 開放定址法(線性探測再散列,二次探測再散列,偽隨機探測再散列)
  2. 再次哈希法(rehash)
  3. 鏈地址法
  4. 建立一個公共溢出區

Java中hashmap的解決辦法就是採用的鏈地址法。這裡我們補充一下hashmap中的知識點:

  • JDK1.8 HashMap的優化:一方面引入紅黑樹解決過長鏈表效率低的問題;另一方面重寫resize方法,移除了alternative hashing相關方法,避免重新計算鍵的hash。
  • HashMap的執行緒不安全體現在:多執行緒同時put添加元素會丟失元素,多執行緒同時擴容會造成死循環。

關於FastThreadLocal,我們簡要來分析幾點:

緩衝行填充

io.netty.util.internal.InternalThreadLocalMap中有幾個long類型的屬性:

 // Cache line padding (must be public)     // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.     public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;   

對cpu快取行進行填充,防止因為偽共享導致的快取失效問題。

fastGet和slowGet方法

io.netty.util.internal.InternalThreadLocalMap#get:

 public static InternalThreadLocalMap get() {     Thread thread = Thread.currentThread();     if (thread instanceof FastThreadLocalThread) {     return fastGet((FastThreadLocalThread) thread);     } else {     return slowGet();     }     }   

會根據當前執行緒類型來決定走fastGet方法還是slowGet方法.

io.netty.util.internal.InternalThreadLocalMap#fastGet:

 private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {     InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();     if (threadLocalMap == null) {                thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());     }     return threadLocalMap;     }          // 構造方法:     private InternalThreadLocalMap() {     super(newIndexedVariableTable());     }     private static Object[] newIndexedVariableTable() {     Object[] array = new Object[32];     Arrays.fill(array, UNSET);     return array;     }          // super構造方法:     UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {     this.indexedVariables = indexedVariables;     }   

在fastGet方法中針對的是FastThreadLocalThread執行緒,也就是netty的內部執行緒(與EventLoop關聯使用的執行緒,用的是io.netty.util.internal.InternalThreadLocalMap。,這個 Map 內部維護的是一個數組,和 JDK 不同,JDK 維護的是一個使用線性探測法的 Map,可見,從底層數據結構上,JDK ThreadLocalMap就已經輸了,他們的讀取速度相差很大,特別是當數據量很大的時候,Netty 的數據結構速度依然不變,而 JDK ThreadLocalMap由於使用線性探測法,速度會相應的下降。

io.netty.util.internal.InternalThreadLocalMap#slowGet:


 private static InternalThreadLocalMap slowGet() {     ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;     if (slowThreadLocalMap == null) {     UnpaddedInternalThreadLocalMap.slowThreadLocalMap =                        slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();     }           InternalThreadLocalMap ret = slowThreadLocalMap.get();     if (ret == null) {                ret = new InternalThreadLocalMap();                slowThreadLocalMap.set(ret);     }     return ret;     }   
  • 這個方法針對的是普通執行緒,非FastThreadLocalThread執行緒。它使用的是ThreadLocal變數,在ThreadLocal變數內部存放的是InternalThreadLocalMap。在之前的文章中有詳細分析過ThreadLocal,它的內部是基於ThreadLocalMap實現的,ThreadLocalMap內部Entry是一個WeakReference類型(弱引用級別比軟引用更低。當對象根節點可及、無強引用和軟引用、有弱引用指向對象時,若發生GC,該對象將直接被回收)的hashMap的結構。上上面提到過,hashmap是線性探測法的 Map。
  • 這個方法首先使用 JDK 的 ThreadLocal 獲取一個 Netty 的 InternalThreadLocalMap,如果沒有就創建一個,並將這個 InternalThreadLocalMap 設置到 JDK 的 ThreadLocal 中,然後返回這個 InternalThreadLocalMap。從這裡可以看出,為了提高性能,Netty 還是避免使用了JDK 的 threadLocalMap,他的方式是曲線救國:在JDK 的 threadLocal 中設置 Netty 的 InternalThreadLocalMap ,然後,這個 InternalThreadLocalMap 中設置 Netty 的 FastThreadLcoal。

io.netty.util.concurrent.FastThreadLocal#set與get方法

 /**         * Set the value for the current thread.         */     public final void set(V value) {     if (value != InternalThreadLocalMap.UNSET) {                set(InternalThreadLocalMap.get(), value);     } else {                remove();     }     }          public final void set(InternalThreadLocalMap threadLocalMap, V value) {     if (value != InternalThreadLocalMap.UNSET) {     // 設置變數     if (threadLocalMap.setIndexedVariable(index, value)) {                    addToVariablesToRemove(threadLocalMap, this);     }     } else {                remove(threadLocalMap);     }     }          //index是在FastThreadLocal的構造方法中初始化的:    public FastThreadLocal() {            index = InternalThreadLocalMap.nextVariableIndex();     }          //io.netty.util.internal.InternalThreadLocalMap#nextVariableIndex:     public static int nextVariableIndex() {     // 通過AtomicInteger維護     int index = nextIndex.getAndIncrement();     if (index < 0) {                nextIndex.decrementAndGet();     throw new IllegalStateException("too many thread-local indexed variables");     }     return index;     }   

它的每個變數值在set進去後可以根據index快速定位到指定index在數組中的值,看下get方法就很清晰了:

 /**         * Returns the current value for the current thread         */     public final V get() {     return get(InternalThreadLocalMap.get());     }           /**         * Returns the current value for the specified thread local map.         * The specified thread local map must be for the current thread.         */     @SuppressWarnings("unchecked")     public final V get(InternalThreadLocalMap threadLocalMap) {     Object v = threadLocalMap.indexedVariable(index);     if (v != InternalThreadLocalMap.UNSET) {     return (V) v;     }           return initialize(threadLocalMap);     }          // io.netty.util.internal.InternalThreadLocalMap#indexedVariable     public Object indexedVariable(int index) {     Object[] lookup = indexedVariables;     return index < lookup.length? lookup[index] : UNSET;     }   

它能夠根據index的值快速定位到數組中的元素,而它的索引是通過AtomicInteger來維護的.

拓容

private void expandIndexedVariableTableAndSet(int index, Object value) {     Object[] oldArray = indexedVariables;     final int oldCapacity = oldArray.length;     int newCapacity = index;            newCapacity |= newCapacity >>> 1;            newCapacity |= newCapacity >>> 2;            newCapacity |= newCapacity >>> 4;            newCapacity |= newCapacity >>> 8;            newCapacity |= newCapacity >>> 16;            newCapacity ++;           Object[] newArray = Arrays.copyOf(oldArray, newCapacity);     Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);            newArray[index] = value;            indexedVariables = newArray;     }   

這裡和hashMap的擴容對比著看就很好理解了,這段程式碼的作用就是按原來的容量擴容2倍。並且保證結果是2的冪次方。這裡 Netty 的做法和 HashMap 一樣,按照原來的容量擴容到最近的 2 的冪次方大小,比如原來32,就擴容到64,然後,將原來數組的內容填充到新數組中,剩餘的填充 空對象,然後將新數組賦值給成員變數 indexedVariables。完成了一次擴容。

從上面幾點可以看出FastThreadLocalThread與FastThreadLocal合併時的主要特點是快,更多細節請參考:https://www.jianshu.com/p/3fc2fbac4bb7和https://www.jianshu.com/p/6adfa89ed06e。而DefaultThreadFactory創建的執行緒都是FastThreadLocalThread類型的.