RocketMQ事務消息學習及刨坑過程

  • 2019 年 10 月 19 日
  • 筆記

一、背景

MQ組件是系統架構里必不可少的一門利器,設計層面可以降低系統耦合度,高並發場景又可以起到削峰填谷的作用,從單體應用到集群部署方案,再到現在的微服務架構,MQ憑藉其優秀的性能和高可靠性,得到了廣泛的認可。
隨着數據量增多,系統壓力變大,開始出現這種現象:數據庫已經更新了,但消息沒發出來,或者消息先發了,但後來數據庫更新失敗了,結果研發童鞋各種數據修復,這種生產問題出現的概率不大,但讓人很鬱悶。這個其實就是數據庫事務與MQ消息的一致性問題,簡單來講,數據庫的事務跟普通MQ消息發送無法直接綁定與數據庫事務綁定在一起,例如上面提及的兩種問題場景:

  1. 數據庫事務提交後發送MQ消息;
  2. MQ消息先發,然後再提交數據庫事務。

場景1的問題是數據庫事務可能剛剛提交,服務器就宕機了,MQ消息沒發出去,場景2的問題就是MQ消息發送出去了,但數據庫事務提交失敗,又沒辦法追加已經發出去的MQ消息,結果導致數據沒更新,下游已經收到消息,最終事務出現不一致的情況。

二、事務消息的引出

我們以微服務架構的購物場景為例,參照一下RocketMQ官方的例子,用戶A發起訂單,支付100塊錢操作完成後,能得到100積分,賬戶服務和會員服務是兩個獨立的微服務模塊,有各自的數據庫,按照上文提及的問題可能性,將會出現這些情況:

  • 如果先扣款,再發消息,可能錢剛扣完,宕機了,消息沒發出去,結果積分沒增加。
  • 如果先發消息,再扣款,可能積分增加了,但錢沒扣掉,白送了100積分給人家。
  • 錢正常扣了,消息也發送成功了,但會員服務實例消費消息出現問題,結果積分沒增加。
    購物場景MQ通信案例

由此引出的是數據庫事務與MQ消息的事務一致性問題,rocketmq事務消息解決的問題:解決本地事務執行與消息發送的原子性問題。這裡界限一定要明白,是確保MQ生產端正確無誤地將消息發送出來,沒有多發,也不會漏發。但至於發送後消費端有沒有正常的消費掉(如上面提及的第三種情況,錢正常扣了,消息也發了,但下游消費出問題導致積分不對),這種異常場景將由MQ消息消費失敗重試機制來保證,不在此次的討論範圍內。

常用的MQ組件針對此場景都有自己的實現方案,如ActiveMQ使用AMQP協議(二階提交方式)保證消息正確發送,這裡我們以RocketMQ為重點進行學習。

三、RocketMQ事務消息設計思路

根據CAP理論,RocketMQ事務消息通過異步確保方式,保證事務的最終一致性。設計流程上借鑒兩階段提交理論,流程圖如下:
RocetMQ事務消息設計圖

  1. 應用模塊遇到要發送事務消息的場景時,先發送prepare消息給MQ。
  2. prepare消息發送成功後,應用模塊執行數據庫事務(本地事務)。
  3. 根據數據庫事務執行的結果,再返回Commit或Rollback給MQ。
  4. 如果是Commit,MQ把消息下發給Consumer端,如果是Rollback,直接刪掉prepare消息。
  5. 第3步的執行結果如果沒響應,或是超時的,啟動定時任務回查事務狀態(最多重試15次,超過了默認丟棄此消息),處理結果同第4步。
  6. MQ消費的成功機制由MQ自己保證。

四、RocketMQ事務消息實現流程

以RocketMQ 4.5.2版本為例,事務消息有專門的一個隊列RMQ_SYS_TRANS_HALF_TOPIC,所有的prepare消息都先往這裡放,當消息收到Commit請求後,就把消息再塞到真實的Topic隊列里,供Consumer消費,同時向RMQ_SYS_TRANS_OP_HALF_TOPIC塞一條消息。簡易流程圖如下:
RocketMQ事務消息實現流程

上述流程中,請允許我這樣劃分模塊職責:

  1. RocketMQ Client即我們工程中導入的依賴jar包,RocketMQ Broker端即部署的服務端,NameServer暫未體現。
  2. 應用模塊成對出現,上游為事務消息生產端,下游為事務消息消費端(事務消息對消費端是透明的,與普通消息一致)。

應用模塊的事務因為中斷,或是其他的網絡原因,導致無法立即響應的,RocketMQ當做UNKNOW處理,RocketMQ事務消息還提供了一個補救方案:定時查詢事務消息的數據庫事務狀態
簡易流程圖如下:
RocketMQ定時任務回查事務狀態實現流程

五、源碼剖析

講解的思路基本上按照如下流程圖,根據模塊職責和流程逐一分析。

  1. 環境準備
    閱讀源碼前需要在IDE上獲取和調試RocketMQ的源碼,這部分請自行查閱方法。

  2. 應用模塊(事務消息生產端)核心源碼
    創建一個監聽類,實現TransactionListener接口,在實現的數據庫事務提交方法和回查事務狀態方法模擬結果。
/**   * @program: rocket   * @description: 調試事務消息示例代碼   * @author: Huang   * @create: 2019-10-16   **/  public class SelfTransactionListener implements TransactionListener {     private AtomicInteger transactionIndex = new AtomicInteger(0);     private AtomicInteger checkTimes = new AtomicInteger(0);       private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();     /**      * 執行本地事務      *      * @param message      * @param o      * @return      */     @Override     public LocalTransactionState executeLocalTransaction(Message message, Object o) {        String msgKey = message.getKeys();        System.out.println("start execute local transaction " + msgKey);        LocalTransactionState state;        if (msgKey.contains("1")) {           // 第一條消息讓他通過           state = LocalTransactionState.COMMIT_MESSAGE;        } else if (msgKey.contains("2")) {           // 第二條消息模擬異常,明確回復回滾操作           state = LocalTransactionState.ROLLBACK_MESSAGE;        } else {           // 第三條消息無響應,讓它調用回查事務方法           state = LocalTransactionState.UNKNOW;           // 給剩下3條消息,放1,2,3三種狀態           localTrans.put(msgKey, transactionIndex.incrementAndGet());        }        System.out.println("executeLocalTransaction:" + message.getKeys() + ",execute state:" + state + ",current time:" + System.currentTimeMillis());        return state;     }       /**      * 回查本地事務結果      *      * @param messageExt      * @return      */     @Override     public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {        String msgKey = messageExt.getKeys();        System.out.println("start check local transaction " + msgKey);        Integer state = localTrans.get(msgKey);        switch (state) {           case 1:              System.out.println("check result unknown 回查次數" + checkTimes.incrementAndGet());              return LocalTransactionState.UNKNOW;           case 2:              System.out.println("check result commit message, 回查次數" + checkTimes.incrementAndGet());              return LocalTransactionState.COMMIT_MESSAGE;           case 3:              System.out.println("check result rollback message, 回查次數" + checkTimes.incrementAndGet());              return LocalTransactionState.ROLLBACK_MESSAGE;             default:              return LocalTransactionState.COMMIT_MESSAGE;        }     }  }

事務消息生產者代碼示例,共發送5條消息,基本上包含全部的場景,休眠時間設置足夠的時間,保證回查事務時實例還在運行中,代碼如下:

/**   * @program: rocket   * @description: Rocketmq事務消息   * @author: Huang   * @create: 2019-10-16   **/  public class TransactionProducer {       public static void main(String[] args) {        try {           TransactionMQProducer producer = new TransactionMQProducer("transactionMQProducer");           producer.setNamesrvAddr("10.0.133.29:9876");           producer.setTransactionListener(new SelfTransactionListener());           producer.start();           for (int i = 1; i < 6; i++) {              Message message = new Message("TransactionTopic", "transactionTest","msg-" + i, ("Hello" + ":" +  i).getBytes());              try {                 SendResult result = producer.sendMessageInTransaction(message, "Hello" + ":" +  i);                 System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId());              } catch (Exception e) {                 e.printStackTrace();              }           }           Thread.sleep(Integer.MAX_VALUE);           producer.shutdown();        } catch (MQClientException e) {           e.printStackTrace();        } catch (InterruptedException e) {           e.printStackTrace();        }     }  }
  1. RocketMQ Client端代碼,代碼主要邏輯可以分成三段:第一段為設置消息為prepare消息,並發送給RocketMQ服務端
SendResult sendResult = null;  MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");  MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());  try {      sendResult = this.send(msg);  } catch (Exception e) {      throw new MQClientException("send message Exception", e);  }

第二段:消息發送成功後,調用應用模塊數據庫事務方法,獲取事務結果(為節省篇幅,代碼有刪節)

switch (sendResult.getSendStatus()) {      case SEND_OK: {          try {              if (null != localTransactionExecuter) {                  localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);              } else if (transactionListener != null) {                  log.debug("Used new transaction API");                  localTransactionState = transactionListener.executeLocalTransaction(msg, arg);              }              if (null == localTransactionState) {                  localTransactionState = LocalTransactionState.UNKNOW;              }          } catch (Throwable e) {              log.info("executeLocalTransactionBranch exception", e);              log.info(msg.toString());              localException = e;          }      }      break;      case FLUSH_DISK_TIMEOUT:      case FLUSH_SLAVE_TIMEOUT:      case SLAVE_NOT_AVAILABLE:          localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;          break;      default:          break;  }

第三段:發送事務結果到RocketMQ端,結束事務,並響應結果給應用模塊

try {      this.endTransaction(sendResult, localTransactionState, localException);  } catch (Exception e) {      log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);  }
  1. RocketMQ Broker端事務提交/回滾操作(這裡取endTransaction部分)
    代碼入口:org.apache.rocketmq.broker.processor.EndTransactionProcessor
OperationResult result = new OperationResult();  if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {      result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);      if (result.getResponseCode() == ResponseCode.SUCCESS) {          RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);          if (res.getCode() == ResponseCode.SUCCESS) {              // 修改消息的Topic為由RMQ_SYS_TRANS_HALF_TOPIC改為真實Topic              MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());              msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));              msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());              msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());              msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());              // 將消息存儲到真實Topic中,供Consumer消費              RemotingCommand sendResult = sendFinalMessage(msgInner);              if (sendResult.getCode() == ResponseCode.SUCCESS) {                  // 將消息存儲到RMQ_SYS_TRANS_OP_HALF_TOPIC,標記為刪除狀態,事務消息回查的定時任務中會做處理                  this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());              }              return sendResult;          }          return res;      }  } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {      result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);      if (result.getResponseCode() == ResponseCode.SUCCESS) {          RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);          if (res.getCode() == ResponseCode.SUCCESS) {              this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());          }          return res;      }  }
  1. RocketMQ Broker端定時任務回查數據庫事務部分
    方法入口:org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService
@Override  protected void onWaitEnd() {      long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();      // 超過15次的回查事務狀態失敗後,默認是丟棄此消息      int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();      long begin = System.currentTimeMillis();      log.info("Begin to check prepare message, begin time:{}", begin);      this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());      log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);  }

回查事務調用入口:

// 此段代碼為TransactionalMessageServiceImpl類中的check方法  List<MessageExt> opMsg = pullResult.getMsgFoundList();  boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)      || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))      || (valueOfCurrentMinusBorn <= -1  );    if (isNeedCheck) {      if (!putBackHalfMsgQueue(msgExt, i)) {          continue;      }      // 調用AbstractTransactionalMessageCheckListener的      listener.resolveHalfMsg(msgExt);  } else {      pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);      log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,          messageQueue, pullResult);      continue;  }    // 此方法在AbstractTransactionalMessageCheckListener類中  public void resolveHalfMsg(final MessageExt msgExt) {      executorService.execute(new Runnable() {          @Override          public void run() {              try {                  sendCheckMessage(msgExt);              } catch (Exception e) {                  LOGGER.error("Send check message error!", e);              }          }      });  }    // 此方法在AbstractTransactionalMessageCheckListener類中  public void sendCheckMessage(MessageExt msgExt) throws Exception {      CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();      checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());      checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());      checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));      checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());      checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());      msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));      msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));      msgExt.setStoreSize(0);      String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);      Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);      if (channel != null) {          // 通過Netty發送請求到RocketMQ Client端,執行checkTransactionState方法          brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);      } else {          LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);      }  }

RocketMQ Client接收到服務端的請求後,重新調用回查數據庫事務方法,並將事務結果再次提交到RocketMQ Broker端
方法入口:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl類的方法

try {      if (transactionCheckListener != null) {          localTransactionState = transactionCheckListener.checkLocalTransactionState(message);      } else if (transactionListener != null) {          log.debug("Used new check API in transaction message");          localTransactionState = transactionListener.checkLocalTransaction(message);      } else {          log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);      }  } catch (Throwable e) {      log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);      exception = e;  }    this.processTransactionState(      localTransactionState,      group,      exception);

