執行緒池
一、執行緒池簡介
執行緒池可以看做是執行緒的集合。在沒有任務時執行緒處於空閑狀態,當請求到來:執行緒池給這個請求分配一個空閑的執行緒,任務完成後回到執行緒池中等待下次任務(而不是銷毀)。這樣就實現了執行緒的重用。
我們來看看如果沒有使用執行緒池的情況是這樣的:
- 為每個請求都新開一個執行緒!
public class ThreadPerTaskWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { // 為每個請求都創建一個新的執行緒 final Socket connection = socket.accept(); Runnable task = () -> handleRequest(connection); new Thread(task).start(); } } private static void handleRequest(Socket connection) { // request-handling logic here } }
為每個請求都開一個新的執行緒雖然理論上是可以的,但是會有缺點:
- 執行緒生命周期的開銷非常高。每個執行緒都有自己的生命周期,創建和銷毀執行緒所花費的時間和資源可能比處理客戶端的任務花費的時間和資源更多,並且還會有某些空閑執行緒也會佔用資源。
- 程式的穩定性和健壯性會下降,每個請求開一個執行緒。如果受到了惡意攻擊或者請求過多(記憶體不足),程式很容易就奔潰掉了。
所以說:我們的執行緒最好是交由執行緒池來管理,這樣可以減少對執行緒生命周期的管理,一定程度上提高性能。
二、JDK提供的執行緒池API
JDK給我們提供了Excutor框架來使用執行緒池,它是執行緒池的基礎。
- Executor提供了一種將「任務提交」與「任務執行」分離開來的機制(解耦)
下面我們來看看JDK執行緒池的總體api架構:
2.1ForkJoinPool執行緒池
除了ScheduledThreadPoolExecutor和ThreadPoolExecutor類執行緒池以外,還有一個是JDK1.7新增的執行緒池:ForkJoinPool執行緒池
於是我們的類圖就可以變得完整一些:
JDK1.7中新增的一個執行緒池,與ThreadPoolExecutor一樣,同樣繼承了AbstractExecutorService。ForkJoinPool是Fork/Join框架的兩大核心類之一。與其它類型的ExecutorService相比,其主要的不同在於採用了工作竊取演算法(work-stealing):所有池中執行緒會嘗試找到並執行已被提交到池中的或由其他執行緒創建的任務。這樣很少有執行緒會處於空閑狀態,非常高效。這使得能夠有效地處理以下情景:大多數由任務產生大量子任務的情況;從外部客戶端大量提交小任務到池中的情況。
2.2補充:Callable和Future
學到了執行緒池,我們可以很容易地發現:很多的API都有Callable和Future這麼兩個東西。
Future<?> submit(Runnable task)
<T> Future<T> submit(Callable<T> task)
其實它們也不是什麼高深的東西
我們可以簡單認為:Callable就是Runnable的擴展。
- Runnable沒有返回值,不能拋出受檢查的異常,而Callable可以!
也就是說:當我們的任務需要返回值的時,我們就可以使用Callable!
Future一般我們認為是Callable的返回值,但他其實代表的是任務的生命周期(當然了,它是能獲取得到Callable的返回值的)
簡單來看一下他們的用法:
public class CallableDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { // 創建執行緒池對象 ExecutorService pool = Executors.newFixedThreadPool(2); // 可以執行Runnable對象或者Callable對象代表的執行緒 Future<Integer> f1 = pool.submit(new MyCallable(100)); Future<Integer> f2 = pool.submit(new MyCallable(200)); // V get() Integer i1 = f1.get(); Integer i2 = f2.get(); System.out.println(i1); System.out.println(i2); // 結束 pool.shutdown(); } }
Callable任務:
public class MyCallable implements Callable<Integer> { private int number; public MyCallable(int number) { this.number = number; } @Override public Integer call() throws Exception { int sum = 0; for (int x = 1; x <= number; x++) { sum += x; } return sum; } }
三、ThreadPoolExecutor詳解
這是用得最多的執行緒池,所以本文會重點講解它。
執行緒的狀態:
- RUNNING:執行緒池能夠接受新任務,以及對新添加的任務進行處理。
- SHUTDOWN:執行緒池不可以接受新任務,但是可以對已添加的任務進行處理。
- STOP:執行緒池不接收新任務,不處理已添加的任務,並且會中斷正在處理的任務。
- TIDYING:當所有的任務已終止,ctl記錄的”任務數量”為0,執行緒池會變為TIDYING狀態。當執行緒池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在執行緒池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。
- TERMINATED:執行緒池徹底終止的狀態。
各個狀態之間轉換:
3.2已默認實現的池
下面我就列舉三個比較常見的實現池:
- newFixedThreadPool
- newCachedThreadPool
- SingleThreadExecutor
如果讀懂了上面對應的策略呀,執行緒數量這些,應該就不會太難看懂了。
3.2.1newFixedThreadPool
一個固定執行緒數的執行緒池,它將返回一個corePoolSize和maximumPoolSize相等的執行緒池。
3.2.2newCachedThreadPool
非常有彈性的執行緒池,對於新的任務,如果此時執行緒池裡沒有空閑執行緒,執行緒池會毫不猶豫的創建一條新的執行緒去處理這個任務。
3.2.3SingleThreadExecutor
使用單個worker執行緒的Executor
3.3構造方法
我們讀完上面的默認實現池還有對應的屬性,再回到構造方法看看
- 構造方法可以讓我們自定義(擴展)執行緒池
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
1.指定核心執行緒數量
2.指定最大執行緒數量
3.允許執行緒空閑時間
4.時間對象
5.阻塞隊列
6.執行緒工廠
7.任務拒絕策略
再總結一遍這些參數的要點:
執行緒數量要點:
- 如果運行執行緒的數量少於核心執行緒數量,則創建新的執行緒處理請求
- 如果運行執行緒的數量大於核心執行緒數量,小於最大執行緒數量,則當隊列滿的時候才創建新的執行緒
- 如果核心執行緒數量等於最大執行緒數量,那麼將創建固定大小的連接池
- 如果設置了最大執行緒數量為無窮,那麼允許執行緒池適合任意的並發數量
執行緒空閑時間要點:
- 當前執行緒數大於核心執行緒數,如果空閑時間已經超過了,那該執行緒會銷毀。
排隊策略要點:
- 同步移交:不會放到隊列中,而是等待執行緒執行它。如果當前執行緒沒有執行,很可能會新開一個執行緒執行。
- 無界限策略:如果核心執行緒都在工作,該執行緒會放到隊列中。所以執行緒數不會超過核心執行緒數
- 有界限策略:可以避免資源耗盡,但是一定程度上減低了吞吐量
當執行緒關閉或者執行緒數量滿了和隊列飽和了,就有拒絕任務的情況了:
拒絕任務策略:
- 直接拋出異常
- 使用調用者的執行緒來處理
- 直接丟掉這個任務
- 丟掉最老的任務
四、execute執行方法
execute執行方法分了三步,以注釋的方式寫在程式碼上了
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //如果執行緒池中運行的執行緒數量<corePoolSize,則創建新執行緒來處理請求,即使其他輔助執行緒是空閑的。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //如果執行緒池中運行的執行緒數量>=corePoolSize,且執行緒池處於RUNNING狀態,且把提交的任務成功放入阻塞隊列中,就再次檢查執行緒池的狀態, // 1.如果執行緒池不是RUNNING狀態,且成功從阻塞隊列中刪除任務,則該任務由當前 RejectedExecutionHandler 處理。 // 2.否則如果執行緒池中運行的執行緒數量為0,則通過addWorker(null, false)嘗試新建一個執行緒,新建執行緒對應的任務為null。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果以上兩種case不成立,即沒能將任務成功放入阻塞隊列中,且addWoker新建執行緒失敗,則該任務由當前 RejectedExecutionHandler 處理。 else if (!addWorker(command, false)) reject(command); }
五、執行緒池關閉
ThreadPoolExecutor提供了shutdown()和shutdownNow()兩個方法來關閉執行緒池
區別:
- 調用shutdown()後,執行緒池狀態立刻變為SHUTDOWN,而調用shutdownNow(),執行緒池狀態立刻變為STOP。
- shutdown()等待任務執行完才中斷執行緒,而shutdownNow()不等任務執行完就中斷了執行緒。