【深入淺出多執行緒】無鎖編程
多執行緒編程中,鎖是大家比較熟悉的概念,但對無鎖編程則沒有太多了解。無鎖編程是指不用加鎖的方式去解決原本需要加鎖才能解決的問題,從而使程式具有更高的性能,降低硬體成本。我們從並發開始說起。
一、並發相關概念
並發數:伺服器同時並行處理的請求數量。
QPS:每秒處理完成的請求數量,是衡量系統吞吐量的一種方式。例如:Tomcat接收了200個請求,但1秒內處理完畢的請求為20個,則QPS為20。
高並發:高並發是一個相對的概念,假設機器配置差勁(例如1核1G),如果在這個機器上部署一個項目,這時200個請求就已經到達該機器的上限了,那麼這時面臨的場景就是高並發的場景。處理高並發並不一定是並發量非常高,我們才會用到這樣的技術。可能你們公司的並發量只有1000,15萬的用戶量,依然可能會用到高並發的技術,為什麼——降低成本(用戶少,公司沒錢__)。
二、並發下的原子操作
Oricle官網上Java入門手冊中關於原子操作(Atomic Variables)的定義如下:
In programming, an atomic action is one that effectively happens all at once. An atomic action cannot stop in the middle: it either happens completely, or it doesn’t happen at all. No side effects of an atomic action are visible until the action is complete.
原子操作可以是一個步驟或多個操作步驟,要麼完全發生,要麼根本不發生,不能停在中間狀態。將整個操作視作一個整體,資源在該次操作中保持一致,這是執行緒安全中原子性的核心特徵。
三、並發的原子性問題
來看一個計數器的例子:
class Counter {
private int i = 0;
public void increment() {
i++;
}
public void decrement() {
i--;
}
public int value() {
return i;
}
}
我們為它配上啟動器,啟動器中開啟兩個執行緒,每個執行緒執行10000次increment()
函數調用,所以執行完預期計數器結果為20000。
public class Starter {
public static void main(String[] args) throws InterruptedException {
var counter = new Counter();
for (int i = 0; i < 2; i++) {
new Thread(() -> {
for (int j = 0; j < 10000; j++) {
counter.increment();
}
}).start();
}
Thread.sleep(5000);
System.out.println(counter.value());
}
}
運行結果一般是不正確的。
這顯然是一個不正確的結果,為什麼不正確?——i++
並沒有保證原子性。我們可以通過分析JVM執行的位元組碼(byte code)來一探究竟(IDEA可以通過安裝插件來實現,具體參考:IDEA下查看Java位元組碼插件):
0 aload_0
1 dup
2 getfield #2 <Counter.i : I>
5 iconst_1
6 iadd
7 putfield #2 <Counter.i : I>
10 return
可以看到位元組碼包括以下關鍵的三個步驟:
- 取值:
2 getfield #2 <Counter.i : I>
- 相加:
6 iadd
- 放回:
7 putfield #2 <Counter.i : I>
接下來結合多執行緒來進行分析為什麼i++
沒有實現原子性。
做了兩次計算,得出同樣的值,很明顯這不是我們期望的結果,不符合我們編寫程式的期望——操作失敗。這就是原子性問題的來源。
這裡舉一個計數器的應用——限流。限流的目的是超過一定數量的請求,不予操作。例如,有一個Controller方法:
@RestController
class XXXController {
final Counter counter = new Counter();
@GetController(...)
public void index() {
// 統計正在處理的請求數量
counter.increase();
try {
if (counter.value() > 3000) {
return;
}
// TODO: 很多業務邏輯,消耗CPU、記憶體,資源有限
} finally {
counter.decrease();
}
}
}
類似的限流在Redis
、SpringCloud
、Nginx
、微服務、網關中都有,其最核心技術一定是計數器。
三、並發控制策略
並發導致了上述問題,我們需要控制並發,以達到我們想要的效果。並發控制有兩種策略:悲觀策略(Pessimistic concurrency control)和樂觀策略(Optimistic concurrency control)。這是兩種解決並發問題的思路。
悲觀策略是一種悲觀的思想,即認為寫較多,遇到並發的可能性高,每次拿數據時,都會認為別人會修改數據,所以同一時間只允許一個執行緒執行。
樂觀策略是一種樂觀的思想,即認為讀多寫少,遇到並發的可能性低,每次拿數據時都認為別人不會修改,所以不會上鎖。但是在更新的時候會判斷判斷值有沒有發生變化,如果發生變化了,意味著其他執行緒已經更新,則此次計算更新失效,需要重新取值計算。
適用場景:
- 重試成本小;
- 並發數量小;
無鎖編程要先了解有鎖編程的問題。
四、鎖帶來的問題
按照悲觀策略的思想,我們當然可以通過加鎖的方式解決並發不一致問題,計數器例子的加鎖版本程式碼如下:
class SynchronizedCounter {
private int i = 0;
public synchronized void increment() {
i++;
}
public synchronized void decrement() {
i--;
}
public synchronized int value() {
return i;
}
}
但是加鎖存在一些問題:
我們知道,執行緒的數量一定的遠遠大於CPU數量的,如上圖所示,CPU只有3個,而執行緒數則可能成百上千。如何把這麼多的執行緒分配到有限的CPU上,這就是作業系統調度程式做的事情。JVM進程中的執行緒也是由作業系統中的調度器負責將其調度到指定的CPU上執行。
再來分析上面加鎖版本的計數器。執行緒-1和執行緒-2都在搶鎖,假設執行緒-1搶到了鎖,執行緒2沒有搶到鎖,執行緒2變為阻塞狀態。那麼,下一次執行緒-2
什麼時候?可能會在1ms之後,也可能10ms之後執行。也就是說,由於作業系統調度存在,所以這是不確定的。執行緒-1
執行完畢。
假設執行一次i++
需要0.05ms,則可能會發生以下場景:
第一次i++ 0.05ms
執行緒切換 — 10ms
第二次i++ 0.05ms
這就是說,每發生一次槍鎖失敗導致的執行緒切換,都會增加一定的執行緒切換時間、等待時間,從而導致請求的處理時間變長,降低了系統的吞吐量。
五、無鎖編程實現
通過分析計數器例子中計數器累加操作i++
,我們發現:
- 讀取值:沒有問題——不會破壞數據的狀態;
- 計算值:沒有問題——不會破壞數據的狀態;
- 賦值:會修改記憶體數據,導致數據結果不符合程式的原子性要求——會破壞數據的狀態,即:存在並發安全問題
按照樂觀並發控制策略,假設不存在多執行緒競爭,我們不進行加鎖操作,那麼第1步和第2步都沒有問題。第3步,寫結果到記憶體時,需要判斷在第1步和第2步中,剛才的假設是否成立,如果成立,則直接將結果寫入記憶體,否則剛才計算結果不正確,需要重新計算。也就是說,最後一步是無鎖編程的核心,如何確保判斷並根據結果寫回記憶體這一操作具備原子性?
這需要硬體的支援。
最後一步判斷並根據結果寫回記憶體本質是操作記憶體中一個值,而記憶體正好提供了這麼一種機制——CAS(Compare and swap)機制,屬於硬體同步原語。該方法需要三個參數:記憶體地址、當前值A、新值B。
CAS的具體操作:先比較輸入的當前值和記憶體中的值是否一致,不一致則代表記憶體數據發生了變化,CAS失敗。如果輸入的舊值A和記憶體中的值一致,則將值A替換為值B,CAS操作成功。
Java提供了一種直接操作記憶體的工具類sun.misc.Unsafe
,顧名思義,這個類對於普通程式設計師來說是」危險「的,一般應用開發者不會用到這個類。在Unsafe
類中就有一系列的compareAndSwapXXX()
方法,例如,本文要用到的compareAndSwapInt
函數:
@ForceInline
public final boolean compareAndSwapInt(Object o, long offset,
int expected,
int x) {
return theInternalUnsafe.compareAndSetInt(o, offset, expected, x);
}
這個函數的四個參數分別代表要操作的對象、對象屬性相對該對象的偏移量、期望記憶體中的值以及計算後的值。參數1、3、4都很容易獲取,第2個成員變數相對該對象的偏移量怎麼獲取?我們也可以通過這個工具類獲取:
@ForceInline
public long objectFieldOffset(Field f) {
return theInternalUnsafe.objectFieldOffset(f);
}
最後我們要調用這些方法,我們首先要獲取Unsafe
的實例變數。然而,這個類的構造函數時私有的,應用程式也無法通過其靜態成員方法getUnsafe()
獲取其實例。那麼,怎麼獲取Unsafe
的實例呢?這就要使用強大的反射機制了(反射就像一面鏡子,我們往JVM中放了什麼東西,我們就可以從JVM里獲取到這些東西,也許這是反射名字的由來吧!)。
public static Unsafe getUnsafe() throws IllegalAccessException {
Field unsafeField = Unsafe.class.getDeclaredFields()[0];
unsafeField.setAccessible(true);
return (Unsafe) unsafeField.get(null);
}
最後,基於CAS機制的Counter實現如下:
public class CASCounter {
private int i = 0;
private static Unsafe unsafe; // Unsafe實例
private static long offset; // i相對CASCounter對象實例的偏移
static {
try {
// 通過反射機制獲取Unsafe實例
var unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
unsafeField.setAccessible(true);
unsafe = (Unsafe) unsafeField.get(null);
// 獲取i相對CASCounter對象實例的偏移
Field fc = CASCounter.class.getDeclaredField("i");
offset = unsafe.objectFieldOffset(fc);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}
public void increment() {
while (true) {
int currentValue = i;
int newValue = currentValue + 1;
// 原子方式的比較寫回操作,如果成功則返回,否則重新獲取並計算(重試)
if (unsafe.compareAndSwapInt(this, offset, currentValue, newValue)) {
return;
}
}
}
public void decrement() {
while (true) {
int currentValue = i;
int newValue = currentValue - 1;
// 原子方式的比較寫回操作,如果成功則返回,否則重新獲取並計算(重試)
if (unsafe.compareAndSwapInt(this, offset, currentValue, newValue)) {
return;
}
}
}
public int value() {
return i;
}
}
六、性能比較
我們在啟動器開啟的執行緒中加入計時邏輯,如下:
for (int i = 0; i < 2; i++) {
new Thread(() -> {
var begin = System.nanoTime();
for (int j = 0; j < 10000; j++) {
counter.increment();
}
System.out.println(System.nanoTime() - begin);
}).start();
}
運行結果如下:
使用SynchronizeCounter
的運行結果
使用CASCounter
的運行結果
可以看到兩者性能差距在10倍以上!