[從源碼學設計]螞蟻金服SOFARegistry 之 自動調節間隔周期性任務
- 2020 年 12 月 19 日
- 筆記
- 005_源碼分析, 008_微服務, 210_SOFAStack
[從源碼學設計]螞蟻金服SOFARegistry 之 自動調節間隔周期性任務
0x00 摘要
SOFARegistry 是螞蟻金服開源的一個生產級、高時效、高可用的服務註冊中心。
本系列文章重點在於分析設計和架構,即利用多篇文章,從多個角度反推總結 DataServer 或者 SOFARegistry 的實現機制和架構思路,讓大家藉以學習阿里如何設計。
本文為第九篇,介紹SOFARegistry自動調節間隔周期性任務的實現。
0x01 業務領域
螞蟻金服這裡的業務需求主要是:
- 啟動一個無限循環任務,不定期執行任務;
- 啟動若干周期性延時任務;
- 某些周期性任務需要實現自動調節間隔功能:程式一旦遇到發生超時異常,就將間隔時間調大,如果連續超時,那麼每次間隔時間都會增大一倍,一直到達外部參數設定的上限為止,一旦新任務不再發生超時異常,間隔時間又會自動恢復為初始值
0x02 阿里方案
阿里採用了:
- ExecutorService實現了無限循環任務;
- ScheduledExecutorService 實現了周期性任務;
- TimedSupervisorTask 實現了自動調節間隔的周期性任務;
我們在設計延時/周期性任務時就可以參考TimedSupervisorTask的實現
0x03 Scheduler
Scheduler類中就是這個方案的體現。
首先,我們需要看看 Scheduler的程式碼。
public class Scheduler {
private final ScheduledExecutorService scheduler;
public final ExecutorService versionCheckExecutor;
private final ThreadPoolExecutor expireCheckExecutor;
@Autowired
private AcceptorStore localAcceptorStore;
public Scheduler() {
scheduler = new ScheduledThreadPoolExecutor(4, new NamedThreadFactory("SyncDataScheduler"));
expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));
versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(
"SyncDataScheduler-versionChangeCheck"));
}
public void startScheduler() {
scheduler.schedule(
new TimedSupervisorTask("FetchDataLocal", scheduler, expireCheckExecutor, 3,
TimeUnit.SECONDS, 10, () -> localAcceptorStore.checkAcceptorsChangAndExpired()),
30, TimeUnit.SECONDS);
versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
}
public void stopScheduler() {
if (scheduler != null && !scheduler.isShutdown()) {
scheduler.shutdown();
}
if (versionCheckExecutor != null && !versionCheckExecutor.isShutdown()) {
versionCheckExecutor.shutdown();
}
}
}
接下來我們就逐一分析下其實現或者說是設計選擇。
0x04 無限循環任務
阿里這裡採用ExecutorService實現了無限循環任務,不定期完成業務。
4.1 ExecutorService
Executor:一個JAVA介面,其定義了一個接收Runnable對象的方法executor,其方法簽名為executor(Runnable command),該方法接收一個Runable實例,用來執行一個實現了Runnable介面的類。
ExecutorService:是一個比Executor使用更廣泛的子類介面。
其提供了生命周期管理的方法,返回 Future 對象,以及可跟蹤一個或多個非同步任務執行狀況返回Future的方法;
當所有已經提交的任務執行完畢後將會關閉ExecutorService。因此我們一般用該介面來實現和管理多執行緒。
這裡ExecutorService雖然其不能提供周期性功能,但是localAcceptorStore.changeDataCheck
本身就是一個while (true) loop,其可以依靠DelayQueue來完成類似周期功能。
versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(
"SyncDataScheduler-versionChangeCheck"));
versionCheckExecutor.execute(() -> localAcceptorStore.changeDataCheck());
public void changeDataCheck() {
while (true) {
try {
DelayItem<Acceptor> delayItem = delayQueue.take();
Acceptor acceptor = delayItem.getItem();
removeCache(acceptor); // compare and remove
} catch (InterruptedException e) {
break;
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
}
0x05 周期任務
阿里這裡採用了 ScheduledExecutorService 實現了周期性任務。
5.1 ScheduledExecutorService
ScheduledExecutorService是一種執行緒池,ScheduledExecutorService在ExecutorService提供的功能之上再增加了延遲和定期執行任務的功能。
其schedule方法創建具有各種延遲的任務,並返回可用於取消或檢查執行的任務對象。
尋常的Timer的內部只有一個執行緒,如果有多個任務的話就會順序執行,這樣我們的延遲時間和循環時間就會出現問題,而且異常未檢查會中止執行緒。
ScheduledExecutorService是執行緒池,並且執行緒池對異常做了處理,使得任務之間不會有影響。在對延遲任務和循環任務要求嚴格的時候,就需要考慮使用ScheduledExecutorService了。
0x06 Queue的選擇
6.1 ThreadPoolExecutor的queue
ThreadPoolExecutor的完整構造方法的簽名如下
ThreadPoolExecutor
(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler)12
其中,workQueue參數介紹如下:
workQueue任務隊列):用於保存等待執行的任務的阻塞隊列。可以選擇以下幾個阻塞隊列。
- ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序;
- LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列;
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個執行緒調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列;
- PriorityBlockingQueue:一個具有優先順序的無限阻塞隊列;
6.2 SOFARegistry選擇
這裡採用了兩種Queue。
expireCheckExecutor = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
new SynchronousQueue<>(), new NamedThreadFactory("SyncDataScheduler-expireChangeCheck"));
versionCheckExecutor = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new NamedThreadFactory(
"SyncDataScheduler-versionChangeCheck"));
6.3 LinkedBlockingQueue
LinkedBlockingQueue是一種阻塞隊列。
LinkedBlockingQueue內部由單鏈表實現了BlockingQueue介面,只能從head取元素,從tail添加元素。
LinkedBlockingQueue內部分別使用了takeLock 和 putLock 對並發進行控制,也就是說LinkedBlockingQueue是讀寫分離的,添加和刪除操作並不是互斥操作,可以並行進行,這樣也就可以大大提高吞吐量。
LinkedBlockingQueue不同於ArrayBlockingQueue,它如果不指定容量,默認為Integer.MAX_VALUE
,也就是無界隊列。如果存在添加速度大於刪除速度時候,有可能會記憶體溢出,所以為了避免隊列過大造成機器負載或者記憶體爆滿的情況出現,我們在使用的時候建議手動傳一個隊列的大小。
另外,LinkedBlockingQueue對每一個lock鎖都提供了一個Condition用來掛起和喚醒其他執行緒。
6.4 SynchronousQueue
不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue內部並沒有數據快取空間。
你不能調用peek()方法來看隊列中是否有數據元素,因為數據元素只有當你試著取走的時候才可能存在,不取走而只想偷窺一下是不行的,當然遍歷這個隊列的操作也是不允許的。隊列頭元素是第一個排隊要插入數據的執行緒,而不是要交換的數據。
數據是在配對的生產者和消費者執行緒之間直接傳遞的,並不會將數據緩衝數據到隊列中。可以這樣來理解:生產者和消費者互相等待對方,握手,然後一起離開。
SynchronousQueue的一個使用場景是在執行緒池裡。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個執行緒池根據需要(新任務到來時)創建新的執行緒,如果有空閑執行緒則會重複使用,執行緒空閑了60秒後會被回收。
0x07 自動調節間隔的周期性任務
TimedSupervisorTask 是一個自動調節間隔的周期性任務。這裡基本是借鑒了Eureka的同名實現,但是SOFA這裡去除了「部分異常處理邏輯」。
從整體上看,TimedSupervisorTask是固定間隔的周期性任務,一旦遇到超時就會將下一個周期的間隔時間調大,如果連續超時,那麼每次間隔時間都會增大一倍,一直到達外部參數設定的上限為止,一旦新任務不再超時,間隔時間又會自動恢復為初始值,另外還有CAS來控制多執行緒同步。
主要邏輯如下:
- 執行submit()方法提交任務;
- 執行future.get()方法,如果沒有在規定的時間得到返回值或者任務出現異常,則進入異常處理catch程式碼塊;
- 如果沒有發生異常,則再設置一次延時任務時間timeoutMillis;
- 如果發生異常:
- 發生TimeoutException異常,則執行
Math.min(maxDelay, currentDelay x 2)
得到任務延時時間 x 2 和 最大延時時間的最小值,然後改變任務的延時時間timeoutMillis; - 發生RejectedExecutionException異常,SOFA只是列印log。Eureka則將rejectedCounter值+1;
- 發生Throwable異常,SOFA只是列印log。Eureka則將throwableCounter值+1;
- 發生TimeoutException異常,則執行
- 進入finally程式碼塊
- .如果future不為null,則執行future.cancel(true),中斷執行緒停止任務;
- 如果執行緒池沒有shutdown,則創建一個新的定時任務;最關鍵就在上面的最後一行程式碼中:
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS)
:執行完任務後,會再次調用schedule方法,在指定的時間之後執行一次相同的任務,這個間隔時間和最近一次任務是否超時有關,如果超時了就間隔時間就會變大;
其實現如下:
public class TimedSupervisorTask extends TimerTask {
private final ScheduledExecutorService scheduler;
private final ThreadPoolExecutor executor;
private final long timeoutMillis;
private final Runnable task;
private String name;
private final AtomicLong delay;
private final long maxDelay;
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler,
ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit,
int expBackOffBound, Runnable task) {
this.name = name;
this.scheduler = scheduler;
this.executor = executor;
this.timeoutMillis = timeUnit.toMillis(timeout);
this.task = task;
this.delay = new AtomicLong(timeoutMillis);
this.maxDelay = timeoutMillis * expBackOffBound;
}
@Override
public void run() {
Future future = null;
try {
//使用Future,可以設定子執行緒的超時時間,這樣當前執行緒就不用無限等待了
future = executor.submit(task);
//指定等待子執行緒的最長時間
// block until done or timeout
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
// 每次執行任務成功都會將delay重置
delay.set(timeoutMillis);
} catch (TimeoutException e) {
long currentDelay = delay.get();
// 如果出現異常,則將時間*2,然後取 定時時間 和 最長定時時間 中最小的為下次任務執行的延時時間
long newDelay = Math.min(maxDelay, currentDelay * 2);
// 設置為最新的值,考慮到多執行緒,所以用了CAS
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
// 執行緒池的阻塞隊列中放滿了待處理任務,觸發了拒絕策略
LOGGER.error("{} task supervisor rejected the task: {}", name, task, e);
} catch (Throwable e) {
// 出現未知的異常
LOGGER.error("{} task supervisor threw an exception", name, e);
} finally {
//這裡任務要麼執行完畢,要麼發生異常,都用cancel方法來清理任務;
if (future != null) {
future.cancel(true);
}
//這裡就是周期性任務的原因:只要沒有停止調度器,就再創建一次性任務,執行時間時dealy的值,
//假設外部調用時傳入的超時時間為30秒(構造方法的入參timeout),最大間隔時間為50秒(構造方法的入參expBackOffBound)
//如果最近一次任務沒有超時,那麼就在30秒後開始新任務,
//如果最近一次任務超時了,那麼就在50秒後開始新任務(異常處理中有個乘以二的操作,乘以二後的60秒超過了最大間隔50秒)
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
0xFF 參考
Eureka系列(六) TimedSupervisorTask類解析
Eureka的TimedSupervisorTask類(自動調節間隔的周期性任務)
java執行緒池ThreadPoolExecutor類使用詳解
Java執行緒池ThreadPoolExecutor實現原理剖析
深入理解Java執行緒池:ThreadPoolExecutor
Java中執行緒池ThreadPoolExecutor原理探究
ScheduledExecutorService 和 Timer 的區別
Java並發包中的同步隊列SynchronousQueue實現原理
ThreadPoolExecutor執行緒池解析與BlockingQueue的三種實現
【細談Java並發】談談LinkedBlockingQueue