RabbitMQ高級之消息限流與延時隊列
- 2020 年 9 月 3 日
- 筆記
- JAVA, RabbitMQ, springboot
人生終將是場單人旅途,孤獨之前是迷茫,孤獨過後是成長。
楔子
本篇是消息隊列RabbitMQ
的第五彈。
上篇本來打算講述RabbitMQ
的一些高級用法:
- 如何保證消息的可靠性?
- 消息隊列如何進行限流?
- 如何設置延時隊列進行延時消費?
最終因為篇幅緣故,上篇只講了如何保證消息的可靠性?
,本篇將會把剩下兩個講完,本篇也可能是RabbitMQ
系列的最後一篇了~
我所講的知識點在工作中基本上也夠用了,希望大家好好消化。
舊坑填上之後可能會慢慢開新坑了,同時因為現在到九月中旬這段時間我有一場考試需要籌備,所以文章更新可能會比較慢,但是一周一更算是最低限度把,希望大家多多擔待。
1. 🔍消息隊列如何限流?
消息隊列限流是指在伺服器面臨巨額流量時,為了進行自保,進行的一種救急措施。
因為巨大的流量代表著非常多的消息,這些消息如果多到伺服器處理不過來就會造成伺服器癱瘓,影響用戶體驗,造成不良影響。
所以要進行一次降級操作,把處理不了的流量隔絕在系統之外,避免它們打垮系統。
基本上任何一個消息隊列都有限流的功能,今天我們就來看看在RabbitMQ
之中進行限流具體應該怎麼做?
RabbitMQ提供了一種QOS
(服務品質保證)功能,即在非自動確認消息的前提下,如果一定數目的消息還未被消費確認,則不進行新消息的消費。
spring:
rabbitmq:
addresses: 127.0.0.1
host: 5672
username: guest
password: guest
virtual-host: /
# 手動確認消息
listener:
simple:
acknowledge-mode: manual
prefetch: 2
我們只需要配置一下rabbitmq.listener.simple
下的prefetch
屬性即可,為了演示方便我這裡配置為兩條,語義即為:如果隊列中有兩條以上未簽收的消息,則不進行新的消息消費。
我往我的隊列中發送三條資訊,並不進行簽收,來看看效果:
發送完顯示我們系統中有三條Ready消息,這代表這三條消息還在隊列中沒有消費端去消費。
這時我打開消費端進行消費但依舊不進行簽收,接著來看效果:
unacked=2,ready=1,這就代表有兩條消息在服務端消費了但是沒有簽收,還有一條消息還在隊列中沒有往服務端推送,因為我們設置了prefetch=2
,所以現在隊列的最大同時在消費的消息數量為2,通過此種方式,我們就完成了消費限流。
Tip : 這種方式下消息一定要進行手動簽收,因為之前的文章中我們講過,自動簽收是消息一達到消費端就進行簽收了,可能我們的業務邏輯還沒運行就已經進行簽收了,所以自動簽收狀態下開啟限流幾乎沒有作用。
2. 📑RabbitMQ控制台
上一節我的截圖中,大家可以發現居然出現了可視化的介面,以往在我的截圖中一般都是DOS命令操作台介面,但其實RabbitMQ
是自帶了可視化介面的插件的,我們只需要開啟即可。
在我們的RabbitMQ
中輸入如下命令:rabbitmq-plugins.bat enable rabbitmq_management
就可以開啟可視化頁面了,緊接著訪問://localhost:15672/
默認用戶名和密碼都是 guest,直接登錄即可。
很方便的控制台,大家可以自己試一下~
3. 📔TTL消息/隊列
TTL
是Time To Live的縮寫,也就是生存時間的意思,RabbitMQ
支援消息的過期時間,在消息發送時可以進行指定,也支援隊列的過期時間,從消息入隊列開始計算,只要超過了隊列的超時時間配置,那麼消息會自動的清除。
設置隊列的話就是整個隊列的消息到時都會過期,設置消息的話就是單條消息到時自動過期。
// TTL隊列示例
@Bean
public Queue ttlQueue() {
Map<String, Object> arguments = new HashMap<>();
// 設置3s過期
arguments.put("x-message-ttl",3000);
return new Queue("topicQueue1",false,false,false, arguments);
}
上面的程式碼就是演示如何創建一個TTL隊列,需要放入參數才行,隊列構造中的其他參數我為了方便直接填了false。
public void sendTtl() {
String message = "Hello 我是作者和耳朵,歡迎關注我。" + LocalDateTime.now().toString();
System.out.println("Message content : " + message);
// 設置過期3s
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setExpiration("3000").build();
rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props));
System.out.println("消息發送完畢。");
}
設置消息的TTL也是設置參數即可。
以上就是RabbitMQ
中關於TTL的知識點。
4. 📌DLX死信隊列
DLX死信隊列
雖然叫隊列,但其實指的是Exchange
,或者說指的Exchange
和它所屬的Queue
,他倆一塊構成了死信隊列。
當一條消息:
- 消費被拒絕(basic.reject/basic.nack)並且requeue=false
- TTL過期
- 要進入的隊列達到最大長度
這三種情況,就可以判定一條消息死了,這種消息如果我們沒有做處理,它就會被自動刪除。
但其實我們可以在隊列上加上一個參數,使當隊列中發現了死亡的消息
之後會將它自動轉發到某個Exchange
,由指定的Exchange
來處理這些死亡的消息。
這個處理死亡消息的Exchange
和之前我們講述的Exchange
沒什麼區別,依然可以綁定隊列然後進行消息消費。
// DLX隊列示例
@Bean
public Queue dlxQueue() {
Map<String, Object> arguments = new HashMap<>();
// 指定消息死亡後發送到ExchangeName="dlx.exchange"的交換機去
arguments.put("x-dead-letter-exchange","dlx.exchange");
return new Queue("topicQueue1", false, false, false, arguments);
}
如上程式碼,就是設置了一個隊列中的消息死亡後的去處,就等於消息死亡後給它不把它刪掉而是做一次轉發,發到其他Exchange
去。
那這樣搞有什麼用呢?這就取決於業務需求了,不過下一節會用到它,接著往下看~
5. 💡延時隊列
RabbitMQ
的基因中沒有延時隊列這回事,它不能直接指定一個隊列類型為延時隊列,然後去延時處理,但是經過上面兩節的鋪墊,我們可以將TTL+DLX相結合,這就能組成一個延時隊列。
設想一個場景,下完訂單之後15分鐘未付款我們就要將訂單關閉,這就是一個很經典的演示消費的場景,如果拿RabbitMQ
來做,我們就需要結合TTL+DLX了。
先把訂單消息設置好15分鐘過期時間,然後過期後隊列將消息轉發給我們設置好的DLX-Exchange
,DLX-Exchange
再將分發給它綁定的隊列,我們的消費者再消費這個隊列中的消息,就做到了延時十五分鐘消費。
真是super~~~簡單呢
後記
收尾了收尾了,RabbitMQ
結束了,雖然有些東西沒有講比如:鏡像隊列,因為我沒用過而且一般輪不到自己來做這個,所以就懶了一下就不寫這個了,RabbitMQ
畢竟不是一個天生的分散式消息隊列,弄鏡像什麼的還是有點麻煩的。
陸陸續續似乎寫了快一個月呢,東西有點多也有些繁雜,要不下期寫一篇文章專門回顧一下,再畫個思維導圖什麼的,給大家梳理一下,再抽幾個小冊六折碼
。
最後再給優狐打個廣告,最近掘金在GitHub上面建立了一個開源計劃 – open-source,旨在收錄各種好玩的好用的開源庫,如果大家有想要自薦或者分享的開源庫都可以參與進去,為這個開源計劃做一份貢獻,同時這個開源庫的Start
也在穩步增長中,參與進去也可以增加自己項目的曝光度,一舉兩得。
同時這個開源庫還有一個兄弟項目 – open-source-translation,旨在招募技術文章翻譯志願者進行技術文章的翻譯工作,
爭做最棒開源翻譯,翻譯業界高品質文稿,為技術人的成長獻一份力。
最近這段時間事情挺多,優狐令我八月底之前升級到三級,所以各位讀者的贊對我很重要,希望大家能夠高抬貴手,幫我一哈~
好了,以上就是本期的全部內容,感謝你能看到這裡,歡迎對本文點贊收藏與評論,👍你們的每個點贊都是我創作的最大動力。
我是耳朵,一個一直想做知識輸出的偽文藝程式設計師,我們下期見。