RocketMQ的可靠性傳輸

整體

分析:

需確保一發一存一消費這些過程均無消息丟失

利用ACK機制保證每個階段需要執行的操作成功後,再往下一個階段推動(放行)

消息處理過程:

由上圖分析可知:

消息丟失,可能發生在三個階段,生產階段、存儲階段、消費階段

如下,為每個階段保證消息不丟失:

消息生產階段

利用MQ的ack確認機制,在try-catch中處理好Broker的返回值,如果返回失敗,則進行重試,若重試次數過多,則進行報警日誌列印,排查解決問題

消息存儲階段

刷盤存儲的消息進行多副本備份處理,從高可用角度取設計中間件,搭建集群;同時,中間件也會進行備份,至少兩個節點以上備份成功之後才會給生產者返回ack確認消息

消息消費階段

消費者從消費隊列中拉去消息後,不是立馬給Broker返回ack確認消息,而是等待業務程式碼順利執行完成之後,再給Broker返回ack確認消息

實現:

Producer——>Broker

  • 發送方式

    • 同步發送

      • Producer向broker發送消息,會阻塞當前執行緒等待broker響應結果
      public class SyncProducer {
      	public static void main(String[] args) throws Exception {
          	// 實例化消息生產者Producer
              DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
          	// 設置NameServer的地址
      	    	producer.setNamesrvAddr("localhost:9876");
          	// 啟動Producer實例
              producer.start();
          	for (int i = 0; i < 100; i++) {
          	    // 創建消息,並指定Topic,Tag和消息體
          	    Message msg = new Message("TopicTest" /* Topic */,
              	"TagA" /* Tag */,
              	("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
              	);
              	// 發送消息到一個Broker
                  SendResult sendResult = producer.send(msg);
                  // 通過sendResult返回消息是否成功送達
                  System.out.printf("%s%n", sendResult);
          	}
          	// 如果不再發送消息,關閉Producer實例。
          	producer.shutdown();
          }
      }
      
    • 非同步發送

      • Producer首先構建一個向broker發送消息的任務,把該任務提交給執行緒池,等執行完該任務時,回調用戶自定義的回調函數,執行處理結果
      public class AsyncProducer {
      	public static void main(String[] args) throws Exception {
          	// 實例化消息生產者Producer
              DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
          	// 設置NameServer的地址
              producer.setNamesrvAddr("localhost:9876");
          	// 啟動Producer實例
              producer.start();
              producer.setRetryTimesWhenSendAsyncFailed(0);
      	
      	int messageCount = 100;
              // 根據消息數量實例化倒計時計算器
      	final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
          	for (int i = 0; i < messageCount; i++) {
                      final int index = i;
                  	// 創建消息,並指定Topic,Tag和消息體
                      Message msg = new Message("TopicTest",
                          "TagA",
                          "OrderID188",
                          "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                      // SendCallback接收非同步返回結果的回調
                      producer.send(msg, new SendCallback() {
                          @Override
                          public void onSuccess(SendResult sendResult) {
                              countDownLatch.countDown();
                              System.out.printf("%-10d OK %s %n", index,
                                  sendResult.getMsgId());
                          }
                          @Override
                          public void onException(Throwable e) {
                              countDownLatch.countDown();
            	                System.out.printf("%-10d Exception %s %n", index, e);
            	                e.printStackTrace();
                          }
                  	});
          	}
      	// 等待5s
      	countDownLatch.await(5, TimeUnit.SECONDS);
          	// 如果不再發送消息,關閉Producer實例。
          	producer.shutdown();
          }
      }
      
    • Oneway

      • Oneway方式只負責發送請求,不等待應答,Producer只負責把請求發出去,不會處理響應結果
      public class OnewayProducer {
      	public static void main(String[] args) throws Exception{
          	// 實例化消息生產者Producer
              DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
          	// 設置NameServer的地址
              producer.setNamesrvAddr("localhost:9876");
          	// 啟動Producer實例
              producer.start();
          	for (int i = 0; i < 100; i++) {
              	// 創建消息,並指定Topic,Tag和消息體
              	Message msg = new Message("TopicTest" /* Topic */,
                      "TagA" /* Tag */,
                      ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
              	);
              	// 發送單向消息,沒有任何返回結果
              	producer.sendOneway(msg);
      
          	}
          	// 如果不再發送消息,關閉Producer實例。
          	producer.shutdown();
          }
      }
      
  • 推薦

    同步發送:

    • 同步發送會返回四個狀態碼
      • SEND_OK:消息發送成功
      • FLUSH_DISK_TIMEOUT:消息發送成功但是消息刷盤超時
      • FLUSH_SLAVE_TIMEOUT:消息發送成功但是消息同步到 slave 節點時超時
      • SLAVE_NOT_AVAILABLE:消息發送成功但是 broker 的 slave 節點不可用
    • 處理
      • 根據返回的狀態碼,進行消息重試,默認設置為3次,可以通過設置調整

        producer.setRetryTimesWhenSendFailed(重試次數);

    非同步發送:

    • 在onException()方法中處理,如果發送失敗,則在這裡執行重試

    額外問題:

    • 如果Broker收到消息後,就因為某些原因宕機了,就算Producer再怎麼重試都是無法解決消息丟失的問題,該如何處理?

    👉 利用多主模式,掛了一個,就換一個master繼續消息發送

總結:

保證Producer——>Broker消息不丟失的方案

Broker存儲及備份

  • 刷盤

    • 同步刷盤

      • 消息寫入記憶體後,立刻調用刷盤執行緒進行刷盤
      • 如果消息在約定的時間內未刷盤成功(默認5s),則返回FLUSH_DISK_TIMEOUT,Producer收到後進行重試
    • 非同步刷盤(默認

      • 消息寫入CommitLog時,不會直接寫入磁碟,而是先寫到PageCache快取後返回成功
      • 啟用後台執行緒非同步將消息刷入磁碟
  • 高可用
    • 多主
      • 多個Master節點,防止單主宕機,丟失消息問題
    • 主從+雙寫
      • 主從的情況下(寫入master成功後立即ACK給Producer),會發生,master——>slave時,主節點Broker宕機,同步失敗,從而導致消息丟失
      • 開啟雙寫,只有等master和slave都寫入成功,即雙寫成功後才會ACK給Producer,否則,會觸發Producer的重試機制

總結

保證Broker存儲及備份階段,消息不丟失

Broker——>Consumer

  • 消息確認

    • 消費者從Broker中拉去消息後,不是立馬給Broker返回ack確認消息,而是等待業務程式碼順利執行完成之後,再給Broker返回ack確認消息
  • 消息重試

    • 消息消費失敗後,需提供重試消息的能力,RocketMQ本身提供了重新消費的能力

    總結

    保證Broker——>Consumer階段,消息不丟失

最終方案:

Tags: