九. SpringCloud Stream消息驅動
- 2021 年 3 月 4 日
- 筆記
- SpringCloud, 消息中間件
1. 消息驅動概述
1.1 是什麼
在實際應用中有很多消息中間件,比如現在企業里常用的有ActiveMQ、RabbitMQ、RocketMQ、Kafka等,學習所有這些消息中間件無疑需要大量時間經歷成本,那有沒有一種技術,使我們不再需要關注具體的消息中間件的細節,而只需要用一種適配綁定的方式,自動的在各種消息中間件內切換呢?消息驅動就是這樣的技術,它能 屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型。
SpringCloud Stream是一個構件消息驅動微服務的框架。應用程式通過inputs和outputs來與SpringCloud Stream中的綁定器(binder)對象交互,通過配置來綁定,而SpringCloud Stream的綁定器對象負責與消息中間件交互,所以,我們只需要搞清楚如何與SpringCloud Stream交互就可以方便使用消息驅動的方式。但是 截至到目前 SpringCloud Stream僅支援RabbitMQ和Kafka。
1.2 設計思想
標準MQ模型
- 生產者 / 消費者之間靠消息媒介傳遞資訊內容 –
Messag
- 消息必須走特定的通道 –
Message Channel
- 消息通道里的消息如何被消費呢?誰負責處理? – 消息通道
MessageChannel
的子介面SubscribableChannel
,由 MessageHandler 消息處理器所訂閱
為什麼使用Cloud Stream
比如說我們用到了RabbitMQ和Kafka,由於這兩個消息中間件的架構上的不同,像RabbitMQ有exchange,Kafka有Topic和Partitions分區,這些中間件的差異性導致實際項目開發給我們造成了一定的困擾,我們如果用了兩個消息隊列的其中一種,後面的業務需求如果又要往另外一種消息隊列進行遷移,這無疑是一個災難,一大堆東西都要重新推到重做,因為它跟我們的系統耦合了,這時候SpringCloud Stream給我們提供了一種解耦合的方式。
stream憑什麼可以統一底層差異
在沒有綁定器這個概念的情況下,我們的SpringBoot應用要直接與消息中間件進行資訊交互的時候,由於各消息中間件構建的初衷不同,它們的實現細節上會有較大的差異性。
通過定義綁定器作為中間層,完美的實現了 應用程式與消息中間件細節之間的隔離。Stream對消息中間件的進一步封裝(通過嚮應用程式暴露統一的Channel通道,使得應用程式不需要再考慮各種不同的消息中間件實現),可以做到程式碼層面對中間件的無感知,甚至於動態的切換中間件(如RabbitMQ切換為Kafka),使得微服務開發的高度解耦,服務可以更多的關注自己的業務流程。
在消息綁定器中,INPUT對應於消費者,OUTPUT對應於生產者。
Stream中的消息通訊方式遵循了 發布-訂閱模式,用Topic(主題)進行廣播(RabbitMQ中對應於Exchange交換機,Kafka中就是Topic)。
1.3 SpringCloud Stream標準流程套路
Binder
很方便的連接中間件,屏蔽差異Channel
通道,是隊列Queue的一種抽象,在消息通訊系統中就是實現了存儲和轉發的媒介,通過Channel對隊列進行配置Source
和Sink
簡單的可以理解為參照對象是SpringCloud Stream自身,從Stream發布消息就是輸出,接受消息就是輸入
1.4 SpringCloud Stream編碼API與常用註解
組成 | 說明 |
---|---|
Middleware | 中間件,目前只支援RabbitMQ和Kafka |
Binder | Binder是應用與消息中間件之間的封裝,目前實行了RabbitMQ和Kafka的Binder,通過Binder可以很方便的連接中間件,可以動態的改變消息類型(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置文件來實現 |
@Input | 註解標識輸入通道,通過該輸入通道接收到的消息進入應用程式 |
@Output | 註解標識輸出通道,發布的消息將通過該通道離開應用程式 |
@StreamListner | 監聽隊列,用於消費者的隊列的消息接收 |
@EnableBinding | 使信道Channel和交換機/主題(Exchange/Topic)綁定在一起 |
2. Spring Cloud Stream 案例
新建三個子模組分別對應於消息的生產者和消費者:
模組名 | 微服務功能 |
---|---|
cloud-stream-rabbitmq-provider8801 | 生產者,發送消息模組 |
cloud-stream-rabbitmq-consumer8802 | 消費者,接收消息模組 |
cloud-stream-rabbitmq-consumer8803 | 消費者,接收消息模組 |
2.1 消息驅動之消息生產者
新建Module:cloud-stream-rabbitmq-provider8801作為消息的生產者用來發送消息,在其POM文件中除引入web、actuator、eureka-client等必要啟動器外,還需要引入SpringCloud Stream對應實現RabbitMQ的啟動器依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
編寫其配置文件application.yml:
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要綁定的rabbitmq的服務資訊
defaultRabbit: # 表示定義的名稱,用于于binding整合
type: rabbit # 消息組件類型
environment: # 設置rabbitmq的相關的環境配置
spring:
rabbitmq:
host: mpolaris.top
port: 5672
username: admin
password: 1234321
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱,OUTPUT表示這是消息的發送方
# 表示要使用的Exchange名稱定義
destination: testExchange
# 設置消息類型,本次為json,文本則設置「text/plain」
content-type: application/json
# 設置要綁定的消息服務的具體設置
default-binder: defaultRabbit
eureka:
client: # 客戶端進行Eureka註冊的配置
service-url:
defaultZone: //eureka7001.com:7001/eureka
instance:
# 設置心跳的時間間隔(默認是30秒)
lease-renewal-interval-in-seconds: 2
# 如果現在超過了5秒的間隔(默認是90秒)
lease-expiration-duration-in-seconds: 5
# 在資訊列表時顯示主機名稱yml
instance-id: send-8801.com
# 訪問的路徑變為IP地址
prefer-ip-address: true
編寫其主啟動類
編寫業務類,在業務類中分別要編寫 發送消息介面 及其 實現類,並在發送介面消息的實現類中 添加 @EnableBinding
註解 用來綁定消息的推送管道,消息生產者綁定的消息推送管道為 org.springframework.cloud.stream.messaging.Source
:
public interface IMessageProvider {
public String send();
}
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @Author polaris
* @Date 2021/3/4 21:46
*/
@EnableBinding(Source.class) //定義消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output; //消息發送管道
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build()); //發送消息
System.out.println("==> serial:" + serial);
return null;
}
}
注意我們在service的實現類中不再需要@Service
註解,因為這個service不再是傳統意義上的和Controller、DAO數據等進行交互的service,而是要綁定綁定器打交道的service。
然後編寫其業務層的Controller:
@RestController
public class SendMessageController {
@Autowired
private IMessageProvider messageProvider;
@GetMapping("/sendMessage")
public String sendMessage() {
return messageProvider.send();
}
}
啟動服務註冊中心後和RabbitMQ後,啟動消息生產者微服務,我們在RabbitMQ的控制面板中可以看見多出了一個名為testExchange的交換機,這個交換機恰恰就是我們之前在配置文件中配置的交換機名字testExchange。
然後我們訪問 //localhost:8801/sendMessage 使用消息生產者微服務發送消息,在其微服務後台我們看到了列印的消息。
在RabbitMQ的控制面板中我們也看到了確實發送了消息。
2.2 消息驅動之消息消費者
新建Module:cloud-stream-rabbitmq-consumer8802/8803作為消息的生產者用來接收消息,其POM文件中引入的啟動器依賴和消息生產者微服務的依賴幾乎相同,然後編寫其配置文件application.yml,其配置文件的書寫和消息生產者的幾乎一致,特別需要注意的是,消息生產者微服務用到的通道為OUTPUT,而消息消費者微服務用到的通道為INPUT,其他的配置文件資訊就只需要注意埠號、註冊服務名的區別即可:
spring:
cloud:
bindings:
input: # 這個名字是一個通道的名稱,INPUT表示消息消費者
編寫主啟動類
編寫消息消費者的業務類,由於是消費者,所以只需要編寫其Controller即可,在其Controller上同樣需要添加 @EnableBinding
註解用來綁定消息的推送管道,消息消費者綁定的消息推送管道為import org.springframework.cloud.stream.messaging.Sink
,在接收消息的方法中需要使用 @StreamListner
註解來監聽其綁定的消息推送管道:
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消費者" + serverPort + "號,收到消息:"
+ message.getPayload());
}
}
然後啟動消息發送消費者服務,用生產者發送消息,我們可以發現在消費者端可以成功接收到消息。
3. 分組消費和持久化
3.1 重複消費問題
當生產者發送消息後,此時的我們的消費者都接受了消息並進行了消費,也就是說同一條消息被多個消息消費者所消費。
上述的問題就是消息的 重複消費 問題,那麼這個問題為什麼如此重要呢?其實重複消費這個問題本身不可怕,可怕的是沒考慮到重複消費之後,怎麼保證冪等性。(冪等性 通俗的說,就一個數據,或者一個請求,重複很多次,需要確保對應的數據是不會改變的,不能出錯)。分散式微服務應用為了實現高可用和負載均衡,實際上同一功能的服務都會部署多個具體的服務實例。舉個例子,假設有一個系統,有一條消息要求往資料庫里插入一條數據,要是這個消息重複消費兩次,結果就是向資料庫里插入了兩條數據,這樣數據就錯了,就違背了冪等性原則,但是要是該消息消費到第二次的時候,可以判斷一下已經消費過了,然後直接將該消息丟棄,這就實現了只插入一條數據,一條消息重複出現了兩次,但是只有第一次真正被消費了,資料庫里也就只插入了一條數據,這就保證了系統的冪等性。
上面簡單的介紹了消息的重複消費問題,那如何解決這種重複消費問題呢,那就需要我們進行 分組和持久化屬性組 操作,利用SpringCloud Stream中的消息分組來解決這個問題,需要注意的是在Stream中處於同一組中的多個消息消費者是競爭關係,也就是保證生產者所發送的同一個消息只會被其中一個消費者消費一次。 不同組的消費者是可以對消息進行全面消費(重複消費)的,只有同一組內才會發生競爭關係。
在RabbitMQ中,默認分組group是不同的,組流水號不一樣,被認為不同組,我們查看testExchange交換機,可以發現8802和8803兩個消息消費者處於不同的組,所以8801消息生產者發送的消息可以被這兩個消費者重複消費:
3.2 分組解決重複消費問題
上面在RabbitMQ控制面板中我們看到的組流水號是系統隨機分配的,這樣無疑不好控制,所以我們應該自定義配置分組,將8802/8803兩個消息消費者微服務分為同一個組,以此來解決消息的重複消費問題。
先來演示如何自定義分組
在8802/8803微服務中的配置文件中分別添加組名屬性:
spring:
cloud:
stream:
bindings:
input:
group: A/B # 分組名稱
這裡我們將8802設置為A組,8803設置為B組,然後我們將消息消費方的兩個微服務重啟,我們再次查看其組流水號,發現不再是長長的隨機組流水號,而變成了我們自定義的分組:
此時由於8802/8803位於兩個不同分組下,所以沒有競爭關係,消息生產者發送消息後,仍然可以重複消費。
下面我們將這兩個消息消費方微服務分到相同的消費組中,這樣每次就只有一個消費者,消息生產者發送的消息只能被8802或8803其中一個接受到,這樣就避免了重複消費,將8802和8803的分組名都改為A,再次重啟兩個消息消費方微服務,此時我們可以看到在分組A下已經有了兩個消費者。
再用生產者發送5條消息,我們發現8802/8803分別消費了3條和2條不同的消息,而沒有出現重複消費的問題。
3.3 持久化
通過上述,解決了重複消費問題,再來看看持久化
加上了group就自動支援持久化了
下面來演示一下持久化
-
停止8802/8803並去除掉8802分組group:A(8803的分組group A沒有去掉)
-
8801發送4條消息到rabbitmq
-
先啟動8802(無分組屬性配置),後台沒有打出來消息(消息丟失故障)
-
再啟動8803(有分組屬性配置),後台打出了4條消息(消費持久化消息)