探索JAVA並發 – 線程池詳解

  • 2019 年 10 月 7 日
  • 筆記

線程池是並發編程中必不可少的一種工具,也是面試高頻話題。

線程池,即管理着若干線程的資源池(字面意思)。相比於為每個任務分配一個線程,在線程池中執行任務優勢更多:

1.線程復用:線程池中的線程是可以復用的,省去了創建、銷毀線程的開銷,提高了資源利用率(創建、銷毀等操作都是要消耗系統資源的)和響應速度(任務提交過來線程已存在就不用等待線程創建了);

2.合理利用資源:通過調整線程池大小,讓所有處理器盡量保持忙碌,又能防止過多線程產生過多競爭浪費資源;

常用的線程池主要是ThreadPoolExecutor 和 ScheduledThreadPoolExecutor(定時任務線程池,繼承ThreadPoolExecutor)。

Executor框架

在JAVA中,任務執行的主要抽象不是Thread,而是Executor。Executor基於生產者-消費者模式,提交任務的操作相當於生產者,執行任務的線程相當於消費者。

所謂Executor框架,其實就是定義了一個接口,我們常用的線程池ThreadPoolExecutor 就是對這個接口的一種實現。

public interface Executor {        /**       * Executes the given command at some time in the future.  The command       * may execute in a new thread, in a pooled thread, or in the calling       * thread, at the discretion of the {@code Executor} implementation.       *       * @param command 可執行的任務       * @throws RejectedExecutionException 任務可能被拒絕(當Executor處理不了的時候)       * @throws NullPointerException if command is null       */      void execute(Runnable command);  }  

Executors與常用線程池

Executors 其實就是Executor(加s)

Executors是一個Executor的工廠,有很多定義好的工廠方法,可以幫助懶惰的 開發者快速創建一個線程池。下面是幾個常用的工廠方法:

  • newFixedThreadPool 固定長度線程池,每次提交任務都會創建一個新線程,直到線程數量達到指定閾值則不再創建新的;
  • newCachedThreadPool 可緩存線程池,每次提交任務都會創建一個新線程(理論上無限制),部分任務執行完後如果沒有新的任務,導致某些線程無用武之地,它們將被終結;
  • newSingleThreadExecutor 只有一個線程的線程池;
  • newScheduledThreadPool 可以延時或者定時執行任務的線程池。 public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } }

如果查看上述工廠方法的源碼,會發現只是 new 了一個線程池對象返回給調用 者而已,沒什麼花里胡哨的東西。不過看看構造參數還真不少,通過這種方式 比起我們自己 new 一個線程池要簡單多了(才怪)。

線程池構造參數

了解線程池構造參數的意義,能讓我們更清楚程序執行邏輯。

  • int corePoolSize : 核心線程數,有新任務來時,如果當前線程小於核心線程,則新建一個線程來執行該任務
  • int maximumPoolSize : 最大線程數,線程池最多擁有的線程數
  • long keepAliveTime : 空閑線程存活時間
  • TimeUnit unit : 空閑線程存活時間的單位
  • BlockingQueue workQueue : 存放待執行任務的阻塞隊列,新任務來時,若當前線程數>=最大核心線程數,則放到這個隊列(具體邏輯更複雜,請看下面源碼分析)
  • ThreadFactory threadFactory : 創建新線程的工廠,一般用來給線程取個名字方便排查問題
  • RejectedExecutionHandler handler : 任務被拒絕後的處理器,默認的處理器會直接拋出異常,建議重新實現
  • 配合源碼,效果更佳:
  • public class ThreadPoolExecutor extends AbstractExecutorService { // 構造函數 public ThreadPoolExecutor(int corePoolSize, // 核心線程數 int maximumPoolSize, // 最大線程數 long keepAliveTime, // 空閑線程存活時間 TimeUnit unit, // 空閑線程存活時間的單位 BlockingQueue<Runnable> workQueue, // 存放待執行任務的阻塞隊列 ThreadFactory threadFactory, // 創建新線程的工廠 RejectedExecutionHandler handler // 任務被拒絕後的處理器 ) { // … } // 提交任務 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 沒翻,懶得翻 * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 當前狀態值 int c = ctl.get(); // 當前線程數 = workerCountOf(c) 小於 核心線程數 的上限時 // 直接創建一個線程來執行任務 if (workerCountOf(c) < corePoolSize) { // 並發提交場景下可能會失敗 if (addWorker(command, true)) return; // 新增成功就可以結束了 // 失敗就更新下線程池狀態 c = ctl.get(); } // 不能創建核心線程來執行,並不會直接創建非核心線程,而是把任務暫存到阻塞隊列 // isRunning(c)判斷線程池是否還在運行 // workQueue.offer(command)返回值表示是否成功提交到隊列 if (isRunning(c) && workQueue.offer(command)) { // 成功放到隊列里了,再檢查一下線程池狀態 int recheck = ctl.get(); // 如果線程池已經沒有運行了,則嘗試把新增的任務從隊列移除 // remove(command)返回值表示是否移除成功 if (! isRunning(recheck) && remove(command)) reject(command); // 移除成功後,執行拒絕策略 // 檢查下當前線程數是否為0,如果是的話新建一個線程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 線程池沒有運行,或者放入隊列失敗(比如隊列已滿) // 則創建非核心線程去執行任務,這也失敗就只能拒絕了 else if (!addWorker(command, false)) reject(command); }

當對線程池的構造參數和任務處理邏輯有了以上大致的了解後,回想Executors 提供的幾個工廠方法,或許會感到所謂提供便利性的方法並不那麼便利。因為從方法的名字上來看很難和線程池的配置準確關聯,想要清除地知道這些方法創建的線程池如何運作,就需要知道他們用了怎樣的構造參數,那為什麼不直接使用構造方法呢?

所以盡量使用構造方法是更好的編程習慣,這樣不管是作者還是其他開發者,只要看看傳了什麼參數,就知道這個線程池是怎麼運作的了。

線程池創建示例

import java.util.concurrent.*;  import java.util.concurrent.atomic.AtomicInteger;    public class Main {        public static void main(String[] args) throws Exception {          AtomicInteger threadCount = new AtomicInteger();          ThreadPoolExecutor executor = new ThreadPoolExecutor(                  5,  // 核心線程數                  10, // 最大線程數                  1,  // 空閑線程存活時間                  TimeUnit.MINUTES, // 空閑線程存活時間單位                  new ArrayBlockingQueue<>(100), // 一個指定上限的阻塞隊列,存放待執行任務                  new ThreadFactory() {                      // 自定義一個線程工廠來給線程池裡的線程取名字                      @Override                      public Thread newThread(Runnable r) {                          return new Thread(r, "pool-thread-"                              + threadCount.incrementAndGet());                      }                  },                  new RejectedExecutionHandler() {                      // 自定義一個拒絕處理策略,安慰被線程池拒之門外的小可憐                      @Override                      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                          System.out.println("線程池拒絕了任務: " + r);                      }                  }          );      }    }  

有返回值的提交方式

submit

ThreadPoolExecutor.execute() 方法是沒有返回值的,也就是說把任務提交給線程池後,我們就失去了它的消息,除非你還保留着它的引用,並且在裏面有維護狀態。如果不想這麼麻煩,可以使用ThreadPoolExecutor.submit()來提交任務,這個方法會返回一個 Future 對象,通過這個對象可以知道任務何時被執行完。

import java.util.concurrent.*;  import java.util.concurrent.atomic.AtomicInteger;    public class Main {        public static void main(String[] args) throws Exception {          // 線程池定義          // ...            Future<?> future = executor.submit(new Runnable() {              @Override              public void run() {                  try {                      Thread.sleep(2000);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }                  System.out.println("我要關注: 一杯82年的JAVA");              }          });          Object r = future.get();          System.out.println("返回:" + r);          executor.shutdown();      }    }    /* 輸出:    我要關注: 一杯82年的JAVA  返回:null    */  

可以看到 Future.get() 是有返回值的,但是上面的例子返回了 null,因為任務是 一個Runnable 實現,run 方法沒有返回值。

submit Callable

如果想任務有返回值,可以使用 Callable 作為任務定義。

import java.util.concurrent.*;  import java.util.concurrent.atomic.AtomicInteger;    public class Main {        public static void main(String[] args) throws Exception {          // 線程池定義          // ...            Future<String> future = executor.submit(new Callable<String>() {              @Override              public String call() throws Exception {                  try {                      Thread.sleep(2000);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }                  System.out.println("I'm fine, and you?");                  return "我要關注: 一杯82年的JAVA";              }          });          String r = future.get();          System.out.println("返回:" + r);          executor.shutdown();      }    }    /* 返回:    I'm fine, and you?  返回:我要關注: 一杯82年的JAVA    */  

submit實現原理

為什麼 submit 就可以讓用戶等待、獲取任務返回?從源碼講起:

public abstract class AbstractExecutorService implements ExecutorService {        public <T> Future<T> submit(Callable<T> task) {          if (task == null) throw new NullPointerException();          // 把任務用一個RunnableFuture又給包裝了一下          RunnableFuture<T> ftask = newTaskFor(task);          // 最後還是調用了沒有返回值的execute          execute(ftask);          return ftask;      }        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {          return new FutureTask<T>(callable);      }  }    // 看看這個包裝類  public class FutureTask<V> implements RunnableFuture<V> {         private Callable<V> callable;       private volatile int state;         // 也是Runable的一種實現,所以能在線程池中被執行       public void run() {          // 有個表示狀態的標識          if (state != NEW ||              !UNSAFE.compareAndSwapObject(this, runnerOffset,                                           null, Thread.currentThread()))              return;          try {              Callable<V> c = callable;              if (c != null && state == NEW) {                  V result;                  boolean ran;                  try {                      // 執行用戶的邏輯,獲得返回值                      // 這個步驟可能需要點時間                      result = c.call();                      ran = true;                  } catch (Throwable ex) {                      result = null;                      ran = false;                      setException(ex);                  }                  if (ran)                      set(result);              }          } finally {              // runner must be non-null until state is settled to              // prevent concurrent calls to run()              runner = null;              // state must be re-read after nulling runner to prevent              // leaked interrupts              int s = state;              if (s >= INTERRUPTING)                  handlePossibleCancellationInterrupt(s);          }      }        // 獲取執行結果,阻塞直到狀態改變      public V get() throws InterruptedException, ExecutionException {          int s = state;          if (s <= COMPLETING)              s = awaitDone(false, 0L);          return report(s);      }  }  

小結:submit 時用一個FutureTask 把用戶提交的Callable包裝起來,再把FutureTask 提交給線程池執行,FutureTask.run 運行時會執行 Callable 中的業務代碼,並且過程中 FutureTask 會維護一個狀態標識,根據狀態標識,可以知道任務是否執行完成,也可以阻塞到狀態為完成獲取返回值。

關閉線程池

為什麼需要關閉線程池?

  1. 如果線程池裡的線程一直存活,而且這些線程又不是守護線程,那麼會導致虛擬機無法正常退出;
  2. 如果直接粗暴地結束應用,線程池中的任務可能沒執行完,業務將處於未知狀態;
  3. 線程中有些該釋放的資源沒有被釋放。

怎麼關閉線程池?

  1. shutdown 停止接收新任務(繼續提交會被拒絕,執行拒絕策略),但已提交的任務會繼續執行,全部完成後線程池徹底關閉;
  2. shutdownNow 立即停止線程池,並嘗試終止正在進行的線程(通過中斷),返回沒執行的任務集合;
  3. awaitTermination 阻塞當前線程,直到全部任務執行完,或者等待超時,或者被中斷。

由於 shutdownNow 的終止線程是通過中斷,這個方式並不能保證線程會提前停止。(關於中斷: 如何處理線程中斷)

一般先調用 shutdown 讓線程池停止接客,然後調用 awaitTermination 等待正在工作的線程完事。

// 你的池子對我打了烊  executor.shutdown();    // 等待一首歌的時間(bei~bei~~)  // 如果超時還沒結束返回false,你可以選擇再等一首長點的歌,或者不等了  boolean ok = executor.awaitTermination(4, TimeUnit.SECONDS);  

擴展線程池

線程池提供了一些擴展的方法,通過重寫這些方法可以添加前置、後置操作,讓使用更靈活。如 beforeExecute、afterExecute、terminated …

總結

線程池很好用,但使用不當會造成嚴重的後果,了解它各個屬性表示的含義以及執行的流程能幫助我們少踩坑。

舉個例子:如果設置了核心線程 < 最大線程數不等(一般都這麼設置),但是又設置了一個很大的阻塞隊列,那麼很可能只有幾個核心線程在工作,普通線程一直沒機會被創建,因為核心線程滿了會優先放到隊列里,而不是創建普通線程。

文章來源:

https://acupt.cn/2019/07/30/concurrent-thread-pool/

歡迎關注公眾號『easyserverdev』,同時,您也可以加入我的 QQ 群578019391。