disruptor筆記之八:知識點補充(終篇)

  • 2021 年 10 月 8 日
  • 筆記

歡迎訪問我的GitHub

//github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor筆記》系列鏈接

  1. 快速入門
  2. Disruptor類分析
  3. 環形隊列的基礎操作(不用Disruptor類)
  4. 事件消費知識點小結
  5. 事件消費實戰
  6. 常見場景
  7. 等待策略
  8. 知識點補充(終篇)

本篇概覽

  • 本文是《disruptor筆記》系列的終篇,前面咱們看了那麼多代碼,也寫了那麼多代碼,現在咱們去看幾個知識點,在輕鬆的閱讀過程中完成disruptor之旅;
  • 要關注的知識點有以下四處:
  1. 偽共享
  2. Translators
  3. Lambda風格
  4. 清理數據
  • 接下來開始逐個了解;

偽共享

  • 下圖是多核處理器的CPU緩存,可見每個核都有自己的L1和L2緩存,而L3緩存是共享的:

在這裡插入圖片描述

  • 假設disruptor的Sequence是long型,那麼一個生產者和一個消費者的disruptor應該有兩個long型Sequence,在L1中緩存這兩個數字時,因為每個緩存行大小是64位元組,所以兩個Sequence很有可能在一個緩存行中

  • 此時如果程序修改了生產者Sequence的值,就會讓L1上對應的緩存行失效,再從Main memory中讀取最新的值,此時因為消費者Sequence也在同一個緩存行中,因此也被失效了,這就導致一個沒有變化的值也被清理掉,還要再去Main memory中取一次,這是影響性能的行為

  • 看到這裡,聰明的您一定想到解決問題的思路:不要讓兩個Sequence在同一個緩存行中

  • 具體的手段呢?您有沒有聯想到日常生活中的佔座位,在身邊座位放個背包,其他人就不能使用了(這是不文明行為,僅舉例用)

  • 實際上disruptor用的也是佔座的套路,咱們來看看Sequence的源碼就一目了然了,如下圖所示,Sequence的值是Value對象的成員變量value

// 父類,
class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding
{
    protected volatile long value;
}

class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}

public class Sequence extends RhsPadding
{
	...
  • 類圖如下,可見Value的父子類中都是佔位的long型:

在這裡插入圖片描述

  • 因此,Sequence對象有16個成員變量,在L1 cache中是下圖的排列方式:

在這裡插入圖片描述

  • 想像一下,L1 cache的緩存行,每64位元組為一個,也就是說上面那一串,每八個就佔據一個緩存行(每個long型8位元組),於是就有以下三種排列的可能:
  1. V出現在一個緩存行的首位;
  2. V出現在一個緩存行的末尾;
  3. V出現在一個緩存行的首位和末尾之間的其他六個位置之一;
  • 也就是下圖三種可能(U是L1 cache中的其他內容),可見不論哪種可能,V都能用P把自己所在緩存行全部佔座,這樣就不會出現兩個Sequence出現在同一個緩存行的情況了:

在這裡插入圖片描述

Translators

  • Translators是個小的編程技巧,disruptor幫使用者做了些封裝,讓發佈事件的代碼更簡潔;

  • 打開disruptor-tutorials項目的consume-mode這個module,回顧一下,業務發佈事件要調用的方法,在OrderEventProducer.java中:

    public void onData(String content) {

        // ringBuffer是個隊列,其next方法返回的是下最後一條記錄之後的位置,這是個可用位置
        long sequence = ringBuffer.next();

        try {
            // sequence位置取出的事件是空事件
            OrderEvent orderEvent = ringBuffer.get(sequence);
            // 空事件添加業務信息
            orderEvent.setValue(content);
        } finally {
            // 發佈
            ringBuffer.publish(sequence);
        }
    }
  • 上面的代碼中,其實開發者最關注的是orderEvent.setValue(content)這部分,其他幾行是我從官方demo抄的…

  • 顯然disruptor也發現了這個小問題,於是從3.0版本開始提供了EventTranslatorOneArg接口,開發者將業務邏輯放入放在此接口的實現類中,至於前面代碼中的ringBuffer.next()、ringBuffer.get(sequence)這些,以及try-finally代碼塊這些東西統統都省去了,咱們可以將OrderEventProducer.java改造成一個新的類,代碼如下,可見新增內部類EventTranslatorOneArg,裏面是將數據轉為事件的業務邏輯,對外提供調用的onData方法中,只需一行代碼即可,和業務無關的代碼全部省掉了:

package com.bolingcavalry.service;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

public class OrderEventProducerWithTranslator {

    // 存儲數據的環形隊列
    private final RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducerWithTranslator(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * 內部類
     */
    private static final EventTranslatorOneArg<OrderEvent, String> TRANSLATOR = new EventTranslatorOneArg<OrderEvent, String>() {
        @Override
        public void translateTo(OrderEvent event, long sequence, String arg0) {
            event.setValue(arg0);
        }
    };

    public void onData(String content) {
        ringBuffer.publishEvent(TRANSLATOR, content);
    }
}
  • 在consume-mode中,上述代碼有對應的服務類TranslatorPublishServiceImpl.java,並且有對應的單元測試代碼(ConsumeModeServiceTest.testTranslatorPublishService),這裡就不佔篇幅了,您若有興趣可以自行查閱;

  • 看看ringBuffer.publishEvent的內部實現,是如何幫咱們省去之前那幾行的,首先是調用了sequencer.next

@Override
    public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
    {
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence, arg0);
    }
  • 再打開translateAndPublish看看,ringBuffer.get、try-finally代碼塊、sequencer.publish都在,這下放心了,以前咱們做的事情,現在disruptor幫我們做了,咱們可以專心業務邏輯了:
    private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0)
    {
        try
        {
            translator.translateTo(get(sequence), sequence, arg0);
        }
        finally
        {
            sequencer.publish(sequence);
        }
    }

Lambda風格

  • disruptor的重要API也支持Lambda表達式作為入參,這裡將幾處常用的API整理如下:
  1. Disruptor類實例化(LambdaServiceImpl.java):
// lambda類型的實例化
disruptor = new Disruptor<OrderEvent>(OrderEvent::new, BUFFER_SIZE, DaemonThreadFactory.INSTANCE);
  1. 設置事件消費者的時候,可以用Lambda取代之前的對象(LambdaServiceImpl.java):
        // lambda表達式指定具體消費邏輯
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
            log.info("lambda操作, sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);
            // 這裡延時100ms,模擬消費事件的邏輯的耗時
            Thread.sleep(100);
            // 計數
            eventCountPrinter.accept(null);
        });
  1. 發佈事件的操作,也支持Lambda表達式,如下所示,我在父類ConsumeModeService.java中新增publistEvent方法,裏面調用的disruptor.getRingBuffer().publishEvent的入參就是Lambda表達式和事件所需的業務數據,這樣就省區了以前發佈事件的類OrderEventProducer.java:
public void publistEvent(EventTranslatorOneArg<OrderEvent, String> translator, String value) {
        disruptor.getRingBuffer().publishEvent(translator, value);
    }
  1. 如下所示,現在拿到業務數據後發佈事件的操作變得非常輕了,Lambda表達式中做好業務數據轉事件的邏輯即可,最終,不再需要OrderEventProducer.java,一行代碼完成事件發佈(ConsumeModeServiceTest.java):
for(int i=0;i<EVENT_COUNT;i++) {
  log.info("publich {}", i);
  final String content = String.valueOf(i);
  lambdaService.publistEvent((event, sequence, value) -> event.setValue(value), content);
}

清理數據

  • 由於存儲的數據結構是環形隊列,對於每個事件的實例,會一直保存在隊列中,直到再次在這個位置寫入時才會被新的事件實例覆蓋,考慮到可能有的場景要求數據被消費後就立即被清除,disruptor官方提供了以下建議:
  1. 事件定義的類中,增加一個清理業務數據的方法(假設是ObjectEvent類的clear方法);
  2. 新增一個事件處理類(假設是ClearingEventHandler),在裏面主動調用事件定義類的清理業務數據的方法;
  3. 在編寫事件消費邏輯時,最後添加上述事件處理類ClearingEventHandler,這樣就會調用ObjectEvent實例的clear方法,將業務數據清理掉;
  • 官方給出的代碼如下:

在這裡插入圖片描述

  • 至此,整個《disruptor筆記》就完成了,感謝您的關注,希望這個系列的內容能給您帶來幫助,在開發中多一些選擇和參考;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 數據庫+中間件系列
  6. DevOps系列

歡迎關注公眾號:程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢遊Java世界…
//github.com/zq2599/blog_demos