disruptor筆記之二:Disruptor類分析
- 2021 年 9 月 25 日
- 筆記
歡迎訪問我的GitHub
//github.com/zq2599/blog_demos
內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;
《disruptor筆記》系列鏈接
本篇概覽
- 通過前文的實戰,咱們對Disruptor有了初步認識,藉助com.lmax.disruptor.dsl.Disruptor類可以輕鬆完成以下操作:
- 環形隊列初始化
- 指定事件消費者
- 啟動消費者執行緒
- 接下來要面對兩個問題:
- 深入了解Disruptor類是如何完成上述操作的;
- 對Disruptor類有了足夠了解時,嘗試不用Disruptor,自己動手操作環形隊列,實現消息的生產和消費,這樣做的目的是加深對Disruptor內部的認識,做到知其所以然;
- 接下來咱們先解決第一個問題吧,結合Disruptor對象的源碼來看看上述三個操作到底做了什麼;
環形隊列初始化
- 環形隊列初始化發生在實例化Disruptor對象的時候,即Disruptor的構造方法:
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
- RingBuffer.createMultiProducer方法內部實例化了RingBuffer,如下圖紅框:
- 記下第一個重要知識點:創建RingBuffer對象;
指定事件消費者
- 在前文中,下面這行程式碼指定了事件由StringEventHandler消費:
disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));
- 查看handleEventsWith方法的內部:
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
- 展開createEventProcessors方法,如下圖,請重點關注創建SequenceBarrier和BatchEventProcessor等操作:
- 展開上圖紅框四中的updateGatingSequencesForNextInChain方法,如下圖,紅框中的ringBuffer.addGatingSequences需要重點關註:
- 小結一下,disruptor.handleEventsWith方法涉及到四個重要知識點:
- 創建SequenceBarrier對象,用於接收ringBuffer中的可消費事件
- 創建BatchEventProcessor,負責消費事件
- 綁定BatchEventProcessor對象的異常處理類
- 調用ringBuffer.addGatingSequences,將消費者的Sequence傳給ringBuffer
啟動消費者執行緒
- 前文已通過日誌確定了消費事件的邏輯是在一個獨立的執行緒中執行的,啟動消費者執行緒的程式碼如下:
disruptor.start();
- 展開start方法,如下可見,關鍵程式碼是consumerInfo.start(executor):
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}
return ringBuffer;
}
- ConsumerInfo是介面,對應的實現類有EventProcessorInfo和WorkerPoolInfo兩種,這裡應該是哪種呢?既然來源是consumerRepository,這就要看當初是怎麼存入consumerRepository的,前面在分析createEventProcessors方法時,下圖紅框中的consumerRepository.add被忽略了,現在需要進去看看:
- 進去後一目了然,可見ConsumerInfo的實現是EventProcessorInfo:
- 所以,回到前面對consumerInfo.start(executor)方法的分析,這裡要看的就是EventProcessorInfo的start方法了,如下圖,非常簡單,就是啟動一個執行緒執行eventprocessor(這個eventprocessor是BatchEventProcessor對象):
- 小結一下,disruptor.start方法涉及到一個重要知識點:
- 啟動獨立執行緒,用來執行消費事件的業務邏輯;
消費事件的邏輯
- 為了理解消息處理邏輯,還要重點關注BatchEventProcessor.processEvents方法,如下圖所示,其實也很簡單,就是不停的從環形隊列取出可用的事件,然後再更新自己的Sequence,相當於標記已經消費到哪裡了:
總結
最後總結Disruptor類的重要功能:
- 創建環形隊列(RingBuffer對象)
- 創建SequenceBarrier對象,用於接收ringBuffer中的可消費事件
- 創建BatchEventProcessor,負責消費事件
- 綁定BatchEventProcessor對象的異常處理類
- 調用ringBuffer.addGatingSequences,將消費者的Sequence傳給ringBuffer
- 啟動獨立執行緒,用來執行消費事件的業務邏輯
- 聰明的您一定會發現,本文並沒有全面分析Disruptor類的源碼,例如after、shutdown等方法都沒有提到,確實如此,欣宸在此給您道歉了,本篇的重點是找出那些與基本功能有關程式碼,為後面的實戰提供理論指導(不用Disruptor類實現消息生產消費的實戰),因此很多高級功能都跳過了;
理解官方流程圖
- 此時再看官方流程圖,聰明的您應該很快就能理解此圖表達的意思:每個消費者都有自己的Sequence,通過此Sequence取得自己在環形隊列中消費的位置,再通過SequenceBarrier來等待可用事件的出現,等到事件出現了就用get方法取出具體的事件,給EventHandler來處理:
後續預告
- 此時,咱們對Disruptor類已經有了比較深入的理解,接下來的文章,咱們會嘗試不用Disruptor類,僅憑著對RingBuffer對象的操作來實現以下三種功能:
- 100個事件,單個消費者消費;
- 100個事件,三個消費者,每個都獨自消費這個100個事件;
- 100個事件,三個消費者共同消費這個100個事件;
你不孤單,欣宸原創一路相伴
歡迎關注公眾號:程式設計師欣宸
微信搜索「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界…
//github.com/zq2599/blog_demos