SpringCloud(七)Stream消息驅動

Stream消息驅動

概述

屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型
官網://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.4.RELEASE/reference/html/

官方定義Spring Cloud Stream是一個構建消息驅動微服務的框架
應用程序通過 inputs 或者 outputs 來與Spring Cloud Stream中的 binder對象交互,通過配置來binding(綁定),而 Spring Cloud Stream 的 binder 對象負責與消息中間件交互,所以,我們只需要搞清楚如何與Spring Cloud Stream交互就可以方便使用消息驅動的方式
通過使用Spring Integration來連接消息代理中間件以實現消息事件驅動
Spring Cloud Stream是用於構建與共享消息傳遞系統連接的高度可伸縮的事件驅動微服務框架,該框架提供了一個靈活的編程模型,它建立在已建立和熟悉的Spring和最佳實踐上,包括支持持久化的發佈/訂閱、消費組以及消息分區這三個核心概念

目前僅支持RabbitMQ、Kafka

設計思想

標準MQ

生產者/消費者之間靠消息媒介(Message)傳遞信息內容
消息必須走特定的通道(消息通道MessageChannel)
消息通道里的消息如何被消費呢,誰負責收發處理消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息處理器所訂閱

使用原因

比方說我們用到了RabbitMQ和Kafka,由於這兩個消息中間件的架構上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分區
這些中間件的差異性導致我們實際項目開發給我們造成了一定的困擾,我們如果用了兩個消息隊列的其中一種,後面的業務需求,我想往另外一種消息隊列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候springcloud Stream給我們提供了一種解耦合的方式
在沒有綁定器這個概念的情況下,我們的SpringBoot應用要直接與消息中間件進行信息交互的時候,由於各消息中間件構建的初衷不同,它們的實現細節上會有較大的差異性
通過定義綁定器 Binder 作為中間層,實現了應用程序與消息中間件細節之間的隔離
通過嚮應用程序暴露統一的 Channel 通道,使得應用程序不需要再考慮各種不同的消息中間件實現
Stream中的消息通信方式遵循了發佈-訂閱模式,Topic主題進行廣播,在RabbitMQ就是Exchange,在Kafka中就是Topic

基本流程

  • Binder:很方便的連接中間件,屏蔽差異
  • Channel:通道,是隊列Queue的一種抽象,在消息通訊系統中就是實現存儲和轉發的媒介,通過Channel對隊列進行配置
  • Source和Sink:簡單的可理解為參照對象是Spring Cloud Stream自身,從Stream發佈消息就是輸出,接受消息就是輸入

常用API和註解

  • Middleware:中間件,目前只支持RabbitMQ和Kafka
  • Binder:Binder是應用與消息中間件之間的封裝,目前實行了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現
  • @Input:註解標識輸入通道,通過該輸入通道接收到的消息進入應用程序
  • @Output:註解標識輸出通道,發佈的消息將通過該通道離開應用程序
  • @StreamListener:監聽隊列,用於消費者的隊列的消息接收
  • @EnableBinding:指信道channel和exchange綁定在一起

基本構建

新建三個子模塊,一個作為生產者進行發送消息模塊,兩個作為消息接收模塊

服務端

  1. 導入 pom 依賴
<!-- spring-cloud-starter-stream-rabbit -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. 修改 yml 配置文件
server:
  port: 8801

spring:
  application:
    name: stream-rabbitmq-provider
  cloud:
    stream:
      # 配置要綁定的 rabbitmq 的服務信息
      binders:
        # 表示定義的名稱,用於與 binding 整合
        defaultRabbit:
          # 消息組件類型
          type: rabbit
          # 設置 rabbitmq 相關配置環境
          enviroment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: admin
                password: 123456
      # 服務整合處理
      bindings:
        # 通道名稱
        output:
          # 表示要使用的 Exchange 名稱定義
          destination: studyExchange
          # 設置消息類型,本次為為 json,文本則設置「text/plain」
          content-type: application/json
            binder: defaultRabbit
  1. 業務類
    發送消息的接口
public interface MessageProvider {
    /**
     * 消息發送
     *  @return :返回值
     */
    Message<?> send();
}

發送消息接口的實現類

@EnableBinding(Source.class)
public class MessageProviderImpl implements MessageProvider {

    /**
     * @ InboundChannelAdapter
     * 作用:表示定義的方法能產生消息
     * fixedDelay:多少毫秒發送1次
     */
    @Override
    @InboundChannelAdapter(channel = Source.OUTPUT,poller = @Poller(fixedDelay = "10000"))  // 每隔10秒發送一次
    public Message<String> send() {
        String serial = UUID.randomUUID().toString();
        return MessageBuilder.withPayload(serial)
                .build();
    }
}

Controller 層

@RestController
public class SendMessageController {

    @Resource
    private MessageProvider provider;

    @GetMapping(value = "/senMessage")
    public Message senMessage(){
        return provider.send();
    }
}

消費者

  1. 導入 pom 依賴
<!-- spring-cloud-starter-stream-rabbit -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. 修改 yml 配置文件
# 服務整合處理
bindings:
  # 通道名稱
  input:
    # 表示要使用的 Exchange 名稱定義
    destination: studyExchange
    # 設置消息類型,本次為為 json,文本則設置「text/plain」
    content-type: application/json
    binder: defaultRabbit
  1. 業務類
@RestController
public class SendMessageController {

    @Resource
    private MessageProvider provider;

    @GetMapping(value = "/senMessage")
    public Message senMessage(){
        return provider.send();
    }
}

分組消費和持久化

當使用兩個消費者來進行接收消息時,會出現兩個問題:重複消費和消息持久化的問題

重複消費

目前是8802/8803同時都收到了,存在重複消費問題

解決方法:分組和持久化屬性Group
比如在如下場景中,訂單系統我們做集群部署,都會從RabbitMQ中獲取訂單信息
那如果一個訂單同時被兩個服務獲取到,那麼就會造成數據錯誤,為了避免這種情況這時我們就可以使用Stream中的消息分組來解決

注意在Stream中處於同一個group中的多個消費者是競爭關係,就能夠保證消息只會被其中一個應用消費一次,不同組是可以全面消費的(重複消費),同一組內會發生競爭關係,只有其中一個可以消費

分組

微服務應用放置於同一個group中,就能夠保證消息只會被其中一個應用消費一次不同的組是可以消費的,同一個組內會發生競爭關係,只有其中一個可以消費
自定義分組
修改消費者的 yml 文件,新增一個 group 的屬性

# 服務整合處理
bindings:
  # 通道名稱
  input:
    # 表示要使用的 Exchange 名稱定義
    destination: studyExchange
    # 設置消息類型,本次為為 json,文本則設置「text/plain」
    content-type: application/json
    binder: defaultRabbit
   group: Consumer

分佈式微服務應用為了實現高可用和負載均衡,實際上都會部署多個實例,這裡舉例實現兩個消費微服務
多數情況,生產者發送消息給某個具體微服務時只希望被消費一次,按照上面啟動兩個應用的例子,雖然它們同屬一個應用,但是這個消息出現了被重複消費兩次的情況。為了解決這個問題,在Spring Cloud Stream中提供了消費組的概念
實現輪詢分組,每次只有一個消費者,生產者發送的消息只能被一個消費者接收到,避免重複消費

將案例的兩個消費者變成相同組
同一個組的多個微服務實例,每次只會有一個拿到

持久化

配置好 group 這個屬性後可以發現
當消費者發生一些錯誤停止服務時,但是此時的生產者還在不斷的發送消息,如果消費者沒有配置 group ,那麼這些消息就被錯過了
當配置好 group 這個屬性,消費者就算髮生一些錯誤停止服務,再啟動時,就會獲取到之前停止服務期間生產者發來的消息