自己動手寫乞丐版執行緒池

自己動手寫乞丐版執行緒池

前言

在上篇文章執行緒池的前世今生當中我們介紹了實現執行緒池的原理,在這篇文章當中我們主要介紹實現一個非常簡易版的執行緒池,深入的去理解其中的原理,麻雀雖小,五臟俱全。

執行緒池的具體實現

執行緒池實現思路

任務保存到哪裡?

在上一篇文章執行緒池的前世今生當中我們具體去介紹了執行緒池當中的原理。在執行緒池當中我們有很多個執行緒不斷的從任務池(用戶在使用執行緒池的時候不斷的使用execute方法將任務添加到執行緒池當中)裡面去拿任務然後執行,現在需要思考我們應該用什麼去實現任務池呢?

答案是阻塞隊列,因為我們需要保證在多個執行緒往任務池裡面加入任務的時候並發安全,JDK已經給我們提供了這樣的數據結構——BlockingQueue,這個是一個並發安全的阻塞隊列,他之所以叫做阻塞隊列,是因為我們可以設置隊列當中可以容納數據的個數,當加入到隊列當中的數據超過這個值的時候,試圖將數據加入到阻塞隊列當中的執行緒就會被掛起。當隊列當中為空的時候,試圖從隊列當中取出數據的執行緒也會被掛起。

執行緒的設計

在我們自己實現的執行緒池當中我們定一個Worker類去不斷的從任務池當中取出任務,然後進行執行。在我們自己定義的worker當中還需要有一個變數isStopped表示執行緒是否停止工作。同時在worker當中還需要保存當前是哪個執行緒在執行任務,因此在我們自己設計的woker類當中還需要有一個thisThread變數,保存正在執行任務的執行緒,因此worker的整體設計如下:

package cscore.concurrent.java.threadpool;

import java.util.concurrent.BlockingQueue;

public class Worker implements Runnable {

  private Thread thisThread; // 表示正在執行任務的執行緒
  private BlockingQueue<Runnable> taskQueue; // 由執行緒池傳遞過來的任務隊列
  private volatile boolean isStopped; // 表示 worker 是否停止工作 需要使用 volatile 保證執行緒之間的可見性

  public Worker(BlockingQueue taskQueue) { // 這個構造方法是在執行緒池的實現當中會被調用
    this.taskQueue = taskQueue;
  }

  // 執行緒執行的函數
  @Override
  public void run() {
    thisThread = Thread.currentThread(); // 獲取執行任務的執行緒
    while (!isStopped) { // 當執行緒沒有停止的時候就不斷的去任務池當中取出任務
      try {
        Runnable task = taskQueue.take(); // 從任務池當中取出任務 當沒有任務的時候執行緒會被這個方法阻塞
        task.run(); // 執行任務 任務就是一個 Runnable 對象
      } catch (InterruptedException e) {
        // do nothing
        // 這個地方很重要 你有沒有思考過一個問題當任務池當中沒有任務的時候 執行緒會被阻塞在 take 方法上
        // 如果我們後面沒有任務提交拿他就會一直阻塞 那麼我們該如何喚醒他呢
        // 答案就在下面的函數當中 調用執行緒的 interruput 方法 那麼take方法就會產生一個異常 然後我們
        // 捕獲到一異常 然後執行緒退出
      }
    }
  }

  public synchronized void stopWorker() {
    if (isStopped) {
      throw new RuntimeException("thread has been interrupted");
    }
    isStopped = true;
    thisThread.interrupt(); // 中斷執行緒產生異常
  }

  public synchronized boolean isStopped() {
    return isStopped;
  }
}

執行緒池的參數

在我們自己實現的執行緒池當中,我們只需要定義兩個參數一個是執行緒的個數,另外一個是阻塞隊列(任務池)當中最大的任務個數。在我們自己實現的執行緒池當中還需要有一個變數isStopped表示執行緒池是否停止工作了,因此執行緒池的初步設計大致如下:

  private BlockingQueue taskQueue; // 任務池
  private volatile boolean isStopped; // 
  private final List<Worker> workers = new ArrayList<>();// 保存所所有的執行任務的執行緒

  public ThreadPool(int numThreads, int maxTasks) {
    this.taskQueue = new ArrayBlockingQueue(maxTasks);
    for (int i = 0; i < numThreads; i++) {
      workers.add(new Worker(this.taskQueue));
    }
    int i = 1;
    // 這裡產生執行緒 然後啟動執行緒
    for (Worker worker : workers) {
      new Thread(worker, "ThreadPool-" + i + "-thread").start();
      i++;
    }
  }

執行緒池實現程式碼

在上文當中我們大致設計的執行緒池的初步結構,從上面的結果可以看出當我們造一個ThreadPool對象的時候會產生指定執行緒的數目執行緒並且啟動他們去執行任務,現在我們還需要設計的就是如果關閉執行緒!我們在關閉執行緒的時候還需要保證所有的任務都被執行完成然後才關閉所有的執行緒,再退出,我們設計這個方法為shutDown。除此之外我們還設計一個函數可以強制退出,不用執行所有的任務了,就直接退出,這個方法為stop。整個執行緒池實現的程式碼如下:

package cscore.concurrent.java.threadpool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ThreadPool {

  private BlockingQueue taskQueue;
  private volatile boolean isStopped;
  private final List<Worker> workers = new ArrayList<>();

  public ThreadPool(int numThreads, int maxTasks) {
    this.taskQueue = new ArrayBlockingQueue(maxTasks);
    for (int i = 0; i < numThreads; i++) {
      workers.add(new Worker(this.taskQueue));
    }
    int i = 1;
    for (Worker worker : workers) {
      new Thread(worker, "ThreadPool-" + i + "-thread").start();
      i++;
    }
  }

  // 下面這個方法是向執行緒池提交任務
  public void execute(Runnable runnable) throws InterruptedException {
    if (isStopped) {
      // 如果執行緒池已經停下來了,就不在向任務隊列當中提交任務了
      System.err.println("thread pool has been stopped, so quit submitting task");
      return;
    }
    taskQueue.put(runnable);
  }

  // 強制關閉執行緒池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {
      worker.stopWorker();
    }
  }

  public synchronized void shutDown() {
    // 先表示關閉執行緒池 執行緒就不能再向執行緒池提交任務
    isStopped = true;
    // 先等待所有的任務執行完成再關閉執行緒池
    waitForAllTasks();
    stop();
  }

  private void waitForAllTasks() {
    // 當執行緒池當中還有任務的時候 就不退出循環
    while (taskQueue.size() > 0)
      Thread.yield();
  }
}

執行緒池測試程式碼

package cscore.concurrent.java.threadpool;

public class TestPool {

  public static void main(String[] args) throws InterruptedException {
    ThreadPool pool = new ThreadPool(3, 1024);

    for (int i = 0; i < 10; i++) {
      int tmp = i;
      pool.execute(() -> {
        System.out.println(Thread.currentThread().getName() + " say hello " + tmp);
      });
    }
    pool.shutDown();
  }
}

上面的程式碼輸出結果:

ThreadPool-2-thread say hello 1
ThreadPool-2-thread say hello 3
ThreadPool-2-thread say hello 4
ThreadPool-2-thread say hello 5
ThreadPool-2-thread say hello 6
ThreadPool-2-thread say hello 7
ThreadPool-2-thread say hello 8
ThreadPool-2-thread say hello 9
ThreadPool-3-thread say hello 2
ThreadPool-1-thread say hello 0

從上面的結果來看確實實現了執行緒池的效果。

雜談

可能你會有疑問,當我們調用 interrupt的時候是如何產生異常的,我們仔細看一個阻塞隊列的實現。在ArrayBlockingQueue當中take方法實現如下:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

在這個方法當中調用的是鎖的lock.lockInterruptibly();方法,當調用這個方法的時候執行緒是可以被interrupt方法中斷的,然後會拋出InterruptedException異常。

總結

在本篇文章當中我們主要實現了一個乞丐版的執行緒池,這個執行緒池離JDK給我們提供的執行緒池還是有一點距離,JDK給我們提供給的執行緒池還有很多其他的參數,我們將在後續的幾篇文章當中繼續向JDK給我們提供的執行緒池靠近,直至實現一個盜版的JDK的執行緒池。本篇文章的程式碼在下面的鏈接當中也可以訪問。


以上就是本篇文章的所有內容了,我是LeHung,我們下期再見!!!更多精彩內容合集可訪問項目://github.com/Chang-LeHung/CSCore

關注公眾號:一無是處的研究僧,了解更多電腦(Java、Python、電腦系統基礎、演算法與數據結構)知識。