RabbitMQ消息丟失問題和保證消息可靠性-消費端不丟消息和HA(二)

  • 2019 年 10 月 3 日
  • 筆記

繼續上篇文章解決RabbitMQ消息丟失問題和保證消息可靠性(一) 未完成部分,我們聊聊MQ Server端的高可用和消費端如何保證消息不丟的問題?

回歸上篇的內容,我們知道消息從生產端到服務端,為了保證消息不丟,我們必須做哪些事情?

  • 發送端採用Confirm模式,注意Server端沒成功通知發送端,需要重發操作需要額外處理
  • 消息的持久化處理

上面兩個操作保證消息到服務端不丟,但是非高可用狀態,如果節點掛掉,服務暫時不可用,需要重啟後,消息恢復,消息不會丟失,因為有磁碟存儲。

本文先從消費端講起:

RabbitMQ Server到消費者消息如何不丟?

上面一篇文章也提到了,消費者獲取到消息之後,沒有來得及處理完畢,自己直接宕機了,因為消息者默認採用自動ack,此時RabbitMQ的自動ack機制會通知MQ Server這條消息已經處理好了,此時消息就丟了,並不是預期的。

那麼我們採用手動ack機制來解決這個問題,消費端處理完邏輯之後再通知MQ Server,這樣消費者沒處理完消息不會發送ack,如果在消費者拿到消息,沒來得及處理的情況下自己掛了,此時MQ集群會自動感知到,它就會自覺的重發消息給其他的消費者服務實例。

根據上面的思路你需要完成下面的兩步操作:

第一:消費者監聽設置手動ack

  1. this.channel = channelManager.getListenerChannel(namespace);
  2. this.queue = queue;
  3. this.channel.basicConsume(queue, false, consumerTag, this);
  4. this.disconnectedCallback.setChannel(channel);

核心程式碼: this.channel.basicConsume(queue, false, consumerTag, this); 第二個參數設置 false 代表不自動ack

第二:業務執行完成後手動ack

  1. public static void ack(MessageContext context) {
  2. long deliveryTag = context.getEnvelope().getDeliveryTag();
  3. try {
  4. context.getChannel().basicAck(deliveryTag, false);
  5. } catch (IOException e) {
  6. throw new MqAckException("消息ack出錯:連接異常或遠端關閉", context, e);
  7. }
  8. }

核心程式碼: context.getChannel().basicAck(deliveryTag, false);

這裡封裝來,需要業務在執行完自己的業務程式碼後,調用對象channel 的ack方法通知MQServer,說我這邊執行完了,你可以刪除了。

注意這裡有個問題: 如果忘記調用這個 context.getChannel().basicAck(deliveryTag, false);

或者因為程式碼異常,這個程式碼沒被執行,會怎麼樣?後面找時間再寫一篇文章講這個問題。

RabbitMQ Server中存儲的消息高可用

當我們解決了,生產端和消費端的問題後,基本保證消息的不丟問題,但是還有一個是消息的高可用問題,單節點問題,普通節點的問題都會影響消息的臨時不可用,這個時候要用上我們的HA 鏡像集群模式來保證。

上一篇文章 解決RabbitMQ消息丟失問題和保證消息可靠性(一) 已經提到過,服務端消息部署的三種模式的區別,今天就專門講鏡像模式的介紹。

鏡像模式至少採用3節點,2個磁碟節點和1個記憶體節點來保證,架構圖:

設置鏡像也有一些策略:

  • 同步至所有的,一般不這麼做,性能會受到極大影響
  • 同步最多N個機器
  • 只同步至符合指定名稱的nodes

命令處理HA策略模版:rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

  1. 為每個以“rock.wechat”開頭的隊列設置所有節點的鏡像,並且設置為自動同步模式
  1. rabbitmqctl set_policy ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
  2. rabbitmqctl set_policy -p rock ha-all "^rock.wechat" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
  1. 為每個以“rock.wechat.”開頭的隊列設置兩個節點的鏡像,並且設置為自動同步模式
  1. rabbitmqctl set_policy -p rock ha-exacly "^rock.wechat"
  2. '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  1. 為每個以“node.”開頭的隊列分配指定的節點做鏡像
  1. rabbitmqctl set_policy ha-nodes "^nodes."
  2. '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'

但是:HA 鏡像隊列有一個很大的缺點就是: 系統的吞吐量會有所下降

所以採用鏡像模式,要根據具體的業務規則訂製話處理,沒那麼重要的業務,消息丟了也沒關係的場景,又要求必須高的性能的時候,鏡像也可以不用設置。

總結

兩篇文章的講解,分析了消息中間件高可用問題的大概的思路,沒有具體的程式碼詳細,如有疑問可以下方留言評論,我會及時回復解答,後面我會逐步完善相關細節,歡迎多多關注。

後面計劃更新文章如下:

  • 什麼情況會導致重複消費並怎麼解決?
  • 什麼樣的真實業務場景需要保障順序性和如何保證消息的順序性?
  • 如何通過消息隊列優雅的解決微服務間介面失敗的重試?

推薦閱讀

解決RabbitMQ消息丟失問題和保證消息可靠性(一)

IntelliJ IDEA提升效率開發插件必備

END

如有收穫,請幫忙轉發,後續會有更好文章貢獻,您的鼓勵是作者最大的動力!

歡迎關注我的公眾號:架構師的修鍊,獲得獨家整理的學習資源和日常乾貨推送。