­

【RocketMQ】事務的實現原理

事務的使用

RocketMQ事務的使用場景

單體架構下的事務

在單體系統的開發過程中,假如某個場景下需要對資料庫的多張表進行操作,為了保證數據的一致性,一般會使用事務,將所有的操作全部提交或者在出錯的時候全部回滾。以創建訂單為例,假設下單後需要做兩個操作:

  1. 在訂單表生成訂單
  2. 在積分表增加本次訂單增加的積分記錄

在單體架構下只需使用@Transactional開啟事務,就可以保證數據的一致性:

    @Transactional
    public void order() {
        String orderId = UUID.randomUUID().toString();
        // 生成訂單
        orderService.createOrder(orderId);
        // 增加積分
        creditService.addCredits(orderId);
    }

然而現在越來越多系統開始使用分散式架構,在分散式架構下,訂單系統和積分系統可能是兩個獨立的服務,此時就不能使用上述的方法開啟事務了,因為它們不處於同一個事務中,在出錯的情況下,無法進行全部回滾,只能對當前服務的事務進行回滾,所以就有可能出現訂單生成成功但是積分服務增加積分失敗的情況(也可能相反),此時數據處於不一致的狀態。

分散式架構下的事務

分散式架構下如果需要保證事務的一致性,需要使用分散式事務,分散式事務的實現方式有多種,這裡我們先看通過RocketMQ事務的實現方式。

同樣以下單流程為例,在分散式架構下的處理流程如下:

  1. 訂單服務生成訂單
  2. 發送訂單生成的MQ消息,積分服務訂閱消息,有新的訂單生成之後消費消息,增加對應的積分記錄

普通MQ消息存在的問題

如果使用@Transactional + 發送普通MQ的方式,看下存在的問題:

  1. 假如訂單創建成功,MQ消息發送成功,但是order方法在返回的前一刻,服務突然宕機,由於開啟了事務,事務還未提交(方法結束後才會正常提交),所以訂單表並未生成記錄,但是MQ卻已經發送成功並且被積分服務消費,此時就會存在訂單未創建但是積分記錄增加的情況
  2. 假如先發送MQ消息再創建訂單呢,此時問題就更明顯了,如果MQ消息發送成功,創建訂單失敗,那麼同樣處於不一致的狀態
    @Transactional
    public void order() {
        String orderId = UUID.randomUUID().toString();
        // 創建訂單
        Order order = orderService.createOrder(orderDTO.getOrderId());
        // 發送訂單創建的MQ消息
        sendOrderMessge(order);
        return;
    }

解決上述問題的方式就是使用RocketMQ事務消息。

RocketMQ事務消息的使用

使用事務消息需要實現自定義的事務監聽器,TransactionListener提供了本地事務執行和狀態回查的介面,executeLocalTransaction方法用於執行我們的本地事務,checkLocalTransaction是一種補償機制,在異常情況下如果未收到事務的提交請求,會調用此方法進行事務狀態查詢,以此決定是否將事務進行提交/回滾:

public interface TransactionListener {
    /**
     * 執行本地事務
     *
     * @param msg Half(prepare) message half消息
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * 本地事務狀態回查
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

這裡我們實現自定義的事務監聽器OrderTransactionListenerImpl:

  • executeLocalTransaction方法中創建訂單,如果創建成功返回COMMIT_MESSAGE,如果出現異常返回ROLLBACK_MESSAGE
  • checkLocalTransaction方法中回查事務狀態,根據消息體中的訂單ID查詢訂單是否已經創建,如果創建成功提交事務,如果未獲取到認為失敗,此時回滾事務。
public class OrderTransactionListenerImpl implements TransactionListener {

    @Autowired
    private OrderService orderService;

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String body = new String(msg.getBody(), Charset.forName("UTF-8"));
            OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);
            // 模擬生成訂單
            orderService.createOrder(orderDTO.getOrderId());
        } catch (Exception e) {
            // 出現異常,返回回滾狀態
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // 創建成功,返回提交狀態
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String body = new String(msg.getBody(), Charset.forName("UTF-8"));
        OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);
        try {
            // 根據訂單ID查詢訂單是否存在
            Order order = orderService.getOrderByOrderId(orderDTO.getOrderId());
            if (null != order) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

接下來看如何發送事務消息,事務消息對應的生產者為TransactionMQProducer,創建TransactionMQProducer之後,設置上一步自定義的事務監聽器OrderTransactionListenerImpl,然後將訂單ID放入消息體中, 調用sendMessageInTransaction發送事務消息:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 創建下單事務監聽器
        TransactionListener transactionListener = new OrderTransactionListenerImpl();
        // 創建生產者
        TransactionMQProducer producer = new TransactionMQProducer("order_group");
        // 事務狀態回查執行緒池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        // 設置執行緒池
        producer.setExecutorService(executorService);
        // 設置事務監聽器
        producer.setTransactionListener(transactionListener);
        // 啟動生產者
        producer.start();
        try {
            // 創建訂單消息
            OrderDTO orderDTO = new OrderDTO();
            // 模擬生成訂單唯一標識
            orderDTO.setOrderId(UUID.randomUUID().toString());
            // 轉為位元組數組
            byte[] msgBody = JSON.toJSONString(orderDTO).getBytes(RemotingHelper.DEFAULT_CHARSET);
            // 構建消息
            Message msg = new Message("ORDER_TOPIC", msgBody);
            // 調用sendMessageInTransaction發送事務消息
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf(sendResult.toString());
            Thread.sleep(10);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

事務的執行流程:

  1. 在訂單服務下單後,向Borker發送生成訂單的事務消息,投遞到ORDER_TOPIC主題中
  2. Broker收到事務消息之後,不會直接投遞到ORDER_TOPIC主題中,而是先放在另外一個主題中,也叫half主題,half主題對消費者不可見
  3. half主題加入消息成功之後,會回調事務監聽器的的executeLocalTransaction方法,執行本地事務,也就是訂單創建,如果創建成功返回COMMIT狀態,如果出現異常返回ROLLBACK狀態
  4. 根據上一步的返回狀態,進行結束事務的處理
    • 提交:從half主題中刪除消息,然後將消息投送到ORDER_TOPIC主題中,積分服務訂閱ORDER_TOPIC主題進行消費,生成積分記錄
    • 回滾:從half主題中刪除消息即可
  5. 如果本地事務返回的執行結果狀態由於網路原因或者其他原因未能成功的發送給Broker,Broker未收到事務的執行結果,在補償機制定時檢查half主題中消息的事務執行狀態時,會回調事務監聽器checkLocalTransaction的介面,進行狀態回查,判斷訂單是否創建成功,然後進行結束事務的處理

使用事務消息不會存在訂單創建失敗但是消息發送成功的情況,不過你可能還有一個疑問,假如訂單創建成功了,消息已經投送到隊列中,但是積分服務在消費的時候失敗了,這樣數據還是處於不一致的狀態,個人感覺,積分服務可以在失敗的時候進行重試或者進行一些其他的補償機制來保證積分記錄成功的生成,在極端情況下積分記錄依舊沒有生成,此時可能就要人工接入處理了。

RocketMQ事務實現原理

RocketMQ在4.3.0版中開始支援事務消息,它使用兩階段提交協議實現事務消息,同時增加補償機制定時對事務的狀態進行回查,來處理未提交/回滾的事務。

兩階段提交

發送事務消息分為兩個階段:

第一階段:生產者向Broker發送half(prepare)消息,生產者發送事務消息的時候,消息不會直接存入對應的主題中,而是先將消息存入RMQ_SYS_TRANS_HALF_TOPIC主題中,此時消息對消費者不可見,不能被消費者消費,稱為half消息,half消息發送成功之後,開始執行本地事務

第二階段:提交階段,根據第一階段的本地事務執行結果來決定是提交事務還是回滾事務,提交或者回滾的事務會從RMQ_SYS_TRANS_HALF_TOPIC中刪除,對於提交的事務消息,會將消息投送到實際的主題隊列中,之後消費者可以從隊列中拉取到消息進行消費,對於回滾的事務消息,直接從RMQ_SYS_TRANS_HALF_TOPIC主題中刪除即可。

注意:由於RocketMQ追加寫的性能並不會直接從RMQ_SYS_TRANS_HALF_TOPIC隊列中刪除消息,而是使用了另外一個隊列,將已提交或者回滾的事務放入到OP隊列中,在補償機制對half消息進行檢查的時候會從OP中判斷是消息是否已經提交或者回滾。

補償機制

兩階段提交事務的過程中,任一階段出現異常都有可能導致事務未能成功的進行提交/回滾,所以需要增加一種補償機制,定時對RMQ_SYS_TRANS_HALF_TOPIC主題中的half消息進行處理。

RocketMQ使用了一種回查機制,在處理half消息時,對該消息的本地事務執行狀態進行回查,根據回查結果決定是否需要提交/回滾,或者是等待下一次回查。

接下來就從源碼的角度研究一下事務的實現原理。

上面可知,發送事務消息調用的是TransactionMQProducersendMessageInTransaction方法:

public class TransactionMQProducer extends DefaultMQProducer {
    @Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }
        // 設置主題
        msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
        // 發送事務消息
        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }
}

sendMessageInTransactionDefaultMQProducerImpl中實現,主要有以下幾個步驟:

  1. 獲取事務監聽器TransactionListener,如果獲取為空或者本地事務執行器LocalTransactionExecuter為空將拋出異常,因為需要通過TransactionListener或者LocalTransactionExecuter來執行本地事務,所以不能為空
  2. 在消息中設置prepared屬性,此時與普通消息(非事務消息)相比多了PROPERTY_TRANSACTION_PREPARED屬性
  3. 調用send方法發送prepared消息也就是half消息,發送消息的流程與普通消息一致
  4. 根據消息的發送結果判斷:
    • 如果發送成功執行本地事務,並返回本地事務執行結果狀態,如果返回的執行狀態結果為空,將本地事務狀態設置為UNKNOW
    • 發送成功之外的其他情況,包括FLUSH_DISK_TIMEOUT刷盤超時、FLUSH_SLAVE_TIMEOUTSLAVE_NOT_AVAILABLE從節點不可用三種情況,此時意味著half消息發送失敗,本地事務狀態置為ROLLBACK_MESSAGE回滾消息
  5. 調用endTransaction方法結束事務
public class DefaultMQProducerImpl implements MQProducerInner {
    // 發送事務消息
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        // 獲取事務監聽器
        TransactionListener transactionListener = getCheckListener();
        // 如果本地事務執行器或者監聽為空
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }
        // ...
        SendResult sendResult = null;
        // 設置prepared屬性
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        // 設置生產者組
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            // 發送消息
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
        // 本地事務狀態
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) { // 判斷消息發送狀態
            case SEND_OK: { // 如果發送成功
                try {
                    // ...
                    if (null != localTransactionExecuter) { // 如果本地事務執行器不為空
                        // 執行本地事務
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) { // 如果事務監聽器不為空
                        log.debug("Used new transaction API");
                        // 執行本地事務
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        // 如果本地事務狀態為空,設置為UNKNOW
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
                    // ...
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; // 本地事務狀態設置為回滾
                break;
            default:
                break;
        }
        try {
            // 結束事務
            this.endTransaction(msg, sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
        // ...
        return transactionSendResult;
    }
}

half消息處理

Broker對消息發送請求的處理在SendMessageProcessor中,當Broker收到消息後,判斷消息是否含有PROPERTY_TRANSACTION_PREPARED屬性,如果含有prepared屬性,會獲取TransactionalMessageService,然後調用asyncPrepareMessage對消息進行處理:

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
     private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        // ...
        CompletableFuture<PutMessageResult> putMessageResult = null;
        // 獲取prepared屬性標記
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        // 如果事務標記不為空
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            // 事務消息持久化
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            // 普通消息持久化
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }
}

TransactionalMessageServiceImplasyncPrepareMessage方法中,又調用了TransactionalMessageBridgeasyncPutHalfMessage方法,添加half消息

public class TransactionalMessageServiceImpl implements TransactionalMessageService {
    @Override
    public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
        // 添加half消息
        return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
    }
}

TransactionalMessageBridgeasyncPutHalfMessage方法中,調用了parseHalfMessageInner方法設置half消息的相關屬性。

因為是half消息,此時還不能直接加入到實際的消息隊列中,否則一旦加入就會被消費者消費,所以需要先對half消息暫存,等收到消息提交請求時才可以添加到實際的消息隊列中,RocketMQ設置了一個RMQ_SYS_TRANS_HALF_TOPIC主題來暫存half消息。

parseHalfMessageInner方法中,會對消息進行如下處理:

  1. 設置消息實際的主題和隊列ID,待收到事務提交請求後恢復實際的主題和隊列ID,向實際的隊列中添加消息
  2. 更改消息的主題為half消息主題RMQ_SYS_TRANS_HALF_TOPIC,先將消息投送到half消息隊列中
  3. half主題對應的消息隊列ID為0,所以更改消息的隊列ID為0

之後調用asyncPutMessage添加消息,接下來的流程就和普通消息的添加一致了,具體可參考【RocketMQ】消息的存儲

public class TransactionalMessageBridge {
    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        // 添加消息
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }
    
    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        // 設置實際的主題
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        // 設置實際的隊列ID
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        // 設置事務主題RMQ_SYS_TRANS_HALF_TOPIC
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        // 設置事務隊列ID
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }
}

public class TransactionalMessageUtil {
    public static String buildHalfTopic() {
        // half消息主題
        return TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
    }
}

結束事務

在進行了half消息發送和執行本地事務的操作後,消息暫存在Broker的half主題中,接下來生產者需要根據本地事務的執行結果,向Broker發送結束事務的請求,結束事務的方法endTransactionDefaultMQProducerImpl中實現:

  1. 構建結束事務的請求頭EndTransactionRequestHeader
  2. 判斷本地事務執行狀態:
    • COMMIT_MESSAGE:表示提交事務,結束事務的請求頭中設置TRANSACTION_COMMIT_TYPE標識進行事務提交
    • ROLLBACK_MESSAGE:表示回滾事務,請求頭中設置TRANSACTION_ROLLBACK_TYPE標識進行事務回滾
    • UNKNOW:事務執行結果未知狀態,請求頭中設置TRANSACTION_NOT_TYPE標識未知狀態的事務
  3. 調用endTransactionOneway向Broker發送結束事務的請求
public class DefaultMQProducerImpl implements MQProducerInner {
    public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        // 消息
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        // 獲取事務ID
        String transactionId = sendResult.getTransactionId();
        // 獲取Broker地址
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        // 結束事務請求頭
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        // 設置事務ID
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        // 判斷本地事務狀態
        switch (localTransactionState) {
            case COMMIT_MESSAGE: // 如果提交
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE: // 如果是回滾
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW: // 未知
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }
        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        // 發送結束事務的請求
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }
}

Broker事務結束請求處理

Broker對事務結束的請求處理在EndTransactionProcessor中:

  1. 判斷是否是從節點,從節點沒有結束事務的許可權,如果是從節點返回SLAVE_NOT_AVAILABLE
  2. 從請求頭中獲取事務的提交類型:
    • TRANSACTION_COMMIT_TYPE:表示提交事務,會調用commitMessage方法提交消息,如果提交成功調用endMessageTransaction結束事務,恢復消息的原始主題和隊列並調用deletePrepareMessage方法刪掉half消息
    • TRANSACTION_ROLLBACK_TYPE:表示回滾事務,會調用rollbackMessage方法回滾事務,然後刪掉half消息
public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        // 創建響應
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.debug("Transaction request:{}", requestHeader);
        // 如果是從節點,從節點沒有結束事務的許可權,返回SLAVE_NOT_AVAILABLE
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }
        // ...
        OperationResult result = new OperationResult();
        // 判斷事務提交類型,如果是提交事務
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // 提交消息
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 校驗Prepare消息
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // 結束事務,恢復消息的原始主題和隊列
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        // 刪除half消息
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { // 如果是回滾
            // 回滾消息
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    
// 刪除half消息
                  this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }
}

刪除half消息

由於CommitLog追加寫的性質,RocketMQ並不會直接將half消息從CommitLog中刪除,而是使用了另外一個OP主題RMQ_SYS_TRANS_OP_HALF_TOPIC(以下簡稱OP主題/隊列),將已經提交/回滾的消息記錄在OP主題隊列中

public class TransactionalMessageServiceImpl implements TransactionalMessageService {
    @Override
    public boolean deletePrepareMessage(MessageExt msgExt) {
        // 添加到OP消息隊列
        if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
            log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
            return true;
        } else {
            log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
            return false;
        }
    }
}

putOpMessage方法在TransactionalMessageBridge中實現,它又調用了addRemoveTagInTransactionOp方法向OP隊列中添加消息:

  1. 構建OP消息,主要是創建Message對象,然後設置主題為RMQ_SYS_TRANS_OP_HALF_TOPIC,設置half消息在隊列的偏移量
  2. 調用 writeOp方法將消息寫入OP隊列,makeOpMessageInner方法用於構建消息體,然後調用putMessage放將消息寫入CommitLog
public class TransactionalMessageBridge {
    private final ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap<>();
    public boolean putOpMessage(MessageExt messageExt, String opType) {
        // 構建消息隊列,設置消息所屬主題、Broker名稱、隊列ID資訊
        MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
            this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
        if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
            // 添加OP消息
            return addRemoveTagInTransactionOp(messageExt, messageQueue);
        }
        return true;
    }
    
    /**
     * 當事務消息進行提交或者回滾時,記錄在operation隊列中(OP隊列)
     */
    private boolean addRemoveTagInTransactionOp(MessageExt prepareMessage, MessageQueue messageQueue) {
        // 構建OP消息,主題為RMQ_SYS_TRANS_OP_HALF_TOPIC
        Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
            String.valueOf(prepareMessage.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
        // 將消息寫入OP隊列
        writeOp(message, messageQueue);
        return true;
    }

    private void writeOp(Message message, MessageQueue mq) {
        MessageQueue opQueue;
        // 如果已經添加過
        if (opQueueMap.containsKey(mq)) {
            opQueue = opQueueMap.get(mq);
        } else {
            opQueue = getOpQueueByHalf(mq);
            MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
            if (oldQueue != null) {
                opQueue = oldQueue;
            }
        }
        // 如果為空
        if (opQueue == null) {
            // 創建
            opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
        }
        // 構建OP消息添加到OP隊列中
        putMessage(makeOpMessageInner(message, opQueue));
    }
}

事務狀態檢查

由於各種原因有可能未成功收到提交/回滾事務的請求,所以RocketMQ需要定期檢查half消息,檢查事務的執行結果,TransactionalMessageCheckService用於half消息狀態的檢查,它實現了ServiceThread,默認可以看到在onWaitEnd方法中調用了check方法進行狀態檢查:

public class TransactionalMessageCheckService extends ServiceThread {
    
    @Override
    protected void onWaitEnd() {
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        // 狀態檢查
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }

}

check方法在TransactionalMessageServiceImpl中實現:

public class TransactionalMessageServiceImpl implements TransactionalMessageService {
    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            // 根據主題獲取消息隊列
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            // 遍歷所有的消息隊列
            for (MessageQueue messageQueue : msgQueues) {
                // 獲取當前時間做為開始時間
                long startTime = System.currentTimeMillis();
                // 獲取對應的OP消息隊列
                MessageQueue opQueue = getOpQueue(messageQueue);
                // 獲取half消息隊列的消費進度
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                // 獲取op消息隊列的消費進度
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                // 如果消費進度小於0表示不合法
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
                // 存儲已處理的消息
                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                // 根據當前的消費進度從已處理隊列中拉取消息
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                // 如果拉取消息為空,列印錯誤繼續處理下一個消息隊列
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // 獲取消息為空的數量默認為1
                int getMessageNullCount = 1;
                // 新的進度
                long newOffset = halfOffset;
                // 獲取half隊列的消費進度,賦值給i
                long i = halfOffset;
                while (true) {
                    // 如果當前時間減去開始時間大於最大處理時間限制,終止循環
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    // 如果OP隊列中包含當前偏移量,表示消息已經被處理,加入到已處理集合中
                    if (removeMap.containsKey(i)) {
                        log.debug("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        // 加入到doneOpOffset集合中
                        doneOpOffset.add(removedOpOffset);
                    } else { // 如果已處理隊列中不包含當前消息
                        // 根據偏移量從half隊列獲取half消息
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        // 獲取消息對象
                        MessageExt msgExt = getResult.getMsg();
                        // 如果獲取消息為空
                        if (msgExt == null) {
                            // 判斷獲取空消息的次數是否大於MAX_RETRY_COUNT_WHEN_HALF_NULL
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            // 判斷從half隊列獲取消息的結果是NO_NEW_MSG,表示沒有消息,此時終止循環等待下一次進行檢查
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                               
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                 // 走到這裡說明消息的偏移量不合法,繼續獲取下一條消息進行處理
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }
                        // 是否需要丟棄消息或者需要跳過消息
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            // 繼續處理下一條消息
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        // 如果消息的添加時間是否大於等於本次檢查的開始時間,說明是在檢查開始之後加入的消息,暫不進行處理
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
                        // 計算half消息在隊列中的保留時間:當前時間減去消息加入的時間
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        // 事務超時時間
                        long checkImmunityTime = transactionTimeout;
                        // 獲取PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS屬性,表示事務回查最晚的時間
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        // 如果PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS屬性不為空
                        if (null != checkImmunityTimeStr) {
                            // 獲取事務回查最晚檢查時間,如果checkImmunityTimeStr為-1則返回事務超時時間,否則返回checkImmunityTimeStr轉為long後乘以1000得到的值
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            // 如果消息的保留時間小於事務回查最晚檢查時間
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                // 檢查half消息在隊列中的偏移量,如果返回true跳過本條消息
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    // 處理下一個消息
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {
                            // 如果valueOfCurrentMinusBorn小於checkImmunityTime
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        // 獲取OP消息
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        // 判斷是否需要檢查,滿足檢查的條件為以下三種情況之一:
                        // 1.拉取消息為空並且消息的保留時間已經大於事務設置的最晚回查時間
                        // 2.拉取消息不為空並且拉取到的最後一條消息的存入時間減去當前時間超過了事務的超時時間
                        // 3.half消息存留時間為負數
                        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                            || (valueOfCurrentMinusBorn <= -1);
                        // 如果需要進行回查
                        if (isNeedCheck) {
                            // 將half消息重新加入到隊列中
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            // 發送回查請求
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            // 繼續從OP隊列中拉取消息
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }
                    // 加1繼續處理下一條消息
                    newOffset = i + 1;
                    i++;
                }
                if (newOffset != halfOffset) {
                    // 更新消費進度
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    // 更新處理進度
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Throwable e) {
            log.error("Check error", e);
        }
    }
}

check方法中會獲取half主題(RMQ_SYS_TRANS_HALF_TOPIC)下的所有消息隊列,遍歷所有的half消息隊列,對隊列中的half消息進行處理,主要步驟如下。

一、 構建OP隊列的消息隊列對象MessageQueue

調用getOpQueue獲取當前half消息隊列對應的OP隊列的MessageQueue對象,實際上是創建了一個MessageQueue對象,設置為OP隊列的主題、以及Broker名稱和隊列的ID,在後面獲取消費進度時使用:

   private MessageQueue getOpQueue(MessageQueue messageQueue) {
        // 獲取OP消息隊列
        MessageQueue opQueue = opQueueMap.get(messageQueue);
        if (opQueue == null) {
            // 如果獲取為空,則創建MessageQueue,主題設置為OP TOPIC,設置Broker名稱和隊列ID
            opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), messageQueue.getBrokerName(),
                messageQueue.getQueueId());
            // 加入到opQueueMap中
            opQueueMap.put(messageQueue, opQueue);
        }
        return opQueue;
    }

二、獲取half隊列的消費進度和OP消費隊列的消費進度

消費進度的獲取是通過調用transactionalMessageBridgefetchConsumeOffset方法進行查詢的,可以看到方法的入參是MessageQueue類型的,所以第一步需要構造OP隊列的MessageQueue對象,在這一步查詢消費進度使用:

   public long fetchConsumeOffset(MessageQueue mq) {
        long offset = brokerController.getConsumerOffsetManager().queryOffset(TransactionalMessageUtil.buildConsumerGroup(),
            mq.getTopic(), mq.getQueueId());
        if (offset == -1) {
            offset = store.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId());
        }
        return offset;
    }

三、從OP隊列中拉取消息

調用fillOpRemoveMap方法根據消費進度資訊從OP隊列中拉取消息,將拉取的消費放入removeMap中,用於判斷half消息是否已經處理:

