RocketMQ中PullConsumer的消息拉取源码分析

  • 2019 年 10 月 3 日
  • 筆記

在PullConsumer中,有关消息的拉取RocketMQ提供了很多API,但总的来说分为两种,同步消息拉取和异步消息拉取

同步消息拉取
以同步方式拉取消息都是通过DefaultMQPullConsumerImpl的pullSyncImpl方法:

 1 private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,   2     long timeout)   3     throws MQClientException, RemotingException, MQBrokerException, InterruptedException {   4     this.makeSureStateOK();   5   6     if (null == mq) {   7         throw new MQClientException("mq is null", null);   8     }   9  10     if (offset < 0) {  11         throw new MQClientException("offset < 0", null);  12     }  13  14     if (maxNums <= 0) {  15         throw new MQClientException("maxNums <= 0", null);  16     }  17  18     this.subscriptionAutomatically(mq.getTopic());  19  20     int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);  21  22     long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;  23  24     boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());  25     PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(  26         mq,  27         subscriptionData.getSubString(),  28         subscriptionData.getExpressionType(),  29         isTagType ? 
0L : subscriptionData.getSubVersion(),  30         offset,  31         maxNums,  32         sysFlag,  33         0,  34         this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),  35         timeoutMillis,  36         CommunicationMode.SYNC,  37         null  38     );  39     this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);  40     if (!this.consumeMessageHookList.isEmpty()) {  41         ConsumeMessageContext consumeMessageContext = null;  42         consumeMessageContext = new ConsumeMessageContext();  43         consumeMessageContext.setConsumerGroup(this.groupName());  44         consumeMessageContext.setMq(mq);  45         consumeMessageContext.setMsgList(pullResult.getMsgFoundList());  46         consumeMessageContext.setSuccess(false);  47         this.executeHookBefore(consumeMessageContext);  48         consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());  49         consumeMessageContext.setSuccess(true);  50         this.executeHookAfter(consumeMessageContext);  51     }  52     return pullResult;  53 }

首先通过subscriptionAutomatically方法检查Topic是否订阅

 

 1 public void subscriptionAutomatically(final String topic) {   2     if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {   3         try {   4             SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),   5                 topic, SubscriptionData.SUB_ALL);   6             this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);   7         } catch (Exception ignore) {   8         }   9     }  10 }

若是没有就新建一条订阅数据保存在rebalanceImpl的subscriptionInner中

之后调用pullKernelImpl方法:

 1 public PullResult pullKernelImpl(   2     final MessageQueue mq,   3     final String subExpression,   4     final String expressionType,   5     final long subVersion,   6     final long offset,   7     final int maxNums,   8     final int sysFlag,   9     final long commitOffset,  10     final long brokerSuspendMaxTimeMillis,  11     final long timeoutMillis,  12     final CommunicationMode communicationMode,  13     final PullCallback pullCallback  14 ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {  15     FindBrokerResult findBrokerResult =  16         this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),  17             this.recalculatePullFromWhichNode(mq), false);  18     if (null == findBrokerResult) {  19         this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());  20         findBrokerResult =  21             this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),  22                 this.recalculatePullFromWhichNode(mq), false);  23     }  24  25     if (findBrokerResult != null) {  26         {  27             // check version  28             if (!ExpressionType.isTagType(expressionType)  29                 && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {  30                 throw new MQClientException("The broker[" + mq.getBrokerName() + ", "  31                     + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);  32             }  33         }  34         int sysFlagInner = sysFlag;  35  36         if (findBrokerResult.isSlave()) {  37             sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);  38         }  39  40         PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();  41         requestHeader.setConsumerGroup(this.consumerGroup);  42         requestHeader.setTopic(mq.getTopic());  43         
requestHeader.setQueueId(mq.getQueueId());  44         requestHeader.setQueueOffset(offset);  45         requestHeader.setMaxMsgNums(maxNums);  46         requestHeader.setSysFlag(sysFlagInner);  47         requestHeader.setCommitOffset(commitOffset);  48         requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);  49         requestHeader.setSubscription(subExpression);  50         requestHeader.setSubVersion(subVersion);  51         requestHeader.setExpressionType(expressionType);  52  53         String brokerAddr = findBrokerResult.getBrokerAddr();  54         if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {  55             brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);  56         }  57  58         PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(  59             brokerAddr,  60             requestHeader,  61             timeoutMillis,  62             communicationMode,  63             pullCallback);  64  65         return pullResult;  66     }  67  68     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);  69 }

首先通过findBrokerAddressInSubscribe方法查找关于消息队列的Broker信息

这里的recalculatePullFromWhichNode方法:

 1 public long recalculatePullFromWhichNode(final MessageQueue mq) {   2     if (this.isConnectBrokerByUser()) {   3         return this.defaultBrokerId;   4     }   5   6     AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);   7     if (suggest != null) {   8         return suggest.get();   9     }  10  11     return MixAll.MASTER_ID;  12 }

根据消息队列,在pullFromWhichNodeTable查找其对应的Broker的ID
pullFromWhichNodeTable记录了消息队列和BrokerID的映射

1 private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =  2         new ConcurrentHashMap<MessageQueue, AtomicLong>(32);

(master的BrokerID为0,slave的BrokerID大于0)

 

findBrokerAddressInSubscribe方法:

 1 public FindBrokerResult findBrokerAddressInSubscribe(   2     final String brokerName,   3     final long brokerId,   4     final boolean onlyThisBroker   5 ) {   6     String brokerAddr = null;   7     boolean slave = false;   8     boolean found = false;   9  10     HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);  11     if (map != null && !map.isEmpty()) {  12         brokerAddr = map.get(brokerId);  13         slave = brokerId != MixAll.MASTER_ID;  14         found = brokerAddr != null;  15  16         if (!found && !onlyThisBroker) {  17             Entry<Long, String> entry = map.entrySet().iterator().next();  18             brokerAddr = entry.getValue();  19             slave = entry.getKey() != MixAll.MASTER_ID;  20             found = true;  21         }  22     }  23  24     if (found) {  25         return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));  26     }  27  28     return null;  29 }

这里就根据brokerAddrTable表查找该BrokerID对应的Broker的地址信息,以及是否是slave
封装为FindBrokerResult返回

若是没有找到Broker的路由信息,则通过updateTopicRouteInfoFromNameServer方法向NameServer请求更新,更新完成后再调用findBrokerAddressInSubscribe方法查找

之后会根据相应的信息封装请求消息头PullMessageRequestHeader

然后调用pullMessage方法:

 1 public PullResult pullMessage(   2     final String addr,   3     final PullMessageRequestHeader requestHeader,   4     final long timeoutMillis,   5     final CommunicationMode communicationMode,   6     final PullCallback pullCallback   7 ) throws RemotingException, MQBrokerException, InterruptedException {   8     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);   9  10     switch (communicationMode) {  11         case ONEWAY:  12             assert false;  13             return null;  14         case ASYNC:  15             this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);  16             return null;  17         case SYNC:  18             return this.pullMessageSync(addr, request, timeoutMillis);  19         default:  20             assert false;  21             break;  22     }  23  24     return null;  25 }

这里就可以看出我前面说的两种类型,同步拉取和异步拉取

pullMessageSync方法:

1 private PullResult pullMessageSync(  2     final String addr,  3     final RemotingCommand request,  4     final long timeoutMillis  5 ) throws RemotingException, InterruptedException, MQBrokerException {  6     RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);  7     assert response != null;  8     return this.processPullResponse(response);  9 }

这里其实就是通过invokeSync方法,由Netty进行同步发送,将请求发送给Broker
关于消息的发送详见:

【RocketMQ中Producer消息的发送源码分析】

 

在收到响应后由processPullResponse方法处理
processPullResponse方法:

 1 private PullResult processPullResponse(   2     final RemotingCommand response) throws MQBrokerException, RemotingCommandException {   3     PullStatus pullStatus = PullStatus.NO_NEW_MSG;   4     switch (response.getCode()) {   5         case ResponseCode.SUCCESS:   6             pullStatus = PullStatus.FOUND;   7             break;   8         case ResponseCode.PULL_NOT_FOUND:   9             pullStatus = PullStatus.NO_NEW_MSG;  10             break;  11         case ResponseCode.PULL_RETRY_IMMEDIATELY:  12             pullStatus = PullStatus.NO_MATCHED_MSG;  13             break;  14         case ResponseCode.PULL_OFFSET_MOVED:  15             pullStatus = PullStatus.OFFSET_ILLEGAL;  16             break;  17  18         default:  19             throw new MQBrokerException(response.getCode(), response.getRemark());  20     }  21  22     PullMessageResponseHeader responseHeader =  23         (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);  24  25     return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),  26         responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());  27 }

根据响应的状态,设置PullStatus状态

然后通过decodeCommandCustomHeader方法,将响应中的信息解码
最后由PullResultExt封装消息信息

 1 public class PullResultExt extends PullResult {   2     private final long suggestWhichBrokerId;   3     private byte[] messageBinary;   4   5     public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,   6         List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {   7         super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);   8         this.suggestWhichBrokerId = suggestWhichBrokerId;   9         this.messageBinary = messageBinary;  10     }  11     ......  12 }  13  14 public class PullResult {  15     private final PullStatus pullStatus;  16     private final long nextBeginOffset;  17     private final long minOffset;  18     private final long maxOffset;  19     private List<MessageExt> msgFoundList;  20  21     public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,  22         List<MessageExt> msgFoundList) {  23         super();  24         this.pullStatus = pullStatus;  25         this.nextBeginOffset = nextBeginOffset;  26         this.minOffset = minOffset;  27         this.maxOffset = maxOffset;  28         this.msgFoundList = msgFoundList;  29     }  30     ......  31 }

拉取到的消息可能是多条,具体内容在PullResult中的msgFoundList保存,MessageExt是Message的子类

 

回到pullSyncImpl方法,在拉取到消息后,调用processPullResult方法:

 1 public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,   2     final SubscriptionData subscriptionData) {   3     PullResultExt pullResultExt = (PullResultExt) pullResult;   4   5     this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());   6     if (PullStatus.FOUND == pullResult.getPullStatus()) {   7         ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());   8         List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);   9  10         List<MessageExt> msgListFilterAgain = msgList;  11         if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {  12             msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());  13             for (MessageExt msg : msgList) {  14                 if (msg.getTags() != null) {  15                     if (subscriptionData.getTagsSet().contains(msg.getTags())) {  16                         msgListFilterAgain.add(msg);  17                     }  18                 }  19             }  20         }  21  22         if (this.hasHook()) {  23             FilterMessageContext filterMessageContext = new FilterMessageContext();  24             filterMessageContext.setUnitMode(unitMode);  25             filterMessageContext.setMsgList(msgListFilterAgain);  26             this.executeHook(filterMessageContext);  27         }  28  29         for (MessageExt msg : msgListFilterAgain) {  30             String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);  31             if (traFlag != null && Boolean.parseBoolean(traFlag)) {  32                 msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));  33             }  34             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,  35                 Long.toString(pullResult.getMinOffset()));  36             MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,  37    
             Long.toString(pullResult.getMaxOffset()));  38         }  39  40         pullResultExt.setMsgFoundList(msgListFilterAgain);  41     }  42  43     pullResultExt.setMessageBinary(null);  44  45     return pullResult;  46 }

首先调用updatePullFromWhichNode方法:

1 public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {  2    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);  3     if (null == suggest) {  4         this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));  5     } else {  6         suggest.set(brokerId);  7     }  8 }

这里就会将pullFromWhichNodeTable中记录的消息队列和BrokerID的映射,更新为Broker发送过来的建议ID
结合上一篇博客来看,若是采用集群模式,就完成了消费者端的负载均衡

在PullStatus.FOUND情况下,会调用MessageDecoder的decodes方法,将CommitLog格式的消息数据进行解码,转化为真正可读的消息

之后会对Tag进行判断:若订阅时设置了Tag(且不是类过滤模式),则在消费者端再过滤一次,只保留Tag包含在订阅Tag集合中的消息

之后,在设置了FilterMessageHook钩子情况下,通过executeHook方法执行FilterMessageHook钩子的filterMessage方法:

 1 public void executeHook(final FilterMessageContext context) {   2     if (!this.filterMessageHookList.isEmpty()) {   3         for (FilterMessageHook hook : this.filterMessageHookList) {   4             try {   5                 hook.filterMessage(context);   6             } catch (Throwable e) {   7                 log.error("execute hook error. hookName={}", hook.hookName());   8             }   9         }  10     }  11 }

然后对消息进行属性设置

processPullResult完成后,若是设置了ConsumeMessageHook钩子,调用executeHookBefore和executeHookAfter方法,分别执行钩子中的consumeMessageBefore和consumeMessageAfter方法:

 1 public void executeHookBefore(final ConsumeMessageContext context) {   2     if (!this.consumeMessageHookList.isEmpty()) {   3         for (ConsumeMessageHook hook : this.consumeMessageHookList) {   4             try {   5                 hook.consumeMessageBefore(context);   6             } catch (Throwable ignored) {   7             }   8         }   9     }  10 }  11  12 public void executeHookAfter(final ConsumeMessageContext context) {  13     if (!this.consumeMessageHookList.isEmpty()) {  14         for (ConsumeMessageHook hook : this.consumeMessageHookList) {  15             try {  16                 hook.consumeMessageAfter(context);  17             } catch (Throwable ignored) {  18             }  19         }  20     }  21 }

PullConsumer消息的同步拉取到此结束

 

异步消息拉取

异步拉取的API都通过pullAsyncImpl方法实现:

 1 private void pullAsyncImpl(   2     final MessageQueue mq,   3     final SubscriptionData subscriptionData,   4     final long offset,   5     final int maxNums,   6     final PullCallback pullCallback,   7     final boolean block,   8     final long timeout) throws MQClientException, RemotingException, InterruptedException {   9     this.makeSureStateOK();  10  11     if (null == mq) {  12         throw new MQClientException("mq is null", null);  13     }  14  15     if (offset < 0) {  16         throw new MQClientException("offset < 0", null);  17     }  18  19     if (maxNums <= 0) {  20         throw new MQClientException("maxNums <= 0", null);  21     }  22  23     if (null == pullCallback) {  24         throw new MQClientException("pullCallback is null", null);  25     }  26  27     this.subscriptionAutomatically(mq.getTopic());  28  29     try {  30         int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);  31  32         long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;  33  34         boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());  35         this.pullAPIWrapper.pullKernelImpl(  36             mq,  37             subscriptionData.getSubString(),  38             subscriptionData.getExpressionType(),  39             isTagType ? 
0L : subscriptionData.getSubVersion(),  40             offset,  41             maxNums,  42             sysFlag,  43             0,  44             this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),  45             timeoutMillis,  46             CommunicationMode.ASYNC,  47             new PullCallback() {  48  49                 @Override  50                 public void onSuccess(PullResult pullResult) {  51                     pullCallback  52                         .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));  53                 }  54  55                 @Override  56                 public void onException(Throwable e) {  57                     pullCallback.onException(e);  58                 }  59             });  60     } catch (MQBrokerException e) {  61         throw new MQClientException("pullAsync unknow exception", e);  62     }  63 }

相比同步,参数多了个PullCallback,用于处理异步拉取后的回调

过程基本上和同步拉取类似,只不过在调用pullKernelImpl方法时,会创建一个PullCallback
在onSuccess和onException中,实际上调用了pullCallback的相应方法,这样就完成了异步的回调

在onSuccess回调的参数中,同同步方式类似,会通过processPullResult方法,对结果进一步加工

之后的pullKernelImpl方法和同步一样

只不过最后调用了pullMessageAsync方法:

 1 private void pullMessageAsync(   2     final String addr,   3     final RemotingCommand request,   4     final long timeoutMillis,   5     final PullCallback pullCallback   6 ) throws RemotingException, InterruptedException {   7     this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {   8         @Override   9         public void operationComplete(ResponseFuture responseFuture) {  10             RemotingCommand response = responseFuture.getResponseCommand();  11             if (response != null) {  12                 try {  13                     PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);  14                     assert pullResult != null;  15                     pullCallback.onSuccess(pullResult);  16                 } catch (Exception e) {  17                     pullCallback.onException(e);  18                 }  19             } else {  20                 if (!responseFuture.isSendRequestOK()) {  21                     pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));  22                 } else if (responseFuture.isTimeout()) {  23                     pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,  24                         responseFuture.getCause()));  25                 } else {  26                     pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));  27                 }  28             }  29         }  30     });  31 }

这里实际上也是通过Netty完成异步发送
详见:

【RocketMQ中Producer消息的发送源码分析】

 

由于是异步发送,这里又设置了一个回调InvokeCallback
当请求发送完成,收到响应后,就会执行InvokeCallback的operationComplete方法,

在operationComplete方法中,和同步一样,执行processPullResponse方法,处理响应
之后调用pullCallback的onSuccess方法,也就是刚才创建的回调接口,进而执行用户传入的回调接口的方法

消息异步拉取也就到此结束