從源碼角度解析線程池中頂層接口和抽象類

摘要:我們就來看看線程池中那些非常重要的接口和抽象類,深度分析下線程池中是如何將抽象這一思想運用的淋漓盡致的。

本文分享自華為雲社區《【高並發】深度解析線程池中那些重要的頂層接口和抽象類》,作者:冰 河。

通過對線程池中接口和抽象類的分析,你會發現,整個線程池設計的是如此的優雅和強大,從線程池的代碼設計中,我們學到的不只是代碼而已!!

題外話:膜拜Java大神Doug Lea,Java中的並發包正是這位老爺子寫的,他是這個世界上對Java影響力最大的一個人。

一、接口和抽象類總覽

說起線程池中提供的重要的接口和抽象類,基本上就是如下圖所示的接口和類。

接口與類的簡單說明:

  • Executor接口:這個接口也是整個線程池中最頂層的接口,提供了一個無返回值的提交任務的方法。
  • ExecutorService接口:派生自Executor接口,擴展了很過功能,例如關閉線程池,提交任務並返回結果數據、喚醒線程池中的任務等。
  • AbstractExecutorService抽象類:派生自ExecutorService接口,實現了幾個非常實現的方法,供子類進行調用。
  • ScheduledExecutorService定時任務接口,派生自ExecutorService接口,擁有ExecutorService接口定義的全部方法,並擴展了定時任務相關的方法。

接下來,我們就分別從源碼角度來看下這些接口和抽象類從頂層設計上提供了哪些功能。

二、Executor接口

Executor接口的源碼如下所示。

public interface Executor {
    //提交運行任務,參數為Runnable接口對象,無返回值
    void execute(Runnable command);
}

從源碼可以看出,Executor接口非常簡單,只提供了一個無返回值的提交任務的execute(Runnable)方法。

由於這個接口過於簡單,我們無法得知線程池的執行結果數據,如果我們不再使用線程池,也無法通過Executor接口來關閉線程池。此時,我們就需要ExecutorService接口的支持了。

三、ExecutorService接口

ExecutorService接口是非定時任務類線程池的核心接口,通過ExecutorService接口能夠向線程池中提交任務(支持有返回結果和無返回結果兩種方式)、關閉線程池、喚醒線程池中的任務等。ExecutorService接口的源碼如下所示。

package java.util.concurrent;
import java.util.List;
import java.util.Collection;
public interface ExecutorService extends Executor {

    //關閉線程池,線程池中不再接受新提交的任務,但是之前提交的任務繼續運行,直到完成
    void shutdown();
 
    //關閉線程池,線程池中不再接受新提交的任務,會嘗試停止線程池中正在執行的任務。
    List<Runnable> shutdownNow();
 
    //判斷線程池是否已經關閉
    boolean isShutdown();
 
    //判斷線程池中的所有任務是否結束,只有在調用shutdown或者shutdownNow方法之後調用此方法才會返回true。
    boolean isTerminated();

    //等待線程池中的所有任務執行結束,並設置超時時間
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
 
    //提交一個Callable接口類型的任務,返回一個Future類型的結果
    <T> Future<T> submit(Callable<T> task);
 
    //提交一個Callable接口類型的任務,並且給定一個泛型類型的接收結果數據參數,返回一個Future類型的結果
    <T> Future<T> submit(Runnable task, T result);

    //提交一個Runnable接口類型的任務,返回一個Future類型的結果
    Future<?> submit(Runnable task);

    //批量提交任務並獲得他們的future,Task列表與Future列表一一對應
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
 
    //批量提交任務並獲得他們的future,並限定處理所有任務的時間
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit) throws InterruptedException;
 
    //批量提交任務並獲得一個已經成功執行的任務的結果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException; 
 
    //批量提交任務並獲得一個已經成功執行的任務的結果,並限定處理任務的時間
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

關於ExecutorService接口中每個方法的含義,直接上述接口源碼中的注釋即可,這些接口方法都比較簡單,我就不一一重複列舉描述了。這個接口也是我們在使用非定時任務類的線程池中最常使用的接口。

四、AbstractExecutorService抽象類

AbstractExecutorService類是一個抽象類,派生自ExecutorService接口,在其基礎上實現了幾個比較實用的方法,提供給子類進行調用。我們還是來看下AbstractExecutorService類的源碼。

注意:大家可以到java.util.concurrent包下查看完整的AbstractExecutorService類的源碼,這裡,我將AbstractExecutorService源碼進行拆解,詳解每個方法的作用。

newTaskFor方法

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

RunnableFuture類用於獲取執行結果,在實際使用時,我們經常使用的是它的子類FutureTask,newTaskFor方法的作用就是將任務封裝成FutureTask對象,後續將FutureTask對象提交到線程池。

doInvokeAny方法

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
    throws InterruptedException, ExecutionException, TimeoutException {
    //提交的任務為空,拋出空指針異常
    if (tasks == null)
        throw new NullPointerException();
    //記錄待執行的任務的剩餘數量
    int ntasks = tasks.size();
    //任務集合中的數據為空,拋出非法參數異常
    if (ntasks == 0)
        throw new IllegalArgumentException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    //以當前實例對象作為參數構建ExecutorCompletionService對象
    // ExecutorCompletionService負責執行任務,後面調用用poll返回第一個執行結果
    ExecutorCompletionService<T> ecs =
        new ExecutorCompletionService<T>(this);

    try {
        // 記錄可能拋出的執行異常
        ExecutionException ee = null;
        // 初始化超時時間
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();
 
        //提交任務,並將返回的結果數據添加到futures集合中
        //提交一個任務主要是確保在進入循環之前開始一個任務
        futures.add(ecs.submit(it.next()));
        --ntasks;
        //記錄正在執行的任務數量
        int active = 1;

        for (;;) {
            //從完成任務的BlockingQueue隊列中獲取並移除下一個將要完成的任務的結果。
            //如果BlockingQueue隊列中中的數據為空,則返回null
            //這裡的poll()方法是非阻塞方法
            Future<T> f = ecs.poll();
            //獲取的結果為空
            if (f == null) {
                //集合中仍有未執行的任務數量
                if (ntasks > 0) {
                    //未執行的任務數量減1
                    --ntasks;
                    //提交完成並將結果添加到futures集合中
                    futures.add(ecs.submit(it.next()));
                    //正在執行的任務數量加•1
                    ++active;
                }
                //所有任務執行完成,並且返回了結果數據,則退出循環
                //之所以處理active為0的情況,是因為poll()方法是非阻塞方法,可能導致未返回結果時active為0
                else if (active == 0)
                    break;
                //如果timed為true,則執行獲取結果數據時設置超時時間,也就是超時獲取結果表示
                else if (timed) {    
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                //沒有設置超時,並且所有任務都被提交了,則一直阻塞,直到返回一個執行結果
                else
                    f = ecs.take();
            }
            //獲取到執行結果,則將正在執行的任務減1,從Future中獲取結果並返回
            if (f != null) {
                --active;
                try {
                    return f.get();
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }

        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {
        //如果從所有執行的任務中獲取到一個結果數據,則取消所有執行的任務,不再向下執行
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);
    }
}

這個方法是批量執行線程池的任務,最終返回一個結果數據的核心方法,通過源代碼的分析,我們可以發現,這個方法只要獲取到一個結果數據,就會取消線程池中所有運行的任務,並將結果數據返回。這就好比是很多要進入一個居民小區一樣,只要有一個人有門禁卡,門衛就不再檢查其他人是否有門禁卡,直接放行。

在上述代碼中,我們看到提交任務使用的ExecutorCompletionService對象的submit方法,我們再來看下ExecutorCompletionService類中的submit方法,如下所示。

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

public Future<V> submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task, result);
    executor.execute(new QueueingFuture(f));
    return f;
}

可以看到,ExecutorCompletionService類中的submit方法本質上調用的還是Executor接口的execute方法。

invokeAny方法

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    try {
        return doInvokeAny(tasks, false, 0);
    } catch (TimeoutException cannotHappen) {
        assert false;
        return null;
    }
}

public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                       long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

這兩個invokeAny方法本質上都是在調用doInvokeAny方法,在線程池中提交多個任務,只要返回一個結果數據即可。

直接看上面的代碼,大家可能有點暈。這裡,我舉一個例子,我們在使用線程池的時候,可能會啟動多個線程去執行各自的任務,比如線程A負責task_a,線程B負責task_b,這樣可以大規模提升系統處理任務的速度。如果我們希望其中一個線程執行完成返回結果數據時立即返回,而不需要再讓其他線程繼續執行任務。此時,就可以使用invokeAny方法。

invokeAll方法

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    //標識所有任務是否完成
    boolean done = false;
    try {
        //遍歷所有任務
        for (Callable<T> t : tasks) {
            將每個任務封裝成RunnableFuture對象提交任務
            RunnableFuture<T> f = newTaskFor(t);
            //將結果數據添加到futures集合中
            futures.add(f);
            //執行任務
            execute(f);
        }
        //遍歷結果數據集合
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            //任務沒有完成
            if (!f.isDone()) {
                try {
                    //阻塞等待任務完成並返回結果
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        //任務完成(不管是正常結束還是異常完成)
        done = true;
        //返回結果數據集合
        return futures;
    } finally {
        //如果發生中斷異常InterruptedException 則取消已經提交的任務
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                     long timeout, TimeUnit unit)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks)
            futures.add(newTaskFor(t));

        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();

        for (int i = 0; i < size; i++) {
            execute((Runnable)futures.get(i));
            // 在添加執行任務時超時判斷,如果超時則立刻返回futures集合
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L)
                return futures;
        }
         // 遍歷所有任務
        for (int i = 0; i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                //對結果進行判斷時進行超時判斷
                if (nanos <= 0L)
                    return futures;
                try {
                    f.get(nanos, TimeUnit.NANOSECONDS);
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                } catch (TimeoutException toe) {
                    return futures;
                }
                //重置任務的超時時間
                nanos = deadline - System.nanoTime();
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

invokeAll方法同樣實現了無超時時間設置和有超時時間設置的邏輯。

無超時時間設置的invokeAll方法總體邏輯為:將所有任務封裝成RunnableFuture對象,調用execute方法執行任務,將返回的結果數據添加到futures集合,之後對futures集合進行遍歷判斷,檢測任務是否完成,如果沒有完成,則調用get方法阻塞任務,直到返回結果數據,此時會忽略異常。最終在finally代碼塊中對所有任務是否完成的標識進行判斷,如果存在未完成的任務,則取消已經提交的任務。

有超時設置的invokeAll方法總體邏輯與無超時時間設置的invokeAll方法總體邏輯基本相同,只是在兩個地方添加了超時的邏輯判斷。一個是在添加執行任務時進行超時判斷,如果超時,則立刻返回futures集合;另一個是每次對結果數據進行判斷時添加了超時處理邏輯。

invokeAll方法中本質上還是調用Executor接口的execute方法來提交任務。

submit方法

submit方法的邏輯比較簡單,就是將任務封裝成RunnableFuture對象並提交,執行任務後返回Future結果數據。如下所示。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

從源碼中可以看出submit方法提交任務時,本質上還是調用的Executor接口的execute方法。

綜上所述,在非定時任務類的線程池中提交任務時,本質上都是調用的Executor接口的execute方法。至於調用的是哪個具體實現類的execute方法,我們在後面的文章中深入分析。

五、ScheduledExecutorService接口

ScheduledExecutorService接口派生自ExecutorService接口,繼承了ExecutorService接口的所有功能,並提供了定時處理任務的能力,ScheduledExecutorService接口的源代碼比較簡單,如下所示。

package java.util.concurrent;

public interface ScheduledExecutorService extends ExecutorService {

    //延時delay時間來執行command任務,只執行一次
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    //延時delay時間來執行callable任務,只執行一次
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    //延時initialDelay時間首次執行command任務,之後每隔period時間執行一次
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
 
    //延時initialDelay時間首次執行command任務,之後每延時delay時間執行一次
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

至此,我們分析了線程池體系中重要的頂層接口和抽象類。

通過對這些頂層接口和抽象類的分析,我們需要從中感悟並體會軟件開發中的抽象思維,深入理解抽象思維在具體編碼中的實現,最終,形成自己的編程思維,運用到實際的項目中,這也是我們能夠從源碼中所能學到的眾多細節之一。這也是高級或資深工程師和架構師必須了解源碼細節的原因之一。

 

點擊關注,第一時間了解華為雲新鮮技術~