Java並發之ThreadPoolExecutor源碼解析(二)

ThreadPoolExecutor

ThreadPoolExecutor是ExecutorService的一種實現,可以用若干已經池化的線程執行被提交的任務。使用線程池可以幫助我們限定和整合程序資源,儘可能避免創建新的線程來執行任務從而降低任務調用的開銷,在執行大量異步任務的時候反而能獲得更好的性能。此外,ThreadPoolExecutor還會維護一些統計信息,比如已完成的任務數量。

juc包的作者Doug Lea推薦程序員盡量使用更為便利的Executors類的工廠方法來配置線程池:

  1. Executors.newCachedThreadPool():創建一個線程池,可以根據線程池的需要來創建任務。這個線程池比較適用執行周期較短且量大的異步任務,在調用execute(…)方法時如果線程池中存在閑置的線程,將復用閑置線程,否則創建一個新線程執行任務。如果線程的閑置時間超過60s,線程將被終止並從線程池內移除。因此,該線程池即便空閑時間再長,也不會有資源的消耗。
  2. Executors.newFixedThreadPool(int nThreads):創建一個線程池,nThreads為池內線程的數量,池內最多同時有nThreads個線程並行處理任務,如果有新的任務提交到線程池,會先暫存在線程池中的無邊界任務隊列進行等待,直到有線程可用。如果有線程在執行期間因為錯誤提前終止,線程池將啟動一個新的線程代替原先的線程繼續處理任務隊列中處於等待的任務。除非顯式調用shutdown(),否則線程池中的線程將一直存在。
  3. Executors.newSingleThreadExecutor():創建一個Executor,該Executor使用一個工作線程處理任務。如果線程在執行期間因為錯誤而終止,將啟動一個新的線程代替原先的線程繼續處理無邊界任務隊列中處於等待的任務。隊列中的任務是按順序執行,任何時刻都不會有多個任務處於活躍狀態。與newFixedThreadPool(1)不同,newFixedThreadPool(int nThreads)生成的線程池,可以強轉為ThreadPoolExecutor類型,再調用setCorePoolSize(int corePoolSize)方法設置核心線程數,而newSingleThreadExecutor()的實現類為FinalizableDelegatedExecutorService,無法直接設置核心線程數。

上面三種是較為常見的配置線程池的工廠方法,如果有需要根據業務場景特殊配置線程池的,請看下面的參數:

核心線程數和最大線程數

ThreadPoolExecutor將根據corePoolSize(核心線程數)和maximumPoolSize(最大線程數)設置的邊界自動調整線程池內工作線程的數量(通過getPoolSize()),corePoolSize可以通過getCorePoolSize()、setCorePoolSize(int corePoolSize)獲取和設置核心線程數,maximumPoolSize可以通過getMaximumPoolSize()、setMaximumPoolSize(int maximumPoolSize)獲取和設置最大線程數。當有新的任務提交時,如果工作線程少於核心線程數,將會創建一個新線程來執行該任務,即便其他工作線程處於閑置狀態。如果工作線程多於corePoolSize但少於maximumPoolSize,則當任務隊列滿的時候才會創建新線程。如果corePoolSize和maximumPoolSize數值一樣,則創建一個固定大小的線程池;如果將maximumPoolSize設置為Integer.MAX_VALUE,則線程池可以容納任意數量的並發任務。

按需構造

核心線程只有當有新任務到達時才會創建,但我們可以重寫prestartCoreThread() 或者prestartAllCoreThreads()來預先啟動核心線程。如果在構造一個線程池時,傳入的任務隊列已經存在任務,則需要線程池初始化完畢後,預先啟動線程。

創建新線程

使用ThreadFactory(線程工廠)創建新線程。如果沒有特別指定,則使用Executors.defaultThreadFactory()作為默認的線程工廠,該線程工廠所創建的線程都位於相同的線程組(ThreadGroup)中,線程的優先級都是NORM_PRIORITY,線程守護狀態都為false。通過提供不同線程工廠的實現,你可以修改線程名、線程組、線程優先級和守護狀態等等。

活躍時間

如果線程池中的線程數超過核心線程數,多出的線程如果空閑時間超出keepAliveTime(活躍時間)將會終止,回收不再活躍的線程。當有需要時,新的線程會重新創建。可以通過setKeepAliveTime(long time, TimeUnit unit)動態設置活躍時間。如果time設置為Long.MAX_VALUE,unit設置為TimeUnit.NANOSECONDS,那麼多餘的空閑線程將不會在關閉線程池之前回收。如果調用allowCoreThreadTimeOut(boolean value)傳入的value為true,那麼keepAliveTime將適用於核心線程,如果allowCoreThreadTimeOut為true且keepAliveTime不為0,核心線程的空閑時間超出活躍時間,核心線程也會被回收。

隊列

阻塞隊列(BlockingQueue)允許在獲取元素時陷入等待,直到有元素加入到隊列中。調用阻塞隊列方法時,有些方法不一定馬上返回,可能會在未來某個時刻達成某些條件時返回。阻塞隊列的方法伴隨四種形式:

  1. 拋出異常。
  2. 返回特殊值,null或者false,具體視操作而定。
  3. 調用線程無限期陷入阻塞直到某些條件達成。
  4. 限定阻塞時長。
  拋異常 特殊值 阻塞 超時
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
檢查(獲取但不移除隊列頭部元素) element() peek()    

阻塞隊列不接受null元素,如果調用add、put、offer嘗試添加一個null元素,將會拋出NullPointerException異常,當調用poll操作失敗時也會返回null。阻塞隊列可能有容量限制,無論何時都不能向隊列添加超過剩餘容量的元素,否則只能調用put方法陷入阻塞,直到有剩餘的空間可以容納元素。如果對隊列的容納空間沒有限制,則剩餘容量返回Integer.MAX_VALUE。阻塞隊列的實現一般用於生產者-消費者隊列的場景,此外阻塞隊列還實現了Collection接口,因此,隊列還可以使用remove(x)來移除元素。

阻塞隊列是線程安全的,所有排隊方法的實現都是用內部鎖或者其他並發控制手段來實現原子性的。然而,除非是特殊規定,否則大部分集合操作,如:addAll、containsAll、retainAll 、removeAll不一定要保證原子性。因此,可能出現在調用addAll(c)時,只添加c中一部分的元素就拋出異常。阻塞隊列本質上並不支持關閉的操作,如:close或shutdown,當有需要讓隊列不再接受新元素。如果有這種需要或者特性更傾向於以來隊列的實現。一種常見的策略是生產者往隊列插入具有特殊標識的對象,當消費者使用對象時,會對特殊標識進行解釋。

注意,阻塞隊列允許多個生產者和消費者同時使用,如下:

 class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

  

public interface BlockingQueue<E> extends Queue<E> {
    /**
     * 在不超過隊列容量的情況下插入一個元素將返回true,,如果隊列沒有多餘的空間拋出
     * IllegalStateException異常,當使用容量受限的隊列時最好使用offer。
     *
     * @param e 待添加進隊列的元素
     * @return 返回true代表元素加入隊列成功
     */
    boolean add(E e);

    /**
     * 相比add(E)如果隊列滿時插入元素不報錯,只是返回false。
     *
     * @param e 待添加進隊列的元素
     * @return 返回true代表元素加入隊列成功,隊列滿時無法插入返回false
     */
    boolean offer(E e);

    /**
     * 將一個元素插入到隊列,如果有必要會等待隊列有多餘空間可以插入。如果調用
     * put(E)的線程被中斷,將拋出中斷異常InterruptedException
     *
     * @param e 待添加進隊列的元素
     */
    void put(E e) throws InterruptedException;

    /**
     * 相比offer(E)多了插入元素時陷入等待,如果等待期間隊列依舊
     * 沒有多餘的空間容納元素,則返回false,如果等待期間能插入則返回true。
     * 如果等待期間線程被中斷,則拋出中斷異常InterruptedException。
     *
     * @param e       待添加進隊列的元素
     * @param timeout 等待時長
     * @param unit    等待時長單位
     */
    boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException;

    /**
     * 取出並刪除隊列的頭部元素,如果隊列為空,則會陷入等待,直到隊列有新的元素加入,
     * 如果等待期間線程被中斷,將拋出中斷異常
     *
     * @return 對頭元素
     */
    E take() throws InterruptedException;

