規範使用執行緒池與底層原理詳解
什麼是執行緒池
「執行緒池」顧名思義,就是存放執行緒的池子,這個池子可以存放多少執行緒取決於採用哪種執行緒池,取決於有多少並發執行緒,有多少電腦的硬體資源。
執行緒池優勢
執行緒池最主要的工作在於控制運行執行緒的數量,從而做到執行緒復用、控制最大並發數量、管理執行緒。其具體的優勢在於:
- 降低資源消耗:通過重複利用已經創建的執行緒降低執行緒創建和銷毀造成的消耗;
- 提高響應速度:當任務到達時,任務可以不需要等到執行緒創建就能執行;
- 提高執行緒的可管理性:執行緒是稀缺資源,不能無限創建,否則會消耗系統資源、降低系統的穩定性,使用執行緒可以進行統一分配,調優和監控;
如何創建執行緒池
執行緒池繼承結構圖:
jdk自帶創建執行緒池的四種常見方式:
- Executors.newFixedThreadPool(int):創建一個固定執行緒數量的執行緒池,可控制執行緒最大並發數,超出的執行緒需要在隊列中等待。注意它內部corePoolSize和maximumPoolSize的值(就是第一和第二個參數 nThreads)是相等的,並且使用的是LinkedBlockingQueue:
源碼:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
- Executors.newSingleThreadExecutor():創建一個單執行緒的執行緒池,它只有唯一的執行緒來執行任務,保證所有任務按照指定順序執行。注意它內部corePoolSize和maximumPoolSize的值都為1,它使用的是LinkedBlockingQueue:
源碼:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
- Executors.newCachedThreadPool():創建一個可快取的執行緒池,如果執行緒長度超過處理需要,可靈活回收空閑執行緒,若無可回收執行緒,則創建新執行緒。注意它內部將corePoolSize值設為0,maximumPoolSize值設置為Integer.MAX_VALUE,並且使用的是SynchronizedQueue,keepAliveTime值為60,即當執行緒空閑時間超過60秒,就銷毀執行緒:
源碼:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
- Executors.newScheduledThreadPool(int):創建一個固定執行緒數量的執行緒池,相比於newFixedThreadPool(int)固定個數的執行緒池強大在 ①可以執行延時任務,②也可以執行帶有返回值的任務,並且使用的是DelayedWorkQueue:
源碼:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } | | | | | | | | | | | | V V V V V V V V V V V V public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, | | new DelayedWorkQueue()); } | | V V //ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
注意:
- 以上四種創建執行緒的方式內部都是由ThreadPoolExecutor這個類完成的,該類的構造方法有5個參數,稱為執行緒池的5大參數(還有另外兩個參數);
- 執行緒池使用完畢之後需要關閉,應該配合try-finally程式碼塊,將執行緒池關閉的程式碼放在finally程式碼塊中;
執行緒池的7大參數
ThreadPoolExecutor對構造函數進行了重載,實際內部使用了7個參數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
..............//已省略 參數合法校驗 ..............//已省略 參數賦值 }
- corePoolSize:執行緒池中常駐核心執行緒池(當執行緒池中的執行緒數目達到了corePoolSize後,就會把任務放到快取隊列中;)
- maximumPoolSize:執行緒池中能夠容納同時執行最大執行緒數,該值必須大於等於1
- keepAliveTime:多餘執行緒的最大存活時間
- unit:keepAliveTime的單位
- workQueue:任務隊列,被提交但尚未被執行的任務(阻塞隊列)
- threadFactory:生成執行緒池中工作執行緒的執行緒工廠,一般使用默認即可
- handler:拒絕策略,表示當任務隊列滿並且工作執行緒大於等於執行緒池的最大執行緒數時,對即將到來的執行緒的拒絕策略
執行緒池底層原理
執行緒池具體工作流程:
- 在創建執行緒後,等待提交過來的任務請求
- 當調用execute()/submit()方法添加一個請求任務時,執行緒池會做出以下判斷:
- 如果正在運行的執行緒數量小於corePoolSize,會立刻創建執行緒運行該任務
- 如果正在運行的執行緒數量大於等於corePoolSize,會將該任務放入阻塞隊列中
- 如果隊列也滿但是正在運行的執行緒數量小於maximumPoolSize,執行緒池會進行拓展
- 將執行緒池中的執行緒數拓展到最大執行緒數
- 如果隊列滿並且運行的執行緒數量大於等於maximumPoolSize,那麼執行緒池會啟動相應的拒絕策略來拒絕相應的任務請求
- 當一個執行緒完成任務時,它會從隊列中取下一個任務來執行
- 當一個執行緒空閑時間超過給定的keepAliveTime時,執行緒會做出判斷:
- 如果當前運行執行緒大於corePoolSize,那麼該執行緒將會被停止。也就是說,當執行緒池的所有任務都完成之後,它會收縮到corePoolSize的大小
執行緒池的拒絕策略
當執行緒池的阻塞隊列滿了同時執行緒池中執行緒數量達到了最大maximumPoolSize時,執行緒池將會啟動相應的拒絕策略來拒絕請求任務。
4種拒絕策略具體為:
- AbortPolicy(默認):直接拋出RejectedExecutionException異常阻止系統正常運行
- CallerRunsPolicy:調用者運行的一種機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者
- DiscardOldestPolicy:拋棄隊列中等待最久的任務,然後把當前任務加入到隊列中嘗試再次提交當前任務
- DiscardPolicy:直接丟棄任務,不予任何處理也不拋出異常。如果任務允許丟失,那麼該策略是最好的方案
注意:以上4種拒絕策略均實現了RejectedExecutionHandler介面
規範創建執行緒池
實際開發中不允許使用內置的執行緒池:必須明確地通過ThreadPoolExecutor方式,指定相應的執行緒池參數創建自定義執行緒或者使用其它框架提供的執行緒池。因為內置執行緒池的第五個參數阻塞隊列允許的請求隊列長度為 Integer.MAX_VALUE(從上面的源碼上可以看出),可能造成大量請求堆積,導致OOM:
阿里巴巴規範中指出不能使用Executors去創建:
自定義執行緒池:使用不同的拒絕策略:
package com.raicho.mianshi.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @author: Raicho * @Description: 自定義執行緒池的各個參數 * @program: mianshi * @create: 2020-08-12 10:44 **/ public class CustomThreadPool { public static void main(String[] args) { ExecutorService executorService = new ThreadPoolExecutor( 2, 5, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), // new ThreadPoolExecutor.AbortPolicy() new ThreadPoolExecutor.CallerRunsPolicy() // 注意使用該拒絕策略,可能會回退給main執行緒執行 // new ThreadPoolExecutor.DiscardOldestPolicy() //new ThreadPoolExecutor.DiscardPolicy() ); try { for (int i = 0; i < 9; i++) { executorService.submit(() -> { System.out.println(Thread.currentThread().getName() + ": 執行任務"); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } }); } } catch (Exception e) { e.printStackTrace(); } finally { executorService.shutdown(); } } }
執行緒池配置合理執行緒數量
執行緒池合理配置執行緒數量需要考慮業務具體是CPU密集型還是IO密集型:
- CPU密集型:該任務需要大量運算,而沒有阻塞,CPU一直在全速運行,CPU密集型只有在真正的多核CPU上才能進行加速。
CPU密集型任務配置應該儘可能少的執行緒數量,一般公式為:
CPU核數 + 1個執行緒的執行緒池
- IO密集型:任務需要大量的IO操作,即大量的阻塞。在單執行緒上進行IO密集型的任務會浪費大量的CPU運算能力在等待操作上。
所以在IO密集型任務中使用多執行緒可以大大加速程式運行:
CPU核數 / (1 - 阻塞係數) 阻塞係數在0.8-0.9 CPU核數 * 2