【高並發】通過源碼深度解析ThreadPoolExecutor類是如何保證線程池正確運行的

大家好,我是冰河~~

對於線程池的核心類ThreadPoolExecutor來說,有哪些重要的屬性和內部類為線程池的正確運行提供重要的保障呢?

ThreadPoolExecutor類中的重要屬性

在ThreadPoolExecutor類中,存在幾個非常重要的屬性和方法,接下來,我們就介紹下這些重要的屬性和方法。

ctl相關的屬性

AtomicInteger類型的常量ctl是貫穿線程池整個生命周期的重要屬性,它是一個原子類對象,主要用來保存線程的數量和線程池的狀態,我們看下與這個屬性相關的代碼如下所示。

//主要用來保存線程數量和線程池的狀態,高3位保存線程狀態,低29位保存線程數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程池中線程的數量的位數(32-3)
private static final int COUNT_BITS = Integer.SIZE - 3;
//表示線程池中的最大線程數量
//將數字1的二進制值向右移29位,再減去1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//線程池的運行狀態
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
//獲取線程狀態
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//獲取線程數量
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
	return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
	return c >= s;
}
private static boolean isRunning(int c) {
	return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect) {
	return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
	return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
	do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

對於線程池的各狀態說明如下所示。

  • RUNNING:運行狀態,能接收新提交的任務,並且也能處理阻塞隊列中的任務
  • SHUTDOWN: 關閉狀態,不能再接收新提交的任務,但是可以處理阻塞隊列中已經保存的任務,當線程池處於RUNNING狀態時,調用shutdown()方法會使線程池進入該狀態
  • STOP: 不能接收新任務,也不能處理阻塞隊列中已經保存的任務,會中斷正在處理任務的線程,如果線程池處於RUNNING或SHUTDOWN狀態,調用shutdownNow()方法,會使線程池進入該狀態
  • TIDYING: 如果所有的任務都已經終止,有效線程數為0(阻塞隊列為空,線程池中的工作線程數量為0),線程池就會進入該狀態。
  • TERMINATED: 處於TIDYING狀態的線程池調用terminated ()方法,會使用線程池進入該狀態

也可以按照ThreadPoolExecutor類的注釋,將線程池的各狀態之間的轉化總結成如下圖所示。

  • RUNNING -> SHUTDOWN:顯式調用shutdown()方法, 或者隱式調用了finalize()方法
  • (RUNNING or SHUTDOWN) -> STOP:顯式調用shutdownNow()方法
  • SHUTDOWN -> TIDYING:當線程池和任務隊列都為空的時候
  • STOP -> TIDYING:當線程池為空的時候
  • TIDYING -> TERMINATED:當 terminated() hook 方法執行完成時候

其他重要屬性

除了ctl相關的屬性外,ThreadPoolExecutor類中其他一些重要的屬性如下所示。

//用於存放任務的阻塞隊列  
private final BlockingQueue<Runnable> workQueue;
//可重入鎖
private final ReentrantLock mainLock = new ReentrantLock();
//存放線程池中線程的集合,訪問這個集合時,必須獲得mainLock鎖
private final HashSet<Worker> workers = new HashSet<Worker>();
//在鎖內部阻塞等待條件完成
private final Condition termination = mainLock.newCondition();
//線程工廠,以此來創建新線程
private volatile ThreadFactory threadFactory;
//拒絕策略
private volatile RejectedExecutionHandler handler;
//默認的拒絕策略
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

ThreadPoolExecutor類中的重要內部類

在ThreadPoolExecutor類中存在對於線程池的執行至關重要的內部類,Worker內部類和拒絕策略內部類。接下來,我們分別看這些內部類。

Worker內部類

Worker類從源代碼上來看,實現了Runnable接口,說明其本質上是一個用來執行任務的線程,接下來,我們看下Worker類的源代碼,如下所示。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
	private static final long serialVersionUID = 6138294804551838833L;
	//真正執行任務的線程
	final Thread thread;
	//第一個Runnable任務,如果在創建線程時指定了需要執行的第一個任務
	//則第一個任務會存放在此變量中,此變量也可以為null
	//如果為null,則線程啟動後,通過getTask方法到BlockingQueue隊列中獲取任務
	Runnable firstTask;
	//用於存放此線程完全的任務數,注意:使用了volatile關鍵字
	volatile long completedTasks;
	
	//Worker類唯一的構造放大,傳遞的firstTask可以為null
	Worker(Runnable firstTask) {
		//防止在調用runWorker之前被中斷
		setState(-1);
		this.firstTask = firstTask;
		//使用ThreadFactory 來創建一個新的執行任務的線程
		this.thread = getThreadFactory().newThread(this);
	}
	//調用外部ThreadPoolExecutor類的runWorker方法執行任務
	public void run() {
		runWorker(this);
	}

	//是否獲取到鎖 
	//state=0表示鎖未被獲取
	//state=1表示鎖被獲取
	protected boolean isHeldExclusively() {
		return getState() != 0;
	}

	protected boolean tryAcquire(int unused) {
		if (compareAndSetState(0, 1)) {
			setExclusiveOwnerThread(Thread.currentThread());
			return true;
		}
		return false;
	}

	protected boolean tryRelease(int unused) {
		setExclusiveOwnerThread(null);
		setState(0);
		return true;
	}

	public void lock()        { acquire(1); }
	public boolean tryLock()  { return tryAcquire(1); }
	public void unlock()      { release(1); }
	public boolean isLocked() { return isHeldExclusively(); }

	void interruptIfStarted() {
		Thread t;
		if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
			try {
				t.interrupt();
			} catch (SecurityException ignore) {
			}
		}
	}
}

在Worker類的構造方法中,可以看出,首先將同步狀態state設置為-1,設置為-1是為了防止runWorker方法運行之前被中斷。這是因為如果其他線程調用線程池的shutdownNow()方法時,如果Worker類中的state狀態的值大於0,則會中斷線程,如果state狀態的值為-1,則不會中斷線程。

Worker類實現了Runnable接口,需要重寫run方法,而Worker的run方法本質上調用的是ThreadPoolExecutor類的runWorker方法,在runWorker方法中,會首先調用unlock方法,該方法會將state置為0,所以這個時候調用shutDownNow方法就會中斷當前線程,而這個時候已經進入了runWork方法,就不會在還沒有執行runWorker方法的時候就中斷線程。

注意:大家需要重點理解Worker類的實現。

拒絕策略內部類

在線程池中,如果workQueue阻塞隊列滿了,並且沒有空閑的線程池,此時,繼續提交任務,需要採取一種策略來處理這個任務。而線程池總共提供了四種策略,如下所示。

  • 直接拋出異常,這也是默認的策略。實現類為AbortPolicy。
  • 用調用者所在的線程來執行任務。實現類為CallerRunsPolicy。
  • 丟棄隊列中最靠前的任務並執行當前任務。實現類為DiscardOldestPolicy。
  • 直接丟棄當前任務。實現類為DiscardPolicy。

在ThreadPoolExecutor類中提供了4個內部類來默認實現對應的策略,如下所示。

public static class CallerRunsPolicy implements RejectedExecutionHandler {

	public CallerRunsPolicy() { }

	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		if (!e.isShutdown()) {
			r.run();
		}
	}
}

public static class AbortPolicy implements RejectedExecutionHandler {

	public AbortPolicy() { }

	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
	}
}

public static class DiscardPolicy implements RejectedExecutionHandler {

	public DiscardPolicy() { }

	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
	}
}

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

	public DiscardOldestPolicy() { }


	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		if (!e.isShutdown()) {
			e.getQueue().poll();
			e.execute(r);
		}
	}
}

我們也可以通過實現RejectedExecutionHandler接口,並重寫RejectedExecutionHandler接口的rejectedExecution方法來自定義拒絕策略,在創建線程池時,調用ThreadPoolExecutor的構造方法,傳入我們自己寫的拒絕策略。

例如,自定義的拒絕策略如下所示。

public class CustomPolicy implements RejectedExecutionHandler {

	public CustomPolicy() { }

	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		if (!e.isShutdown()) {
			System.out.println("使用調用者所在的線程來執行任務")
			r.run();
		}
	}
}

使用自定義拒絕策略創建線程池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>(),
                       Executors.defaultThreadFactory(),
		       new CustomPolicy());

今天就到這兒吧,我是冰河,我們下期見~~

Tags: