Java並發編程(08):Executor執行緒池框架

本文源碼:GitHub·點這裡 || GitEE·點這裡

一、Executor框架簡介

1、基礎簡介

Executor系統中,將執行緒任務提交和任務執行進行了解耦的設計,Executor有各種功能強大的實現類,提供便捷方式來提交任務並且獲取任務執行結果,封裝了任務執行的過程,不再需要Thread().start()方式,顯式創建執行緒並關聯執行任務。

2、調度模型

執行緒被一對一映射為服務所在作業系統執行緒,啟動時會創建一個作業系統執行緒;當該執行緒終止時,這個作業系統執行緒也會被回收。

3、核心API結構

Executor框架包含的核心介面和主要的實現類如下圖所示:

執行緒池任務:核心介面:Runnable、Callable介面和介面實現類;

任務的結果:介面Future和實現類FutureTask;

任務的執行:核心介面Executor和ExecutorService介面。在Executor框架中有兩個核心類實現了ExecutorService介面,ThreadPoolExecutor和ScheduledThreadPoolExecutor。

二、用法案例

1、API基礎

ThreadPoolExecutor基礎構造

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {}
參數名 說明
corePoolSize 執行緒池的核心大小,隊列沒滿時,執行緒最大並發數
maximumPoolSize 最大執行緒池大小,隊列滿後執行緒能夠容忍的最大並發數
keepAliveTime 空閑執行緒等待回收的時間限制
unit keepAliveTime時間單位
workQueue 阻塞的隊列類型
threadFactory 創建執行緒的工廠,一般用默認即可
handler 超出工作隊列和執行緒池時,任務會默認拋出異常

2、初始化方法

ExecutorService :Executors.newFixedThreadPool();
ExecutorService :Executors.newSingleThreadExecutor();
ExecutorService :Executors.newCachedThreadPool();

ThreadPoolExecutor :new ThreadPoolExecutor() ;

通常情況下,執行緒池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式更加明確執行緒池的運行規則,規避資源耗盡的風險。

3、基礎案例

package com.multy.thread.block08executor;
import java.util.concurrent.*;

public class Executor01 {
    // 定義執行緒池
    private static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                    3,10,5000,TimeUnit.SECONDS,
                    new SynchronousQueue<>(),Executors.defaultThreadFactory(),new ExeHandler());
    public static void main(String[] args) {
        for (int i = 0 ; i < 100 ; i++){
            poolExecutor.execute(new PoolTask(i));
            //帶返回值:poolExecutor.submit(new PoolTask(i));
        }
    }
}
// 定義執行緒池任務
class PoolTask implements Runnable {

    private int numParam;

    public PoolTask (int numParam) {
        this.numParam = numParam;
    }
    @Override
    public void run() {
        try {
            System.out.println("PoolTask "+ numParam+" begin...");
            Thread.sleep(5000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public int getNumParam() {
        return numParam;
    }
    public void setNumParam(int numParam) {
        this.numParam = numParam;
    }
}
// 定義異常處理
class ExeHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        System.out.println("ExeHandler "+executor.getCorePoolSize());
        executor.shutdown();
    }
}

流程分析

  • 執行緒池中執行緒數小於corePoolSize時,新任務將創建一個新執行緒執行任務,不論此時執行緒池中存在空閑執行緒;
  • 執行緒池中執行緒數達到corePoolSize時,新任務將被放入workQueue中,等待執行緒池中任務調度執行;
  • 當workQueue已滿,且maximumPoolSize>corePoolSize時,新任務會創建新執行緒執行任務;
  • 當workQueue已滿,且提交任務數超過maximumPoolSize,任務由RejectedExecutionHandler處理;
  • 當執行緒池中執行緒數超過corePoolSize,且超過這部分的空閑時間達到keepAliveTime時,回收該執行緒;
  • 如果設置allowCoreThreadTimeOut(true)時,執行緒池中corePoolSize範圍內的執行緒空閑時間達到keepAliveTime也將回收;

三、執行緒池應用

應用場景:批量賬戶和密碼的校驗任務,在實際的業務中算比較常見的,通過初始化執行緒池,把任務提交執行,最後拿到處理結果,這就是執行緒池使用的核心思想:節省資源提升效率。

public class Executor02 {

    public static void main(String[] args) {
        // 初始化校驗任務
        List<CheckTask> checkTaskList = new ArrayList<>() ;
        initList(checkTaskList);
        // 定義執行緒池
        ExecutorService executorService ;
        if (checkTaskList.size() < 10){
            executorService = Executors.newFixedThreadPool(checkTaskList.size());
        }else{
            executorService = Executors.newFixedThreadPool(10);
        }
        // 批量處理
        List<Future<Boolean>> results = new ArrayList<>() ;
        try {
            results = executorService.invokeAll(checkTaskList);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // 查看結果
        for (Future<Boolean> result : results){
            try {
                System.out.println(result.get());
                // System.out.println(result.get(10000,TimeUnit.SECONDS));
            } catch (Exception e) {
                e.printStackTrace() ;
            }
        }
        // 關閉執行緒池
        executorService.shutdownNow();
    }

    private static void initList (List<CheckTask> checkTaskList){
        checkTaskList.add(new CheckTask("root","123")) ;
        checkTaskList.add(new CheckTask("root1","1234")) ;
        checkTaskList.add(new CheckTask("root2","1235")) ;
    }
}
// 校驗任務
class CheckTask implements Callable<Boolean> {
    private String userName ;
    private String passWord ;
    public CheckTask(String userName, String passWord) {
        this.userName = userName;
        this.passWord = passWord;
    }
    @Override
    public Boolean call() throws Exception {
        // 校驗賬戶+密碼
        if (userName.equals("root") && passWord.equals("123")){
            return Boolean.TRUE ;
        }
        return Boolean.FALSE ;
    }
}

執行緒池主要用來解決執行緒生命周期開銷問題和資源不足問題,通過執行緒池對多個任務執行緒重複使用,執行緒創建也被分攤到多個任務上,多數任務提交就有空閑的執行緒可以使用,所以消除執行緒頻繁創建帶來的開銷。

四、源程式碼地址

GitHub·地址
//github.com/cicadasmile/java-base-parent
GitEE·地址
//gitee.com/cicadasmile/java-base-parent

推薦閱讀:Java並發編程

序號 文章標題
01 Java並發:執行緒的創建方式,狀態周期管理
02 Java並發:執行緒核心機制,基礎概念擴展
03 Java並發:多執行緒並發訪問,同步控制
04 Java並發:執行緒間通訊,等待/通知機制
05 Java並發:悲觀鎖和樂觀鎖機制
06 Java並發:Lock機制下API用法詳解
07 Java並發:Fork/Join框架機制詳解