disruptor筆記之八:知識點補充(終篇)
- 2021 年 10 月 8 日
- 筆記
歡迎訪問我的GitHub
//github.com/zq2599/blog_demos
內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;
《disruptor筆記》系列鏈接
本篇概覽
- 本文是《disruptor筆記》系列的終篇,前面咱們看了那麼多代碼,也寫了那麼多代碼,現在咱們去看幾個知識點,在輕鬆的閱讀過程中完成disruptor之旅;
- 要關注的知識點有以下四處:
- 偽共享
- Translators
- Lambda風格
- 清理數據
- 接下來開始逐個了解;
偽共享
- 下圖是多核處理器的CPU緩存,可見每個核都有自己的L1和L2緩存,而L3緩存是共享的:
-
假設disruptor的Sequence是long型,那麼一個生產者和一個消費者的disruptor應該有兩個long型Sequence,在L1中緩存這兩個數字時,因為每個緩存行大小是64位元組,所以兩個Sequence很有可能在一個緩存行中
-
此時如果程序修改了生產者Sequence的值,就會讓L1上對應的緩存行失效,再從Main memory中讀取最新的值,此時因為消費者Sequence也在同一個緩存行中,因此也被失效了,這就導致一個沒有變化的值也被清理掉,還要再去Main memory中取一次,這是影響性能的行為
-
看到這裡,聰明的您一定想到解決問題的思路:不要讓兩個Sequence在同一個緩存行中
-
具體的手段呢?您有沒有聯想到日常生活中的佔座位,在身邊座位放個背包,其他人就不能使用了(這是不文明行為,僅舉例用)
-
實際上disruptor用的也是佔座的套路,咱們來看看Sequence的源碼就一目了然了,如下圖所示,Sequence的值是Value對象的成員變量value:
// 父類,
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding
{
...
- 類圖如下,可見Value的父子類中都是佔位的long型:
- 因此,Sequence對象有16個成員變量,在L1 cache中是下圖的排列方式:
- 想像一下,L1 cache的緩存行,每64位元組為一個,也就是說上面那一串,每八個就佔據一個緩存行(每個long型8位元組),於是就有以下三種排列的可能:
- V出現在一個緩存行的首位;
- V出現在一個緩存行的末尾;
- V出現在一個緩存行的首位和末尾之間的其他六個位置之一;
- 也就是下圖三種可能(U是L1 cache中的其他內容),可見不論哪種可能,V都能用P把自己所在緩存行全部佔座,這樣就不會出現兩個Sequence出現在同一個緩存行的情況了:
Translators
-
Translators是個小的編程技巧,disruptor幫使用者做了些封裝,讓發佈事件的代碼更簡潔;
-
打開disruptor-tutorials項目的consume-mode這個module,回顧一下,業務發佈事件要調用的方法,在OrderEventProducer.java中:
public void onData(String content) {
// ringBuffer是個隊列,其next方法返回的是下最後一條記錄之後的位置,這是個可用位置
long sequence = ringBuffer.next();
try {
// sequence位置取出的事件是空事件
OrderEvent orderEvent = ringBuffer.get(sequence);
// 空事件添加業務信息
orderEvent.setValue(content);
} finally {
// 發佈
ringBuffer.publish(sequence);
}
}
-
上面的代碼中,其實開發者最關注的是orderEvent.setValue(content)這部分,其他幾行是我從官方demo抄的…
-
顯然disruptor也發現了這個小問題,於是從3.0版本開始提供了EventTranslatorOneArg接口,開發者將業務邏輯放入放在此接口的實現類中,至於前面代碼中的ringBuffer.next()、ringBuffer.get(sequence)這些,以及try-finally代碼塊這些東西統統都省去了,咱們可以將OrderEventProducer.java改造成一個新的類,代碼如下,可見新增內部類EventTranslatorOneArg,裏面是將數據轉為事件的業務邏輯,對外提供調用的onData方法中,只需一行代碼即可,和業務無關的代碼全部省掉了:
package com.bolingcavalry.service;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
public class OrderEventProducerWithTranslator {
// 存儲數據的環形隊列
private final RingBuffer<OrderEvent> ringBuffer;
public OrderEventProducerWithTranslator(RingBuffer<OrderEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* 內部類
*/
private static final EventTranslatorOneArg<OrderEvent, String> TRANSLATOR = new EventTranslatorOneArg<OrderEvent, String>() {
@Override
public void translateTo(OrderEvent event, long sequence, String arg0) {
event.setValue(arg0);
}
};
public void onData(String content) {
ringBuffer.publishEvent(TRANSLATOR, content);
}
}
-
在consume-mode中,上述代碼有對應的服務類TranslatorPublishServiceImpl.java,並且有對應的單元測試代碼(ConsumeModeServiceTest.testTranslatorPublishService),這裡就不佔篇幅了,您若有興趣可以自行查閱;
-
看看ringBuffer.publishEvent的內部實現,是如何幫咱們省去之前那幾行的,首先是調用了sequencer.next:
@Override
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
{
final long sequence = sequencer.next();
translateAndPublish(translator, sequence, arg0);
}
- 再打開translateAndPublish看看,ringBuffer.get、try-finally代碼塊、sequencer.publish都在,這下放心了,以前咱們做的事情,現在disruptor幫我們做了,咱們可以專心業務邏輯了:
private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0)
{
try
{
translator.translateTo(get(sequence), sequence, arg0);
}
finally
{
sequencer.publish(sequence);
}
}
Lambda風格
- disruptor的重要API也支持Lambda表達式作為入參,這裡將幾處常用的API整理如下:
- Disruptor類實例化(LambdaServiceImpl.java):
// lambda類型的實例化
disruptor = new Disruptor<OrderEvent>(OrderEvent::new, BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
- 設置事件消費者的時候,可以用Lambda取代之前的對象(LambdaServiceImpl.java):
// lambda表達式指定具體消費邏輯
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
log.info("lambda操作, sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
// 這裡延時100ms,模擬消費事件的邏輯的耗時
Thread.sleep(100);
// 計數
eventCountPrinter.accept(null);
});
- 發佈事件的操作,也支持Lambda表達式,如下所示,我在父類ConsumeModeService.java中新增publistEvent方法,裏面調用的disruptor.getRingBuffer().publishEvent的入參就是Lambda表達式和事件所需的業務數據,這樣就省區了以前發佈事件的類OrderEventProducer.java:
public void publistEvent(EventTranslatorOneArg<OrderEvent, String> translator, String value) {
disruptor.getRingBuffer().publishEvent(translator, value);
}
- 如下所示,現在拿到業務數據後發佈事件的操作變得非常輕了,Lambda表達式中做好業務數據轉事件的邏輯即可,最終,不再需要OrderEventProducer.java,一行代碼完成事件發佈(ConsumeModeServiceTest.java):
for(int i=0;i<EVENT_COUNT;i++) {
log.info("publich {}", i);
final String content = String.valueOf(i);
lambdaService.publistEvent((event, sequence, value) -> event.setValue(value), content);
}
清理數據
- 由於存儲的數據結構是環形隊列,對於每個事件的實例,會一直保存在隊列中,直到再次在這個位置寫入時才會被新的事件實例覆蓋,考慮到可能有的場景要求數據被消費後就立即被清除,disruptor官方提供了以下建議:
- 事件定義的類中,增加一個清理業務數據的方法(假設是ObjectEvent類的clear方法);
- 新增一個事件處理類(假設是ClearingEventHandler),在裏面主動調用事件定義類的清理業務數據的方法;
- 在編寫事件消費邏輯時,最後添加上述事件處理類ClearingEventHandler,這樣就會調用ObjectEvent實例的clear方法,將業務數據清理掉;
- 官方給出的代碼如下:
- 至此,整個《disruptor筆記》就完成了,感謝您的關注,希望這個系列的內容能給您帶來幫助,在開發中多一些選擇和參考;
你不孤單,欣宸原創一路相伴
歡迎關注公眾號:程序員欣宸
微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢遊Java世界…
//github.com/zq2599/blog_demos