手把手教你使用 Spring Boot 3 開發上線一個前後端分離的生產級系統(九) – Spring AMQP 集成與配置

手把手教你使用 Spring Boot 3 開發上線一個前後端分離的生產級系統(一) – 介紹
手把手教你使用 Spring Boot 3 開發上線一個前後端分離的生產級系統(二) – 資料庫設計
手把手教你使用 Spring Boot 3 開發上線一個前後端分離的生產級系統(三) – 項目初始化
手把手教你使用 Spring Boot 3 開發上線一個前後端分離的生產級系統(四) – 日誌 & 跨域配置
手把手教你使用 Spring Boot 3 開發上線一個前後端分離的生產級系統(五) – MyBatis-Plus & 程式碼生成器集成與配置
手把手教你使用 Spring Boot 3 開發上線一個前後端分離的生產級系統(六) – 本地快取 Caffeine 和 分散式快取 Redis 集成與配置
手把手教你使用 Spring Boot 3 開發上線一個前後端分離的生產級系統(七) – Elasticsearch 8.2 集成與配置
手把手教你使用 Spring Boot 3 開發上線一個前後端分離的生產級系統(八) – XXL-JOB 集成與配置

Spring AMQP 介紹

AMQP(高級消息隊列協議)是一個非同步消息傳遞所使用的應用層協議規範,為面向消息的中間件設計,不受產品和開發語言的限制. Spring AMQP 將核心 Spring 概念應用於基於 AMQP 消息傳遞解決方案的開發。

RabbitMQ 是基於 AMQP 協議的輕量級、可靠、可擴展、可移植的消息中間件,Spring 使用 RabbitMQ 通過 AMQP 協議進行通訊。Spring Boot 為通過 RabbitMQ 使用 AMQP 提供了多種便利,包括 spring-boot-starter-amqp 「Starter」。

Spring AMQP 集成與配置

  1. 可通過如下 Docker 命令 安裝 RabbiMQ:
 docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management
  1. 登錄 RabbiMQ 的 web 管理介面,創建虛擬主機novel:

RabbitMQ

  1. 項目中加入如下的 maven 依賴:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 在 application.yml 配置文件中加入 RabbitMQ 的連接配置:
spring:
  rabbitmq:
    addresses: "amqp://guest:[email protected]"
    virtual-host: novel
    template:
      retry:
        # 開啟重試
        enabled: true
        # 最大重試次數
        max-attempts: 3
        # 第一次和第二次重試之間的持續時間
        initial-interval: "3s"
  1. 此時已經可以在 Spring Beans 中注入 AmqpTemplate 發送消息了。

Spring AMQP 使用示例

  1. io.github.xxyopen.novel.core.constant包下創建 AMQP 相關常量類:
/**
 * AMQP 相關常量
 *
 * @author xiongxiaoyang
 * @date 2022/5/25
 */
public class AmqpConsts {

    /**
     * 小說資訊改變 MQ
     * */
    public static class BookChangeMq{

        /**
         * 小說資訊改變交換機
         * */
        public static final String EXCHANGE_NAME = "EXCHANGE-BOOK-CHANGE";

        /**
         * Elasticsearch book 索引更新的隊列
         * */
        public static final String QUEUE_ES_UPDATE = "QUEUE-ES-BOOK-UPDATE";

        /**
         * Redis book 快取更新的隊列
         * */
        public static final String QUEUE_REDIS_UPDATE = "QUEUE-REDIS-BOOK-UPDATE";

        // ... 其它的更新隊列

    }

}
  1. io.github.xxyopen.novel.core.config包下創建 AMQP 配置類,配置各個交換機、隊列以及綁定關係:
/**
 * AMQP 配置類
 *
 * @author xiongxiaoyang
 * @date 2022/5/25
 */
@Configuration
public class AmqpConfig {

    /**
     * 小說資訊改變交換機
     */
    @Bean
    public FanoutExchange bookChangeExchange() {
        return new FanoutExchange(AmqpConsts.BookChangeMq.EXCHANGE_NAME);
    }

    /**
     * Elasticsearch book 索引更新隊列
     */
    @Bean
    public Queue esBookUpdateQueue() {
        return new Queue(AmqpConsts.BookChangeMq.QUEUE_ES_UPDATE);
    }

    /**
     * Elasticsearch book 索引更新隊列綁定到小說資訊改變交換機
     */
    @Bean
    public Binding esBookUpdateQueueBinding() {
        return BindingBuilder.bind(esBookUpdateQueue()).to(bookChangeExchange());
    }

    // ... 其它的更新隊列以及綁定關係

}
  1. io.github.xxyopen.novel.manager.mq包下創建 AMQP 消息管理類,用來發送各種 AMQP 消息:
/**
 * AMQP 消息管理類
 *
 * @author xiongxiaoyang
 * @date 2022/5/25
 */
@Component
@RequiredArgsConstructor
public class AmqpMsgManager {

    private final AmqpTemplate amqpTemplate;

    @Value("${spring.amqp.enable}")
    private String enableAmqp;

    /**
     * 發送小說資訊改變消息
     */
    public void sendBookChangeMsg(Long bookId) {
        if (Objects.equals(enableAmqp, CommonConsts.TRUE)) {
            sendAmqpMessage(amqpTemplate, AmqpConsts.BookChangeMq.EXCHANGE_NAME, null, bookId);
        }
    }

    private void sendAmqpMessage(AmqpTemplate amqpTemplate, String exchange, String routingKey, Object message) {
        // 如果在事務中則在事務執行完成後再發送,否則可以直接發送
        if (TransactionSynchronizationManager.isActualTransactionActive()) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
                @Override
                public void afterCommit() {
                    amqpTemplate.convertAndSend(exchange, routingKey, message);
                }
            });
            return;
        }
        amqpTemplate.convertAndSend(exchange, routingKey, message);
    }

}
  1. 在小說資訊更新後,發送 AMQP 消息:
@Transactional(rollbackFor = Exception.class)
@Override
public RestResp<Void> saveBookChapter(ChapterAddReqDto dto) {
    // 1) 保存章節相關資訊到小說章節表
    //  a) 查詢最新章節號
   
    //  b) 設置章節相關資訊並保存   

    // 2) 保存章節內容到小說內容表

    // 3) 更新小說表最新章節資訊和小說總字數資訊
    //  a) 更新小說表關於最新章節的資訊
    
    //  b) 發送小說資訊更新的 MQ 消息
    amqpMsgManager.sendBookChangeMsg(dto.getBookId());
    return RestResp.ok();
}
  1. io.github.xxyopen.novel.core.listener包下創建 Rabbit 隊列監聽器,監聽各個 RabbitMQ 隊列的消息並處理:
/**
 * Rabbit 隊列監聽器
 *
 * @author xiongxiaoyang
 * @date 2022/5/25
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class RabbitQueueListener {

    private final BookInfoMapper bookInfoMapper;

    private final ElasticsearchClient esClient;

    /**
     * 監聽小說資訊改變的 ES 更新隊列,更新最新小說資訊到 ES
     * */
    @RabbitListener(queues = AmqpConsts.BookChangeMq.QUEUE_ES_UPDATE)
    @SneakyThrows
    public void updateEsBook(Long bookId) {
        BookInfo bookInfo = bookInfoMapper.selectById(bookId);
        IndexResponse response = esClient.index(i -> i
                .index(EsConsts.BookIndex.INDEX_NAME)
                .id(bookInfo.getId().toString())
                .document(EsBookDto.build(bookInfo))
        );
        log.info("Indexed with version " + response.version());
    }

    // ... 監聽其它隊列,刷新其它副本數據

}

此時,如果需要更新其它小說副本數據,只需要配置更新隊列和增加監聽器,不需要在小說資訊變更的地方增加任何業務程式碼,而且任意小說副本的數據刷新之間互不影響,真正實現了模組間的解耦。