六、補充一個問題

官網有提及,事務消息是不支持延遲消息和批量消息,我手賤試了一下延遲消息,事務消息設置一個DelayTimeLevel,結果這條消息就一直無法從RMQ_SYS_TRANS_HALF_TOPIC移除掉了,應用模塊的日誌發現在反覆地嘗試回查事務,Console界面上RMQ_SYS_TRANS_HALF_TOPIC的消息查詢列表很快就超過2000條記錄了,為什麼?

我們回到代碼層面進行分析,過程如下:

1.設置了DelayTimeLevel後,數據事務提交後(或是回查數據庫事務完成後),將消息寫入目標Topic時,由於DelayTimeLevel的干擾,目標Topic將變成SCHEDULE_TOPIC_XXXX,同時REAL_TOPIC變成RMQ_SYS_TRANS_HALF_TOPIC,真實的Topic在這個環節已經丟失。

// RocketMQ Broker端接受事務提交後的處理  org.apache.rocketmq.broker.processor.EndTransactionProcessor類  OperationResult result = new OperationResult();  if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {      // 這裡調用CommitLog的putMessage方法      result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);      if (result.getResponseCode() == ResponseCode.SUCCESS) {          RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);          if (res.getCode() == ResponseCode.SUCCESS) {              // 修改消息的Topic為由RMQ_SYS_TRANS_HALF_TOPIC改為真實Topic              MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());              msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));              msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());              msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());              msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());              // 將消息存儲到真實Topic中,此時Topic已經變成SCHEDULE_TOPIC_XXXX              RemotingCommand sendResult = sendFinalMessage(msgInner);              if (sendResult.getCode() == ResponseCode.SUCCESS) {                  // 將消息存儲到RMQ_SYS_TRANS_OP_HALF_TOPIC,標記為刪除狀態,事務消息回查的定時任務中會做處理                  this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());              }              return sendResult;          }          return res;      }  }    // 此段代碼在org.apache.rocketmq.store.CommitLog類的putMessage方法中  // 由於DelayTimeLevel的干擾,目標Topic將變成SCHEDULE_TOPIC_XXXX  final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());  if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE      || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {      // Delay Delivery      if (msg.getDelayTimeLevel() > 0) {          if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {              msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());          }            topic = ScheduleMessageService.SCHEDULE_TOPIC;          queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());            // Backup real topic, queueId          MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());          MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));          msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));            msg.setTopic(topic);          msg.setQueueId(queueId);      }  }

打印的日誌示例如下:

2019-10-17 14:41:05 INFO EndTransactionThread_4 - Transaction op message write successfully. messageId=0A00851D00002A9F0000000000000E09, queueId=0  msgExt:MessageExt [queueId=0, storeSize=335, queueOffset=5, sysFlag=8, bornTimestamp=1571293959305, bornHost=/10.0.133.29:54634, storeTimestamp=1571294460555,  storeHost=/10.0.133.29:10911, msgId=0A00851D00002A9F0000000000000E09, commitLogOffset=3593, bodyCRC=1849408413, reconsumeTimes=0, preparedTransactionOffset=0,  toString()=Message{topic='SCHEDULE_TOPIC_XXXX', flag=0, properties={REAL_TOPIC=RMQ_SYS_TRANS_HALF_TOPIC, TRANSACTION_CHECK_TIMES=3, KEYS=msg-test-3,  TRAN_MSG=true, UNIQ_KEY=0A00851D422C18B4AAC25584B0880000, WAIT=false, DELAY=1, PGROUP=transactionMQProducer, TAGS=transactionTest, REAL_QID=0},  body=[72, 101, 108, 108, 111, 84, 105, 109, 101, 58, 51], transactionId='null'}]

2.延遲消息是定時任務觸發的,我剛剛設置的延遲是1秒,定時任務又把消息重新放回RMQ_SYS_TRANS_HALF_TOPIC中,注意此時只有RMQ_SYS_TRANS_HALF_TOPIC有消息,RMQ_SYS_TRANS_OP_HALF_TOPIC隊列是沒有這條消息的,如下代碼:

// 此段代碼在org.apache.rocketmq.store.schedule.ScheduleMessageService類executeOnTimeup方法內  try {      // 消息重新回到RMQ_SYS_TRANS_HALF_TOPIC隊列中      MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);      PutMessageResult putMessageResult =          ScheduleMessageService.this.writeMessageStore              .putMessage(msgInner);        if (putMessageResult != null          && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {          continue;      } else {          log.error(              "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",              msgExt.getTopic(), msgExt.getMsgId());          ScheduleMessageService.this.timer.schedule(              new DeliverDelayedMessageTimerTask(this.delayLevel,                  nextOffset), DELAY_FOR_A_PERIOD);          ScheduleMessageService.this.updateOffset(this.delayLevel,              nextOffset);          return;      }  } catch (Exception e) {      log.error(          "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="              + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="              + offsetPy + ",sizePy=" + sizePy, e);  }

3.事務消息定時任務啟動,查RMQ_SYS_TRANS_HALF_TOPIC有消息,但RMQ_SYS_TRANS_OP_HALF_TOPIC沒有消息,為了保證消息順序寫入,又將此消息重新填入RMQ_SYS_TRANS_OP_HALF_TOPIC中,並且觸發一次回查事務操作。示例代碼如上文回查事務調用入口相同:

// 此段代碼為TransactionalMessageServiceImpl類中的check方法  List<MessageExt> opMsg = pullResult.getMsgFoundList();  boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)      || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))      || (valueOfCurrentMinusBorn <= -1  );    if (isNeedCheck) {      if (!putBackHalfMsgQueue(msgExt, i)) {          continue;      }      listener.resolveHalfMsg(msgExt);  } else {      pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);      log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,          messageQueue, pullResult);      continue;  }

這樣構成了一個死循環,直到嘗試到15次才丟棄此消息(默認最大嘗試次數是15次),這個代價有點大。針對此問題的優化,已經提交PR到RocketMQ社區,新版本發佈後,事務消息將屏蔽DelayTimeLevel,這個問題就不會再出現了。

在新版本發佈之前,我們的解決辦法:

  1. 明確研發過程中事務消息禁止設置DelayTimeLevel。
    感覺有風險,畢竟新來的童鞋,不是特別了解此部分功能的可能會手抖加上(像我最早那樣)。
  2. 對RocketMQ Client做一次簡單的封裝,比如提供一個rocketmq-spring-boot-starter,在提供發送事務消息的方法里不提供設置的入口,如下示例:
/**   * 事務消息發送   * 不支持延遲發送和批量發送   */  public void sendMessageInTransaction(String topic, String tag, Object message, String requestId) throws Exception {     TransactionMQProducer producer = annotationScan.getProducer(topic + "_" + tag);     producer.sendMessageInTransaction(MessageBuilder.of(topic, tag, message, requestId).build(), message);  }

應該靠譜一些,畢竟從源頭杜絕了DelayTimeLevel參數的設置。

七、結束語

本篇簡單介紹了事務消息的解決的場景和職責的界限,基本的設計思路和流程,在此借鑒學習了RocketMQ作者的圖稿,然後挑了部分代碼作簡要的講解,還是自己的刨坑過程,文章內有任何不正確或不詳盡之處請留言指導,謝謝。

專註Java高並發、分佈式架構,更多技術乾貨分享與心得,請關注公眾號:Java架構社區
Java架構社區