並發編程從零開始(十一)-Atomic類

並發編程從零開始(十一)-Atomic類

7 Atomic類

7.1 AtomicInteger和AtomicLong

如下面程式碼所示,對於一個整數的加減操作,要保證執行緒安全,需要加鎖,也就是加synchronized關鍵字。

image-20211029092511458

但有了Concurrent包的Atomic相關的類之後,synchronized關鍵字可以用AtomicInteger代替,其性能更好,對應的程式碼變為:

image-20211029092538518

AtomicInteger的 getAndIncrement() 方法和 getAndDecrement() 方法都調用了一個方法:U.getAndAddInt(…) 方法,該方法基於CAS實現:

image-20211030132301484

do-while循環直到判斷條件返回true為止。該操作稱為自旋

image-20211030133736853

getAndAddInt 方法具有volatile的語義,也就是對所有執行緒都是同時可見的。

而 weakCompareAndSetInt 方法的實現:

image-20211030165050151

調用了 compareAndSetInt 方法,該方法的實現:

第一個參數表示要修改哪個對象的屬性值;

第二個參數是該對象屬性在記憶體的偏移量;

第三個參數表示期望值;

第四個參數表示要設置為的目標值。

7.1.1 悲觀鎖與樂觀鎖

對於悲觀鎖,認為數據發生並發衝突的概率很大,讀操作之前就上鎖。synchronized關鍵字,後面要講的ReentrantLock都是悲觀鎖的典型。

對於樂觀鎖,認為數據發生並發衝突的概率比較小,讀操作之前不上鎖。等到寫操作的時候,再判斷數據在此期間是否被其他執行緒修改了。如果被其他執行緒修改了,就把數據重新讀出來,重複該過程;如果沒有被修改,就寫回去。判斷數據是否被修改,同時寫回新值,這兩個操作要合成一個原子操作,也就是CAS ( Compare And Set )。

AtomicInteger的實現就是典型的樂觀鎖。

7.1.2 Unsafe的CAS詳解

Unsafe類是整個Concurrent包的基礎,裡面所有方法都是native的。具體到上面提到的compareAndSetInt方法,即:

image-20211030165845134

要特別說明一下第二個參數,它是一個long型的整數,經常被稱為xxxOffset,意思是某個成員變數在對應的類中的記憶體偏移量(該變數在記憶體中的位置),表示該成員變數本身。

第二個參數的值為AtomicInteger中的屬性VALUE:

image-20211030170438323

而Value為:

private static final long VALUE = U.objectFieldOffset(AtomicInteger.class,"value");

而Unsafe的 objectFieldOffset(…) 方法調用,就是為了找到AtomicInteger類中value屬性所在的記憶體偏移量。

objectFieldOffset 方法的實現:

image-20211030170606465

其中objectFieldOffset1的實現為:

image-20211030170630993

所有調用CAS的地方,都會先通過這個方法把成員變數轉換成一個Offset。以AtomicInteger為例:

image-20211030170646297

從上面程式碼可以看到,無論是Unsafe還是VALUE,都是靜態的,也就是類級別的,所有對象共用的。

此處的VALUE就代表了value變數本身,後面執行CAS操作的時候,不是直接操作value,而是操作VALUE。

7.1.3 自旋和阻塞

當一個執行緒拿不到鎖的時候,有以下兩種基本的等待策略:

  • 策略1:放棄CPU,進入阻塞狀態,等待後續被喚醒,再重新被作業系統調度。

  • 策略2:不放棄CPU,空轉,不斷重試,也就是所謂的「自旋」。

很顯然,如果是單核的CPU,只能用策略1。因為如果不放棄CPU,那麼其他執行緒無法運行,也就無法釋放鎖。但對於多CPU或者多核,策略2就很有用了,因為沒有執行緒切換的開銷。

AtomicInteger的實現就用的是「自旋」策略,如果拿不到鎖,就會一直重試。

注意:以上兩種策略並不互斥,可以結合使用。如果獲取不到鎖,先自旋;如果自旋還拿不到鎖,再阻塞,synchronized關鍵字就是這樣的實現策略。

除了AtomicInteger,AtomicLong也是同樣的原理。


7.2 AtomicBoolean和AtomicReference

7.2.1 為什麼需要AtomicBoolean

對於int或者long型變數,需要進行加減操作,所以要加鎖;但對於一個boolean類型來說,true或false的賦值和取值操作,加上volatile關鍵字就夠了,為什麼還需要AtomicBoolean呢?

這是因為往往要實現下面這種功能:

image-20211030171146030

也就是要實現 compare和set兩個操作合在一起的原子性,而這也正是CAS提供的功能。上面的程式碼,就變成:

if(compareAndSet(false,true)){
	// ...
}

同樣地,AtomicReference也需要同樣的功能,對應的方法如下:

image-20211030171309527

image-20211030171318559

其中,expect是舊的引用,update為新的引用。

7.2.2 如何支援boolean和double類型

在Unsafe類中,只提供了三種類型的CAS操作:int、long、Object(也就是引用類型)。即,在jdk的實現中,這三種CAS操作都是由底層實現的,其他類型的CAS操作都要轉換為這三種之一進行操作。

image-20211030171535911

其中的參數:

  1. 第一個參數是要修改的對象

  2. 第二個參數是對象的成員變數在記憶體中的位置(一個long型的整數)

  3. 第三個參數是該變數的舊值

  4. 第四個參數是該變數的新值。

AtomicBoolean類型如何支援?

對於用int型來代替的,在入參的時候,將boolean類型轉換成int類型;在返回值的時候,將int類型轉換成boolean類型。如下所示:

image-20211030171611848

如果是double類型,又如何支援呢?

這依賴double類型提供的一對double類型和long類型互轉的方法:

image-20211030171632311

image-20211030171637744

Unsafe類中的方法實現:

image-20211030171651704


7.3 AtomicStampedReference 和 AtomicMarkableReference

7.3.1 ABA問題與解決方法

到目前為止,CAS都是基於「值」來做比較的。但如果另外一個執行緒把變數的值從A改為B,再從B改回到A,那麼儘管修改過兩次,可是在當前執行緒做CAS操作的時候,卻會因為值沒變而認為數據沒有被其他執行緒修改過,這就是所謂的ABA問題

要解決 ABA 問題,不僅要比較「值」,還要比較「版本號」,而這正是 AtomicStampedReference做的事情,其對應的CAS方法如下:

image-20211030172100832

之前的 CAS只有兩個參數,這裡的 CAS有四個參數,後兩個參數就是版本號的舊值和新值。

當expectedReference != 對象當前的reference時,說明該數據肯定被其他執行緒修改過;

當expectedReference == 對象當前的reference時,再進一步比較expectedStamp是否等於對象當前的版本號,以此判斷數據是否被其他執行緒修改過。

7.3.2 為什麼沒有AtomicStampedInteger或AtomicStampedLong

要解決Integer或者Long型變數的ABA問題,為什麼只有AtomicStampedReference,而沒有AtomicStampedInteger或者AtomictStampedLong呢?

因為這裡要同時比較數據的「值」和「版本號」,而Integer型或者Long型的CAS沒有辦法同時比較兩個變數。

於是只能把值和版本號封裝成一個對象,也就是這裡面的Pair內部類,然後通過對象引用的CAS來實現。程式碼如下所示:

image-20211030172446634

image-20211030172525425

當使用的時候,在構造方法裡面傳入值和版本號兩個參數,應用程式對版本號進行累加操作,然後調用上面的CAS。如下所示:

image-20211030172547448

7.3.3 AtomicMarkableReference

AtomicMarkableReference與AtomicStampedReference原理類似,只是Pair裡面的版本號是boolean類型的,而不是整型的累加變數,如下所示:

image-20211030173012513

因為是boolean類型,只能有true、false 兩個版本號,所以並不能完全避免ABA問題,只是降低了ABA發生的概率。


7.4 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater和AtomicReferenceFieldUpdater

7.4.1 為什麼需要AtomicXXXFieldUpdater

如果一個類是自己編寫的,則可以在編寫的時候把成員變數定義為Atomic類型。但如果是一個已經有的類,在不能更改其源程式碼的情況下,要想實現對其成員變數的原子操作,就需要AtomicIntegerFieldUpdater、AtomicLongFieldUpdater 和 AtomicReferenceFieldUpdater。

通過AtomicIntegerFieldUpdater理解它們的實現原理。

AtomicIntegerFieldUpdater是一個抽象類。

首先,其構造方法是protected,不能直接構造其對象,必須通過它提供的一個靜態方法來創建,如下所示:

image-20211030173317141

方法 newUpdater 用於創建AtomicIntegerFieldUpdater類對象:

image-20211030173330600

newUpdater(…)靜態方法傳入的是要修改的類(不是對象)和對應的成員變數的名字,內部通過反射拿到這個類的成員變數,然後包裝成一個AtomicIntegerFieldUpdater對象。所以,這個對象表示的是的某個成員,而不是對象的成員變數。

若要修改某個對象的成員變數的值,再傳入相應的對象,如下所示:

image-20211030173359768

image-20211030173406439

accecssCheck方法的作用是檢查該obj是不是tclass類型,如果不是,則拒絕修改,拋出異常。

從程式碼可以看到,其 CAS 原理和 AtomictInteger 是一樣的,底層都調用了 Unsafe 的compareAndSetInt(…)方法。

7.4.2 限制條件

要想使用AtomicIntegerFieldUpdater修改成員變數,成員變數必須是volatile的int類型(不能是Integer包裝類),該限制從其構造方法中可以看到

image-20211030175156152

至於 AtomicLongFieldUpdater、AtomicReferenceFieldUpdater,也有類似的限制條件。其底層的CAS原理,也和AtomicLong、AtomicReference一樣。


7.5 AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray

Concurrent包提供了AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray三個數組元素的原子操作。注意,這裡並不是說對整個數組的操作是原子的,而是針對數組中一個元素的原子操作而言。

7.5.1 使用方式

以AtomicIntegerArray為例,其使用方式如下:

image-20211030174621148

相比於AtomicInteger的getAndIncrement()方法,這裡只是多了一個傳入參數:數組的下標i

其他方法也與此類似,相比於 AtomicInteger 的各種加減方法,也都是多一個下標 i,如下所示。

image-20211030174657640

image-20211030174705656

image-20211030174742375

image-20211030174750090

7.5.2 實現原理

其底層的CAS方法直接調用VarHandle中native的getAndAdd方法。如下所示:

image-20211030174841143

image-20211030174849008

明白了AtomicIntegerArray的實現原理,另外兩個數組的原子類實現原理與之類似。


7.6 Striped64與LongAdder

從JDK 8開始,針對Long型的原子操作,Java又提供了LongAdder、LongAccumulator;針對Double類型,Java提供了DoubleAdder、DoubleAccumulator。Striped64相關的類的繼承層次如下圖所示。

image-20211030174937927

7.6.1 LongAdder原理

AtomicLong內部是一個volatile long型變數,由多個執行緒對這個變數進行CAS操作。多個執行緒同時對一個變數進行CAS操作,在高並發的場景下仍不夠快,如果再要提高性能,該怎麼做呢?

把一個變數拆成多份,變為多個變數,有些類似於 ConcurrentHashMap 的分段鎖的例子。如下圖所示,把一個Long型拆成一個base變數外加多個Cell,每個Cell包裝了一個Long型變數。當多個執行緒並發累加的時候,如果並發度低,就直接加到base變數上;如果並發度高,衝突大,平攤到這些Cell上。在最後取值的時候,再把base和這些Cell求sum運算。

image-20211030175022121

以LongAdder的sum()方法為例,如下所示:

image-20211030175043594

由於無論是long,還是double,都是64位的。但因為沒有double型的CAS操作,所以是通過把double型轉化成long型來實現的。所以,上面的base和cell[]變數,是位於基類Striped64當中的。英文Striped意為「條帶」,也就是分片。

image-20211030180721782

7.6.2 最終一致性

在sum求和方法中,並沒有對cells[]數組加鎖。也就是說,一邊有執行緒對其執行求和操作,一邊還有執行緒修改數組裡的值,也就是最終一致性,而不是強一致性。這也類似於ConcurrentHashMap 中的clear()方法,一邊執行清空操作,一邊還有執行緒放入數據,clear()方法調用完畢後再讀取,hash map裡面可能還有元素。因此,在LongAdder適合高並發的統計場景,而不適合要對某個 Long 型變數進行嚴格同步的場景。

7.6.3 偽共享和快取行填充

在Cell類的定義中,用了一個獨特的註解@sun.misc.Contended,這是JDK 8之後才有的,背後涉及一個很重要的優化原理:偽共享與快取行填充。

image-20211030181530639

每個 CPU 都有自己的快取。快取與主記憶體進行數據交換的基本單位叫Cache Line(快取行)。在64位x86架構中,快取行是64位元組,也就是8個Long型的大小。這也意味著當快取失效,要刷新到主記憶體的時候,最少要刷新64位元組。

如下圖所示,主記憶體中有變數XYZ(假設每個變數都是一個Long型),被CPU1和CPU2分別讀入自己的快取,放在了同一行Cache Line裡面。當CPU1修改了X變數,它要失效整行Cache Line,也就是往匯流排上發消息,通知CPU 2對應的Cache Line失效。由於Cache Line是數據交換的基本單位,無法只失效X,要失效就會失效整行的Cache Line,這會導致YZ變數的快取也失效。

image-20211030181554168

雖然只修改了X變數,本應該只失效X變數的快取,但YZ變數也隨之失效。YZ變數的數據沒有修改,本應該很好地被 CPU1 和 CPU2 共享,卻沒做到,這就是所謂的「偽共享問題」。

問題的原因是,YZX變數處在了同一行Cache Line裡面。要解決這個問題,需要用到所謂的「快取行填充」,分別在XYZ後面加上7個無用的Long型,填充整個快取行,讓XYZ處在三行不同的快取行中,如下圖所示:

image-20211030181621233

聲明一個@jdk.internal.vm.annotation.Contended即可實現快取行的填充。之所以這個地方要用快取行填充,是為了不讓Cell[]數組中相鄰的元素落到同一個快取行里。

7.6.4 LongAdder 核心實現

下面來看LongAdder最核心的類加方法add(long x),自增、自減操作都是通過調用該方法實現的。

public void increment(){
	add(1L);
}

public void decrement(){
	add(-1L);
}

image-20211031002638169

當一個執行緒調用add(x)的時候,首先會嘗試使用casBase把x加到base變數上。如果不成功,則再用c.cas(…)方法嘗試把 x 加到 Cell 數組的某個元素上。如果還不成功,最後再調用longAccumulate(…)方法。

注意:Cell[]數組的大小始終是2的整數次方,在運行中會不斷擴容,每次擴容都是增長2倍。上面程式碼中的 cs[getProbe() & m] 其實就是對數組的大小取模。因為m=cs.length–1,getProbe()為該執行緒生成一個隨機數,用該隨機數對數組的長度取模。因為數組長度是2的整數次方,所以可以用&操作來優化取模運算。

對於一個執行緒來說,它並不在意到底是把x累加到base上面,還是累加到Cell[]數組上面,只要累加成功就可以。因此,這裡使用隨機數來實現Cell的長度取模。

如果兩次嘗試都不成功,則調用 longAccumulate(…)方法,該方法在 Striped64 裡面LongAccumulator也會用到,如下所示:

image-20211031002905561

image-20211031002926399

image-20211031002936543

7.6.5 LongAccumulator

LongAccumulator的原理和LongAdder類似,只是功能更強大,下面為兩者構造方法的對比:

image-20211031003100519

image-20211031003110111

LongAdder只能進行累加操作,並且初始值默認為0;LongAccumulator可以自己定義一個二元操作符,並且可以傳入一個初始值。

image-20211031003142800

操作符的左值,就是base變數或者Cells[]中元素的當前值;右值,就是add()方法傳入的參數x。

下面是LongAccumulator的accumulate(x)方法,與LongAdder的add(x)方法類似,最後都是調用的Striped64的LongAccumulate(…)方法。

唯一的差別就是LongAdder的add(x)方法調用的是casBase(b, b+x),這裡調用的是casBase(b, r),其中,r=function.applyAsLong(b=base, x)。

image-20211031003159561

7.6.6 DoubleAdder與DoubleAccumulator

DoubleAdder 其實也是用 long 型實現的,因為沒有 double 類型的 CAS 方法。下面是DoubleAdder的add(x)方法,和LongAdder的add(x)方法基本一樣,只是多了long和double類型的相互轉換。

image-20211031003236024

其中的關鍵Double.doubleToRawLongBits(Double.longBitsToDouble(b) + x),在讀出來的時候,它把 long 類型轉換成 double 類型,然後進行累加,累加的結果再轉換成 long 類型,通過CAS寫回去。

DoubleAccumulate也是Striped64的成員方法,和longAccumulate類似,也是多了long類型和double類型的互相轉換。

DoubleAccumulator和DoubleAdder的關係,與LongAccumulator和LongAdder的關係類似,只是多了一個二元操作符。