Java並發編程:線程池的使用

  • 2020 年 7 月 21 日
  • 筆記

Thread和Runnable

首先,多線程的實現方式兩種:一種是繼承Thread類,另一種是實現Runnable接口。

那麼這兩種方法的區別何在?該如何選擇?

第一:他們之間的關係
查看J2EE的API看到

Thread類中: public class Thread extends Object implements Runnable

Runnable接口:public interfaceRunnable

明顯可知兩者:Thread類是Runnable接口的一個實現類,那麼Runnable接口是何用?

文檔解釋:

Runnable 接口應該由那些打算通過某一線程執行其實例的類來實現。類必須定義一個稱為 run 的無參數方法。

設計該接口的目的是為希望在活動時執行代碼的對象提供一個公共協議。例如,Thread 類實現了 Runnable。激活的意思是說某個線程已啟動並且尚未停止。

也就是說Runnable提供的是一種線程運行規範,具體運行線程需要通過它的實現類

第二:通過源碼分析
我們以public class Thread1 extends Thread 這個自定義線程來追本溯源:首先查看Thread類

其中定義了一個private Runnable target; 定義了Runnable類型的屬性target,查看哪裡有引用

幾個方法:

 private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize) {。。。}

public Thread() {
    init(null, null, "Thread-" + nextThreadNum(), 0);
   }

public Thread(Runnable target) {
	init(null, target, "Thread-" + nextThreadNum(), 0);
    }

public Thread(ThreadGroup group, Runnable target) {
	init(group, target, "Thread-" + nextThreadNum(), 0);
    }

public Thread(String name) {
	init(null, null, name, 0);
    }

 public Thread(ThreadGroup group, String name) {
	init(group, null, name, 0);
    }

public Thread(Runnable target, String name) {
	init(null, target, name, 0);
    }

public Thread(ThreadGroup group, Runnable target, String name) {
	init(group, target, name, 0);
    }

 public Thread(ThreadGroup group, Runnable target, String name,
                  long stackSize) {
	init(group, target, name, stackSize);
    }

以上列出了Thread的一個初始化方法init()和所有的構造方法:可以知道,構造方法需要調用init()方法初始化線程對象,有一個Runnable類型的target對象也參與初始化。
我們所知道的Thread類進行運行線程時是調用start()方法,我們也來查看這個方法:

public synchronized void start() {
        /**
	 * This method is not invoked for the main method thread or "system"
	 * group threads created/set up by the VM. Any new functionality added 
	 * to this method in the future may have to also be added to the VM.
	 *
	 * A zero status value corresponds to state "NEW".
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();
        group.add(this);
        start0();
        if (stopBeforeStart) {
	    stop0(throwableFromStop);
	}
    }

可知:當調用這個start()方法時,使該線程開始執行;Java 虛擬機調用該線程的 run 方法。結果是兩個線程並發地運行;當前線程(從調用返回給 start 方法)和另一個線程(執行其 run 方法)。
這個方法僅作了線程狀態的判斷(保證一個線程不多次啟動,多次啟動在JVM看來這是非法的),然後把該線程添加到線程組(不多做解釋)等待運行。
那麼繼續看run()方法:thread的run只是調用了runable的對像,再調用runable對象的方法

public void run() {
	if (target != null) {
	    target.run();
	}
    }

可知,當Runnable實現類對象沒有內容為null,則方法什麼都不執行,如果有實現類對象,就調用它實現類對象實現的run()方法。這讓我們想到了一個經典的設計模式:代理模式
到此我們知道:線程的運行是依靠Thread類中的start()方法執行,並且由虛擬機調用run()方法,所以我們必須實現run()方法

那麼還是要看一下Runnable接口的設計:

public
interface Runnable {
    /**
     * When an object implementing interface <code>Runnable</code> is used 
     * to create a thread, starting the thread causes the object's 
     * <code>run</code> method to be called in that separately executing 
     * thread. 
     * <p>
     * The general contract of the method <code>run</code> is that it may 
     * take any action whatsoever.
     *
     * @see     java.lang.Thread#run()
     */
    public abstract void run();
}

可知它只有一個抽象的run()方法,完全是實實在在的線程運行規範
第三:通過他們之間的設計模式:代理模式 再次深入
代理模式如圖:

可知,Thread也是Runnable接口的子類,但其沒有完全實現run()方法,所以說如果繼承Thread類實現多線程,仍舊需要覆寫run()方法。

看兩種實現多線程的基本方式

繼承Thread:

class MyThread extends Thread{
	private int ticket = 5;
	public void run(){
		for(int i = 0;i<100;i++){
			if(ticket>0){//判斷是否還有剩餘票
				System.out.println("賣票,ticket = "+ticket--);
			}
		}
	}
};
public class ThreadDemo04{
	public static void main(String args[]){
		MyThread mt1 = new MyThread();
		MyThread mt2 = new MyThread();
		MyThread mt3 = new MyThread();
		mt1.start();//調用線程主體讓其運行
		mt2.start();//三個地方同時賣票
		mt3.start();
	}
};	

實現Runnable:

class MyThread implements Runnable{
	private int ticket=5;
	public void run(){
		for(int i = 0;i<100;i++){
			if(ticket>0){
				System.out.println("賣票,ticket = "+ticket--);
			}
		}
	}
};
public class RunnableDemo02{
	public static void main(String args[]){
		MyThread my1 = new MyThread();
		new Thread(my1).start(); //啟動三個線程
		new Thread(my1).start(); //共享my1中資源
		new Thread(my1).start(); 
	}
};

可知最後:無論哪種方法都需要實現run()方法,run方法是線程的運行主體。並且,線程的運行都是調用Thread的start()方法。
那麼代理模式中Thread類就充當了代理類,它在線程運行主體運行前作了一些操作然後才運行線程的run()。首先說一下代理模式的基本特徵就是對【代理目標進行增強】代理模式就不在這裡詳述。總之,Thread提供了很多有關線程運行前、後的操作,然後通過它的start()方法讓JVM自動調用目標的run()方法

第四:繼承Thread與實現Runnable接口方法區別
首先看一段代碼:

new Thread(
    new Runnable(){

              public void run() {
            while(true){
                try {
                        Thread.sleep(500);
                } catch (InterruptedException e) {e.printStackTrace(); }
                    System.out.println("runnable :" + Thread.currentThread().getName());
            }                            
        }
    }

){

        public void run() {
        while(true){
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {e.printStackTrace();}
     
                       System.out.println("thread :" + Thread.currentThread().getName());
        }    
    }

}.start();

可以預測一下這段代碼是執行哪一個run()方法?
根據以前java中基礎知識可知:執行start()方法後,JVM去找run()方法,然後它找到了自己的run()方法,那麼就直接運行自己的run()方法。如果找不到自己的方法它才會去找被代理的run()方法。所以它應該執行的是”thread:。。。”代碼部分。可以把它放到一個main方法中,通過測試驗證推斷正確:

想說明的是一個面向對象的思想:即如果沒有上方第二個run()塊,那麼它執行的就是匿名Runnable實現類的run()方法。這說明什麼,說明,Thread相當於一個執行者,而執行的代碼塊在Runnable實現類中定義好。這樣實現執行與源碼的分離,體現了面向對象的思想。這也是他們之間的一個比較大的區別。

其他區別:

實現Runnable接口可以實現資源共享,Thread無法完成資源共享 —– 討論第三點的兩段代碼中:繼承Thread的:結果賣出了15張,各有各的票數。實現Runnable接口的方法:賣出5張,共享了資源

實現Runnable接口比繼承Thread類來實現多線程有如下明顯優點:

適合多個相同程序代碼使用共同資源;

避免由單繼承局限帶來的影響;

增強程序的健壯性,代碼能夠被多個線程共享,代碼數據是獨立的;

使用的選擇
通過比對區別可知:
由於面向對象的思想,以及資源共享,代碼健壯性等,一般都是使用實現Runnable接口來實現多線程,也比較推薦

Java並發編程:線程池的使用

文章的主題是基於下面的舊版的一些線程池的講解,再加上1.8的情況與一些補充,下面為舊版原文鏈接,基本思想沒變,僅進行一些對比解釋

  //www.cnblogs.com/dolphin0520/p/3932921.html 

在前面的文章中,我們使用線程的時候就去創建一個線程,這樣實現起來非常簡便,但是就會有一個問題:

  如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間。

  那麼有沒有一種辦法使得線程可以復用,就是執行完一個任務,並不被銷毀,而是可以繼續執行其他的任務?

  在Java中可以通過線程池來達到這樣的效果。今天我們就來詳細講解一下Java的線程池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然後再講述它的實現原理,接着給出了它的使用示例,最後討論了一下如何合理配置線程池的大小。

  以下是本文的目錄大綱:

  一.Java中的ThreadPoolExecutor類

  二.深入剖析線程池實現原理

  三.使用示例

  四.如何合理配置線程池的大小 

  若有不正之處請多多諒解,並歡迎批評指正。

一.Java中的ThreadPoolExecutor類
  java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現源碼。

  在ThreadPoolExecutor類中提供了四個構造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {

    .....
    
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
    
            BlockingQueue<Runnable> workQueue);

 


    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
    
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

 


    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
    
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

 


    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
    
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    
    ...

}

  從上面的代碼可以得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。

  下面解釋下一下構造器中各個參數的含義:

  • corePoolSize:核心池的大小,這個參數跟後面講述的線程池的實現原理有非常大的關係。在創建了線程池後,默認情況下,線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池後,線程池中的線程數為0,當有任務來之後,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;
  • maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;
  • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
  • unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這裡的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

  ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

  • threadFactory:線程工廠,主要用來創建線程;
  • handler:表示當拒絕處理任務時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

  從上面給出的ThreadPoolExecutor類的代碼可以知道,ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實現:

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    
    public Future<?> submit(Runnable task) {};
    
    public <T> Future<T> submit(Runnable task, T result) { };
    
    public <T> Future<T> submit(Callable<T> task) { };
    
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)
    
        throws InterruptedException, ExecutionException, TimeoutException {
    
    };
    
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException {
    
    };
    
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    
                           long timeout, TimeUnit unit)
    
        throws InterruptedException, ExecutionException, TimeoutException {
    
    };
    
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    
        throws InterruptedException {
    
    };
    
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    
                                         long timeout, TimeUnit unit)
    
        throws InterruptedException {
    
    };

}

  AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。

  我們接着看ExecutorService接口的實現:

public interface ExecutorService extends Executor {

    void shutdown();
    
    boolean isShutdown();
    
    boolean isTerminated();
    
    boolean awaitTermination(long timeout, TimeUnit unit)
    
        throws InterruptedException;
    
    <T> Future<T> submit(Callable<T> task);
    
    <T> Future<T> submit(Runnable task, T result);
    
    Future<?> submit(Runnable task);
    
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    
        throws InterruptedException;
    
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
    
                                  long timeout, TimeUnit unit)
    
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
    
        throws InterruptedException, ExecutionException;
    
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
    
                    long timeout, TimeUnit unit)
    
        throws InterruptedException, ExecutionException, TimeoutException;
}

  而ExecutorService又是繼承了Executor接口,我們看一下Executor接口的實現:

public interface Executor {

    void execute(Runnable command);

}

  到這裡,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關係了。

  Executor是一個頂層接口,在它裏面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,從字面意思可以理解,就是用來執行傳進去的任務的;

  然後ExecutorService接口繼承了Executor接口,並聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的所有方法;

  然後ThreadPoolExecutor繼承了類AbstractExecutorService。

  在ThreadPoolExecutor類中有幾個非常重要的方法:

execute()
submit()
shutdown()
shutdownNow()

  execute()方法實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。

  submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。

  shutdown()和shutdownNow()是用來關閉線程池的。

  還有很多其他的方法:

  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,有興趣的朋友可以自行查閱API。

二.深入剖析線程池實現原理
  在上一節我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實現原理,將從下面幾個方面講解:

  1.線程池狀態

  2.任務的執行

  3.線程池中的線程初始化

  4.任務緩存隊列及排隊策略

  5.任務拒絕策略

  6.線程池的關閉

  7.線程池容量的動態調整

1.線程池狀態

  在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態

volatile int runState;

static final int RUNNING    = 0;

static final int SHUTDOWN   = 1;

static final int STOP       = 2;

static final int TERMINATED = 3;
  • runState表示當前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性;

  下面的幾個static final變量表示runState可能的幾個取值。

  當創建線程池後,初始時,線程池處於RUNNING狀態;

  • 如果調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢;

  • 如果調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,並且會去嘗試終止正在執行的任務;

  • 當線程池處於SHUTDOWN或STOP狀態,並且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束後,線程池被設置為TERMINATED狀態。

2.任務的執行

  在了解將任務提交給線程池到任務執行完畢整個過程之前,我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:

private final BlockingQueue<Runnable> workQueue;              //任務緩存隊列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock();   //線程池的主要狀態鎖,對線程池狀態(比如線程池大小 )(runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>();  //用來存放工作集
private volatile long  keepAliveTime;    //線程存貨時間   
private volatile boolean allowCoreThreadTimeOut;   //是否允許為核心線程設置存活時間
private volatile int   corePoolSize;     //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int   maximumPoolSize;   //線程池最大能容忍的線程數
private volatile int   poolSize;       //線程池中當前的線程數
private volatile RejectedExecutionHandler handler; //任務拒絕策略
private volatile ThreadFactory threadFactory;   //線程工廠,用來創建線程
private int largestPoolSize;   //用來記錄線程池中曾經出現過的最大線程數
private long completedTaskCount;   //用來記錄已經執行完畢的任務個數

  每個變量的作用都已經標明出來了,這裡要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。

  • corePoolSize在很多地方被翻譯成核心池大小,其實我的理解這個就是線程池的大小。舉個簡單的例子:

  假如有一個工廠,工廠裏面有10個工人,每個工人同時只能做一件任務。

  因此只要當10個工人中有工人是空閑的,來了任務就分配給空閑的工人做;

  當10個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;

  如果說新任務數目增長的速度遠遠大於工人做任務的速度,那麼此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;

  然後就將任務也分配給這4個臨時工人做;

  如果說著14個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。

  當這14個工人當中有人空閑時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。

  • 這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  • 也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。

  不過為了方便理解,在本文後面還是將corePoolSize翻譯成核心池大小。

  • largestPoolSize只是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關係。

  下面我們進入正題,看一下任務從提交到最終執行完畢經歷了哪些過程。

  在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然通過submit也可以提交任務,但是實際上submit方法裏面最終調用的還是execute()方法,所以我們只需要研究execute()方法的實現原理即可:

public void execute(Runnable command) {//舊版
    if (command == null)//查看    
        throw new NullPointerException();   
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {    
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        } 
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
public void execute(Runnable command) {//新版,本質上差不多,將addIfUnderCorePoolSize(command)直接加到了addworker裏面
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

  上面的代碼可能看起來不是那麼容易理解,下面我們一句一句解釋:

  首先,判斷提交的任務command是否為null,若是null,則拋出空指針異常;

  接着是這句,這句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

  由於是或條件運算符,所以先計算前半部分的值,如果線程池中當前線程數不小於核心池大小,那麼就會直接進入下面的if語句塊了。

  如果線程池中當前線程數小於核心池大小,則接着執行後半部分,也就是執行

addIfUnderCorePoolSize(command)

  如果執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,否則整個方法就直接執行完畢了。

  如果執行完addIfUnderCorePoolSize這個方法返回false,然後接着判斷:

if (runState == RUNNING && workQueue.offer(command))

  如果當前線程池處於RUNNING狀態,則將任務放入任務緩存隊列;如果當前線程池不處於RUNNING狀態或者任務放入緩存隊列失敗,則執行:

addIfUnderMaximumPoolSize(command)

  如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。

  回到前面:

if (runState == RUNNING && workQueue.offer(command))

  這句的執行,如果說當前線程池處於RUNNING狀態且將任務放入任務緩存隊列成功,則繼續進行判斷:

if (runState != RUNNING || poolSize == 0)

  這句判斷是為了防止在將此任務添加進任務緩存隊列的同時其他線程突然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。如果是這樣就執行:

ensureQueuedTaskHandled(command)

  進行應急處理,從名字可以看出是保證 添加到任務緩存隊列中的任務得到處理。

  我們接着看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:重點

private boolean addIfUnderCorePoolSize(Runnable firstTask) {

    Thread t = null;
    
    final ReentrantLock mainLock = this.mainLock;
    
    mainLock.lock();
    
    try {
    
        if (poolSize < corePoolSize && runState == RUNNING)
    
            t = addThread(firstTask);        //創建線程去執行firstTask任務   
    
    } finally {
    
        mainLock.unlock();
    
    }
    
    if (t == null)  return false;
    
    t.start();//注意了。線程是在這裡啟動的
    
    return true;

}
private boolean addWorker(Runnable firstTask, boolean core) {//新版,將 addIfUnderCorePoolSize與addThread結合了起來。
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//線程是在這裡啟動,但是可能會有疑惑下面新版worker代碼中的runWorker有什麼用
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  這個是addIfUnderCorePoolSize方法的具體實現,從名字可以看出它的意圖就是當低於核心吃大小時執行的方法。下面看其具體實現,首先獲取到鎖,因為這地方涉及到線程池狀態的變化,先通過if語句判斷當前線程池中的線程數目是否小於核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有線程池當前線程數目小於核心池大小才會執行addIfUnderCorePoolSize方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷過程中並沒有加鎖,因此可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完之後,在其他線程中又向線程池提交了任務,就可能導致poolSize不小於corePoolSize了,所以需要在這個地方繼續判斷。然後接着判斷線程池的狀態是否為RUNNING,原因也很簡單,因為有可能在其他線程中調用了shutdown或者shutdownNow方法。然後就是執行

t = addThread(firstTask);

  這個方法也非常關鍵,傳進去的參數為提交的任務,返回值為Thread類型。然後接着在下面判斷t是否為空,為空則表明創建線程失敗(即poolSize>=corePoolSize或者runState不等於RUNNING),否則調用t.start()方法啟動線程。

  我們來看一下addThread方法的實現:

private Thread addThread(Runnable firstTask) {

    Worker w = new Worker(firstTask);
    
    Thread t = threadFactory.newThread(w);  //創建一個線程,執行任務   
    
    if (t != null) {
    
        w.thread = t;            //將創建的線程的引用賦值為w的成員變量       
    
        workers.add(w);
    
        int nt = ++poolSize;     //當前線程數加1       
    
        if (nt > largestPoolSize)
    
            largestPoolSize = nt;
    
    }
    
    return t;

}

  在addThread方法中,首先用提交的任務創建了一個Worker對象,然後調用線程工廠threadFactory創建了一個新的線程t,然後將線程t的引用賦值給了Worker對象的成員變量thread,接着通過workers.add(w)將Worker對象添加到工作集當中。

  下面我們看一下Worker類的實現:重點

private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    volatile long completedTasks;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    boolean isActive() {
        return runLock.isLocked();
    }
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
        if (thread != Thread.currentThread())
        thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }
    void interruptNow() {
        thread.interrupt();
    }
 
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            boolean ran = false;
            beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶可以根據
            //自己需要重載這個方法和後面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等           
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }
 
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //當任務隊列中沒有任務時,進行清理工作       
        }
    }
}
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable//新版,其中與舊版一樣,線程是再addworker(addThread)後啟動的,那麼worker中的run方法以及方法中的runWorker又是如何實現的呢?  關鍵就在於構造方法中的this.thread = getThreadFactory().newThread(this)這個語句,其中過的this為當前的worker對象本身,而上面可以看出worker是實現了Runable接口的,因此addworker中的線程t是基於Worker對象本身啟動的,而thread的run方法就是對runable對象的run的調用,而worker重寫了run方法,因此就能start0()調用JVM後調用run時調用runworker方法,繼而調用gettask等。
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

  它實際上實現了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:

