30行自己寫並發工具類(Semaphore, CyclicBarrier, CountDownLatch)是什麼體驗?

30行自己寫並發工具類(Semaphore, CyclicBarrier, CountDownLatch)是什麼體驗?

前言

在本篇文章當中首先給大家介紹三個工具Semaphore, CyclicBarrier, CountDownLatch該如何使用,然後仔細剖析這三個工具內部實現的原理,最後會跟大家一起用ReentrantLock實現這三個工具。

並發工具類的使用

CountDownLatch

CountDownLatch最主要的作用是允許一個或多個線程等待其他線程完成操作。比如我們現在有一個任務,有\(N\)個線程會往數組data[N]當中對應的位置根據不同的任務放入數據,在各個線程將數據放入之後,主線程需要將這個數組當中所有的數據進行求和計算,也就是說主線程在各個線程放入之前需要阻塞住!在這樣的場景下,我們就可以使用CountDownLatch

上面問題的代碼:

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

    public static int[] data = new int[10];

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(10);

        for (int i = 0; i < 10; i++) {
            int temp = i;
            new Thread(() -> {
                Random random = new Random();
                data[temp] = random.nextInt(100001);
                latch.countDown();
            }).start();
        }

        // 只有函數 latch.countDown() 至少被調用10次
        // 主線程才不會被阻塞
        // 這個10是在CountDownLatch初始化傳遞的10
        latch.await();
        System.out.println("求和結果為:" + Arrays.stream(data).sum());
    }
}

在上面的代碼當中,主線程通過調用latch.await();將自己阻塞住,然後需要等他其他線程調用方法latch.countDown()只有這個方法被調用的次數等於在初始化時給CountDownLatch傳遞的參數時,主線程才會被釋放。

CyclicBarrier

CyclicBarrier它要做的事情是,讓一 組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行。我們通常也將CyclicBarrier稱作路障

示例代碼:

public class CycleBarrierDemo {

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(5);

        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "開始等待");
                    // 所有線程都會調用這行代碼
                    // 在這行代碼調用的線程個數不足5
                    // 個的時候所有的線程都會阻塞在這裡
                    // 只有到5的時候,這5個線程才會被放行
                    // 所以這行代碼叫做同步點 
                    barrier.await();
                    // 如果有第六個線程執行這行代碼時
                    // 第六個線程也會被阻塞 知道第10
                    // 線程執行這行代碼 6-10 這5個線程
                    // 才會被放行
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "等待完成");
            }).start();
        }
    }
}

我們在初始化CyclicBarrier對象時,傳遞的數字為5,這個數字表示只有5個線程到達同步點的時候,那5個線程才會同時被放行,而如果到了6個線程的話,第一次沒有被放行的線程必須等到下一次有5個線程到達同步點barrier.await()時,才會放行5個線程。

  • 比如剛開始的時候5個線程的狀態如下,同步點還沒有5個線程到達,因此不會放行。

  • 當有5個線程或者更多的線程到達同步點barrier.await()的時候,才會放行5個線程,注意是5個線程,如果有多的線程必須等到下一次集合5個線程才會進行又一次放行,也就是說每次只放行5個線程,這也是它叫做CyclicBarrier(循環路障)的原因(因為每次放行5個線程,放行完之後重新計數,直到又有5個新的線程到來,才再次放行)。

Semaphore

Semaphore信號量)通俗一點的來說就是控制能執行某一段代碼的線程數量,他可以控制程序的並發量!

semaphore.acquire

\(\mathcal{R}\)

semaphore.release

比如在上面的acquirerelease之間的代碼\(\mathcal{R}\)就是我們需要控制的代碼,我們可以通過信號量控制在某一個時刻能有多少個線程執行代碼\(\mathcal{R}\)。在信號量內部有一個計數器,在我們初始化的時候設置為\(N\),當有線程調用acquire函數時,計數器需要減一,調用release函數時計數器需要加一,只有當計數器大於0時,線程調用acquire時才能夠進入代碼塊\(\mathcal{R}\),否則會被阻塞,只有線程調用release函數時,被阻塞的線程才能被喚醒,被喚醒的時候計數器會減一。

示例代碼:

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore mySemaphore = new Semaphore(5);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "準備進入臨界區");
                try {
                    mySemaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "已經進入臨界區");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + "準備離開臨界區");
                mySemaphore.release();
                System.out.println(Thread.currentThread().getName() + "已經離開臨界區");
            }).start();
        }
    }
}

自己動手寫並發工具類

在這一小節當中主要使用ReentrantLock實現上面我們提到的三個並發工具類,因此你首先需要了解ReentrantLock這個工具。ReentrantLock中有兩個主要的函數lockunlock,主要用於臨界區的保護,在同一個時刻只能有一個線程進入被lockunlock包圍的代碼塊。除此之外你還需要了解ReentrantLock.newCondition函數,這個函數會返回一個條件變量Condition,這個條件變量有三個主要的函數awaitsignalsignalAll,這三個函數的作用和效果跟Object類的waitnotifynotifyAll一樣,在閱讀下文之前,大家首先需要了解他們的用法。

  • 哪個線程調用函數condition.await,那個線程就會被掛起。
  • 如果線程調用函數conditon.signal,則會喚醒一個被condition.await函數阻塞的線程。
  • 如果線程調用函數conditon.signalAll,則會喚醒所有被condition.await函數阻塞的線程。

CountDownLatch

我們在使用CountDownLatch時,會有線程調用CountDownLatchawait函數,其他線程會調用CountDownLatchcountDown函數。在CountDownLatch內部會有一個計數器,計數器的值我們在初始化的時候可以進行設置,線程每調用一次countDown函數計數器的值就會減一。

  • 如果在線程在調用await函數之前,計數器的值已經小於或等於0時,調用await函數的線程就不會阻塞,直接放行。
  • 如果在線程在調用await函數之前,計數器的值大於0時,調用await函數的線程就會被阻塞,當有其他線程將計數器的值降低為0時,那麼這個將計數器降低為0線程就需要使用condition.signalAll()函數將其他所有被await阻塞的函數喚醒。
  • 線程如果想阻塞自己的話可以使用函數condition.await(),如果某個線程在進入臨界區之後達到了喚醒其他線程的條件,我們則可以使用函數condition.signalAll()喚醒所有被函數await阻塞的線程。

上面的規則已經將CountDownLatch的整體功能描述清楚了,為了能夠將代碼解釋清楚,我將對應的文字解釋放在了代碼當中:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyCountDownLatch {
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private int curValue;

    public MyCountDownLatch(int targetValue) {
        // 我們需要有一個變量去保存計數器的值
        this.curValue = targetValue;
    }

    public void countDown() {
        // curValue 是一個共享變量
        // 我們需要用鎖保護起來
        // 因此每次只有一個線程進入 lock 保護
        // 的代碼區域
        lock.lock();
        try {
            // 每次執行 countDown 計數器都需要減一
            // 而且如果計數器等於0我們需要喚醒哪些被
            // await 函數阻塞的線程
            curValue--;
            if (curValue <= 0)
                condition.signalAll();
        }catch (Exception ignored){}
        finally {
            lock.unlock();
        }
    }

    public void await() {
        lock.lock();
        try {
            // 如果 curValue 的值大於0
            // 則說明 countDown 調用次數還不夠
            // 需要將線程掛起 否則直接放行
            if (curValue > 0)
                // 使用條件變量 condition 將線程掛起
                condition.await();
        }catch (Exception ignored){}
        finally {
            lock.unlock();
        }
    }
}

可以使用下面的代碼測試我們自己寫的CountDownLatch

public static void main(String[] args) throws InterruptedException {
    MyCountDownLatch latch = new MyCountDownLatch(5);
    for (int i = 0; i < 3; i++) {
        new Thread(() -> {
            latch.countDown();
            System.out.println(Thread.currentThread().getName() + "countDown執行完成");
        }).start();
    }

    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                latch.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() +  "latch執行完成");
        }).start();
    }
}

CyclicBarrier

CyclicBarrier有一個路障(同步點),所有的線程到達路障之後都會被阻塞,當被阻塞的線程個數達到指定的數目的時候,就需要對指定數目的線程進行放行。

  • CyclicBarrier當中會有一個數據threadCount,表示在路障需要達到這個threadCount個線程的時候才進行放行,而且需要放行threadCount個線程,這裡我們可以循環使用函數condition.signal()去喚醒指定個數的線程,從而將他們放行。如果線程需要將自己阻塞住,可以使用函數condition.await()
  • CyclicBarrier當中需要有一個變量currentThreadNumber,用於記錄當前被阻塞的線程的個數。
  • 用戶還可以給CyclicBarrier傳入一個Runnable對象,當放行的時候需要執行這個Runnable對象,你可以新開一個線程去執行這個Runnable對象,或者讓喚醒其他線程的這個線程執行Runnable對象。

根據上面的CyclicBarrier要求,寫出的代碼如下(分析和解釋在注釋當中):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyCyclicBarrier {

    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private int threadCount;
    private int currentThreadNumber;
    private Runnable runnable;

    public MyBarrier(int count) {
        threadCount = count;
    }

    /**
     * 允許傳入一個 runnable 對象
     * 當放行一批線程的時候就執行這個 runnable 函數
     * @param count
     * @param runnable
     */
    public MyBarrier(int count, Runnable runnable) {
        this(count);
        this.runnable = runnable;
    }
    
    public void await() {
        lock.lock();
        currentThreadNumber++;
        try {
            // 如果阻塞的線程數量不到 threadCount 需要進行阻塞
            // 如果到了需要由這個線程喚醒其他線程
            if (currentThreadNumber == threadCount) {
                // 放行之後需要重新進行計數
                // 因為放行之後 condition.await();
                // 阻塞的線程個數為 0
                currentThreadNumber = 0;
                if (runnable != null) {
                    new Thread(runnable).start();
                }
                // 喚醒 threadCount - 1 個線程 因為當前這個線程
                // 已經是在運行的狀態 所以只需要喚醒 threadCount - 1
                // 個被阻塞的線程
                for (int i = 1; i < threadCount; i++)
                    condition.signal();
            }else {
                // 如果數目還沒有達到則需要阻塞線程
                condition.await();
            }
        }catch (Exception ignored){}
        finally {
            lock.unlock();
        }
    }

}

下面是測試我們自己寫的路障的代碼:

public static void main(String[] args) throws InterruptedException {
    MyCyclicBarrier barrier = new MyCyclicBarrier(5, () -> {
        System.out.println(Thread.currentThread().getName() + "開啟一個新線程");
        for (int i = 0; i < 1; i++) {
            System.out.println(i);
        }
    });

    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "進入阻塞");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            barrier.await();
            System.out.println(Thread.currentThread().getName() + "阻塞完成");
        }).start();
    }
}

Semaphore

Semaphore可以控制執行某一段臨界區代碼的線程數量,在Semaphore當中會有兩個計數器semCountcurCount

  • semCount表示可以執行臨界區代碼的線程的個數。
  • curCount表示正在執行臨界區代碼的線程的個數。

這個工具實現起來也並不複雜,具體分析都在注釋當中:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MySemaphore {

    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    private int semCount;
    private int curCount;

    public MySemaphore(int semCount) {
        this.semCount = semCount;
    }

    public void acquire() {
        lock.lock();
        try {
            // 正在執行臨界區代碼的線程個數加一
            curCount++;
            // 如果線程個數大於指定的能夠執行的線程個數
            // 需要將當前這個線程阻塞起來
            // 否則直接放行
            if (curCount > semCount) {
                condition.await();
            }
        }catch (Exception ignored) {}
        finally {
            lock.unlock();
        }
    }

    public void release() {
        lock.lock();
        try {
            // 線程執行完臨界區的代碼
            // 將要離開臨界區 因此 curCount 
            // 需要減一
            curCount--;
            // 如果有線程阻塞需要喚醒被阻塞的線程
            // 如果沒有被阻塞的線程 這個函數執行之後
            // 對結果也不會產生影響 因此在這裡不需要進行
            // if 判斷
            condition.signal();
            // signal函數只對在調用signal函數之前
            // 被await函數阻塞的線程產生影響 如果
            // 某個線程調用 await 函數在 signal 函數
            // 執行之後,那麼前面那次 signal 函數調用
            // 不會影響後面這次 await 函數
        }catch (Exception ignored){}
        finally {
            lock.unlock();
        }
    }
}

使用下面的代碼測試我們自己寫的MySemaphore

public static void main(String[] args) {
    MySemaphore mySemaphore = new MySemaphore(5);
    for (int i = 0; i < 10; i++) {
        new Thread(() -> {
            mySemaphore.acquire();
            System.out.println(Thread.currentThread().getName() + "已經進入臨界區");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            mySemaphore.release();
            System.out.println(Thread.currentThread().getName() + "已經離開臨界區");
        }).start();
    }
}

總結

在本文當中主要給大家介紹了三個在並發當中常用的工具類該如何使用,然後介紹了我們自己實現三個工具類的細節,其實主要是利用條件變量實現的,因為它可以實現線程的阻塞和喚醒,其實只要大家了解條件變量的使用方法,和三種工具的需求大家也可以自己實現一遍。

以上就是本文所有的內容了,希望大家有所收穫,我是LeHung,我們下期再見!!!(記得點贊收藏哦!)


更多精彩內容合集可訪問項目://github.com/Chang-LeHung/CSCore

關注公眾號:一無是處的研究僧,了解更多計算機(Java、Python、計算機系統基礎、算法與數據結構)知識。