02-Java中的鎖詳解

I. 使用Lock接口

只要不涉及到複雜用法,一般採用的是Java的synchronized機制

不過,Lock可以提供一些synchronized不支持的機制

  • 非阻塞的獲取鎖:嘗試獲取鎖,如果能獲取馬上獲取,不能獲取馬上返回,不會阻塞
  • 中斷獲取鎖:當獲取鎖的線程被中斷時,拋出異常,鎖被釋放
  • 超時獲取鎖:為嘗試獲取鎖設定超時時間

相應API:

  • void lock():普通的獲取鎖
  • void lockInterruptibly() throws InterruptedException:可中斷的獲取鎖,鎖的獲取中可以中斷線程
  • boolean tryLock():非阻塞獲取鎖
  • boolean tryLock(long time, TimeUnit unit):超時獲取鎖
  • void unlock():釋放鎖

一般框架:

//不要將lock寫進try塊,防止無故釋放
Lock lock = new ReentrantLock();
lock.lock();
try{
 ...;   
}finally{
    lock.unlock();
}

II. 隊列同步器AQS

AbstractQueuedSynchronizer:隊列同步器,簡稱AQS,用來構建鎖或者其他同步組件的基礎框架

使用一個int的成員變量表示同步狀態,通過內置的FIFO隊列完成資源的排隊工作

AQS實現鎖可以看作:獲取同步狀態,成功則加鎖成功;失敗則加鎖失敗

調用AQS內部的獲取同步狀態的API,保證是線程安全的

  • getState()
  • setState(int newState)
  • compareAndSetState(int expect, int update)

image-20211101174622290

1. 自己實現一個Mutex互斥鎖

首先要繼承一個Lock接口,然後自己實現裏面的方法

public class Mutex implements Lock {...}

Lock裏面的方法是沒有默認實現的,因此都需要重寫

image-20211101172630798

一般會實現一個繼承於AQS的內部類來執行獲取同步狀態的實現:加鎖相當於獲取同步狀態

public class Mutex implements Lock {
    private static class Syn extends AbstractQueuedSynchronizer{...}
}

可以看到,AQS的方法和鎖需要實現的方法是對應的

先實現對應的AQS的幾個方法

private static class Syn extends AbstractQueuedSynchronizer{
    //判斷同步器是否被線程佔用
    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }
    //獲取鎖
    @Override
    protected boolean tryAcquire(int arg) {
        if(compareAndSetState(0,1)){
            setExclusiveOwnerThread(Thread.currentThread());    //設置佔用線程
            return true;
        }
        return false;
    }
    //釋放鎖
    @Override
    protected boolean tryRelease(int arg) {
        if(getState() == 0) throw new IllegalMonitorStateException();
        setExclusiveOwnerThread(null);  //清空佔用線程
        setState(0);
        return true;
    }
}

鎖的獲取和AQS獲取同步狀態其實是一個道理

通過代理模式可以像下面這樣實現

public class Mutex implements Lock {
    private static class Syn extends AbstractQueuedSynchronizer{...}
    Syn syn = new Syn();
    @Override
    public void lock() {
        syn.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        syn.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return syn.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return syn.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        syn.release(1);
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

2. AQS實現分析

鎖實現的本質:信號量機制,互斥鎖也就是0和1兩個信號量

AQS維護了一個FIFO的隊列,線程獲取同步狀態失敗則會加入這個隊列,然後阻塞,直到同步狀態釋放,隊列首節點的線程被喚醒

同步隊列中的節點保存的信息有:獲取同步狀態失敗的線程引用,等待狀態,前驅和後繼節點

同步器有一個頭節點和尾節點

加入新的阻塞線程:

構造節點,加入隊列的尾節點

使用compareAndSetTail()加到尾部,這是一個原子操作

image-20211108124223446

2.1 獨佔式的獲取和釋放

獲取同步狀態:

image-20211108125056784

acquire()方法會調用tryAcquire(),如果獲取失敗,則開始調用addWaiter()來給尾節點添加新節點,再調用acquireQueued()等待請求調度

addWaiter()的作用是給FIFO隊列添加尾節點,並返回這個節點的引用

因為可能會多個線程申請失敗,因此需要使用原子操作compareAndSetTail()

image-20211108125619724

enq()的作用是快速添加失敗後的反覆嘗試,直到添加尾節點成功

image-20211108125813282

acquireQueued()用來請求調度

image-20211108130254437

可見等待調度期間是支持中斷的

這個請求調度有兩個條件:

  1. 該節點是首節點
  2. 申請互斥信號量成功

for循環的這個操作被稱為自旋

release()釋放互斥信號量,根據上文提到的獲取信號量,除了tryRelease(),還應該喚醒後繼節點

image-20211108130918463

2.2 共享式狀態獲取和釋放

最典型的場景就是讀寫場景:一個資源允許多個線程進行讀取,此時寫線程阻塞;而寫線程執行時,所有讀線程阻塞

共享鎖鎖也就是資源信號量的應用,主要解決下面問題:只想要有限的線程執行

調用tryAcquireShared()來申請資源信號量

image-20211108132000597

doAcquireShared()是申請失敗後,構造節點加入FIFO隊列然後自旋的操作

image-20211108132239857

使用releaseShared()來釋放

注意:共享式的釋放可能有多個線程,需要用CAS操作來實現tryReleaseShared()

image-20211108132450657

3. 自己實現一個TwinsLock共享鎖

需要自己實現的:

  • tryAcquiredShared()

  • tryReleaseShared():要保證釋放操作的原子性

State()的取值就是資源信號量的取值

public class TwinsLock {
    private int count;
    TwinsLock(int count){
        this.count = count;
    }
    private final Sync sync = new Sync(count);
    private static final class Sync extends AbstractQueuedSynchronizer{
        Sync(int count){
            if(count < 0) throw new IllegalArgumentException();
            setState(count);    //設置資源總數
        }
        @Override
        protected int tryAcquireShared(int arg) {
            for(;;){
                int current = getState();
                int newCount = current - arg;
                if(newCount<0 || compareAndSetState(current, newCount)){
                    return newCount;
                }
            }
        }
        @Override
        protected boolean tryReleaseShared(int arg) {
            for(;;){
                int current = getState();
                int newCount = current + arg;
                if(compareAndSetState(current, newCount)){
                    return true;
                }
            }
        }
    }
    public void lock(){
        sync.acquireShared(1);
    }
    public void unlock(){
        sync.releaseShared(1);
    }
}

III. 可重入鎖

可重入鎖:支持一個線程對資源反覆加鎖

synchronized支持可重入

ReentrantLock是可重入鎖的一種實現,支持反覆加鎖

鎖的公平性:

  • 公平:先對鎖進行獲取的請求先被滿足
  • 不公平:先對鎖進行獲取的請求不一定先被滿足

1. 實現可重入

只需要判斷當前線程是否是獲取了鎖的線程,如果是,則同步狀態加一

每次釋放同步狀態減一,減到0的時候設置獲取鎖的線程為null,此時允許其他線程獲取

接下來來看看ReetrantLock的實現

image-20211108145243069

image-20211108145354670

2. 公平鎖與非公平鎖

繼續觀察nofairTryAcquire()方法,發現只要CAS成功,則線程直接獲取到鎖

image-20211108145705513

而公平鎖需要確定隊列中沒有前驅節點,即自己就是首節點

image-20211108145927580

公平鎖:確保線程的FIFO,先上下文切換開銷大

非公平鎖:可能造成線程飢餓,但線程切換少,吞吐量更大

IV. 讀寫鎖

讀寫鎖,是一種提供共享式和獨佔式兩種方式的鎖

  • 支持公平鎖和非公平鎖
  • 支持重進入
  • 支持鎖降級

一個資源允許多個線程進行讀取,此時寫線程阻塞;而寫線程執行時,所有讀線程阻塞

1. 讀寫鎖的實現

讀寫鎖的同步狀態是按位切割使用的

維護了一個int型的同步狀態,32位

高16為讀狀態,低16位為寫狀態

1.1 寫鎖的獲取

image-20211108152531993

w是c與0x0000FFFF做與運算後的值,w=0有兩種情況:

  1. 有讀鎖,低16位全0
  2. 無讀鎖也無寫鎖,需要後面的條件判斷是否為當前線程

1.2 讀鎖的獲取

和寫鎖的獲取類似,需要判斷先有沒有寫鎖

不過讀鎖是共享式的,可以允許多個線程獲取讀鎖

不過讀鎖也支持重進入,因此不光要維護獲取讀鎖的總狀態,還要維護每個線程獲取讀鎖的狀態

2. 鎖降級

鎖降級指:線程先獲取寫鎖,然後再獲取讀鎖,最後釋放寫鎖,實現從寫鎖降到讀鎖

目的:保證讀寫操作的連貫性

使用場景:寫操作執行完馬上需要讀一次,不加讀鎖的話可能會被其他寫線程修改,再讀數據可能就變了

V. LockSupport工具

用於阻塞和喚醒線程

image-20211108154154801

VI. Condition接口

Condition接口依賴於Lock對象,用於實現等待-通知模式

核心API就是兩個,這兩個API的擴展可以增加超時時間,設置中斷不敏感等等:

  • await()
  • signal()

1. 使用Condition實現一個阻塞隊列

隊列滿的時候,填充操作阻塞;隊列空的時候,取出操作阻塞

public class BoundedQueue <T>{
    private Object[] items;
    private int addIndex, revIndex, count;
    private ReentrantLock lock = new ReentrantLock();
    private Condition empty = lock.newCondition();
    private Condition full = lock.newCondition();

    public BoundedQueue(int size){
        items = new Object[size];
    }

    /**
     * 添加元素
     * @param t
     */
    public void add(T t) throws InterruptedException {
        lock.lock();
        try{
            while(count == items.length){
                System.out.println("已滿,請等待消耗");
                empty.await();
            }
            items[addIndex] = t;
            if(++addIndex == items.length) addIndex = 0;
            count++;
            full.signal();
        }finally {
            lock.unlock();
        }
    }

    /**
     * 取出元素
     * @return
     */
    public T remove() throws InterruptedException {
        lock.lock();
        try{
            while(count == 0){
                System.out.println("已空,請等待生產");
                full.await();
            }
            Object temp = items[revIndex];
            if(++revIndex == items.length) revIndex = 0;
            count--;
            empty.signal();
            return (T) temp;
        }finally {
            lock.unlock();
        }
    }
}

2. Condition的實現分析

每個Condition會維護一個等待隊列,一個鎖支持支持多個等待隊列

image-20211108162202120

獲取到鎖的線程也就是同步隊列的首節點

此時再調用await,則首節點進入等待隊列,直到其他線程喚醒

image-20211108165508671

相應的,調用signal則是將等待隊列的首節點拆下來放到同步隊列,喚醒線程開始自旋

當節點回到同步隊列,之前調用的await()中的isOnsyncQueue()會返回true,結束等待,在調用acquireQueued()加入競爭

image-20211108165935327

通過isHeldExclusively判斷有沒有拿到鎖