spring-boot-route(十三)整合RabbitMQ
- 2020 年 10 月 12 日
- 筆記
- spring-boot-route
這篇是SpringBoot整合消息隊列的第一篇文章,我們詳細介紹下消息隊列的相關內容。
消息隊列簡介
1. 什麼是消息隊列
MQ
(Message Quene):通過典型的生產者和消費者模型,生產者不斷向消息隊列中產生消息,消費者不斷的從隊列中獲取消息。因為生產者和消費者都是非同步的,而且生產者只關心消息的發送,消費者只關心消息的接收,沒有業務邏輯的侵入,輕鬆實現業務解耦。
2. 消息隊列有什麼用
- 非同步處理
場景描述:某商場具有註冊功能,註冊的時候需要發送簡訊驗證碼。
傳統的做法是用戶提交資訊到用戶服務,用戶服務調用簡訊服務發送簡訊,然後給用戶返迴響應,這種是同步的處理方式,耗時較長。加入消息隊列後,用戶直接提交資訊到用戶服務,將資訊寫入消息隊列,直接給用戶返迴響應,簡訊服務從消息隊列中讀取消息進行發送簡訊。
- 應用解耦
場景描述:某商場下單流程。
傳統做法是用戶下單,訂單系統去查詢庫存系統,如果庫存系統宕機了,則下單失敗,損失訂單量。加入消息隊列後,用戶下單,訂單系統記錄訂單,將訂單資訊寫入消息隊列,下單成功,然後庫存系統恢復正常後去操作資料庫庫存(不考慮庫存為0的情況)。這樣訂單系統和庫存系統就達到松耦合的目的了
- 流量削峰
場景描述:秒殺活動。
流量過大肯定會導致響應超時或系統宕機,加入消息隊列,用戶秒殺請求寫入消息隊列,設置消息隊列的長度等屬性,達到消息隊列最大長度後,直接返回秒殺失敗,然後再去消費消息隊列的數據,完成秒殺。
RabbitMQ簡介
RabbitMQ是用Erlang語言編寫的,實現了高級消息隊列協議(AMQP)的消息中間件。
1. AMQP協議概念
AMQP
:AMQP
是一種鏈接協議,直接定義網路交換的數據格式,這使得實現了AMQP
的provider
本身就是跨平台的。以下是AMQP
協議模型:
- server – 又稱broker,接收客戶端的鏈接,實現amqp實體服務。
- Connection – 鏈接,應用程式跟broker的網路鏈接。
- channel – 網路信道,幾乎所有的操作都是在channel中進行,數據的流轉都要在channel上進行。channel是進行消息讀寫的通道。客戶端可以建立多個channel,每個channel代表一個會話任務。
- message – 消息,伺服器與應用程式之間傳送的數據。由properties和body組成。properties可以對消息進行修飾,比如消息的升級,延遲等高級特性。body就是消息體的內容。
- virtual host – 虛擬主機,用於進行邏輯隔離,最上層的消息路由,一個虛擬地址裡面可以有多個交換機。exchange和消息隊列message quene。
- exchange – 交換機,接收消息,根據路由器轉發消息到綁定的隊列。
- binding – 綁定,交換機和隊列之間的虛擬鏈接,綁定中可以包含routing key。
- routing key – 一個路由規則,虛擬機可以用它來確定jiekyi如何路由一個特定消息。
- quene – 消息隊列,保存消息並將它們轉發給消費者。
2. RabbitMQ的消息模型
1. 簡單模型
在上圖中:
- p:生成者
- C:消費者
- 紅色部分:quene,消息隊列
2. 工作模型
在上圖中:
- p:生成者
- C1、C2:消費者
- 紅色部分:quene,消息隊列
當消息處理比較耗時時,就會出現生產消息的速度遠遠大於消費消息的速度,這樣就會出現消息堆積,無法及時處理。這時就可以讓多個消費者綁定一個隊列,去消費消息,隊列中的消息一旦消費就會丟失,因此任務不會重複執行。
3. 廣播模型(fanout)
這種模型中生產者發送的消息所有消費者都可以消費。
在上圖中:
- p:生成者
- X:交換機
- C1、C2:消費者
- 紅色部分:quene,消息隊列
4. 路由模型(routing)
這種模型消費者發送的消息,不同類型的消息可以由不同的消費者去消費。
在上圖中:
- p:生成者
- X:交換機,接收到生產者的消息後將消息投遞給與routing key完全匹配的隊列
- C1、C2:消費者
- 紅色部分:quene,消息隊列
5. 訂閱模型(topic)
這種模型和direct模型一樣,都是可以根據routing key將消息路由到不同的隊列,只不過這種模型可以讓隊列綁定routing key 的時候使用通配符。這種類型的routing key都是由一個或多個單片語成,多個單詞之間用.
分割。
通配符介紹:
*
:只匹配一個單詞
#
:匹配一個或多個單詞
6. RPC模型
這種模式需要通知遠程電腦運行功能並等待返回運行結果。這個過程是阻塞的。
當客戶端啟動時,它創建一個匿名獨佔回調隊列。並提供名字為call的函數,這個call會發送RPC請求並且阻塞直到收到RPC運算的結果。
Spring Boot整合RabbitMQ
第一步:引入pom依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第二步:增加RabbitMQ服務配置資訊
spring:
rabbitmq:
virtual-host: javatrip
port: 5672
host: 127.0.0.1
username: guest
password: guest
這裡我們用廣播模型來舉例使用,廣播模型(fanout)比較好理解,就像公眾號一樣,我每天推文章後,會推送給每個關注用戶,他們都可以看到這條消息。
廣播模型注意點:
- 可以有多個隊列
- 每個隊列都需要綁定交換機
- 每個消費者有自己的隊列
- 交換機把消息發送給綁定過的所有隊列
1. 定義兩個隊列
@Configuration
public class RabbitConfig {
final static String queueNameA = "first-queue";
final static String queueNameB = "second-queue";
/***
* 定義一個隊列,設置隊列屬性
* @return
*/
@Bean("queueA")
public Queue queueA(){
Map<String,Object> map = new HashMap<>();
// 消息過期時長,10秒過期
map.put("x-message-ttl",10000);
// 隊列中最大消息條數,10條
map.put("x-max-length",10);
// 第一個參數,隊列名稱
// 第二個參數,durable:持久化
// 第三個參數,exclusive:排外的,
// 第四個參數,autoDelete:自動刪除
Queue queue = new Queue(queueNameA,true,false,false,map);
return queue;
}
@Bean("queueB")
public Queue queueB(){
Map<String,Object> map = new HashMap<>();
// 消息過期時長,10秒過期
map.put("x-message-ttl",10000);
// 隊列中最大消息條數,10條
map.put("x-max-length",10);
// 第一個參數,隊列名稱
// 第二個參數,durable:持久化
// 第三個參數,exclusive:排外的,
// 第四個參數,autoDelete:自動刪除
Queue queue = new Queue(queueNameB,true,false,false,map);
return queue;
}
}
2. 定義扇形交換機
@Bean
public FanoutExchange fanoutExchange(){
// 第一個參數,交換機名稱
// 第二個參數,durable,是否持久化
// 第三個參數,autoDelete,是否自動刪除
FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true,false);
return fanoutExchange;
}
3. 交換機和隊列綁定
@Bean
public Binding bindingA(@Qualifier("queueA") Queue queueA, FanoutExchange fanoutExchange){
Binding binding = BindingBuilder.bind(queueA).to(fanoutExchange);
return binding;
}
@Bean
public Binding bindingB(@Qualifier("queueB") Queue queueB,FanoutExchange fanoutExchange){
Binding binding = BindingBuilder.bind(queueB).to(fanoutExchange);
return binding;
}
4. 創建兩個消費者分別監聽兩個隊列
@RabbitListener(queues = RabbitConfig.queueNameA)
@Component
@Slf4j
public class ConsumerA {
@RabbitHandler
public void receive(String message){
log.info("消費者A接收到的消息:"+message);
}
}
@RabbitListener(queues = RabbitConfig.queueNameB)
@Component
@Slf4j
public class ConsumerB {
@RabbitHandler
public void receive(String message){
log.info("消費者B接收到的消息:"+message);
}
}
5. 創建生產者生產消息
@RestController
public class provider {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("send")
public void sendMessage(){
String message = "你好,我是Java旅途";
rabbitTemplate.convertAndSend(RabbitConfig.exchangeName,null,message);
}
}
這樣生產者發送一條消息後,兩個消費者就能同時消費到消息了。
此是spring-boot-route系列的第十三篇文章,這個系列的文章都比較簡單,主要目的就是為了幫助初次接觸Spring Boot 的同學有一個系統的認識。本文已收錄至我的github,歡迎各位小夥伴star
!
github://github.com/binzh303/spring-boot-route
點關注、不迷路
如果覺得文章不錯,歡迎關注、點贊、收藏,你們的支援是我創作的動力,感謝大家。
如果文章寫的有問題,請不要吝嗇,歡迎留言指出,我會及時核查修改。
如果你還想更加深入的了解我,可以微信搜索「Java旅途」進行關注。回復「1024」即可獲得學習影片及精美電子書。每天7:30準時推送技術文章,讓你的上班路不在孤獨,而且每月還有送書活動,助你提升硬實力!