從中間件團隊竊取了這個組件,見識到了編碼能力的天花板!!
大家好,我是陶朱公Boy,又跟大家見面了。
前言
今天跟大家分享一款基於「生產者消費者模式」下實現的組件。
該組件是作者偶然在翻閱公司一中間件源碼的時候碰到的,覺得設計的非常精美、巧妙,花了點時間整理成文分享給大家。
生產者和消費者彼此之間不進行通訊,中間通過一個容器(如阻塞隊列)來解決強解耦問題。
阻塞隊列起到了一定的數據緩衝作用,平衡了生產者和消費者對數據的處理能力。by—《Java並發編程的藝術》
組件介紹
該組件基於生產者消費者模式來編碼實現,是一款本地化解決流量削峰、解耦、非同步的利器。
此組件由以下知識點構成:執行緒池、阻塞隊列、LockSupport、Executor框架、final、volatile。此外你還能接觸到hash取模演算法、介面回調等機制。
組件本身程式碼量並不大,但知識點比較密集,所以希望大家能花一點時間認真看完。我將從適用場景、架構設計、源碼解析這三個角度給大家講介紹這款組件。
適用場景
☆場景一:報表下載
現在很多後台下載功能,普適的做法是先篩選轉換數據,然後對接雲存儲平台進行保存,最後生成一個可訪問的文件地址,整個過程非常耗時。
其實完全可以生產者發送一個下載請求就結束響應,服務端非同步的去消費這個任務請求,處理完生成地址後,再進行通知(比如更新對應資料庫文件欄位)這是一種非同步體現,也解耦了生產者與消費者原來的同步交互方式,整體效率會更高。
☆場景二:日誌埋點
有些應用它的QPS非常高,產生的數據本身並不是特別重要比如埋點的日誌,如果實時調用埋點平台可能給平台側造成非常大的訪問壓力。所以這個時候中間的阻塞隊列就起到了一定的緩衝作用,等一段時間或隊列數據量達到一定量(參賽可動態配置)再一次性拿出來轉換後,最後批量傳遞出去。
☆場景三:Yana(阿里內部一款基於郵件分享技術文章的工具)
《Java並發編程的藝術》作者方騰飛有分享過他們基於生產者消費者模式實現的一個案例。
他們團隊早期有一個習慣,大家如果在平時工作當中遇到比較好的文章,會通過郵件轉發到專屬郵箱進行內部分享,這樣其他成員就能看到這篇文章,甚至大家會在底部評論、回復、交流。
但期間遇到一個問題:一旦時間一長,以前的文章很難被檢閱。郵件列表的可視化太差,也不能進行歸類,有些新入職員工也看不到以往其他成員分享過的文章。
基於這些問題,有幾個小夥伴自發的趁業餘時間開發了一個簡易工具–yana。該工具功能就是:生產者執行緒會先往郵箱里將所有分享的郵件下載下來(包括附件、圖片、郵件回復等內容),下載完成後,通過confluence的Web Service介面,把文章保存到confluence中去。這樣不僅好維護,而且留存問題也得到了解決。
不過隨著這款工具在其他部門的推廣,發現系統響應時間越來越長。只要單位時間內積累郵件一多,一次處理完可能就要花費幾分鐘。
於是他們升級了方案,把架構演進到了V2.0版本。整體思路是使用了生產者消費者模式來處理。
思路如下:生產者執行緒去郵件系統下載完郵件後,不會立即調用confluence的web service介面,而是選擇把下載的內容放入阻塞隊列後立即返回。而消費者啟動CPU*2個執行緒數來並行處理隊列中的郵件,從之前的單執行緒演變成了多執行緒處理,生產者和消費者實現了非同步、解耦。經過觀察,比起V1.0同步處理,速度比之前要快好了幾倍。
…
架構設計
☆對象圖
該組件支援「多生產者多消費者」場景,在多核時代充分利用CPU多核機制,消費者多執行緒並行處理阻塞隊列中的數據,加快任務處理速度。
☆邏輯架構圖
該組件內部持有一個工作執行緒對象數組,當生產者提交數據的時候,會先經過一個route組件(採用hash取模演算法),動態路由到其中一個執行緒對象內的阻塞隊列中存儲起來。等到滿足一定條件,工作執行緒就會將自身執行緒對象內阻塞隊列中的數據轉換成指定容量的List對象(BlockQueue的drainto方法有支援),然後調用已經註冊的回調函數把數據傳遞出去。
☆流程圖
我們一起來看下這張工作執行緒內部運行流程圖:
首選我們說此組件對象內部持有一個工作執行緒對象數組,每個工作執行緒對象內部持有一個有界阻塞隊列實例對象(ArrayBlockingQueue),方法有run(),add(),timeout()方法。
生產者調用組件自身的add方法後,add方法內部通過hash取模演算法動態路由到某個工作執行緒對象內部的blockingQueue中去。
timeout方法是這款組件設計的一個亮點(容錯性設計)👍。
假如實際運行過程中,工作執行緒內部的阻塞隊列內一直只佔少許幾個對象,如果僅僅只判斷隊列中的元素個數是否超出指定閾值,再去處理隊列中的數據,一旦長時間未超出,工作執行緒就會一直被阻塞,也將導致隊列中的數據長時間堆積。
所以新增的這個timeout()這個機制能應對一旦隊列中的數據長時間積壓,它會根據時間差即判斷當前時間距離上次任務處理時間是否超出指定閾值(可配置),如果超出了也會強制處理隊列中的數據。
源碼賞析
public class ProducerAndConsumerComponet {
private final static Logger log = LoggerFactory.getLogger(ProducerAndConsumerComponet.class);
//組件持有一個工作執行緒對象數組
private final WorkThread<T>[] workThreads;
private AtomicInteger index;
private static final Random r = new Random();
//任務定時器
private static ScheduledExecutorService scheduleThreadPool = new ScheduledThreadPoolExecutor(1);
//組件初始化完成工作執行緒的新建
private static ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 構造器
* @param threadNum 默認新建的消費者執行緒個數
* @param limitSize 隊列長度閾值;超過將喚醒阻塞的執行緒
* @param period 前後兩個任務的執行周期 (for example :200ms 代表前面一次任務執行完畢後,200毫秒後下一個任務繼續執行)
* @param capacity 工作執行緒內部的有界阻塞隊列的初始容量大小
* @param processor 回調介面(初始化組價實例的時候需要傳遞)
*/
public ProducerAndConsumerComponet(int threadNum,int limitSize, int period, int capacity, Processor<T> processor) {
this.workThreads = new WorkThread[threadNum];
if (threadNum > 1) {
this.index = new AtomicInteger();
}
for(int i = 0; i < threadNum; ++i) {
WorkThread<T> workThread = new WorkThread("workThread"+ "_" + i, limitSize, period, capacity, processor);
this.workThreads[i] = workThread;
executorService.submit(workThread);
//調用scheduleAtFixedRate時,會向ScheduledThreadPoolExecutor的DelayQueue添加一個實現了RunableScheduleFuture介面的
//ScheduleFutureTask
scheduleThreadPool.scheduleAtFixedRate(workThread::timeout, r.nextInt(50), period, TimeUnit.MILLISECONDS);
}
}
/**
* 生產者執行緒將對象添加到對應消費者執行緒對象內部的阻塞隊列中去<br>
* 內部採用HASH取模演算法進行動態路由
* @param item 待添加的對象
* @return true:添加成功 false:添加失敗
*/
public boolean add(T item) {
// log.info("add item={}",item);
int len = this.workThreads.length;
//log.info("add len..."+len);
if (len == 1) {
return this.workThreads[0].add(item);
} else {
int mod = this.index.incrementAndGet() % len;
// log.info("路由到this.workThreads[mod]={}",mod);
return this.workThreads[mod].add(item);
}
}
/**
* 消費者執行緒
*/
private static class WorkThread<T> implements Runnable {
/**
* 工作執行緒命名
*/
private final String threadName;
/**
* 隊列中允許存放元素個數限制<br>
* 超出將從隊列中取出此大小的元素轉成List對象
*/
private final int queueSizeLimit;
/**
* 前後兩個任務的執行周期
*/
private int period;
/**
* 用來記錄任務的即時處理時間
*/
private volatile long lastFlushTime;
/**
* 當前工作執行緒對象
*/
private volatile Thread currentThread;
/**
* 工作執行緒對象內部的阻塞隊列
*/
private final BlockingQueue<T> queue;
/**
* 回調介面
*/
private final Processor<T> processor;
/**
* 消費者執行緒構造器
* @param threadName 執行緒名
* @param queueSizeLimit 指定隊列閾值(可配置)
* @param period 前後兩個任務的執行周期(可配置)
* @param capacity 阻塞隊列初始容量
* @param processor 回調介面
*/
public WorkThread(String threadName, int queueSizeLimit, int period, int capacity, Processor<T> processor) {
this.threadName = threadName;
this.queueSizeLimit = queueSizeLimit;
this.period = period;
this.lastFlushTime = System.currentTimeMillis();
this.processor = processor;
this.queue = new ArrayBlockingQueue(capacity);
}
/**
* 往阻塞隊列中添加元素
* @param item 添加的對象
* @return true:添加成功 false:添加失敗
*/
public boolean add(T item) {
// log.info("add result:"+item);
boolean result = this.queue.offer(item);
// log.info("resultP{}",result);
this.checkQueueSize();
return result;
}
/**
* 當前時間與上次任務處理時間差是否超過指定閾值;如果超過觸發start方法
*/
public void timeout() {
// log.info("{}====check timeout",currentThread.getName());
if (System.currentTimeMillis() - this.lastFlushTime >= (long)this.period) {
log.info("當前時間距離上次任務處理時間周期={}超出指定閾值={}",System.currentTimeMillis() - this.lastFlushTime ,period);
this.start();
}
}
/**
* 喚醒被阻塞的工作執行緒
*/
private void start() {
log.info("執行start方法,喚醒被阻塞的執行緒"+currentThread.getName());
LockSupport.unpark(this.currentThread);
}
/**
* 判斷隊列實際長度是否超過指定閾值;如果超過觸發start方法
*/
private void checkQueueSize() {
if (this.queue.size() > this.queueSizeLimit) {
log.info("{}隊列大小={}超出指定閾值={}",currentThread.getName(),this.queue.size() ,queueSizeLimit);
this.start();
}
}
/**
* 將隊列中的元素通過調用<code>drainTo</code>方法,轉成List對象(容量受queueSizeLimit限制),最後調用回調函數傳遞List對象
*/
public void flush() {
if(queue.isEmpty()){
return;
}
this.lastFlushTime = System.currentTimeMillis();
List<T> temp = new ArrayList(this.queueSizeLimit);
int size = this.queue.drainTo(temp, this.queueSizeLimit);
if (size > 0) {
log.info("{}被喚醒後,開始執行任務:從隊列中騰出大小為{}的數據且轉成List對象",currentThread.getName(),size);
try {
//執行回調函數
this.processor.process(temp);
} catch (Throwable var4) {
System.out.println("process error");
}
}
}
/**
* 判斷隊列實際大小是否超過指定閾值亦或距離上次任務處理時間差是否超過指定閾值
* @return true:滿足觸發條件 false:不滿足觸發條件
*/
private boolean canFlush() {
return this.queue.size() > this.queueSizeLimit || System.currentTimeMillis() - this.lastFlushTime > (long)this.period;
}
@Override
public void run() {
this.currentThread = Thread.currentThread();
this.currentThread.setName(this.threadName);
//當前執行緒沒有被其他執行緒打斷
while(!this.currentThread.isInterrupted()) {
//死循環的判斷是否滿足觸發條件(隊列實際大小是否超出指定閾值或距離上次任務時間是否超出指定閾值),如果未滿足將阻塞當前執行緒,避免死循環給系統帶來性能開銷
while(!this.canFlush()) {
//當前工作執行緒被阻塞
log.info("執行緒被阻塞...");
LockSupport.park(this);
}
//一旦add方法執行的時候判斷存放的阻塞隊列元素大小超出自訂製閾值亦或距離上次任務處理時間差超出指定閾值,就會調用LockSupport.unpark方法解除阻塞的執行緒
//一旦執行緒被解除阻塞,就會觸發此方法,將隊列元素轉成List對象且調用已經註冊的回調函數
// log.info("阻塞執行緒被喚醒");
this.flush();
}
}
}
複製程式碼
測試用例
/**
* 前置條件:
* #主件初始化默認新增兩個工作執行緒
* config.threadNum=2
* config.period=12000
* config.queueSizeLimit=3
* config.capacity=10
*/
@DisplayName("隊列大小超出指定閾值")
@Test
void add() {
for(int i=0;i<10;i++){
producerAndConsumerComponet.add("1");
}
try {
TimeUnit.SECONDS.sleep(10);
}catch (Exception e){
e.printStackTrace();
}
}
結果列印:
2022-10-29 21:04:51,656 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:165] - workThread_1隊列大小=4超出指定閾值=3
2022-10-29 21:04:51,658 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 執行start方法,喚醒被阻塞的執行緒workThread_1
2022-10-29 21:04:51,659 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_1被喚醒後,開始執行任務:從隊列中騰出大小為3的數據且轉成List對象
2022-10-29 21:04:51,659 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 執行緒被阻塞...
2022-10-29 21:04:51,659 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:165] - workThread_0隊列大小=4超出指定閾值=3
2022-10-29 21:04:51,659 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 執行start方法,喚醒被阻塞的執行緒workThread_0
2022-10-29 21:04:51,660 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_0被喚醒後,開始執行任務:從隊列中騰出大小為3的數據且轉成List對象
2022-10-29 21:04:51,660 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 執行緒被阻塞...
2022-10-29 21:04:53,374 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:146] - 當前時間距離上次任務處理時間周期=1714超出指定閾值=1000
2022-10-29 21:04:53,374 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 執行start方法,喚醒被阻塞的執行緒workThread_0
2022-10-29 21:04:53,375 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_0被喚醒後,開始執行任務:從隊列中騰出大小為2的數據且轉成List對象
2022-10-29 21:04:53,375 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 執行緒被阻塞...
2022-10-29 21:04:53,379 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:146] - 當前時間距離上次任務處理時間周期=1720超出指定閾值=1000
2022-10-29 21:04:53,379 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 執行start方法,喚醒被阻塞的執行緒workThread_1
2022-10-29 21:04:53,380 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_1被喚醒後,開始執行任務:從隊列中騰出大小為2的數據且轉成List對象
2022-10-29 21:04:53,380 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 執行緒被阻塞...
複製程式碼
可能有部分小夥伴對在生產環境如何正確使用這款組件有疑慮,對此完整版我已經開源到GitHub上(基於springboot構建內含如何正確初始化此組件以及完整的測試用例),有興趣的小夥伴可以自取。
github地址://github.com/TaoZhuGongBoy/ProducerAndConsumerComponet
總結
好了這款組件介紹已接近尾聲,接下來讓我們一起做下總結:
☆是什麼
我們說這款工具是一款基於「生產者-消費者模式」實現的組件。以前生產者必須同步調用、等待相關業務操作的處理結果後才能返回(一旦有些業務場景生產者生產的速度過快,方法內部自身業務處理又比較耗時)這時如果同步等待調用結果返回,系統整體吞吐量會極具降低。現在換了一種思路即生產者不需要同步等待業務的處理結果,當它發送一個請求後立即返回,耗時的處理由一致多個消費者執行緒來非同步處理,加快任務整體處理速度。(非同步、解耦)
☆適用場景
它比較適合處理一些重要程度不是很高的數據(比如埋點日誌、下載請求等),當生產者生產數據過快,業務本身處理又比較耗時,那用這個方案是比較合適的。
為什麼在這裡要強調重要程度不是很高這句話呢?因為BlockQueue畢竟是基於記憶體的數據結構,極端情況下是存在數據丟失風險的,像埋點日誌、下載請求這種數據小部分丟失其實對業務影響不大。
看到這裡可能有部分小夥伴會產生一個疑問:怎麼看這個組件的功能跟MQ這麼像。對的,功能是相似的,但這款組件是一個本地化的解決方案,目的就是為了降低引入消息隊列的複雜度才設計(設計意圖)。
☆怎麼實現的
每個消費者執行緒對象內部持有一個有界阻塞隊列,當外部生產者調用組件的add方法後,add方法內部實現路由,最終保存到指定的阻塞隊列內。
消費者執行緒本身死循環來判斷阻塞隊列中的元素是否滿足條件,如果不滿足,執行緒就會被阻塞(避免死循環給系統造成性能影響;通過Locksupport.park實現)。 一旦消費者執行緒對象內部的add、timeout方法滿足觸發條件後,被阻塞的執行緒就會被喚醒,然後執行緒繼續執行餘下業務邏輯:從阻塞隊列中取出數據,然後轉換成有初始容量限制的List對象後,調用回調介面傳遞數據。
寫到最後
這款組件幾乎囊括了並發編程領域半壁江山的技術點,能把這些散的點串起來,用好用對,著實不容易,也體現出組件作者深厚的基礎技術功底。
如果你是《並發編程》的初學者亦或有幾年經驗的老兵,都建議好好揣摩與學習一下這款組件的架構設計與編碼實現;如果在你的生產場景中你剛好也碰到類似問題與場景,那麼這款組件也許能幫助你。
本文完!
關注我
如果這篇文章你看了對你有幫助或啟發,麻煩點贊、關注一下作者。你的肯定是作者創作源源不斷的動力。
公眾號
裡面不僅彙集了硬核的乾貨技術、還彙集了像左耳朵耗子、張朝陽總結的高效學習方法論、職場升遷竅門、軟技能。希望能輔助你達到你想夢想之地!
公眾號內回復關鍵字「電子書」下載pdf格式的電子書籍(JAVAEE、Spring、JVM、並發編程、Mysql、Linux、kafka、分散式等)、「開發手冊」獲取阿里開發手冊2本、”面試“獲取面試PDF資料。