   private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
        MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
        // 從OP隊列中拉取消息,每次拉取32條
        PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
        // 如果拉取為空返回null
        if (null == pullResult) {
            return null;
        }
        // 如果拉取狀態為消費進度不合法或者沒有匹配的消息
        if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
            || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
            log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,
                pullResult);
            // 從拉取結果中獲取消費進度並更新
            transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
            return pullResult;
        } else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) { // 如果沒有消息
            log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,
                pullResult);
            return pullResult;
        }
        // 獲取拉取到的消息
        List<MessageExt> opMsg = pullResult.getMsgFoundList();
        if (opMsg == null) { // 如果為空列印日誌
            log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
            return pullResult;
        }
        // 遍歷拉取的消息
        for (MessageExt opMessageExt : opMsg) {
            // 獲取隊列中的偏移量
            Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
            log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
                opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
            if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
                // 如果偏移量小於最小的偏移量
                if (queueOffset < miniOffset) {
                    // 加入到doneOpOffset中
                    doneOpOffset.add(opMessageExt.getQueueOffset());
                } else {
                    // 加入到已處理消息的集合removeMap中
                    removeMap.put(queueOffset, opMessageExt.getQueueOffset());
                }
            } else {
                log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
            }
        }
        log.debug("Remove map: {}", removeMap);
        log.debug("Done op list: {}", doneOpOffset);
        return pullResult;
    }

四、處理每一個half消息

開啟while循環,從half隊列的消費進度處開始,處理每一個half消息:

  1. 如果當前時間減去檢查開始時間大於最大處理時間,此時終止循環

  2. 如果removeMap中包含當前half消息,表示消息已經被處理,放入到已處理消息集合中doneOpOffset

  3. 如果removeMap不包含當前half消息, 調用getHalfMsg方法根據偏移量從half隊列獲取half消息,如果消息獲取不為空繼續下一步,否則進行如下處理

    • 判斷獲取空消息的個數是否大於MAX_RETRY_COUNT_WHEN_HALF_NULL,如果大於將終止本次循環,處理下一個half消息隊列
    • 判斷拉取消息的狀態是否為NO_NEW_MSG,如果是表示隊列中沒有消息,先終止循環
    • 如果拉取消息的狀態是不是NO_NEW_MSG,表示消費進度不合法,獲取half消息隊列中下一條消息進行處理
  4. 調用needDiscard判斷是否需要丟棄half消息,或者調用needSkip判斷是否需要跳過當前half消息:

    • needDiscard是根據half消息的檢查次數是否超過最大限制來決定是否丟棄half消息

         private boolean needDiscard(MessageExt msgExt, int transactionCheckMax) {
              // 從屬性中獲取檢查次數
              String checkTimes = msgExt.getProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
              int checkTime = 1;
              // 如果不為空
              if (null != checkTimes) {
                  checkTime = getInt(checkTimes);
                  // 如果檢查次數大於事務最大的檢查次數,表示需要丟棄
                  if (checkTime >= transactionCheckMax) {
                      return true;
                  } else {
                      // 檢查次數加一
                      checkTime++;
                  }
              }
              // 更新檢查次數
              msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTime));
              return false;
          }
      
    • needSkip是根據half消息在隊列中的存留時間是否超過了最大的保留時間限制來決定是否跳過

          private boolean needSkip(MessageExt msgExt) {
              // 計算half消息在隊列中的保留時間
              long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
              // 如果大於Broker中設置的最大保留時間,表示需要跳過
              if (valueOfCurrentMinusBorn
                  > transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getFileReservedTime()
                  * 3600L * 1000) {
                  log.info("Half message exceed file reserved time ,so skip it.messageId {},bornTime {}",
                      msgExt.getMsgId(), msgExt.getBornTimestamp());
                  return true;
              }
              return false;
          }
      
  5. 判斷消息的的存入時間是否大於本次開始檢查的時間,如果大於說明是新加入的消息,由於事務消息發送後不會立刻提交,所以此時暫不需要進行檢查,中斷循環即可

  6. 計算half消息在隊列中的存留時間valueOfCurrentMinusBorn:當前時間 – 消息存入的時間

  7. 設置立刻回查事務狀態的時間checkImmunityTime:事務的超時時間

  8. 從消息屬性中獲取PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS屬性的值放在checkImmunityTimeStr變數中,表示事務的最晚回查時間:

    (1)如果checkImmunityTimeStr獲取不為空,調用getImmunityTime方法計算事務立刻回查時間,並賦值給checkImmunityTime,從程式碼中可以看出如果checkImmunityTimeStr為-1則返回事務的超時時間,否則返回checkImmunityTimeStr的值並乘以1000轉為秒:

      private long getImmunityTime(String checkImmunityTimeStr, long transactionTimeout) {
            long checkImmunityTime;
            // 轉為long
            checkImmunityTime = getLong(checkImmunityTimeStr);
            if (-1 == checkImmunityTime) { // 如果為-1,使用事務的超時時間
                checkImmunityTime = transactionTimeout;
            } else {
                checkImmunityTime *= 1000; // 使用checkImmunityTime,乘以1000轉為秒
            }
            return checkImmunityTime;
        }
    

    計算完checkImmunityTime的值後,判斷valueOfCurrentMinusBorn是否小於checkImmunityTime,如果是表明還未到事務的超時時間,此時調用checkPrepareQueueOffset檢查half消息在隊列中的偏移量,根據檢查結果判斷是否需要跳過當前消息:

    • 如果PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET屬性獲取為空,調用putImmunityMsgBackToHalfQueue將消息重新加入half隊列,如果返回true表示加入成功,此時向前推薦消費進度,處理下一條消息,如果加入失敗會繼續循環處理本條消息(因為進度未向前推進)
    • 如果PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET屬性獲取不為空,轉為long型,判斷OP隊列中是否已經包含當前消息的偏移量,如果包含加入到doneOpOffset中並返回true,此時向前推進消費進度,處理下一條消息,否則同樣調用putImmunityMsgBackToHalfQueue將消息重新加入half隊列,並根據加入成功與否判斷是否繼續處理下一條消息

    總結

    如果事務設置了PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS屬性,並且half消息的存留時間小於立刻檢查事務的時間,說明還未到時間不需要進行狀態檢查,此時獲取消息在half隊列的偏移量,如果獲取為空,將消息重新加入到half隊列中,如果獲取不為空判斷是否已經在OP處理隊列中,如果返回true處理下一個消息即可,否則同樣將消息重新加入half隊列中。

    RocketMQ在事務未到最晚回查時間時將消息重新加入了half消息隊列,因為加入之後half隊列的消費進度會往前推進並在回查結束時更新進度,所以下次檢查時並不會檢查到舊的half消息。

     private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset,
            MessageExt msgExt) {
            // 從屬性中獲取消息在half隊列的偏移量
            String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
            // 如果為空,調用putImmunityMsgBackToHalfQueue將消息重新加入half隊列
            if (null == prepareQueueOffsetStr) {
                return putImmunityMsgBackToHalfQueue(msgExt);
            } else {
                // 轉為long
                long prepareQueueOffset = getLong(prepareQueueOffsetStr);
                // 如果為-1,返回false,等待下次循環進行處理
                if (-1 == prepareQueueOffset) {
                    return false;
                } else {
                    // 如果OP隊列中已經包含當前消息的偏移量
                    if (removeMap.containsKey(prepareQueueOffset)) {
                        long tmpOpOffset = removeMap.remove(prepareQueueOffset);
                        // 加入到已完成的消息集合中
                        doneOpOffset.add(tmpOpOffset);
                        return true;
                    } else {
                        // 將消息重新加入half隊列
                        return putImmunityMsgBackToHalfQueue(msgExt);
                    }
                }
            }
        }
    

    (2)如果checkImmunityTimeStr獲取為空,判斷valueOfCurrentMinusBorn(消息存留時間)是否大於等於0並且小於checkImmunityTime(事務超時時間),如果滿足條件表示新加入的消息並且還未過事務的超時時間,此時終止循環暫不進行回查,否則進入下一步

  9. 判斷是否需要進行狀態回查isNeedCheck,滿足檢查的條件為以下三種情況之一:

    (1)從OP隊列中拉取消息為空並且當前half消息的存留時間已經大於事務設置的最晚回查時間

    opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
    

    (2)從OP隊列中拉取的消息不為空,並且拉取的最後一條消息的存入時間減去本次開始檢查時間大於事務的超時時間

    opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)
    

    (3)half消息在隊列中的保留時間小於等於1,說明加入half消息的時間大於本次開始檢查的時間

    valueOfCurrentMinusBorn <= -1
    
  10. 根據isNeedCheck判斷是否需要回查

    (1)需要回查:調用putBackHalfMsgQueue將half消息重新加入到隊列中,如果加入失敗繼續循環再次處理,如果加入成功調用resolveHalfMsg發送回查請求

    (2)不需要回查:調用fillOpRemoveMap繼續從OP隊列中拉取消息判斷

  11. 更新i的值,繼續處理下一個half消息

