嗯!這篇多線程不錯!伍

開篇閑扯

前面幾篇寫了有關Java對象的內存布局、Java的內存模型、多線程鎖的分類、Synchronized、Volatile、以及並發場景下出現問題的三大罪魁禍首。看起來寫了五篇文章,實際上也僅僅是寫了個皮毛,用來應付應付部分公司「八股文」式的面試還行,但是在真正的在實際開發中會遇到各種稀奇古怪的問題。這時候就要通過線上的一些監測手段,獲取系統的運行日誌進行分析後再對症下藥,比如JDK的jstack、jmap、命令行工具vmstat、JMeter等等,一定要在合理的分析基礎上優化,否則可能就是系統小「感冒」,結果做了個闌尾炎手術。

file

又扯遠了,老樣子,還是先說一下本文主要講點啥,然後再一點點解釋。本文主要講並發包JUC中的三個類:ReentrantLock、ReentrantReadWriteLock和StampedLock以及AQS(AbstractQueuedSynchronizer)的一些基本概念。

file

先來個腦圖:

file

Lock接口

public interface Lock {

    //加鎖操作,加鎖失敗就進入阻塞狀態並等待鎖釋放
    void lock();

    //與lock()方法一直,只是該方法允許阻塞的線程中斷    
    void lockInterruptibly() throws InterruptedException;

    //非阻塞獲取鎖
    boolean tryLock();

    //帶參數的非阻塞獲取鎖
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

    //統一的解鎖方法
    void unlock();

}

上面的源碼展示了作為頂層接口Lock定義的一些基礎方法。

lock只是個顯示的加鎖接口,對應不同的實現類,可以供開發人員進行自定義擴展。比如一些定時的可輪詢的獲取鎖模式,公平鎖與非公平鎖,讀寫鎖,以及可重入鎖等,都能夠很輕鬆的實現。Lock的鎖是基於Java代碼實現的,加解鎖都是通過lock()和unlock()方法實現的。從性能上來說,Synchronized的性能(吞吐量)以及穩定性是略差於Lock鎖的。但是,在Doug Lee參與編寫的《Java並發編程實踐》一書中又特彆強調了,如果不是對Lock鎖中提供的高級特性有絕對的依賴,建議還是使用Synchronized來作為並發同步的工具。因為它更簡潔易用,不會因為在使用Lock接口時忘記在Finally中解鎖而出bug。說到底,還是為了降低編程門檻,讓Java語言更加好用。

file

其實常見的幾個實現類有:ReentrantLock、ReentrantReadWriteLock、StampedLock
接下來將詳細講解一下。

ReentrantLock

先簡單舉個使用的例子:

/**
 * FileName: TestLock
 * Author:   RollerRunning
 * Date:     2020/12/7 9:34 PM
 * Description:
 */
public class TestLock {
    private static int count=0;
    private static Lock lock=new ReentrantLock();
    public static void add(){
        // 加鎖
        lock.lock();
        try {
            count++;
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            //在finally中解鎖,加解鎖必須成對出現
            lock.unlock();
        }
    }
}

ReentrantLock只支持獨佔式的獲取公平鎖或者是非公平鎖(都是基於Sync內部類實現,而Sync又繼承自AQS),在它的內部類Sync繼承了AbstractQueuedSynchronizer,並同時實現了tryAcquire()、tryRelease()和isHeldExclusively()方法等。同時,在ReentrantLock中還有其他兩個內部類,一個是實現了公平鎖一個實現了非公平鎖,下面是ReentrantLock的部分源碼:

/**
 * 非公平鎖
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

/**
 * 公平鎖
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    //加鎖時調用
    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        //獲取當前線程
        final Thread current = Thread.currentThread();
        //獲取父類 AQS 中的int型state
        int c = getState();
        //判斷鎖是否被佔用
        if (c == 0) {
            //這個if判斷中,先判斷隊列是否為空,如果為空則說明鎖可以正常獲取,然後進行CAS操作並修改state標誌位的信息
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                //CAS操作成功,設置AQS中變量exclusiveOwnerThread的值為當前線程,表示獲取鎖成功
                setExclusiveOwnerThread(current);
                //返回獲取鎖成功
                return true;
            }
        }
        //而當state的值不為0時,說明鎖已經被拿走了,此時判斷鎖是不是自己拿走的,因為他是個可重入鎖。
        else if (current == getExclusiveOwnerThread()) {
            //如果是當前線程在佔用鎖,則再次獲取鎖,並修改state的值
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        //當標誌位不為0,且佔用鎖的線程也不是自己時,返回獲取鎖失敗
        return false;
    }
}

/**
 * AQS中排隊的方法
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

上面是以公平鎖為例對源碼進行了簡單的注釋,可以根據這個思路,看一看非公平鎖的源碼實現,再關閉源碼試着畫一下整個流程圖,了解其內部實現的真諦。我先畫為敬了:

file

這裡涵蓋了ReentrantLock的加鎖基本流程,觀眾老爺是不是可以試着畫一下解鎖的流程,還有就是這個例子是獨佔式公平鎖,獨佔式非公平鎖的總體流程大差不差,這裡就不贅述了。

ReentrantReadWriteLock

一個簡單的使用示例,大家可以自己運行感受一下:

/**
 * FileName: ReentrantReadWriteLockTest
 * Author:   RollerRunning
 * Date:     2020/12/8 6:48 PM
 * Description: ReentrantReadWriteLock的簡單使用示例
 */
public class ReentrantReadWriteLockTest {
    private static ReentrantReadWriteLock READWRITELOCK = new ReentrantReadWriteLock();
    //獲得讀鎖
    private static ReentrantReadWriteLock.ReadLock READLOCK = READWRITELOCK.readLock();
    //獲得寫鎖
    private static ReentrantReadWriteLock.WriteLock WRITELOCK = READWRITELOCK.writeLock();

    public static void main(String[] args) {
        ReentrantReadWriteLockTest lock = new ReentrantReadWriteLockTest();
        //分別啟動兩個讀線程和一個寫線程
        Thread readThread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.read();
            }
        },"read1");

        Thread readThread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.read();
            }
        },"read2");

        Thread writeThread = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.write();
            }
        },"write");

        readThread1.start();
        readThread2.start();
        writeThread.start();
    }

    public void read() {
        READLOCK.lock();
        try {
            System.out.println("線程 " + Thread.currentThread().getName() + " 獲取讀鎖。。。");
            Thread.sleep(2000);
            System.out.println("線程 " + Thread.currentThread().getName() + " 釋放讀鎖。。。");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            READLOCK.unlock();
        }
    }

    public void write() {
        WRITELOCK.lock();
        try {
            System.out.println("線程 " + Thread.currentThread().getName() + " 獲取寫鎖。。。");
            Thread.sleep(2000);
            System.out.println("線程 " + Thread.currentThread().getName() + " 釋放寫鎖。。。");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            WRITELOCK.unlock();
        }
    }
}

前面說了ReentrantLock是一個獨佔鎖,即不論線程對數據執行讀還是寫操作,同一時刻只允許一個線程持有鎖。但是在一些讀多寫少的場景下,這種不分青紅皂白就無腦加鎖對的做法不夠極客也很影響效率。因此,基於ReentrantLock優化而來的ReentrantReadWriteLock就出現了。這種鎖的思想是「讀寫鎖分離」,多個線程可以同時持有讀鎖,但是不允許多個線程持有相同寫鎖或者同時持有讀寫鎖。關鍵源碼解讀:

//加共享鎖
protected final int tryAcquireShared(int unused) {
    //獲取當前加鎖的線程
    Thread current = Thread.currentThread();
    //獲取鎖狀態信息
    int c = getState();
    //判斷當前鎖是否可用,並判斷當前線程是否獨佔資源
    if (exclusiveCount(c) != 0 && 
        getExclusiveOwnerThread() != current)
        return -1;
    //獲取讀鎖的數量
    int r = sharedCount(c);
    //這裡做了三個判斷:是否阻塞即是否為公平鎖、持有該共享鎖的線程是否超過最大值、CAS加共享讀鎖是否成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //當前線程為第一個加讀鎖的,並設置持有鎖線程數量
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            //當前表示為重入鎖
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                //獲取當前線程的計數器
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                //添加到readHolds中,這裡是基於ThreadLocal實現的,每個線程都有自己的readHolds用於記錄自己重入的次數
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

在ReentrantReadWriteLock中,也是基於AQS來實現的,在它的內部使用了一個int型(4位元組32位)的stat來表示讀寫鎖,其中高16位表示讀鎖,低16位表示寫鎖,而對於讀寫鎖的判斷通常是對int值以及高低16位進行判斷。接下來用一張圖展示一下獲取共享的讀鎖過程:

file

至此,分別展示了獲取ReentrantLock獨佔鎖ReentrantReadWriteLock共享讀鎖的過程,希望能夠幫助大家跟面試官PK。

file

總結一下前面說的兩種鎖:

當線程持有讀鎖時,那麼就不能再獲取寫鎖。當A線程在獲取寫鎖的時候,如果當前讀鎖被佔用,立即返回失敗失敗。

當線程持有寫鎖時,該線程是可以繼續獲取讀鎖的。當A線程獲取讀鎖時如果發現寫鎖被佔用,判斷當前寫鎖持有者是不是自己,如果是自己就可以繼續獲取讀鎖,否則返回失敗。

StampedLock

StampedLock其實是對ReentrantReadWriteLock進行了進一步的升級,試想一下,當有很多讀線程,但是只有一個寫線程,最糟糕的情況是寫線程一直競爭不到鎖,寫線程就會一直處於等待狀態,也就是線程飢餓問題。StampedLock的內部實現也是基於隊列和state狀態實現的,但是它引入了stamp(標記)的概念,因此在獲取鎖時會返回一個唯一標識stamp作為當前鎖的版本,而在釋放鎖時,需要傳遞這個stamp作為標識來解鎖。

從概念上來說StampedLock比RRW多引入了一種樂觀鎖的思想,從使用層面來說,加鎖生成stamp,解鎖需要傳同樣的stamp作為參數。
最後貼一張我整理的這部分腦圖:

file

最後,感謝各位觀眾老爺,還請三連!!!
更多文章請掃碼關注或微信搜索Java棧點公眾號!