【JUC】如何理解執行緒池?第四種使用執行緒的方式
執行緒池的概念
執行緒池的主要工作的控制運行的執行緒的數量,處理過程種將任務放在隊列,執行緒創建後再啟動折現任務,如果執行緒數量超過了最大的數量,則超過部分的執行緒排隊等待,直到其他執行緒執行完畢後,從隊列種取出任務來執行。
處理流程:
1.執行緒池判斷核心執行緒池的執行緒是否全部在執行任務?
1.1 不是:創建一個新的工作執行緒執行任務。
1.2 是:
2. 執行緒池判斷工作隊列是否已經滿了?
2.1 沒有滿:將新提交的任務存儲在工作隊列中。
2.2 滿了:
3. 執行緒池判斷執行緒池的執行緒是否都在工作?
3.1 是:交由飽和策略來處理這個任務。
3.2 不是:創建一個新的工作執行緒來執行任務。
特點:執行緒復用、控制最大並發數、管理執行緒。
執行緒池的優勢
1. 降低資源消耗,通過重複利用已經創建的執行緒,降低了執行緒創建和銷毀產生的消耗。
2. 提高響應速度,任務到達時,任務不需要等待執行緒創建就能立即執行。
3. 提高執行緒的可管理性,執行緒是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行同一的分配、調優和監控。
執行緒池的使用
Java執行緒池是通過Executor框架實現的,該框架中用到了Executor、Executors、ExecutorService和ThreadPoolExecutor類。
具體使用示例:
1 public static void fixedThreadPool() { 2 ExecutorService threadPool = Executors.newFixedThreadPool(5);//固定執行緒 3 try { 4 for (int i = 0; i < 10; i++) { 5 threadPool.execute(()->{ 6 System.out.println(Thread.currentThread().getName()); 7 }); 8 } 9 }catch (Exception e){ 10 e.printStackTrace(); 11 }finally { 12 threadPool.shutdown(); 13 } 14 }
輸出結果


pool-1-thread-2 pool-1-thread-4 pool-1-thread-2 pool-1-thread-5 pool-1-thread-1 pool-1-thread-3 pool-1-thread-5 pool-1-thread-1 pool-1-thread-2 pool-1-thread-4
View Code
執行緒池的源碼及重要參數
Executors.newFixedThreadPool(int)
固定執行緒數,適用場景:執行長期任務,性能好。
1 public static ExecutorService newFixedThreadPool(int nThreads) {
2 return new ThreadPoolExecutor(nThreads, nThreads,
3 0L, TimeUnit.MILLISECONDS,
4 new LinkedBlockingQueue<Runnable>());
5 }
Executors.newSingleThreadExecutor()
一池一個執行緒,使用場景:一個任務接一個任務執行的時候。
1 public static ExecutorService newSingleThreadExecutor() {
2 return new FinalizableDelegatedExecutorService
3 (new ThreadPoolExecutor(1, 1,
4 0L, TimeUnit.MILLISECONDS,
5 new LinkedBlockingQueue<Runnable>()));
6 }
Executors.newCachedThreadPool()
N個執行緒,帶快取,適用場景:執行很多短期非同步的小程式或者負載較輕的伺服器。
1 public static ExecutorService newCachedThreadPool() {
2 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3 60L, TimeUnit.SECONDS,
4 new SynchronousQueue<Runnable>());
5 }
ThreadPoolExecutor
ThreadPoolExecutor的執行示意圖:
1. corePoolSize:執行緒池中的常駐核心執行緒數
2. maximumPoolSize:執行緒池能夠容納同時執行的最大執行緒數,必須大於等於1【擴容的上限】。如果工作隊列滿了,core也滿了的時候,執行緒池會擴容,直到達到maximumPoolSize(新來的任務會直接搶佔擴容執行緒,不進入工作隊列,工作隊列中的任務繼續等待)。
1 public static void main(String[] args) { 2 ExecutorService threadPool = new ThreadPoolExecutor( 3 2, //corePoolSize 4 5, //maximumPoolSize 5 100L, //keepAliveTime 6 TimeUnit.SECONDS, 7 new LinkedBlockingDeque<>(3), 8 Executors.defaultThreadFactory(), 9 new ThreadPoolExecutor.AbortPolicy());//N個執行緒帶快取 10 try { 11 for (int i = 1; i <= 6; i++) { 12 final int tmp = i; 13 threadPool.execute(()->{ 14 System.out.println(Thread.currentThread().getName()+"執行緒"+",執行任務"+tmp); 15 try { 16 TimeUnit.SECONDS.sleep(4); 17 } catch (InterruptedException e) { 18 e.printStackTrace(); 19 } 20 }); 21 } 22 }catch (Exception e){ 23 e.printStackTrace(); 24 }finally { 25 threadPool.shutdown(); 26 } 27 }
輸出結果:


pool-1-thread-2執行緒,執行任務2 pool-1-thread-3執行緒,執行任務6 pool-1-thread-1執行緒,執行任務1 pool-1-thread-3執行緒,執行任務3 pool-1-thread-2執行緒,執行任務4 pool-1-thread-1執行緒,執行任務5
View Code
當執行緒池中有2個核心執行緒時,執行緒1和2正在執行任務1和2,任務3、4、5在工作隊列中等候,此時工作隊列滿了,core也滿了的時候,且core< maximumPoolSize,任務6的出現引起執行緒池的擴容,任務6在3、4、5執行任務前進行了搶佔。所以從輸出結果可以看出新來的任務會直接搶佔新擴容的執行緒。
3. keepAliveTime:多餘的空閑執行緒的存活時間。當前執行緒數超過corePoolSize的時候,空閑時間達到keepAliveTime時,多餘的空閑執行緒會被銷毀直到剩下corePoolSize的數量的執行緒。
4. unit:keepAliveTime的單位
5. workQueue:(阻塞隊列)工作隊列,任務等待區,被提交但是沒有被執行的任務。
6. threadFactory:執行緒工廠,用於創建執行緒,一般用默認即可。
7. handler:拒絕策略。當隊列滿了並且工作執行緒大於執行緒池的最大執行緒數的時候觸發拒絕策略。
5個參數的構造函數
1 public ThreadPoolExecutor(int corePoolSize,
2 int maximumPoolSize,
3 long keepAliveTime,
4 TimeUnit unit,
5 BlockingQueue<Runnable> workQueue) {
6 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
7 Executors.defaultThreadFactory(), defaultHandler);
8 }
7個參數的構造函數
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory, 7 RejectedExecutionHandler handler) { 8 if (corePoolSize < 0 || 9 maximumPoolSize <= 0 || 10 maximumPoolSize < corePoolSize || 11 keepAliveTime < 0) 12 throw new IllegalArgumentException(); 13 if (workQueue == null || threadFactory == null || handler == null) 14 throw new NullPointerException(); 15 this.acc = System.getSecurityManager() == null ? 16 null : 17 AccessController.getContext(); 18 this.corePoolSize = corePoolSize; 19 this.maximumPoolSize = maximumPoolSize; 20 this.workQueue = workQueue; 21 this.keepAliveTime = unit.toNanos(keepAliveTime); 22 this.threadFactory = threadFactory; 23 this.handler = handler; 24 }
執行緒池的底層工作原理及源碼
ThreadPoolExecutor執行execute方法分下面4種情況。
1)如果當前運行的執行緒少於corePoolSize,則創建新執行緒來執行任務(注意,執行這一步驟需要獲取全局鎖)。
2)如果運行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。
3)如果無法將任務加入BlockingQueue(隊列已滿),則創建新的執行緒來處理任務(注意,執行這一步驟需要獲取全局鎖)。
4)如果創建新執行緒將使當前運行的執行緒超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor採取上述步驟的總體設計思路,是為了在執行execute()方法時,儘可能地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱之後(當前運行的執行緒數大於等於corePoolSize),幾乎所有的execute()方法調用都是執行步驟2,而步驟2不需要獲取全局鎖。
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 int c = ctl.get(); 5 //如果執行緒數小於核心執行緒數,創建執行緒執行任務 6 if (workerCountOf(c) < corePoolSize) { 7 if (addWorker(command, true)) 8 return; 9 c = ctl.get(); 10 } 11 //如果執行緒數大於等於核心執行緒數或執行緒創建失敗,當前任務加入工作隊列 12 if (isRunning(c) && workQueue.offer(command)) { 13 int recheck = ctl.get(); 14 if (! isRunning(recheck) && remove(command)) 15 reject(command); 16 else if (workerCountOf(recheck) == 0) 17 addWorker(null, false); 18 } 19 //如果執行緒數不處於運行中或人物失效無法放入隊列, 20 //且當前執行緒數量小於最大允許的執行緒數,則創建一個執行緒執行任務 21 else if (!addWorker(command, false)) 22 reject(command); 23 }
工作執行緒:執行緒池創建執行緒時,會將執行緒封裝成工作執行緒Worker,Worker在執行完任務後,還會循環獲取工作隊列里的任務來執行。我們可以從Worker類的run()方法里看到。
1 public void run() { 2 runWorker(this); 3 } 4 5 final void runWorker(Worker w) { 6 Thread wt = Thread.currentThread(); 7 Runnable task = w.firstTask; 8 w.firstTask = null; 9 w.unlock(); // allow interrupts 10 boolean completedAbruptly = true; 11 try {//循環獲取工作隊列里的任務執行 12 while (task != null || (task = getTask()) != null) { 13 w.lock(); 14 // If pool is stopping, ensure thread is interrupted; 15 // if not, ensure thread is not interrupted. This 16 // requires a recheck in second case to deal with 17 // shutdownNow race while clearing interrupt 18 if ((runStateAtLeast(ctl.get(), STOP) || 19 (Thread.interrupted() && 20 runStateAtLeast(ctl.get(), STOP))) && 21 !wt.isInterrupted()) 22 wt.interrupt(); 23 try { 24 beforeExecute(wt, task); 25 Throwable thrown = null; 26 try { 27 task.run(); 28 } catch (RuntimeException x) { 29 thrown = x; throw x; 30 } catch (Error x) { 31 thrown = x; throw x; 32 } catch (Throwable x) { 33 thrown = x; throw new Error(x); 34 } finally { 35 afterExecute(task, thrown); 36 } 37 } finally { 38 task = null; 39 w.completedTasks++; 40 w.unlock(); 41 } 42 } 43 completedAbruptly = false; 44 } finally { 45 processWorkerExit(w, completedAbruptly); 46 } 47 }
執行緒池的拒絕策略(RejectedExecutionHandler)
當隊列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。在JDK 1.5中Java執行緒池框架提供了以下4種策略。
1. AbortPolicy:直接拋出異常。RejectedExecutionException
2. CallerRunsPolicy:只用調用者所在執行緒來運行任務。不拋棄任務,也不拋出異常,將任務回退到調用者。
例如:任務數 > maximumPoolSize+Queue.capacity=8的時候拒絕任務9和10,任務回退給調用者,示例中的調用者就是main執行緒。
pool-1-thread-1執行緒,執行任務1 main執行緒,執行任務9 pool-1-thread-3執行緒,執行任務6 pool-1-thread-2執行緒,執行任務2 pool-1-thread-5執行緒,執行任務8 pool-1-thread-4執行緒,執行任務7 main執行緒,執行任務10 pool-1-thread-3執行緒,執行任務3 pool-1-thread-1執行緒,執行任務5 pool-1-thread-5執行緒,執行任務4
3. DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。
4. DiscardPolicy:不處理,丟棄掉。
任務隊列(runnableTaskQueue)
1. ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按FIFO(先進先出)原則對元素進行排序。
2. LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
3. SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個執行緒調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於Linked-BlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
在實際開發中選擇那種執行緒池?
三種:單一/固定數/可變,都不能用。為什麼不用?
在實際的開發中執行緒資源必須通過執行緒池提供,不允許在應用中自行顯式創建執行緒。
因為不使用執行緒池,有可能造成系統創建大量同類執行緒而導致消耗完記憶體或者過度切換的問題。
執行緒池不允許適用Executors去創建,而是通過ThreadPoolExecutor的方式,可以避免資源耗盡的風險。
Executors中的執行緒池對象存在的問題:
1. FixedThreadPool和SingleThreadPool:允許請求隊列的長度為Integer.MAX_VALUE,可能會堆積大量請求,導致OOM
2. CachedThreadPool和ScheduledThreadPool:允許創建執行緒數量為Integer.MAX_VALUE,可能會創建大量請求,導致OOM
所以應該選擇自定義執行緒池。
如何配置自定義的執行緒池參數?
首先查詢伺服器是幾核的?Runtime.getRuntime().availableProcessors();
1. CPU密集型
任務需要大量的運算,而沒有阻塞,CPU一直全速運行,CPU密集任務只有在真正的多核CPU上才可能得到加速。(通過多執行緒)
應該配置儘可能少的執行緒數量:CPU核數+1個執行緒的執行緒池
2. IO密集型
IO密集型任務並不是一直執行任務,則應配置儘可能多的執行緒,
CPU核數 * 2
CPU核數 / 1 – 阻塞係數(0.8~0.9)
補充:CPU密集 & IO密集
CPU密集型,又稱計算密集型任務。它的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對影片進行高清解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多任務完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。計算密集型任務由於主要消耗CPU資源,因此,程式碼運行效率至關重要。
IO密集型,涉及到網路、磁碟IO的任務都是IO密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待IO操作完成(因為IO的速度遠遠低於CPU和記憶體的速度)。對於IO密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是IO密集型任務,比如Web應用。IO密集型任務執行期間,99%的時間都花在IO上,花在CPU上的時間很少。