Thread t = new Thread(w);

  相當於傳進去了一個Runnable任務,在線程t中執行這個Runnable。

  既然Worker實現了Runnable接口,那麼自然最核心的方法便是run()方法了:

public void run() {

    try {
    
        Runnable task = firstTask;
    
        firstTask = null;
    
        while (task != null || (task = getTask()) != null) {
    
            runTask(task);
    
            task = null;
    
        }
    
    } finally {
    
        workerDone(this);
    
    }

}

    final void runWorker(Worker w) {//新版 worker自己作為runnable對象(以便thread啟動時基於worker啟動來實現自己需要的邏輯)時接收了runnable對象,最後還是用接收的runnable對象run。
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

  從run方法的實現可以看出,它首先執行的是通過構造器傳進來的任務firstTask,在調用runTask()執行完firstTask之後,在while循環裏面不斷通過getTask()去取新的任務來執行,那麼去哪裡取呢?自然是從任務緩存隊列裏面去取,getTask是ThreadPoolExecutor類中的方法,並不是Worker類中的方法,下面是getTask方法的實現:

Runnable getTask() {

    for (;;) {
    
        try {
    
            int state = runState;
    
            if (state > SHUTDOWN)
    
                return null;
    
            Runnable r;
    
            if (state == SHUTDOWN)  // Help drain queue
    
                r = workQueue.poll();
    
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數大於核心池大小或者允許為核心池線程設置空閑時間,
    
                //則通過poll取任務,若等待一定的時間取不到任務,則返回null
    
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
    
            else
    
                r = workQueue.take();
    
            if (r != null)
    
                return r;
    
            if (workerCanExit()) {    //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出
    
                if (runState >= SHUTDOWN) // Wake up others
    
                    interruptIdleWorkers();   //中斷處於空閑狀態的worker
    
                return null;
    
            }
    
            // Else retry
    
        } catch (InterruptedException ie) {
    
            // On interruption, re-check runState
    
        }
    
    }

}

 private Runnable getTask() {//新版
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

  在getTask中,先判斷當前線程池狀態,如果runState大於SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。

  如果runState為SHUTDOWN或者RUNNING,則從任務緩存隊列取任務。

  如果當前線程池的線程數大於核心池大小corePoolSize或者允許為核心池中的線程設置空閑存活時間,則調用poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就返回null。

  然後判斷取到的任務r是否為null,為null則通過調用workerCanExit()方法來判斷當前worker是否可以退出,我們看一下workerCanExit()的實現:

private boolean workerCanExit() {

    final ReentrantLock mainLock = this.mainLock;
    
    mainLock.lock();
    
    boolean canExit;
    
    //如果runState大於等於STOP,或者任務緩存隊列為空了
    
    //或者  允許為核心池線程設置空閑存活時間並且線程池中的線程數目大於1
    
    try {
    
        canExit = runState >= STOP ||
    
            workQueue.isEmpty() ||
    
            (allowCoreThreadTimeOut &&
    
             poolSize > Math.max(1, corePoolSize));
    
    } finally {
    
        mainLock.unlock();
    
    }
    
    return canExit;

}

  也就是說如果線程池處於STOP狀態、或者任務隊列已為空或者允許為核心池線程設置空閑存活時間並且線程數大於1時,允許worker退出。如果允許worker退出,則調用interruptIdleWorkers()中斷處於空閑狀態的worker,我們看一下interruptIdleWorkers()的實現:

void interruptIdleWorkers() {

    final ReentrantLock mainLock = this.mainLock;
    
    mainLock.lock();
    
    try {
    
        for (Worker w : workers)  //實際上調用的是worker的interruptIfIdle()方法
    
            w.interruptIfIdle();
    
    } finally {
    
        mainLock.unlock();
    
    }

}

  從實現可以看出,它實際上調用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

void interruptIfIdle() {

    final ReentrantLock runLock = this.runLock;
    
    if (runLock.tryLock()) {    //注意這裡,是調用tryLock()來獲取鎖的,因為如果當前worker正在執行任務,鎖已經被獲取了,是無法獲取到鎖的
    
                                //如果成功獲取了鎖,說明當前worker處於空閑狀態
    
        try {
    
    if (thread != Thread.currentThread())  
    
    thread.interrupt();
    
        } finally {
    
            runLock.unlock();
    
        }
    
    }

}

  這裡有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閑時,就從任務緩存隊列中取一個任務交給空閑線程執行。但是在這裡,並沒有採用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和複雜度,這裡直接讓執行完任務的線程去任務緩存隊列裏面取任務來執行。

  我們再看addIfUnderMaximumPoolSize方法的實現,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想非常相似,唯一的區別在於addIfUnderMaximumPoolSize方法是在線程池中的線程數達到了核心池大小並且往任務隊列中添加任務失敗的情況下執行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {

    Thread t = null;
    
    final ReentrantLock mainLock = this.mainLock;
    
    mainLock.lock();
    
    try {
    
        if (poolSize < maximumPoolSize && runState == RUNNING)
    
            t = addThread(firstTask);
    
    } finally {
    
        mainLock.unlock();
    
    }
    
    if (t == null)
    
        return false;
    
    t.start();
    
    return true;

}

  看到沒有,其實它和addIfUnderCorePoolSize方法的實現基本一模一樣,只是if語句判斷條件中的poolSize < maximumPoolSize不同而已。

  到這裡,大部分朋友應該對任務提交給線程池之後到被執行的整個過程有了一個基本的了解,下面總結一下:

  1)首先,要清楚corePoolSize和maximumPoolSize的含義;

  2)其次,要知道Worker是用來起到什麼作用的;

  3)要知道任務提交給線程池之後的處理策略,這裡總結一下主要有4點:

如果當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
如果當前線程池中的線程數目達到maximumPoolSize,則會採取任務拒絕策略進行處理;
如果線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;如果允許為核心池中的線程設置存活時間,那麼核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。
3.線程池中的線程初始化

  默認情況下,創建線程池之後,線程池中是沒有線程的,需要提交任務之後才會創建線程。

  在實際中如果需要線程池創建之後立即創建線程,可以通過以下兩個方法辦到:

prestartCoreThread():初始化一個核心線程;
prestartAllCoreThreads():初始化所有核心線程
  下面是這2個方法的實現:

public boolean prestartCoreThread() {

    return addIfUnderCorePoolSize(null); //注意傳進去的參數是null

}

 

public int prestartAllCoreThreads() {

    int n = 0;
    
    while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null
    
        ++n;
    
    return n;

}

  注意上面傳進去的參數是null,根據第2小節的分析可知如果傳進去的參數為null,則最後執行線程會阻塞在getTask方法中的

1

r = workQueue.take();

  即等待任務隊列中有任務。

4.任務緩存隊列及排隊策略

  在前面我們多次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。

  workQueue的類型為BlockingQueue,通常可以取下面三種類型:

  1)ArrayBlockingQueue:基於數組的先進先出隊列,此隊列創建時必須指定大小;

  2)LinkedBlockingQueue:基於鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;

  3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

5.任務拒絕策略

  當線程池的任務緩存隊列已滿並且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會採取任務拒絕策略,通常有以下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。

ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)

ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

從runworker方法中我們可以看到對應的Exception都是保存在thrownn中,在finally中交給了 afterExecute進行了處理。

try {
    beforeExecute(wt, task);
    Throwable thrown = null;
    try {
        task.run();
    } catch (RuntimeException x) {
        thrown = x; throw x;
    } catch (Error x) {
        thrown = x; throw x;
    } catch (Throwable x) {
        thrown = x; throw new Error(x);
    } finally {
        afterExecute(task, thrown);
    }
} finally {
    task = null;
    w.completedTasks++;
    w.unlock();
}

6.線程池的關閉

ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:

shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完後才終止,但再也不會接受新的任務
shutdownNow():立即終止線程池,並嘗試打斷正在執行的任務,並且清空任務緩存隊列,返回尚未執行的任務
7.線程池容量的動態調整

  ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

setCorePoolSize:設置核心池大小
setMaximumPoolSize:設置線程池最大能創建的線程數目大小
  當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創建新的線程來執行任務。

三.使用示例
  前面我們討論了關於線程池的實現原理,這一節我們來看一下它的具體使用:

public class Test {

     public static void main(String[] args) {   
    
         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
    
                 new ArrayBlockingQueue<Runnable>(5)       
         for(int i=0;i<15;i++){
    
             MyTask myTask = new MyTask(i);
    
             executor.execute(myTask);
    
             System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+
    
             executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount());
    
         }
    
         executor.shutdown();
    
     }

}

class MyTask implements Runnable {

    private int taskNum;
    public MyTask(int num) {
    
        this.taskNum = num;
    
    }
    @Override
    
    public void run() {
    
        System.out.println("正在執行task "+taskNum);
    
        try {
    
            Thread.currentThread().sleep(4000);
    
        } catch (InterruptedException e) {
    
            e.printStackTrace();
    
        }
    
        System.out.println("task "+taskNum+"執行完畢");
    
    }

}

  執行結果:

正在執行task 0
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 1
線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 2
線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 3
線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 4
線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 10
線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 11
線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 12
線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 13
線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 14
task 3執行完畢
task 0執行完畢
task 2執行完畢
task 1執行完畢
正在執行task 8
正在執行task 7
正在執行task 6
正在執行task 5
task 4執行完畢
task 10執行完畢
task 11執行完畢
task 13執行完畢
task 12執行完畢
正在執行task 9
task 14執行完畢
task 8執行完畢
task 5執行完畢
task 7執行完畢
task 6執行完畢
task 9執行完畢

  從執行結果可以看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列裏面,當任務緩存隊列滿了之後,便創建新的線程。如果上面程序中,將for循環中改成執行20個任務,就會拋出任務拒絕異常了。

  不過在java doc中,並不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來創建線程池:

Executors.newCachedThreadPool();        //創建一個緩衝池,緩衝池容量大小為Integer.MAX_VALUE

Executors.newSingleThreadExecutor();   //創建容量為1的緩衝池

Executors.newFixedThreadPool(int);    //創建固定容量大小的緩衝池

  下面是這三個靜態方法的具體實現;

public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,
    
                                  0L, TimeUnit.MILLISECONDS,
    
                                  new LinkedBlockingQueue<Runnable>());

}

public static ExecutorService newSingleThreadExecutor() {

    return new FinalizableDelegatedExecutorService
    
        (new ThreadPoolExecutor(1, 1,
    
                                0L, TimeUnit.MILLISECONDS,
    
                                new LinkedBlockingQueue<Runnable>()));

}

public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    
                                  60L, TimeUnit.SECONDS,
    
                                  new SynchronousQueue<Runnable>());

}

  從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。

 

 newFixedThreadPool創建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

 newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置為1,也使用的LinkedBlockingQueue;

 newCachedThreadPool將corePoolSize設置為0,將maximumPoolSize設置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。

  實際中,如果Executors提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。

  另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。

四.如何合理配置線程池的大小
  本節來討論一個比較重要的話題:如何合理配置線程池大小,僅供參考。

  一般需要根據任務的類型來配置線程池大小:

  如果是CPU密集型任務,就需要盡量壓榨CPU,參考值可以設為 NCPU+1

  如果是IO密集型任務,參考值可以設置為2*NCPU

  當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。