线程池简单探索
为什么使用线程池
- 第一点,线程池可以解决线程生命周期的系统开销问题,同时还可以加快响应速度。因为线程池中的线程是可以复用的,我们只用少量的线程去执行大量的任务,这就大大减小了线程生命周期的开销。而且线程通常不是等接到任务后再临时创建,而是已经创建好时刻准备执行任务,这样就消除了线程创建所带来的延迟,提升了响应速度,增强了用户体验。
- 第二点,线程池可以统筹内存和 CPU 的使用,避免资源使用不当。线程池会根据配置和任务数量灵活地控制线程数量,不够的时候就创建,太多的时候就回收,避免线程过多导致内存溢出,或线程太少导致 CPU 资源浪费,达到了一个完美的平衡。
- 第三点,线程池可以统一管理资源。比如线程池可以统一管理任务队列和线程,可以统一开始或结束任务,比单个线程逐一处理任务要更方便、更易于管理,同时也有利于数据统计,比如我们可以很方便地统计出已经执行过的任务的数量。
线程池状态
Java把线程池的状态分为五种:
- 运行(RUNNING):该状态下的线程池接收新任务并处理队列中的任务;线程池创建完毕就处于该状态,也就是正常状态;
- 关机(SHUTDOWN):线程池不接受新任务,但处理队列中的任务;线程池调用shutdown()之后的池状态;
- 停止(STOP):线程池不接受新任务,也不处理队列中的任务,并中断正在执行的任务;线程池调用shutdownNow()之后的池状态;
- 清理(TIDYING):线程池所有任务已经终止,workCount(当前线程数)为0;过渡到清理状态的线程将运行terminated()钩子方法;
- 终止(TERMINATED):terminated()方法结束后的线程池状态;
继承树

Executors
创建线程池
Java通过Executors提供多种线程池:

常用的是下面四种,分别为:
1、newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
2、newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3、newScheduledThreadPool
创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。
4、newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
但是通常都不建议使用Executors提供的方法,避免资源耗尽的风险。
风险
1、newSingleThreadExecutor 和 newFixedThreadPool:
允许的请求队列的长度为Integer.MAX_VALUE。如果我们对任务的处理速度比较慢,那么随着请求的增多,队列中堆积的任务也会越来越多,最终大量堆积的任务会占用大量内存,并发生 OOM ,也就是OutOfMemoryError,这几乎会影响到整个程序,会造成很严重的后果。

2、newCachedThreadPool
允许的创建线程数量为Integer.MAX_VALUE。CachedThreadPool 和前面两种线程池不一样的地方在于任务队列使用的是 SynchronousQueue,SynchronousQueue 本身并不存储任务,而是对任务直接进行转发,这本身是没有问题的,但你会发现构造函数的第二个参数被设置成了 Integer.MAX_VALUE,这个参数的含义是最大线程数,所以由于 CachedThreadPool 并不限制线程的数量,当任务数量特别多的时候,就可能会导致创建非常多的线程,最终超过了操作系统的上限而无法创建新线程,或者导致内存不足。

3、newScheduledThreadPool
允许的请求队列的长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM.
newScheduledThreadPool 里面调用的是ScheduledThreadPoolExecutor.而在ScheduledThreadPoolExecutor中请求队列为DelayedWorkQueue()

在DelayedWorkQueue() 中,用堆维护了一个优先队列,默认长度为 2^16.每次扩容到原来的1.5倍,最大为Integer.MAX_VALUE.

ExecutorService
execute(Runnable)
该方法接收一个Runnable,并异步执行。这个方法有个问题,就是没有办法获知task的执行结果。如果我们想获得task的执行结果,我们可以传入一个Callable的实例
submit(Runnable)
submit和execute区别是前者可以返回一个Future对象,通过返回的Future对象,我们可以检查提交的任务是否执行完毕. ( Feature.get()可以返回是否执行完毕,返回null 表示执行完毕。但是会产生阻塞 )
submit(Runnable, T result)
这个和上面的区别就是返回设定值,执行完毕返回trsult.
submit(Callable<T>)
该方法会返回Callable的执行结果
invokeAny(Collection<? Extends Callable<T>> …)
方法接收的是一个Callable的集合,执行这个方法不会返回Future,但是会返回所有Callable任务中其中一个任务的执行结果。这个方法也无法保证返回的是哪个任务的执行结果,反正是其中的某一个。
invokeAll(Collection<? Extends Callable<T>> …)
接收一个Callable集合,但是前者执行之后会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象。
shutdown()
不接受新任务,但处理队列中的任务.
shutdownNow()
停止所有任务,已经开始也要停止.
ThreadPoolExecutor
工作原理
ThreadPoolExecutor会创建一组工作线程,每当一个工作线程完成其任务的时候,会向任务队列获取新的任务执行。如果任务队列为空,获取任务的线程将被阻塞。不出意外的话,工作线程会一直工作,直到线程池主动释放空闲线程,或者随着线程池的终结而结束。

构造器
ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
corePoolSize:
核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:
线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:
表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
ThreadFactory :
ThreadFactory 实际上是一个线程工厂,它的作用是生产线程以便执行任务。我们可以选择使用默认的线程工厂,创建的线程都会在同一个线程组,并拥有一样的优先级,且都不是守护线程,我们也可以选择自己定制线程工厂,以方便给线程自定义命名,不同的线程池内的线程通常会根据具体业务来定制不同的线程名。
workQueue :
阻塞队列。

LinkedBlockingQueue
对于 FixedThreadPool 和 SingleThreadExector 而言,它们使用的阻塞队列是容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue,可以认为是无界队列。由于 FixedThreadPool 线程池的线程数是固定的,所以没有办法增加特别多的线程来处理任务,这时就需要 LinkedBlockingQueue 这样一个没有容量限制的阻塞队列来存放任务。这里需要注意,由于线程池的任务队列永远不会放满,所以线程池只会创建核心线程数量的线程,所以此时的最大线程数对线程池来说没有意义,因为并不会触发生成多于核心线程数的线程。
SynchronousQueue
第二种阻塞队列是 SynchronousQueue,对应的线程池是 CachedThreadPool。线程池 CachedThreadPool 的最大线程数是 Integer 的最大值,可以理解为线程数是可以无限扩展的。 CachedThreadPool 和上一种线程池 FixedThreadPool 的情况恰恰相反,FixedThreadPool 的情况是阻塞队列的容量是无限的,而这里 CachedThreadPool 是线程数可以无限扩展,所以 CachedThreadPool 线程池并不需要一个任务队列来存储任务,因为一旦有任务被提交就直接转发给线程或者创建新线程来执行,而不需要另外保存它们。
我们自己创建使用 SynchronousQueue 的线程池时,如果不希望任务被拒绝,那么就需要注意设置最大线程数要尽可能大一些,以免发生任务数大于最大线程数时,没办法把任务放到队列中也没有足够线程来执行任务的情况。
DelayedWorkQueue
第三种阻塞队列是DelayedWorkQueue,它对应的线程池分别是 ScheduledThreadPool 和 SingleThreadScheduledExecutor,这两种线程池的最大特点就是可以延迟执行任务,比如说一定时间后执行任务或是每隔一定的时间执行一次任务。DelayedWorkQueue 的特点是内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构。之所以线程池 ScheduledThreadPool 和 SingleThreadScheduledExecutor 选择 DelayedWorkQueue,是因为它们本身正是基于时间执行任务的,而延迟队列正好可以把任务按时间进行排序,方便任务的执行。
handler:
拒绝时机:
- 第一种情况是当我们调用 shutdown 等方法关闭线程池后,即便此时可能线程池内部依然有没执行完的任务正在执行,但是由于线程池已经关闭,此时如果再向线程池内提交任务,就会遭到拒绝。
- 第二种情况是线程池没有能力继续处理新提交的任务,也就是工作已经非常饱和的时候。
拒绝策略:
Java 在 ThreadPoolExecutor 类中为我们提供了 4 种默认的拒绝策略来应对不同的场景,都实现了 RejectedExecutionHandler 接口:

- 第一种拒绝策略是 AbortPolicy,这种拒绝策略在拒绝任务时,会直接抛出一个类型为 RejectedExecutionException 的 RuntimeException,让你感知到任务被拒绝了,于是你便可以根据业务逻辑选择重试或者放弃提交等策略。
- 第二种拒绝策略是 DiscardPolicy,这种拒绝策略正如它的名字所描述的一样,当新任务被提交后直接被丢弃掉,也不会给你任何的通知,相对而言存在一定的风险,因为我们提交的时候根本不知道这个任务会被丢弃,可能造成数据丢失。
- 第三种拒绝策略是 DiscardOldestPolicy,如果线程池没被关闭且没有能力执行,则会丢弃任务队列中的头结点,通常是存活时间最长的任务,这种策略与第二种不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的,这样就可以腾出空间给新提交的任务,但同理它也存在一定的数据丢失风险。
-
第四种拒绝策略是 CallerRunsPolicy,相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处。
- 第一点新提交的任务不会被丢弃,这样也就不会造成业务损失。
- 第二点好处是,由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期。
构造流程

总结出线程池的几个特点。
- 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加线程。
- 线程池只有在任务队列填满时才创建多于 corePoolSize 的线程,如果使用的是无界队列(例如 LinkedBlockingQueue),那么由于队列不会满,所以线程数不会超过 corePoolSize。
- 通过设置 corePoolSize 和 maximumPoolSize 为相同的值,就可以创建固定大小的线程池。
Worker
在ThreadPoolExecutor中有一个内部类Worker,但这个Woker类并没有像想象中的那样继承于Thread,而是通过组合的方式绑定一个线程。在一定程度上,也可以把这个Worker看作是一个工作者线程。
Worker如何绑定一个线程?
这个工作者任务是在创建的时候与一个线程绑定的,其通过外部类ThreadPoolExecutor提供的线程工厂,创建一个线程,把自己传递给它,并保留线程的引用。
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { //防止在runWorker之前被中断,因为worker一旦建立就会加入workers集合中 //其他线程可能会中断空闲线程 //而空闲线程的依据就是能否获得worker的锁 setState(-1); //设置初始任务,注意这里没有null检查,故初始任务可以为空 this.firstTask = firstTask; //通过ThreadPoolExecutor的提供线程工厂来创建线程,并把自身赋值给它,作为其线程任务 //保留线程引用,用于中断线程 this.thread = getThreadFactory().newThread(this); }
Worker绑定的线程何时启动?
启动线程,必须通过Thread的start方法启动。那就来找找start方法在何处调用。在ThreadPoolExecutor的addWorker中,我们找到,当创建的Worker对象成功加入workers集合后,将启动对应线程。
private boolean addWorker(Runnable firstTask, boolean core) { //core表示是否是核心线程 //先试图改变控制信息内 工作线程数 的值 retry: for (;;) { //获得控制信息 int c = ctl.get(); //从控制信息内 获取线程池运行状态 int rs = runStateOf(c); //如果已经SHUTDOWN或者STOP则不再添加新工作线程 //除非,在SHUTDOWN状态下,有任务尚未完成,不接受新任务 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //从控制信息内获取 工作线程数 int wc = workerCountOf(c); //工作线程已经超过容量 或 //核心线程,超过核心线程数 //非核心线程,超过最大线程数 //不得添加新线程 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS改变控制信息内 工作线程数的值 +1 ,并结束自旋 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; //worker线程是否已经启动 boolean workerAdded = false; //worker线程是否已加入workers集合 Worker w = null; try { w = new Worker(firstTask); //创建新线程,把初始任务赋值给它 final Thread t = w.thread; //获取Worker的线程引用 if (t != null) { //因为要修改集合HashSet,故需获取线程池的锁,以保证线程安全 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //获取锁后再次检查状态,有可能在获得锁之前,线程池已经被shutdown了 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) //提前检查线程能否start throw new IllegalThreadStateException(); //把worker对象加入workers集合 workers.add(w); int s = workers.size(); //更新largetstPoolSize,此字段表示线程池运行时,最多开启过多少个线程 if (s > largestPoolSize) largestPoolSize = s; //线程已加入集合,如果前面出现异常,这里不会被执行 workerAdded = true; } } finally { mainLock.unlock(); } //如果添加成功,则启动线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //如果启动失败了,则表示添加Worker失败,回滚 if (! workerStarted) //这个方法,会把前面添加到workers集合中的对应worker删除 //并且把前面更新的 控制信息内的工作线程数再减回来 addWorkerFailed(w); } return workerStarted; }
Worker的执行
由于这里采用的是传递Runnable对象的方式创建线程任务,故Thread的run方法执行的是其target的run方法。而这个target正是前面传递给它的Worker。故执行的是Worker的run方法。而Worker.run()
执行的是ThreadPoolExecutor.runWorker(Worker w)。
final void runWorker(Worker w) { //获得当前执行这段代码的线程 Thread wt = Thread.currentThread(); //先尝试从worker取得初始任务 Runnable task = w.firstTask; w.firstTask = null; //允许中断,unlock后state=1,中断方法获取到锁,则判断为空闲线程,可中断 w.unlock(); boolean completedAbruptly = true; try { //不断地取任务执行、 其中getTask提供阻塞。如果getTask返回null则退出循环 while (task != null || (task = getTask()) != null) { //获取锁,标识此线程正在工作,非空闲线程 w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //钩子函数,空实现,子类可根据需要进行实现 beforeExecute(wt, task); Throwable thrown = null; try { //运行获取到的任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //钩子函数 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } //如果因为异常退出,这段语句不会被执行,也就是说completedAbruptly==true completedAbruptly = false; } finally { //工作线程退出的处理操作,如获取当前worker完成的任务量 //如果异常退出,还需弥补,补充工作线程等等 processWorkerExit(w, completedAbruptly); } }
线程安全问题
线程安全问题多出于多个线程对同一资源的访问,但是上述代码中,每个线程操作的是各自绑定的Worker。这些线程唯一有交集的,就是取任务操作了。但是任务已经交由BlockingQueue处理了,BlockingQueue的同步特性使得多个线程能够安全地获取任务。也就是说,不会有线程安全问题。
shutdown
线程池不接受新任务,但处理队列中的任务;线程池调用shutdown()之后的池状态为关机(SHUTDOWN)
public void shutdown() { final ReentrantLock mainLock = this.mainLock; //获取线程池锁 mainLock.lock(); try { //检查执行线程是否有权关闭线程池 checkShutdownAccess(); //更改线程池运行状态为SHUTDOWN advanceRunState(SHUTDOWN); //中断空闲线程 interruptIdleWorkers(); //钩子函数 onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
interruptIdleWorkers()
根据注释可以知道,其功能是关闭等待任务的线程(也就是没有被上锁的线程),由此可得Worker有没有获得锁,是区分其是否空闲的标志。结合源码:
private void interruptIdleWorkers(boolean onlyOne) { //获取线程池的锁,保持独占访问 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍历workers集合中的所有工作线程 for (Worker w : workers) { //获得worker对象中的线程引用 Thread t = w.thread; //如果获得锁成功,则中断对应线程 //如果工作线程正在执行任务,因为开始执行前,任务会获取worker的锁,故其无法被中断 //如果工作线程正在等待任务,因其没获得锁,则当前线程可以获得其worker的锁,此工作线程被中断 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } //如果只需要关闭一个工作线程,则到此为止 if (onlyOne) break; } } finally { mainLock.unlock(); } }
那问题又来了,如何终止已经开始的任务呢?
shutdownNow
shutdownNow就是停止所有任务,已经开始也要停止。其对应的线程池状态是STOP
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //更改线程池运行状态为STOP advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍历workers集合中的所有工作线程 for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }