Spring Cloud 系列之 Stream 消息驅動(二)
本篇文章為系列文章,未讀第一集的同學請猛戳這裡:Spring Cloud 系列之 Stream 消息驅動(一)
本篇文章講解 Stream 如何實現消息分組和消息分區。
消息分組
如果有多個消息消費者,那麼消息生產者發送的消息會被多個消費者都接收到,這種情況在某些實際場景下是有很大問題的,比如在如下場景中,訂單系統做集群部署,都會從 RabbitMQ 中獲取訂單資訊,如果一個訂單消息同時被兩個服務消費,系統肯定會出現問題。為了避免這種情況,Stream 提供了消息分組來解決該問題。
在 Stream 中處於同一個 group
中的多個消費者是競爭關係,能夠保證消息只會被其中一個應用消費。不同的組是可以消費的,同一個組會發生競爭關係,只有其中一個可以消費。通過 spring.cloud.stream.bindings.<bindingName>.group
屬性指定組名。
問題演示
在 stream-demo
項目下創建 stream-consumer02
子項目。
項目程式碼使用入門案例中消息消費者的程式碼。
單元測試程式碼如下:
package com.example;
import com.example.producer.MessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {
@Autowired
private MessageProducer messageProducer;
@Test
public void testSend() {
messageProducer.send("hello spring cloud stream");
}
}
測試
運行單元測試發送消息,兩個消息消費者控制台列印結果如下:
stream-consumer 的控制台:
message = hello spring cloud stream
stream-consumer02 的控制台:
message = hello spring cloud stream
通過結果可以看到消息被兩個消費者同時消費了,原因是因為它們屬於不同的分組,默認情況下分組名稱是隨機生成的,通過 RabbitMQ 也可以得知:
配置分組
stream-consumer 的分組配置為:group-A
。
server:
port: 8002 # 埠
spring:
application:
name: stream-consumer # 應用名稱
rabbitmq:
host: 192.168.10.101 # 伺服器 IP
port: 5672 # 伺服器埠
username: guest # 用戶名
password: guest # 密碼
virtual-host: / # 虛擬主機地址
cloud:
stream:
bindings:
# 消息接收通道
# 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同
input:
destination: stream.message # 綁定的交換機名稱
group: group-A
stream-consumer02 的分組配置為:group-A
。
server:
port: 8003 # 埠
spring:
application:
name: stream-consumer # 應用名稱
rabbitmq:
host: 192.168.10.101 # 伺服器 IP
port: 5672 # 伺服器埠
username: guest # 用戶名
password: guest # 密碼
virtual-host: / # 虛擬主機地址
cloud:
stream:
bindings:
# 消息接收通道
# 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同
input:
destination: stream.message # 綁定的交換機名稱
group: group-A
測試
運行單元測試發送消息,此時多個消息消費者只有其中一個可以消費。RabbitMQ 結果如下:
消息分區
通過消息分組可以解決消息被重複消費的問題,但在某些場景下分組還不能滿足我們的需求。比如,同時有多條同一個用戶的數據發送過來,我們需要根據用戶統計,但是消息被分散到了不同的集群節點上了,這時我們就可以考慮使用消息分區了。
當生產者將消息發送給多個消費者時,保證同一消息始終由同一個消費者實例接收和處理。消息分區是對消息分組的一種補充。
問題演示
先給大家演示一下消息未分區的效果,單元測試程式碼如下:
package com.example;
import com.example.producer.MessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {
@Autowired
private MessageProducer messageProducer;
@Test
public void testSend() {
for (int i = 1; i <= 10; i++) {
messageProducer.send("hello spring cloud stream");
}
}
}
測試
運行單元測試發送消息,兩個消息消費者控制台列印結果如下:
stream-consumer 的控制台:
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
stream-consumer02 的控制台:
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
假設這 10 條消息都來自同一個用戶,正確的方式應該都由一個消費者消費所有消息,否則系統肯定會出現問題。為了避免這種情況,Stream 提供了消息分區來解決該問題。
配置分區
消息生產者配置分區鍵的表達式規則和消息分區的數量。
server:
port: 8001 # 埠
spring:
application:
name: stream-producer # 應用名稱
rabbitmq:
host: 192.168.10.101 # 伺服器 IP
port: 5672 # 伺服器埠
username: guest # 用戶名
password: guest # 密碼
virtual-host: / # 虛擬主機地址
cloud:
stream:
bindings:
# 消息發送通道
# 與 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 註解的 value 相同
output:
destination: stream.message # 綁定的交換機名稱
producer:
partition-key-expression: payload # 配置分區鍵的表達式規則
partition-count: 2 # 配置消息分區的數量
通過 partition-key-expression
參數指定分區鍵的表達式規則,用於區分每個消息被發送至對應分區的輸出 channel
。
該表達式作用於傳遞給 MessageChannel
的 send
方法的參數,該參數實現 org.springframework.messaging.Message
介面的 GenericMessage
類。
源碼 MessageChannel.java
package org.springframework.messaging;
@FunctionalInterface
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1L;
default boolean send(Message<?> message) {
return this.send(message, -1L);
}
boolean send(Message<?> var1, long var2);
}
源碼 GenericMessage.java
package org.springframework.messaging.support;
import java.io.Serializable;
import java.util.Map;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
public class GenericMessage<T> implements Message<T>, Serializable {
private static final long serialVersionUID = 4268801052358035098L;
private final T payload;
private final MessageHeaders headers;
...
}
如果 partition-key-expression
的值是 payload
,將會使用所有放在 GenericMessage
中的數據作為分區數據。payload
是消息的實體類型,可以為自定義類型比如 User
,Role
等等。
如果 partition-key-expression
的值是 headers["xxx"]
,將由 MessageBuilder
類的 setHeader()
方法完成賦值,比如:
package com.example.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* 消息生產者
*/
@Component
@EnableBinding(Source.class)
public class MessageProducer {
@Autowired
private Source source;
/**
* 發送消息
*
* @param message
*/
public void send(String message) {
source.output().send(MessageBuilder.withPayload(message).setHeader("xxx", 0).build());
}
}
消息消費者配置消費者總數和當前消費者的索引並開啟分區支援。
stream-consumer 的 application.yml
server:
port: 8002 # 埠
spring:
application:
name: stream-consumer # 應用名稱
rabbitmq:
host: 192.168.10.101 # 伺服器 IP
port: 5672 # 伺服器埠
username: guest # 用戶名
password: guest # 密碼
virtual-host: / # 虛擬主機地址
cloud:
stream:
instance-count: 2 # 消費者總數
instance-index: 0 # 當前消費者的索引
bindings:
# 消息接收通道
# 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同
input:
destination: stream.message # 綁定的交換機名稱
group: group-A
consumer:
partitioned: true # 開啟分區支援
stream-consumer02 的 application.yml
server:
port: 8003 # 埠
spring:
application:
name: stream-consumer # 應用名稱
rabbitmq:
host: 192.168.10.101 # 伺服器 IP
port: 5672 # 伺服器埠
username: guest # 用戶名
password: guest # 密碼
virtual-host: / # 虛擬主機地址
cloud:
stream:
instance-count: 2 # 消費者總數
instance-index: 1 # 當前消費者的索引
bindings:
# 消息接收通道
# 與 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 註解的 value 相同
input:
destination: stream.message # 綁定的交換機名稱
group: group-A
consumer:
partitioned: true # 開啟分區支援
測試
運行單元測試發送消息,此時多個消息消費者只有其中一個可以消費所有消息。RabbitMQ 結果如下:
至此 Stream 消息驅動所有的知識點就講解結束了。
本文採用 知識共享「署名-非商業性使用-禁止演繹 4.0 國際」許可協議
。
大家可以通過 分類
查看更多關於 Spring Cloud
的文章。
🤗 您的點贊
和轉發
是對我最大的支援。
📢 掃碼關注 哈嘍沃德先生
「文檔 + 影片」每篇文章都配有專門影片講解,學習更輕鬆噢 ~