面試題-關於Java線程池一篇文章就夠了

  • 2019 年 12 月 20 日
  • 筆記

在Java面試中,線程池相關知識,雖不能說是必問提,但出現的頻次也是非常高的。同時又鑒於公眾號「程序新視界」的讀者後台留言讓寫一篇關於Java線程池的文章,於是就有本篇內容,本篇將基於Java線程池的原理、實現以及相關源碼進行講解等。

什麼是線程池

線程池是一種多線程處理形式,處理過程中將任務提交到線程池,任務的執行交由線程池來管理。

為了充分利用CPU多核資源,應用都會採用多線程並行/並發計算,最大限度的利用多核提升應用程序性能。

試想一下,如果每個請求都執行一遍創建線程、執行任務、銷毀線程,那麼對服務器資源將是一種浪費。在高並發的情況下,甚至會耗盡服務器資源。

線程池的主要作用有兩個:不同請求之間重複利用線程,無需頻繁的創建和銷毀線程,降低系統開銷和控制線程數量上限,避免創建過多的線程耗盡進程內存空間,同時減少線程上下文切換次數。

常見面試題

  • 說說Java線程池的好處及實現的原理?
  • Java提供線程池各個參數的作用,如何進行的?
  • 根據線程池內部機制,當提交新任務時,有哪些異常要考慮?
  • 線程池都有哪幾種工作隊列?
  • 使用無界隊列的線程池會導致內存飆升嗎?
  • 說說幾種常見的線程池及使用場景?

線程池的創建與使用

在JDK5版本中增加了內置線程池實現ThreadPoolExecutor,同時提供了Executors來創建不同類型的線程池。Executors中提供了以下常見的線程池創建方法:

  • newSingleThreadExecutor:一個單線程的線程池。如果因異常結束,會再創建一個新的,保證按照提交順序執行。
  • newFixedThreadPool:創建固定大小的線程池。根據提交的任務逐個增加線程,直到最大值保持不變。如果因異常結束,會新創建一個線程補充。
  • newCachedThreadPool:創建一個可緩存的線程池。會根據任務自動新增或回收線程。
  • newScheduledThreadPool:支持定時以及周期性執行任務的需求。
  • newWorkStealingPool:JDK8新增,根據所需的並行層次來動態創建和關閉線程,通過使用多個隊列減少競爭,底層使用ForkJoinPool來實現。優勢在於可以充分利用多CPU,把一個任務拆分成多個「小任務」,放到多個處理器核心上並行執行;當多個「小任務」執行完成之後,再將這些執行結果合併起來即可。

雖然在JDK中提供Executors類來支持以上類型的線程池創建,但通常情況下不建議開發人員直接使用(見《阿里巴巴java開發規範》)。

線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。

Executors部分方法的弊端:

  • newFixedThreadPool和newSingleThreadExecutor主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至OOM。
  • newCachedThreadPool和newScheduledThreadPool:主要問題是線程數最大數是Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至OOM。

同時,阿里巴巴java開發規範中推薦了3種線程池創建方式。

方式一,引入commons-lang3包。

//org.apache.commons.lang3.concurrent.BasicThreadFactory  ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,      new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());

方式二,引入com.google.guava包。

ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()      .setNameFormat("demo-pool-%d").build();    //Common Thread Pool  ExecutorService pool = new ThreadPoolExecutor(5, 200,      0L, TimeUnit.MILLISECONDS,      new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());    pool.execute(()-> System.out.println(Thread.currentThread().getName()));  pool.shutdown();//gracefully shutdown

方式三,spring配置線程池方式:自定義線程工廠bean需要實現ThreadFactory,可參考該接口的其它默認實現類,使用方式直接注入bean,調用execute(Runnable task)方法即可。

<bean id="userThreadPool"      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">      <property name="corePoolSize" value="10" />      <property name="maxPoolSize" value="100" />      <property name="queueCapacity" value="2000" />    <property name="threadFactory" value= threadFactory />      <property name="rejectedExecutionHandler">          <ref local="rejectedExecutionHandler" />      </property>  </bean>  // in code  userThreadPool.execute(thread);

ThreadPoolExecutor的構造方法

除了以上推薦的創建線程池的方法,還可以通過ThreadPoolExecutor的構造方法,直接創建線程池。本質上來講,以上方法最終也是創建了ThreadPoolExecutor對象,然後堆積進行包裝處理。

ThreadPoolExecutor提供了多個構造方法,我們最終都調用的構造方法來進行說明。

 public ThreadPoolExecutor(int corePoolSize,        int maximumPoolSize,        long keepAliveTime,        TimeUnit unit,        BlockingQueue<Runnable> workQueue,        ThreadFactory threadFactory,        RejectedExecutionHandler handler) {     // 省略代碼  }

核心參數作用解析如下:

  • corePoolSize:線程池核心線程數最大值。
  • maximumPoolSize:線程池最大線程數大小。
  • keepAliveTime:線程池中非核心線程空閑的存活時間大小。
  • unit:線程空閑存活時間單位。
  • workQueue:存放任務的阻塞隊列。
  • threadFactory:創建新線程的工廠,所有線程都是通過該工廠創建的,有默認實現。
  • handler:線程池的拒絕策略。

程池的拒絕策略

構造方法的中最後的參數RejectedExecutionHandler用於指定線程池的拒絕策略。當請求任務不斷的過來,而系統此時又處理不過來的時候,我們就需要採取對應的策略是拒絕服務。

默認有四種類型:

  • AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作。
  • CallerRunsPolicy策略:只要線程池未關閉,該策略直接在調用者線程中,運行當前的被丟棄的任務。
  • DiscardOleddestPolicy策略:該策略將丟棄最老的一個請求,也就是即將被執行的任務,並嘗試再次提交當前任務。
  • DiscardPolicy策略:該策略默默的丟棄無法處理的任務,不予任何處理。

當然,除了默認的4種策略之外,還可以根據業務需求自定義拒絕策略。通過實現RejectedExecutionHandler接口,在創建ThreadPoolExecutor對象時作為參數傳入即可。

在spring-integration-core中便自定義了CallerBlocksPolicy,相關代碼如下:

public class CallerBlocksPolicy implements RejectedExecutionHandler {      private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class);      private final long maxWait;        public CallerBlocksPolicy(long maxWait) {          this.maxWait = maxWait;      }        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {          if (!executor.isShutdown()) {              try {                  BlockingQueue<Runnable> queue = executor.getQueue();                  if (logger.isDebugEnabled()) {                      logger.debug("Attempting to queue task execution for "   this.maxWait   " milliseconds");                  }                    if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) {                      throw new RejectedExecutionException("Max wait time expired to queue task");                  } else {                      if (logger.isDebugEnabled()) {                          logger.debug("Task execution queued");                      }                    }              } catch (InterruptedException var4) {                  Thread.currentThread().interrupt();                  throw new RejectedExecutionException("Interrupted", var4);              }          } else {              throw new RejectedExecutionException("Executor has been shut down");          }      }  }

線程池的執行

創建完成ThreadPoolExecutor之後,當向線程池提交任務時,通常使用execute方法。execute方法的執行流程圖如下:

  • 如果線程池中存活的核心線程數小於線程數corePoolSize時,線程池會創建一個核心線程去處理提交的任務。
  • 如果線程池核心線程數已滿,即線程數已經等於corePoolSize,一個新提交的任務,會被放進任務隊列workQueue排隊等待執行。
  • 當線程池裏面存活的線程數已經等於corePoolSize了,並且任務隊列workQueue也滿,判斷線程數是否達到maximumPoolSize,即最大線程數是否已滿,如果沒到達,創建一個非核心線程執行提交的任務。
  • 如果當前的線程數達到了maximumPoolSize,還有新的任務過來的話,直接採用拒絕策略處理。

源代碼分析

下面看一下JDK8中ThreadPoolExecutor中execute方法的源代碼實現:

public void execute(Runnable command) {      if (command == null)          throw new NullPointerException();      // 線程池本身的狀態跟worker數量使用同一個變量ctl來維護      int c = ctl.get();      // 通過位運算得出當然線程池中的worker數量與構造參數corePoolSize進行比較      if (workerCountOf(c) < corePoolSize) {          // 如果小於corePoolSize,則直接新增一個worker,並把當然用戶提交的任務command作為參數,如果成功則返回。          if (addWorker(command, true))              return;          // 如果失敗,則獲取最新的線程池數據          c = ctl.get();      }      // 如果線程池仍在運行,則把任務放到阻塞隊列中等待執行。      if (isRunning(c) && workQueue.offer(command)) {          // 這裡的recheck思路是為了處理並發問題          int recheck = ctl.get();          // 當任務成功放入隊列時,如果recheck發現線程池已經不再運行了則從隊列中把任務刪除          if (! isRunning(recheck) && remove(command))              //刪除成功以後,會調用構造參數傳入的拒絕策略。              reject(command);           // 如果worker的數量為0(此時隊列中可能有任務沒有執行),則新建一個worker(由於此時新建woker的目的是執行隊列中堆積的任務,           // 因此入參沒有執行任務,詳細邏輯後面會詳細分析addWorker方法)。          else if (workerCountOf(recheck) == 0)              addWorker(null, false);      }      // 如果前面的新增woker,放入隊列都失敗,則會繼續新增worker,此時線程池的狀態是woker數量達到corePoolSize,阻塞隊列任務已滿      // 只能基於maximumPoolSize參數新建woker      else if (!addWorker(command, false))          // 如果基於maximumPoolSize新建woker失敗,此時是線程池中線程數已達到上限,隊列已滿,則調用構造參數中傳入的拒絕策略          reject(command);  }

下面再看在上述代碼中調用的addWorker方法的源代碼實現及解析:

private boolean addWorker(Runnable firstTask, boolean core) {      // 這裡有一段基於CAS 死循環實現的關於線程池狀態,線程數量的校驗與更新邏輯就先忽略了,重點看主流程。      //...        boolean workerStarted = false;      boolean workerAdded = false;      Worker w = null;      try {           // 把指定任務作為參數新建一個worker線程          w = new Worker(firstTask);          // 這裡是重點w.thread是通過線程池構造函數參數threadFactory生成的woker對象          // 也就是說這個變量t就是代表woker線程。絕對不是用戶提交的線程任務firstTask。          final Thread t = w.thread;          if (t != null) {              final ReentrantLock mainLock = this.mainLock;              mainLock.lock();              try {                  // 加鎖之後仍舊是判斷線程池狀態等一些校驗邏輯。                  int rs = runStateOf(ctl.get());                  if (rs < SHUTDOWN ||                      (rs == SHUTDOWN && firstTask == null)) {                      if (t.isAlive())                          throw new IllegalThreadStateException();                      // 把新建的woker線程放入集合保存,這裡使用的是HashSet                      workers.add(w);                      int s = workers.size();                      if (s > largestPoolSize)                          largestPoolSize = s;                      workerAdded = true;                  }              } finally {                  mainLock.unlock();              }              if (workerAdded) {                  // 然後啟動woker線程                   // 該變量t代表woker線程,會調用woker的run方法                  t.start();                  workerStarted = true;              }          }      } finally {          if (! workerStarted)              // 如果woker啟動失敗,則進行一些善後工作,比如說修改當前woker數量等              addWorkerFailed(w);      }      return workerStarted;  }

addWorker方法主要做的工作就是新建一個Woker線程,加入到woker集合中。在上述方法中會調用到Worker類的run方法,並最終執行了runWorker方法。

// Woker類實現了Runnable接口  public void run() {      runWorker(this);  }    final void runWorker(Worker w) {      Thread wt = Thread.currentThread();      // task就是Woker構造函數入參指定的任務,即用戶提交的任務      Runnable task = w.firstTask;      w.firstTask = null;      w.unlock();      boolean completedAbruptly = true;      try {          //一般情況下,task都不會為空(特殊情況上面注釋中也說明了),因此會直接進入循環體中          //這裡getTask方法是要重點說明的,它的實現跟我們構造參數設置存活時間有關          //我們都知道構造參數設置的時間代表了線程池中的線程,即woker線程的存活時間,如果到期則回收woker線程,這個邏輯的實現就在getTask中。          //來不及執行的任務,線程池會放入一個阻塞隊列,getTask方法就是去阻塞隊列中取任務,用戶設置的存活時間,就是          //從這個阻塞隊列中取任務等待的最大時間,如果getTask返回null,意思就是woker等待了指定時間仍然沒有          //取到任務,此時就會跳過循環體,進入woker線程的銷毀邏輯。          while (task != null || (task = getTask()) != null) {              w.lock();              if ((runStateAtLeast(ctl.get(), STOP) ||                   (Thread.interrupted() &&                    runStateAtLeast(ctl.get(), STOP))) &&                  !wt.isInterrupted())                  wt.interrupt();              try {                  //該方法是個空的實現,如果有需要用戶可以自己繼承該類進行實現                  beforeExecute(wt, task);                  Throwable thrown = null;                  try {                      //真正的任務執行邏輯                      task.run();                  } catch (RuntimeException x) {                      thrown = x; throw x;                  } catch (Error x) {                      thrown = x; throw x;                  } catch (Throwable x) {                      thrown = x; throw new Error(x);                  } finally {                      //該方法是個空的實現,如果有需要用戶可以自己繼承該類進行實現                      afterExecute(task, thrown);                  }              } finally {                  //這裡設為null,也就是循環體再執行的時候會調用getTask方法                  task = null;                  w.completedTasks  ;                  w.unlock();              }          }          completedAbruptly = false;      } finally {          //當指定任務執行完成,阻塞隊列中也取不到可執行任務時,會進入這裡,做一些善後工作,比如在corePoolSize跟maximumPoolSize之間的woker會進行回收          processWorkerExit(w, completedAbruptly);      }  }

woker線程的執行流程就是首先執行初始化時分配給的任務,執行完成以後會嘗試從阻塞隊列中獲取可執行的任務,如果指定時間內仍然沒有任務可以執行,則進入銷毀邏輯。這裡只會回收corePoolSize與maximumPoolSize直接的那部分woker。

execute與submit的區別

執行任務除了可以使用execute方法還可以使用submit方法。它們的主要區別是:execute適用於不需要關注返回值的場景,submit方法適用於需要關注返回值的場景。

異常處理

當執行任務時發生異常,那麼該怎麼處理呢?首先看當Thread線程異常如何處理。

在任務中通過try…catch是可以捕獲異常並進行處理的,如下代碼:

Thread t = new Thread(() -> {      try {          System.out.println(1 / 0);      } catch (Exception e) {          LOGGER.error(e.getMessage(), e);      }  });  t.start();

如果很多線程任務默認的異常處理機制都是相同的,可以通過Thread類的UncaughtExceptionHandler來設置線程默認的異常處理機制。

實現UncaughtExceptionHandler接口,並調用Thread#setUncaughtExceptionHandler(UncaughtExceptionHandler)方法。如果想設置為全局默認異常處理機制,則可調用Thread#setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)方法。

ThreadGroup默認提供了異常處理機制如下:

public void uncaughtException(Thread t, Throwable e) {      if (parent != null) {          parent.uncaughtException(t, e);      } else {          Thread.UncaughtExceptionHandler ueh =              Thread.getDefaultUncaughtExceptionHandler();          if (ueh != null) {              ueh.uncaughtException(t, e);          } else if (!(e instanceof ThreadDeath)) {              System.err.print("Exception in thread ""                                 t.getName()   "" ");              e.printStackTrace(System.err);          }      }  }

ThreadPoolExecutor的異常處理機制與Thread是一樣的。同時,ThreadPoolExecutor提供了uncaughtExceptionHandler方法來設置異常處理。如下示例:

public class ThreadPool {    	public static void main(String[] args) {  		ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()  				.setNameFormat("demo-pool-%d")  				.setUncaughtExceptionHandler(new LogUncaughtExceptionHandler())  				.build();    		ExecutorService pool = new ThreadPoolExecutor(5, 200,  				0L, TimeUnit.MILLISECONDS,  				new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());    		pool.execute(() -> {  			throw new RuntimeException("測試異常");  		});    		pool.shutdown();  	}    	static class  LogUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {    		@Override  		public void uncaughtException(Thread t, Throwable e) {  			System.out.println("打印LogUncaughtExceptionHandler中獲得的異常信息:"   e.getMessage());  		}  	}  }

但需要注意的是使用UncaughtExceptionHandler的方法只適用於execute方法執行的任務,而對submit方法是無效。submit執行的任務,可以通過返回的Future對象的get方法接收拋出的異常,再進行處理。這也算是execute方法與submit方法的差別之一。

線程池中常見的隊列

線程池有以下工作隊列:

  • ArrayBlockingQueue:有界隊列,是一個用數組實現的有界阻塞隊列,按FIFO排序量。
  • LinkedBlockingQueue:可設置容量隊列,基於鏈表結構的阻塞隊列,按FIFO排序任務,容量可以選擇進行設置,不設置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE,吞吐量通常要高於ArrayBlockingQuene;newFixedThreadPool線程池使用了這個隊列。
  • DelayQueue:延遲隊列,是一個任務定時周期的延遲執行的隊列。根據指定的執行時間從小到大排序,否則根據插入到隊列的先後排序。newScheduledThreadPool線程池使用了這個隊列。
  • PriorityBlockingQueue:優先級隊列,是具有優先級的無界阻塞隊列。
  • SynchronousQueue:同步隊列,一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene,newCachedThreadPool線程池使用了這個隊列。

關閉線程池

關閉線程池可以調用shutdownNow和shutdown兩個方法來實現。

shutdownNow:對正在執行的任務全部發出interrupt(),停止執行,對還未開始執行的任務全部取消,並且返回還沒開始的任務列表。

shutdown:當我們調用shutdown後,線程池將不再接受新的任務,但也不會去強制終止已經提交或者正在執行中的任務。

參考文章:

https://www.jianshu.com/p/5df6e38e4362

https://juejin.im/post/5d1882b1f265da1ba84aa676

原文鏈接:《面試題-關於Java線程池一篇文章就夠了