自定义线程池ThreadPoolExecutor

使用自定义的方式创建线程池

Java本身提供的获取线程池的方式

使用Executors直接获取线程池,注意,前四个方式的底层都是通过new ThreadPoolExecutor()的方式创建的线程池,只是参数不一样而已,我们也正是利用了这点特性来实现自己的线程池

1. newCachedThreadPool

创建一个可缓存无限制数量的线程池,
如果线程池中没有空闲的线程的话,再来任务会新建线程,
线程60s内没被使用,则销毁。
简单的说,忙不过来的时候就新建线程

Executors.newCachedThreadPool()

底层实现
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

常驻核心线程数为0
线程池最大线程数Integer.MAX_VALUE
线程空闲60S回收
使用工作队列来存放任务,这个队列有好几种,各有各的特性。
**风险:**由于线程池最大线程池为Integer.MAX_VALUE,所以有OOM的风险

2. newFixedThreadPool

创建一直指定大小的线程池,如果线程池满了,后面的任务会在队列中等待,等拿到空闲的线程才能执行

Executors.newFixedThreadPool(nThreads)

底层实现
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

常驻核心线程数自定义
线程池最大线程数等于常驻核心线程数
因为最大线程数等于常驻核心线程,而常驻核心线程不会被回收,所以时间参数为0
使用工作队列来存放任务,这个队列有好几种,各有各的特性。
**风险:**由于队列没有指明长度,默认为Integer.MAX_VALUE,所以有OOM的风险

3. newSingleThreadExecutor

创建一个大小为1的线程池,用唯一的线程来执行任务,保证任务有序进行

Executors.newSingleThreadExecutor()

底层实现
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

常驻核心线程数为1
线程池最大线程数等于常驻核心线程数1
线程池里一共就一个常驻核心线程,所以不会被回收,所以时间参数为0
使用工作队列来存放任务,这个队列有好几种,各有各的特性。
**风险:**由于队列没有指明长度,默认为Integer.MAX_VALUE,所以有OOM的风险

4. newScheduledThreadPool

创建指定大小的线程池,支持定时及周期性的执行任务

Executors.newScheduledThreadPool(corePoolSize)

底层实现
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,一下为ScheduledThreadPoolExecutor的构造方法

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

常驻核心线程数自定义
线程池最大线程数等于Integer.MAX_VALUE
不回收线程
使用工作队列来存放任务,这个队列有好几种,各有各的特性。
**风险:**由于线程池最大线程池为Integer.MAX_VALUE,所以有OOM的风险

5. newWorkStealingPool

JDK1.8 引入
创建持有足够线程的线程池来支持给定的并行级别,
并通过使用多个队列减少竞争,并行级别的参数,如果不传,默认为cpu的数量,
返回的不再是 ThreadPoolExecutor 而是 ForkJoinPool

Executors.newWorkStealingPool()

底层实现为**ForkJoinPool**,和上面的四个不同
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

正在研究ing

在阿里巴巴Java开发手册中明确指出,不允许使用jdk自带的方式获取线程池。就是上面的前四个方法,所以,我们自己创建即可

创建自定义的线程工厂

public class ThreadFactoryImpl implements ThreadFactory {
    /**
     * 线程池号
     */
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    /**
     * 线程前缀名称
     */
    private final String namePrefix;
    /**
     * 创建初始值为1且线程安全的线程号
     */
    private final AtomicInteger threadNumber = new AtomicInteger(1);

    public ThreadFactoryImpl(String whatFeatureOfGroup) {
        namePrefix = "ThreadFactoryImpl's " + whatFeatureOfGroup + "-work-";
    }

    @Override
    public Thread newThread(Runnable r) {
        int threadNextId = threadNumber.getAndIncrement();
        String name = namePrefix + threadNextId;
        Thread thread = new Thread(null, r, name, 0);
        System.out.println("创建的第"+threadNextId+"个线程");
        return thread;
    }
}

AtomicInteger 实现了原子性,保证了高并发下的线程安全,该系类还有很多。
我们可以在自定义的线程工厂里面添加我们需要的内容
不指定的话,会是默认的线程工厂
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

创建自定义线程池拒绝策略

public class ThreadPoolRejectHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("task rejected. "+executor.toString());
    }
}

在ThreadPoolExecutor中提供了四个公开的内部静态类
1. AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常
2. DiscardPloicy:丢弃任务,不抛出异常(不推荐使用)
3. DiscardOldestPolicy:丢弃队列中等待最久的任务,把当前任务加入到队列中
4. CallerRunsPolicy:绕过线程池,直接调用任务的run()方法。
**根据需求选中合适的策略才是正确的**

实现我们的线程池

public class ThreadPoolUtil {

    /**
     * @param corePoolSize    常驻核心线程,线程池初始化的时候池里是没有线程的,前面 corePoolSize 个任务是会创建线程,
     *                        当前线程池中的数量大于常驻核心线程数的时候,如果有空闲的线程则使用,没有的话就把任务放到
     *                        工作队列中
     * @param maximumPoolSize 线程池允许创建的最大线程数,如果队列满了,且线程数小于最大线程数,则新建临时线程(空闲超过时间会被销毁的),
     *                        如果队列为无界队列,则该参数无用
     * @param workQueueSize   工作队列,请求线程数大于常驻核心线程数的时候,将多余的任务放到工作队列
     * @param threadName      线程名称
     * @param handler         线程池拒绝策略,当线程池和队列都满了,则调用该策略,执行具体的逻辑
     * @author: taoym
     * @date: 2020/9/9 11:35
     * @desc: 自定义线程池的实现 总体逻辑就是 前corePoolSize个任务时,来一个任务就创建一个线程
     * 如果当前线程池的线程数大于了corePoolSize那么接下来再来的任务就会放入到我们上面设置的workQueue队列中
     * 如果此时workQueue也满了,那么再来任务时,就会新建临时线程,那么此时如果我们设置了keepAliveTime或者设置了allowCoreThreadTimeOut,那么系统就会进行线程的活性检查,一旦超时便销毁线程
     * 如果此时线程池中的当前线程大于了maximumPoolSize最大线程数,那么就会执行我们刚才设置的handler拒绝策略
     */
    public static ExecutorService createThreadPool(int corePoolSize,
                                                   int maximumPoolSize,
                                                   int workQueueSize,
                                                   String threadName,
                                                   RejectedExecutionHandler handler) {
        BlockingQueue workQueue = new LinkedBlockingDeque(workQueueSize);
        ThreadFactoryImpl threadFactory = new ThreadFactoryImpl(threadName);

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 60L, TimeUnit.SECONDS, workQueue, handler);
        // 提前创建好核心线程
        //threadPoolExecutor.prestartAllCoreThreads();
        // 常驻核心线程的空闲时间超过 keepAliveTime 的时候要被回收
        //threadPoolExecutor.allowCoreThreadTimeOut(true);

        return threadPoolExecutor;
    }
}

注释写的很明白了,desc下的注释来自springForAll的文章。别的都是自己找的加上自己所理解的编写而成。

文章是刚研究了《码出高效》的线程池篇加上对源码文档的理解,趁热打铁写出来的,写的不好,多多见谅。

Tags: