徹底了解線程池的原理——40行從零開始自己寫線程池

徹底了解線程池的原理——40行從零開始自己寫線程池

前言

在我們的日常的編程當中,並發是始終離不開的主題,而在並發多線程當中,線程池又是一個不可規避的問題。多線程可以提高我們並發程序的效率,可以讓我們不去頻繁的申請和釋放線程,這是一個很大的花銷,而在線程池當中就不需要去頻繁的申請線程,他的主要原理是申請完線程之後並不中斷,而是不斷的去隊列當中領取任務,然後執行,反覆這樣的操作。在本篇文章當中我們主要是介紹線程池的原理,因此我們會自己寫一個非常非常簡單的線程池,主要幫助大家理解線程池的核心原理!!!

線程池給我們提供的功能

我們首先來看一個使用線程池的例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo01 {

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 100; i++) {
      pool.execute(new Runnable() {
        @Override
        public void run() {
          for (int i = 0; i < 100; i++) {
            System.out.println(Thread.currentThread().getName() + " print " + i);
          }
        }
      });
    }
  }
}

在上面的例子當中,我們使用Executors.newFixedThreadPool去生成來一個固定線程數目的線程池,在上面的代碼當中我們是使用5個線程,然後通過execute方法不斷的去向線程池當中提交任務,大致流程如下圖所示:

線程池通過execute函數不斷的往線程池當中的任務隊列加入任務,而線程池當中的線程會不斷的從任務隊列當中取出任務,然後進行執行,然後繼續取任務,繼續執行….,線程的執行過程如下:

while (true) {
  Runnable runnable = taskQueue.take(); // 從任務隊列當中取出任務
  runnable.run(); // 執行任務
}

根據上面所談到的內容,現在我們的需求很清晰了,首先我們需要有一個隊列去存儲我們所需要的任務,然後需要開啟多個線程不斷的去任務隊列當中取出任務,然後進行執行,然後重複取任務執行任務的操作。

工具介紹

在我們前面提到的線程池實現的原理當中有一個非常重要的數據結構,就是ArrayBlockingQueue阻塞隊列,它是一個並發安全的數據結構,我們首先先簡單介紹一下這個數據結構的使用方法。(如果你想深入了解阻塞隊列的實現原理,可以參考這篇文章JDK數組阻塞隊列源碼剖析

我們主要用的是ArrayBlockingQueue的下面兩個方法:

  • put函數,這個函數是往線程當中加入數據的。我們需要了解的是,如果一個線程調用了這個函數往隊列當中加入數據,如果此時隊列已經滿了則線程需要被掛起,如果沒有滿則需要將數據加入到隊列當中,也就是將數據存儲到數組當中。
  • take函數,從隊列當中取出數據,但是當隊列為空的時候需要將調用這個方法的線程阻塞。當隊列當中有數據的時候,就可以從隊列當中取出數據。
  • 需要注意的是,如果一個線程被上面兩個任何一個線程阻塞之後,可以調用對應線程的interrupt方法終止線程的執行,同時還會拋出一個異常。

下面是一份測試代碼:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueTest {

  public static void main(String[] args) throws InterruptedException {
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5); // 隊列的容量為5
    Thread thread = new Thread(() -> {
      for (int i = 0; i < 10; i++) {
        try {
          queue.put(i);
          System.out.println("數據 " + i + "被加入到隊列當中");
        } catch (InterruptedException e) {
          System.out.println("出現了中斷異常");
          // 如果出現中斷異常 則退出 線程就不會一直在 put 方法被掛起了
          return;
        }finally {
        }
      }
    });
    thread.start();
    TimeUnit.SECONDS.sleep(1);
    thread.interrupt();
  }
}

上面代碼輸出結果:

數據 0被加入到隊列當中
數據 1被加入到隊列當中
數據 2被加入到隊列當中
數據 3被加入到隊列當中
數據 4被加入到隊列當中
出現了中斷異常

上面代碼的執行順序是:

線程thread會將0-4這5個數據加入到隊列當中,但是在加入第6個數據的時候,阻塞隊列已經滿了,因此在加入數據的時候線程thread會被阻塞,然後主線程在休息一秒之後中斷了線程thread,然後線程thread發生了中斷異常,然後被捕獲進入catch代碼塊,然後函數返回,線程thread就不會一直被阻塞了,這一點在我們後面寫線程池非常重要!!!

Worker設計

在前文當中我們已經提到了我們的線程需要不斷的去任務隊列裏面取出任務然後執行,我們設計一個Worker類去做這件事!

  • 首先在類當中肯定需要有一個線程池的任務隊列,因為worker需要不斷的從阻塞隊列當中取出任務進行執行。
  • 我們用一個isStopped變量表示線程是否需要終止了,也就是線程池是否需要關閉,如果線程池需要關閉了,那麼線程也應該停止了。
  • 我們還需要有一個變量記錄執行任務的線程,因為當我們需要關閉線程池的時候需要等待任務隊列當中所有的任務執行完成,那麼當所有的任務都執行執行完成的時候,隊列肯定是空的,而如果這個時候有線程還去取任務,那麼肯定會被阻塞,前面已經提到了ArrayBlockingQueue的使用方法了,我們可以使用這個線程的interrupt的方法去中斷這個線程的執行,這個線程會出現異常,然後這個線程捕獲這個異常就可以退出了,因此我們需要知道對那個線程執行interrupt方法!

Worker實現的代碼如下:

import java.util.concurrent.ArrayBlockingQueue;

public class Worker implements Runnable {

  // 用於保存任務的隊列
  private ArrayBlockingQueue<Runnable> tasks;
  // 線程的狀態 是否終止
  private volatile boolean isStopped;

  // 保存執行 run 方法的線程
  private volatile Thread thisThread;

  public Worker(ArrayBlockingQueue<Runnable> tasks) {
    // 這個參數是線程池當中傳入的
    this.tasks = tasks;
  }

  @Override
  public void run() {
    thisThread = Thread.currentThread();
    while (!isStopped) {
      try {
        Runnable task = tasks.take();
        task.run();
      } catch (InterruptedException e) {
        // do nothing
      }
    }
  }
	// 注意是其他線程調用這個方法 同時需要注意是 thisThread 這個線程在執行上面的 run 方法
  // 其他線程調用 thisThread 的 interrupt 方法之後 thisThread 會出現異常 然後就不會一直阻塞了
  // 會判斷 isStopped 是否為 true 如果為 true 的話就可以退出 while 循環了
  public void stop() {
    isStopped = true;
    thisThread.interrupt(); // 中斷線程 thisThread
  }

  public boolean isStopped(){
    return isStopped;
  }
}

線程池設計

  • 首先線程池需要可以指定有多少個線程,阻塞隊列的最大長度,因此我們需要有這兩個參數。
  • 線程池肯定需要有一個隊列去存放通過submit函數提交的任務。
  • 需要有一個變量存儲所有的woker,因為線程池關閉的時候需要將這些worker都停下來,也就是調用worker的stop方法。
  • 需要有一個shutDown函數表示關閉線程池。
  • 需要有一個函數能夠停止所有線程的執行,因為關閉線程池就是讓所有線程的工作停下來。

線程池實現代碼:

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;

public class MyFixedThreadPool {
  // 用於存儲任務的阻塞隊列
  private ArrayBlockingQueue<Runnable> taskQueue;

  // 保存線程池當中所有的線程
  private ArrayList<Worker> threadLists;

  // 線程池是否關閉
  private boolean isShutDown;

  // 線程池當中的線程數目
  private int numThread;

  public MyFixedThreadPool(int i) {
    this(Runtime.getRuntime().availableProcessors() + 1, 1024);
  }

  public MyFixedThreadPool(int numThread, int maxTaskNumber) {
    this.numThread = numThread;
    taskQueue = new ArrayBlockingQueue<>(maxTaskNumber); // 創建阻塞隊列
    threadLists = new ArrayList<>();
    // 將所有的 worker 都保存下來
    for (int i = 0; i < numThread; i++) {
      Worker worker = new Worker(taskQueue);
      threadLists.add(worker);
    }
    for (int i = 0; i < threadLists.size(); i++) {
      new Thread(threadLists.get(i),
              "ThreadPool-Thread-" + i).start(); // 讓worker開始工作
    }
  }
	
  // 停止所有的 worker 這個只在線程池要關閉的時候才會調用
  private void stopAllThread() {
    for (Worker worker : threadLists) {
      worker.stop(); // 調用 worker 的 stop 方法 讓正在執行 worker 當中 run 方法的線程停止執行
    }
  }

  public void shutDown() {
    // 等待任務隊列當中的任務執行完成
    while (taskQueue.size() != 0) {
      // 如果隊列當中還有任務 則讓出 CPU 的使用權
      Thread.yield();
    }
    // 在所有的任務都被執行完成之後 停止所有線程的執行
    stopAllThread();
  }

  public void submit(Runnable runnable) {
    try {
      taskQueue.put(runnable); // 如果任務隊列滿了, 調用這個方法的線程會被阻塞
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

測試代碼:

public class Test {

  public static void main(String[] args) {
    MyFixedThreadPool pool = new MyFixedThreadPool(5, 1024); // 開啟5個線程 任務隊列當中最多只能存1024個任務
    for (int i = 0; i < 1000000; i++) {
      pool.submit(() -> {
        System.out.println(Thread.currentThread().getName()); // 提交的任務就是打印線程自己的名字
      });
    }
    pool.shutDown();
  }
}

上面的代碼是可以正常執行並且結束的,這個輸出太長了這裡只列出部分輸出結果:

ThreadPool-Thread-0
ThreadPool-Thread-4
ThreadPool-Thread-0
ThreadPool-Thread-1
ThreadPool-Thread-3
ThreadPool-Thread-1
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-2
ThreadPool-Thread-3
ThreadPool-Thread-2
ThreadPool-Thread-1
ThreadPool-Thread-0
ThreadPool-Thread-0
ThreadPool-Thread-0
ThreadPool-Thread-1
ThreadPool-Thread-4

從上面的輸出我們可以看見線程池當中只有5個線程,這5個線程在不斷從隊列當中取出任務然後執行,因為我們可以看到同一個線程的名字輸出了多次。

總結

在本篇文章當中主要介紹了線程池的原理,以及我們應該去如何設計一個線程池,同時也介紹了在阻塞隊列當中一個非常重要的數據結構ArrayBlockingQueue的使用方法。

  • 線程池當中有一個阻塞隊列去存放所有被提交到線程池當中的任務。
  • 所有的Worker會不斷的從任務隊列當中取出任務然後執行。
  • 線程池的shutDown方法其實比較難思考該怎麼實現的,首先在我們真正關閉線程池之前需要將任務隊列當中所有的任務執行完成,然後將所有的線程停下來。
  • 在所有的任務執行完成之後,可能有的線程就會阻塞在take方法上(從隊列當中取數據的方法,如果隊列為空線程會阻塞),好在ArrayBlockingQueue在實現的時候就考慮到了這個問題,只需要其他線程調用了這個被阻塞線程的interrupt方法的話,線程就可以通過捕獲異常恢復執行,然後判斷isStopped,如果需要停止了就跳出while循環,這樣的話我們就可以完成所有線程的停止操作了。

以上就是本篇文章的所有內容了,我是LeHung,我們下期再見!!!更多精彩內容合集可訪問項目://github.com/Chang-LeHung/CSCore

關注公眾號:一無是處的研究僧,了解更多計算機(Java、Python、計算機系統基礎、算法與數據結構)知識。