自定義執行緒池ThreadPoolExecutor

使用自定義的方式創建執行緒池

Java本身提供的獲取執行緒池的方式

使用Executors直接獲取執行緒池,注意,前四個方式的底層都是通過new ThreadPoolExecutor()的方式創建的執行緒池,只是參數不一樣而已,我們也正是利用了這點特性來實現自己的執行緒池

1. newCachedThreadPool

創建一個可快取無限制數量的執行緒池,
如果執行緒池中沒有空閑的執行緒的話,再來任務會新建執行緒,
執行緒60s內沒被使用,則銷毀。
簡單的說,忙不過來的時候就新建執行緒

Executors.newCachedThreadPool()

底層實現
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

常駐核心執行緒數為0
執行緒池最大執行緒數Integer.MAX_VALUE
執行緒空閑60S回收
使用工作隊列來存放任務,這個隊列有好幾種,各有各的特性。
**風險:**由於執行緒池最大執行緒池為Integer.MAX_VALUE,所以有OOM的風險

2. newFixedThreadPool

創建一直指定大小的執行緒池,如果執行緒池滿了,後面的任務會在隊列中等待,等拿到空閑的執行緒才能執行

Executors.newFixedThreadPool(nThreads)

底層實現
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

常駐核心執行緒數自定義
執行緒池最大執行緒數等於常駐核心執行緒數
因為最大執行緒數等於常駐核心執行緒,而常駐核心執行緒不會被回收,所以時間參數為0
使用工作隊列來存放任務,這個隊列有好幾種,各有各的特性。
**風險:**由於隊列沒有指明長度,默認為Integer.MAX_VALUE,所以有OOM的風險

3. newSingleThreadExecutor

創建一個大小為1的執行緒池,用唯一的執行緒來執行任務,保證任務有序進行

Executors.newSingleThreadExecutor()

底層實現
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

常駐核心執行緒數為1
執行緒池最大執行緒數等於常駐核心執行緒數1
執行緒池裡一共就一個常駐核心執行緒,所以不會被回收,所以時間參數為0
使用工作隊列來存放任務,這個隊列有好幾種,各有各的特性。
**風險:**由於隊列沒有指明長度,默認為Integer.MAX_VALUE,所以有OOM的風險

4. newScheduledThreadPool

創建指定大小的執行緒池,支援定時及周期性的執行任務

Executors.newScheduledThreadPool(corePoolSize)

底層實現
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,一下為ScheduledThreadPoolExecutor的構造方法

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

常駐核心執行緒數自定義
執行緒池最大執行緒數等於Integer.MAX_VALUE
不回收執行緒
使用工作隊列來存放任務,這個隊列有好幾種,各有各的特性。
**風險:**由於執行緒池最大執行緒池為Integer.MAX_VALUE,所以有OOM的風險

5. newWorkStealingPool

JDK1.8 引入
創建持有足夠執行緒的執行緒池來支援給定的並行級別,
並通過使用多個隊列減少競爭,並行級別的參數,如果不傳,默認為cpu的數量,
返回的不再是 ThreadPoolExecutor 而是 ForkJoinPool

Executors.newWorkStealingPool()

底層實現為**ForkJoinPool**,和上面的四個不同
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

正在研究ing

在阿里巴巴Java開發手冊中明確指出,不允許使用jdk自帶的方式獲取執行緒池。就是上面的前四個方法,所以,我們自己創建即可

創建自定義的執行緒工廠

public class ThreadFactoryImpl implements ThreadFactory {
    /**
     * 執行緒池號
     */
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    /**
     * 執行緒前綴名稱
     */
    private final String namePrefix;
    /**
     * 創建初始值為1且執行緒安全的執行緒號
     */
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    public ThreadFactoryImpl(String whatFeatureOfGroup) {
        namePrefix = "ThreadFactoryImpl's " + whatFeatureOfGroup + "-work-";
    }

    @Override
    public Thread newThread(Runnable r) {
        int threadNextId = threadNumber.getAndIncrement();
        String name = namePrefix + threadNextId;
        Thread thread = new Thread(null, r, name, 0);
        System.out.println("創建的第"+threadNextId+"個執行緒");
        return thread;
    }
}

AtomicInteger 實現了原子性,保證了高並發下的執行緒安全,該系類還有很多。
我們可以在自定義的執行緒工廠裡面添加我們需要的內容
不指定的話,會是默認的執行緒工廠
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

創建自定義執行緒池拒絕策略

public class ThreadPoolRejectHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("task rejected. "+executor.toString());
    }
}

在ThreadPoolExecutor中提供了四個公開的內部靜態類
1. AbortPolicy(默認):丟棄任務並拋出RejectedExecutionException異常
2. DiscardPloicy:丟棄任務,不拋出異常(不推薦使用)
3. DiscardOldestPolicy:丟棄隊列中等待最久的任務,把當前任務加入到隊列中
4. CallerRunsPolicy:繞過執行緒池,直接調用任務的run()方法。
**根據需求選中合適的策略才是正確的**

實現我們的執行緒池

public class ThreadPoolUtil {

    /**
     * @param corePoolSize    常駐核心執行緒,執行緒池初始化的時候池裡是沒有執行緒的,前面 corePoolSize 個任務是會創建執行緒,
     *                        當前執行緒池中的數量大於常駐核心執行緒數的時候,如果有空閑的執行緒則使用,沒有的話就把任務放到
     *                        工作隊列中
     * @param maximumPoolSize 執行緒池允許創建的最大執行緒數,如果隊列滿了,且執行緒數小於最大執行緒數,則新建臨時執行緒(空閑超過時間會被銷毀的),
     *                        如果隊列為無界隊列,則該參數無用
     * @param workQueueSize   工作隊列,請求執行緒數大於常駐核心執行緒數的時候,將多餘的任務放到工作隊列
     * @param threadName      執行緒名稱
     * @param handler         執行緒池拒絕策略,當執行緒池和隊列都滿了,則調用該策略,執行具體的邏輯
     * @author: taoym
     * @date: 2020/9/9 11:35
     * @desc: 自定義執行緒池的實現 總體邏輯就是 前corePoolSize個任務時,來一個任務就創建一個執行緒
     * 如果當前執行緒池的執行緒數大於了corePoolSize那麼接下來再來的任務就會放入到我們上面設置的workQueue隊列中
     * 如果此時workQueue也滿了,那麼再來任務時,就會新建臨時執行緒,那麼此時如果我們設置了keepAliveTime或者設置了allowCoreThreadTimeOut,那麼系統就會進行執行緒的活性檢查,一旦超時便銷毀執行緒
     * 如果此時執行緒池中的當前執行緒大於了maximumPoolSize最大執行緒數,那麼就會執行我們剛才設置的handler拒絕策略
     */
    public static ExecutorService createThreadPool(int corePoolSize,
                                                   int maximumPoolSize,
                                                   int workQueueSize,
                                                   String threadName,
                                                   RejectedExecutionHandler handler) {
        BlockingQueue workQueue = new LinkedBlockingDeque(workQueueSize);
        ThreadFactoryImpl threadFactory = new ThreadFactoryImpl(threadName);

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, workQueue, handler);
        // 提前創建好核心執行緒
        //threadPoolExecutor.prestartAllCoreThreads();
        // 常駐核心執行緒的空閑時間超過 keepAliveTime 的時候要被回收
        //threadPoolExecutor.allowCoreThreadTimeOut(true);

        return threadPoolExecutor;
    }
}

注釋寫的很明白了,desc下的注釋來自springForAll的文章。別的都是自己找的加上自己所理解的編寫而成。

文章是剛研究了《碼出高效》的執行緒池篇加上對源碼文檔的理解,趁熱打鐵寫出來的,寫的不好,多多見諒。