線程池之ThreadPoolExecutor
線程池之ThreadPoolExecutor
線程池的工作主要是控制運行的線程的數量,處理過程中將任務放入隊列,然後在線程創建後啟動這些任務,如果線程數量超過了最大數量,那麼超出數量的線程排隊等候,等其他線程執行完畢再從隊列中取出任務來執行。
在開發過程中,合理地使用線程池能夠帶來3個好處:
- 降低資源消耗。通過重複利用已創建的線程降低線程創建和銷毀造成的消耗;
- 提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行;
- 提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。
1. 線程池實現原理
線程池主要處理流程:
ThreadPoolExecutor執行execute()方法的示意圖:
2. 線程池的使用
2.1 線程池創建
通過ThreadPoolExecutor來創建線程池:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
創建線程池的參數:
1)corePoolSize
:線程池的核心線程數,定義了最小可以同時運行的線程數量。
2)maximumPoolSize
:線程池的最大線程數。方隊列中存放的任務達到隊列容量時,房前可以同時運行的線程數量變為最大線程數。
3)keepAliveTime
:當線程池中的線程數量大於corePoolSize時,如果沒有新任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待的時間超過了KeepAliveTime才會被回收銷毀。
4)unit
:keepAliveTime參數的時間單位,包括DAYS、HOURS、MINUTES、MILLISECONDS等。
5)workQueue
:用於保存等待執行任務的阻塞隊列。可以選擇以下集個阻塞隊列:
- ArrayBlockingQueue:是一個基於數組結構的阻塞隊列,此隊列按FIFO原則對元素進行排序;
- LinkedBlockingQueue:是一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量通常高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量常高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool()使用了這個隊列。
- PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
6)threadFactory
:用於設置創建線程的工廠,可以通過工廠給每個創造出來的線程設置更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給線程池裡的線程設置有意義的名字:
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
7)handler
:飽和策略。若當前同時運行的線程數量達到最大線程數量並且隊列已經被放滿,ThreadPoolExecutor定義了一些飽和策略:
- ThreadPoolExecutor.AbortPolicy:直接拋出RejectedExecutionException異常來拒絕處理新任務;
- ThreadPoolExecutor.CallerRunsPolicy:只用調用者所在的線程來運行任務,會降低新任務的提交速度,影響程序的整體性能。
- ThreadPoolExecutor.DiscardPolicy:不處理新任務,直接丟棄掉。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列中最近的一個任務,執行當前任務。
2.2 向線程池提交任務
execute()方法用於像線程池提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。
executor.execute(new Runnable() {
@Override
public void run() {
// TODO
}
});
submit()方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,並且可以通過future的get()方法獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞當前線程一段時間後立即返回,這時有可能任務還沒有執行完。
Future<T> future = executor.submit(hasReturnValueTask);
try {
T s = future.get();
} catch (InterruptedExecption | ExecutortionExcception e) {
// 處理異常
e.printStackTrace();
} finally {
// 關閉線程池
executor.shutdown();
}
2.3 關閉線程池
可以使用線程池的shutdown或shutdownNow方法來關閉線程池。其原理在於遍歷線程池中的工作線程,然後逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能無法終止。
二者區別在於:shutdownNow方法首先將線程池狀態設置為STOP,然後嘗試停止所有正在執行或暫停任務的線程,並返回等到執行任務的列表,而shutdown只是將線程池的狀態設置為SHUTDOWN狀態,然後中斷所有沒有整在執行任務的線程。
2.4 合理配置線程池
查看當前設備的CPU核數:
Runtime.getRuntime().availableProcessors()
-
CPU密集型任務:任務需要大量的運算,而沒有阻塞,CPU一直全速運行。
CPU密集型任務配置儘可能的少的線程數量。
公式:CPU核數 + 1個線程的線程池。
-
IO密集型任務:任務需要大量的IO,即大量的阻塞。
由於IO密集型任務線程並不是一直在執行任務,可以多分配一點線程數,如CPU核數*2。
公式:CPU核數/(1-阻塞係數),其中阻塞係數在0.8-0.9之間。