disruptor筆記之五:事件消費實戰

  • 2021 年 9 月 28 日
  • 筆記

歡迎訪問我的GitHub

//github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor筆記》系列鏈接

  1. 快速入門
  2. Disruptor類分析
  3. 環形隊列的基礎操作(不用Disruptor類)
  4. 事件消費知識點小結
  5. 事件消費實戰
  6. 常見場景
  7. 等待策略
  8. 知識點補充(終篇)

本篇概覽

本篇是《disruptor筆記》的第五篇,前文《disruptor筆記之四:事件消費知識點小結》從理論上梳理分析了獨立消費和共同消費,留下了三個任務,今天就來成這些任務,即編碼實現以下三個場景:

  1. 100個訂單,簡訊和郵件系統獨立消費
  2. 100個訂單,郵件系統的兩個郵件伺服器共同消費;
  3. 100個訂單,簡訊系統獨立消費,與此同時,兩個郵件伺服器共同消費;

源碼下載

名稱 鏈接 備註
項目主頁 //github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) //github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議
  • 這個git項目中有多個文件夾,本次實戰的源碼在disruptor-tutorials文件夾下,如下圖紅框所示:

在這裡插入圖片描述

  • disruptor-tutorials是個父工程,裡面有多個module,本篇實戰的module是consume-mode,如下圖紅框所示:

在這裡插入圖片描述

編寫公共程式碼

  • 為了完成任務,編碼實現上面那三個場景,咱們需要先把公共程式碼寫好;
  • 首先是在父工程disruptor-tutorials下面新建名為consume-mode的module,其build.gradle內容如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.lmax:disruptor'

    testImplementation('org.springframework.boot:spring-boot-starter-test')
}
  • springboot啟動類:
package com.bolingcavalry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumeModeApplication {
	public static void main(String[] args) {
		SpringApplication.run(ConsumeModeApplication.class, args);
	}
}
  • 訂單事件定義:
package com.bolingcavalry.service;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@ToString
@NoArgsConstructor
public class OrderEvent {

    private String value;
}
    • 訂單事件的工程類,定義事件實例如何創建:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventFactory;

public class OrderEventFactory implements EventFactory<OrderEvent> {

    @Override
    public OrderEvent newInstance() {
        return new OrderEvent();
    }
}
  • 訂單事件生產者類,定義如何將業務資訊通過事件發布到環形隊列:
package com.bolingcavalry.service;

import com.lmax.disruptor.RingBuffer;

public class OrderEventProducer {
    // 存儲數據的環形隊列
    private final RingBuffer<OrderEvent> ringBuffer;

    public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(String content) {
        // ringBuffer是個隊列,其next方法返回的是下最後一條記錄之後的位置,這是個可用位置
        long sequence = ringBuffer.next();

        try {
            // sequence位置取出的事件是空事件
            OrderEvent orderEvent = ringBuffer.get(sequence);
            // 空事件添加業務資訊
            orderEvent.setValue(content);
        } finally {
            // 發布
            ringBuffer.publish(sequence);
        }
    }
}
  • 消費訂單事件的簡訊服務,實現EventHandler介面,所以是用在獨立消費的場景:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class SmsEventHandler implements EventHandler<OrderEvent> {

    public SmsEventHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以傳入Consumer實現類,每處理一條消息的時候,consumer的accept方法就會被執行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("簡訊服務 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);

        // 這裡延時100ms,模擬消費事件的邏輯的耗時
        Thread.sleep(100);

        // 如果外部傳入了consumer,就要執行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}
  • 消費訂單事件的郵件服務,實現EventHandler介面,所以是用在獨立消費的場景:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class MailEventHandler implements EventHandler<OrderEvent> {

    public MailEventHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以傳入Consumer實現類,每處理一條消息的時候,consumer的accept方法就會被執行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("郵件服務 sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);

        // 這裡延時100ms,模擬消費事件的邏輯的耗時
        Thread.sleep(100);

        // 如果外部傳入了consumer,就要執行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}
  • 消費訂單事件的郵件服務,實現WorkHandler介面,所以是用在共同消費的場景:
package com.bolingcavalry.service;

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class MailWorkHandler implements WorkHandler<OrderEvent> {

    public MailWorkHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以傳入Consumer實現類,每處理一條消息的時候,consumer的accept方法就會被執行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(OrderEvent event) throws Exception {
        log.info("共同消費模式的郵件服務 : {}", event);

        // 這裡延時100ms,模擬消費事件的邏輯的耗時
        Thread.sleep(100);

        // 如果外部傳入了consumer,就要執行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}
  • 最後,將發布和消費事件的邏輯寫在一個抽象類里,但是具體如何消費事件並不在此類中實現,而是留給子類,這個抽象類中有幾處要注意的地方稍後會提到:
package com.bolingcavalry.service;

import com.lmax.disruptor.dsl.Disruptor;
import lombok.Setter;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public abstract class ConsumeModeService {
    /**
     * 獨立消費者數量
     */
    public static final int INDEPENDENT_CONSUMER_NUM = 2;

    /**
     * 環形緩衝區大小
     */
    protected int BUFFER_SIZE = 16;

    protected Disruptor<OrderEvent> disruptor;

    @Setter
    private OrderEventProducer producer;

    /**
     * 統計消息總數
     */
    protected final AtomicLong eventCount = new AtomicLong();

    /**
     * 這是輔助測試用的,
     * 測試的時候,完成事件發布後,測試主執行緒就用這個countDownLatch開始等待,
     * 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
     * 這樣測試主執行緒就可以結束等待了
     */
    private CountDownLatch countDownLatch;

    /**
     * 這是輔助測試用的,
     * 測試的時候,完成事件發布後,測試主執行緒就用這個countDownLatch開始等待,
     * 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
     * 這樣測試主執行緒就可以結束等待了
     */
    private int countDownLatchGate;

    /**
     * 準備一個匿名類,傳給disruptor的事件處理類,
     * 這樣每次處理事件時,都會將已經處理事件的總數列印出來
     */
    protected Consumer<?> eventCountPrinter = new Consumer<Object>() {
        @Override
        public void accept(Object o) {
            long count = eventCount.incrementAndGet();

            /**
             * 這是輔助測試用的,
             * 測試的時候,完成事件發布後,測試主執行緒就用這個countDownLatch開始等待,
             * 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
             * 這樣測試主執行緒就可以結束等待了
             */
            if (null!=countDownLatch && count>=countDownLatchGate) {
                countDownLatch.countDown();
            }
        }
    };

    /**
     * 發布一個事件
     * @param value
     * @return
     */
    public void publish(String value) {
        producer.onData(value);
    }

    /**
     * 返回已經處理的任務總數
     * @return
     */
    public long eventCount() {
        return eventCount.get();
    }

    /**
     * 這是輔助測試用的,
     * 測試的時候,完成事件發布後,測試主執行緒就用這個countDownLatch開始等待,
     * 在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,
     * 這樣測試主執行緒就可以結束等待了
     * @param countDownLatch
     * @param countDownLatchGate
     */
    public void setCountDown(CountDownLatch countDownLatch, int countDownLatchGate) {
        this.countDownLatch = countDownLatch;
        this.countDownLatchGate = countDownLatchGate;
    }

    /**
     * 留給子類實現具體的事件消費邏輯
     */
    protected abstract void disruptorOperate();

    @PostConstruct
    private void init() {
        // 實例化
        disruptor = new Disruptor<>(new OrderEventFactory(),
                BUFFER_SIZE,
                new CustomizableThreadFactory("event-handler-"));

        // 留給子類實現具體的事件消費邏輯
        disruptorOperate();

        // 啟動
        disruptor.start();

        // 生產者
        setProducer(new OrderEventProducer(disruptor.getRingBuffer()));
    }
}
  • 上述程式碼,有以下幾處需要注意:
  1. init方法是spring bean實例化後要執行的方法,這裡面實例化Disruptor,還啟動了消費執行緒,並且實例化了事件生產者,具體的事件消費邏輯,由子類在disruptorOperate方法中實現;
  2. eventCountPrinter是個匿名類實例,傳給事件消費的handler後,每消費一個事件都會執行一次eventCountPrinter.accept方法,這樣就把消費事件的總數準確的保存在eventCount變數中了;
  3. countDownLatch和countDownLatchGate是為了輔助單元測試而準備的,測試的時候,完成事件發布後,測試主執行緒就用這個countDownLatch開始等待,在消費到指定的數量(countDownLatchGate)後,消費執行緒執行countDownLatch的countDown方法,這樣測試主執行緒就可以結束等待了
  • 至此,公用程式碼就寫完了,可見抽象父類已經做好了大部分事情,咱們的子類可以聚焦事件消費的邏輯編排了,開始挨個實現那三個場景;

100個訂單,簡訊和郵件系統獨立消費

  • 兩個消費者獨立消費的邏輯非常簡單,就一行程式碼,調用handleEventsWith方法把所有消費者實例傳進去,就完事了:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;

@Service("independentModeService")
public class IndependentModeServiceImpl extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        // 調用handleEventsWith,表示創建的多個消費者,每個都是獨立消費的
        // 這裡創建兩個消費者,一個是簡訊的,一個是郵件的
        disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter), new MailEventHandler(eventCountPrinter));
    }
}
  • 單元測試程式碼如下,要注意的地方是發布完100事件後,調用countDownLatch.await()方法開始等待,直到消費者執行緒調用countDownLatch.countDown()方法解除等待,還有就是預期的消費消息總數等於200
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class ConsumeModeServiceTest {

    @Autowired
    @Qualifier("independentModeService")
    ConsumeModeService independentModeService;

    /**
     * 測試時生產的消息數量
     */
    private static final int EVENT_COUNT = 100;

    private void testConsumeModeService(ConsumeModeService service, int eventCount, int expectEventCount) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // 告訴service,等消費到expectEventCount個消息時,就執行countDownLatch.countDown方法
        service.setCountDown(countDownLatch, expectEventCount);

        for(int i=0;i<eventCount;i++) {
            log.info("publich {}", i);
            service.publish(String.valueOf(i));
        }

        // 當前執行緒開始等待,前面的service.setCountDown方法已經告訴過service,
        // 等消費到expectEventCount個消息時,就執行countDownLatch.countDown方法
        // 千萬注意,要調用await方法,而不是wait方法!
        countDownLatch.await();

        // 消費的事件總數應該等於發布的事件數
        assertEquals(expectEventCount, service.eventCount());
    }

    @Test
    public void testIndependentModeService() throws InterruptedException {
        log.info("start testIndependentModeService");
        testConsumeModeService(independentModeService,
                EVENT_COUNT,
                EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
    }
}
  • 單元測試執行結果如下,符合預期:

在這裡插入圖片描述

100個訂單,郵件系統的兩個郵件伺服器共同消費

  • 兩個消費者共同消費的程式碼也很簡單,調用handleEventsWithWorkerPool方法即可,把共同消費的MailWorkHandler實例作為參數傳入:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;

@Service("shareModeService")
public class ShareModeServiceImpl extends ConsumeModeService {
    @Override
    protected void disruptorOperate() {
        // mailWorkHandler1模擬一號郵件伺服器
        MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);

        // mailWorkHandler2模擬一號郵件伺服器
        MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);

        // 調用handleEventsWithWorkerPool,表示創建的多個消費者以共同消費的模式消費
        disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
    }
}
  • 單元測試是在ConsumeModeServiceTest.java中添加如下程式碼,注意由於是共同消費,因此預期的消費事件數等於消息數,都是100:
    @Autowired
    @Qualifier("shareModeService")
    ConsumeModeService shareModeService;

    @Test
    public void testShareModeService() throws InterruptedException {
        log.info("start testShareModeService");
        testConsumeModeService(shareModeService, EVENT_COUNT, EVENT_COUNT);
    }
  • 執行單元測試,結果如下圖:

在這裡插入圖片描述

100個訂單,簡訊系統獨立消費,與此同時,兩個郵件伺服器共同消費

  • 最後一個場景,依舊很簡單,handleEventsWith調用一次,再調用一次handleEventsWithWorkerPool即可:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailWorkHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;

@Service("independentAndShareModeService")
public class IndependentAndShareModeServiceImpl extends ConsumeModeService {
    @Override
    protected void disruptorOperate() {
        // 調用handleEventsWith,表示創建的多個消費者,每個都是獨立消費的
        // 這裡創建一個消費者,簡訊服務
        disruptor.handleEventsWith(new SmsEventHandler(eventCountPrinter));

        // mailWorkHandler1模擬一號郵件伺服器
        MailWorkHandler mailWorkHandler1 = new MailWorkHandler(eventCountPrinter);

        // mailWorkHandler2模擬一號郵件伺服器
        MailWorkHandler mailWorkHandler2 = new MailWorkHandler(eventCountPrinter);

        // 調用handleEventsWithWorkerPool,表示創建的多個消費者以共同消費的模式消費
        disruptor.handleEventsWithWorkerPool(mailWorkHandler1, mailWorkHandler2);
    }
}
  • 單元測試是在ConsumeModeServiceTest.java中添加如下程式碼,預期的消費事件數應該是200,因為整體上是兩個獨立消費,只不過其中的一個內部有兩個消費者共同消費:
    @Autowired
    @Qualifier("independentAndShareModeService")
    ConsumeModeService independentAndShareModeService;

    @Test
    public void independentAndShareModeService() throws InterruptedException {
        log.info("start independentAndShareModeService");
        testConsumeModeService(independentAndShareModeService,
                EVENT_COUNT,
                EVENT_COUNT * ConsumeModeService.INDEPENDENT_CONSUMER_NUM);
    }
  • 單元測試結果如下,符合預期:

在這裡插入圖片描述

  • 至此,獨立消費和共同消費的實戰就完成了,藉助disruptor,三個常見場景都可以輕鬆完成,如果您正在做這些場景的開發,希望本文能給您一些參考;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 資料庫+中間件系列
  6. DevOps系列

歡迎關注公眾號:程式設計師欣宸

微信搜索「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界…
//github.com/zq2599/blog_demos