­

The art of multipropcessor programming 讀書筆記-3. 自旋鎖與爭用(2)

本系列是 The art of multipropcessor programming 的讀書筆記,在原版圖書的基礎上,結合 OpenJDK 11 以上的版本的代碼進行理解和實現。並根據個人的查資料以及理解的經歷,給各位想更深入理解的人分享一些個人的資料

自旋鎖與爭用

3. 隊列鎖

之前實現的基於回退的鎖,除了通用性以外,還有如下兩個問題:

  • CPU 高速緩存一致性流量:雖然由於回退存在,所以流量比 TASLock 要小,但是多線程訪問鎖的狀態還是有一定因為緩存一致性導致的流量消耗的。
  • 可能降低訪問臨界區的效率:由於所有線程的 sleep 延遲過大,導致當前所有線程都在 sleep,但是鎖實際上已經釋放。

可以將線程放入一個隊列,來解決上面兩個問題:

  • 隊列中,每個線程檢查它的前驅線程是否已經完成,判斷鎖是否被釋放,不用訪問鎖的狀態。這樣訪問的是不同的內存,減少了鎖釋放修改狀態導致的 CPU 高速緩存一致性流量
  • 不需要 sleep,可以通過前驅線程告知線程鎖被釋放,嘗試獲取鎖,提高了訪問臨界區的效率

最後,通過隊列,也是實現了 FIFO 的公平性。

3.1. 基於數組的鎖

我們通過一個數組來實現隊列的功能,其流程是:

  • 需要的存儲:
    • boolean 數組,為 true 則代表對應槽位的線程獲取到了鎖,為 false 則為對應槽位的線程沒有獲取到了鎖
    • 保存當前最新槽位的原子變量,每次上鎖都會將這個原子變量加 1,之後對 boolean 數組的大小取余。這個值代表這個線程佔用了 boolean 數組的這個位置,boolean 數組的這個位置的值代表這個線程是否獲取到了鎖。這也說明,boolean 數組的容量決定了這個鎖同時可以有多少線程進行爭用
    • ThreadLocal,記錄當前線程佔用的 boolean 數組的位置
  • 上鎖流程:
    • 原子變量 + 1,對 boolean 數組的大小取余得到 current
    • 將 current 記錄到 ThreadLocal
    • 當 boolean 數組 cuurent 位置的值為 false 的時候,自旋等待
  • 解鎖流程:
    • 從 ThreadLocal 中獲取當前線程對應的位置 mine
    • 將 boolean 數組的 mine 位置標記為 false
    • 將 boolean 數組的 mine + 1 對數組大小取余的位置(防止數組越界)標記為 true

其源碼是:

public class ArrayLock implements Lock {
	private final ThreadLocal<Integer> mySlotIndex = ThreadLocal.withInitial(() -> 0);
	private final AtomicInteger tail = new AtomicInteger(0);
	private final boolean[] flags;
	private final int capacity;

	public ALock(int capacity) {
		this.capacity = capacity;
		this.flags = new boolean[capacity];
	}

	@Override
	public void lock() {
		int current = this.tail.getAndIncrement() % capacity;
		this.mySlotIndex.set(current);
		while (!this.flags[current]) {
		}
	}

	@Override
	public void unlock() {
		int mine = this.mySlotIndex.get();
		this.flags[mine] = false;
		this.flags[(mine + 1)  % capacity] = true;
	}
}

在這個源碼實現上,我們還可以做很多優化:

  1. 自旋等待可以不用強 Spin,而是 CPU 佔用更低並且針對不同架構並且針對自旋都做了 CPU 指令優化的 Thread.onSpinWait()
  2. boolean 數組的每個槽位需要做緩存行填充,防止 CPU false sharing 的發生導致緩存行失效信號過多發佈。
  3. boolean 數組的更新需要是 volatile 更新,普通更新會延遲總線信號,導致其他等帶鎖的線程感知的更慢從而空轉更多次。
  4. 取余是非常低效的運算,需要轉化為與運算,對 2 的 n 次方取余相當於對 2 的 n 次方減去 1 取與運算,我們需要將傳入的 capacity 值轉化為大於 capacity 最近的 2 的 n 次方的值來實現。
  5. this.flags[current] 這個讀取數組的操作需要放在循環外面,防止每次讀取數組的性能消耗。

優化後的源碼是:

public class ArrayLock implements Lock {
	private final ThreadLocal<Integer> mySlotIndex = ThreadLocal.withInitial(() -> 0);
	private final AtomicInteger tail = new AtomicInteger(0);
	private final ContendedBoolean[] flags;
	private final int capacity;

	private static class ContendedBoolean {
	    //通過註解實現緩存行填充
		@Contended
		private boolean flag;
	}

    //通過句柄實現 volatile 更新
	private static final VarHandle FLAG;
	static {
		try {
			//初始化句柄
			FLAG = MethodHandles.lookup().findVarHandle(ContendedBoolean.class, "flag", boolean.class);
		} catch (Exception e) {
			throw new Error(e);
		}
	}

	public ArrayLock(int capacity) {
		capacity |= capacity >>> 1;
		capacity |= capacity >>> 2;
		capacity |= capacity >>> 4;
		capacity |= capacity >>> 8;
		capacity |= capacity >>> 16;
		capacity += 1;  //大於N的最小的2的N次方
		this.flags = new ContendedBoolean[capacity];
		for (int i = 0; i < this.flags.length; i++) {
			this.flags[i] = new ContendedBoolean();
		}
		this.capacity = capacity;
		this.flags[0].flag = true;
	}

	@Override
	public void lock() {
		int current = this.tail.getAndIncrement() & (capacity - 1);
		this.mySlotIndex.set(current);
		ContendedBoolean contendedBoolean = this.flags[current];
		while (!contendedBoolean.flag) {
			Thread.onSpinWait();
		}
	}

	@Override
	public void unlock() {
		int mine = this.mySlotIndex.get();
		FLAG.setVolatile(this.flags[mine], false);
		FLAG.setVolatile(this.flags[(mine + 1) & (capacity - 1)], true);
	}
}

但是,即使有這些優化,在高並發大量鎖調用的時候,這個鎖的性能依然會很差。這個我們之後會分析優化。