並發編程之執行緒池原理

  • 2020 年 3 月 10 日
  • 筆記

點擊上方疾風先生可以訂閱哦

執行緒池作用

使用背景

  • 在並發大量非同步任務處理程式中,每執行一個任務就需要創建一個執行緒,同時任務執行完畢之後需要將執行緒銷毀.我們知道JVM創建執行緒的時候需要為其分配執行緒棧空間以及一些初始化操作,同時銷毀的過程需要回收執行緒棧空間並由gc釋放資源,期間都需要耗費一定的時間,因此一個任務的最終執行時間=創建執行緒newTime + 程式執行excuteTime + 執行緒銷毀的gcTime,如果期間newTime+gcTime > excuteTime,那麼這時候執行任務創建執行緒在程式中就顯得十分不划算
  • 執行緒執行需要JVM分配執行緒棧空間,需要從系統記憶體申請,如果創建的執行緒過多,那麼就容易造成記憶體空間不夠用導致記憶體溢出,默認的執行緒棧空間大小是1M
  • 執行緒是由作業系統的CPU進行調度,因此並發多執行緒執行時CPU需要分配時間片並發執行執行緒,也就是執行緒並發執行是需要來回切換CPU的context,嚴重影響性能
  • 並發環境下,如果創建的執行緒很多,增加對執行緒的維護和管理的困難

作用

  • 運用資源重複利用的思維,我們建立一個「池」的概念,多任務非同步執行通過執行緒池實現執行緒復用,利用池化技術來分配和管理執行緒的使用,避免執行緒頻繁創建和銷毀消耗更多的時間,提高並發執行效率
  • 其次,通過執行緒池我們可以控制執行緒的數量,可以根據指定的策略來管理執行緒,比如任務過多,執行緒處理不過來,可以分配新的執行緒,當執行緒數量達到上限時,可以自定義策略管理任務,要麼是放入阻塞隊列中等待執行緒消費完成再繼續,要麼直接丟棄任務,相比單個執行緒處理方式,靈活性更大,也容易管理
  • 最後,由於池可回收執行緒資源,可以避免無限制創建執行緒,能夠降低CPU資源的消耗

執行緒池API

執行緒池介面API

  • 執行緒池核心介面與實現類類圖
  • Executor & ExecutorService介面核心方法
// Executor.java  // 從執行緒池中分配一個執行緒執行任務  void execute(Runnable command);    // ExecutorService.java  // 關閉執行緒池,會將已提交的任務執行完畢再關閉執行緒池,不接受新的提交任務  void shutdown();    // 立即關閉,不論是否有任務正在執行還是已提交未執行,都立即退出jvm不再執行,
  • ScheduledExecutorService介面方法
// 主要用於處理定時任務  //ScheduledExecutorService.java  // 創建並執行一個一次性定時任務,指定在 delay&unit(時長+時間單位) 時間點將會執行  public ScheduledFuture<?> schedule(Runnable command,                                     long delay, TimeUnit unit);  public <V> ScheduledFuture<V> schedule(Callable<V> callable,                                     long delay, TimeUnit unit)      // 下面兩個方法主要是創建並執行一個周期性任務,如果發生異常會退出程式  // 區別在於  // scheduleAtFixedRate執行周期任務之後,如果任務超過指定的周期時間,下一個任務會立刻執行不會等待  // scheduleWithFixedDelay執行任務超過周期時間,仍然會等待delay時間再進行下一個任務的執行  public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,                                                long initialDelay,                                                long period,                                                TimeUnit unit);  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,                                                   long initialDelay,                                                   long delay,                                                   TimeUnit unit);

執行緒池工具類API

  • 執行緒池與執行緒池工具類圖關係
  • 根據類圖可以將執行緒池劃分為定時和非定時
非定時執行緒池實現類: ThreadPoolExecutor & ForkJoinPool & FinalizableDelegatedExecutorService  定時執行緒池實現類:  ScheduledThreadPoolExecutor & DelegatedScheduledExecutorService
  • 執行緒池工具類Executors創建執行緒池的核心方法
// Executors.java  // 創建一個基於無界的鏈表阻塞隊列(LinkedBlockingQueue)且為固定執行緒數為nThreads的執行緒池  // 無界: 隊列沒有指定容器大小,可以不斷添加元素  public static ExecutorService newFixedThreadPool(int nThreads) {}  public static ExecutorService newFixedThreadPool(int nThreads,                                                   ThreadFactory threadFactory){}      // 創建一個基於無界的同步隊列(SynchronousQueue)且執行緒數無限制(0 - Integer.MAX_VALUE)的執行緒池  // 1.該執行緒池核心執行緒數為0且執行緒空閑時間為60s,意味著任務執行完成或者是超時將會銷毀執行緒  // 2.不存儲數據的同步隊列(最多只有一個元素的隊列),僅作為快取作用  // 即等待內部創建工作執行緒完成之後就立即交由執行緒進行消費  // 3.適用於處理任務但是不確定執行緒個數的場景,同時為了防止不斷創建執行緒造成CPU資源消耗過多  // 一般會自定義添加對應的執行緒最大數而不是Integer.MAX_VALUE  public static ExecutorService newCachedThreadPool() {}  public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {}      // 創建一個基於延遲的無界任務隊列且能夠執行定時任務的執行緒池  // 執行緒池的核心數量corePoolSize,最大執行緒數為Integer.MAX_VALUE  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){}  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize,                                                      ThreadFactory threadFactory)      // 創建一個基於延遲的無界任務隊列,能夠執行定時任務且只有一個執行緒的執行緒池  // 當該執行緒池中的執行緒被中斷或者異常退出的時候,執行緒池會新創建一個執行緒繼續執行後續的任務  public static ScheduledExecutorService newSingleThreadScheduledExecutor() {}  public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory){}
  • Executors包含的組件

1) 介面ThreadFactory實現類:藉助內部定義的默認工廠類創建執行緒,並為當前執行緒池中的執行緒命名比較規範的執行緒名稱,有默認執行緒工廠以及帶有ACC控制許可權的執行緒工廠

2) 代理執行緒池類:負責創建執行緒池,即將實現ExecutorService的執行緒池包裝起來代理其完成相應的方法實現,具體實現類FinalizableDelegatedExecutorService是重寫父類方法finalize以便gc調用回收執行緒池,DelegatedScheduledExecutorService是實現定時功能

3) 實現Callable介面的任務類:RunnableAdapter主要實現處理任務task的執行結果返回給主執行緒,PrivilegedCallable以及PrivilegedCallableUsingCurrentClassLoader增加ACC控制許可權設置來執行任務

執行緒池工作原理

執行緒池核心類ThreadPoolExecutor

  • ThreadPoolExecutor類圖
  • ThreadPoolExecutor類下核心組件

1) 工作執行緒Worker:包裝一個執行緒以及任務的工作執行緒,在沒有任務的時候處於等待,可以循環任務,並且實現AQS介面,從類圖中可以看出具備獨佔鎖的功能

2) ThreadFactory:用於創建執行緒的工廠類,並且為執行緒池中的每個執行緒分配更有意義的名稱

3) 任務介面Runnable: 每個任務必須實現的介面,以便於工作執行緒調度任務的執行

4) 任務隊列BlockingQueue: 用於存放沒有處理的任務,相當於緩衝作用

5) 拒絕處理策略RejectedExecutionHandler: 對不滿足執行緒池執行條件的任務採取指定的處理策略

  • RejectedExecutionHandler策略

1) CallerRunsPolicy: 只用獲取當前拒絕的任務的執行緒且執行緒是活躍狀態的時候來執行任務,如果當前執行緒池已經被關閉將不會執行

2) AbortPolicy: 直接拋出異常, 告知當前的任務已經無法加入到阻塞隊列中

3) DiscardPolicy: 不做任何處理,空方法,直接將任務丟棄,不執行

4) DiscardOldestPolicy: 移除隊列中最靠前的一個任務,並執行當前拒絕的任務

  • ThreadPoolExecutor構造方法
public ThreadPoolExecutor(int corePoolSize,                              int maximumPoolSize,                              long keepAliveTime,                              TimeUnit unit,                              BlockingQueue<Runnable> workQueue,                              ThreadFactory threadFactory,                              RejectedExecutionHandler handler){    }
  • ThreadPoolExecutor構造方法參數說明

1) corePoolSize: 執行緒池保持活躍的執行緒數

2) maximumPoolSize: 執行緒池可創建最大的執行緒數,如果隊列為無界隊列,則該設置值沒有效果

3) keepAliveTime: 執行緒池中的執行緒保持活躍的時長, 如果超過時長, 並且執行緒池中的執行緒個數比核心執行緒數多,那麼多出的空閑執行緒將會被回收

4) unit: keepAliveTime的時間單位

5) threadFactory: 藉助執行緒工廠類創建執行緒

6) handler: 當執行緒池中的隊列已滿&執行緒數達到最大值,此時添加的新任務將採取handler策略進行處理

執行緒池執行流程

  • execute方法源程式碼
public void execute(Runnable command) {       if (command == null)           throw new NullPointerException();  // ctl 高三位表示執行緒池狀態,低(Integer.SIZE - 3)位表示執行緒個數,也就是ctl = 執行緒狀態 + 執行緒個數  // 獲取當前池的狀態和執行緒個數的組合值       int c = ctl.get();    // 當前執行緒的個數是否小於核心執行緒數       if (workerCountOf(c) < corePoolSize) {       	 // 創建新的執行緒執行任務,會通過加鎖的方式保存工作執行緒,如果成功則執行任務           if (addWorker(command, true))               return;           c = ctl.get();       }       // 判斷當前執行緒池的狀態是否為Running,是則將任務加入阻塞隊列中,說明上述的corePool不夠用       if (isRunning(c) && workQueue.offer(command)) {           int recheck = ctl.get();           if (! isRunning(recheck) && remove(command))           	 // 執行緒池已經shutdown並且不接受當前的任務,交由指定的策略處理               reject(command);           else if (workerCountOf(recheck) == 0)           	// 執行緒池已經沒有執行緒,由於任務已經加入阻塞隊列,這裡只需要新創建一個執行緒               addWorker(null, false);       }       // 隊列滿,從執行緒池中創建新執行緒後處理任務,如果創建失敗則執行策略       else if (!addWorker(command, false))           reject(command);   }
  • execute核心執行流程
  • execute核心流程小結 1) 如果當前執行緒池中的執行緒數num < corePoolSize, 則創建新執行緒執行任務,需要獲取全局鎖 2) 如果當前執行緒池中的執行緒數num >= corePoolSize, 則將任務加入BlockingQueue中 3) 如果BlockingQueue已經滿且執行緒數num < maximumPoolSize, 那麼創建新的執行緒來執行任務,需要獲取全局鎖 4) 如果BlockingQueue已經滿且執行緒數num >= maximumPoolSize,新加入的任務將交由指定handler策略處理,默認採取的策略是拒絕處理
  • ThreadPoolExecutor內部執行示意圖
  • ThreadPoolExecutor示意圖說明

1) 主執行緒提交任務給執行緒池之後,執行緒池的corePool未滿則創建執行緒執行任務(1,2)

2) 若corePool已經滿了,則將任務提交到阻塞隊列中(3)

3) 任務添加到阻塞隊列之後將會進行雙重檢測,如果執行緒池關閉,則移除當前任務並執行拒絕策略處理任務返回主執行緒中,若執行緒池未關閉,再次檢測corePool的執行緒個數,如果沒有執行緒則創建新的執行緒但不執行任務(4.1&4.2)

4) 若隊列已經滿了,在創建新執行緒過程中會去檢測隊列容量,corePool和maxPool的數量,如果池中執行緒數滿足< max就會創建新執行緒並執行任務,否則說明已經達到maxPool,創建失敗並將任務移除到拒絕策略執行返回給主執行緒(5,6)

5) 在非corePool下的執行緒,若存在空閑執行緒超過單位unit的keepalive的時間,將銷毀執行緒

6) 同時對於corePool下的空閑執行緒,將會從阻塞隊列中獲取任務並執行任務

7) 從執行緒創建流程知道,添加的工作執行緒將由一個基於HashSet的集合來維護,只有添加到set成功之後才會執行任務,如果檢測到工作執行緒已經是啟動過了,將不會再次添加到set集合中

執行緒池狀態

  • 狀態程式碼
// 存儲執行緒池狀態以及工作執行緒個數(RUNNING, 0)  // rs表示狀態,wc表示工作執行緒個數  private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  private static final int COUNT_BITS = Integer.SIZE - 3;  private static final int CAPACITY   = (1 << COUNT_BITS) - 1;    // runState is stored in the high-order bits  // 狀態存儲在對應數值的高三位  // 接收新任務並能處理隊列里的任務  private static final int RUNNING    = -1 << COUNT_BITS;  // 不再接收新任務但能處理隊列里的任務  private static final int SHUTDOWN   =  0 << COUNT_BITS;  // 將會在處理任務過程被中斷,不再接收和處理隊列的任務  private static final int STOP       =  1 << COUNT_BITS;  // 所有任務已經執行完畢,工作執行緒個數為0且在調用terminated()之前的狀態  private static final int TIDYING    =  2 << COUNT_BITS;  // terminated()執行之後的狀態  private static final int TERMINATED =  3 << COUNT_BITS;    // Packing and unpacking ctl  // 獲取當前執行緒池狀態  private static int runStateOf(int c)     { return c & ~CAPACITY; }  // 獲取當前執行緒池的工作執行緒個數  private static int workerCountOf(int c)  { return c & CAPACITY; }  private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 狀態變化圖

執行緒池監控

  • getLargestPoolSize(): 返回核心執行緒池中當前最大的執行緒數,與workers的集合大小一致,表示當前執行緒池中的執行緒數量
// 核心程式碼  addWorker(Runnable firstTask, boolean core){  // ..  // 自旋檢測執行緒個數與隊列容量,corePool,maxPool的比較  	mainLock.lock();  int s = workers.size();      if (s > largestPoolSize)          largestPoolSize = s;     // ...     mainLock.unlock();  }  public int getLargestPoolSize() {      final ReentrantLock mainLock = this.mainLock;      mainLock.lock();      try {          return largestPoolSize;      } finally {          mainLock.unlock();      }  }
  • getPoolSize(): 返回當前執行緒池狀態不為TERMINATED(>=TIDYING)時的執行緒數量
public int getPoolSize() {       final ReentrantLock mainLock = this.mainLock;       mainLock.lock();       try {           // Remove rare and surprising possibility of           // isTerminated() && getPoolSize() > 0           return runStateAtLeast(ctl.get(), TIDYING) ? 0               : workers.size();       } finally {           mainLock.unlock();       }   }
  • getActiveCount(): 獲取當前正在處理任務的執行緒數量
public int getActiveCount() {      final ReentrantLock mainLock = this.mainLock;      mainLock.lock();      try {          int n = 0;          for (Worker w : workers)          	// worker是基於AQS實現的,內部啟動執行任務的時候會加鎖              if (w.isLocked())                  ++n;          return n;      } finally {          mainLock.unlock();      }  }
  • getTaskCount(): 獲取執行緒池中所有的任務,包括已經執行完成的任務+正在執行的任務+在阻塞隊列中未執行的任務個數
public long getTaskCount() {       final ReentrantLock mainLock = this.mainLock;       mainLock.lock();       try {           long n = completedTaskCount;	// 執行緒池執行完成的任務個數           for (Worker w : workers) {               n += w.completedTasks;   // 當前工作執行緒執行完成的任務個數               if (w.isLocked())                   ++n;           }           return n + workQueue.size();       } finally {           mainLock.unlock();       }   }
  • getCompletedTaskCount(): 獲取當前執行緒池中已經完成的任務個數(執行緒池完成的任務 + 工作執行緒完成的任務)
public long getCompletedTaskCount() {       final ReentrantLock mainLock = this.mainLock;       mainLock.lock();       try {           long n = completedTaskCount;           for (Worker w : workers)               n += w.completedTasks;           return n;       } finally {           mainLock.unlock();       }   }

你好,我是疾風先生,先後從事外企和互聯網大廠的java和python工作, 記錄並分享個人技術棧,歡迎關注我的公眾號,致力於做一個有深度,有廣度,有故事的工程師,歡迎成長的路上有你陪伴,關注後回復greek可添加私人微信,歡迎技術互動和交流,謝謝!