    /**
     * 相比take()多了一個等待時長,如果隊列本身有元素,或者隊列原先有空但等待期間有元素
     * 加入則返回頭部元素,否則隊列為空且等待期間沒有元素加入,則返回null。如果等待期間調用線程
     * 被中斷,則拋出InterruptedException異常。
     */
    E poll(long timeout, TimeUnit unit)
            throws InterruptedException;

    /**
     * 返回隊列理想狀況下可無阻塞容納元素的容量。注意:我們不能通過此方法判斷元素是否插入成功,
     * 因為可能存在別的線程插入或刪除隊列中的元素。
     */
    int remainingCapacity();

    /**
     * 從隊列中移除指定的元素,如果隊列中存在一個或多個相同的元素,即:o.equal(e),則刪除並返回true。
     */
    boolean remove(Object o);

    /**
     * 如果隊列存在一個或多個相同的元素,即:o.equal(e),則返回true。
     */
    boolean contains(Object o);

    /**
     * 刪除此隊列中所有可用元素,並將它們移動到給定的集合c中。當我們把元素從原隊列取出時添加到集合c時,
     * 可能出現異常導致元素既不在原隊列,也不在集合中。隊列同樣實現了Collection接口,如果將原隊列當做
     * 參數傳入將拋出IllegalArgumentException異常
     * @param c 將隊列元素傳輸到給定的集合c。
     * @return 加入到集合c中元素的數量。
     */
    int drainTo(Collection<? super E> c);

    /**
     * 最多將maxElements個元素從隊列傳輸到給定集合,其他和drainTo(Collection<? super E> c)一樣。
     */
    int drainTo(Collection<? super E> c, int maxElements);
}

  

任何阻塞隊列都可以用來獲取和保存任務,如何使用隊列視當前線程池的大小而定:

  1. 如果工作線程的數量小於corePoolSize,那麼Executor更傾向於添加新線程執行而非讓任務排隊。
  2. 如果工作線程的數量大於等於corePoolSize,那麼Executor更傾向於把任務添加到隊列等待執行,而非創建新線程。
  3. 如果隊列已滿,且線程池內的線程數小於maximumPoolSize,Executor會創建一個新線程來執行任務,否則任務會被拒絕。

通常有三種排隊策略:

  1. 同步隊列(SynchronousQueue):如果有線程從同步隊列獲取任務,則移交給線程,否則持有任務,如果有新的任務嘗試入隊將返回失敗,可以根據入隊結果判斷是否要構造一個新線程。同步隊列可以避免當處理多個請求時內部依賴出現鎖定,直接交接任務要求對maximumPoolSize這一參數不做限制,即maximumPoolSize為Integer.MAX_VALUE,避免線程池拒絕提交任務。但如果線程池處理任務的速度不夠快,可能出現線程無限增長。
  2. 無界隊列(LinkedBlockingQueue):如果線程池核心工作線程都在執行任務時,新提交的任務將在隊列中等待。如果任務提交速度過快而執行任務的速度又慢,將導致隊列中的任務無限增長。
  3. 有界隊列(ArrayBlockingQueue):當與有限的maximumPoolSize配合使用時,有界隊列可以防止資源耗盡,但如何設定有界隊列的大小是一個很難的問題。如果隊列過大而maximumPoolSize過小,可以減少CPU的使用、操作系統資源和上下文切換的開銷,這有可能導致低吞吐量。如果隊列過小而maximumPoolSize過大,這會使得CPU十分繁忙,甚至出現巨大的調度開銷,同樣也會降低吞吐量。

拒絕任務

如果線程池關閉後有新任務提交、或者在任務隊列已滿的情況下,線程池到達最大線程數且所有線程都在執行任務,將調用RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)拒絕任務。默認提供四種預定義拒絕策略:

  1. ThreadPoolExecutor.AbortPolicy:默認情況下使用的策略,拋出RejectedExecutionException異常。
  2. ThreadPoolExecutor.CallerRunsPolicy:使用調用線程執行任務,這種機制可以降低任務提交的速度。
  3. ThreadPoolExecutor.DiscardPolicy:將無法入隊也不能執行的任務直接丟棄。
  4. ThreadPoolExecutor.DiscardOldestPolicy:如果線程池未關閉,則獲取並丟棄隊列的頭部元素,再嘗試用線程池執行任務,這一策略可能會失敗,如果失敗則重複之前的步驟。

除了上述四種步驟,我們也可以自定義拒絕策略。

鉤子函數

ThreadPoolExecutor提供了可重寫函數beforeExecute(java.lang.Thread, java.lang.Runnable)、afterExecute(java.lang.Runnable, java.lang.Throwable),分別允許我們在執行任務前和執行任務後做一些操作,這些方法可以控制執行環境,例如:初始化ThreadLocals、收集統計信息、添加日誌等等。此外,也可以重寫terminated()方法,當線程池完全終止後會調用此方法。

如果鉤子函數或者回調函數拋出異常,工作線程可能會終止。

隊列維護

ThreadPoolExecutor提供了getQueue()允許外部獲取隊列進行監控和調試,但不管出於什麼目的盡量少使用此方法。此外ThreadPoolExecutor還提供了remove(java.lang.Runnable) 和purge()用於刪除任務,purge()可以取消大量處於排隊等待的任務。

銷毀

當一個線程池不再有引用指向,且線程池內沒有存活線程將會自動關閉。如果你期望一個未手動調用shutdown()方法的線程池會被回收,你要設置合理的線程存活時間(keep-alive times)、設置核心線程數為0,或者設置allowCoreThreadTimeOut為true,當核心線程空閑時間超過存活時間將被回收,當線程池沒有引用指向,且無存活線程,就會被自動關閉並回收。

源碼解析

ThreadPoolExecutor的ctl變量類型為AtomicInteger,這個數值有32位,包含兩個部分:

  1. runState(運行狀態):線程池是否處於運行中、是否已關閉等等。
  2. workerCount(工作線程數):線程池當前有多少個存活(忙碌或空閑)的線程。

為了將運行狀態和工作線程數放在一個int字段,我們劃分前3位存儲運行狀態,後29位存儲存活線程數量(2^29)-1(約5億)。未來有可能調整ctl為AtomicLong類型,這可能需要調整移位和掩碼,但如果使用AtomicInteger,ThreadPoolExecutor的代碼會更簡單也更高效一些。

workerCount是線程池中還存活的線程數,該值有時候可能會短暫不同於池內實際的存活線程數。當需要增加工作線程時,會先用CAS的方式對workerCount+1,然後才向ThreadFactory申請創建一個線程。

runState為線程池提供了生命周期控制,有以下幾種狀態:

  • RUNNING:允許接受新任務和處理隊列中任務。
  • SHUTDOWN:不接受新任務,但處理隊列中任務。
  • STOP:不接受新任務,不處理隊列中任務,同時嘗試中斷正在執行的任務。
  • TIDYING:所有任務都終止,workerCount為0,線程池狀態過度到TIDYING,將要執行terminated()鉤子函數。
  • TERMINATED:terminated()函數執行完畢。

下面,我們來看看線程池狀態的轉換:

  • RUNNING->SHUTDOWN:調用shutdown()。
  • (RUNNING or SHUTDOWN)->STOP:調用shutdownNow()。
  • SHUTDOWN->TIDYING:當線程池不再有存活線程且隊列為空。
  • STOP->TIDYING:當線程池不再有存活線程。
  • TIDYING->TERMINATED:調用terminated()。

檢測線程池的狀態從SHUTDOWN過度到TIDYING並非易事,因為在SHUTDOWN狀態下,隊列可能從非空變為空,即仍然有存活的線程處理隊列中的任務。只有workerCount為0且隊列為空,才能結束線程池。 

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * 類型為AtomicInteger的ctl可以保證線程安全,該數值分兩個部分:
     * 前3位為runState代表線程池當前的狀態:RUNNING~TERMINATED,
     * 後29位為workerCount代表線程池內存活線程數量。
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    /**
     * Integer.SIZE=32,COUNT_BITS=32-3=29,用COUNT_BITS來劃分
     * runState和workerCount。
     */
    private static final int COUNT_BITS = Integer.SIZE - 3;
    /**
     * 1 << COUNT_BITS = 0010 0000 0000 0000 0000 0000 0000 0000
     * COUNT_MASK = (1 << COUNT_BITS) - 1 = 0001 1111 1111 1111 1111 1111 1111 1111
     * ~COUNT_MASK = 1110 0000 0000 0000 0000 0000 0000 0000
     * COUNT_MASK可以幫助我們計算線程池當前的runState和workerCount。
     * 假設ctl的值為:0000 0000 0000 0000 0000 0000 0000 0011
     * 調用runStateOf(ctl.get()),將做ctl.get() & ~COUNT_MASK運算:
     * 0000 0000 0000 0000 0000 0000 0000 0011
     * & 1110 0000 0000 0000 0000 0000 0000 0000
     * = 0000 0000 0000 0000 0000 0000 0000 0000
     * 由此我們可以得到,線程池當前狀態為0,即SHUTDOWN。
     * 調用workerCountOf(ctl.get()),將做ctl.get() & COUNT_MASK運算:
     * 0000 0000 0000 0000 0000 0000 0000 0011
     * & 0001 1111 1111 1111 1111 1111 1111 1111
     * = 0000 0000 0000 0000 0000 0000 0000 0011
     * 由此我們可以得到,線程池還有3個存活的線程。
     */
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    /**
     * -1的二進制表示為:1111 1111 1111 1111 1111 1111 1111 1111,
     * 左移29位為:1110 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int RUNNING = -1 << COUNT_BITS;
    /**
     * 0的二進制表示為:0000 0000 0000 0000 0000 0000 0000 0000,
     * 左移29位和原先沒有變化。
     */
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    /**
     * 0的二進制表示為:0000 0000 0000 0000 0000 0000 0000 0001,
     * 左移29位為:0010 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int STOP = 1 << COUNT_BITS;
    /**
     * 2的二進制表示為:0000 0000 0000 0000 0000 0000 0000 0010,
     * 左移29位為:0100 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int TIDYING = 2 << COUNT_BITS;
    /**
     * 3的二進制表示為:0000 0000 0000 0000 0000 0000 0000 0011,
     * 左移29位為:0110 0000 0000 0000 0000 0000 0000 0000
     */
    private static final int TERMINATED = 3 << COUNT_BITS;

    private static int runStateOf(int c) {
        return c & ~COUNT_MASK;
    }

    private static int workerCountOf(int c) {
        return c & COUNT_MASK;
    }

    /**
     * 根據runState和workerCount生成ctl,比如初始化線程池時,
     * ctl = new AtomicInteger(ctlOf(RUNNING, 0)),代表線程池
     * 的狀態為RUNNING,存活線程數量為0。
     * @param rs 線程池狀態
     * @param wc 存活線程數量
     * @return
     */
    private static int ctlOf(int rs, int wc) {
        return rs | wc;
    }

    /**
     * 通過位運算在進行一些狀態的判斷時,我們不需要解析ctl的運行狀態,
     * 假設當前ctl:1110 0000 0000 0000 0000 0000 0000 0011,
     * 我們要判斷線程池狀態是否小於STOP,由於ctl開頭為1110,以補碼的
     * 方式來計算,ctl的值必然為負,STOP開頭為0010,以補碼方式計算為正數,
     * 所以ctl必然小於STOP。
     * ctl的布局還能保證workerCount永遠不會為負數。
     */
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    /**
     * 判斷線程池至少處於某個狀態,假設線程池現在隊列為空且無任何存活線程,
     * 所以能保證線程池處於TIDYING狀態,如果s我們傳入STOP,TIDYING的開頭
     * 為0100,STOP的開頭為0010,TIDYING>STOP,所以我們能知道,線程池至少
     * 處於STOP以上包含STOP的狀態。
     */
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    //線程池是否處於RUNNING狀態
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    //CAS增加worker數量
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    //CAS減少worker數量
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
}

  

我們知道AbstractExecutorService.submit(…)方法最終會調用execute(Runnable command)方法,而AbstractExecutorService類中並沒有實現execute(Runnable command)方法,它將execute(Runnable command)的實現交由子類。那麼我們來看看ThreadPoolExecutor又是如何實現execute(Runnable command)方法呢?當一個任務提交到線程池,它的執行流程又是如何呢?來看下面的代碼注釋:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //如果工作線程的數量小於核心線程的數量,則嘗試增加工作線程
        if (workerCountOf(c) < corePoolSize) {//<1>
            /*
             * 如果成功增加工作線程,工作線程會執行我們提交的任務,我們就可以安心退出,
             * 但線程池可以並發提交任務,可能存在在<1>處時工作線程小於核心線程數,
             * 執行<2>處的addWorker(Runnable firstTask, boolean core)時,
             * 其他線程先當前線程提交任務並增加工作線程,線程池內工作線程數超過核心線程數,
             * 當前線程增加工作線程失敗,不能直接退出。
             * 註:addWorker(Runnable firstTask, boolean core),core為true
             * 代表增加核心線程,會將任務作為firstTask傳入;而false代表增加非核心線程,
             * 如果傳入firstTask為null,則代表讓工作線程去隊列中拉取任務。
             */
            if (addWorker(command, true))//<2>
                return;
            //如果<2>處增加工作線程失敗,則重新獲取ctl的值。
            c = ctl.get();
        }
        //判斷線程池是否處於運行中,且任務可以入隊成功,如果兩者成立,則進入<3>分支
        if (isRunning(c) && workQueue.offer(command)) {//<3>
            int recheck = ctl.get();
            /*
             * 重新獲取ctl,因為可能在進入<3>分支的時候,線程池被關閉,
             * 所以要重新判斷線程池狀態,如果線程池不是處於運行狀態,且
             * 任務成功被移除,則進入<4>分支,拒絕任務。
             */
            if (!isRunning(recheck) && remove(command))//<4>
                reject(command);
            /*
             * 為什麼這裡要判斷工作線程數量是否為0?因為如果設置allowCoreThreadTimeOut
             * 為true的話,核心線程是可以為0的,可能代碼執行到<3>處workQueue.offer(command)之前,
             * 即任務還未入隊,工作線程數量已經為0了,所以這裡要重新根據ctl判斷工作線程是否為0,
             * 如果為0得再增加非核心線程去隊列拉取並執行任務。
             */
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /*
         * 如果沒有進入<3>分支,而到達<5>分支,一般分兩種情況:
         * 1.線程池被關閉,<3>處isRunning(c)為false,此時調用<5>處的
         * addWorker(...)必然返回false,然後執行拒絕策略。
         * 2.線程池處於運行狀態,<3>處isRunning(c)為true,但隊列已滿
         * workQueue.offer(command)返回false,入隊失敗。
         * 這時候應該嘗試創建非核心工作線程執行任務,如果工作線程數量沒到達最大線程數,
         * 則創建線程並執行任務,如果工作線程到達最大線程數,則addWorker(...)返回
         * false,執行拒絕策略。
         */
        else if (!addWorker(command, false))//<5>
            reject(command);
    }

    

從ThreadPoolExecutor.execute(Runnable command)的實現,我們可以知道addWorker(Runnable firstTask, boolean core)方法是至關重要的,它決定了是否將任務添加進線程池執行。下面,我們再來看看addWorker(Runnable firstTask, boolean core)方法:

    /**
     * 此方法會根據線程池當前的運行狀態、線程池所設定的邊界(核心線程數和最大線程數)。
     * 如果線程池允許創建線程執行任務,則創建線程執行firstTask並相應調整工作線程的數量。
     * 如果線程池狀態處於已停止(STOP)、關閉(SHUTDOWN)則會返回false。如果向線程工
     * 廠請求創建線程失敗,也會返回false。線程創建失敗分兩種情況,一種是線程工廠返回null,
     * 或者執行Thread.start()時出現異常(通常為OOM異常)。
     *
     * @param firstTask:新線程首要執行任務,如果沒有則傳入null。當工作線程數少於核心    線程數,線程池總是創建一個新線程來執行firstTask。
     * @param core:根據core為true或者false,決定是以核心線程數或者最大線程數作為界限, 判斷當前線程池的工作線程池是否小於界限,如果小於則允許創建線程。
     * @return 如果成功添加工作線程則返回true。
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get(); ; ) {//<1>
            // Check if queue empty only if necessary.
            /*
             * 在這個地方addWorker(...)會返回false,即添加工作線程失敗,
             * 我們來看看是什麼情況下會進入這個分支:
             * runStateAtLeast(c, SHUTDOWN)代表線程池運行狀態至少處於
             * SHUTDOWN,如果線程池還處於RUNNING運行狀態,此方法不會立即
             * 返回失敗。所以我們知道,要進入此分支,首要條件就是運行狀態大於
             * 等於SHUTDOWN。
             * 之後如果runStateAtLeast(c, STOP)、firstTask != null、
             * workQueue.isEmpty())這三個條件其一為true,則添加線程失敗。
             * 首先是runStateAtLeast(c, STOP),如果線程池當前處於STOP
             * 狀態,這時候既不接受新任務,也不處理隊列里的任務,所以不管
             * firstTask是否為null,都返回false。
             * 如果runStateAtLeast(c, STOP)為false,那運行狀態只能是
             * SHUTDOWN,SHUTDOWN狀態下會處理隊列里的任務,但不再接受新
             * 任務,所以firstTask不為null,也直接返回false。
             * 如果運行狀態既處於SHUTDOWN、firstTask也會空,且任務隊列也
             * 為空,則毫無必要增加工作線程,也直接返回false。
             * 所以總結一下有兩種情況不會進入此分支:
             * 1.線程池處於RUNNING狀態的時候。
             * 2.線程池處於SHUTDOWN,但firstTask為空隊列不為空時。
             */
            if (runStateAtLeast(c, SHUTDOWN)
                    && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (; ; ) {//<2>
                //根據core判斷工作線程的上限,如果大於上限則返回false。
                if (workerCountOf(c)
                        >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                /*
                 * 如果用CAS的方式成功增加工作線程的數量,則用break retry的方式
                 * 結束了retry對應的外層循環(即<1>處for循環),而不是break所在
                 * 的本層循環(即<2>處循環),代碼會從<3>處開始執行。
                 */
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                /*
                 * 如果上面用CAS的方式增加工作線程失敗,則會重新判斷線程池當前
                 * 狀態是否至少處於SHUTDOWN,如果線程池已關閉,代碼會跳到retry
                 * 處重新執行<1>處的for循環。如果線程池仍然處於RUNNING狀態,則
                 * 重複執行<2>處的循環。
                 */
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //<3>
        boolean workerStarted = false;//如果工作線程啟動成功,則賦值為true
        boolean workerAdded = false;//如果工作線程添加成功則賦值為true
        Worker w = null;
        try {
            //創建一個Worker對象,Worker對象會向線程工廠申請創建一個線程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            //如果線程工廠創建的Thread對象不為null,則進入此分支
            if (t != null) {
                /*
                 * 這裡用可重入鎖鎖住try模塊代碼,因為要將之前創建好的
                 * w對象放進workers集合。
                 * 註:重入鎖ReentrantLock的概念筆者會在以後的文章里
                 * 單獨介紹,這裡先簡單理解,可重入鎖就是禁止其他線程同時
                 * 訪問mainLock.lock()到mainLock.unlock()之間的代碼,
                 * 和synchronized有些類似。
                 */
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    /*
                     * 重新獲取ctl,判斷線程池當前是否處於運行狀態,或小於STOP狀態,
                     * 即線程池處於RUNNING或SHUTDOWN,如果處於RUNNING則直接進入分支,
                     * 如果處於SHUTDOWN且首要執行任務為空,代表可能要啟動一個工作線程
                     * 來執行隊列中的任務。
                     */
                    if (isRunning(c) ||
                            (runStateLessThan(c, STOP) && firstTask == null)) {
                        /*
                         * 判斷線程是否已經啟動,如果使用的是Executors.DefaultThreadFactory
                         * 默認的線程工廠,正常來說創建出來的Thread對象都是線程未啟動的,即:尚未
                         * 調用Thread.start()。但ThreadPoolExecutor允許我們傳入定製化的線程
                         * 工廠,所以會存在線程工廠創建出Thread對象,但Thread對象已調用過start()
                         * 方法的可能。
                         */
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //將創建好的worker添加進集合workers。
                        workers.add(w);
                        //更新歷史上最大的工作線程數,即workers.size()。
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //將worker添加進workers後,更新workerAdded的值為true。
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //如果worker成功加入集合,則啟動線程,並更新workerStarted為true。
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            /*
             * 如果worker沒有啟動,代表worker沒有加入到workers集合,
             * 可能線程池狀態>=STOP,則需要執行添加工作線程失敗操作。
             */
            if (!workerStarted)
                addWorkerFailed(w);
        }
        //返回工作線程是否啟動成功
        return workerStarted;
    }

    

Tags: