disruptor筆記之六:常見場景

  • 2021 年 9 月 29 日
  • 筆記

歡迎訪問我的GitHub

//github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

《disruptor筆記》系列鏈接

  1. 快速入門
  2. Disruptor類分析
  3. 環形隊列的基礎操作(不用Disruptor類)
  4. 事件消費知識點小結
  5. 事件消費實戰
  6. 常見場景
  7. 等待策略
  8. 知識點補充(終篇)

本篇概覽

  • 本文是《disruptor筆記》系列的第六篇,主要內容是將一些常用的消費模式做匯總,後續日常開發中如果有需要就能拿來即用;
  • 以下是常用的模式:
  1. 多個消費者獨立消費,前文已實現,本篇跳過
  2. 多個消費者共同消費,前文已實現,本篇跳過
  3. 既有獨立消費,也有共同消費,前文已實現,本篇跳過
  4. 多個生產者和多個獨立消費者:

在這裡插入圖片描述

  1. C1、C2獨立消費,C3依賴C1和C2

在這裡插入圖片描述

  1. C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3

在這裡插入圖片描述

  1. C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

在這裡插入圖片描述

  1. C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

在這裡插入圖片描述

  1. C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

在這裡插入圖片描述

  1. C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

在這裡插入圖片描述

關於本篇代碼

  • 為了省事兒,本次不會新建工程,而是直接使用前文consume-mode模塊,因此,下面這些類直接就直接使用了,無需重寫代碼:
  1. 事件定義:OrderEvent
  2. 事件工廠:OrderEventFactory
  3. 事件生產者:OrderEventProducer
  4. 用在獨立消費場景的事件消費者:MailEventHandler
  5. 用在共同消費場景的事件消費者:MailWorkHandler

源碼下載

名稱 鏈接 備註
項目主頁 //github.com/zq2599/blog_demos 該項目在GitHub上的主頁
git倉庫地址(https) //github.com/zq2599/blog_demos.git 該項目源碼的倉庫地址,https協議
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該項目源碼的倉庫地址,ssh協議
  • 這個git項目中有多個文件夾,本次實戰的源碼在disruptor-tutorials文件夾下,如下圖紅框所示:

在這裡插入圖片描述

  • disruptor-tutorials是個父工程,裏面有多個module,本篇實戰的module是consume-mode,如下圖紅框所示:

在這裡插入圖片描述

多個生產者和多個獨立消費者

咱們即將實現下圖的邏輯:

在這裡插入圖片描述

  • 前面幾篇文章所有實戰的生產者都只有一個,到了本篇,為了讓consume-mode模塊的代碼能夠支持多生產者,咱們要對功能業務的抽象父類做以下兩處改動:
  1. init方法原本為private型,現在為了能讓子類重此方法,將其改為protected類型;

  2. 增加名為publishWithProducer2的方法,可見內部只有拋出異常,要想其正常工作,需要子類自己來實現:

public void publishWithProducer2(String value) throws Exception {
	throw new Exception("父類未實現此方法,請在子類中重寫此方法後再調用");
}
  • 為了實現多生產者功能,新增MultiProducerServiceImpl.java,有幾處要注意的地方稍後會提到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import lombok.Setter;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;

@Service("multiProducerService")
public class MultiProducerServiceImpl extends ConsumeModeService {

    /**
     * 第二個生產者
     */
    @Setter
    protected OrderEventProducer producer2;

    @PostConstruct
    @Override
    protected void init() {
        // 實例化
        disruptor = new Disruptor<>(new OrderEventFactory(),
                BUFFER_SIZE,
                new CustomizableThreadFactory("event-handler-"),
                // 生產類型是多生產者
                ProducerType.MULTI,
                // BlockingWaitStrategy是默認的等待策略
                new BlockingWaitStrategy());

        // 留給子類實現具體的事件消費邏輯
        disruptorOperate();

        // 啟動
        disruptor.start();

        // 第一個生產者
        setProducer(new OrderEventProducer(disruptor.getRingBuffer()));

        // 第二個生產者
        setProducer2(new OrderEventProducer(disruptor.getRingBuffer()));
    }

    @Override
    protected void disruptorOperate() {
        // 一號消費者
        MailEventHandler c1 = new MailEventHandler(eventCountPrinter);

        // 二號消費者
        MailEventHandler c2 = new MailEventHandler(eventCountPrinter);

        // 調用handleEventsWithWorkerPool,表示創建的多個消費者以共同消費的模式消費
        disruptor.handleEventsWith(c1, c2);
    }

    @Override
    public void publishWithProducer2(String value) throws Exception {
        producer2.onData(value);
    }
}
  • 上述代碼有以下幾處要注意:
  1. 重寫父類的init方法,主要是實例化Disruptor的時候,多傳入兩個參數:ProducerType.MULTI表示生產類型是多生產者,BlockingWaitStrategy是等待策略,之前的代碼中咱們沒有傳此參數時,默認的就是BlockingWaitStrategy
  2. init方法中還執行了setProducer2方法,設置成員變量producer2
  3. 重寫publishWithProducer2方法,調用成員變量producer2發表事件
  4. 重寫disruptorOperate方法,裏面設置了兩個獨立消費者
  • 驗證上述代碼的方式依舊是單元測試,打開ConsumeModeServiceTest.java,新增以下代碼,可見新增了兩個線程同時執行發佈事件的操作:
    @Autowired
    @Qualifier("multiProducerService")
    ConsumeModeService multiProducerService;

	@Test
    public void testMultiProducerService() throws InterruptedException {
        log.info("start testMultiProducerService");
        CountDownLatch countDownLatch = new CountDownLatch(1);

        // 兩個生產者,每個生產100個事件,一共生產兩百個事件
        // 兩個獨立消費者,每人消費200個事件,因此一共消費400個事件
        int expectEventCount = EVENT_COUNT*4;

        // 告訴service,等消費到400個消息時,就執行countDownLatch.countDown方法
        multiProducerService.setCountDown(countDownLatch, expectEventCount);

        // 啟動一個線程,用第一個生產者生產事件
        new Thread(() -> {
            for(int i=0;i<EVENT_COUNT;i++) {
                log.info("publich {}", i);
                multiProducerService.publish(String.valueOf(i));
            }
        }).start();

        // 再啟動一個線程,用第二個生產者生產事件
        new Thread(() -> {
            for(int i=0;i<EVENT_COUNT;i++) {
                log.info("publishWithProducer2 {}", i);
                try {
                    multiProducerService.publishWithProducer2(String.valueOf(i));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 當前線程開始等待,前面的service.setCountDown方法已經告訴過service,
        // 等消費到expectEventCount個消息時,就執行countDownLatch.countDown方法
        // 千萬注意,要調用await方法,而不是wait方法!
        countDownLatch.await();

        // 消費的事件總數應該等於發佈的事件數
        assertEquals(expectEventCount, multiProducerService.eventCount());
    }
  • 測試結果如下,測試通過,符合預期:

在這裡插入圖片描述

C1、C2獨立消費,C3依賴C1和C2

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現代碼如下,非常簡單,依賴關係用then即可實現:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import com.bolingcavalry.service.SmsEventHandler;
import org.springframework.stereotype.Service;

@Service("scene5")
public class Scene5 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c3 = new MailEventHandler(eventCountPrinter);

        disruptor
                // C1、C2獨立消費
                .handleEventsWith(c1, c2)
                // C3依賴C1和C2
                .then(c3);
    }
}
  • 單元測試代碼:
    @Autowired
    @Qualifier("scene5")
    Scene5 scene5;
 
    @Test
    public void testScene5 () throws InterruptedException {
        log.info("start testScene5");
        testConsumeModeService(scene5,
                EVENT_COUNT,
                // 三個獨立消費者,一共消費300個事件
                EVENT_COUNT * 3);
    }
  • 為了節省篇幅,測試結果就不貼了,要注意的是,每個事件都一定是C1和C2先消費過,才會被C3消費到;

C1獨立消費,C2和C3也獨立消費,但依賴C1,C4依賴C2和C3

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現代碼如下:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service;

@Service("scene6")
public class Scene6 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c4 = new MailEventHandler(eventCountPrinter);

        disruptor
                // C1
                .handleEventsWith(c1)
                // C2和C3也獨立消費
                .then(c2, c3)
                // C4依賴C2和C3
                .then(c4);
    }
}
  • 單元測試代碼:
    @Autowired
    @Qualifier("scene6")
    Scene6 scene6;
 
    @Test
    public void testScene6 () throws InterruptedException {
        log.info("start testScene6");
        testConsumeModeService(scene6,
                EVENT_COUNT,
                // 四個獨立消費者,一共消費400個事件
                EVENT_COUNT * 4);
    }

C1和C2獨立消費,C3和C4也是獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現代碼如下:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import org.springframework.stereotype.Service;

@Service("scene7")
public class Scene7 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c5 = new MailEventHandler(eventCountPrinter);

        disruptor
                // C1和C2獨立消費
                .handleEventsWith(c1, c2)
                // C3和C4也是獨立消費,但C3和C4都依賴C1和C2
                .then(c3, c4)
                // 然後C5依賴C3和C4
                .then(c5);
    }
}
  • 單元測試代碼:
    @Autowired
    @Qualifier("scene7")
    Scene7 scene7;

    @Test
    public void testScene7 () throws InterruptedException {
        log.info("start testScene7");
        testConsumeModeService(scene7,
                EVENT_COUNT,
                // 五個獨立消費者,一共消費500個事件
                EVENT_COUNT * 5);
    }

C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現代碼如下:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;

/**
 * @author will ([email protected])
 * @version 1.0
 * @description: C1和C2共同消費,C3和C4也是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4
 * @date 2021/5/23 11:05
 */
@Service("scene8")
public class Scene8 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c5 = new MailWorkHandler(eventCountPrinter);

        disruptor
                // C1和C2共同消費
                .handleEventsWithWorkerPool(c1, c2)
                // C3和C4也是獨立消費,但C3和C4都依賴C1和C2
                .thenHandleEventsWithWorkerPool(c3, c4)
                // 然後C5依賴C3和C4
                .thenHandleEventsWithWorkerPool(c5);
    }
}
  • 單元測試代碼:
    @Autowired
    @Qualifier("scene8")
    Scene8 scene8;

    @Test
    public void testScene8 () throws InterruptedException {
        log.info("start testScene8");
        testConsumeModeService(scene8,
                EVENT_COUNT,
                // C1和C2共同消費,C3和C4共同消費,C5雖然只是一個,但也是共同消費模式,
                // 也就是一共有三組消費者,所以一共消費300個事件
                EVENT_COUNT * 3);
    }

C1和C2共同消費,C3和C4獨立消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現代碼如下:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;

@Service("scene9")
public class Scene9 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailWorkHandler c1 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c2 = new MailWorkHandler(eventCountPrinter);
        MailEventHandler c3 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c4 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c5 = new MailEventHandler(eventCountPrinter);

        disruptor
                // C1和C2共同消費
                .handleEventsWithWorkerPool(c1, c2)
                // C3和C4獨立消費,但C3和C4都依賴C1和C2
                .then(c3, c4)
                // 然後C5依賴C3和C4
                .then(c5);
    }
}
  • 單元測試代碼:
    @Autowired
    @Qualifier("scene9")
    Scene9 scene9;

    @Test
    public void testScene9 () throws InterruptedException {
        log.info("start testScene9");
        testConsumeModeService(scene9,
                EVENT_COUNT,
                // C1和C2共同消費(100個事件),
                // C3和C4獨立消費(200個事件),
                // C5獨立消費(100個事件),
                // 所以一共消費400個事件
                EVENT_COUNT * 4);
    }

C1和C2獨立消費,C3和C4是共同消費,但C3和C4都依賴C1和C2,然後C5依賴C3和C4

  • 邏輯圖如下:

在這裡插入圖片描述

  • 實現代碼如下:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.ConsumeModeService;
import com.bolingcavalry.service.MailEventHandler;
import com.bolingcavalry.service.MailWorkHandler;
import org.springframework.stereotype.Service;

@Service("scene10")
public class Scene10 extends ConsumeModeService {

    @Override
    protected void disruptorOperate() {
        MailEventHandler c1 = new MailEventHandler(eventCountPrinter);
        MailEventHandler c2 = new MailEventHandler(eventCountPrinter);
        MailWorkHandler c3 = new MailWorkHandler(eventCountPrinter);
        MailWorkHandler c4 = new MailWorkHandler(eventCountPrinter);
        MailEventHandler c5 = new MailEventHandler(eventCountPrinter);

        disruptor
                // C1和C2共同消費
                .handleEventsWith(c1, c2)
                // C3和C4是共同消費,但C3和C4都依賴C1和C2
                .thenHandleEventsWithWorkerPool(c3, c4)
                // 然後C5依賴C3和C4
                .then(c5);
    }
}
  • 單元測試代碼:
    @Test
    public void testScene10 () throws InterruptedException {
        log.info("start testScene10");
        testConsumeModeService(scene10,
                EVENT_COUNT,
                // C1和C2獨立消費(200個事件),
                // C3和C4共同消費(100個事件),
                // C5獨立消費(100個事件),
                // 所以一共消費400個事件
                EVENT_COUNT * 4);
    }
  • 至此,一些常見場景的代碼已完成,希望本文能給您一些參考,幫您更得心應手的用好這個優秀的工具;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 數據庫+中間件系列
  6. DevOps系列

歡迎關注公眾號:程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢遊Java世界…
//github.com/zq2599/blog_demos