自定義執行緒池ThreadPoolExecutor
使用自定義的方式創建執行緒池
Java本身提供的獲取執行緒池的方式
使用Executors直接獲取執行緒池,注意,前四個方式的底層都是通過new ThreadPoolExecutor()的方式創建的執行緒池,只是參數不一樣而已,我們也正是利用了這點特性來實現自己的執行緒池
1. newCachedThreadPool
創建一個可快取無限制數量的執行緒池,
如果執行緒池中沒有空閑的執行緒的話,再來任務會新建執行緒,
執行緒60s內沒被使用,則銷毀。
簡單的說,忙不過來的時候就新建執行緒
Executors.newCachedThreadPool()
底層實現
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
常駐核心執行緒數為0
執行緒池最大執行緒數Integer.MAX_VALUE
執行緒空閑60S回收
使用工作隊列來存放任務,這個隊列有好幾種,各有各的特性。
**風險:**由於執行緒池最大執行緒池為Integer.MAX_VALUE,所以有OOM的風險
2. newFixedThreadPool
創建一直指定大小的執行緒池,如果執行緒池滿了,後面的任務會在隊列中等待,等拿到空閑的執行緒才能執行
Executors.newFixedThreadPool(nThreads)
底層實現
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
常駐核心執行緒數自定義
執行緒池最大執行緒數等於常駐核心執行緒數
因為最大執行緒數等於常駐核心執行緒,而常駐核心執行緒不會被回收,所以時間參數為0
使用工作隊列來存放任務,這個隊列有好幾種,各有各的特性。
**風險:**由於隊列沒有指明長度,默認為Integer.MAX_VALUE,所以有OOM的風險
3. newSingleThreadExecutor
創建一個大小為1的執行緒池,用唯一的執行緒來執行任務,保證任務有序進行
Executors.newSingleThreadExecutor()
底層實現
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
常駐核心執行緒數為1
執行緒池最大執行緒數等於常駐核心執行緒數1
執行緒池裡一共就一個常駐核心執行緒,所以不會被回收,所以時間參數為0
使用工作隊列來存放任務,這個隊列有好幾種,各有各的特性。
**風險:**由於隊列沒有指明長度,默認為Integer.MAX_VALUE,所以有OOM的風險
4. newScheduledThreadPool
創建指定大小的執行緒池,支援定時及周期性的執行任務
Executors.newScheduledThreadPool(corePoolSize)
底層實現
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,一下為ScheduledThreadPoolExecutor的構造方法
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
常駐核心執行緒數自定義
執行緒池最大執行緒數等於Integer.MAX_VALUE
不回收執行緒
使用工作隊列來存放任務,這個隊列有好幾種,各有各的特性。
**風險:**由於執行緒池最大執行緒池為Integer.MAX_VALUE,所以有OOM的風險
5. newWorkStealingPool
JDK1.8 引入
創建持有足夠執行緒的執行緒池來支援給定的並行級別,
並通過使用多個隊列減少競爭,並行級別的參數,如果不傳,默認為cpu的數量,
返回的不再是 ThreadPoolExecutor 而是 ForkJoinPool
Executors.newWorkStealingPool()
底層實現為**ForkJoinPool**,和上面的四個不同
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
正在研究ing
在阿里巴巴Java開發手冊中明確指出,不允許使用jdk自帶的方式獲取執行緒池。就是上面的前四個方法,所以,我們自己創建即可
創建自定義的執行緒工廠
public class ThreadFactoryImpl implements ThreadFactory {
/**
* 執行緒池號
*/
private static final AtomicInteger poolNumber = new AtomicInteger(1);
/**
* 執行緒前綴名稱
*/
private final String namePrefix;
/**
* 創建初始值為1且執行緒安全的執行緒號
*/
private final AtomicInteger threadNumber = new AtomicInteger(1);
public ThreadFactoryImpl(String whatFeatureOfGroup) {
namePrefix = "ThreadFactoryImpl's " + whatFeatureOfGroup + "-work-";
}
@Override
public Thread newThread(Runnable r) {
int threadNextId = threadNumber.getAndIncrement();
String name = namePrefix + threadNextId;
Thread thread = new Thread(null, r, name, 0);
System.out.println("創建的第"+threadNextId+"個執行緒");
return thread;
}
}
AtomicInteger 實現了原子性,保證了高並發下的執行緒安全,該系類還有很多。
我們可以在自定義的執行緒工廠裡面添加我們需要的內容
不指定的話,會是默認的執行緒工廠
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
創建自定義執行緒池拒絕策略
public class ThreadPoolRejectHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("task rejected. "+executor.toString());
}
}
在ThreadPoolExecutor中提供了四個公開的內部靜態類
1. AbortPolicy(默認):丟棄任務並拋出RejectedExecutionException異常
2. DiscardPloicy:丟棄任務,不拋出異常(不推薦使用)
3. DiscardOldestPolicy:丟棄隊列中等待最久的任務,把當前任務加入到隊列中
4. CallerRunsPolicy:繞過執行緒池,直接調用任務的run()方法。
**根據需求選中合適的策略才是正確的**
實現我們的執行緒池
public class ThreadPoolUtil {
/**
* @param corePoolSize 常駐核心執行緒,執行緒池初始化的時候池裡是沒有執行緒的,前面 corePoolSize 個任務是會創建執行緒,
* 當前執行緒池中的數量大於常駐核心執行緒數的時候,如果有空閑的執行緒則使用,沒有的話就把任務放到
* 工作隊列中
* @param maximumPoolSize 執行緒池允許創建的最大執行緒數,如果隊列滿了,且執行緒數小於最大執行緒數,則新建臨時執行緒(空閑超過時間會被銷毀的),
* 如果隊列為無界隊列,則該參數無用
* @param workQueueSize 工作隊列,請求執行緒數大於常駐核心執行緒數的時候,將多餘的任務放到工作隊列
* @param threadName 執行緒名稱
* @param handler 執行緒池拒絕策略,當執行緒池和隊列都滿了,則調用該策略,執行具體的邏輯
* @author: taoym
* @date: 2020/9/9 11:35
* @desc: 自定義執行緒池的實現 總體邏輯就是 前corePoolSize個任務時,來一個任務就創建一個執行緒
* 如果當前執行緒池的執行緒數大於了corePoolSize那麼接下來再來的任務就會放入到我們上面設置的workQueue隊列中
* 如果此時workQueue也滿了,那麼再來任務時,就會新建臨時執行緒,那麼此時如果我們設置了keepAliveTime或者設置了allowCoreThreadTimeOut,那麼系統就會進行執行緒的活性檢查,一旦超時便銷毀執行緒
* 如果此時執行緒池中的當前執行緒大於了maximumPoolSize最大執行緒數,那麼就會執行我們剛才設置的handler拒絕策略
*/
public static ExecutorService createThreadPool(int corePoolSize,
int maximumPoolSize,
int workQueueSize,
String threadName,
RejectedExecutionHandler handler) {
BlockingQueue workQueue = new LinkedBlockingDeque(workQueueSize);
ThreadFactoryImpl threadFactory = new ThreadFactoryImpl(threadName);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, workQueue, handler);
// 提前創建好核心執行緒
//threadPoolExecutor.prestartAllCoreThreads();
// 常駐核心執行緒的空閑時間超過 keepAliveTime 的時候要被回收
//threadPoolExecutor.allowCoreThreadTimeOut(true);
return threadPoolExecutor;
}
}
注釋寫的很明白了,desc下的注釋來自springForAll的文章。別的都是自己找的加上自己所理解的編寫而成。
文章是剛研究了《碼出高效》的執行緒池篇加上對源碼文檔的理解,趁熱打鐵寫出來的,寫的不好,多多見諒。