并发编程之线程池

线程池

为什么需要线程池?

如果性能允许的话,我们完全可以在 for 循环代码起很多的线程去帮我们执行任务,代码如下

public class ManyThread {
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            Thread thread = new Thread(new Task(), "thread" + i);
            thread.start();
        }
    }
}

class Task implements Runnable {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ":正在执行");
    }
}

由上述代码来看,我们仍然可以通过以上这种笨拙的方式实现相关的需求。但这样明显是不合适的,如果频繁地创建过多的线程来执行任务,这样开销实在太大,毕竟过多的线程会占用太多的内存;但是通过线程池这种方式,创建固定数量的线程来执行任务,就能够使线程复用起来,加快响应速度,并且还合理利用CPU和内存,还统一管理。

构造参数

参数名 类型 含义
corePoolSize int 核心线程数
maxPoolSize int 最大线程数
keepAliveTime long 保持存活时间
workQueue BlockingQueue 任务存储队列
threadFactory ThreadFactory 当线程池需要新的线程的时候,会使用 threadFactory 来生成新的线程
Handler RejectedExecutionHandler 由于线程池无法接受新提交的任务所指向的拒绝策略
  • corePoolSize : 核心线程数:线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务。

  • maxPoolSize : 最大线程数:在 corePoolSize 的基础上,会额外地增加一些线程,但是这些新增加的线程有一个上限,也就是线程的最大量。

  • keepAliveTime : 存活时间,如果线程池当前的线程数多于 corePoolSize,那么如果多余的线程空闲时间超过 keepAliveTime,它们就会被终止。

  • ThreadFactory: 新的线程是由 ThreadFactory 创建的,默认使用 Executors.defaultThreadFactory() 创建,创建出来的线程都在同一个线程组,拥有相同优先级,但是不属于守护线程。

  • workQueue: 常见的三种队列类型

    SynchronousQueue : 直接交接:在任务不多的情况下,只是通过队列做简单的中转站;当进来一个新的任务,就会直接创建一个新的线程处理。这种队列本身没有容量的,里面没有办法存放任务,如果要使用该队列,maxPoolSize要设置相对大点,因为没有队列作为缓冲,会经常创建线程

    LinkedBlockingQueue :无界队列,特指的是未指定容量的前提下,(如果在设置了指定容量的情况下,就是有界队列);当corePoolSize已经满的情况下,任务就会添加到这个队列里面来,而且是没有容量限制的,所以 maxPoolSize 设置任何值都不会起作用。如果添加任务的时间远远大于线程执行的时间,会占用大量的内存,可能会导致OOM的发生

    ArrayBlockQueue :有界队列,可以设置默认大小,如果线程数等于(或大于)corePoolSize 但少于maxPoolSize,则将任务放入该有序队列。

添加线程规则

  1. 如果线程数小于 corePoolSize ,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务。

  2. 如果线程数等于(或大于)corePoolSize 但少于maxPoolSize,则将任务放入队列。

  3. 如果队列已满,并且线程数小于 maxPoolSize,则创建一个新线程来运行刚提交的任务

  4. 如果任务队列没有满,线程池内运行的一直都是 corePoolSize 这个线程

  5. 如果队列已满,并且线程数大于或等于 maxPoolSize ,则拒绝该任务。

常见的ThreadPool

  • newFixedThreadPool:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    

通过源码,我们不难看出 corePoolSize 和 maxPoolSize 使用都是传进来的 nThread 参数,说明创建的线程永远不会超过 nThread 的范围,然后就是 keepAliveTime 被设置为 0L,由于 maxPoolSize 和 corePoolSize 一样大,所以在这该参数的设置是没有意义的,然后 TimeUnit.MILLISECONDS 是时间单位,与 keepAliveTime 绑定;最后一个是 LinkedBlockingQueue ,存储更多任务的一个容器,所以无论再多的任务进来,都会放入到该队列中执行。

由于传进去的LinkedBlockingQueue 是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM。

  • newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

跟 newFixedThread 的原理基本一样,用的是相同的工作队列,默认把线程数直接设置成了1,所以会导致同样的问题,也就是请求堆积的时候,会容易造成占用大量的内存

  • CachedThreadPool:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    

    可缓存线程池,用了 synchronous queue 队列,不需要存储任务,有任务进来直接创建线程,具有自动回收多余线程的功能。但是这个线程池存在一种弊端,在默认情况下,maxPoolSize 被设置为 Integer.MAX_VALUE,这可能会创建非常多的线程,甚至导致OOM。(注意:Cache 特指的是对线程的缓存,如果一段时间线程空闲,就回收)

  • ScheduleThreadPool:支持定时及周期性任务执行的线程池

插曲

线程数量设定多少比较合适?

答:线程数 = CPU 核心数 * ( 1 + 平均等待时间/平时工作时间 )

关闭线程池

  • shutdown:运行之后并不会停止,而是会把存量的任务都执行完毕。
  • shutdownNow:立即停止线程,并且队列的任务也不会执行。

拒绝策略

拒绝的时机是最大线程数满

  • AbortPolicy:默认的拒绝策略,直接抛出异常
  • DiscardPolicy:直接丢弃,提交线程不会收到任何信息
  • DiscardOldestPolicy:丢弃在队列中等待时间最长的任务
  • CallerRunsPolicy:由提交线程执行任务,是一种负反馈机制

线程池实现任务复用的原理

核心原理是用相同的线程去执行不同的任务。首先 execute 方法先去检查当前线程数是否小于 corePoolSize ,如果小于的话,则执行 addWork 加一个工作线程,然后会执行 runWork 方法,该方法先会获取一个任务 task ,这个 task 是 Runnable 实例,并且while循环中判断这个任务是否为空,最后直接 task 调用 run 方法

在runWork方法中,会将一个个 Runnable 实例 (也就是 task) 给拿到,然后直接调用 run 方法

面试题:submit 和 execute 的区别

(1)类型

execute只能接受Runnable类型的任务

​ submit不管是Runnable还是Callable类型的任务都可以接受,但是Runnable返回值均为void,所以使用Future的get()获得的还是null

(2)返回值

​ 由Callable和Runnable的区别可知:

​ execute没有返回值

​ submit有返回值,所以需要返回值的时候必须使用submit

(3)异常

​ 1.execute中抛出异常

​ execute中的是 Runnable 接口的实现,所以只能使用 try、catch 来捕获 CheckedException,通过实现UncaughtExceptionHander 接口处理 UncheckedException

​ 即和普通线程的处理方式完全一致

​ 2.submit中抛出异常

​ 不管提交的是Runnable还是Callable类型的任务,如果不对返回值Future调用get()方法,都会吃掉异常

Tags: