線程池之Executor框架

線程池之Executor框架

Java的線程既是工作單元,也是執行機制。從JDK5開始,把工作機單元和執行機制分離開來。工作單元包括Runnable和Callable,而執行機制由Executor框架提供。

1. Executor框架簡介

1.1 Executor框架的兩級調度模型

在上層,Java多線程程序通常把應用分解為若干個任務,然後使用用戶級的調度器(Executor框架)將這些任務映射為固定數量的線程。

在底層,操作系統內核將這些線程映射到硬件處理器上。

image-20200820220148034

1.2 Executor框架的結構

Executor框架主要由3部分組成:

  • 任務。包括被執行任務需要實現的接口:Runnable接口或者Callable接口。
  • 任務的執行。包括任務執行機制的核心接口Executor,以及繼承自Executor的ExecutorService接口。Executor框架有兩個關鍵類實現了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
  • 異步計算的結果。包括Future和實現Future的FutureTask類。

Executor框架的成員及其關係可以用一下的關係圖表示:

image-20200820221319222

Executor框架的使用示意圖:

image-20200820222719384

使用步驟:

  • 主線程首先創建實現Runnable或Callable接口的任務對象。工具類Executors可以把一個Runnable對象封裝為一個Callable對象(Executors.callable(Runnable task)Executors.callable(Runnable task, Object result))。
  • 創建Executor接口的實現類ThreadPoolExecutor類或者ScheduledThreadPoolExecutor類的對象,然後調用其execute()方法或者submit()方法把工作任務添加到線程中,如果有返回值則返回Future對象。其中Callable對象有返回值,因此使用submit()方法;而Runnable可以使用execute()方法,此外還可以使用submit()方法,只要使用callable(Runnable task)或者callable(Runnable task, Object result)方法把Runnable對象包裝起來就可以,使用callable(Runnable task)方法返回的null,使用callable(Runnable task, Object result)方法返回result。
  • 主線程可以執行Future對象的get()方法獲取返回值,也可以調用cancle()方法取消當前線程的執行。

1.3 Executor框架的使用案例

import java.util.concurrent.*;

public class ExecutorDemo {
    // 創建ThreadPoolExecutor實現類
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5,
            10,
            100,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(5),
    );

    public static void main(String[] args) {
        // 採用submit()方法提交Callable對象並返回Future對象
        Future<String> future = executor.submit(new callableDemo());
        try {
            // get()方法獲取返回值
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            // 處理異常
            e.printStackTrace();
        } finally {
            // 關閉線程池
            executor.shutdown();
        }
    }
}

/**
 * 創建Callable接口的實現類
 */
class callableDemo implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        String s = "return string";
        return s;
    }
}

2. Executor框架成員

image-20200820225956625

2.1 ThreadPoolExecutor

直接創建ThreadPoolExecutor的實例對象,見//www.cnblogs.com/chiaki/p/13536624.html

ThreadPoolExecutor通常使用工廠類Executors創建,可以創建3種類型的ThreadPoolExecutor,即FixedThreadPool、SingleThreadExecutor以及CachedThreadPool。

  • FixedThreadPool適用於為了滿足資源管理的需求,而需要限制當先線程數量的應用場景,適用於負載比較重的服務器。

    public static ExecutorService es = Executors.newFixedThreadPool(int threadNums);
    public static ExecutorService es = Executors.newFixedThreadPool(int threadNums, ThreadFactory threadFactory);
    
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    把線程池最大線程數量maxmumPoolSize和核心線程池的數量corePoolSize設置為threadNums,將參數keepAliveTime設置為0L。使用無界隊列LinkedBlockingQueue作為阻塞隊列,因此當任務不能立刻執行時,都會添加到阻塞隊列中,而且maximumPoolSize,keepAliveTime都是無效的。

  • SingleThreadExecutor:適用於需要保證順序地執行各個任務;並且在任意時間點,不會有多個線程是活動地應用場景。**

    public static ExecutorService es = Executors.newSingleThreadExecutor();
    public static ExecutorService es = Executors.newSingleThreadExecutor(ThreadFactory threadFactory);
    
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    因為阻塞隊列使用的是LinkedBlockingQueue,因此和FixedThreadPool一樣,參數maximumPoolSize以及keepAliveTime都是無效的。corePoolSize為1,因此最多只能創建一個線程

  • CachedThreadPool大小無界的線程池,適用於執行很多的短期異步任務的小程序,或者是負載較輕的服務器。

    public static ExecutorService es = Executors.newCachedThreadPool();
    public static ExecutorService es = Executors.newCachedThreadPool(ThreadFactory threadFactory);
    
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    CachedThreadPool使用SynchronizedQueue作為阻塞隊列,SynchronizedQueue是不存儲元素的阻塞隊列,實現「一對一的交付」,也就是說,每次向隊列中put一個任務必須等有線程來take這個任務,否則就會一直阻塞該任務,如果一個線程要take一個任務就要一直阻塞知道有任務被put進阻塞隊列。

    因為CachedThreadPool的maximumPoolSize為Integer.MUX_VALUE,因此CachedThreadPool是無界的線程池,也就是說可以一直不斷的創建線程,這樣可能會使CPU和內存資源耗盡。corePoolSize為0 ,因此在CachedThreadPool中直接通過阻塞隊列來進行任務的提交

2.2 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor類繼承了ThreadPoolExecutor並實現了ScheduledExecutorService接口。主要用於在給定的延遲後執行任務或者定期執行任務。

ScheduledThreadPoolExecutor通常使用Executors工廠類來創建,可創建2種類型的ScheduledThreadPoolExecutor,即ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor。

  • ScheduledThreadPoolExecutor:適用於若干個(固定)線程延時或者定期執行任務,同時為了滿足資源管理的需求而需要限制後台線程數量的場景。

    public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums);
    public static ScheduledExecutorService ses = Executors.newScheduledThreadPool(int threadNums, ThreadFactory threadFactory);
    
  • SingleThreadScheduledExecutor:適用於需要單個線程延時或者定期的執行任務,同時需要保證各個任務順序執行的應用場景。

    public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums);
    public static ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(int threadNums, ThreadFactory threadFactory);
    

ScheduledThreadPoolExecutor的實現:

ScheduledThreadPoolExecutor的實現主要是通過把任務封裝為ScheduledFutureTask來實現。通過調用scheduledAtFixedTime()方法或者scheduledWithFixedDelay()方法向阻塞隊列添加一個實現了RunnableScheduledFutureTask接口的ScheduledFutureTask類對象。ScheduledFutureTask主要包括3個成員變量:

// 序列號,用於保存任務添加到阻塞隊列的順序
private final long sequenceNumber;
// 用於保存該任務將要被執行的具體時間
private long time;
// 周期,用於保存任務直線的間隔周期
private final long period;

ScheduledTreadPoolExecutor的阻塞隊列是用無界隊列DelayQueue實現的,可以實現元素延時delayTime後才能獲取元素,在ScheduledThreadPoolExecutor中,DelayQueue內部封裝了一個PriorityQueue,來對任務進行排序,首先對time排序,time小的在前,如果time一樣,則sequence小的在前,也就是說如果time一樣,那麼先被提交的任務先執行。

因為DelayQueue是一個無界的隊列,因此線程池的maximumPoolSize是無效的。ScheduledThreadPoolExecutor的工作流程大致如下:

image-20200820233614169

2.3 Future接口/FutureTask實現類

Future接口和實現Future接口的FutureTask實現類,代表異步計算的結果。

2.3.1 FutureTask的使用

FutureTask除了實現Future接口外還實現了Runnable接口。因此,FutureTask可以交給Executor執行,也可以條用線程直接執行(FutureTask.run())。根據FutureTask.run()方法被執行的時機,FutureTask可處於以下3種狀態:

  • 未啟動:創建了一個FutureTask對象但沒有執行FutureTask.run();
  • 已啟動:FutureTask.run()方法被執行的過程中;
  • 已完成:FutureTask.run()正常執行結束,或者FutureTask被取消(FutureTask.cancel()),或者執行FutureTask.run()時拋出異常而異常結束;

狀態遷移示意圖:

image-20200820234612077

FutureTask的get和cancle執行示意圖:

image-20200820234818187

2.3.2 FutureTask的實現

FutureTask是一個基於AQS同步隊列實現的一個自定義同步組件,通過對同步狀態state的競爭實現acquire或者release操作。

FutureTask的內部類Sync實現了AQS接口,通過對tryAcquire等抽象方法的重寫和模板方法的調用來實現內部類Sync的tryAcquireShared等方法,然後聚合Sync的方法來實現FutureTask的get和cancel等方法。

FutureTask的設計示意圖:

image-20200820235358434

FutureTask的get方法最終會調用AQS.acquireSharedInterruptibly(int arg)方法:

  • 調用AQS.acquireSharedInterruptibly(int arg)方法會首先調用tryAcquireShared()方法判斷acquire操作是否可以成功,可以成功的條件是state為執行完成狀態RAN或者已取消狀態CANCELLED,且runner不為null;
  • 如果成功則get()方法立即返回,如果失敗則到線程等待隊列執行release操作;
  • 當其他線程執行release操作喚醒當前線程後(比如FutureTask.run()FutureTask.cancle(...)),當前線程再次執行tryAcquireShared()將返回正值1,當前線程離開現場等待隊列並喚醒它的後繼線程(級聯喚醒);
  • 最後返回計算的結果或拋出異常。

image-20200821000310543

2.3.3 FutureTask的使用場景
  • 當一個線程需要等待另一個線程把某個任務執行完以後它才能繼續執行時;
  • 有若干線程執行若干任務,每個任務最多只能被執行一次;
  • 當多個線程師徒執行同一個任務,但只能允許一個線程執行此任務,其它線程需要等這個任務被執行完畢以後才能繼續執行時。

2.4 Runnable和Callable接口

用於實現線程要執行的工作單元。

2.5 Executors工廠類

提供了常見配置線程池的方法,因為ThreadPoolExecutor的參數眾多且意義重大,為了避免配置出錯,才有了Executors工廠類。

3. 為什麼不建議使用Executors創建線程池?

FixedThreadPoolSingleThreadExecutor:允許請求的隊列長度為Integer.MAX_VALUE(無界的阻塞隊列),可能堆積大量的請求,從而導致OOM。

CachedThreadPoolScheduledThreadPool:允許創建的線程數量為Integer.MAX_VALUE(無界的阻塞隊列),可能會創建大量線程,從而導致OOM。