java執行緒池,阿里為什麼不允許使用Executors?

  • 2019 年 10 月 3 日
  • 筆記

帶著問題

  1. 阿里Java程式碼規範為什麼不允許使用Executors快速創建執行緒池?
  2. 下面的程式碼輸出是什麼?
ThreadPoolExecutor executor = new ThreadPoolExecutor(          1, //corePoolSize          100, //maximumPoolSize          100, //keepAliveTime          TimeUnit.SECONDS, //unit          new LinkedBlockingDeque<>(100));//workQueue    for (int i = 0; i < 5; i++) {      final int taskIndex = i;      executor.execute(() -> {          System.out.println(taskIndex);          try {              Thread.sleep(Long.MAX_VALUE);          } catch (InterruptedException e) {              e.printStackTrace();          }      });  }

A) 0 1 2 3 4 5

B) 0~5 順序不一致輸出5行

C) 0

基礎

什麼是執行緒池?

執行緒池可以通過池看出來是一個資源集,任何池的作用都大同小異,主要是用來減少資源創建、初始化的系統開銷。

創建執行緒很「貴」嗎?

是的。創建執行緒的代價是昂貴的。

我們都知道系統中的每個進程有自己獨立的記憶體空間,而被稱為輕量級進程的執行緒也是需要的。

在JVM中默認一個執行緒需要使用256k~1M(取決於32位還是64位作業系統)的記憶體。(具體的數組我們不深究,因為隨著JVM版本的變化這個默認值隨時可能發生變更,我們只需要知道執行緒是需要佔用記憶體的)

除了記憶體還有更多嗎?
許多文章會將上下文切換、CPU調度列入其中,這邊不將執行緒調度列入是因為睡眠中的執行緒不會被調度(OS控制),如果不是睡眠中的執行緒那麼是一定需要被調度的。
但在JVM中除了創建時的記憶體消耗,還會給GC帶來壓力,如果頻繁創建執行緒那麼相對的GC的時候也需要回收對應的執行緒。

執行緒池的機制?

可以看到執行緒池是一種重複利用執行緒的技術,執行緒池的主要機制就是保留一定的執行緒數在沒有事情做的時候使之睡眠,當有活乾的時候拿一個執行緒去運行。
這些牽扯到執行緒池實現的具體策略。

還有哪些常見的池?

  • 執行緒池
  • 連接池(資料庫連接、TCP連接等)
  • BufferPool
  • ……

Java中的執行緒池

UML圖(Java 8)

UML
可以看到真正的實現類有

  1. ThreadPoolExecutor (1.5)
  2. ForkJoinPool (1.7)
  3. ScheduledThreadPoolExecutor (1.5)

今天我們主要談談 ThreadPoolExecutor 也是使用率較高的一個實現。

Executors提供的工廠方法

  1. newCachedThreadPool (ThreadPoolExecutor)

    創建一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閑(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的添加新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠創建的最大執行緒大小。

  2. newFixedThreadPool (ThreadPoolExecutor)

    創建固定大小的執行緒池。每次提交一個任務就創建一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

  3. newSingleThreadExecutor (ThreadPoolExecutor)

    創建一個單執行緒的執行緒池。這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒串列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。

  4. newScheduledThreadPool (ScheduledThreadPoolExecutor)

    創建一個大小無限的執行緒池。此執行緒池支援定時以及周期性執行任務的需求。

  5. newSingleThreadScheduledExecutor (ScheduledThreadPoolExecutor)

    創建一個單執行緒用於定時以及周期性執行任務的需求。

  6. newWorkStealingPool (1.8 ForkJoinPool)

    創建一個工作竊取

可以看到各種不同的工廠方法中使用的執行緒池實現類最終只有3個,對應關係如下:

工廠方法 實現類
newCachedThreadPool ThreadPoolExecutor
newFixedThreadPool ThreadPoolExecutor
newSingleThreadExecutor ThreadPoolExecutor
newScheduledThreadPool ScheduledThreadPoolExecutor
newSingleThreadScheduledExecutor ScheduledThreadPoolExecutor
newWorkStealingPool ForkJoinPool

ThreadPoolExecutor

首先我們看下 ThreadPoolExecutor 的完全構造函數

public ThreadPoolExecutor(int corePoolSize,                                int maximumPoolSize,                                long keepAliveTime,                                TimeUnit unit,                                BlockingQueue<Runnable> workQueue,                                ThreadFactory threadFactory,                                RejectedExecutionHandler handler)
  1. corePoolSize

    核心池大小,除非設置了 allowCoreThreadTimeOut 否則哪怕執行緒超過空閑時間,池中也要最少要保留這個數目的執行緒。

    需要注意的是,corePoolSize所需的執行緒並不是立即創建的,需要在提交任務之後進行創建,所以如果有大量的快取執行緒數可以先提交一個空任務讓執行緒池將執行緒先創建出來,從而提升後續的執行效率。

  2. maximumPoolSize

    允許的最大執行緒數。

  3. keepAliveTime

    空閑執行緒空閑存活時間,核心執行緒需要 allowCoreThreadTimeOut 為true才會退出。

  4. unit

    keepAliveTime 配合,設置 keepAliveTime 的單位,如:毫秒、秒。

  5. workQueue

    執行緒池中的任務隊列。上面提到執行緒池的主要作用是復用執行緒來處理任務,所以我們需要一個隊列來存放需要執行的任務,在使用池中的執行緒來處理這些任務,所以我們需要一個任務隊列。

  6. threadFactory

    當執行緒池判斷需要新的執行緒時通過執行緒工程創建執行緒。

  7. handler

    執行被阻止時的處理程式,執行緒池無法處理。這個與任務隊列相關,比如隊列中可以指定隊列大小,如果超過了這個大小該怎麼辦呢?JDK已經為我們考慮到了,並提供了4個默認實現。

    下列是JDK中默認攜帶的策略:

    1. AbortPolicy (默認)

      拋出 RejectedExecutionException 異常。

    2. CallerRunsPolicy

      調用當前執行緒池所在的執行緒去執行。

    3. DiscardPolicy

      直接丟棄當前任務。

    4. DiscardOldestPolicy

      將最舊的任務丟棄,將當前任務添加到隊列。

容易混淆的參數:corePoolSize maximumPoolSize workQueue

任務隊列、核心執行緒數、最大執行緒數的邏輯關係

  1. 當執行緒數小於核心執行緒數時,創建執行緒。
  2. 當執行緒數大於等於核心執行緒數,且任務隊列未滿時,將任務放入任務隊列。
  3. 當執行緒數大於等於核心執行緒數,且任務隊列已滿
    1. 若執行緒數小於最大執行緒數,創建執行緒
    2. 若執行緒數等於最大執行緒數,調用拒絕執行處理程式(默認效果為:拋出異常,拒絕任務)

那麼這三個參數推薦如何設置,有最優值嗎?

由於java對於協程的支援不友好,所以會大量依賴於執行緒池和執行緒。
從而這個值沒有最優推薦,需要根據業務需求情況來進行設置。
不同的需求類型可以創建多個不同的執行緒池來執行。

問題1:阿里開發規範為什麼不允許Executors快速創建執行緒池?

p3c

參考地址:https://github.com/alibaba/p3c

可以看到原因很簡單

  1. newSingleThreadExecutor
  2. newFixedThreadPool

workQueue 參數直接 使用了 new LinkedBlockingQueue<Runnable>() 理論上可以無限添加任務到執行緒池。

public static ExecutorService newFixedThreadPool(int nThreads) {      return new ThreadPoolExecutor(nThreads, nThreads,                                      0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>();  }    public static ExecutorService newSingleThreadExecutor() {      return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1,      1,      0L,      TimeUnit.MILLISECONDS,      new LinkedBlockingQueue<Runnable>()));  }

如果提交到執行緒池的任務由問題,比如 sleep 永久,會造成記憶體泄漏,最終導致OOM。

同時 阿里還推薦自定義 threadFactory 設置執行緒名稱便於以後排查問題。

問題2:下面的程式碼輸出是什麼?

應該選C。
雖然最大執行緒數有100但核心執行緒數為1,任務隊列由100。
滿足了 ‘當執行緒數大於等於核心執行緒數,且任務隊列未滿時,將任務放入任務隊列。’ 這個條件。
所以後續添加的任務都會被堵塞。

最後

關於 ThreadPoolExecutor 的邏輯在實際使用的時候會有點奇怪,因為執行緒池中的執行緒並沒有超過最大執行緒數,有沒有一種可能當任務被堵塞很久的時候創建新的執行緒池來處理呢?

這邊推薦大家使用 newWorkStealingPool,也就是ForkJoinPool。採取了工作竊取的模式。
後續會跟大家一起聊聊 ForkJoinPool。