探索JAVA並發 – 線程池詳解
- 2019 年 10 月 7 日
- 筆記
線程池是並發編程中必不可少的一種工具,也是面試高頻話題。
線程池,即管理着若干線程的資源池(字面意思)。相比於為每個任務分配一個線程,在線程池中執行任務優勢更多:
1.線程復用:線程池中的線程是可以復用的,省去了創建、銷毀線程的開銷,提高了資源利用率(創建、銷毀等操作都是要消耗系統資源的)和響應速度(任務提交過來線程已存在就不用等待線程創建了);
2.合理利用資源:通過調整線程池大小,讓所有處理器盡量保持忙碌,又能防止過多線程產生過多競爭浪費資源;
常用的線程池主要是ThreadPoolExecutor 和 ScheduledThreadPoolExecutor(定時任務線程池,繼承ThreadPoolExecutor)。
Executor框架
在JAVA中,任務執行的主要抽象不是Thread,而是Executor。Executor基於生產者-消費者模式,提交任務的操作相當於生產者,執行任務的線程相當於消費者。
所謂Executor框架,其實就是定義了一個接口,我們常用的線程池ThreadPoolExecutor 就是對這個接口的一種實現。
public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command 可執行的任務 * @throws RejectedExecutionException 任務可能被拒絕(當Executor處理不了的時候) * @throws NullPointerException if command is null */ void execute(Runnable command); }
Executors與常用線程池
Executors 其實就是Executor(加s)
Executors是一個Executor的工廠,有很多定義好的工廠方法,可以幫助懶惰的 開發者快速創建一個線程池。下面是幾個常用的工廠方法:
- newFixedThreadPool 固定長度線程池,每次提交任務都會創建一個新線程,直到線程數量達到指定閾值則不再創建新的;
- newCachedThreadPool 可緩存線程池,每次提交任務都會創建一個新線程(理論上無限制),部分任務執行完後如果沒有新的任務,導致某些線程無用武之地,它們將被終結;
- newSingleThreadExecutor 只有一個線程的線程池;
- newScheduledThreadPool 可以延時或者定時執行任務的線程池。 public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } }
如果查看上述工廠方法的源碼,會發現只是 new 了一個線程池對象返回給調用 者而已,沒什麼花里胡哨的東西。不過看看構造參數還真不少,通過這種方式 比起我們自己 new 一個線程池要簡單多了(才怪)。
線程池構造參數
了解線程池構造參數的意義,能讓我們更清楚程序執行邏輯。
- int corePoolSize : 核心線程數,有新任務來時,如果當前線程小於核心線程,則新建一個線程來執行該任務
- int maximumPoolSize : 最大線程數,線程池最多擁有的線程數
- long keepAliveTime : 空閑線程存活時間
- TimeUnit unit : 空閑線程存活時間的單位
- BlockingQueue workQueue : 存放待執行任務的阻塞隊列,新任務來時,若當前線程數>=最大核心線程數,則放到這個隊列(具體邏輯更複雜,請看下面源碼分析)
- ThreadFactory threadFactory : 創建新線程的工廠,一般用來給線程取個名字方便排查問題
- RejectedExecutionHandler handler : 任務被拒絕後的處理器,默認的處理器會直接拋出異常,建議重新實現
- 配合源碼,效果更佳:
- public class ThreadPoolExecutor extends AbstractExecutorService { // 構造函數 public ThreadPoolExecutor(int corePoolSize, // 核心線程數 int maximumPoolSize, // 最大線程數 long keepAliveTime, // 空閑線程存活時間 TimeUnit unit, // 空閑線程存活時間的單位 BlockingQueue<Runnable> workQueue, // 存放待執行任務的阻塞隊列 ThreadFactory threadFactory, // 創建新線程的工廠 RejectedExecutionHandler handler // 任務被拒絕後的處理器 ) { // … } // 提交任務 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 沒翻,懶得翻 * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 當前狀態值 int c = ctl.get(); // 當前線程數 = workerCountOf(c) 小於 核心線程數 的上限時 // 直接創建一個線程來執行任務 if (workerCountOf(c) < corePoolSize) { // 並發提交場景下可能會失敗 if (addWorker(command, true)) return; // 新增成功就可以結束了 // 失敗就更新下線程池狀態 c = ctl.get(); } // 不能創建核心線程來執行,並不會直接創建非核心線程,而是把任務暫存到阻塞隊列 // isRunning(c)判斷線程池是否還在運行 // workQueue.offer(command)返回值表示是否成功提交到隊列 if (isRunning(c) && workQueue.offer(command)) { // 成功放到隊列里了,再檢查一下線程池狀態 int recheck = ctl.get(); // 如果線程池已經沒有運行了,則嘗試把新增的任務從隊列移除 // remove(command)返回值表示是否移除成功 if (! isRunning(recheck) && remove(command)) reject(command); // 移除成功後,執行拒絕策略 // 檢查下當前線程數是否為0,如果是的話新建一個線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 線程池沒有運行,或者放入隊列失敗(比如隊列已滿) // 則創建非核心線程去執行任務,這也失敗就只能拒絕了 else if (!addWorker(command, false)) reject(command); }
、
當對線程池的構造參數和任務處理邏輯有了以上大致的了解後,回想Executors 提供的幾個工廠方法,或許會感到所謂提供便利性的方法並不那麼便利。因為從方法的名字上來看很難和線程池的配置準確關聯,想要清除地知道這些方法創建的線程池如何運作,就需要知道他們用了怎樣的構造參數,那為什麼不直接使用構造方法呢?
所以盡量使用構造方法是更好的編程習慣,這樣不管是作者還是其他開發者,只要看看傳了什麼參數,就知道這個線程池是怎麼運作的了。
線程池創建示例
import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class Main { public static void main(String[] args) throws Exception { AtomicInteger threadCount = new AtomicInteger(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, // 核心線程數 10, // 最大線程數 1, // 空閑線程存活時間 TimeUnit.MINUTES, // 空閑線程存活時間單位 new ArrayBlockingQueue<>(100), // 一個指定上限的阻塞隊列,存放待執行任務 new ThreadFactory() { // 自定義一個線程工廠來給線程池裡的線程取名字 @Override public Thread newThread(Runnable r) { return new Thread(r, "pool-thread-" + threadCount.incrementAndGet()); } }, new RejectedExecutionHandler() { // 自定義一個拒絕處理策略,安慰被線程池拒之門外的小可憐 @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("線程池拒絕了任務: " + r); } } ); } }
有返回值的提交方式
submit
ThreadPoolExecutor.execute() 方法是沒有返回值的,也就是說把任務提交給線程池後,我們就失去了它的消息,除非你還保留着它的引用,並且在裏面有維護狀態。如果不想這麼麻煩,可以使用ThreadPoolExecutor.submit()來提交任務,這個方法會返回一個 Future 對象,通過這個對象可以知道任務何時被執行完。
import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class Main { public static void main(String[] args) throws Exception { // 線程池定義 // ... Future<?> future = executor.submit(new Runnable() { @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("我要關注: 一杯82年的JAVA"); } }); Object r = future.get(); System.out.println("返回:" + r); executor.shutdown(); } } /* 輸出: 我要關注: 一杯82年的JAVA 返回:null */
可以看到 Future.get() 是有返回值的,但是上面的例子返回了 null,因為任務是 一個Runnable 實現,run 方法沒有返回值。
submit Callable
如果想任務有返回值,可以使用 Callable 作為任務定義。
import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class Main { public static void main(String[] args) throws Exception { // 線程池定義 // ... Future<String> future = executor.submit(new Callable<String>() { @Override public String call() throws Exception { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("I'm fine, and you?"); return "我要關注: 一杯82年的JAVA"; } }); String r = future.get(); System.out.println("返回:" + r); executor.shutdown(); } } /* 返回: I'm fine, and you? 返回:我要關注: 一杯82年的JAVA */
submit實現原理
為什麼 submit 就可以讓用戶等待、獲取任務返回?從源碼講起:
public abstract class AbstractExecutorService implements ExecutorService { public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 把任務用一個RunnableFuture又給包裝了一下 RunnableFuture<T> ftask = newTaskFor(task); // 最後還是調用了沒有返回值的execute execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } } // 看看這個包裝類 public class FutureTask<V> implements RunnableFuture<V> { private Callable<V> callable; private volatile int state; // 也是Runable的一種實現,所以能在線程池中被執行 public void run() { // 有個表示狀態的標識 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 執行用戶的邏輯,獲得返回值 // 這個步驟可能需要點時間 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } // 獲取執行結果,阻塞直到狀態改變 public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } }
小結:submit 時用一個FutureTask 把用戶提交的Callable包裝起來,再把FutureTask 提交給線程池執行,FutureTask.run 運行時會執行 Callable 中的業務代碼,並且過程中 FutureTask 會維護一個狀態標識,根據狀態標識,可以知道任務是否執行完成,也可以阻塞到狀態為完成獲取返回值。
關閉線程池
為什麼需要關閉線程池?
- 如果線程池裡的線程一直存活,而且這些線程又不是守護線程,那麼會導致虛擬機無法正常退出;
- 如果直接粗暴地結束應用,線程池中的任務可能沒執行完,業務將處於未知狀態;
- 線程中有些該釋放的資源沒有被釋放。
怎麼關閉線程池?
- shutdown 停止接收新任務(繼續提交會被拒絕,執行拒絕策略),但已提交的任務會繼續執行,全部完成後線程池徹底關閉;
- shutdownNow 立即停止線程池,並嘗試終止正在進行的線程(通過中斷),返回沒執行的任務集合;
- awaitTermination 阻塞當前線程,直到全部任務執行完,或者等待超時,或者被中斷。
由於 shutdownNow 的終止線程是通過中斷,這個方式並不能保證線程會提前停止。(關於中斷: 如何處理線程中斷)
一般先調用 shutdown 讓線程池停止接客,然後調用 awaitTermination 等待正在工作的線程完事。
// 你的池子對我打了烊 executor.shutdown(); // 等待一首歌的時間(bei~bei~~) // 如果超時還沒結束返回false,你可以選擇再等一首長點的歌,或者不等了 boolean ok = executor.awaitTermination(4, TimeUnit.SECONDS);
擴展線程池
線程池提供了一些擴展的方法,通過重寫這些方法可以添加前置、後置操作,讓使用更靈活。如 beforeExecute、afterExecute、terminated …
總結
線程池很好用,但使用不當會造成嚴重的後果,了解它各個屬性表示的含義以及執行的流程能幫助我們少踩坑。
舉個例子:如果設置了核心線程 < 最大線程數不等(一般都這麼設置),但是又設置了一個很大的阻塞隊列,那麼很可能只有幾個核心線程在工作,普通線程一直沒機會被創建,因為核心線程滿了會優先放到隊列里,而不是創建普通線程。
文章來源:
https://acupt.cn/2019/07/30/concurrent-thread-pool/
歡迎關注公眾號『easyserverdev』,同時,您也可以加入我的 QQ 群578019391。