­

JAVA執行緒池的使用

一、使用 Executors 創建執行緒池

Executors是一個執行緒池工廠類,裡面有許多靜態方法,供開發者調用。

/* 該方法返回一個固定執行緒數量的執行緒池,該執行緒池池中的執行緒數量始終不變。
 * 當有一個新的任務提交時,執行緒池中若有空閑執行緒,則立即執行。
 * 若沒有,則新的任務會被暫存在一個任務隊列中,待有執行緒空閑時,便處理在任務隊列中的任務 
 * 默認等待隊列長度為Integer.MAX_VALUE
 */
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(1);

/* 該方法返回一個只有一個執行緒的執行緒池。
 * 若多餘一個任務被提交到執行緒池,任務會被保存在一個任務隊列中,等待執行緒空閑,按先入先出順序執行隊列中的任務
 * 默認等待隊列長度為Integer.MAX_VALUE
 */
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

/* 
 * 該方法返回一個可根據實際情況調整執行緒數量的執行緒池。
 * 執行緒池的執行緒數量不確定,但若有空閑執行緒可以復用,則會優先使用可復用的執行緒。
 * 若所有執行緒均在工作,又有新任務的提交,則會創建新的執行緒處理任務。
 * 所有執行緒在當前任務執行完畢後,將返回執行緒池進行復用
 */
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

/* 該方法返回一個ScheduledExecutorService對象,執行緒池大小為1。
 * ScheduledExecutorService介面在ExecutorService介面之上擴展了在給定時間內執行某任務的功能,
 * 如在某個固定的延時之後執行,或者周期性執行某個任務
 */
ExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();

/*
 * 該方法也返回一個ScheduledExecutorService對象,但該執行緒池可以指定執行緒數量
 */
ExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(1);

Executors 的靜態方法都是基於 ThreadPoolExecutor 類實現的,相當於 ThreadPoolExecutor 的語法糖。

但這幾個靜態方法都存在一個弊端,因為會在創建執行緒池的同時隱式創建等待隊列,而隊列的長度默認是 Integer.MAX_VALUE ,相當於不限長度,這樣就存在OOM的隱患。

二、使用 ThreadPoolExecutor 創建執行緒池

上面說過,Executors 的靜態方法都是基於 ThreadPoolExecutor 類實現的,所以在生產環境下,還是建議直接使用 ThreadPoolExecutor 類創建執行緒池:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue);

ThreadPoolExecutor 有多個構造方法,一般來說使用最精簡的即可。

三、參數含義

corePoolSize

指定執行緒池的核心執行緒數。

當一個新任務被添加到執行緒池時,首先會判斷當前的執行緒數(ThreadCount),如果:

A:ThreadCount < corePoolSize:即當前執行緒數小於核心執行緒數,就會創建一個新的執行緒來執行這個任務;

B:ThreadCount >= corePoolSize:即當前執行緒數大於等於核心執行緒數,就會將新任務添加到等待隊列中。

該參數的兩個特殊參數值:

1、0:意味著沒有核心執行緒,全部執行緒都會受到 keepAliveTime 參數的回收機制影響。

2、Integer.MAX_VALUE:意味著不限制核心執行緒數,連等待隊列都不需要,可以想像這種情況下很容易OOM。

maximumPoolSize

指定執行緒池的最大執行緒數,包括核心執行緒和非核心執行緒。

當另一個新任務被添加到執行緒池時,如果此時等待隊列的容量已滿,則會判斷當前的執行緒數(ThreadCount),如果:

A:ThreadCount < maximumPoolSize:即當前執行緒數小於最大執行緒數,就會創建一個新的執行緒來執行這個任務;

B:ThreadCount == maximumPoolSize:即當前執行緒數已達到最大值,此時等待隊列的容量也已用盡,因此會拋出異常。

該參數的兩個特殊參數值:

1、0:意味著只有核心執行緒,默認情況下全部執行緒都不會受到 keepAliveTime 參數的回收機制影響,除非設置 allowCoreThreadTimeOut 為 true。

2、Integer.MAX_VALUE:意味著不限制最大執行緒數,這種情況下也很容易OOM。

keepAliveTime

空閑執行緒的存活時間。

默認情況下,該參數只對非核心執行緒有效。

在處理大量任務時,可能會創建大量的非核心執行緒,在所有任務都執行完成後會繼續保留這些非核心執行緒一段時間,等時間到了就會自動回收,以減少系統開銷。

當設置執行緒池的 allowCoreThreadTimeOut(true) 時,意味著該參數也同時對核心執行緒有效,在時間到了之後,全部執行緒都會自動回收。

unit

空閑執行緒存活時間的單位。

workQueue

等待隊列。

創建執行緒池時另外一個容易引起OOM的重要參數,主要包括以下幾種:

1、ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
2、LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按 FIFO(先進先出)排序元素,吞吐量通常要高於 ArrayBlockingQueue。靜態工廠方法 Executors.newFixedThreadPool() 使用了這個隊列。
3、SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個執行緒調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於 LinkedBlockingQueue,靜態工廠方法 Executors.newCachedThreadPool 使用了這個隊列。
4、PriorityBlockingQueue:一個具有優先順序的無限阻塞隊列。

 

以最常用的 LinkedBlockingQueue 為例:

//創建一個容量為9999的隊列實例
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(9999);

 

關於執行緒池各參數的作用,可以通過下面的圖片進行詳細了解:

四、使用執行緒池的注意事項

一句話:應該最大化的,同時也要有限度的滿足業務需求。

在實際使用執行緒池時,首先應該確保所創建的執行緒池可以滿足業務設計需求,主要就是執行緒數和隊列容量,前者由CPU核心數限制,後者由伺服器記憶體限制。

執行緒太少,則消費隊列的時間就長,就需要更大容量的隊列;執行緒太多,會增加大量的上下文切換時間,反而不利於合理分配CPU的計算資源。

隊列太小,則添加任務時可能會拋出異常;隊列太大,會佔用更多的記憶體消耗。

關鍵是切勿使用無邊界值(Integer.MAX_VALUE),這也是造成OOM的最主要原因。

可以根據伺服器配置和業務需求,對這兩個方面進行均衡考慮。

五、使用案例

int cpuCoreCnt = Runtime.getRuntime().availableProcessors(); //獲取伺服器CPU核心數
int corePoolSize = cpuCoreCnt;      // 核心執行緒數
int maximumPoolSize = cpuCoreCnt;   // 最大執行緒數
int keepAliveTime = 30;             // 非核心執行緒的空閑存活時長(分鐘)
int queueCapacity = 9999;           // 隊列最大長度

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, queue);
threadPool.allowCoreThreadTimeOut(true);    //允許回收核心執行緒

上面案例中,使用CPU核心數作為最大執行緒數,相對來說還是比較合理的。

等待隊列的容量儘可能設置的大一些,和添加任務時拋出異常相比,多付出一些記憶體來實現更大容量的隊列還是非常值得的。

keepAliveTime 也可以適當設置的長一些,避免太快回收,畢竟頻繁的創建執行緒也是需要時間開銷的。

最後還設置了allowCoreThreadTimeOut方法,允許自動回收核心執行緒,用來減少阻塞執行緒的性能消耗。

六、執行緒池復用

執行緒池在完成全部的任務後,會自動進入摸魚狀態,期間會根據配置自動回收空閑執行緒,直到新的任務被添加進來再起來工作。

即使設置了 allowCoreThreadTimeOut(true) 對核心執行緒進行回收,有新任務時也會重新創建核心執行緒繼續進入工作狀態。

只要不是調用 shutdown() 手動關閉它,正常情況下執行緒池是可以長期重複性使用的。

有些強迫症患者(比如本人)會非常介意一個無所事事的執行緒池在記憶體里裝死,因此必須手動 shutdown 才會安心。

但這樣的話,之前的執行緒池就徹底掛掉了,再向其中添加任務時會拋出異常。

有效的做法是,將創建執行緒池的部分單獨封裝,每次添加任務時都進行判斷,如果當前執行緒池已經掛掉了,就重新創建一個:

/**
 * <p>
 * 添加任務
 * 註:如果執行緒池已關閉,會自動創建新的執行緒池
 * </p>
 * 
 * @param task
 */
public void addTask(Task task){
	if(threadPool.isShutdown()) createThreadPool(corePoolSize, maximumPoolSize, keepAliveTime);
	threadPool.execute(task);
}
Tags: