【RocketMQ】事务的实现原理

事务的使用

RocketMQ事务的使用场景

单体架构下的事务

在单体系统的开发过程中,假如某个场景下需要对数据库的多张表进行操作,为了保证数据的一致性,一般会使用事务,将所有的操作全部提交或者在出错的时候全部回滚。以创建订单为例,假设下单后需要做两个操作:

  1. 在订单表生成订单
  2. 在积分表增加本次订单增加的积分记录

在单体架构下只需使用@Transactional开启事务,就可以保证数据的一致性:

    @Transactional
    public void order() {
        String orderId = UUID.randomUUID().toString();
        // 生成订单
        orderService.createOrder(orderId);
        // 增加积分
        creditService.addCredits(orderId);
    }

然而现在越来越多系统开始使用分布式架构,在分布式架构下,订单系统和积分系统可能是两个独立的服务,此时就不能使用上述的方法开启事务了,因为它们不处于同一个事务中,在出错的情况下,无法进行全部回滚,只能对当前服务的事务进行回滚,所以就有可能出现订单生成成功但是积分服务增加积分失败的情况(也可能相反),此时数据处于不一致的状态。

分布式架构下的事务

分布式架构下如果需要保证事务的一致性,需要使用分布式事务,分布式事务的实现方式有多种,这里我们先看通过RocketMQ事务的实现方式。

同样以下单流程为例,在分布式架构下的处理流程如下:

  1. 订单服务生成订单
  2. 发送订单生成的MQ消息,积分服务订阅消息,有新的订单生成之后消费消息,增加对应的积分记录

普通MQ消息存在的问题

如果使用@Transactional + 发送普通MQ的方式,看下存在的问题:

  1. 假如订单创建成功,MQ消息发送成功,但是order方法在返回的前一刻,服务突然宕机,由于开启了事务,事务还未提交(方法结束后才会正常提交),所以订单表并未生成记录,但是MQ却已经发送成功并且被积分服务消费,此时就会存在订单未创建但是积分记录增加的情况
  2. 假如先发送MQ消息再创建订单呢,此时问题就更明显了,如果MQ消息发送成功,创建订单失败,那么同样处于不一致的状态
    @Transactional
    public void order() {
        String orderId = UUID.randomUUID().toString();
        // 创建订单
        Order order = orderService.createOrder(orderDTO.getOrderId());
        // 发送订单创建的MQ消息
        sendOrderMessge(order);
        return;
    }

解决上述问题的方式就是使用RocketMQ事务消息。

RocketMQ事务消息的使用

使用事务消息需要实现自定义的事务监听器,TransactionListener提供了本地事务执行和状态回查的接口,executeLocalTransaction方法用于执行我们的本地事务,checkLocalTransaction是一种补偿机制,在异常情况下如果未收到事务的提交请求,会调用此方法进行事务状态查询,以此决定是否将事务进行提交/回滚:

public interface TransactionListener {
    /**
     * 执行本地事务
     *
     * @param msg Half(prepare) message half消息
     * @param arg Custom business parameter
     * @return Transaction state
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * 本地事务状态回查
     *
     * @param msg Check message
     * @return Transaction state
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

这里我们实现自定义的事务监听器OrderTransactionListenerImpl:

  • executeLocalTransaction方法中创建订单,如果创建成功返回COMMIT_MESSAGE,如果出现异常返回ROLLBACK_MESSAGE
  • checkLocalTransaction方法中回查事务状态,根据消息体中的订单ID查询订单是否已经创建,如果创建成功提交事务,如果未获取到认为失败,此时回滚事务。
public class OrderTransactionListenerImpl implements TransactionListener {

    @Autowired
    private OrderService orderService;

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            String body = new String(msg.getBody(), Charset.forName("UTF-8"));
            OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);
            // 模拟生成订单
            orderService.createOrder(orderDTO.getOrderId());
        } catch (Exception e) {
            // 出现异常,返回回滚状态
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        // 创建成功,返回提交状态
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String body = new String(msg.getBody(), Charset.forName("UTF-8"));
        OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);
        try {
            // 根据订单ID查询订单是否存在
            Order order = orderService.getOrderByOrderId(orderDTO.getOrderId());
            if (null != order) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

接下来看如何发送事务消息,事务消息对应的生产者为TransactionMQProducer,创建TransactionMQProducer之后,设置上一步自定义的事务监听器OrderTransactionListenerImpl,然后将订单ID放入消息体中, 调用sendMessageInTransaction发送事务消息:

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 创建下单事务监听器
        TransactionListener transactionListener = new OrderTransactionListenerImpl();
        // 创建生产者
        TransactionMQProducer producer = new TransactionMQProducer("order_group");
        // 事务状态回查线程池
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });
        // 设置线程池
        producer.setExecutorService(executorService);
        // 设置事务监听器
        producer.setTransactionListener(transactionListener);
        // 启动生产者
        producer.start();
        try {
            // 创建订单消息
            OrderDTO orderDTO = new OrderDTO();
            // 模拟生成订单唯一标识
            orderDTO.setOrderId(UUID.randomUUID().toString());
            // 转为字节数组
            byte[] msgBody = JSON.toJSONString(orderDTO).getBytes(RemotingHelper.DEFAULT_CHARSET);
            // 构建消息
            Message msg = new Message("ORDER_TOPIC", msgBody);
            // 调用sendMessageInTransaction发送事务消息
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf(sendResult.toString());
            Thread.sleep(10);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

事务的执行流程:

  1. 在订单服务下单后,向Borker发送生成订单的事务消息,投递到ORDER_TOPIC主题中
  2. Broker收到事务消息之后,不会直接投递到ORDER_TOPIC主题中,而是先放在另外一个主题中,也叫half主题,half主题对消费者不可见
  3. half主题加入消息成功之后,会回调事务监听器的的executeLocalTransaction方法,执行本地事务,也就是订单创建,如果创建成功返回COMMIT状态,如果出现异常返回ROLLBACK状态
  4. 根据上一步的返回状态,进行结束事务的处理
    • 提交:从half主题中删除消息,然后将消息投送到ORDER_TOPIC主题中,积分服务订阅ORDER_TOPIC主题进行消费,生成积分记录
    • 回滚:从half主题中删除消息即可
  5. 如果本地事务返回的执行结果状态由于网络原因或者其他原因未能成功的发送给Broker,Broker未收到事务的执行结果,在补偿机制定时检查half主题中消息的事务执行状态时,会回调事务监听器checkLocalTransaction的接口,进行状态回查,判断订单是否创建成功,然后进行结束事务的处理

使用事务消息不会存在订单创建失败但是消息发送成功的情况,不过你可能还有一个疑问,假如订单创建成功了,消息已经投送到队列中,但是积分服务在消费的时候失败了,这样数据还是处于不一致的状态,个人感觉,积分服务可以在失败的时候进行重试或者进行一些其他的补偿机制来保证积分记录成功的生成,在极端情况下积分记录依旧没有生成,此时可能就要人工接入处理了。

RocketMQ事务实现原理

RocketMQ在4.3.0版中开始支持事务消息,它使用两阶段提交协议实现事务消息,同时增加补偿机制定时对事务的状态进行回查,来处理未提交/回滚的事务。

两阶段提交

发送事务消息分为两个阶段:

第一阶段:生产者向Broker发送half(prepare)消息,生产者发送事务消息的时候,消息不会直接存入对应的主题中,而是先将消息存入RMQ_SYS_TRANS_HALF_TOPIC主题中,此时消息对消费者不可见,不能被消费者消费,称为half消息,half消息发送成功之后,开始执行本地事务

第二阶段:提交阶段,根据第一阶段的本地事务执行结果来决定是提交事务还是回滚事务,提交或者回滚的事务会从RMQ_SYS_TRANS_HALF_TOPIC中删除,对于提交的事务消息,会将消息投送到实际的主题队列中,之后消费者可以从队列中拉取到消息进行消费,对于回滚的事务消息,直接从RMQ_SYS_TRANS_HALF_TOPIC主题中删除即可。

注意:由于RocketMQ追加写的性能并不会直接从RMQ_SYS_TRANS_HALF_TOPIC队列中删除消息,而是使用了另外一个队列,将已提交或者回滚的事务放入到OP队列中,在补偿机制对half消息进行检查的时候会从OP中判断是消息是否已经提交或者回滚。

补偿机制

两阶段提交事务的过程中,任一阶段出现异常都有可能导致事务未能成功的进行提交/回滚,所以需要增加一种补偿机制,定时对RMQ_SYS_TRANS_HALF_TOPIC主题中的half消息进行处理。

RocketMQ使用了一种回查机制,在处理half消息时,对该消息的本地事务执行状态进行回查,根据回查结果决定是否需要提交/回滚,或者是等待下一次回查。

接下来就从源码的角度研究一下事务的实现原理。

上面可知,发送事务消息调用的是TransactionMQProducersendMessageInTransaction方法:

public class TransactionMQProducer extends DefaultMQProducer {
    @Override
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException {
        if (null == this.transactionListener) {
            throw new MQClientException("TransactionListener is null", null);
        }
        // 设置主题
        msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));
        // 发送事务消息
        return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
    }
}

sendMessageInTransactionDefaultMQProducerImpl中实现,主要有以下几个步骤:

  1. 获取事务监听器TransactionListener,如果获取为空或者本地事务执行器LocalTransactionExecuter为空将抛出异常,因为需要通过TransactionListener或者LocalTransactionExecuter来执行本地事务,所以不能为空
  2. 在消息中设置prepared属性,此时与普通消息(非事务消息)相比多了PROPERTY_TRANSACTION_PREPARED属性
  3. 调用send方法发送prepared消息也就是half消息,发送消息的流程与普通消息一致
  4. 根据消息的发送结果判断:
    • 如果发送成功执行本地事务,并返回本地事务执行结果状态,如果返回的执行状态结果为空,将本地事务状态设置为UNKNOW
    • 发送成功之外的其他情况,包括FLUSH_DISK_TIMEOUT刷盘超时、FLUSH_SLAVE_TIMEOUTSLAVE_NOT_AVAILABLE从节点不可用三种情况,此时意味着half消息发送失败,本地事务状态置为ROLLBACK_MESSAGE回滚消息
  5. 调用endTransaction方法结束事务
public class DefaultMQProducerImpl implements MQProducerInner {
    // 发送事务消息
    public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        // 获取事务监听器
        TransactionListener transactionListener = getCheckListener();
        // 如果本地事务执行器或者监听为空
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }
        // ...
        SendResult sendResult = null;
        // 设置prepared属性
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        // 设置生产者组
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            // 发送消息
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
        // 本地事务状态
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) { // 判断消息发送状态
            case SEND_OK: { // 如果发送成功
                try {
                    // ...
                    if (null != localTransactionExecuter) { // 如果本地事务执行器不为空
                        // 执行本地事务
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) { // 如果事务监听器不为空
                        log.debug("Used new transaction API");
                        // 执行本地事务
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        // 如果本地事务状态为空,设置为UNKNOW
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }
                    // ...
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; // 本地事务状态设置为回滚
                break;
            default:
                break;
        }
        try {
            // 结束事务
            this.endTransaction(msg, sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
        // ...
        return transactionSendResult;
    }
}

half消息处理

Broker对消息发送请求的处理在SendMessageProcessor中,当Broker收到消息后,判断消息是否含有PROPERTY_TRANSACTION_PREPARED属性,如果含有prepared属性,会获取TransactionalMessageService,然后调用asyncPrepareMessage对消息进行处理:

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
     private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        // ...
        CompletableFuture<PutMessageResult> putMessageResult = null;
        // 获取prepared属性标记
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        // 如果事务标记不为空
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                                + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            // 事务消息持久化
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            // 普通消息持久化
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }
}

TransactionalMessageServiceImplasyncPrepareMessage方法中,又调用了TransactionalMessageBridgeasyncPutHalfMessage方法,添加half消息

public class TransactionalMessageServiceImpl implements TransactionalMessageService {
    @Override
    public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
        // 添加half消息
        return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
    }
}

TransactionalMessageBridgeasyncPutHalfMessage方法中,调用了parseHalfMessageInner方法设置half消息的相关属性。

因为是half消息,此时还不能直接加入到实际的消息队列中,否则一旦加入就会被消费者消费,所以需要先对half消息暂存,等收到消息提交请求时才可以添加到实际的消息队列中,RocketMQ设置了一个RMQ_SYS_TRANS_HALF_TOPIC主题来暂存half消息。

parseHalfMessageInner方法中,会对消息进行如下处理:

  1. 设置消息实际的主题和队列ID,待收到事务提交请求后恢复实际的主题和队列ID,向实际的队列中添加消息
  2. 更改消息的主题为half消息主题RMQ_SYS_TRANS_HALF_TOPIC,先将消息投送到half消息队列中
  3. half主题对应的消息队列ID为0,所以更改消息的队列ID为0

之后调用asyncPutMessage添加消息,接下来的流程就和普通消息的添加一致了,具体可参考【RocketMQ】消息的存储

public class TransactionalMessageBridge {
    public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) {
        // 添加消息
        return store.asyncPutMessage(parseHalfMessageInner(messageInner));
    }
    
    private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        // 设置实际的主题
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        // 设置实际的队列ID
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        // 设置事务主题RMQ_SYS_TRANS_HALF_TOPIC
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        // 设置事务队列ID
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
    }
}

public class TransactionalMessageUtil {
    public static String buildHalfTopic() {
        // half消息主题
        return TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
    }
}

结束事务

在进行了half消息发送和执行本地事务的操作后,消息暂存在Broker的half主题中,接下来生产者需要根据本地事务的执行结果,向Broker发送结束事务的请求,结束事务的方法endTransactionDefaultMQProducerImpl中实现:

  1. 构建结束事务的请求头EndTransactionRequestHeader
  2. 判断本地事务执行状态:
    • COMMIT_MESSAGE:表示提交事务,结束事务的请求头中设置TRANSACTION_COMMIT_TYPE标识进行事务提交
    • ROLLBACK_MESSAGE:表示回滚事务,请求头中设置TRANSACTION_ROLLBACK_TYPE标识进行事务回滚
    • UNKNOW:事务执行结果未知状态,请求头中设置TRANSACTION_NOT_TYPE标识未知状态的事务
  3. 调用endTransactionOneway向Broker发送结束事务的请求
public class DefaultMQProducerImpl implements MQProducerInner {
    public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        // 消息
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        // 获取事务ID
        String transactionId = sendResult.getTransactionId();
        // 获取Broker地址
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        // 结束事务请求头
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        // 设置事务ID
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        // 判断本地事务状态
        switch (localTransactionState) {
            case COMMIT_MESSAGE: // 如果提交
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE: // 如果是回滚
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW: // 未知
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }
        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        // 发送结束事务的请求
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }
}

Broker事务结束请求处理

Broker对事务结束的请求处理在EndTransactionProcessor中:

  1. 判断是否是从节点,从节点没有结束事务的权限,如果是从节点返回SLAVE_NOT_AVAILABLE
  2. 从请求头中获取事务的提交类型:
    • TRANSACTION_COMMIT_TYPE:表示提交事务,会调用commitMessage方法提交消息,如果提交成功调用endMessageTransaction结束事务,恢复消息的原始主题和队列并调用deletePrepareMessage方法删掉half消息
    • TRANSACTION_ROLLBACK_TYPE:表示回滚事务,会调用rollbackMessage方法回滚事务,然后删掉half消息
public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        // 创建响应
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.debug("Transaction request:{}", requestHeader);
        // 如果是从节点,从节点没有结束事务的权限,返回SLAVE_NOT_AVAILABLE
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }
        // ...
        OperationResult result = new OperationResult();
        // 判断事务提交类型,如果是提交事务
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            // 提交消息
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                // 校验Prepare消息
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    // 结束事务,恢复消息的原始主题和队列
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        // 删除half消息
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { // 如果是回滚
            // 回滚消息
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    
// 删除half消息
                  this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
    }
}

删除half消息

由于CommitLog追加写的性质,RocketMQ并不会直接将half消息从CommitLog中删除,而是使用了另外一个OP主题RMQ_SYS_TRANS_OP_HALF_TOPIC(以下简称OP主题/队列),将已经提交/回滚的消息记录在OP主题队列中

public class TransactionalMessageServiceImpl implements TransactionalMessageService {
    @Override
    public boolean deletePrepareMessage(MessageExt msgExt) {
        // 添加到OP消息队列
        if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
            log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
            return true;
        } else {
            log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
            return false;
        }
    }
}

putOpMessage方法在TransactionalMessageBridge中实现,它又调用了addRemoveTagInTransactionOp方法向OP队列中添加消息:

  1. 构建OP消息,主要是创建Message对象,然后设置主题为RMQ_SYS_TRANS_OP_HALF_TOPIC,设置half消息在队列的偏移量
  2. 调用 writeOp方法将消息写入OP队列,makeOpMessageInner方法用于构建消息体,然后调用putMessage放将消息写入CommitLog
public class TransactionalMessageBridge {
    private final ConcurrentHashMap<MessageQueue, MessageQueue> opQueueMap = new ConcurrentHashMap<>();
    public boolean putOpMessage(MessageExt messageExt, String opType) {
        // 构建消息队列,设置消息所属主题、Broker名称、队列ID信息
        MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
            this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
        if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
            // 添加OP消息
            return addRemoveTagInTransactionOp(messageExt, messageQueue);
        }
        return true;
    }
    
    /**
     * 当事务消息进行提交或者回滚时,记录在operation队列中(OP队列)
     */
    private boolean addRemoveTagInTransactionOp(MessageExt prepareMessage, MessageQueue messageQueue) {
        // 构建OP消息,主题为RMQ_SYS_TRANS_OP_HALF_TOPIC
        Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
            String.valueOf(prepareMessage.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
        // 将消息写入OP队列
        writeOp(message, messageQueue);
        return true;
    }

    private void writeOp(Message message, MessageQueue mq) {
        MessageQueue opQueue;
        // 如果已经添加过
        if (opQueueMap.containsKey(mq)) {
            opQueue = opQueueMap.get(mq);
        } else {
            opQueue = getOpQueueByHalf(mq);
            MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
            if (oldQueue != null) {
                opQueue = oldQueue;
            }
        }
        // 如果为空
        if (opQueue == null) {
            // 创建
            opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
        }
        // 构建OP消息添加到OP队列中
        putMessage(makeOpMessageInner(message, opQueue));
    }
}

事务状态检查

由于各种原因有可能未成功收到提交/回滚事务的请求,所以RocketMQ需要定期检查half消息,检查事务的执行结果,TransactionalMessageCheckService用于half消息状态的检查,它实现了ServiceThread,默认可以看到在onWaitEnd方法中调用了check方法进行状态检查:

public class TransactionalMessageCheckService extends ServiceThread {
    
    @Override
    protected void onWaitEnd() {
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        // 状态检查
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
    }

}

check方法在TransactionalMessageServiceImpl中实现:

public class TransactionalMessageServiceImpl implements TransactionalMessageService {
    @Override
    public void check(long transactionTimeout, int transactionCheckMax,
        AbstractTransactionalMessageCheckListener listener) {
        try {
            String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
            // 根据主题获取消息队列
            Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
            if (msgQueues == null || msgQueues.size() == 0) {
                log.warn("The queue of topic is empty :" + topic);
                return;
            }
            log.debug("Check topic={}, queues={}", topic, msgQueues);
            // 遍历所有的消息队列
            for (MessageQueue messageQueue : msgQueues) {
                // 获取当前时间做为开始时间
                long startTime = System.currentTimeMillis();
                // 获取对应的OP消息队列
                MessageQueue opQueue = getOpQueue(messageQueue);
                // 获取half消息队列的消费进度
                long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
                // 获取op消息队列的消费进度
                long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
                log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
                // 如果消费进度小于0表示不合法
                if (halfOffset < 0 || opOffset < 0) {
                    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                        halfOffset, opOffset);
                    continue;
                }
                // 存储已处理的消息
                List<Long> doneOpOffset = new ArrayList<>();
                HashMap<Long, Long> removeMap = new HashMap<>();
                // 根据当前的消费进度从已处理队列中拉取消息
                PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
                // 如果拉取消息为空,打印错误继续处理下一个消息队列
                if (null == pullResult) {
                    log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                        messageQueue, halfOffset, opOffset);
                    continue;
                }
                // 获取消息为空的数量默认为1
                int getMessageNullCount = 1;
                // 新的进度
                long newOffset = halfOffset;
                // 获取half队列的消费进度,赋值给i
                long i = halfOffset;
                while (true) {
                    // 如果当前时间减去开始时间大于最大处理时间限制,终止循环
                    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                        break;
                    }
                    // 如果OP队列中包含当前偏移量,表示消息已经被处理,加入到已处理集合中
                    if (removeMap.containsKey(i)) {
                        log.debug("Half offset {} has been committed/rolled back", i);
                        Long removedOpOffset = removeMap.remove(i);
                        // 加入到doneOpOffset集合中
                        doneOpOffset.add(removedOpOffset);
                    } else { // 如果已处理队列中不包含当前消息
                        // 根据偏移量从half队列获取half消息
                        GetResult getResult = getHalfMsg(messageQueue, i);
                        // 获取消息对象
                        MessageExt msgExt = getResult.getMsg();
                        // 如果获取消息为空
                        if (msgExt == null) {
                            // 判断获取空消息的次数是否大于MAX_RETRY_COUNT_WHEN_HALF_NULL
                            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                                break;
                            }
                            // 判断从half队列获取消息的结果是NO_NEW_MSG,表示没有消息,此时终止循环等待下一次进行检查
                            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                    messageQueue, getMessageNullCount, getResult.getPullResult());
                                break;
                            } else {
                               
                                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                                 // 走到这里说明消息的偏移量不合法,继续获取下一条消息进行处理
                                i = getResult.getPullResult().getNextBeginOffset();
                                newOffset = i;
                                continue;
                            }
                        }
                        // 是否需要丢弃消息或者需要跳过消息
                        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                            listener.resolveDiscardMsg(msgExt);
                            // 继续处理下一条消息
                            newOffset = i + 1;
                            i++;
                            continue;
                        }
                        // 如果消息的添加时间是否大于等于本次检查的开始时间,说明是在检查开始之后加入的消息,暂不进行处理
                        if (msgExt.getStoreTimestamp() >= startTime) {
                            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                                new Date(msgExt.getStoreTimestamp()));
                            break;
                        }
                        // 计算half消息在队列中的保留时间:当前时间减去消息加入的时间
                        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                        // 事务超时时间
                        long checkImmunityTime = transactionTimeout;
                        // 获取PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性,表示事务回查最晚的时间
                        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                        // 如果PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性不为空
                        if (null != checkImmunityTimeStr) {
                            // 获取事务回查最晚检查时间,如果checkImmunityTimeStr为-1则返回事务超时时间,否则返回checkImmunityTimeStr转为long后乘以1000得到的值
                            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                            // 如果消息的保留时间小于事务回查最晚检查时间
                            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                                // 检查half消息在队列中的偏移量,如果返回true跳过本条消息
                                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                                    // 处理下一个消息
                                    newOffset = i + 1;
                                    i++;
                                    continue;
                                }
                            }
                        } else {
                            // 如果valueOfCurrentMinusBorn小于checkImmunityTime
                            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                                break;
                            }
                        }
                        // 获取OP消息
                        List<MessageExt> opMsg = pullResult.getMsgFoundList();
                        // 判断是否需要检查,满足检查的条件为以下三种情况之一:
                        // 1.拉取消息为空并且消息的保留时间已经大于事务设置的最晚回查时间
                        // 2.拉取消息不为空并且拉取到的最后一条消息的存入时间减去当前时间超过了事务的超时时间
                        // 3.half消息存留时间为负数
                        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                            || (valueOfCurrentMinusBorn <= -1);
                        // 如果需要进行回查
                        if (isNeedCheck) {
                            // 将half消息重新加入到队列中
                            if (!putBackHalfMsgQueue(msgExt, i)) {
                                continue;
                            }
                            // 发送回查请求
                            listener.resolveHalfMsg(msgExt);
                        } else {
                            // 继续从OP队列中拉取消息
                            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                messageQueue, pullResult);
                            continue;
                        }
                    }
                    // 加1继续处理下一条消息
                    newOffset = i + 1;
                    i++;
                }
                if (newOffset != halfOffset) {
                    // 更新消费进度
                    transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
                }
                long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
                if (newOpOffset != opOffset) {
                    // 更新处理进度
                    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
                }
            }
        } catch (Throwable e) {
            log.error("Check error", e);
        }
    }
}

check方法中会获取half主题(RMQ_SYS_TRANS_HALF_TOPIC)下的所有消息队列,遍历所有的half消息队列,对队列中的half消息进行处理,主要步骤如下。

一、 构建OP队列的消息队列对象MessageQueue

调用getOpQueue获取当前half消息队列对应的OP队列的MessageQueue对象,实际上是创建了一个MessageQueue对象,设置为OP队列的主题、以及Broker名称和队列的ID,在后面获取消费进度时使用:

   private MessageQueue getOpQueue(MessageQueue messageQueue) {
        // 获取OP消息队列
        MessageQueue opQueue = opQueueMap.get(messageQueue);
        if (opQueue == null) {
            // 如果获取为空,则创建MessageQueue,主题设置为OP TOPIC,设置Broker名称和队列ID
            opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), messageQueue.getBrokerName(),
                messageQueue.getQueueId());
            // 加入到opQueueMap中
            opQueueMap.put(messageQueue, opQueue);
        }
        return opQueue;
    }

二、获取half队列的消费进度和OP消费队列的消费进度

消费进度的获取是通过调用transactionalMessageBridgefetchConsumeOffset方法进行查询的,可以看到方法的入参是MessageQueue类型的,所以第一步需要构造OP队列的MessageQueue对象,在这一步查询消费进度使用:

   public long fetchConsumeOffset(MessageQueue mq) {
        long offset = brokerController.getConsumerOffsetManager().queryOffset(TransactionalMessageUtil.buildConsumerGroup(),
            mq.getTopic(), mq.getQueueId());
        if (offset == -1) {
            offset = store.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId());
        }
        return offset;
    }

三、从OP队列中拉取消息

调用fillOpRemoveMap方法根据消费进度信息从OP队列中拉取消息,将拉取的消费放入removeMap中,用于判断half消息是否已经处理:

   private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
        MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
        // 从OP队列中拉取消息,每次拉取32条
        PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
        // 如果拉取为空返回null
        if (null == pullResult) {
            return null;
        }
        // 如果拉取状态为消费进度不合法或者没有匹配的消息
        if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
            || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
            log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,
                pullResult);
            // 从拉取结果中获取消费进度并更新
            transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
            return pullResult;
        } else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) { // 如果没有消息
            log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,
                pullResult);
            return pullResult;
        }
        // 获取拉取到的消息
        List<MessageExt> opMsg = pullResult.getMsgFoundList();
        if (opMsg == null) { // 如果为空打印日志
            log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
            return pullResult;
        }
        // 遍历拉取的消息
        for (MessageExt opMessageExt : opMsg) {
            // 获取队列中的偏移量
            Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
            log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
                opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
            if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
                // 如果偏移量小于最小的偏移量
                if (queueOffset < miniOffset) {
                    // 加入到doneOpOffset中
                    doneOpOffset.add(opMessageExt.getQueueOffset());
                } else {
                    // 加入到已处理消息的集合removeMap中
                    removeMap.put(queueOffset, opMessageExt.getQueueOffset());
                }
            } else {
                log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
            }
        }
        log.debug("Remove map: {}", removeMap);
        log.debug("Done op list: {}", doneOpOffset);
        return pullResult;
    }

四、处理每一个half消息

开启while循环,从half队列的消费进度处开始,处理每一个half消息:

  1. 如果当前时间减去检查开始时间大于最大处理时间,此时终止循环

  2. 如果removeMap中包含当前half消息,表示消息已经被处理,放入到已处理消息集合中doneOpOffset

  3. 如果removeMap不包含当前half消息, 调用getHalfMsg方法根据偏移量从half队列获取half消息,如果消息获取不为空继续下一步,否则进行如下处理

    • 判断获取空消息的个数是否大于MAX_RETRY_COUNT_WHEN_HALF_NULL,如果大于将终止本次循环,处理下一个half消息队列
    • 判断拉取消息的状态是否为NO_NEW_MSG,如果是表示队列中没有消息,先终止循环
    • 如果拉取消息的状态是不是NO_NEW_MSG,表示消费进度不合法,获取half消息队列中下一条消息进行处理
  4. 调用needDiscard判断是否需要丢弃half消息,或者调用needSkip判断是否需要跳过当前half消息:

    • needDiscard是根据half消息的检查次数是否超过最大限制来决定是否丢弃half消息

         private boolean needDiscard(MessageExt msgExt, int transactionCheckMax) {
              // 从属性中获取检查次数
              String checkTimes = msgExt.getProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
              int checkTime = 1;
              // 如果不为空
              if (null != checkTimes) {
                  checkTime = getInt(checkTimes);
                  // 如果检查次数大于事务最大的检查次数,表示需要丢弃
                  if (checkTime >= transactionCheckMax) {
                      return true;
                  } else {
                      // 检查次数加一
                      checkTime++;
                  }
              }
              // 更新检查次数
              msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTime));
              return false;
          }
      
    • needSkip是根据half消息在队列中的存留时间是否超过了最大的保留时间限制来决定是否跳过

          private boolean needSkip(MessageExt msgExt) {
              // 计算half消息在队列中的保留时间
              long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
              // 如果大于Broker中设置的最大保留时间,表示需要跳过
              if (valueOfCurrentMinusBorn
                  > transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getFileReservedTime()
                  * 3600L * 1000) {
                  log.info("Half message exceed file reserved time ,so skip it.messageId {},bornTime {}",
                      msgExt.getMsgId(), msgExt.getBornTimestamp());
                  return true;
              }
              return false;
          }
      
  5. 判断消息的的存入时间是否大于本次开始检查的时间,如果大于说明是新加入的消息,由于事务消息发送后不会立刻提交,所以此时暂不需要进行检查,中断循环即可

  6. 计算half消息在队列中的存留时间valueOfCurrentMinusBorn:当前时间 – 消息存入的时间

  7. 设置立刻回查事务状态的时间checkImmunityTime:事务的超时时间

  8. 从消息属性中获取PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性的值放在checkImmunityTimeStr变量中,表示事务的最晚回查时间:

    (1)如果checkImmunityTimeStr获取不为空,调用getImmunityTime方法计算事务立刻回查时间,并赋值给checkImmunityTime,从代码中可以看出如果checkImmunityTimeStr为-1则返回事务的超时时间,否则返回checkImmunityTimeStr的值并乘以1000转为秒:

      private long getImmunityTime(String checkImmunityTimeStr, long transactionTimeout) {
            long checkImmunityTime;
            // 转为long
            checkImmunityTime = getLong(checkImmunityTimeStr);
            if (-1 == checkImmunityTime) { // 如果为-1,使用事务的超时时间
                checkImmunityTime = transactionTimeout;
            } else {
                checkImmunityTime *= 1000; // 使用checkImmunityTime,乘以1000转为秒
            }
            return checkImmunityTime;
        }
    

    计算完checkImmunityTime的值后,判断valueOfCurrentMinusBorn是否小于checkImmunityTime,如果是表明还未到事务的超时时间,此时调用checkPrepareQueueOffset检查half消息在队列中的偏移量,根据检查结果判断是否需要跳过当前消息:

    • 如果PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET属性获取为空,调用putImmunityMsgBackToHalfQueue将消息重新加入half队列,如果返回true表示加入成功,此时向前推荐消费进度,处理下一条消息,如果加入失败会继续循环处理本条消息(因为进度未向前推进)
    • 如果PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET属性获取不为空,转为long型,判断OP队列中是否已经包含当前消息的偏移量,如果包含加入到doneOpOffset中并返回true,此时向前推进消费进度,处理下一条消息,否则同样调用putImmunityMsgBackToHalfQueue将消息重新加入half队列,并根据加入成功与否判断是否继续处理下一条消息

    总结

    如果事务设置了PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性,并且half消息的存留时间小于立刻检查事务的时间,说明还未到时间不需要进行状态检查,此时获取消息在half队列的偏移量,如果获取为空,将消息重新加入到half队列中,如果获取不为空判断是否已经在OP处理队列中,如果返回true处理下一个消息即可,否则同样将消息重新加入half队列中。

    RocketMQ在事务未到最晚回查时间时将消息重新加入了half消息队列,因为加入之后half队列的消费进度会往前推进并在回查结束时更新进度,所以下次检查时并不会检查到旧的half消息。

     private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset,
            MessageExt msgExt) {
            // 从属性中获取消息在half队列的偏移量
            String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
            // 如果为空,调用putImmunityMsgBackToHalfQueue将消息重新加入half队列
            if (null == prepareQueueOffsetStr) {
                return putImmunityMsgBackToHalfQueue(msgExt);
            } else {
                // 转为long
                long prepareQueueOffset = getLong(prepareQueueOffsetStr);
                // 如果为-1,返回false,等待下次循环进行处理
                if (-1 == prepareQueueOffset) {
                    return false;
                } else {
                    // 如果OP队列中已经包含当前消息的偏移量
                    if (removeMap.containsKey(prepareQueueOffset)) {
                        long tmpOpOffset = removeMap.remove(prepareQueueOffset);
                        // 加入到已完成的消息集合中
                        doneOpOffset.add(tmpOpOffset);
                        return true;
                    } else {
                        // 将消息重新加入half队列
                        return putImmunityMsgBackToHalfQueue(msgExt);
                    }
                }
            }
        }
    

    (2)如果checkImmunityTimeStr获取为空,判断valueOfCurrentMinusBorn(消息存留时间)是否大于等于0并且小于checkImmunityTime(事务超时时间),如果满足条件表示新加入的消息并且还未过事务的超时时间,此时终止循环暂不进行回查,否则进入下一步

  9. 判断是否需要进行状态回查isNeedCheck,满足检查的条件为以下三种情况之一:

    (1)从OP队列中拉取消息为空并且当前half消息的存留时间已经大于事务设置的最晚回查时间

    opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
    

    (2)从OP队列中拉取的消息不为空,并且拉取的最后一条消息的存入时间减去本次开始检查时间大于事务的超时时间

    opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout)
    

    (3)half消息在队列中的保留时间小于等于1,说明加入half消息的时间大于本次开始检查的时间

    valueOfCurrentMinusBorn <= -1
    
  10. 根据isNeedCheck判断是否需要回查

    (1)需要回查:调用putBackHalfMsgQueue将half消息重新加入到队列中,如果加入失败继续循环再次处理,如果加入成功调用resolveHalfMsg发送回查请求

    (2)不需要回查:调用fillOpRemoveMap继续从OP队列中拉取消息判断

  11. 更新i的值,继续处理下一个half消息

五、更新消费进度

主要是更half队列和OP队列的消费进度。

重新添加half消息

putBackHalfMsgQueue方法中可以看出将消息重新加入到了half队列:

  private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
        // 重新将消息入到half消息队列中
        PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);
        // 如果加入成功
        if (putMessageResult != null
            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
            // 设置消息的逻辑偏移量
            msgExt.setQueueOffset(
                putMessageResult.getAppendMessageResult().getLogicsOffset());
            // 设置消息在CommitLog的偏移量
            msgExt.setCommitLogOffset(
                putMessageResult.getAppendMessageResult().getWroteOffset());
            // 设消息ID
            msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
            log.debug(
                "Send check message, the offset={} restored in queueOffset={} "
                    + "commitLogOffset={} "
                    + "newMsgId={} realMsgId={} topic={}",
                offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(),
                msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
                msgExt.getTopic());
            return true;
        } else {
            // 加入失败
            log.error(
                "PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, "
                    + "msgId: {}",
                msgExt.getTopic(), msgExt.getQueueId(), msgExt.getMsgId());
            return false;
        }
    }

状态回查请求发送

resolveHalfMsg方法中向客户端发送事务状态回查的请求,可以看到是通过线程池异步实现的:

public abstract class AbstractTransactionalMessageCheckListener {
   public void resolveHalfMsg(final MessageExt msgExt) {
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // 发送状态回查请求
                    sendCheckMessage(msgExt);
                } catch (Exception e) {
                    LOGGER.error("Send check message error!", e);
                }
            }
        });
    }
}

sendCheckMessage方法在AbstractTransactionalMessageCheckListener中实现,主要是构建请求信息,然后向消息的生产者发送事务状态回查的请求:

public abstract class AbstractTransactionalMessageCheckListener {
    public void sendCheckMessage(MessageExt msgExt) throws Exception {
        // 构建回查请求头
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); // 设置Commitlog偏移量
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        // 设置消息实际的TOPIC
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        // 设置消息实际的队列ID
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        // 获取channel
        Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);
        if (channel != null) {
            // 发送回查请求
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
    }
}

事务状态回查请求处理

事务状态回查请求的处理在ClientRemotingProcessor中,如果请求类型是CHECK_TRANSACTION_STATE表示是事务状态回查请求,调用checkTransactionState方法进行事务状态检查:

  1. 从请求中获取消息,判断消息是否为空,不为空进入下一步
  2. 从消息属性中获取生产者组名称,如果不为空进入下一步
  3. 根据生产者组名称获取MQProducerInner对象,然后调用checkTransactionState进行状态检查
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                // 检查事务状态
                return this.checkTransactionState(ctx, request);
            // ...
            default:
                break;
        }
        return null;
    }
    
    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final CheckTransactionStateRequestHeader requestHeader =
            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
        final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
        // 获取消息
        final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
        // 如果消息不为空
        if (messageExt != null) {
            // ...
            // 获取事务ID
            String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                messageExt.setTransactionId(transactionId);
            }
            // 获取生产者组
            final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
            if (group != null) {
                // 获取MQProducerInner
                MQProducerInner producer = this.mqClientFactory.selectProducer(group);
                if (producer != null) {
                    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    // 调用checkTransactionState进行状态检查
                    producer.checkTransactionState(addr, messageExt, requestHeader);
                } else {
                    log.debug("checkTransactionState, pick producer by group[{}] failed", group);
                }
            } else {
                log.warn("checkTransactionState, pick producer group failed");
            }
        } else {
            log.warn("checkTransactionState, decode message failed");
        }

        return null;
    }
}

checkTransactionState方法在DefaultMQProducerImpl中实现,可以看到它创建了Runnable对象,然后提交到线程池中异步执行事务的状态检查,检查的主要逻辑如下:

  1. 获取TransactionCheckListener(已废弃)类型的事务监听器
  2. 获取TransactionListener类型的事务监听器
  3. 如果TransactionCheckListenerTransactionListener其中之一不为空,调用checkLocalTransaction进行状态检查
  4. 调用processTransactionState处理事务查询结果,这一步主要是根据事务的查询结果构建请求信息,然后调用endTransactionOneway方法向Broker发送结束事务的请求
public class DefaultMQProducerImpl implements MQProducerInner {
    @Override
    public void checkTransactionState(final String addr, final MessageExt msg,
        final CheckTransactionStateRequestHeader header) {
        Runnable request = new Runnable() {
            // ...
            
            @Override
            public void run() {
                // 获取TransactionCheckListener监听器(已不推荐使用)
                TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
                // 获取事务监听器
                TransactionListener transactionListener = getCheckListener();
                // 如果其中之一不为空
                if (transactionCheckListener != null || transactionListener != null) {
                    // 初始化为UNKNOW状态
                    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                    Throwable exception = null;
                    try {
                        if (transactionCheckListener != null) {
                            localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
                        } else if (transactionListener != null) {
                            log.debug("Used new check API in transaction message");
                            // 调用checkLocalTransaction回查状态
                            localTransactionState = transactionListener.checkLocalTransaction(message);
                        } else {
                            log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
                        }
                    } catch (Throwable e) {
                        log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                        exception = e;
                    }
                    // 处理事务状态
                    this.processTransactionState(
                        localTransactionState,
                        group,
                        exception);
                } else {
                    log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
                }
            }
            
            // 处理事务状态
            private void processTransactionState(
                final LocalTransactionState localTransactionState,
                final String producerGroup,
                final Throwable exception) {
                // 构建结束事务的请求头
                final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
                // 设置tCommitLog的偏移量
                thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
                thisHeader.setProducerGroup(producerGroup);// 设置生产者组
                thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
                thisHeader.setFromTransactionCheck(true); // 设置状态检查为true
                // ...
                thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); // 设置事务ID
                switch (localTransactionState) {
                    case COMMIT_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); // 设置为提交
                        break;
                    case ROLLBACK_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); // 设置为回滚
                        log.warn("when broker check, client rollback this transaction, {}", thisHeader);
                        break;
                    case UNKNOW:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); // 设置为未知
                        log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
                        break;
                    default:
                        break;
                }

                // ...
                // 执行结束事务钩子函数
                doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);

                try {
                    // 向Broker发送消息的回查结果
                    DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                        3000);
                } catch (Exception e) {
                    log.error("endTransactionOneway exception", e);
                }
            }
        };
        // 提交到线程池中执行任务
        this.checkExecutor.submit(request);
    }
}

总结


参考
丁威、周继锋《RocketMQ技术内幕》

RocketMQ官方示例

RocketMQ官方架构设计

RocketMQ版本:4.9.3

Tags: