自己動手寫乞丐版線程池

自己動手寫乞丐版線程池

前言

在上篇文章線程池的前世今生當中我們介紹了實現線程池的原理,在這篇文章當中我們主要介紹實現一個非常簡易版的線程池,深入的去理解其中的原理,麻雀雖小,五臟俱全。

線程池的具體實現

線程池實現思路

任務保存到哪裡?

在上一篇文章線程池的前世今生當中我們具體去介紹了線程池當中的原理。在線程池當中我們有很多個線程不斷的從任務池(用戶在使用線程池的時候不斷的使用execute方法將任務添加到線程池當中)裏面去拿任務然後執行,現在需要思考我們應該用什麼去實現任務池呢?

答案是阻塞隊列,因為我們需要保證在多個線程往任務池裏面加入任務的時候並發安全,JDK已經給我們提供了這樣的數據結構——BlockingQueue,這個是一個並發安全的阻塞隊列,他之所以叫做阻塞隊列,是因為我們可以設置隊列當中可以容納數據的個數,當加入到隊列當中的數據超過這個值的時候,試圖將數據加入到阻塞隊列當中的線程就會被掛起。當隊列當中為空的時候,試圖從隊列當中取出數據的線程也會被掛起。

線程的設計

在我們自己實現的線程池當中我們定一個Worker類去不斷的從任務池當中取出任務,然後進行執行。在我們自己定義的worker當中還需要有一個變量isStopped表示線程是否停止工作。同時在worker當中還需要保存當前是哪個線程在執行任務,因此在我們自己設計的woker類當中還需要有一個thisThread變量,保存正在執行任務的線程,因此worker的整體設計如下:

package cscore.concurrent.java.threadpool;

import java.util.concurrent.BlockingQueue;

public class Worker implements Runnable {

  private Thread thisThread; // 表示正在執行任務的線程
  private BlockingQueue<Runnable> taskQueue; // 由線程池傳遞過來的任務隊列
  private volatile boolean isStopped; // 表示 worker 是否停止工作 需要使用 volatile 保證線程之間的可見性

  public Worker(BlockingQueue taskQueue) { // 這個構造方法是在線程池的實現當中會被調用
    this.taskQueue = taskQueue;
  }

  // 線程執行的函數
  @Override
  public void run() {
    thisThread = Thread.currentThread(); // 獲取執行任務的線程
    while (!isStopped) { // 當線程沒有停止的時候就不斷的去任務池當中取出任務
      try {
        Runnable task = taskQueue.take(); // 從任務池當中取出任務 當沒有任務的時候線程會被這個方法阻塞
        task.run(); // 執行任務 任務就是一個 Runnable 對象
      } catch (InterruptedException e) {
        // do nothing
        // 這個地方很重要 你有沒有思考過一個問題當任務池當中沒有任務的時候 線程會被阻塞在 take 方法上
        // 如果我們後面沒有任務提交拿他就會一直阻塞 那麼我們該如何喚醒他呢
        // 答案就在下面的函數當中 調用線程的 interruput 方法 那麼take方法就會產生一個異常 然後我們
        // 捕獲到一異常 然後線程退出
      }
    }
  }

  public synchronized void stopWorker() {
    if (isStopped) {
      throw new RuntimeException("thread has been interrupted");
    }
    isStopped = true;
    thisThread.interrupt(); // 中斷線程產生異常
  }

  public synchronized boolean isStopped() {
    return isStopped;
  }
}

線程池的參數

在我們自己實現的線程池當中,我們只需要定義兩個參數一個是線程的個數,另外一個是阻塞隊列(任務池)當中最大的任務個數。在我們自己實現的線程池當中還需要有一個變量isStopped表示線程池是否停止工作了,因此線程池的初步設計大致如下:

  private BlockingQueue taskQueue; // 任務池
  private volatile boolean isStopped; // 
  private final List<Worker> workers = new ArrayList<>();// 保存所所有的執行任務的線程

  public ThreadPool(int numThreads, int maxTasks) {
    this.taskQueue = new ArrayBlockingQueue(maxTasks);
    for (int i = 0; i < numThreads; i++) {
      workers.add(new Worker(this.taskQueue));
    }
    int i = 1;
    // 這裡產生線程 然後啟動線程
    for (Worker worker : workers) {
      new Thread(worker, "ThreadPool-" + i + "-thread").start();
      i++;
    }
  }

線程池實現代碼

在上文當中我們大致設計的線程池的初步結構,從上面的結果可以看出當我們造一個ThreadPool對象的時候會產生指定線程的數目線程並且啟動他們去執行任務,現在我們還需要設計的就是如果關閉線程!我們在關閉線程的時候還需要保證所有的任務都被執行完成然後才關閉所有的線程,再退出,我們設計這個方法為shutDown。除此之外我們還設計一個函數可以強制退出,不用執行所有的任務了,就直接退出,這個方法為stop。整個線程池實現的代碼如下:

package cscore.concurrent.java.threadpool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ThreadPool {

  private BlockingQueue taskQueue;
  private volatile boolean isStopped;
  private final List<Worker> workers = new ArrayList<>();

  public ThreadPool(int numThreads, int maxTasks) {
    this.taskQueue = new ArrayBlockingQueue(maxTasks);
    for (int i = 0; i < numThreads; i++) {
      workers.add(new Worker(this.taskQueue));
    }
    int i = 1;
    for (Worker worker : workers) {
      new Thread(worker, "ThreadPool-" + i + "-thread").start();
      i++;
    }
  }

  // 下面這個方法是向線程池提交任務
  public void execute(Runnable runnable) throws InterruptedException {
    if (isStopped) {
      // 如果線程池已經停下來了,就不在向任務隊列當中提交任務了
      System.err.println("thread pool has been stopped, so quit submitting task");
      return;
    }
    taskQueue.put(runnable);
  }

  // 強制關閉線程池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {
      worker.stopWorker();
    }
  }

  public synchronized void shutDown() {
    // 先表示關閉線程池 線程就不能再向線程池提交任務
    isStopped = true;
    // 先等待所有的任務執行完成再關閉線程池
    waitForAllTasks();
    stop();
  }

  private void waitForAllTasks() {
    // 當線程池當中還有任務的時候 就不退出循環
    while (taskQueue.size() > 0)
      Thread.yield();
  }
}

線程池測試代碼

package cscore.concurrent.java.threadpool;

public class TestPool {

  public static void main(String[] args) throws InterruptedException {
    ThreadPool pool = new ThreadPool(3, 1024);

    for (int i = 0; i < 10; i++) {
      int tmp = i;
      pool.execute(() -> {
        System.out.println(Thread.currentThread().getName() + " say hello " + tmp);
      });
    }
    pool.shutDown();
  }
}

上面的代碼輸出結果:

ThreadPool-2-thread say hello 1
ThreadPool-2-thread say hello 3
ThreadPool-2-thread say hello 4
ThreadPool-2-thread say hello 5
ThreadPool-2-thread say hello 6
ThreadPool-2-thread say hello 7
ThreadPool-2-thread say hello 8
ThreadPool-2-thread say hello 9
ThreadPool-3-thread say hello 2
ThreadPool-1-thread say hello 0

從上面的結果來看確實實現了線程池的效果。

雜談

可能你會有疑問,當我們調用 interrupt的時候是如何產生異常的,我們仔細看一個阻塞隊列的實現。在ArrayBlockingQueue當中take方法實現如下:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

在這個方法當中調用的是鎖的lock.lockInterruptibly();方法,當調用這個方法的時候線程是可以被interrupt方法中斷的,然後會拋出InterruptedException異常。

總結

在本篇文章當中我們主要實現了一個乞丐版的線程池,這個線程池離JDK給我們提供的線程池還是有一點距離,JDK給我們提供給的線程池還有很多其他的參數,我們將在後續的幾篇文章當中繼續向JDK給我們提供的線程池靠近,直至實現一個盜版的JDK的線程池。本篇文章的代碼在下面的鏈接當中也可以訪問。


以上就是本篇文章的所有內容了,我是LeHung,我們下期再見!!!更多精彩內容合集可訪問項目://github.com/Chang-LeHung/CSCore

關注公眾號:一無是處的研究僧,了解更多計算機(Java、Python、計算機系統基礎、算法與數據結構)知識。