【深入淺出多執行緒】無鎖編程

多執行緒編程中,鎖是大家比較熟悉的概念,但對無鎖編程則沒有太多了解。無鎖編程是指不用加鎖的方式去解決原本需要加鎖才能解決的問題,從而使程式具有更高的性能,降低硬體成本。我們從並發開始說起。

一、並發相關概念

並發數:伺服器同時並行處理的請求數量。

QPS:每秒處理完成的請求數量,是衡量系統吞吐量的一種方式。例如:Tomcat接收了200個請求,但1秒內處理完畢的請求為20個,則QPS為20。

高並發:高並發是一個相對的概念,假設機器配置差勁(例如1核1G),如果在這個機器上部署一個項目,這時200個請求就已經到達該機器的上限了,那麼這時面臨的場景就是高並發的場景。處理高並發並不一定是並發量非常高,我們才會用到這樣的技術。可能你們公司的並發量只有1000,15萬的用戶量,依然可能會用到高並發的技術,為什麼——降低成本(用戶少,公司沒錢__)。

二、並發下的原子操作

Oricle官網上Java入門手冊中關於原子操作(Atomic Variables)的定義如下:

In programming, an atomic action is one that effectively happens all at once. An atomic action cannot stop in the middle: it either happens completely, or it doesn’t happen at all. No side effects of an atomic action are visible until the action is complete.

原子操作可以是一個步驟或多個操作步驟,要麼完全發生,要麼根本不發生,不能停在中間狀態。將整個操作視作一個整體,資源在該次操作中保持一致,這是執行緒安全中原子性的核心特徵。

三、並發的原子性問題

來看一個計數器的例子:

class Counter {
    
    private int i = 0;

    public void increment() {
        i++;
    }

    public void decrement() {
        i--;
    }

    public int value() {
        return i;
    }
}

我們為它配上啟動器,啟動器中開啟兩個執行緒,每個執行緒執行10000次increment()函數調用,所以執行完預期計數器結果為20000。

public class Starter {
    public static void main(String[] args) throws InterruptedException {
        var counter = new Counter();
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    counter.increment();
                }
            }).start();
        }

        Thread.sleep(5000);
        System.out.println(counter.value());
    }
}

運行結果一般是不正確的。

image

這顯然是一個不正確的結果,為什麼不正確?——i++並沒有保證原子性。我們可以通過分析JVM執行的位元組碼(byte code)來一探究竟(IDEA可以通過安裝插件來實現,具體參考:IDEA下查看Java位元組碼插件):

 0 aload_0
 1 dup
 2 getfield #2 <Counter.i : I>
 5 iconst_1
 6 iadd
 7 putfield #2 <Counter.i : I>
10 return

可以看到位元組碼包括以下關鍵的三個步驟:

  1. 取值:2 getfield #2 <Counter.i : I>
  2. 相加: 6 iadd
  3. 放回:7 putfield #2 <Counter.i : I>

接下來結合多執行緒來進行分析為什麼i++沒有實現原子性。

image

做了兩次計算,得出同樣的值,很明顯這不是我們期望的結果,不符合我們編寫程式的期望——操作失敗。這就是原子性問題的來源。

這裡舉一個計數器的應用——限流。限流的目的是超過一定數量的請求,不予操作。例如,有一個Controller方法:

@RestController
class XXXController {
    final Counter counter = new Counter();
    
    @GetController(...)
	public void index() {
        // 統計正在處理的請求數量
        counter.increase();
        try {
            if (counter.value() > 3000) {
                return;
            }
            // TODO: 很多業務邏輯,消耗CPU、記憶體,資源有限
        } finally {
            counter.decrease();
        }
    }
}

類似的限流在RedisSpringCloudNginx、微服務、網關中都有,其最核心技術一定是計數器。

三、並發控制策略

並發導致了上述問題,我們需要控制並發,以達到我們想要的效果。並發控制有兩種策略:悲觀策略(Pessimistic concurrency control)樂觀策略(Optimistic concurrency control)。這是兩種解決並發問題的思路。

image

悲觀策略是一種悲觀的思想,即認為寫較多,遇到並發的可能性高,每次拿數據時,都會認為別人會修改數據,所以同一時間只允許一個執行緒執行。

image

樂觀策略是一種樂觀的思想,即認為讀多寫少,遇到並發的可能性低,每次拿數據時都認為別人不會修改,所以不會上鎖。但是在更新的時候會判斷判斷值有沒有發生變化,如果發生變化了,意味著其他執行緒已經更新,則此次計算更新失效,需要重新取值計算。

適用場景:

  1. 重試成本小;
  2. 並發數量小;

無鎖編程要先了解有鎖編程的問題。

四、鎖帶來的問題

按照悲觀策略的思想,我們當然可以通過加鎖的方式解決並發不一致問題,計數器例子的加鎖版本程式碼如下:

class SynchronizedCounter {
    private int i = 0;

    public synchronized void increment() {
        i++;
    }

    public synchronized void decrement() {
        i--;
    }

    public synchronized int value() {
        return i;
    }
}

但是加鎖存在一些問題:

image

我們知道,執行緒的數量一定的遠遠大於CPU數量的,如上圖所示,CPU只有3個,而執行緒數則可能成百上千。如何把這麼多的執行緒分配到有限的CPU上,這就是作業系統調度程式做的事情。JVM進程中的執行緒也是由作業系統中的調度器負責將其調度到指定的CPU上執行。

再來分析上面加鎖版本的計數器。執行緒-1和執行緒-2都在搶鎖,假設執行緒-1搶到了鎖,執行緒2沒有搶到鎖,執行緒2變為阻塞狀態。那麼,下一次執行緒-2什麼時候?可能會在1ms之後,也可能10ms之後執行。也就是說,由於作業系統調度存在,所以這是不確定的。執行緒-1執行完畢。

假設執行一次i++需要0.05ms,則可能會發生以下場景:

​ 第一次i++ 0.05ms

​ 執行緒切換 — 10ms

​ 第二次i++ 0.05ms

這就是說,每發生一次槍鎖失敗導致的執行緒切換,都會增加一定的執行緒切換時間、等待時間,從而導致請求的處理時間變長,降低了系統的吞吐量。

五、無鎖編程實現

通過分析計數器例子中計數器累加操作i++,我們發現:

  1. 讀取值:沒有問題——不會破壞數據的狀態;
  2. 計算值:沒有問題——不會破壞數據的狀態;
  3. 賦值:會修改記憶體數據,導致數據結果不符合程式的原子性要求——會破壞數據的狀態,即:存在並發安全問題

按照樂觀並發控制策略,假設不存在多執行緒競爭,我們不進行加鎖操作,那麼第1步和第2步都沒有問題。第3步,寫結果到記憶體時,需要判斷在第1步和第2步中,剛才的假設是否成立,如果成立,則直接將結果寫入記憶體,否則剛才計算結果不正確,需要重新計算。也就是說,最後一步是無鎖編程的核心,如何確保判斷並根據結果寫回記憶體這一操作具備原子性?

這需要硬體的支援。

最後一步判斷並根據結果寫回記憶體本質是操作記憶體中一個值,而記憶體正好提供了這麼一種機制——CAS(Compare and swap)機制,屬於硬體同步原語。該方法需要三個參數:記憶體地址、當前值A、新值B。

CAS的具體操作:先比較輸入的當前值和記憶體中的值是否一致,不一致則代表記憶體數據發生了變化,CAS失敗。如果輸入的舊值A和記憶體中的值一致,則將值A替換為值B,CAS操作成功。

image

Java提供了一種直接操作記憶體的工具類sun.misc.Unsafe,顧名思義,這個類對於普通程式設計師來說是」危險「的,一般應用開發者不會用到這個類。在Unsafe類中就有一系列的compareAndSwapXXX()方法,例如,本文要用到的compareAndSwapInt函數:

@ForceInline
public final boolean compareAndSwapInt(Object o, long offset,
                                       int expected,
                                       int x) {
    return theInternalUnsafe.compareAndSetInt(o, offset, expected, x);
}

這個函數的四個參數分別代表要操作的對象、對象屬性相對該對象的偏移量、期望記憶體中的值以及計算後的值。參數1、3、4都很容易獲取,第2個成員變數相對該對象的偏移量怎麼獲取?我們也可以通過這個工具類獲取:

@ForceInline
public long objectFieldOffset(Field f) {
    return theInternalUnsafe.objectFieldOffset(f);
}

最後我們要調用這些方法,我們首先要獲取Unsafe的實例變數。然而,這個類的構造函數時私有的,應用程式也無法通過其靜態成員方法getUnsafe()獲取其實例。那麼,怎麼獲取Unsafe的實例呢?這就要使用強大的反射機制了(反射就像一面鏡子,我們往JVM中放了什麼東西,我們就可以從JVM里獲取到這些東西,也許這是反射名字的由來吧!)。

public static Unsafe getUnsafe() throws IllegalAccessException {
    Field unsafeField = Unsafe.class.getDeclaredFields()[0];
    unsafeField.setAccessible(true);
    return (Unsafe) unsafeField.get(null);
}

最後,基於CAS機制的Counter實現如下:

public class CASCounter {
    private int i = 0;

    private static Unsafe unsafe;	// Unsafe實例
    private static long offset;		// i相對CASCounter對象實例的偏移

    static {
        try {
            // 通過反射機制獲取Unsafe實例
            var unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
            unsafeField.setAccessible(true);
            unsafe = (Unsafe) unsafeField.get(null);

            // 獲取i相對CASCounter對象實例的偏移
            Field fc = CASCounter.class.getDeclaredField("i");
            offset = unsafe.objectFieldOffset(fc);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }
    }

    public void increment() {
        while (true) {
            int currentValue = i;
            int newValue = currentValue + 1;
            // 原子方式的比較寫回操作,如果成功則返回,否則重新獲取並計算(重試)
            if (unsafe.compareAndSwapInt(this, offset, currentValue, newValue)) {
                return;
            }
        }
    }

    public void decrement() {
        while (true) {
            int currentValue = i;
            int newValue = currentValue - 1;
            // 原子方式的比較寫回操作,如果成功則返回,否則重新獲取並計算(重試)
            if (unsafe.compareAndSwapInt(this, offset, currentValue, newValue)) {
                return;
            }
        }
    }

    public int value() {
        return i;
    }
}

六、性能比較

我們在啟動器開啟的執行緒中加入計時邏輯,如下:

for (int i = 0; i < 2; i++) {
    new Thread(() -> {
        var begin = System.nanoTime();
        for (int j = 0; j < 10000; j++) {
            counter.increment();
        }
        System.out.println(System.nanoTime() - begin);
    }).start();
}

運行結果如下:

image

使用SynchronizeCounter的運行結果

image

使用CASCounter的運行結果

可以看到兩者性能差距在10倍以上!