JUC源碼學習筆記4——原子類,CAS,Volatile記憶體屏障,快取偽共享與UnSafe相關方法
JUC源碼學習筆記4——原子類,CAS,Volatile記憶體屏障,快取偽共享與UnSafe相關方法
volatile的原理和記憶體屏障參考《Java並發編程的藝術》
原子類源碼基於JDK8
一丶volatile 與記憶體屏障
volatile
修飾的欄位,Java執行緒模型保證所有執行緒看到這個變數值是一致的。
1.volatile是如何保證可見性
volatile
修飾的變數執行寫操作的時候多出lock
前綴指令的程式碼,lock前綴的指令會導致
- 將當前這個處理器快取行的數據寫回到系統記憶體
- 這個寫回記憶體的操作將導致其他CPU里快取了該地址記憶體的數據無效
為了提高處理速度,處理器不直接和記憶體通訊,而是先把系統記憶體的數據讀到內部快取後繼續操作,但是操作完不知道何時寫回記憶體。如果對volatile
修飾的變數執行寫操作,將會讓數據寫回到系統記憶體,但是其他執行緒還是使用快取中的舊值,還是會存在問題。所以在多處理器下為了保證每一個處理器快取時一致的,就會實現快取一致性協議,每個處理器通過嗅探匯流排上傳播的數據來檢查自己快取的數據是否過期,如果發現自己快取行中對應的記憶體地址被修改了,就會將當前處理器的快取行設置成無效,當前處理器對這個數據進行修改操作時,會重新從主記憶體拉取最新的數據到快取。
2 指令重排序
在程式執行時,為了提高性能,處理器和編譯器通常會對指令進行重排序。
- 編譯器優化重排序,編譯器在不改變語義的情況下,重新安排語句執行順序。
- 指令級別並行重排序,如果不存在數據依賴性,處理器改變語句對應機器指令的執行順序。
- 記憶體系統重排序,由於處理器使用快取和讀/寫緩衝區,這使得載入和存儲的操作看起來是亂序執行
為了保證記憶體可見性,Java編譯器在生成指令序列的適當位置會插入記憶體屏障來禁止處理器級別的(指令級別並行重排序,記憶體系統重排序)指令重排序
3 JMM中記憶體屏障的類型
不同硬體實現記憶體屏障的方式不同,Java記憶體模型屏蔽了這種底層硬體平台的差異,由JVM來為不同的平台生成相應的機器碼。
- Load Barrier: 在讀指令前插入讀屏障,可以讓高速快取中的數據失效,重新從主記憶體載入數據
- Store Barrier:在寫指令之後插入寫屏障,能讓寫入快取的最新數據寫回到主記憶體
實際使用中,又分為以下四種:
類型 | 解釋 |
---|---|
LoadLoad | 對於Load1,Loadload,Load2 ,確保Load1所要讀入的數據能夠在被Load2和後續的load指令訪問前讀入 |
StoreStore | 對於Store1,StoreStore,Store2 確保Store1的數據在Store2以及後續Store指令操作相關數據之前對其它處理器可見(例如向主存刷新數據)。 |
LoadStore | 對於 Load1; LoadStore; Store2 ,確保Load1的數據在Store2和後續Store指令被刷新之前讀取 |
StoreLoad | 對於Store1; StoreLoad; Load2 ,確保Store1的數據在被Load2和後續的Load指令讀取之前對其他處理器可見。StoreLoad屏障可以防止一個後續的load指令 不正確的使用了Store1的數據,而不是另一個處理器在相同記憶體位置寫入一個新數據。正因為如此,所以在下面所討論的處理器為了在屏障前讀取同樣記憶體位置存過的數據,必須使用一個StoreLoad屏障將存儲指令和後續的載入指令分開。Storeload屏障在幾乎所有的現代多處理器中都需要使用,但通常它的開銷也是最昂貴的。它們昂貴的部分原因是它們必須關閉通常的略過快取直接從寫緩衝區讀取數據的機制。這可能通過讓一個緩衝區進行充分刷新(flush),以及其他延遲的方式來實現。 |
4.volatile的記憶體語義
- 可見性:對一個volatile變數的讀一定能看到(任何執行緒)對這個volatile變數最後的寫
- 原子性:對任意單個volatile變數的讀和寫具有原子性,但是自增這種複合操作不具備原子性
5.volatile記憶體語義的實現
JMM為了實現volatile的記憶體語義限制了編譯器重排序和處理器重排序
- 當第一個操作是普通變數都或者寫且第二個操作是volatile寫時,編譯器不能重排序這兩個操作
- 當第二個操作是volatile寫時,無論第一個操作是什麼都不可以重排序,保證了volatile寫操作前的指令不會重排序到volatile寫之後
- 當第一個操作是volatile讀時,不管第二個操作是什麼,都不可重排序,保證了volatile讀之後的指令不會重排序到volatile讀之前
- 當第一個操作是volatile寫,第二個操作是volatile讀,不能重排序
為了實現volatile的記憶體語義,JMM在volatile讀和寫的時候會插入記憶體屏障
-
volatile寫的記憶體屏障
這裡的store store 屏障可以保證前面所有普通寫對所有處理器可見,實現了在volatile寫之前寫入快取的最新數據寫回到主記憶體
volatile寫之後的記憶體屏障,避免與後續的volatile讀寫出現重排序,由於虛擬機無法判斷volatile寫之後是否需要一個store load屏障,比如在volatile寫之後立即return,為了保證volatile的記憶體語義,JMM十分保守的插入一個store load屏障。
-
volatile 讀的記憶體屏障
這裡的loadload保證了下面普通讀不可以在volatile讀之前,loadstore保證普通寫不可在volatile之前
二丶CAS
1.什麼是CAS
即比較並替換,實現並發演算法時常用到的一種技術。CAS操作包含三個操作數——記憶體位置、預期原值及新值。執行CAS操作的時候,將記憶體位置的值與預期原值比較,如果相匹配,那麼處理器會自動將該位置值更新為新值,否則,處理器不做任何操作。CAS是一條CPU的原子指令(cmpxchg指令),不會造成所謂的數據不一致問題,Unsafe提供的CAS方法(如compareAndSwapXXX)底層實現即為CPU指令cmpxchg。
2.CAS的缺點
-
ABA問題是指在CAS操作時,其他執行緒將變數值A改為了B,但是又被改回了A,等到本執行緒使用期望值A與當前變數進行比較時,發現變數A沒有變,於是CAS就將A值進行了交換操作,但是實際上該值已經被其他執行緒改變過,這與樂觀鎖的設計思想不符合。ABA問題的解決思路是,每次變數更新的時候把變數的版本號加1,那麼A-B-A就會變成A1-B2-A3,只要變數被某一執行緒修改過,改變數對應的版本號就會發生遞增變化,從而解決了ABA問題。
-
熱點數據更新問題
如果一個數據同時被1000個執行緒更新,那麼存在一個倒霉蛋執行緒自旋1000次才能成功修改,第一個成功的執行緒會導致999個執行緒失敗,999個執行緒必須自旋,依次類推,自旋是消耗CPU資源的,如果一直不成功,那麼會佔用CPU資源。
解決方法:破壞掉for死循環,當超過一定時間或者一定次數時,return退出。或者把熱點數據拆分開,最後再匯總
這些問題在後面的原子類程式碼中都有具體的實踐
三丶原子類
Java8在java.util.atomic
具有16個類,大致可以分為
- 原子更新基本類型
AtomicBoolean
AtomicInteger
AtomicLong
- 原子更新數組
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
原子更新引用數組
- 原子更新引用類型
AtomicReference
原子更新引用類型AtomicReferenceFieldUpdater
原子更新引用類型的欄位AtomicMarkableReference
原子更新代標記位的引用類型,可以更新一個布爾類型的標記位和引用類型。AtomicStampedReference
原子更新帶有版本號,引用類型,該類把版本和引用類型關聯起來,可以用於原子更新數據和數據的版本號,可以解決CAS出現的ABA問題
- 累加器
DoubleAccumulator
Doule類型累加器,支援函數是表達式描述值要如何變化DoubleAdder
Doule類型累加器,支援知道增大減小多少LongAccumulator
Long類型累加器,支援函數是表達式描述值要如何變化LongAdder
Long類型累加器,支援知道增大減小多少
四丶原子類源碼解析
1.原子更新基本類型
AtomicBoolean
,AtomicInteger
,AtomicLong
的原理類似,選擇AtomicInteger
看下。
1.1AtomicInteger 源碼解析
1.1.1 欄位和偏移量
-
使用
volatile
修飾內部int
類型的value 欄位private volatile int value; //value欄位就是用於存儲整形變數的,後續操作也是對這個欄位的CAS操作
volatile
修飾保證了value欄位對所有執行緒的可見性,也保證了對value的修改可以立即刷新會主存,以及對value的讀取操作也會從主存讀取。 -
靜態程式碼塊獲取
value
對於AtomicInteger
對象的偏移量private static final Unsafe unsafe = Unsafe.getUnsafe(); //value欄位偏移量 private static final long valueOffset; static { try { //調用Unsafe中的objectFieldOffset 方法獲取value欄位相對的偏移量 //cas操作需要需要知道當前value欄位的地址, //這個地址是相對AtomicInteger的偏移量, //知道AtomicInteger的地址再加上偏移就可以直接操作value地址的值了 valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } }
1.1.2 構造方法
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
//無參構造value 為int 基本類型
}
1.1.3 獲取和設置value值
public final int get() {
return value;
}
public final void set(int newValue) {
value = newValue;
}
這裡沒有進行任何執行緒安全的控制,因為JMM保證了從主存讀取volatile
修飾的變數,和寫入volatile
修飾的變數是原子性的操作
1.1.4 獲取並賦值 getAndSet
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
這個方法獲取後賦值value為入參newValue,直接調用了Unsafe
的getAndSetInt
方法
public final int getAndSetInt(Object o, long offset, int newValue) {
//記錄CAS修改前的值
int v;
do {
//這裡和unsafe中的普通讀取是存在區別的
//獲取舊值,並賦值給v
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, newValue));
//如果CAS修改失敗,說明存在多個執行緒正在進行修改 那麼一直進行CAS
return v;
}
注意這裡的getIntVolatile
是帶有記憶體屏障的讀取volatile變數,如果這裡使用getInt
也許會導致重排序出現
public final int getAndSetInt(Object o, long offset, int newValue) {
//記錄CAS修改前的值
int v;
//獲取舊值,並賦值給v
v = getInt(o, offset);
do {
//導致這裡的CAS永遠不會成功 因為這裡讀取v 是沒有理解從主存刷新的
} while (!compareAndSwapInt(o, offset, v, newValue));
//如果CAS修改失敗,說明存在多個執行緒正在進行修改 那麼一直進行CAS
return v;
}
1.1.5 比較並設置 compareAndSet
public final boolean compareAndSet(int expect, int update) {
//入參依次是當前對象,value偏移量,期望值,更新目標
//當前對象,value偏移量可以定位到value欄位的地址
//執行CAS操作的時候,將記憶體位置的值與預期原值(expect)比較,
//如果相匹配,那麼處理器會自動將該位置值更新為新值(update),
//否則,處理器不做任何操作
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
//如果CAS更新成功返回true 否則返回false
//這個方法不會嘗試自旋到更新成功位置
}
1.1.6 獲取並自增1或自減1
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
還是調用的Unsafe
的getAndAddInt
方法
1.1.7 自增1,自增1並獲取,增加並獲取
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
1.1.8 支援函數式介面的幾個方法
這幾個方法式JDK8支援函數式介面後新增的方法
-
getAndAccumulate
public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) { int prev, next; do { //舊值 prev = get(); //CAS將設置成的值 調用IntBinaryOperator獲取 next = accumulatorFunction.applyAsInt(prev, x); } while (!compareAndSet(prev, next)); return prev; }
比如說我要實現增大到舊值的x倍,並且返回舊值,那麼就可以使用
//這裡的2 就是增大兩倍, int doubleReturnPre = ai.getAndAccumulate(2, (pre, x) -> pre * x);
-
accumulateAndGet
public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) { int prev, next; do { prev = get(); next = accumulatorFunction.applyAsInt(prev, x); } while (!compareAndSet(prev, next)); return next; }
和
getAndAccumulate
不同在於返回是CAS更新成功的值,意味著下面這行程式碼返回的是增大後的值,而不是增大前的值//這裡的2 就是增大兩倍, int doubleReturnNew = ai.accumulateAndGet(2, (pre, x) -> pre * x);
-
updateAndGet
public final int updateAndGet(IntUnaryOperator updateFunction) { int prev, next; do { prev = get(); next = updateFunction.applyAsInt(prev); } while (!compareAndSet(prev, next)); return next; }
IntUnaryOperator的applyAsInt只接受一個參數,這裡傳入了當前值,可以在applyAsInt中定義如何更新。updateAndGet返回新值
-
getAndUpdate
public final int getAndUpdate(IntUnaryOperator updateFunction) { int prev, next; do { prev = get(); next = updateFunction.applyAsInt(prev); } while (!compareAndSet(prev, next)); return prev; }
和updateAndGet類似,返回的是舊值
1.1.9lazySet
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}
lazySet提供一個store store屏障(在當代系統中是很低成本的操作,或者說沒有操作),但是沒有store load屏障,我的理解是把volatile的寫後store load替換成了store store,Store load屏障可以讓後續的load指令對其他處理器可見,但是需要將其他處理器的快取設置成無效讓它們重新從主層讀取,store store,是保證後續處理器在寫volatile變數的時候可以看見lazyset方法改變的值,但是後續的讀不保證一定可見,但是對於volatile變數的讀自然是會讀到最新值的,從而減少了開銷。lazySet的lazy 意味著最終數據的一致性,但是當前是進行了偷懶的(指store store替代了storeload)
2.AtomicBoolean
源碼基本上和AtomicInteger類似,但是並不是底層存的布爾類型,而是使用int類型,0代表false,1代表true
3.AtomicLong
和AtomicInteger類似
2.原子更新數組
AtomicIntegerArray
,AtomicLongArray
,AtomicReferenceArray
的原理類似,數組類型更新的問題在於,我要更新下標為i
的元素,我怎麼知道i這個元素的地址。如果我們知道第一個元素相對於對象的偏移base,和每個元素的偏移s,那麼第i個元素就是base+i*s
private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int shift;
static {
//每個元素的大小
int scale = unsafe.arrayIndexScale(int[].class);
//必須是2的n次冪
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
//每個元素大小為4 那麼第n個大小偏移就是n*4 也就是n<<2
//shift 是31 - scale的前導0 方便後續進行位移操作獲取第n個元素相對於第一個的偏移量
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
2.1計算第i個元素的位置
//前置檢查
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
//第n個元素的位置
//i*4+base==> i<<2 + base
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}
2.2 獲取和設置
public final int get(int i) {
return getRaw(checkedByteOffset(i));
}
private int getRaw(long offset) {
//調用getIntVolatile 保證了可見性
return unsafe.getIntVolatile(array, offset);
}
//同樣設置也是調用putIntVolatile
public final void set(int i, int newValue) {
unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}
其他方法和AtomicInteger
中大差不大都是調用Unsafe中的相關方法
3.原子更新引用
AtomicReference
還是老套路,不多贅述
3.1 原子更新引用的一個欄位
AtomicReferenceFieldUpdater
是一個抽象類,使用的時候必須調用newUpdater(持有欄位類的class,欄位類型,欄位名稱)
來獲取它的實現AtomicReferenceFieldUpdaterImpl
(調用了AtomicReferenceFieldUpdaterImpl
的構造方法涉及一些類載入器知識)後續的更新也是調用unsafe的cas相關操作
3.2 原子的更新引用和布爾標記
AtomicMarkableReference
可以同時更新引用和引用的標記,上面我們提到CAS的一個缺點——ABA問題,比如說,當前商店存在一個活動,如果賬戶辦理沖一百送50,每個賬戶依次機會,A用戶充值後獲得150元立馬消費成0元接著充值100,如果用普通的原子類AtomicInteger
程式還會再次送50元給用戶A(ABA問題,程式不知道是否贈送過了),我們可以使用鎖充值後獲取鎖往集合裡面記錄當前用戶贈送了,也可以使用AtomicMarkableReference
通過更新mark
來記錄用戶贈送過了
AtomicMarkableReference
內部維護了一個Pair
,並且private volatile Pair<V> pair
持有一個pair
3.2.1compareAndSet(舊引用,新引用,舊標記,新標記)
public boolean compareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark) {
//當前AtomicMarkableReference 中的 Pair 記錄引用和 標記
Pair<V> current = pair;
return
//舊引用和Pair中引用相同,舊標記和Pair中的標記相同
expectedReference == current.reference &&
expectedMark == current.mark
&& //這裡是且
//新引用相同 且 新標記相同
((newReference == current.reference &&
newMark == current.mark)
|| //這裡是或
//CAS修改pair屬性
casPair(current, Pair.of(newReference, newMark)));
}
也就是說,首先要求舊值是和當前pair相同的,如果修改之前被其他執行緒修改了那麼短路返回false,如果引用從始至終都沒改變,那麼都不需要CAS操作,否則CAS pair屬性,下面是casPair
的源碼——還是老套路
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
3.2.2 attemptMark(舊引用,新標記)
public boolean attemptMark(V expectedReference, boolean newMark) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
(newMark == current.mark ||
casPair(current, Pair.of(expectedReference, newMark)));
}
和compareAndSet
的區別在於,其只要求引用相同,如果mark相同那麼什麼都不做,反之CAS修改pair
3.3 原子的更新引用和版本號
AtomicStampedReference
也是用來解決ABA問題的,不同的是其標記不只是true和false,可以是1,2,3等等等版本號,我們把AtomicMarkableReference
中活動改下,每一個賬戶可以參與3次活動,那麼在充值的時候我們把版本號加1,最後版本號來到3 表示這個賬戶參與了3次,後續充值就不贈送了。
AtomicStampedReference
實現和AtomicMarkableReference
簡直一模一樣,區別在於AtomicStampedReference
中Pair類是引用和版本號
4.累加器
上面我們提到CAS的缺點說到存在熱點數據更新導致多數執行緒失敗自旋的問題,其中一個解決辦法是自旋次數,失敗就返回活動太火爆
這種勸退消息,另外一種解決辦法是——熱點數據拆分開,最後再匯總。這個思路和ConcurrentHashMap
的分段鎖思路類似,既然我如同HashTable
導致性能低下(修改key A和B都受一把鎖的影響)那麼我把數據,不同的數據使用不同的鎖,就可以提高吞吐量了。在累加器中的體現就是,在最初無競爭時,只更新base的值,當有多執行緒競爭時通過分段的思想,讓不同的執行緒更新不同的段,最後把這些段相加就得到了完整存儲的值。
累加器的思路都類似,我們選擇LongAdder 和 LongAccumulator來看下
4.1 LongAdder
4.1.1 LongAdder的內部結構
LongAdder 內部有base用於在沒有競爭的情況下,進行CAS更新,其中還有Cell數組在衝突的時候根據執行緒唯一標識對Cell數組長度進行取模,讓執行緒去更新Cell數組中的內容。這樣最後的值就是 base+Cell數組之和,LongAdder自然只能保證最終一致性,如果邊更新邊獲取總和不能保證總和正確。
4.1.2 LongAdder的繼承關係
這裡比較迷惑的就是Striped64
這個類,此類是一個內部類,用於實現 Adder 和 Accumulator,我們上面所說的base,Cell數組其實就是在此類中的。
4.1.3 LongAdder 源碼解析
4.1.3.1Cell 類
此類位於Striped64
中,就是我們上面說的Cell數組進行熱點數據分離的Cell
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
//老套路 unsafe
private static final sun.misc.Unsafe UNSAFE;
//value 欄位的偏移量
private static final long valueOffset;
static {
//初始化 獲取unsafe 實例 以及獲取value 偏移量
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Cell
這個類還是老套路,唯一不同的是它類上面具有一個註解 @sun.misc.Contended
此註解會進行快取填充,避免快取偽共享
(這部分內容在文末);
4.1.3.2 Striped64中的屬性
NCPU
記錄了系統 CPU 的核數,因為真正的並發數最多只能是 CPU 核數,因此cells
數組一般要大於這個數。cells
數組,大小是 2 的次方,這樣將執行緒映射到cells
元素時方便計算。base
,基本數值,一般在無競爭能用上,同時在cells
初始化時也會用到。cellsBusy
,自旋鎖,在創建或擴充cells
時使用
4.1.3.3 LongAdder #void add(long x)
public void increment() {
add(1L);
}
public void decrement() {
add(-1L);
}
//LongAdder中增大和減小都是直接調用的add(long x) 方法
public void add(long x) {
//as ——Cells數組的引用
//b 獲取到的base值
//v 期望值
//m 標識Cells數組的長度-1
//a 標識當前執行緒命中的Cell單元格
Cell[] as; long b, v; int m; Cell a;
//如果 cells數組初始化了(Striped64是懶惰的初始化,沒有執行緒競爭的時候cells數組不會被初始化)
// 或者 cas的修改base值 失敗了(說明多個執行緒都在嘗試cas修改,出現了競爭)
if ((as = cells) != null || !casBase(b = base, b + x)) {
//沒有發生衝突的標識
boolean uncontended = true;
//as == null || (m = as.length - 1) < 0 表示如果cell數組為空
if (as == null || (m = as.length - 1) < 0 ||
//或者當前執行緒的cell單元沒有初始化
(a = as[getProbe() & m]) == null ||
//或者cas修改base失敗了
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
A.對於第一個if
-
從來沒有發生過競爭
並發量很低的時候
Cell
數組就是空,這個時候第一個if中的(as = cells) != null
就是false 會繼續執行後續的!casBase(b = base, b + x)
這一步會cas的更新bese 值- 如果cas更新base成功了,那麼皆大歡喜,直接結束了,說明當前並發還是很低
- 如果cas 更新失敗,說明這一瞬間有多個執行緒都在更新base值,並發比較高,當前執行緒是一個倒霉蛋,cas更新沒有搶過別人。這個時候會進入到 if程式碼塊中
-
之前發生過競爭
這個時候第一個if的
(as = cells) != null
就成立了 ,不會走第一個if中的cas操作,直接進入第二個if
B.對於第二個if
進入第二個if,當前執行緒需要把值更新到對應的cell中
-
as == null || (m = as.length - 1) < 0
這意味著cell數組沒有初始化,也就是說這是第一次存在高並發競爭的情況,那麼調用longAccumulate
這個方法會幫我們初始化cell數組的 -
(a = as[getProbe() & m]) == null
這意味著,cell數組初始化了,但是當前執行緒標識取模數組長度得到當前執行緒應該更新的cell為空- getProbe方法是獲取執行緒的一個標識,獲取的是當前執行緒中的
threadLocalRandomProbe
欄位,欄位沒有初始化的時候默認是0 - m是 cell數組的長度-1,cell數組的長度為2的n此冪,m的二進位全1,getProbe() & m就是對cell數組長度進行取模,這點在HashMap源碼中也使用到了
如果當前執行緒所屬的Cell為空,那麼也會調用
longAccumulate
這裡我們要關注一點 getProbe 方法初始的時候都是0,0取模任何數都是0 那麼每一個執行緒最開始都會分配第一個Cell, 那麼第一個Cell為空意味著什麼昵, 這個問題需要我們看完longAccumulate 方法才能揭曉 其實probe=0在longAccumulate方法中意味著 當前執行緒沒有和其他執行緒發生衝突更新 在longAccumulate 會初始化probe 設置衝突更新表示為false
- getProbe方法是獲取執行緒的一個標識,獲取的是當前執行緒中的
-
!(uncontended = a.cas(v = a.value, v + x))
這裡是調用Cell的cas方法,就是更新Cell對象中的value欄位,如果這裡cas失敗了,說明當前存在一個執行緒也在更新當前cell對象的value,兩個執行緒要更新一個cell,說明出現了衝突,也會調用longAccumulate
進行自旋更新cell單元格中的值。
4.1.3.4 Striped64
#longAccumulate
longAccumulate 方法非常長,我們拆看慢慢看
-
初始化
threadLocalRandomProbe
//如果是0 表示沒有是沒有初始化的 //這裡會為當前執行緒生成一個probe if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); //設置為沒有競爭, wasUncontended = true; }
-
自旋保證當前執行緒能把值寫入
A.如果Cell數組已經成功初始化,下面都是A的子情況
-
情況1:如果當前執行緒threadLocalRandomProbe取模後對應的cell為空,那麼我們需要在當前執行緒對應的位置new一個cell賦值上去
//as——cells數組引用 //a 當前執行緒對於的cell //n 當前數組長度 //v 期望值 //h 當前執行緒的threadLocalRandomProbe //x是當前執行緒要增加的值 Cell[] as; Cell a; int n; long v; //如果cells初始化了 if ((as = cells) != null && (n = as.length) > 0) { //如果當前執行緒threadLocalRandomProbe取模後對於的cell為空 //==========程式碼點1(後續解析中會使用到)============== if ((a = as[(n - 1) & h]) == null) { //cellsBusy是一個自旋鎖保證Cell數組的執行緒安全 //0代表無執行緒調整Cell數組大小or或創建單元格 //1 則反之 //==========程式碼點2(後續解析中會使用到)============== if (cellsBusy == 0) { //為當前執行緒創建一個cell, //x直接賦值給其中的value 後續求和會加上這個x,從而實現增加 Cell r = new Cell(x); //0代表無執行緒調整Cell數組大小or或創建單元格 //casCellsBusy 是從0設置成1 表示當前執行緒嘗試獲取這把鎖 //==========程式碼點3(後續解析中會使用到)============== if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { Cell[] rs; int m, j; //重新判斷cell數組初始化了,且當前cell是空 //看下方解析為何需要重新 //==========程式碼點4(後續解析中會使用到)============== if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { //設置到cell數組上 rs[j] = r; created = true; } } finally { //釋放鎖 cellsBusy = 0; } //如果這裡成功創建了cell,說明成功把值加上去了 //那麼退出自旋 if (created) break; continue; // Slot is now non-empty } } collide = false; //....省略部分程式碼,這部分也會在後續解析 //重新刷新當前執行緒的Probe //==========程式碼點5(後續解析中會使用到)============== h = advanceProbe(h); }
這裡比較有意思有
-
在判斷
(a = as[(n - 1) & h]) == null
即當前執行緒對應的cell為空(程式碼點1
),首先在程式碼點2
是判斷了cellsBusy == 0
說明當前無執行緒在創建Cell單元格的,然後new了一個Cell,繼續在程式碼點3
還是會判斷cellsBusy == 0
,是由於我們在new一個cell的過程中可能存在消耗完時間片的情況,然後其他執行緒恰好可能已經獲得到了cellsBusy
這把鎖,這裡再次判斷cellsBusy
反之無腦獲取鎖執行casCellsBusy
,可以說doug lea真的是性能狂魔 -
在
程式碼點4
,來到程式碼點4
其實已經在程式碼點1
處已經判斷了當前執行緒對應的Cell單元格為空啊,為什麼這裡還要判斷一次昵,因為可能在當前new 一個cell的這段時間有另外一個執行緒也設置了這個位置的Cell,或者改變了cell數組,並且釋放了cellsBusy
鎖,為了保證此位置的Cell單元格的值不被當前執行緒無腦覆蓋,所有再次進行了判斷。 -
什麼時候會結束自旋,這段程式碼其實給出了一個答案——created為true
這裡的
created
只會在當前執行緒成功設置其對應的單元格為new Cell(增加的值)
時為true,也就代表著當前執行緒已經成功進行了一個增加操作 -
什麼時候會繼續自旋
-
程式碼點2
處的if (cellsBusy == 0)
不成立這意味著,當前執行緒對應的Cell為空,但是存在其他執行緒正在調整Cell數組大小or或創建單元格,為了保證Cell數組中的值不被覆蓋,這個時候會執行到
程式碼點5
調用advanceProbe
重新為當前執行緒生成一個probe//使用位操作,把當前執行緒的probe隨機打散,為啥這裡這樣進行位操作 //我只能說,可能時doug lea研究後的,或者他喜歡這個幾個數字 //但是這幾個數組都是質數,大概率後面是存在理論支撐的, static final int advanceProbe(int probe) { probe ^= probe << 13; // xorshift probe ^= probe >>> 17; probe ^= probe << 5; UNSAFE.putInt(Thread.currentThread(), PROBE, probe); return probe; }
-
程式碼點3
處的if(cellsBusy == 0 && casCellsBusy())
不成立和上面的1差不多
-
if (created)
不成立這意味,
程式碼點4
處的判斷不成立,說明存在執行緒A已經完成了cell數組的擴容,導致當前執行緒對應的Cell改變了(數組長度擴大2,probe%長度=n,可能是原來的位置n,也可以是n+當前長度/2 )也可能是執行緒A給當前執行緒對應Cell單元格賦值了(執行緒A的probe對數組長度取模後和當前執行緒相同,但是執行緒A搶先一步設置了單元格)但是這時候不會調用到advanceProbe
因為可以沿用之前的probe找到對應的位置進行設置值,這個坑位還是可以設置值的只是有人搶先一步了,不能直接new Cell(x)
,需要讓這個Cell值增加x
,但是1和2調用advanceProbe
的原因是,為了提升性能,讓他隨便找個其他坑位做增加的操作。再次給看跪了,doug lea真性能狂魔
-
-
-
情況2:如果在
LongAdder#add
方法中對應Cell進行CAS失敗,那麼rehash後繼續自旋if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { //省略了情況1的程式碼 } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; //....省略部分程式碼,這部分也會在後續解析 //重新刷新當前執行緒的Probe h = advanceProbe(h);
wasUncontended
這個變數位false只可能是調用longAccumulate
這個方法入參就為false,讓我們回到LongAdder#add
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { //初始為true boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || //注意這裡 !(uncontended = a.cas(v = a.value, v + x))) //要讓uncontended為false //那麼說明上面的a.cas(v = a.value, v + x)失敗了 longAccumulate(x, null, uncontended) } }
這裡我們可以看到,必須是當前執行緒對其cell進行cas操作失敗才可能為false,這裡的false意味著,當前並發很高,有幾個老六執行緒在對這個一個cell進行cas,那麼這個時候會執行到
else if (!wasUncontended) wasUncontended = true
然後執行advanceProbe
,這意味著,只能說當前執行緒命不好
執行重新rehash下probe
換一個Cell單元格進行操作,可以理解為Java就業太卷了,換Go語言了。這樣做的好處是提高了其他cell單元格的利用率,性能up,這裡把wasUncontended
隨後設置為true,可以理解為,當前執行緒都要rehash了,後續發生還不行那就是「崗位不夠了」得擴容Cell數組了,後續也就用不著wasUncontended
-
情況3:成功把值增加到對應的Cell
if ((as = cells) != null && (n = as.length) > 0) { //省略講過的程式碼。。。 // 成功把值增加到對應的cell else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) //自旋結束 break;
這裡出現了自旋退出的另外一個情況,那就是當前執行緒成功把增加的值設置到其對應的cell單元格,這時候結束自旋,很合理。
這裡出現了一個
fn
指的是調用當前方法傳入的LongBinaryOperator
是一個函數式介面。LongAdder的add方法默認傳入的是空,會執行v + x
也就是增加cell單元格的值,這個LongBinaryOperator
在LongAccumulator
使用到,後續我們看下@FunctionalInterface public interface LongBinaryOperator { long applyAsLong(long left, long right); }
-
情況四:對Cell數組進行擴容
如果並發實在是太大了,Cell數組單元格的數量已經容納不下這麼多執行緒一起執行了,那麼為了避免想
AtomicLong
一樣無腦自旋,浪費CPU,這時候會選擇對Cell數組進行擴容。if ((as = cells) != null && (n = as.length) > 0) { //省略講過的程式碼。。。 //==========程式碼點1(後續解析中會使用到)============== //collide 表示擴容的意向,為true並不代表一定會擴容 //如果cell數組的長度大於了jvm可以使用的核心數 或者cells數組引用改變了 else if (n >= NCPU || cells != as) collide = false; //==========程式碼點2(後續解析中會使用到)============== else if (!collide) collide = true; //拿到cellsBusy這把鎖 else if (cellsBusy == 0 && casCellsBusy()) { try { //判斷下cells引用沒有改變 //==========程式碼點3(後續解析中會使用到)============== if (cells == as) { //擴容 擴大1倍 Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; //改變cells應用指向 cells = rs; } } finally { //釋放鎖 cellsBusy = 0; } //擴容意向為false collide = false; continue; } //rehash probe h = advanceProbe(h); }
這裡有意思的點有
-
(n >= NCPU || cells != as)
如果cell數組長度已經大於等於jvm可以使用的cpu核心數了,或者cells引用指向改變了,那麼擴容意向設置為false,然後執行advanceProbe
對當前執行緒的probe進行rehashjava執行緒模型的學習筆記中,我們指出,java執行緒和作業系統是一對一模型,我理解這裡一個cpu核心執行在一個時刻運行一個執行緒,所以cells數組太大也沒什麼用。那麼為什麼
cells != as
成立也是進行rehash probe然後繼續自旋昵,這裡可以理解為當前執行緒嘗試對它對應的cell單元格進行cas操作,但是失敗了,這個時候發現cells != as
說明有其他執行緒對當前cell數組進行了擴容,從而改變了cells數組的引用指向(as是就的cells數組)為了防止多次擴容,這個時候就設置以下擴容意向為false 然後讓當前執行緒「從卷java,轉變為卷Go」換一個並發不那麼高的Cell數組單元格進行cas操作。 -
程式碼點2
進入到這裡的情況有(a = as[(n - 1) & h]) == null
但是被其他執行緒初始化了對應位置的Cell,cas設置對應cell失敗,cell數組已經達到jvm可用cpu,當前執行緒執行的途中沒有其他執行緒完成擴容。但是當前還是無法在自己對應的cell上成功進行cas,說明和其他執行緒發生了衝突,這個時候讓當前執行緒rehash以下probe然後再次自旋一次,如果還是無法在自己對應的cell進行cas操作,且沒有發生擴容的話會來到下面的3 -
拿到鎖,對cell數組進行擴容,進入這裡,說明沒有其他執行緒進行擴容,當前執行緒對應的cell不為null,但是對cell進行CAS操作還是失敗。這時候為了提高性能只能犧牲一點空間了,進行擴容。有意思的點在於
程式碼點3
,為什麼在這裡還是需要進行一次判斷昵,因為cellsBusy == 0 && casCellsBusy()
這兩個操作不是原子的,可能cellsBusy == 0
執行完失去了時間片,這時候有一個老六進行了擴容,改變了cells數組引用指向,並且釋放了鎖,這時候如果不做這個判斷,可能導致cell數組元素的丟失。後續就是對cells進行擴容,然後釋放鎖,設置擴容意向為false,然後continue
,注意這個continue
,這會導致當前執行緒不會執行advanceProbe
,為什麼昵,哥們都擴容了,你現在讓我換個格子,那我為啥要擴容,屬於是「Java太卷,但是我命由我不由天,為自己創造崗位」,後續這個執行緒進行自旋的時候隨機到的Cell數組可能還是原來的,可能是原來位置加上當前cell數組長度的一半,但是還是可以把一些「競爭者」分散開了
-
B.當前Cell數組沒有初始化,當前執行緒進行初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) { //是否完成了初始化 boolean init = false; try { //==========程式碼點1(後續解析中會使用到)============== //確認沒有其他老六搶先初始化 if (cells == as) { // 初始化 Cell[] rs = new Cell[2]; //選擇一個格子 設置為x,probe奇數那麼選擇rs[1] 反之rs[0] rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { //釋放鎖 cellsBusy = 0; } //如果成功初始化 那麼結束 if (init) break; }
-
這裡有意思的點,在程式碼點1
還是會進行cells == as
的判斷,這是由於cellsBusy == 0 && cells == as && casCellsBusy()
並不是一個原子操作,可能存在其他執行緒,搶先初始化cell數組,所以需要再次判斷以下。這裡我們可以看到初始化的cell數組大小為2,後續都是擴大一倍
C.Cell沒有初始化,但是當前執行緒嘗試初始化失敗,嘗試操作base值
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
來到這裡,說明A和B都是不成立的,也就意味著,當前執行緒進來的時候發現cell沒有初始化,然後來到B,但是cellsBusy == 0 && cells == as && casCellsBusy()
發現不成立,不成立的情況有
cellsBusy == 0
不成立,說明之前有執行緒已經拿到鎖了,正在初始化cells == as
不成立,有一個執行緒已經完成了初始化,導致cell引用指向改變casCellsBusy()
不成立,競爭鎖的過程中失敗了
這個時候會讓當前執行緒嘗試更新下base值,說不定很多執行緒都在嘗試更新cell元素,這個時候更新下base 可能也許會成功。
4.1.3.5 sum
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
沒什麼好說的,強於doug lea也只能保證最終一致性,顯然如果存在其他執行緒並發add的時候,這個方法只能拿到快照
數據
4.1.3.6 reset
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
沒什麼好說的,執行緒不安全,如果存在其他執行緒add,這時候調用reset,可能導致並沒有reset成功,或者說如果其他執行緒擴容到一般,調用reset,那麼reset也會不成功。還有一點是reset並不會改變cell數組大小
4.1.3.7 sumThenReset
public long sumThenReset() {
Cell[] as = cells; Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}
求和並設置為0並不會改變cell數組大小。
4.2 LongAccumulator
大致和LongAdder類似,LongAccumulator
需要指定如何如果操作 ——LongBinaryOperator(舊值沒有衝突時時base,衝突時是cell,accumulate傳入的值)
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
4.2.1 accumulate
public void accumulate(long x) {
Cell[] as; long b, v, r; int m; Cell a;
//cell數組沒有初始化
if ((as = cells) != null ||
//或者 需要更新,cas失敗
(r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
boolean uncontended = true;
//cell數組沒有初始化
if (as == null || (m = as.length - 1) < 0 ||
//或者當前執行緒對應的 cell為null
(a = as[getProbe() & m]) == null ||
!(uncontended =
//需要更新
(r = function.applyAsLong(v = a.value, x)) == v ||
//或cas失敗
a.cas(v, r)))
//這裡傳入了 function 在 longAccumulate中就不是簡單的自增了
longAccumulate(x, function, uncontended);
}
}
4.2.3 get
public long get() {
Cell[] as = cells; Cell a;
long result = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
result = function.applyAsLong(result, a.value);
}
}
return result;
}
這一這裡返回的值,不是進行累加而是function.applyAsLong(result, a.value)
取決於你定義的操作——LongBinaryOperator
4.3 DoubleAdder 和DoubleAccumulator
和LongAdder 與LongAccumulator 類似,但是是通過把Double轉換成Long調用doubleAccumulate
來完成的
五.快取偽共享
我們在解析LongAdder
源碼的時候看到 Striped64
中的Cell
類上面存在一個@sun.misc.Contended
的註解,我們說這是為了反正快取偽共享,下面我們聊下啥是快取偽共享
@sun.misc.Contended static final class Cell {
volatile long value;
//省略部分程式碼
}
1.什麼是偽共享
快取是由快取行組成的,通常一個快取行是 64 位元組,在程式運行的過程中,快取每次更新都從主記憶體中載入連續的 64 個位元組。因此,如果訪問一個 long 類型的數組時,當數組中的一個值被載入到快取中時,另外 7 個連續的元素也會被載入到快取中,地址上不連續的就不會載入到同一個快取行了。這種免費載入也有一個壞處。設想如果我們有個 long 類型的變數 a,它不是數組的一部分,而是一個單獨的變數,並且還有另外一個 long 類型的變數 b 緊挨著它,那麼當載入 a 的時候將免費載入 b。如果一個 CPU 核心的執行緒在對 a 進行修改,另一個 CPU 核心的執行緒卻在對 b 進行讀取,當前者修改 a 時,會把 a 和 b 同時載入到前者核心的快取行中,更新完 a 後其它所有包含 a 的快取行都將失效,因為其它快取中的 a 不是最新值了,而當後者讀取 b 時,發現這個快取行已經失效了,需要從主記憶體中重新載入
。這就很坑爹了,我只是想更新a,但是卻讓有效的b無效了。
2.解決偽共享的辦法
2.1填充無用位元組
只要我填一些無用的位元組,在a和b之間,讓a和b不在一個快取行中就解決了這個問題,但是現在虛擬機很聰明,會對我們手動填充的無用位元組進行忽視
2.2 使用@sun.misc.Contended
這也是一種填充無用位元組的做法,但是是jvm幫我填充。
如下Long1這個類標註了@sun.misc.Contended
我們在啟動的jvm的時候加上 -XX:-RestrictContended
對比不加 @sun.misc.Contended
註解的時候,其實有很大的差別(幾個數量級的差距)
public static void main(String[] args) {
test2();
}
private static void test2() {
Long1 long1 = new Long1();
CountDownLatch latch = new CountDownLatch(2);
long start = System.currentTimeMillis();
new Thread(() -> {
for (int i = 0; i < 1000000000; i++) {
long1.l1++;
}
latch.countDown();
}).start();
new Thread(() -> {
for (int i = 0; i < 1000000000; i++) {
long1.l2++;
}
latch.countDown();
}).start();
try {
latch.await();
long end = System.currentTimeMillis();
System.out.println(end - start);
} catch (InterruptedException e) {
}
}
@Contended
static class Long1 {
private volatile long l1;
private volatile long l2;
}
3.為什麼Cell
要加@sun.misc.Contended
如果使用@sun.misc.Contended
那麼ACell 和BCell不在一個緩衝行,就不會發生這樣的情況了,從主記憶體載入數據到快取還是需要消耗一定時間的。