自定义线程池ThreadPoolExecutor
使用自定义的方式创建线程池
Java本身提供的获取线程池的方式
使用Executors直接获取线程池,注意,前四个方式的底层都是通过new ThreadPoolExecutor()的方式创建的线程池,只是参数不一样而已,我们也正是利用了这点特性来实现自己的线程池
1. newCachedThreadPool
创建一个可缓存无限制数量的线程池,
如果线程池中没有空闲的线程的话,再来任务会新建线程,
线程60s内没被使用,则销毁。
简单的说,忙不过来的时候就新建线程
Executors.newCachedThreadPool()
底层实现
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
常驻核心线程数为0
线程池最大线程数Integer.MAX_VALUE
线程空闲60S回收
使用工作队列来存放任务,这个队列有好几种,各有各的特性。
**风险:**由于线程池最大线程池为Integer.MAX_VALUE,所以有OOM的风险
2. newFixedThreadPool
创建一直指定大小的线程池,如果线程池满了,后面的任务会在队列中等待,等拿到空闲的线程才能执行
Executors.newFixedThreadPool(nThreads)
底层实现
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
常驻核心线程数自定义
线程池最大线程数等于常驻核心线程数
因为最大线程数等于常驻核心线程,而常驻核心线程不会被回收,所以时间参数为0
使用工作队列来存放任务,这个队列有好几种,各有各的特性。
**风险:**由于队列没有指明长度,默认为Integer.MAX_VALUE,所以有OOM的风险
3. newSingleThreadExecutor
创建一个大小为1的线程池,用唯一的线程来执行任务,保证任务有序进行
Executors.newSingleThreadExecutor()
底层实现
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
常驻核心线程数为1
线程池最大线程数等于常驻核心线程数1
线程池里一共就一个常驻核心线程,所以不会被回收,所以时间参数为0
使用工作队列来存放任务,这个队列有好几种,各有各的特性。
**风险:**由于队列没有指明长度,默认为Integer.MAX_VALUE,所以有OOM的风险
4. newScheduledThreadPool
创建指定大小的线程池,支持定时及周期性的执行任务
Executors.newScheduledThreadPool(corePoolSize)
底层实现
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,一下为ScheduledThreadPoolExecutor的构造方法
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
常驻核心线程数自定义
线程池最大线程数等于Integer.MAX_VALUE
不回收线程
使用工作队列来存放任务,这个队列有好几种,各有各的特性。
**风险:**由于线程池最大线程池为Integer.MAX_VALUE,所以有OOM的风险
5. newWorkStealingPool
JDK1.8 引入
创建持有足够线程的线程池来支持给定的并行级别,
并通过使用多个队列减少竞争,并行级别的参数,如果不传,默认为cpu的数量,
返回的不再是 ThreadPoolExecutor 而是 ForkJoinPool
Executors.newWorkStealingPool()
底层实现为**ForkJoinPool**,和上面的四个不同
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
正在研究ing
在阿里巴巴Java开发手册中明确指出,不允许使用jdk自带的方式获取线程池。就是上面的前四个方法,所以,我们自己创建即可
创建自定义的线程工厂
public class ThreadFactoryImpl implements ThreadFactory {
/**
* 线程池号
*/
private static final AtomicInteger poolNumber = new AtomicInteger(1);
/**
* 线程前缀名称
*/
private final String namePrefix;
/**
* 创建初始值为1且线程安全的线程号
*/
private final AtomicInteger threadNumber = new AtomicInteger(1);
public ThreadFactoryImpl(String whatFeatureOfGroup) {
namePrefix = "ThreadFactoryImpl's " + whatFeatureOfGroup + "-work-";
}
@Override
public Thread newThread(Runnable r) {
int threadNextId = threadNumber.getAndIncrement();
String name = namePrefix + threadNextId;
Thread thread = new Thread(null, r, name, 0);
System.out.println("创建的第"+threadNextId+"个线程");
return thread;
}
}
AtomicInteger 实现了原子性,保证了高并发下的线程安全,该系类还有很多。
我们可以在自定义的线程工厂里面添加我们需要的内容
不指定的话,会是默认的线程工厂
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
创建自定义线程池拒绝策略
public class ThreadPoolRejectHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("task rejected. "+executor.toString());
}
}
在ThreadPoolExecutor中提供了四个公开的内部静态类
1. AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常
2. DiscardPloicy:丢弃任务,不抛出异常(不推荐使用)
3. DiscardOldestPolicy:丢弃队列中等待最久的任务,把当前任务加入到队列中
4. CallerRunsPolicy:绕过线程池,直接调用任务的run()方法。
**根据需求选中合适的策略才是正确的**
实现我们的线程池
public class ThreadPoolUtil {
/**
* @param corePoolSize 常驻核心线程,线程池初始化的时候池里是没有线程的,前面 corePoolSize 个任务是会创建线程,
* 当前线程池中的数量大于常驻核心线程数的时候,如果有空闲的线程则使用,没有的话就把任务放到
* 工作队列中
* @param maximumPoolSize 线程池允许创建的最大线程数,如果队列满了,且线程数小于最大线程数,则新建临时线程(空闲超过时间会被销毁的),
* 如果队列为无界队列,则该参数无用
* @param workQueueSize 工作队列,请求线程数大于常驻核心线程数的时候,将多余的任务放到工作队列
* @param threadName 线程名称
* @param handler 线程池拒绝策略,当线程池和队列都满了,则调用该策略,执行具体的逻辑
* @author: taoym
* @date: 2020/9/9 11:35
* @desc: 自定义线程池的实现 总体逻辑就是 前corePoolSize个任务时,来一个任务就创建一个线程
* 如果当前线程池的线程数大于了corePoolSize那么接下来再来的任务就会放入到我们上面设置的workQueue队列中
* 如果此时workQueue也满了,那么再来任务时,就会新建临时线程,那么此时如果我们设置了keepAliveTime或者设置了allowCoreThreadTimeOut,那么系统就会进行线程的活性检查,一旦超时便销毁线程
* 如果此时线程池中的当前线程大于了maximumPoolSize最大线程数,那么就会执行我们刚才设置的handler拒绝策略
*/
public static ExecutorService createThreadPool(int corePoolSize,
int maximumPoolSize,
int workQueueSize,
String threadName,
RejectedExecutionHandler handler) {
BlockingQueue workQueue = new LinkedBlockingDeque(workQueueSize);
ThreadFactoryImpl threadFactory = new ThreadFactoryImpl(threadName);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, workQueue, handler);
// 提前创建好核心线程
//threadPoolExecutor.prestartAllCoreThreads();
// 常驻核心线程的空闲时间超过 keepAliveTime 的时候要被回收
//threadPoolExecutor.allowCoreThreadTimeOut(true);
return threadPoolExecutor;
}
}
注释写的很明白了,desc下的注释来自springForAll的文章。别的都是自己找的加上自己所理解的编写而成。
文章是刚研究了《码出高效》的线程池篇加上对源码文档的理解,趁热打铁写出来的,写的不好,多多见谅。