canal源碼之BooleanMutex(基於AQS中共享鎖實現)
在看canal源碼時發現一個有趣的鎖實現–BooleanMutex
這個鎖在canal裡面多處用到,相當於一個開關,比如系統初始化/授權控制,沒許可權時阻塞等待,有許可權時所有執行緒都可以快速通過
先看它的核心基於AQS的鎖實現:
private final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 2559471934544126329L; /** State value representing that TRUE */ private static final int TRUE = 1; /** State value representing that FALSE */ private static final int FALSE = 2; private boolean isTrue(int state) { return (state & TRUE) != 0; } /** * 實現AQS的介面,獲取共享鎖的判斷 */ protected int tryAcquireShared(int state) { // 如果為true,直接允許獲取鎖對象 // 如果為false,進入阻塞隊列,等待被喚醒 return isTrue(getState()) ? 1 : -1; } /** * 實現AQS的介面,釋放共享鎖的判斷 */ protected boolean tryReleaseShared(int ignore) { // 始終返回true,代表可以release return true; } boolean innerState() { return isTrue(getState()); } void innerGet() throws InterruptedException { acquireSharedInterruptibly(0); } void innerGet(long nanosTimeout) throws InterruptedException, TimeoutException { if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); } void innerSetTrue() { for (;;) { int s = getState(); if (s == TRUE) { return; // 直接退出 } if (compareAndSetState(s, TRUE)) {// cas更新狀態,避免並發更新true操作 releaseShared(0);// 釋放一下鎖對象,喚醒一下阻塞的Thread return; } } } void innerSetFalse() { for (;;) { int s = getState(); if (s == FALSE) { return; // 直接退出 } if (compareAndSetState(s, FALSE)) {// cas更新狀態,避免並發更新false操作 return; } } } }
它重寫了AQS中共享鎖的判斷可持有方法tryAcquireShared和判斷可釋放鎖的方法tryReleaseShared
也就是說,它是重寫AQS中共享鎖的方法來實現的,有意思的是它不像普通AQS實現的共享鎖,比如Semaphore這種有許可證個數,也就是同時能被幾個執行緒持有是固定的,而它理論上只要打開了開關就允許任意執行緒通過
下面看下它是怎麼實現的:
首先它定義了兩個int類型屬性:
/** State value representing that TRUE */ private static final int TRUE = 1; /** State value representing that FALSE */ private static final int FALSE = 2;
這兩個屬性代表了AQS中state的值,也就是說這把鎖中的state值始終為1或者2,看下下面這兩個方法
void innerSetTrue() { for (;;) { int s = getState(); if (s == TRUE) { return; // 直接退出 } if (compareAndSetState(s, TRUE)) {// cas更新狀態,避免並發更新true操作 releaseShared(0);// 釋放一下鎖對象,喚醒一下阻塞的Thread return; } } } void innerSetFalse() { for (;;) { int s = getState(); if (s == FALSE) { return; // 直接退出 } if (compareAndSetState(s, FALSE)) {// cas更新狀態,避免並發更新false操作 return; } } }
顯而易見,這兩個方法都自旋的CAS改變state的值,那這兩個方法在哪裡調用呢?
public void set(Boolean mutex) { if (mutex) { sync.innerSetTrue(); } else { sync.innerSetFalse(); } }
這個方法封裝了改變state值的方法,其實它就相當於我們常用鎖的加鎖和釋放鎖方法,原因在於它獲取共享鎖的判斷:
protected int tryAcquireShared(int state) { return isTrue(getState()) ? 1 : -1; } private boolean isTrue(int state) { return (state & TRUE) != 0; }
由於state值始終為1或0,當state值為0時isTrue返回false,tryAcquireShared返回-1,AQS在共享鎖的獲取鎖判斷中如果tryAcquireShared<0代表共享鎖許可證已經頒發完,後續申請鎖的執行緒需要進入FIFO隊列等待
相反如果state值為1,tryAcquireShared>0就直接獲取鎖對象
所以,當set(false)的時候,自旋的CAS把state值改成0,執行緒需要等待,相反set(true)就能獲取鎖成功