還不知道事務消息嗎?這篇文章帶你全面掃盲!

  • 2020 年 3 月 30 日
  • 筆記

在分散式系統中,為了保證數據一致性是必須使用分散式事務。分散式事務實現方式就很多種,今天主要介紹一下使用 RocketMQ 事務消息,實現分布事務。

文末有彩蛋,看完再走

為什麼需要事務消息?

很多同學可能不知道事務消息是什麼,沒關係,舉一個真實業務場景,先來帶你了解一下普通的消息存在問題。

上面業務場景中,當用戶支付成功,將會更新支付訂單,然後發送 MQ 消息。手續費系統將會通過拉取消息,計算手續費然後保存到另外一個手續費資料庫中。

由於計算手續費這個步驟可以離線計算,所以這裡採用 MQ 解耦支付與計算手續費的流程。

流程主要涉及三個步驟:

  • 更新訂單數據
  • 發送消息給 MQ
  • 手續費系統拉取消息

上面提到的步驟,任何一個都會失敗,如果我們沒有處理,就會使兩邊數據不一致,將會造成下面兩種情況:

  • 訂單數據更新了,手續費數據沒有生成
  • 手續費數據生成,訂單數據卻沒有更新

這可是涉及到真正的錢,一旦少計算,就會造成資損,真的賠不起!

對於最後一步來講,比較簡單。如果消費消息失敗,只要沒有提交消息確認,MQ 服務端將會自動重試。

最大的問題在於我們無法保證更新操作與發送消息一致性。無論我們採用先更新訂單數據,再發送消息,還是先發送消息,再更新訂單數據,都在存在一個成功,一個失敗的可能。

如下所示,採用先發送消息,然後再更新資料庫的方式。

上面流程消息發送成功之後,再進行本地事務的提交。這個流程看起來很完美,但是想像一下,如果在提交事務時資料庫執行失敗,導致事務回滾了。

然而此時消息已經發送出去,無法撤回。這就導致手續費系統緊接會消費消息,計算手續費並更新到資料庫中。這就造成支付數據未更新,手續費系統卻生成的不一致的情況。

那如果我們流程反一下,是不是就好了呢?

我們使用下面的偽碼錶示:

// 開始事務  try {      // 1.執行資料庫操作      // 2.提交事務  }catch (Exception e){      // 3.回滾事務  }  // 4.發送 mq 消息  

這裡如果事務提交成功,但是 mq 消息發送失敗,就會導致支付數據更新但是手續費數據未生成的的不一致情況。

這裡有的同學可能會想到,將發送 mq 消息步驟移動到事務中,消息發送失敗,回滾事務,不就完美了嗎?

偽碼如下:

// 開始事務  try {    // 1.執行資料庫操作    // 2.發送 mq 消息    // 3.提交事務  }catch (Exception e){    // 4.回滾事務  }  

上面程式碼看起來確實沒什麼問題,消息發送失敗,回滾事務。

但是實際上第二步有可能存在消息已經發送到 MQ 服務端,但是由於網路問題未及時收到 MQ 的響應消息,從而導致消息發送端認為消息消息發送失敗。

這就會導致訂單事務回滾了,但是手續費系統卻能消費消息,兩邊資料庫又不一致了。

熟悉 MQ 的同學,可能會想到,消息發送失敗,可以重試啊。

是的,我們可以增加重試次數,重新發送消息。但是這裡我們需要注意,由於消息發送耦合在事務中,過多的重試會拉長資料庫事務執行時間,事務處理時間過長,導致事務中鎖的持有時間變長,影響整體的資料庫吞吐量。

實際業務中,不太建議將消息發送耦合在資料庫事務中。

事務消息

事務消息是 RocketMQ 提供的事務功能,可以實現分散式事務,從而保證上面事務操作與消息發送要麼都成功,要麼都失敗。

使用事務消息,整體流程如下:

首先我們將會發送一個半(half) 消息到 MQ 中,通知其開啟一個事務。這裡半消息並不是說消息內容不完整,實際上它包含所有完整的消息內容。

這個半消息與普通的消息唯一的區別在於,在事物提交之前,這個消息對消費者來說是不可見的,消費者不會消費這個消息。

一旦半消息發送成功,我們就可以執行資料庫事務。然後根據事務的執行結果再決定提交或回滾事務消息。

如果事務提交成功,將會發送確認消息至 MQ,手續費系統就可以成功消費到這條消息。

如果事務被回滾,將會發送回滾通知至 MQ,然後 MQ 將會刪除這條消息。對於手續費系統來說,都不會知道這條消息的存在。

這就解決了要麼都成功,要麼都失敗的一致性要求。

實際上面的流程還是存在問題,如果我們提交/回滾事務消息失敗怎麼辦?

對於這個問題,RocketMQ 給出一種事務反查的機制。我們需要需要註冊一個回調介面,用於反查本地事務狀態。

RocketMQ 若未收到提交或回滾的請求,將會定期去反查回調介面,然後可以根據反查結果決定回滾還是提交事務。

RocketMQ 事務消息流程整體如下:

事務消息示例程式碼如下:

public class TransactionMQProducerExample {      public static void main(String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {          TransactionMQProducer producer = new TransactionMQProducer("test_transaction_producer");          // 不定義將會使用默認的          ExecutorService executorService =                  new ThreadPoolExecutor(2, 5, 100,                          TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000),                                         new ThreadFactory() {                      @Override                      public Thread newThread(Runnable r) {                          Thread thread = new Thread(r);                          thread.setName("client-transaction-msg-check-thread");                          return thread;                      }                  });          producer.setExecutorService(executorService);          TransactionListener transactionListener = new TransactionListenerImpl();          producer.setTransactionListener(transactionListener);          // 改成自己的地址          producer.setNamesrvAddr("127.0.0.1:9876");          producer.start();            Order order = new Order("66666", "books");            Message msg =                  new Message("transaction_tp",                          JSON.toJSONString(order).getBytes(RemotingHelper.DEFAULT_CHARSET));          // 發送半消息          SendResult sendResult = producer.sendMessageInTransaction(msg, null);          System.out.println(sendResult.getSendStatus());          producer.shutdown();      }        public static class TransactionListenerImpl implements TransactionListener {            /**           * 半消息發送成功將會自動執行該邏輯           *           * @param msg           * @param arg           * @return           */          @Override          public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {              // 執行本地事務              Order order = null;              try {                  order = JSON.parseObject(new String(msg.getBody(),                          RemotingHelper.DEFAULT_CHARSET), Order.class);                  boolean isSuccess = updateOrder(order);                  if (isSuccess) {                      // 本地事務執行成功,提交半消息                      System.out.println("本地事務執行成功,提交事務事務消息");                      return LocalTransactionState.COMMIT_MESSAGE;                  } else {                      // 本地事務執行成功,回滾半消息                      System.out.println("本地事務執行失敗,回滾事務消息");                      return LocalTransactionState.ROLLBACK_MESSAGE;                  }              } catch (Exception e) {                  System.out.println("本地事務執行異常");              }              // 異常情況返回未知狀態              return LocalTransactionState.UNKNOW;          }            /**           * 更新訂單           * 這裡模擬資料庫更新,返回事務執行成功           *           * @param order           * @return           */          private boolean updateOrder(Order order) throws InterruptedException {              TimeUnit.SECONDS.sleep(1);              return true;          }            /***           * 若提交/回滾事務消息失敗,rocketmq 自動反查事務狀態           * @param msg           * @return           */          @Override          public LocalTransactionState checkLocalTransaction(MessageExt msg) {              try {                  Order order = JSON.parseObject(new String(msg.getBody(),                          RemotingHelper.DEFAULT_CHARSET), Order.class);                  boolean isSuccess = queryOrder(order.getOrderId());                  if (isSuccess) {                      // 本地事務執行成功,提交半消息                      return LocalTransactionState.COMMIT_MESSAGE;                  } else {                      // 本地事務執行成功,回滾半消息                      return LocalTransactionState.ROLLBACK_MESSAGE;                  }                } catch (Exception e) {                  System.out.println("查詢失敗");              }              // 異常情況返回未知狀態              return LocalTransactionState.UNKNOW;          }            /**           * 查詢訂單狀態           * 模擬返回查詢成功           *           * @param orderId           * @return           */          private boolean queryOrder(String orderId) throws InterruptedException {              TimeUnit.SECONDS.sleep(1);              return true;          }      }        @Data      public static class Order {          private String orderId;            private String goods;            public Order(String orderId, String goods) {              this.orderId = orderId;              this.goods = goods;          }      }  }  

上面程式碼中:

  1. 我們需要為生產者指定一個唯一ProducerGroup
  2. 需要繼承 TransactionListener 註解回調介面,其中 executeLocalTransaction 方法執行本地事務,checkLocalTranscation 用來執行檢查本地事務。
  3. 返回事務狀態有三種:
    • LocalTransactionState.UNKNOW 中間狀態,RocketMQ 將會反查
    • LocalTransactionState.COMMIT_MESSAGE 提交事務,消息這後續將會消費這條消息
    • LocalTransactionState.ROLLBACK_MESSAGE,回滾事務,RocketMQ 將會刪除這條消息

事務消息使用注意點

事務消息最大反查次數

由於單個消息反查次數過多,將會導致半消息隊列堆積,影響性能。 RocketMQ 默認將單個消息的檢查次數限制為 15 次。

我們可以通過修改 broker 配置文件,增加如下配置:

# N 為最大檢查次數  transactionCheckMax=N  

當檢查次數超過最大次數後,RocketMQ 將會丟棄消息並且列印錯誤日誌。

若想自定義丟棄消息行為,需要修改 RocketMQ broker 端程式碼,繼承 AbstractTransactionalMessageCheckListener 重寫 resolveDiscardMsg 方法,加入自定義邏輯。

同步的雙重寫入機制

為了確保事務消息不丟失,並且保證事務完整性,需要將事務消息複製到集群其他節點,建議使用同步雙重寫入機制。

事務反查時間設置

我們可以設置以下參數,設置 MQ 服務端多久之後開始反查事務消息(自事務消息保存成功之後開始計算)。

msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "10");  

或者我們可以在 broker.conf 設置以下參數:

# 單位為 ms,默認為 6 s  transactionTimeout=60000  

發送端主動設置配置參數優先順序大於 broker 端配置。

另外 RocketMQ 還有一個配置用於控制事務性消息檢查間隔:

## 默認為 60s  transactionCheckInterval=5000  

如果自定義配置如上,事務消息檢查間隔為 5 秒,事務消息設置檢查時間為 60 s。

這就代表 broker 每隔 5s 檢查一次事務消息,如果此時事務消息到 MQ 服務端時間還未超過 60s,此時將不會反查,直到時間大於 60s。

彩蛋

查找事務消息資料的時候,發現 RocketMQ 文檔存在相關錯誤。

文檔地址:https://github.com/9526xu/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

如上兩處實際是錯誤的,應該修改為:AbstractTransactionalMessageCheckListenertransactionTimeout

issue 地址:https://github.com/apache/rocketmq/issues/481

順手修改了一下,提交 PR 。哈哈,也為開源項目貢獻了一份力量。

Reference

  1. https://github.com/apache/rocketmq/issues/481
  2. https://github.com/9526xu/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
  3. 極客時間-消息隊列高手課

最後說一句(求關注)

以前總以為參加開源項目很難,直到最近接連參與兩次開源項目修改,才發現其實並沒有想像中那麼難。由於版本變更,開源項目文檔有些是存在錯誤的,如果我們看到了,順手修復一下,這也是為開源項目貢獻一份力。

才疏學淺,難免會有紕漏,如果你發現了錯誤的地方,還請你留言給我指出來,我對其加以修改。

再次感謝您的閱讀,我是樓下小黑哥,一位還未禿頭的工具猿,下篇文章我們再見~

歡迎關注我的公眾號:程式通事,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的部落格:studyidea.cn