五、更新消費進度

主要是更half隊列和OP隊列的消費進度。

重新添加half消息

putBackHalfMsgQueue方法中可以看出將消息重新加入到了half隊列:

  private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
        // 重新將消息入到half消息隊列中
        PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);
        // 如果加入成功
        if (putMessageResult != null
            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
            // 設置消息的邏輯偏移量
            msgExt.setQueueOffset(
                putMessageResult.getAppendMessageResult().getLogicsOffset());
            // 設置消息在CommitLog的偏移量
            msgExt.setCommitLogOffset(
                putMessageResult.getAppendMessageResult().getWroteOffset());
            // 設消息ID
            msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
            log.debug(
                "Send check message, the offset={} restored in queueOffset={} "
                    + "commitLogOffset={} "
                    + "newMsgId={} realMsgId={} topic={}",
                offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(),
                msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                msgExt.getTopic());
            return true;
        } else {
            // 加入失敗
            log.error(
                "PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, "
                    + "msgId: {}",
                msgExt.getTopic(), msgExt.getQueueId(), msgExt.getMsgId());
            return false;
        }
    }

狀態回查請求發送

resolveHalfMsg方法中向客戶端發送事務狀態回查的請求,可以看到是通過執行緒池非同步實現的:

public abstract class AbstractTransactionalMessageCheckListener {
   public void resolveHalfMsg(final MessageExt msgExt) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // 發送狀態回查請求
                    sendCheckMessage(msgExt);
                } catch (Exception e) {
                    LOGGER.error("Send check message error!", e);
                }
            }
        });
    }
}

sendCheckMessage方法在AbstractTransactionalMessageCheckListener中實現,主要是構建請求資訊,然後向消息的生產者發送事務狀態回查的請求:

public abstract class AbstractTransactionalMessageCheckListener {
    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        // 構建回查請求頭
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); // 設置Commitlog偏移量
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        // 設置消息實際的TOPIC
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        // 設置消息實際的隊列ID
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        // 獲取channel
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) {
            // 發送回查請求
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }
}

事務狀態回查請求處理

事務狀態回查請求的處理在ClientRemotingProcessor中,如果請求類型是CHECK_TRANSACTION_STATE表示是事務狀態回查請求,調用checkTransactionState方法進行事務狀態檢查:

  1. 從請求中獲取消息,判斷消息是否為空,不為空進入下一步
  2. 從消息屬性中獲取生產者組名稱,如果不為空進入下一步
  3. 根據生產者組名稱獲取MQProducerInner對象,然後調用checkTransactionState進行狀態檢查
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                // 檢查事務狀態
                return this.checkTransactionState(ctx, request);
            // ...
            default:
                break;
        }
        return null;
    }
    
    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final CheckTransactionStateRequestHeader requestHeader =
            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
        final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
        // 獲取消息
        final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
        // 如果消息不為空
        if (messageExt != null) {
            // ...
            // 獲取事務ID
            String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                messageExt.setTransactionId(transactionId);
            }
            // 獲取生產者組
            final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
            if (group != null) {
                // 獲取MQProducerInner
                MQProducerInner producer = this.mqClientFactory.selectProducer(group);
                if (producer != null) {
                    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    // 調用checkTransactionState進行狀態檢查
                    producer.checkTransactionState(addr, messageExt, requestHeader);
                } else {
                    log.debug("checkTransactionState, pick producer by group[{}] failed", group);
                }
            } else {
                log.warn("checkTransactionState, pick producer group failed");
            }
        } else {
            log.warn("checkTransactionState, decode message failed");
        }

        return null;
    }
}

checkTransactionState方法在DefaultMQProducerImpl中實現,可以看到它創建了Runnable對象,然後提交到執行緒池中非同步執行事務的狀態檢查,檢查的主要邏輯如下:

  1. 獲取TransactionCheckListener(已廢棄)類型的事務監聽器
  2. 獲取TransactionListener類型的事務監聽器
  3. 如果TransactionCheckListenerTransactionListener其中之一不為空,調用checkLocalTransaction進行狀態檢查
  4. 調用processTransactionState處理事務查詢結果,這一步主要是根據事務的查詢結果構建請求資訊,然後調用endTransactionOneway方法向Broker發送結束事務的請求
public class DefaultMQProducerImpl implements MQProducerInner {
    @Override
    public void checkTransactionState(final String addr, final MessageExt msg,
        final CheckTransactionStateRequestHeader header) {
        Runnable request = new Runnable() {
            // ...
            
            @Override
            public void run() {
                // 獲取TransactionCheckListener監聽器(已不推薦使用)
                TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
                // 獲取事務監聽器
                TransactionListener transactionListener = getCheckListener();
                // 如果其中之一不為空
                if (transactionCheckListener != null || transactionListener != null) {
                    // 初始化為UNKNOW狀態
                    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                    Throwable exception = null;
                    try {
                        if (transactionCheckListener != null) {
                            localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
                        } else if (transactionListener != null) {
                            log.debug("Used new check API in transaction message");
                            // 調用checkLocalTransaction回查狀態
                            localTransactionState = transactionListener.checkLocalTransaction(message);
                        } else {
                            log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
                        }
                    } catch (Throwable e) {
                        log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                        exception = e;
                    }
                    // 處理事務狀態
                    this.processTransactionState(
                        localTransactionState,
                        group,
                        exception);
                } else {
                    log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
                }
            }
            
            // 處理事務狀態
            private void processTransactionState(
                final LocalTransactionState localTransactionState,
                final String producerGroup,
                final Throwable exception) {
                // 構建結束事務的請求頭
                final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
                // 設置tCommitLog的偏移量
                thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
                thisHeader.setProducerGroup(producerGroup);// 設置生產者組
                thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
                thisHeader.setFromTransactionCheck(true); // 設置狀態檢查為true
                // ...
                thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); // 設置事務ID
                switch (localTransactionState) {
                    case COMMIT_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); // 設置為提交
                        break;
                    case ROLLBACK_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); // 設置為回滾
                        log.warn("when broker check, client rollback this transaction, {}", thisHeader);
                        break;
                    case UNKNOW:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); // 設置為未知
                        log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
                        break;
                    default:
                        break;
                }

                // ...
                // 執行結束事務鉤子函數
                doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);

                try {
                    // 向Broker發送消息的回查結果
                    DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                        3000);
                } catch (Exception e) {
                    log.error("endTransactionOneway exception", e);
                }
            }
        };
        // 提交到執行緒池中執行任務
        this.checkExecutor.submit(request);
    }
}

總結


參考
丁威、周繼鋒《RocketMQ技術內幕》

RocketMQ官方示例

RocketMQ官方架構設計

RocketMQ版本:4.9.3

Tags: