Java並發——執行緒池Executor框架
- 2019 年 10 月 8 日
- 筆記
執行緒池
無限制的創建執行緒
若採用”為每個任務分配一個執行緒”的方式會存在一些缺陷,尤其是當需要創建大量執行緒時:
- 執行緒生命周期的開銷非常高
- 資源消耗
- 穩定性
引入執行緒池
任務是一組邏輯工作單元,執行緒則是使任務非同步執行的機制。當存在大量並發任務時,創建、銷毀執行緒需要很大的開銷,運用執行緒池可以大大減小開銷。
Executor框架
說明:
- Executor 執行器介面,該介面定義執行Runnable任務的方式。
- ExecutorService 該介面定義提供對Executor的服務。
- ScheduledExecutorService 定時調度介面。
- AbstractExecutorService 執行框架抽象類。
- ThreadPoolExecutor JDK中執行緒池的具體實現。
- Executors 執行緒池工廠類。
ThreadPoolExecutor 執行緒池類
執行緒池是一個複雜的任務調度工具,它涉及到任務、執行緒池等的生命周期問題。要配置一個執行緒池是比較複雜的,尤其是對於執行緒池的原理不是很清楚的情況下,很有可能配置的執行緒池不是較優的。
JDK中的執行緒池均由ThreadPoolExecutor類實現。其構造方法如下:
參數說明:
corePoolSize:核心執行緒數。
maximumPoolSize:最大執行緒數。
keepAliveTime:執行緒存活時間。當執行緒數大於core數,那麼超過該時間的執行緒將會被終結。
unit:keepAliveTime的單位。java.util.concurrent.TimeUnit類存在靜態靜態屬性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS
workQueue:Runnable的阻塞隊列。若執行緒池已經被佔滿,則該隊列用於存放無法再放入執行緒池中的Runnable。
另一個構造方法:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
該方法在下面的擴展部分有更深入的講解。其中handler表示執行緒池對拒絕任務的處理策略。
ThreadPoolExecutor的使用需要注意以下概念:
-
若執行緒池中的執行緒數量小於corePoolSize,即使執行緒池中的執行緒都處於空閑狀態,也要創建新的執行緒來處理被添加的任務。
-
若執行緒池中的執行緒數量等於 corePoolSize且緩衝隊列 workQueue未滿,則任務被放入緩衝隊列。
-
若執行緒池中執行緒的數量大於corePoolSize且緩衝隊列workQueue滿,且執行緒池中的數量小於maximumPoolSize,則建新的執行緒來處理被添加的任務。
-
若執行緒池中執行緒的數量大於corePoolSize且緩衝隊列workQueue滿,且執行緒池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。
- 當執行緒池中的執行緒數量大於corePoolSize時,如果某執行緒空閑時間超過keepAliveTime,執行緒將被終止。
Executors 工廠方法
JDK內部提供了五種最常見的執行緒池。由Executors類的五個靜態工廠方法創建。
- newFixedThreadPool(…)
- newSingleThreadExecutor(…)
- newCachedThreadPool(…)
- newScheduledThreadPool(…)
- newSingleThreadScheduledExecutor()
單執行緒的執行緒池 newSingleThreadExecutor
這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒串列執行所有任務。
返回單執行緒的Executor,將多個任務交給此Exector時,這個執行緒處理完一個任務後接著處理下一個任務,若該執行緒出現異常,將會有一個新的執行緒來替代。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。
說明:LinkedBlockingQueue會無限的添加需要執行的Runnable。
創建固定大小的執行緒池 newFixedThreadPool
每次提交一個任務就創建一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。
public static ExecutorSevice newFixedThreadPool()
返回一個包含指定數目執行緒的執行緒池,如果任務數量多於執行緒數目,那麼沒有沒有執行的任務必須等待,直到有任務完成為止。
可快取的執行緒池 newCachedThreadPool
如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閑(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的添加新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠創建的最大執行緒大小。
newCachedThreadPool方法創建的執行緒池可以自動的擴展執行緒池的容量。核心執行緒數量為0。
SynchronousQueue是個特殊的隊列。 SynchronousQueue隊列的容量為0。當試圖為SynchronousQueue添加Runnable,則執行會失敗。只有當一邊從SynchronousQueue取數據,一邊向SynchronousQueue添加數據才可以成功。SynchronousQueue僅僅起到數據交換的作用,並不保存執行緒。但newCachedThreadPool()方法沒有執行緒上限。Runable添加到SynchronousQueue會被立刻取出。
根據用戶的任務數創建相應的執行緒來處理,該執行緒池不會對執行緒數目加以限制,完全依賴於JVM能創建執行緒的數量,可能引起記憶體不足。
定時任務調度的執行緒池 newScheduledThreadPool
創建一個大小無限的執行緒池。此執行緒池支援定時以及周期性執行任務的需求。
例:
public class ScheduledThreadPoolTest { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date()); } }, 1000, 2000, TimeUnit.MILLISECONDS); } }
單執行緒的定時任務調度執行緒池 newSingleThreadScheduledExecutor
此執行緒池支援定時以及周期性執行任務的需求。
Executor介面
Executor是一個執行緒執行介面。任務執行的主要抽象不是Thead,而是Executor。
public interface Executor{ void executor(Runnable command); }
Executor將任務的提交過程與執行過程解耦,並用Runnable來表示任務。執行的任務放入run方法中即可,將Runnable介面的實現類交給執行緒池的execute方法,作為它的一個參數。如果需要給任務傳遞參數,可以通過創建一個Runnable介面的實現類來完成。
Executor可以支援多種不同類型的任務執行策略。
Executor基於生產者消費者模式,提交任務的操作相當於生產者,執行任務的執行緒則相當於消費者。
ExecutorService介面
執行緒池介面。ExecutorService在Executor的基礎上增加了一些方法,其中有兩個核心的方法:
Future<?> submit(Runnable task) <T> Future<T> submit(Callable<T> task)
這兩個方法都是向執行緒池中提交任務,它們的區別在於Runnable在執行完畢後沒有結果,Callable執行完畢後有一個結果。這在多個執行緒中傳遞狀態和結果是非常有用的。另外他們的相同點在於都返回一個Future對象。Future對象可以阻塞執行緒直到運行完畢(獲取結果,如果有的話),也可以取消任務執行,當然也能夠檢測任務是否被取消或者是否執行完畢。
在沒有Future之前我們檢測一個執行緒是否執行完畢通常使用Thread.join()或者用一個死循環加狀態位來描述執行緒執行完畢。現在有了更好的方法能夠阻塞執行緒,檢測任務執行完畢甚至取消執行中或者未開始執行的任務。
ScheduledExecutorService介面
ScheduledExecutorService描述的功能和Timer/TimerTask類似,解決那些需要任務重複執行的問題。這包括延遲時間一次性執行、延遲時間周期性執行以及固定延遲時間周期性執行等。當然了繼承ExecutorService的ScheduledExecutorService擁有ExecutorService的全部特性。
執行緒池生命周期
執行緒是有多種執行狀態的,同樣管理執行緒的執行緒池也有多種狀態。JVM會在所有執行緒(非後台daemon執行緒)全部終止後才退出,為了節省資源和有效釋放資源關閉一個執行緒池就顯得很重要。有時候無法正確的關閉執行緒池,將會阻止JVM的結束。
執行緒池Executor是非同步的執行任務,因此任何時刻不能夠直接獲取提交的任務的狀態。這些任務有可能已經完成,也有可能正在執行或者還在排隊等待執行。因此關閉執行緒池可能出現一下幾種情況:
- 平緩關閉:已經啟動的任務全部執行完畢,同時不再接受新的任務。
- 立即關閉:取消所有正在執行和未執行的任務。
另外關閉執行緒池後對於任務的狀態應該有相應的回饋資訊。
啟動執行緒池
執行緒池在構造前(new操作)是初始狀態,一旦構造完成執行緒池就進入了執行狀態RUNNING。嚴格意義上講執行緒池構造完成後並沒有執行緒被立即啟動,只有進行”預啟動”或者接收到任務的時候才會啟動執行緒。
執行緒池是處於運行狀態,隨時準備接受任務來執行。
關閉執行緒池
執行緒池運行中可以通過shutdown()和shutdownNow()來改變運行狀態。
- shutdown():平緩的關閉執行緒池。執行緒池停止接受新的任務,同時等待已經提交的任務執行完畢,包括那些進入隊列還沒有開始的任務。shutdown()方法執行過程中,執行緒池處於SHUTDOWN狀態。
- shutdownNow():立即關閉執行緒池。執行緒池停止接受新的任務,同時執行緒池取消所有執行的任務和已經進入隊列但是還沒有執行的任務。shutdownNow()方法執行過程中,執行緒池處於STOP狀態。shutdownNow方法本質是調用Thread.interrupt()方法。但我們知道該方法僅僅是讓執行緒處於interrupted狀態,並不會讓執行緒真正的停止!所以若只調用或只調用一次shutdownNow()方法,不一定會讓執行緒池中的執行緒都關閉掉,執行緒中必須要有處理interrupt事件的機制。
執行緒池結束
一旦shutdown()或者shutdownNow()執行完畢,執行緒池就進入TERMINATED狀態,即執行緒池就結束了。
- isTerminating() 如果關閉後所有任務都已完成,則返回 true。
- isShutdown() 如果此執行程式已關閉,則返回 true。
例:使用固定大小的執行緒池。並將任務添加到執行緒池。
import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; public class JavaThreadPool { public static void main(String[] args) { // 創建一個可重用固定執行緒數的執行緒池 ExecutorService pool = Executors.newFixedThreadPool(2); // 創建實現了Runnable介面對象,Thread對象當然也實現了Runnable介面 Thread t1 = new MyThread(); Thread t2 = new MyThread(); Thread t3 = new MyThread(); Thread t4 = new MyThread(); Thread t5 = new MyThread(); // 將執行緒放入池中進行執行 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); // 關閉執行緒池 pool.shutdown(); } } class MyThread extends Thread { @Override public void run() { System.out.println(Thread.currentThread().getName() + "正在執行。。。"); } }
Java執行緒池擴展
ThreadPoolExecutor執行緒池的執行監控
ThreadPoolExecutor中定義了三個空方法,用於監控執行緒的執行情況。
ThreadPoolExecutor源碼:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { }
例:使用覆蓋方法,定義新的執行緒池。
public class ExtThreadPoolTest { static class MyTask implements Runnable { public String name; public MyTask(String name) { super(); this.name = name; } @Override public void run() { try { Thread.sleep(500); System.out.println("執行中:"+this.name); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5,5,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()){ @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("準備執行:" + ((MyTask)r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("執行完成:" + ((MyTask)r).name); } @Override protected void terminated() { System.out.println("執行退出"); } }; for(int i=0;i<5;i++){ MyTask task = new MyTask("Task-"+i); es.execute(task); } Thread.sleep(10); // 等待terminated()執行 es.shutdown(); // 若無該方法,主執行緒不會結束。 } }
ThreadPoolExecutor的拒絕策略
執行緒池不可能處理無限多的執行緒。所以一旦執行緒池中中需要執行的任務過多,執行緒池對於某些任務就無法處理了。拒絕策略即對這些無法處理的任務進行處理。可能丟棄掉這些不能處理的任務,也可能用其他方式。
ThreadPoolExecutor類還有另一個構造方法。該構造方法中的RejectedExecutionHandler 用於定義拒絕策略。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ..... }
JDK內部已經提供一些拒絕策略。
AbortPolicy 一旦執行緒不能處理,則拋出異常。
AbortPolicy源碼:
public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always. */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy 一旦執行緒不能處理,則丟棄任務。
DiscardPolicy源碼:
public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
CallerRunsPolicy 一旦執行緒不能處理,則將任務返回給提交任務的執行緒處理。
CallerRunsPolicy源碼:
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
DiscardOldestPolicy 一旦執行緒不能處理,丟棄掉隊列中最老的任務。
DiscardOldestPolicy源碼:
public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
例:自定義拒絕策略。列印並丟棄無法處理的任務。
public class RejectedPolicyHandleTest { public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5,5,0,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 列印並丟棄。 System.out.println(r.toString()+" is discard"); } }); for(int i=0;i<Integer.MAX_VALUE;i++){ MyTask task = new MyTask("Task-"+i); es.execute(task); Thread.sleep(10); } es.shutdown(); // 若無該方法,主執行緒不會結束。 } }
ThreadFactory 執行緒工廠
ThreadPoolExecutor類構造器的參數其中之一即為ThreadFactory執行緒工廠。
ThreadFactory用於創建執行緒池中的執行緒。
public interface ThreadFactory { Thread newThread(Runnable r); }
ThreadFactory的實現類中一般定義執行緒了執行緒組,執行緒數與執行緒名稱。
DefaultThreadFactory源碼:
static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
CompletionService介面
這裡需要稍微提一下的是CompletionService介面,它是用於描述順序獲取執行結果的一個執行緒池包裝器。它依賴一個具體的執行緒池調度,但是能夠根據任務的執行先後順序得到執行結果,這在某些情況下可能提高並發效率。
來源:https://www.cnblogs.com/shijiaqi1066/p/3412300.html
歡迎關注微信公眾號【Java典籍】,收看更多Java技術乾貨!關注即送java全套資料一份
▼微信掃一掃下圖↓↓↓二維碼關注