Netty為什麼不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新變數呢?

更多技術分享可關注我

前言

如果仔細閱讀過Netty的執行緒調度模型的源碼,或者NIO執行緒對象及其執行緒池的創建源碼,那麼肯定會遇到類似“AtomicIntegerFieldUpdater”的身影,不禁想知道——Netty為何不直接使用原子類包裝普通的比如計數的變數?

下面帶著這個疑問,深入Netty以及JDK源碼去窺探一二,順便學習先進的用法。原文:​Netty為什麼不直接用AtomicXXX,而要用AtomicXXXFieldUpdater去更新變數呢?

JDK的Atomic原子操作類實現機制

在JDK里,Atomic 開頭的原子操作類有很多,涉及到 Java 常用的數字類型的,基本都有相應的 Atomic 原子操作類,如下圖所示:

原子操作類都是執行緒安全的,編碼時可以放心大膽的使用。下面以其中常用的AtomicInteger原子類為例子,分析這些原子類的底層實現機制,輔助理解Netty為何沒有直接使用原子類。具體使用的demo就不寫了,想必Javaer都多少用過或者見過,直接看AtomicInteger類核心源碼:

 1 private volatile int value; // 簡化了部分非核心源碼   2  3 // 初始化,簡化了部分非核心源碼   4 public AtomicInteger(int initialValue) {   5     value = initialValue;   6 }   7 public final int get() {   8     return value;   9 }  10 // 自增 1,並返回自增之前的值      11 public final int getAndIncrement() {  12     return unsafe.getAndAddInt(this, valueOffset, 1);  13 }  14 // 自減 1,並返回自增之前的值      15 public final int getAndDecrement() {  16     return unsafe.getAndAddInt(this, valueOffset, -1);  17 }

以上,AtomicInteger可以對int類型的值進行執行緒安全的自增或者自減等操作。從源碼中可以看到,執行緒安全的操作方法底層都是使用unsafe方法實現,這是一個JDK的魔法類,能實現很多貼近底層的功能,所以並不是Java的實現的,但是能保證底層的這些getAndXXX操作都是執行緒安全的,關於unsafe具體的用法和細節,可以參考這篇文章Java魔法類:Unsafe應用解析(https://tech.meituan.com/2019/02/14/talk-about-java-magic-class-unsafe.html,可能無法直接打開,複製黏貼到瀏覽器即可)

題外話:如果AtomicXXX的對象是自定義類型呢?不要慌,Java 也提供了自定義類型的原子操作類——AtomicReference,它操作的對象是個泛型對象,故能支援自定義的類型,其底層是沒有自增方法的,操作的方法可以作為函數入參傳遞,源碼如下:

 1 // 對 x 執行 accumulatorFunction 操作   2 // accumulatorFunction 是個函數,可以自定義想做的事情   3 // 返回老值   4 public final V getAndAccumulate(V x,   5                                 BinaryOperator<V> accumulatorFunction) {   6     // prev 是老值,next 是新值   7     V prev, next;   8     // 自旋 + CAS 保證一定可以替換老值   9     do {  10         prev = get();  11         // 執行自定義操作  12         next = accumulatorFunction.apply(prev, x);  13     } while (!compareAndSet(prev, next));  14     return prev;  15 }

 

JDK的AtomicXXXFieldUpdater原子更新器及其優勢

在Java5中,JDK就開始提供原子類了,當然也包括原子的更新器——即後綴為FieldUpdater的類,如下Integer、Long,還有一個自定義類型的原子更新器,共三類:

這些原子更新器常見於各種優秀的開源框架里,而很少被普通的業務程式設計師直接使用,其實這些原子更新器也可以被用來包裝共享變數(必須是volatile修飾的對象屬性),來為這些共享變數實現原子更新的功能。這些被包裝的共享變數可以是原生類型,也可以是引用類型,那麼不禁要問:已經有了原子類,為啥還額外提供一套原子更新器呢? 

簡單的說有兩個原因,以int變數為例,基於AtomicIntegerFieldUpdater實現的原子計數器,比單純的直接用AtomicInteger包裝int變數的花銷要小,因為前者只需要一個全局的靜態變數AtomicIntegerFieldUpdater即可包裝volatile修飾的非靜態共享變數,然後配合CAS就能實現原子更新,而這樣做,使得後續同一個類的每個對象中只需要共享這個靜態的原子更新器即可為對象計數器實現原子更新,而原子類是為同一個類的每個對象中都創建了一個計數器 + AtomicInteger對象,這種開銷顯然就比較大了。

下面看一個JDK使用原子更新器的例子,即JDK的BufferedInputStream,如下是源碼的片段節選:

 1 public class BufferedInputStream extends FilterInputStream {   2     private static int DEFAULT_BUFFER_SIZE = 8192;   3     private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;   4     protected volatile byte buf[];   5     /**   6      * Atomic updater to provide compareAndSet for buf. This is   7      * necessary because closes can be asynchronous. We use nullness   8      * of buf[] as primary indicator that this stream is closed. (The   9      * "in" field is also nulled out on close.)  10      */  11     private static final  12         AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =  13         AtomicReferenceFieldUpdater.newUpdater  14         (BufferedInputStream.class,  byte[].class, "buf");

可以看出,每個BufferedInputStream對象都包含了一個buf屬性,該屬性是對象屬性,且被volition修飾,並被原子更新器AtomicReferenceFieldUpdater包裝,注意這個引用類型的原子更新器是靜態類型的,這意味著不論用戶創建了多少個BufferedInputStream對象,在全局都只有這一個原子更新器被創建,這裡之所以不用原子類AtomicReference直接包裝buf屬性,是因為buf是一個byte數組,通常會是一個比較大的對象,如果用原子類直接包裝,那麼後續每個BufferedInputStream對象都會額外創建一個原子類的對象,會消耗更多的記憶體,負擔較重,因此JDK直接使用了原子更新器代替了原子類,Netty源碼中的類似使用也是如出一轍。

另外一個重要原因是使用原子更新器,不會破壞共享變數原來的結構,回到上述JDK的例子,buf對外仍然可以保留buf對象的原生數組屬性,只不過多了一個volatile修飾,外界可以直接獲取到這個byte數組實現一些業務邏輯,而且在必要的時候也能使用原子更新器實現原子更新,可謂兩頭不耽誤,靈活性較強!

還有一個可能的疑問點需要理解,即原子更新器雖然是靜態的,但是其修飾的共享變數確仍然是類的對象屬性,即每個類的對象仍然是只包含自己那獨一份的共享變數,不會因為原子更新器是靜態的,而受到任何影響。

結論:實現原子更新最佳的方式是直接使用原子更新器實現。一方面是更節省記憶體,另一方面是不破壞原始的共享變數,使用起來更靈活。當然如果是時延要求沒有那麼高的場景,那麼就不需要這麼嚴苛,直接使用原子類就OK,畢竟原子類的編碼簡單,開發效率高,不易出錯。

品Netty源碼,學習原子更新的最佳實現方式

前面說了很多理論,下面看一段Netty源碼,看Netty是如何優雅的使用原子更新器的。下面是Netty的NIO執行緒實現類——SingleThreadEventExecutor的部分源碼,省略了很多和本次分析無關的程式碼:

 1 /**   2  * Abstract base class for {@link OrderedEventExecutor}'s that execute all its submitted tasks in a single thread.   3  */   4 public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {   5     private static final int ST_NOT_STARTED = 1;   6     private static final int ST_STARTED = 2;   7     private static final int ST_SHUTTING_DOWN = 3;   8     private static final int ST_SHUTDOWN = 4;   9     private static final int ST_TERMINATED = 5;  10 11     private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER;  12     private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER;  13     private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);  14 15     static {  16         AtomicIntegerFieldUpdater<SingleThreadEventExecutor> updater =  17                 PlatformDependent.newAtomicIntegerFieldUpdater(SingleThreadEventExecutor.class, "state");  18         if (updater == null) {  19             updater = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");  20         }  21         STATE_UPDATER = updater;  22     }  23 24     private final Queue<Runnable> taskQueue;  25     private final Executor executor;  26     private volatile Thread thread;  27     private volatile int state = ST_NOT_STARTED;

以上截取了一小片段,並刪除了注釋,可以清晰的看到Netty封裝了JDK的Thread對象,一些標識執行緒狀態的靜態常量,執行緒執行器,非同步任務隊列,以及標識執行緒狀態的屬性state等,其中重點關注state,這個屬性是普通的共享變數,由volatile修飾,並且被靜態的原子更新器STATE_UPDATER包裝。

下面看NIO執行緒的啟動源碼:

 1     /**   2      * NioEventLoop執行緒啟動方法, 這裡會判斷本NIO執行緒是否已經啟動   3      */   4     private void startThread() {   5         if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {   6             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {   7                 doStartThread();   8             }   9         }  10     }

注釋寫到了,啟動NIO執行緒之前會做一次是否已經啟動的判斷,避免重複啟動,這個判斷邏輯就是前面提到的原子更新器實現的,當本NIO執行緒實例沒有啟動時,會做一次CAS計算,注意CAS對應作業系統的一個指令,是原子操作,如果是多個外部執行緒在啟動NIO執行緒,那麼同時只有一個外部執行緒能啟動成功一次,後續的執行緒不會重複啟動這個NIO執行緒。保證在NIO執行緒的一次生命周期內,外部執行緒只能調用一次doStartThread()方法,這樣可以實現無鎖更新,且沒有自旋,性能較好,這裡之所以不需要自旋,是因為啟動執行緒就應該是一鎚子買賣,啟動不成功,就說明是已經啟動了,直接跳過,無需重試。

 

在看一個自旋的用法:

在NIO執行緒被優雅(也可能異常)關閉時,會在死循環里,結合CAS演算法,原子更新當前NIO執行緒的狀態為關閉中。。。這裡有兩個注意事項:

1、和執行緒安全的啟動NIO執行緒的邏輯不一樣,更新執行緒狀態必須成功,不是一鎚子買賣,所以需要自旋重試,直到CAS操作成功

2、需要使用局部變數快取外部的共享變數的舊值,保證CAS操作執行期間該共享變數的舊值不被外部執行緒修改

3、同樣的,每次執行CAS操作之前,必須判斷一次舊值,只有符合更新條件,才真的執行CAS操作,否則說明已經被外界執行緒更新成功,無需重複操作,以提升性能。

 

Netty這樣做也側面反映Nerty的源碼確實很優秀,平時的業務開發,如果有類似場景,那麼可以參考學習這兩類用法。

 

總結使用原子更新器的注意事項:

1、包裝的必須是被volatile修飾的共享變數

2、包裝的必須是非靜態的共享變數

3、必須搭配CAS的套路自行實現比較並交換的邏輯

4、自行實現比較並交換的邏輯時需要注意:如果是非一鎚子買賣的原子更新操作,那麼必須用局部變數快取外部的共享變數的舊值,具體原因可以參考:Netty的執行緒調度模型分析(10)《多執行緒環境下,實例變數轉為局部變數的程式設計技巧》,且放在一個循環里操作,以保證最終一致性。

 

後記

dashuai的部落格是終身學習踐行者,大廠程式設計師,且專註於工作經驗、學習筆記的分享和日常吐槽,包括但不限於互聯網行業,附帶分享一些PDF電子書,資料,幫忙內推,歡迎拍磚!