java多執行緒9:執行緒池

執行緒池

執行緒池的優點

我們知道執行緒的創建和上下文的切換也是需要消耗CPU資源的,所以在多執行緒任務下,使用執行緒池的優點就有:

第一:降低資源消耗。通過重複利用已創建的執行緒降低執行緒創建和銷毀造成的消耗。

第二:提高響應速度。當任務到達時,任務可以不需要等到執行緒創建就能立即執行。

第三:提高執行緒的可管理性。執行緒是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,

使用執行緒池可以進行統一分配、調優和監控。

 

執行緒池的實現原理

我們看下執行緒池的主要處理流程,ThreadPoolExecutor執行示意圖

 

 

1)如果當前運行的執行緒少於corePoolSize,則創建新執行緒來執行任務(注意,執行這一步驟需要獲取全局鎖)。

2)如果運行的執行緒等於或多於corePoolSize,則將任務加入BlockingQueue。

3)如果無法將任務加入BlockingQueue(隊列已滿),則創建新的執行緒來處理任務(注意,執行這一步驟需要獲取全局鎖)。

4)如果創建新執行緒將使當前運行的執行緒超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()方法。

 

 

看下構造方法中核心的7個參數

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

1:corePoolSize(執行緒池的基本大小)

當提交一個任務到執行緒池時,執行緒池會創建一個執行緒來執行任務,即使其他空閑的基本執行緒能夠執行新任務也會創建執行緒,等到需要執行的任務數大於corePoolSize時就不再創建。

如果調用了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前創建並啟動所有基本執行緒。

如果調用了執行緒池的allowsCoreThreadTimeOut()方法,執行緒池的核心執行緒可以在等待新任務超時後自動銷毀。

2:maximumPoolSize(執行緒池最大數量)

執行緒池允許創建的最大執行緒數。如果隊列滿了,並且已創建的執行緒數小於最大執行緒數,則執行緒池會再創建新的執行緒執行任務。這也是當前執行緒池能同時運行的最大執行緒數。

3:keepAliveTime(執行緒活動保持時間)

執行緒池的工作執行緒空閑後,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高執行緒的利用率。

4:unit(執行緒活動保持時間的單位)

可選的單位有天(DAYS)、小時(HOURS)、分鐘(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)。

5:workQueue(任務隊列)

用於保存等待執行的任務的阻塞隊列。

阻塞隊列的數據結構與功能可以參考:java多執行緒8:阻塞隊列與Fork/Join框架,可用於執行緒池的有:

ArrayBlockingQueue、PriorityBlockingQueue、LinkedBlockingQueue 靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列、

SynchronousQueue 靜態工廠方法Executors.newCachedThreadPool使用了這個隊列

6:ThreadFactory:用於設置創建執行緒的工廠,可以通過執行緒工廠給每個創建出來的執行緒設置更有意義的名字。

7:RejectedExecutionHandler(飽和策略):當隊列和執行緒池都滿了,說明執行緒池處於飽和狀態,那麼必須採取一種策略處理提交的新任務。

這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。在JDK 1.5中Java執行緒池框架提供了以下4種策略:

    * AbortPolicy:直接拋出異常。

  * CallerRunsPolicy:使用調用者所在執行緒來運行任務。

  * DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。

  * DiscardPolicy:不處理,丟棄掉。

當然,也可以根據應用場景需要來實現RejectedExecutionHandler介面自定義策略。如記錄日誌或持久化存儲不能處理的任務。

 

執行緒池的創建

在Executors 中為我們提供了大多數場景下幾種常用的執行緒池創建方法

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

  單執行緒執行緒池,執行緒池中核心執行緒數和最大執行緒數都是1,workQueue選擇了Integer.MAX_VALUE 長度的LinkedBlockingQueue,基本上不管來多少任務都在排隊等待一個一個的執行。

因為workQueue是無界的,也就是說排隊的任務永遠不會多過workQueue的容量,那maximum其實設置多少都無所謂了

 

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

  固定大小的執行緒池,無非是讓執行緒池中能運行的執行緒編程了手動指定的nThreads罷了,和單執行緒的執行緒池異曲同工。

 

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

  無界執行緒池,意思是不管多少任務提交進來,都直接運行。無界執行緒池採用了SynchronousQueue,採用這個執行緒池就沒有workQueue容量一說了,只要添加進去的執行緒就會被拿去用。

既然是無界執行緒池,那執行緒數肯定沒上限,所以以maximumPoolSize為主了,設置為一個近似的無限大Integer.MAX_VALUE。

另外注意一下,單執行緒執行緒池和固定大小執行緒池執行緒都不會進行自動回收的,也即是說保證提交進來的任務最終都會被處理,但至於什麼時候處理,就要看處理能力了。

但是無界執行緒池是設置了回收時間的,由於corePoolSize為0,所以只要60秒沒有被用到的執行緒都會被直接移除。

 

上面三種創建執行緒池的方式,有一個最大的弊端就是提交任務可以無限制,這樣就很容易導致我們服務OOM,阿里的java開發手冊在並發處理一節中就強制建議:

【強制】執行緒池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確執行緒池的運行規則,規避資源耗盡的風險。

  說明:Executors 返回的執行緒池對象的弊端如下:

    1:FixedThreadPool 和 SingleThreadPool: workQueue默認都是Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM

    2:CachedThreadPool: 允許創建執行緒數量為Integer.MAX_VALUE,可能會創建大量執行緒,從而導致 OOM

 通常來說,我們一般顯示的通過ThreadPoolExecutor來創建自定義執行緒池,根據性質不同的任務可以用不同規模的執行緒池分開處理。

CPU密集型任務應配置儘可能小的執行緒,如配置Ncpu+1個執行緒的執行緒池。IO密集型任務執行緒並不是一直在執行任務,則應配置儘可能多的執行緒,如2*Ncpu。

對於任務隊列workQueue,還是建議使用有界隊列可以提高系統的穩定性,而且可以通過我們自定義的拒絕策略去排序執行緒池的問題。

 

執行緒池的監控

可以通過執行緒池提供的參數進行監控,在監控執行緒池的時候可以使用以下屬性。

taskCount:執行緒池需要執行的任務數量。

completedTaskCount:執行緒池在運行過程中已完成的任務數量,小於或等於taskCount。

largestPoolSize:執行緒池裡曾經創建過的最大執行緒數量。通過這個數據可以知道執行緒池是否曾經滿過。如該數值等於執行緒池的最大大小,則表示執行緒池曾經滿過。

getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷毀的話,執行緒池裡的執行緒不會自動銷毀,所以這個大小隻增不減。

getActiveCount:獲取活動的執行緒數。

 

可以通過繼承執行緒池來自定義執行緒池,重寫執行緒池的beforeExecute、afterExecute和terminated方法,也可以在任務執行前、執行後和執行緒池關閉前執行一些程式碼來進行監控。

例如,監控任務的平均執行時間、最大執行時間和最小執行時間等

 

執行緒池的關閉

可以通過調用執行緒池的shutdown或shutdownNow方法來關閉執行緒池。

它們的原理是遍歷執行緒池中的工作執行緒,然後逐個調用執行緒的interrupt方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。

它們的區別是,shutdownNow首先將執行緒池的狀態設置成STOP,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表,

而shutdown只是將執行緒池的狀態設置成SHUTDOWN狀態,然後中斷所有沒有正在執行任務的執行緒。

只要調用了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時調用isTerminaed方法會返回true。

至於應該調用哪一種方法來關閉執行緒池,應該由提交到執行緒池的任務特性決定,通常調用shutdown方法來關閉執行緒池,如果任務不一定要執行完,則可以調用shutdownNow方法。

awaitTermination(long timeout, TimeUnit unit) 設定超時時間及單位,當等待超過設定時間時,會監測執行緒池是否已經關閉,若關閉則返回true,否則返回false。一般情況下會和shutdown方法組合使用。

 

參考文獻

1:《Java並發編程的藝術》