JUC之執行緒池基礎與簡單源碼分析

執行緒池

定義和方法

執行緒池的工作時控制運行的執行緒數量,處理過程中將任務放入隊列,然後在執行緒創建後啟動這些任務,如果執行緒數量超過了最大數量,超出數量的執行緒排隊等候,等待其他執行緒執行完成,再從隊列中取出任務來執行。

特點:

執行緒復用,控制最大並發數,管理執行緒。

好處:

  1. 降低資源消耗。通過重複利用已創建的執行緒來降低執行緒創建和銷毀造成的消耗。
  2. 提升響應速度。當任務到達時,任務不需要等待執行緒創建就能立即執行
  3. 提高執行緒的可管理性。當執行緒時稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配、調優和監控。

簡單的架構圖:

image-20220113160721762

我們使用的其實就是ThreadPoolExecutor.

阿里巴巴開發規範手冊:

【強制】執行緒池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這樣的處理方式讓寫的同學更加明確執行緒池的運行規則,規避資源耗盡的風險。

說明:Executors 返回的執行緒池對象的弊端如下:

1)FixedThreadPool 和 SingleThreadPool:

允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM。

2)CachedThreadPool:

允許的創建執行緒數量為 Integer.MAX_VALUE,可能會創建大量的執行緒,從而導致 OOM。

首先使用工具類(Executors)來創建執行緒池:

FixedThreadPool被稱為可重用固定執行緒數的執行緒池,執行長期任務性能好,創建一個執行緒池,一池有N個固定的執行緒,有固定的執行緒數的執行緒池。

public class demo1 {
    public static void main(String[] args) {
        //使用Executors工具類來創建newFixedThreadPool執行緒池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        try{
            for (int i = 0; i < 10; i++) {
                executor.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"任務執行");
                });
            }
        } finally {
            executor.shutdown();
        }
    }
}

SingleThreadExecutor是使用單個worker執行緒的Executor,省去了創建執行緒和銷毀執行緒時資源消耗。

public class demo1 {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try{
            for (int i = 0; i < 10; i++) {
                executor.execute(()->{

                    System.out.println(Thread.currentThread().getName()+"\t任務執行");
                });
            }
        } finally {
            executor.shutdown();
        }
    }
}

CachedThreadPool是一個會根據需要創建新執行緒的執行緒池。可擴容的執行緒池。

public class demo1 {
    public static void main(String[] args) {
        //使用Executors工具類來創建newFixedThreadPool執行緒池
        //ExecutorService executor = Executors.newFixedThreadPool(3);
        //一個執行緒池就一個執行緒
        //ExecutorService executor = Executors.newSingleThreadExecutor();
        ExecutorService executor = Executors.newCachedThreadPool();
        try{
            for (int i = 0; i < 10; i++) {
                executor.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"\t任務執行");
                });
            }
        } finally {
            executor.shutdown();
        }
    }
}
pool-1-thread-2	任務執行
pool-1-thread-1	任務執行
pool-1-thread-4	任務執行
pool-1-thread-3	任務執行
pool-1-thread-5	任務執行
pool-1-thread-6	任務執行
pool-1-thread-7	任務執行
pool-1-thread-8	任務執行
pool-1-thread-9	任務執行
pool-1-thread-10	任務執行

當設置延時操作來模仿耗時的程式碼時使用執行緒池的過程。

package com.JucPool;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 基礎認識以及執行緒池的三大方法
 */
public class demo1 {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try{
            for (int i = 0; i < 10; i++) {
                //暫停幾秒
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                executor.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"\t任務執行");
                });
            }
        } finally {
            executor.shutdown();
        }
    }
}
pool-1-thread-1	任務執行
pool-1-thread-1	任務執行
pool-1-thread-1	任務執行
pool-1-thread-1	任務執行
pool-1-thread-1	任務執行
pool-1-thread-1	任務執行
pool-1-thread-1	任務執行
pool-1-thread-1	任務執行
pool-1-thread-1	任務執行
pool-1-thread-1	任務執行

Process finished with exit code 0

這時就降為SingleThreadExecutor執行緒池。

簡單源碼分析

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

通過對比發現,底層都是通過ThreadPoolExecutor來實現,只是參數的不同,傳遞的參數與阻塞隊列也有關係。

參數:(非常重要)

  1. ·corePool:執行緒池常駐核心執行緒池的大小。
  2. ·maximumPool:最大執行緒池的大小–執行緒池中能夠容納同時執行的最大執行緒數
  3. keepAliveTime:多餘的空閑執行緒的存活時間,當前池中執行緒數超過了corePool時並且空閑時間到達keepAliveTime,多餘的執行緒會被銷毀直到只剩下corePool個執行緒為止
  4. unit – keepAliveTime參數的時間單位
  5. workQueue – 用於在執行任務之前保存任務的隊列。 此隊列將僅保存由execute方法提交的Runnable任務。

在ThreadPoolExecutor中還存在兩個參數:

  1. ThreadFactory: 表示生成執行緒池中工作執行緒的執行緒工廠,用於創建執行緒,一般默認
  2. Handler:拒絕策略,表示當隊列滿了,並且工作的執行緒要超過最大執行緒池的大小,如何讓拒絕請求執行的runnable的策略

上述是執行緒池中的七大參數總結,很重要,關於什麼情況下走到拒絕策略,後面會分析執行緒池的執行流程。