【萬字圖文-原創】 | 學會Java中的執行緒池,這一篇也許就夠了!

碎碎念

關於JDK源碼相關的文章這已經是第四篇了,原創不易,粉絲從幾十人到昨天的666人,真的很感謝之前幫我轉發文章的一些朋友們。

關注人數.png

從16年開始寫技術文章,到現在部落格園已經發表了222篇文章,大多數都是原創,共有800多粉絲,基本上每個月都會有文章的產出。

部落格園資訊.png

部落格園文章月份記錄.png

回顧這幾年以來寫作的心路歷程,一直都是偷偷的寫,偷偷的發,害怕被人知道,怕被人罵文章寫的太水(之前心理太脆弱了,哈哈)。後面和cxuan聊過後,他建議我給他投稿試試,於是就有了那一篇的萬字的AQS文章。

最近也有好多讀者加到我的微信,問一些文章中的問題,我也都會認真解答,看到有人閱讀我的文章並有所收穫,我真的挺欣慰,這就是寫作的動力吧。

幫助別人的同時也是在幫助自己,自己學的技術和理解的內容都是有局限性的。通過寫文章結識到了很多朋友,聽聽別人的分析和見解,我也能學到很多。

答疑.png

每次看到部落格中有人留言都很激動,也會第一時間去回復。感謝下面的公眾號大佬們之前無私的幫助,大家也可以關注一下他們,都是很nice的大佬:

Java建設者、Java團長、程式猿石頭、碼象全棧、Java3y、JAVA小咖秀、Bella的技術輪子、石杉的架構筆記、武培軒、程式通事

前言

Java中的執行緒池已經不是什麼神秘的技術了,相信在看的讀者在項目中也都有使用過。關於執行緒池的文章也是數不勝數,我們站在巨人的肩膀上來再次梳理一下。

本文還是保持原有的風格,圖文解析,盡量做到多畫圖!全文共20000+字,建議收藏後細細品讀,閱讀期間搭配源碼食用效果更佳!

讀完此文你將學到:

  1. ThreadPoolExecutor中常用參數有哪些?
  2. ThreadPoolExecutor中執行緒池狀態和執行緒數量如何存儲的?
  3. ThreadPoolExecutor有哪些狀態,狀態之間流轉是什麼樣子的?
  4. ThreadPoolExecutor任務處理策略?
  5. ThreadPoolExecutor常用的拒絕策略有哪些?
  6. Executors工具類提供的執行緒池有哪些?有哪些缺陷?
  7. ThreadPoolExecutor核心執行緒池中執行緒預熱功能?
  8. ThreadPoolExecutor中創建的執行緒如何被複用的?
  9. ThreadPoolExecutor中關閉執行緒池的方法shutdownshutdownNow的區別?
  10. ThreadPoolExecutor中存在的一些擴展點?
  11. ThreadPoolExecutor支援動態調整核心執行緒數、最大執行緒數、隊列長度等一些列參數嗎?怎麼操作?

本文源碼基於JDK1.8

執行緒池基本概念

執行緒池是一種池化思想的產物,如同我們資料庫有連接池、Java中的常量池。執行緒池可以幫助我們管理執行緒、復用執行緒,減少執行緒頻繁新建、銷毀等帶來的開銷。

在Java中是通過ThreadPoolExecutor類來創建一個執行緒池的,一般我們建議項目中自己去定義執行緒池,不推薦使用JDK提供的工具類Executors去構建執行緒池。

查看阿里巴巴開發手冊中也有對執行緒池的一些建議:

【強制】創建執行緒或執行緒池時請指定有意義的執行緒名稱,方便出錯時回溯。
正例:自定義執行緒工廠,並且根據外部特徵進行分組,比如,來自同一機房的調用,把機房編號賦值給whatFeaturOfGroup

public class UserThreadFactory implements ThreadFactory {

	private final String namePrefix;
	private final AtomicInteger nextId = new AtomicInteger(1);

	UserThreadFactory(String whatFeaturOfGroup) {
		namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
	}

	@Override
	public Thread newThread(Runnable task) {
		String name = namePrefix + nextId.getAndIncrement();
		Thread thread = new Thread(null, task, name, 0, false);
		System.out.println(thread.getName());
		return thread;
	}
}

【強制】執行緒資源必須通過執行緒池提供,不允許在應用中自行顯式創建執行緒。

說明:執行緒池的好處是減少在創建和銷毀執行緒上所消耗的時間以及系統資源的開銷,解決資源不足的問題。
如果不使用執行緒池,有可能造成系統創建大量同類執行緒而導致消耗完記憶體或者「過度切換」的問題。

【強制】執行緒池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這
樣的處理方式讓寫的同學更加明確執行緒池的運行規則,規避資源耗盡的風險。

說明:Executors 返回的執行緒池對象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
2) CachedThreadPool:
允許的創建執行緒數量為 Integer.MAX_VALUE,可能會創建大量的執行緒,從而導致 OOM。

執行緒池使用示例

下面是一個自定義的執行緒池,這是之前公司在用的一個執行緒池,修改其中部分屬性和備註做脫敏處理:

public class MyThreadPool {
	static final Logger LOGGER = LoggerFactory.getLogger(MyThreadPool.class);

	private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;

	private static final String THREAD_POOL_NAME = "MyThreadPool-%d";

	private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
			.daemon(true).build();

	private static final int DEFAULT_SIZE = 500;

	private static final long DEFAULT_KEEP_ALIVE = 60L;

	private static ExecutorService executor;

	private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);

	static {
		try {
			executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT + 2, DEFAULT_KEEP_ALIVE,
					TimeUnit.SECONDS, executeQueue, FACTORY);

			Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
				@Override
				public void run() {
					LOGGER.info("MyThreadPool shutting down.");
					executor.shutdown();

					try {
						if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
							LOGGER.error("MyThreadPool shutdown immediately due to wait timeout.");
							executor.shutdownNow();
						}
					} catch (InterruptedException e) {
						LOGGER.error("MyThreadPool shutdown interrupted.");
						executor.shutdownNow();
					}

					LOGGER.info("MyThreadPool shutdown complete.");
				}
			}));
		} catch (Exception e) {
			LOGGER.error("MyThreadPool init error.", e);
			throw new ExceptionInInitializerError(e);
		}
	}

	private MyThreadPool() {
	}

	public static boolean execute(Runnable task) {

		try {
			executor.execute(task);
		} catch (RejectedExecutionException e) {
			LOGGER.error("Task executing was rejected.", e);
			return false;
		}

		return true;
	}

	public static <T> Future<T> submitTask(Callable<T> task) {

		try {
			return executor.submit(task);
		} catch (RejectedExecutionException e) {
			LOGGER.error("Task executing was rejected.", e);
			throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
		}
	}
}

這裡主要就是使用調用ThreadPoolExecutor構造函數來構造一個執行緒池,指定自定義的ThreadFactory,裡面包含我們自己執行緒池的poolName等資訊。重寫裡面的execute()submitTask()方法。 添加了系統關閉時的鉤子函數shutDownHook(),在裡面調用執行緒池的shutdown()方法,使得系統在退出(使用ctrl c或者kill -15 pid)時能夠優雅的關閉執行緒池。

如果有看不懂的小夥伴也沒有關係,後面會詳細分析ThreadPoolExecutor中的源碼,相信看完後面的程式碼再回頭來看這個用例 就完全是小菜一碟了。

執行緒池實現原理

通過上面的示例程式碼,我們需要知道創建執行緒池時幾個重要的屬性:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);

corePoolSize: 執行緒池核心執行緒數量
maximumPoolSize: 執行緒池最大執行緒數量
workQueue: 執行緒池中阻塞隊列,一般指定隊列大小

執行緒池中數據模型可以簡化成下圖所示,其中Thread應該是添加的一個個Worker,這裡標註的Thread是為了方便理解:

執行緒池中數據模型.png

執行緒池中提交一個任務具體執行流程如下圖:

執行流程.png

提交任務時,比較當前執行緒池中執行緒數量和核心執行緒數的大小,根據比較結果走不同的任務處理策略,這個下面會有詳細說明。

執行緒池中核心方法調用鏈路:

方法調用鏈路.png

TheadPoolExecutor源碼初探

TheadPoolExecutor中常用屬性和方法較多,我們可以先分析下這些,然後一步步往下深入,常用屬性和方法如下:

執行緒池常見屬性和方法.png

具體程式碼如下:

public class ThreadPoolExecutor extends AbstractExecutorService {

	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	private static final int COUNT_BITS = Integer.SIZE - 3;
	private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

	private static final int RUNNING    = -1 << COUNT_BITS;
	private static final int SHUTDOWN   =  0 << COUNT_BITS;
	private static final int STOP       =  1 << COUNT_BITS;
	private static final int TIDYING    =  2 << COUNT_BITS;
	private static final int TERMINATED =  3 << COUNT_BITS;

	private static int runStateOf(int c)     { return c & ~CAPACITY; }
	private static int workerCountOf(int c)  { return c & CAPACITY; }
	private static int ctlOf(int rs, int wc) { return rs | wc; }

	private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

     private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), defaultHandler);
    }
}
  1. ctl

ctl代表當前執行緒池狀態和執行緒池執行緒數量的結合體,高3位標識當前執行緒池運行狀態,後29位標識執行緒數量。ctlOf方法就是rs(執行緒池運行狀態)和wc(執行緒數量)按位或操作

  1. COUNT_BITS

COUNT_BITS = Integer.SIZE – 3 = 29,在ctl中,低29位用於存放當前執行緒池中執行緒的數量

  1. CAPACITY

CAPACITY = (1 << COUNT_BITS) – 1
我們來計算一下:
1 << 29 = 0010 0000 0000 0000 0000 0000 0000 0000
(1 << 29) – 1 = 0001 1111 1111 1111 1111 1111 1111 1111
這個屬性是用來執行緒池能裝載執行緒的最大數量,也可以用來做一些位運算操作。

  1. 執行緒池幾種狀態

RUNNING:

(1) 狀態說明:執行緒池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理。
(2) 狀態切換:執行緒池的初始化狀態是RUNNING。換句話說,執行緒池被一旦被創建,就處於RUNNING狀態,並且執行緒池中的任務數為0

SHUTDOWN:

(1) 狀態說明:執行緒池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。
(2) 狀態切換:調用執行緒池的shutdown()介面時,執行緒池由RUNNING -> SHUTDOWN

STOP:

(1) 狀態說明:執行緒池處在STOP狀態時,不接收新任務,不處理已添加的任務,並且會中斷正在處理的任務。
(2) 狀態切換:調用執行緒池的shutdownNow()介面時,執行緒池由(RUNNING or SHUTDOWN ) -> STOP

TIDYING:

(1) 狀態說明:當所有的任務已終止,ctl記錄的”任務數量”為0,執行緒池會變為TIDYING狀態。當執行緒池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在執行緒池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。
(2) 狀態切換:當執行緒池在SHUTDOWN狀態下,阻塞隊列為空並且執行緒池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。
當執行緒池在STOP狀態下,執行緒池中執行的任務為空時,就會由STOP -> TIDYING

TERMINATED:

(1) 狀態說明:執行緒池徹底終止,就變成TERMINATED狀態。
(2) 狀態切換:執行緒池處在TIDYING狀態時,執行完terminated()之後,就會由 TIDYING -> TERMINATED

狀態的變化流轉:

執行緒池的狀態流轉.png

  1. runStateOf()

計算執行緒池運行狀態的,就是計算ctl前三位的數值。`unStateOf() = c & ~CAPACITY,CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111,那麼~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000,它與任何數按位與的話都是只看這個數前三位

  1. workerCountOf()

計算執行緒池的執行緒數量,就是看ctl的後29位,workerCountOf() = c & CAPACITY, CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111與任何數按位與,就是看這個數的後29位

  1. ctlOf(int rs, int wt)

在獲取當前執行緒池ctl的時候會用到,在後面源碼中會有很多地方調用, 傳遞的參數rs代表執行緒池狀態,wt代表當前執行緒次執行緒(worker)的數量

  1. runStateLessThan(int c, int s)

return c < s,c一般傳遞的是當前執行緒池的ctl值。比較當前執行緒池ctl所表示的狀態是否小於某個狀態s

  1. runStateAtLeast(int c, int s)

return c >= s,c一般傳遞的是當前執行緒池的ctl值。比較當前執行緒池ctl所表示的狀態,是否大於等於某個狀態s

  1. isRunning(int c)

c < SHUTDOWN, 判斷當前執行緒池是否是RUNNING狀態,因為只有RUNNING的值小於SHUTDOWN

  1. compareAndIncrementWorkerCount()/compareAndDecrementWorkerCount()

使用CAS方式 讓ctl值分別加一減一 ,成功返回true, 失敗返回false

  1. decrementWorkerCount()

將ctl值減一,這個方法用了do…while循環,直到成功為止

  1. completedTaskCount

記錄執行緒池所完成任務總數 ,當worker退出時會將 worker完成的任務累積到completedTaskCount

  1. Worker

執行緒池內部類,繼承自AQS且實現Runnable介面。Worker內部有一個Thread thread是worker內部封裝的工作執行緒。Runnable firstTask用來接收用戶提交的任務數據。在初始化Worker時候會設置state為-1(初始化狀態),通過threadFactory創建一個執行緒。

  1. ThreadPoolExecutor初始化參數

corePoolSize: 核心執行緒數限制
maximumPoolSize: 最大執行緒限制
keepAliveTime: 非核心的空閑執行緒等待新任務的時間 unit: 時間單位。配合allowCoreThreadTimeOut也會清理核心執行緒池中的執行緒。
workQueue: 任務隊列,最好選用有界隊列,指定隊列長度
threadFactory: 執行緒工廠,最好自定義執行緒工廠,可以自定義每個執行緒的名稱
handler: 拒絕策略,默認是AbortPolicy

execute()源碼分析

當有任務提交到執行緒池時,就會直接調用ThreadPoolExecutor.execute()方法,執行流程如下:

執行流程.png

從流程圖可看,添加任務會有三個分支判斷,源碼如下:

java.util.concurrent.ThreadPoolExecutor.execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

c在這裡代表執行緒池ctl的值,包含工作任務數量以及執行緒池的狀態,上面有解釋過。

接著看下面幾個分支程式碼:

分支一: if (workerCountOf(c) < corePoolSize) ,條件成立表示當前執行緒數量小於核心執行緒數,此次提交任務,直接創建一個新的worker

if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}

如果執行緒數小於核心執行緒數,執行addWorker操作,這個後面會講這個方法的細節,如果添加成功則直接返回,失敗後會重新計算ctl的值,然後執行分支二。

針對addWorker()執行失敗的情況,有以下幾種可能:

  1. 存在並發情況,execute()方法是可能有多個執行緒同時調用的,當多個執行緒同時workerCountOf(c) < corePoolSize成立後,就會向執行緒池中創建worker,這個時候執行緒池的核心執行緒數可能已經達到,在addWorker中還會再次判斷,所以會有任務添加失敗。

addWorker()失敗場景一.png

  1. 當前執行緒池狀態發生改變,例如執行緒A執行addWorker()方法時,執行緒B修改執行緒池狀態,導致執行緒池不是RUNNING狀態,此時執行緒A執行addWorker()就有可能失敗。

addWorker()失敗場景二.png

分支二: if (isRunning(c) && workQueue.offer(command)) {}

通過分支一流程的分析,我們可以知道執行到這個分支說明**當前執行緒數量已經達到corePoolSize或者addWorker()執行失敗,我們先看看分支二執行流程:

超過核心執行緒數往隊列中添加任務流程圖.png

首先判斷當前執行緒池是否處於RUNNING狀態,如果是則嘗試將task放入到workQueue中,workQueue是我們在初始化ThreadPoolExecutor時傳入進來的阻塞隊列。

如果當前任務成功添加到阻塞隊列中,再次獲取ctl賦值給recheck變數,然後執行:

if (!isRunning(recheck) && remove(command))
    reject(command);

再次判斷當前執行緒池是否為RUNNINT狀態,如果不是則說明提交任務到隊列之後,執行緒池狀態被其他執行緒給修改了,比如調用shutdown()/shutdownNow()等。這種情況就需要把剛剛提交到隊列中的的任務刪除掉。

再看下remove()方法:

public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate();
    return removed;
}

如果任務提交到隊列之後,執行緒池中的執行緒還未將這個任務消費,那麼就可以remove成功,調用reject()方法來執行拒絕策略。
如果在改變執行緒池狀態之前,隊列中的數據已經被消費了,此時remove()就會失敗。

移除隊列中Task任務.png

接著走else if中的邏輯:

else if (workerCountOf(recheck) == 0)
    addWorker(null, false);

走這個else if邏輯有兩種可能,執行緒池是RUNNING狀態或者執行緒池狀態被改變且workQueue中添加的任務已經被消費導致remove()失敗。
如果是RUNNING狀態,執行緒池中的執行緒數量是0,此時workQueue中還有待執行的任務,就需要新增一個worker(addWorker裡面會有創建執行緒的操作),繼續消費workqueue中的任務。

添加新任務.png

這裡要注意一下addWorker(null, false),也就是創建一個執行緒,但並沒有傳入任務,因為任務已經被添加到workQueue中了,所以worker在執行的時候,會直接從workQueue中獲取任務。在workerCountOf(recheck) == 0時執行addWorker(null, false)也是為了保證執行緒池在RUNNING狀態下必須要有一個執行緒來執行任務,可以理解為一種擔保兜底機制。

至於執行緒池中執行緒為何可以為0?這個如果我們設置了allowCoreThreadTimeOut=true,那麼核心執行緒也是允許被回收的,後面getTask()中程式碼有提及。

分支三: else if (!addWorker(command, false)) {}

通過分支一和分之二的分析,進入這個分支的前置條件:執行緒數超過核心執行緒數且workQueue中數據已滿。

else if (!addWorker(command, false)),執行添加worker操作,如果執行失敗就直接走reject()拒絕策略。這裡添加失敗可能是執行緒數已經超過了maximumPoolSize

分支三執行流程.png

addWorker()源碼分析

上面分析提交任務的方法execute()時多次用到addWorker方法,接收任務後將任務添加到Worker中。

WorkerThreadPoolExecutor中的內部類,繼承自AQS且實現了Runnable介面。 類中包含Thread thread,它是worker內部封裝的工作執行緒,還有firstTask屬性,它是一個可執行的Runnable對象。在Worker的構造函數中,使用執行緒工廠創建了一個執行緒,當thread啟動的時候,會以worker.run()為入口啟動執行緒,這裡會直接調用到runWorker()中。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

    private static final long serialVersionUID = 6138294804551838833L;

    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
}

流程如下圖:

添加Worker.png

這裡再回頭看下addWorker(Runnable firstTask, boolean core) 方法,這個方法主要是添加一個Worker到執行緒池中並執行,firstTask參數用於指定新增的執行緒執行的第一個任務,core參數為true表示在新增執行緒時會判斷當前活動執行緒數是否少於corePoolSizefalse表示在新增執行緒時會判斷當前活動執行緒數是否少於maximumPoolSize

addWorker方法整體執行流程圖如下:
addWorker流程圖.png

接著看下源碼:

java.util.concurrent.ThreadPoolExecutor.addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

這裡是有兩層for循環,外層循環主要是判斷執行緒池的狀態,如果狀態不合法就直接返回false.

只有兩種情況屬於合法狀態:

  1. RUNNING狀態
  2. SHUTDOWN狀態時,隊列中還有未處理的任務,且提交的任務為空。SHUTDOWN含義就是不再接收新任務,可以繼續處理阻塞隊列的任務。

第二層循環是通過CAS操作更新workCount數量,如果更新成功則往執行緒池中中添加執行緒,這個所謂的執行緒池就是一個HashSet數組。添加失敗時判斷失敗原因,CAS失敗有兩種原因:執行緒池狀態被改變或者並發情況修改執行緒池中workCount數量,這兩種情況都會導致ctl值被修改。如果是第二種原因導致的失敗,繼續自旋更新workCount數量。

接著繼續分析循環內部的實現,先看看第一層循環:c代表執行緒池ctl值,rs代表執行緒池運行狀態。

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
    return false;

條件一:rs >= SHUTDOWN 成立, 說明當前執行緒池狀態不是RUNNING狀態

條件二: !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())

我們之前提到過,創建任務有兩種情況:
1)RUNNING狀態可以提交任務,
2)SHUTDOWN狀態下如果傳遞的任務是空且阻塞隊列中還有任務未處理的情況才是允許創建任務繼續處理的,因為阻塞隊列中的任務仍然需要繼續處理。

上面的條件一和條件二就是處理SHUTDOWN狀態下任務創建操作的判斷。

接著分析第二層循環,先是判斷執行緒池workCount數量是否大於可創建的最大值,或者是否超過了核心執行緒數/最大執行緒數,如果是則直接返回,addWorker()操作失敗。

接著使用compareAndIncrementWorkerCount(c)將執行緒池中workCount+1,這裡使用的是CAS操作,如果成功則直接跳出最外層循環。

for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();
    if (runStateOf(c) != rs)
        continue retry;
}

如果CAS失敗,說明此時有競爭,會重新獲取ctl的值,判斷競爭失敗的原因是添加workCount數量還是修改執行緒池狀態導致的,如果執行緒池狀態未發生改變,就繼續循環嘗試CAS增加workCount數量,接著看循環結束後邏輯:

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive())
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (!workerStarted)
        addWorkerFailed(w);
}

這裡workerStarted代表worker是否已經啟動,workerAdded代表創建的worker是否添加到池子中,這裡所謂的池子就是全局定義的一個HashSet結構的workers變數。

接著根據傳遞的firstTask來構建一個Worker,在Worker的構造方法中也會通過ThreadFactory創建一個執行緒,這裡判斷t != null是因為用戶可以自定義ThreadFactory,如果這裡用戶不是創建執行緒而是直接返回null則會出現一些問題,所以需要判斷一下。

w = new Worker(firstTask);
final Thread t = w.thread;

if (t != null) {

}

在往池子中添加Worker的時候,是需要先加鎖的,因為針對全局的workers操作並不是執行緒安全的。

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();

繼續看下面程式碼,rs代表當前執行緒池的狀態,這裡還是判斷執行緒池的狀態,如果rs < SHUTDOWN代表執行緒池狀態是RUNNING狀態,此時可以直接操作。
如果是SHUTDOWN狀態,需要滿足firstTask == null才可以繼續操作。因為在SHUTDOWN狀態時不會再添加新的任務,但還是可以繼續處理workQueue中的任務。

t.isAlive() 當執行緒start後,執行緒isAlive會返回true,這裡還是防止自定義的ThreadFactory創建執行緒返回給外部之前,將執行緒start了,由此可見Doug lea考慮問題真的很全面。

int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
    (rs == SHUTDOWN && firstTask == null)) {
    if (t.isAlive())
        throw new IllegalThreadStateException();
    workers.add(w);
}

接著將創建的Worker添加到workers集合中,設置largestPoolSize,這個屬性是執行緒池生命周期內執行緒數最大值,一般是做統計數據用的。 最後修改workerAdded = true,代表當前提交的任務所創建的Worker已經添加到池子中了。

添加worker成功後,調用執行緒的start()方法啟動執行緒,因為Worker中重寫了run()方法,最後會執行Worker.run()。最後設置workerStarted = true後釋放全局鎖。

int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
	
	workers.add(w);
	int s = workers.size();
	if (s > largestPoolSize)
        largestPoolSize = s;
        
    orkerAdded = true;
}

這裡再回頭看看workerAdded = false的情形,如果執行緒池在lock之前,狀態發生了變化,導致添加失敗。此時workerAdded也會為false,最後執行addWorkerFailed(work)操作,這個方法是將Workworkers中移除掉,然後將workCount數量減一,最後執行tryTerminate()來嘗試關閉執行緒池,這個方法後面會細說。

runWorker()源碼分析

Worker類中的run方法調用了runWorker來執行任務。上面addWorker()方法正常的執行邏輯會創建一個Worker,然後啟動Worker中的執行緒,這裡其實就會執行到runWorker方法。

方法調用關係.png

runWorker的執行邏輯很簡單,啟動一個執行緒,執行當前傳遞的task任務,執行完後又不斷的從workQueue中獲取任務繼續執行,如果當前workCount數量小於核心執行緒數且隊列中沒有了任務,當前執行緒會被阻塞,這個就是getTask()的邏輯,一會會講到。

如果當前執行緒數大於核心執行緒數且隊列中沒有任務,就會返回null,在runWorker這邊退出循環,回收多餘的worker數據。

runWorker流程.png

源碼如下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

這裡w.unlock()是為了初始化當前Workstate==0,然後設置獨佔執行緒為null,因為在shutDown()方法中會嘗試獲取Worker中的鎖,如果獲取成功代表當前執行緒沒有被加鎖處於空閑狀態,給當前執行緒一個中斷訊號。所以這裡在執行執行緒任務的時候需要加鎖,防止調用shutDown()的時候給當前worker執行緒一個中斷訊號。

判斷task是否為空,如果是一個空任務,那麼就去workQueue中獲取任務,如果兩者都為空就會退出循環。

while (task != null || (task = getTask()) != null) {}

最核心的就是調用task.run()啟動當前任務,這裡面還有兩個可擴展的方法,分別是beforeExecute()/afterExecute(),我們可以在任務執行前和執行後分別自定義一些操作,其中afterExecute()可以接收到任務拋出的異常資訊,方便我們做後續處理。

while (task != null || (task = getTask()) != null) {
    try {
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
            task.run();
        } catch (RuntimeException x) {
            thrown = x; throw x;
        } catch (Error x) {
            thrown = x; throw x;
        } catch (Throwable x) {
            thrown = x; throw new Error(x);
        } finally {
            afterExecute(task, thrown);
        }
    } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
    }
}

如果退出循環,說明getTask()方法返回null。會執行到finally中的processWorkerExit(w, completedAbruptly)方法,此方法是用來清理執行緒池中添加的work數據,completedAbruptly=true代表是異常情況下退出。

try {
    while (task != null || (task = getTask()) != null) {
        
    }
    completedAbruptly = false;
} finally {
    processWorkerExit(w, completedAbruptly);
}

runWorker()中只是啟動了當前執行緒工作,還需要源源不斷通過getTask()方法從workQueue來獲取任務執行。在workQueue沒有任務的時候,根據執行緒池workCount和核心執行緒數的對比結果來使用processWorkerExit()執行清理工作。

getTask()源碼分析

getTask方法用於從阻塞隊列中獲取任務,如果當前執行緒小於核心執行緒,那麼當阻塞隊列中沒有任務時就會阻塞,反之會等待keepAliveTime後返回。

這個就是keepAliveTime的使用含義:非核心的空閑執行緒等待新任務的時間,當然如果這裡設置了allowCoreThreadTimeOut=true也會回收核心執行緒。

具體程式碼如下:

private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

這裡核心程式碼就是從workQueue中取任務,採用poll還是take取決於allowCoreThreadTimeOut和執行緒數量,allowCoreThreadTimeOut在構造ThreadLocalExecutor後設置的,默認為false。如果設置為true則代表核心執行緒數下的執行緒也是可以被回收的。如果使用take則表明workQueue中沒有任務當前執行緒就會被阻塞掛起,直到有了新的任務才會被喚醒。

workQueue數據取出流程.png

在這裡擴展下阻塞隊列的部分方法的含義,這裡主要是看poll()take()的使用區別:
阻塞隊列插入方法:

boolean add(E e):隊列沒有滿,則插入數據並返回true;隊列滿時,拋出異常 java.lang.IllegalStateException: Queue full。
boolean offer(E e):隊列沒有滿,則插入數據並返回true;隊列滿時,返回false。
void put(E e):隊列沒有滿,則插入數據;隊列滿時,阻塞調用此方法執行緒,直到隊列有空閑空間時此執行緒進入就緒狀態。
boolean offer(E e, long timeout, TimeUnit unit):隊列沒有滿,插入數據並返回true;隊列滿時,阻塞調用此方法執行緒,若指定等待的時間內還不能往隊列中插入數據,返回false。

阻塞隊列移除(獲取)方法:

E remove():隊列非空,則以FIFO原則移除數據,並返回該數據的值;隊列為空,拋出異常 java.util.NoSuchElementException。

E poll(): 隊列非空,移除數據,並返回該數據的值;隊列為空,返回null。

E take(): 隊列非空,移除數據,並返回該數據的值;隊列為空,阻塞調用此方法執行緒,直到隊列為非空時此執行緒進入就緒狀態。

E poll(long timeout, TimeUnit unit):隊列非空,移除數據,並返回該數據的值;隊列為空,阻塞調用此方法執行緒,若指定等待的時間內隊列都沒有數據可取,返回null。

阻塞隊列檢查方法:

E element(): 隊列非空,則返回隊首元素;隊列為空,拋出異常 java.util.NoSuchElementException。
E peek(): 隊列非空,則返回隊首元素;隊列為空,返回null。

processWorkerExit()源碼分析

此方法的含義是清理當前執行緒,從執行緒池中移除掉剛剛添加的worker對象。

processWorkerExit執行前置條件.png

執行processWorkerExit()代表在runWorker()執行緒跳出了當前循環,一般有兩種情況:

  1. task.run()內部拋出異常,直接結束循環,然後執行processWorkerExit()
  2. getTask()返回為空,代表執行緒數量大於核心數量且workQueue中沒有任務,此時需要執行processWorkerExit()來清理多餘的Worker對象
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly)、
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return;
        }
        addWorker(null, false);
    }
}

針對於執行緒池workers的操作都會進行加鎖處理,然後將當前Worker從池子中移除,累加當前執行緒池完成的任務總數completedTaskCount

接著調用tryTerminate()嘗試關閉執行緒池,這個方法後面有詳細說明。

接著判斷if (runStateLessThan(c, STOP)) {},含義是當前執行緒池狀態小於STOP,即當前執行緒池狀態當前執行緒池狀態為 RUNNINGSHUTDOWN,判斷當前執行緒是否是正常退出。如果當前執行緒是正常退出,那麼completedAbruptly=false,接著判斷執行緒池中是否還擁有足夠多的的執行緒,因為異常退出可能導致執行緒池中執行緒數量不足,此時就要執行addWorker()為執行緒池添加新的worker數據,看下面的詳細分析:

執行最後的addWorke()有三種可能:
1)當前執行緒在執行task時 發生異常,這裡一定要創建一個新worker頂上去。
2)如果!workQueue.isEmpty()說明任務隊列中還有任務,這種情況下最起碼要留一個執行緒,因為當前狀態為 RUNNING || SHUTDOWN這是前提條件。
3)當前執行緒數量 < corePoolSize值,此時會創建執行緒,維護執行緒池數量在corePoolSize個水平。

tryTerminate()源碼分析

上面移除Worker的方法中有一個tryTerminate()方法的調用,這個方法是根據執行緒池狀態嘗試關閉執行緒池。

執行流程如下:

tryTerminate()流程.png

實現源碼如下:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

首先是判斷執行緒池狀態:
條件一:isRunning(c) 成立,直接返回就行,執行緒池很正常!
條件二:runStateAtLeast(c, TIDYING) 說明 已經有其它執行緒 在執行 TIDYING -> TERMINATED狀態了,當前執行緒直接回去。
條件三:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())
SHUTDOWN特殊情況,如果是這種情況,直接回去。得等隊列中的任務處理完畢後,再轉化狀態。

接著執行:

if (workerCountOf(c) != 0) {
    interruptIdleWorkers(ONLY_ONE);
    return;
}

走到這個邏輯,說明執行緒池狀態 >= STOP或者執行緒池狀態為SHUTDOWN且隊列已經空了

當前執行緒池中的執行緒數量 > 0,調用interruptIdleWorkers()中斷一個空閑執行緒,然後返回。我們來分析下,在getTask()返回為空時會執行退出邏輯processWorkerExit(),這裡就會調用tryTerminate()方法嘗試關閉執行緒池。

如果此時執行緒池狀態滿足執行緒池狀態 >= STOP或者執行緒池狀態為SHUTDOWN且隊列已經空了,如果此時執行緒池中執行緒數不為0,就會中斷一個空閑執行緒。
為什麼這裡只中斷一個執行緒呢?這裡的設計思想是,如果執行緒數量特別多的話,只有一個執行緒去做喚醒空閑worker的任務可能會比較吃力,所以,就給了每個 被喚醒的worker執行緒 ,在真正退出之前協助 喚醒一個空閑執行緒的任務,提供吞吐量的一種常用手段。

我們順便看下interruptIdleWorkers()源碼:

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

遍歷workers,如果執行緒是空閑狀態(空閑狀態:queue.take()和queue.poll()返回空),則給其一個中斷訊號,如果是處於workQueue阻塞的執行緒,會被喚醒,喚醒後,進入下一次自旋時,可能會return null執行退出相關的邏輯,接著又會調用processWorkerExit()->tryTerminate(),回到上面場景,當前執行緒退出的時候還是會繼續喚醒下一個空現執行緒。

接著往下看tryTerminate的剩餘邏輯:

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
        try {
            terminated();
        } finally {
            ctl.set(ctlOf(TERMINATED, 0));
            termination.signalAll();
        }
        return;
    }
} finally {
    mainLock.unlock();
}

執行到這裡的執行緒是誰?
workerCountOf(c) == 0 時,會來到這裡。
最後一個退出的執行緒。 在 (執行緒池狀態 >= STOP || 執行緒池狀態為 SHUTDOWN 且 隊列已經空了)
執行緒喚醒後,都會執行退出邏輯,退出過程中 會 先將 workerCount計數 -1 => ctl -1。
調用tryTerminate 方法之前,已經減過了,所以0時,表示這是最後一個退出的執行緒了。

獲取全局鎖,進行加鎖操作,通過CAS設置執行緒池狀態為TIDYING狀態,設置成功則執行terminated()方法,這也是一個自定義擴展的方法,當執行緒池中止的時候會調用此方法。

最後設置執行緒池狀態為TERMINATED狀態,喚醒調用awaitTermination()方法的執行緒。

awaitTermination()源碼分析

該方法是判斷執行緒池狀態是否達到TERMINATED,如果達到了則直接返回true,沒有達到則會await掛起當前執行緒指定的時間。

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

在每次執行tryTerminate()後會喚醒所有被await的執行緒,繼續判斷執行緒池狀態。

shutDown()/shutDownNow()源碼分析

shutDownshutDown()方法都是直接改變執行緒池狀態的方法,一般我們在系統關閉之前會調用此方法優雅的關閉執行緒池。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

shutdownshutdownNow方法調用差不多,只是shutdown是將執行緒池狀態設置為SHUTDOWNshutdownNow是將執行緒池狀態設置為STOP
shutdownNow會返回所有未處理的task集合。

來看看它們共同調用的一些方法:

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

這個方法是設置執行緒池狀態為指定狀態,runStateAtLeast(c, targetState),判斷當前執行緒池ctl值,如果小於targetState則會往後執行。
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))),通過CAS指令,修改ctl中執行緒池狀態為傳入的targetState

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers含義是為空閑的執行緒設置中斷標識,這裡要清楚worker什麼時候空閑?我們在上面講解runWorker()方法時,執行task.run()之前,要針對Worker對象加鎖,設置Worker中的state值為1,防止運行的worker被添加中斷標識。接著執行getTask()方法,獲取阻塞隊列中的任務,如果是queue.take()則會阻塞掛起當前執行緒,釋放鎖,此時執行緒處於空閑狀態。如果是queue.pool()返回為空,runWorker()會釋放鎖,此時執行緒也是空閑狀態。

執行interrupt()後處於queue阻塞的執行緒,會被喚醒,喚醒後,進入下一次自旋判斷執行緒池狀態是否改變,如果改變可能直接返回空,這裡具體參看runWorker()getTask()方法。

onShutdown()也是一個擴展方法,需要子類去重寫,這裡代表當執行緒池關閉後需要做的事情。drainQueue()方法是獲取workQueue中現有的的任務列表。

問題回顧

  1. ThreadPoolExecutor中常用參數有哪些?
    上面介紹過了,參見的參數是指ThreadPoolExecutor的構造參數,一般面試的時候都會先問這個,要解釋每個參數的含義及作用。

  2. ThreadPoolExecutor中執行緒池狀態和執行緒數量如何存儲的?
    通過AtomicInteger類型的變數ctl來存儲,前3位代表執行緒池狀態,後29位代表執行緒池中執行緒數量。

  3. ThreadPoolExecutor有哪些狀態,狀態之間流轉是什麼樣子的?
    RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED
    執行緒池的狀態流轉.png

  4. ThreadPoolExecutor任務處理策略?
    這個問題就是考察execute()的執行過程,只要看過源碼就不會有問題。
    執行流程.png

  5. ThreadPoolExecutor常用的拒絕策略有哪些?
    策略處理該任務,執行緒池提供了4種策略:
    1)AbortPolicy:直接拋出異常,默認策略
    2)CallerRunsPolicy:用調用者所在的執行緒來執行任務
    3)DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務
    4)DiscardPolicy:直接丟棄任務
    當然執行緒池是支援自定義拒絕策略的,需要實現RejectedExecutionHandler介面中rejectedExecution()方法即可。

  6. Executors工具類提供的執行緒池有哪些?有哪些缺陷?
    1) FixedThreadPool 和 SingleThreadPool:允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。
    2) CachedThreadPool:允許的創建執行緒數量為 Integer.MAX_VALUE,可能會創建大量的執行緒,從而導致 OOM。
    所以阿里巴巴也建議我們要自定義執行緒池核心執行緒數以及阻塞隊列的長度。

  7. ThreadPoolExecutor核心執行緒池中執行緒預熱功能?
    在創建執行緒池後,可以使用prestartAllCoreThreads()來預熱核心執行緒池。

    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }
    
  8. ThreadPoolExecutor中創建的執行緒如何被複用的?
    這個主要是看runWorker()和getTask()兩個方法的執行流程,當執行任務時調用runWorker()方法,執行完成後會繼續從workQueue中獲取任務繼續執行,已達到執行緒復用的效果,當然這裡還有一些細節,可以回頭看看上面的源碼解析。

  9. ThreadPoolExecutor中關閉執行緒池的方法shutdownshutdownNow的區別?
    最大的區別就是shutdown()會將執行緒池狀態變為SHUTDOWN,此時新任務不能被提交,workQueue中還存有的任務可以繼續執行,同時會像執行緒池中空閑的狀態發出中斷訊號。
    shutdownNow()方法是將執行緒池的狀態設置為STOP,此時新任務不能被提交,執行緒池中所有執行緒都會收到中斷的訊號。如果執行緒處於wait狀態,那麼中斷狀態會被清除,同時拋出InterruptedException。

  10. ThreadPoolExecutor中存在的一些擴展點?
    鉤子方法:
    1)beforeExecute()/afterExecute():runWorker()中執行緒執行前和執行後會調用的鉤子方法
    2)terminated:執行緒池的狀態從TIDYING狀態流轉為TERMINATED狀態時terminated方法會被調用的鉤子方法。
    3)onShutdown:當我們執行shutdown()方法時預留的鉤子方法。

  11. ThreadPoolExecutor支援動態調整核心執行緒數、最大執行緒數、隊列長度等一些列參數嗎?怎麼操作?
    運行期間可動態調整參數的方法:
    1)setCorePoolSize():動態調整執行緒池核心執行緒數
    2)setMaximumPoolSize():動態調整執行緒池最大執行緒數
    3)setKeepAliveTime(): 空閑執行緒存活時間,如果設置了allowsCoreThreadTimeOut=true,核心執行緒也會被回收,默認只回收非核心執行緒
    4)allowsCoreThreadTimeOut():是否允許回收核心執行緒,如果是true,在getTask()方法中,獲取workQueue就採用workQueue.poll(keepAliveTime),如果超過等待時間就會被回收。

總結

這篇執行緒池源碼覆蓋到了ThreadPoolExecutor中大部分程式碼,我相信認真閱讀完後肯定會對執行緒池有更深刻的理解。如有疑問或者建議可關注公眾號給我私信,我都會一一為大家解答。

另外推薦一個我的up主朋友,他自己錄製了好多學習影片並分享在B站上了,大家有時間可以看一下(PS:非恰飯非利益相關,良心推薦):小劉講源碼-B站UP主

原創乾貨分享.png