Source Code Analysis of Message Pulling in RocketMQ's PullConsumer

  • October 3, 2019
  • Notes

PullConsumer exposes many APIs for pulling messages in RocketMQ, but they fall into two kinds: synchronous pulls and asynchronous pulls.

Synchronous message pulling
Every synchronous pull goes through the pullSyncImpl method of DefaultMQPullConsumerImpl:

private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
    long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();

    if (null == mq) {
        throw new MQClientException("mq is null", null);
    }

    if (offset < 0) {
        throw new MQClientException("offset < 0", null);
    }

    if (maxNums <= 0) {
        throw new MQClientException("maxNums <= 0", null);
    }

    this.subscriptionAutomatically(mq.getTopic());

    int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

    long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

    boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
        mq,
        subscriptionData.getSubString(),
        subscriptionData.getExpressionType(),
        isTagType ? 0L : subscriptionData.getSubVersion(),
        offset,
        maxNums,
        sysFlag,
        0,
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
        timeoutMillis,
        CommunicationMode.SYNC,
        null
    );
    this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
    if (!this.consumeMessageHookList.isEmpty()) {
        ConsumeMessageContext consumeMessageContext = null;
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setConsumerGroup(this.groupName());
        consumeMessageContext.setMq(mq);
        consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
        consumeMessageContext.setSuccess(false);
        this.executeHookBefore(consumeMessageContext);
        consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
        consumeMessageContext.setSuccess(true);
        this.executeHookAfter(consumeMessageContext);
    }
    return pullResult;
}

First, the subscriptionAutomatically method checks whether the topic is already subscribed:

public void subscriptionAutomatically(final String topic) {
    if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
        try {
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                topic, SubscriptionData.SUB_ALL);
            this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
        } catch (Exception ignore) {
        }
    }
}

If it is not, a new subscription record that matches all messages (SUB_ALL) is created and stored in rebalanceImpl's subscriptionInner.
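The "register only if missing" step can be tried out in isolation. The following is a minimal sketch using only the JDK, not RocketMQ code: the class name AutoSubscribeSketch, the String-valued table and the "*" match-all expression are stand-ins chosen for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified stand-in for the subscription table; not RocketMQ code.
class AutoSubscribeSketch {
    // topic -> subscription expression ("*" plays the role of a match-all subscription)
    static final ConcurrentMap<String, String> subscriptions = new ConcurrentHashMap<>();

    // Registers a match-all subscription only when the topic has none yet,
    // then returns whatever expression is in effect for the topic.
    static String subscribeIfMissing(String topic) {
        subscriptions.putIfAbsent(topic, "*");
        return subscriptions.get(topic);
    }

    public static void main(String[] args) {
        subscriptions.put("TopicA", "TagA || TagB");
        System.out.println(subscribeIfMissing("TopicA")); // existing entry is kept
        System.out.println(subscribeIfMissing("TopicB")); // new match-all entry
    }
}
```

Because putIfAbsent is atomic, an explicit subscription that another thread registers in the meantime is never overwritten by the automatic one.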

Next, pullKernelImpl is called:

public PullResult pullKernelImpl(
    final MessageQueue mq,
    final String subExpression,
    final String expressionType,
    final long subVersion,
    final long offset,
    final int maxNums,
    final int sysFlag,
    final long commitOffset,
    final long brokerSuspendMaxTimeMillis,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    FindBrokerResult findBrokerResult =
        this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
            this.recalculatePullFromWhichNode(mq), false);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
    }

    if (findBrokerResult != null) {
        {
            // check version
            if (!ExpressionType.isTagType(expressionType)
                && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                    + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
            }
        }
        int sysFlagInner = sysFlag;

        if (findBrokerResult.isSlave()) {
            sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
        }

        PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
        requestHeader.setConsumerGroup(this.consumerGroup);
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setQueueOffset(offset);
        requestHeader.setMaxMsgNums(maxNums);
        requestHeader.setSysFlag(sysFlagInner);
        requestHeader.setCommitOffset(commitOffset);
        requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
        requestHeader.setSubscription(subExpression);
        requestHeader.setSubVersion(subVersion);
        requestHeader.setExpressionType(expressionType);

        String brokerAddr = findBrokerResult.getBrokerAddr();
        if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
            brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
        }

        PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
            brokerAddr,
            requestHeader,
            timeoutMillis,
            communicationMode,
            pullCallback);

        return pullResult;
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

It first calls findBrokerAddressInSubscribe to look up the Broker that hosts the message queue.

The broker ID passed to that lookup comes from recalculatePullFromWhichNode:

public long recalculatePullFromWhichNode(final MessageQueue mq) {
    if (this.isConnectBrokerByUser()) {
        return this.defaultBrokerId;
    }

    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (suggest != null) {
        return suggest.get();
    }

    return MixAll.MASTER_ID;
}

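Given a message queue, it looks up the ID of the Broker to pull from in pullFromWhichNodeTable, and falls back to the master when the table has no entry for that queue.
pullFromWhichNodeTable records the mapping from message queue to BrokerID: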

private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);

(The master's BrokerID is 0; a slave's BrokerID is greater than 0.)
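To make the fallback concrete, here is a minimal JDK-only sketch of the lookup that recalculatePullFromWhichNode performs. It is an illustration, not RocketMQ code: PullNodeLookupSketch is a made-up class, a plain String stands in for MessageQueue, and the isConnectBrokerByUser branch is left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the queue -> broker id table; not RocketMQ code.
class PullNodeLookupSketch {
    static final long MASTER_ID = 0L;

    // queue key (a String here for brevity) -> broker id suggested for the next pull
    static final ConcurrentMap<String, AtomicLong> table = new ConcurrentHashMap<>(32);

    // Returns the suggested broker id, or the master's id when nothing is recorded.
    static long brokerIdFor(String queueKey) {
        AtomicLong suggest = table.get(queueKey);
        return suggest != null ? suggest.get() : MASTER_ID;
    }

    public static void main(String[] args) {
        table.put("TopicA@broker-a@1", new AtomicLong(1));
        System.out.println(brokerIdFor("TopicA@broker-a@0")); // no entry, falls back to master
        System.out.println(brokerIdFor("TopicA@broker-a@1")); // recorded suggestion
    }
}
```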

The findBrokerAddressInSubscribe method:

public FindBrokerResult findBrokerAddressInSubscribe(
    final String brokerName,
    final long brokerId,
    final boolean onlyThisBroker
) {
    String brokerAddr = null;
    boolean slave = false;
    boolean found = false;

    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        brokerAddr = map.get(brokerId);
        slave = brokerId != MixAll.MASTER_ID;
        found = brokerAddr != null;

        if (!found && !onlyThisBroker) {
            Entry<Long, String> entry = map.entrySet().iterator().next();
            brokerAddr = entry.getValue();
            slave = entry.getKey() != MixAll.MASTER_ID;
            found = true;
        }
    }

    if (found) {
        return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
    }

    return null;
}

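This looks up, in brokerAddrTable, the address of the Broker with the given BrokerID and whether it is a slave. If that ID has no address and onlyThisBroker is false, it falls back to the first address registered under the same broker name.
The result is returned wrapped in a FindBrokerResult.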
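The fallback rule is easy to miss when reading the method, so here is a small JDK-only sketch of it. BrokerLookupSketch and its Result holder are hypothetical names loosely modelled on the method above; the broker version lookup is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the broker address lookup; not RocketMQ code.
class BrokerLookupSketch {
    static final long MASTER_ID = 0L;

    // Hypothetical result holder, loosely modelled on FindBrokerResult.
    static final class Result {
        final String addr;
        final boolean slave;
        Result(String addr, boolean slave) { this.addr = addr; this.slave = slave; }
    }

    // Prefers the requested broker id; unless onlyThisBroker is set,
    // falls back to any address registered under the same broker name.
    static Result find(Map<Long, String> addrsById, long brokerId, boolean onlyThisBroker) {
        if (addrsById == null || addrsById.isEmpty()) {
            return null;
        }
        String addr = addrsById.get(brokerId);
        if (addr != null) {
            return new Result(addr, brokerId != MASTER_ID);
        }
        if (onlyThisBroker) {
            return null;
        }
        Map.Entry<Long, String> any = addrsById.entrySet().iterator().next();
        // The slave flag describes the broker actually chosen, not the one requested.
        return new Result(any.getValue(), any.getKey() != MASTER_ID);
    }

    public static void main(String[] args) {
        Map<Long, String> addrs = new HashMap<>();
        addrs.put(0L, "10.0.0.1:10911");
        Result r = find(addrs, 1L, false); // slave 1 is unknown, so the master is used
        System.out.println(r.addr + " slave=" + r.slave);
    }
}
```

Note that pullKernelImpl always passes false for onlyThisBroker, so a stale suggestion pointing at a slave that has gone away still resolves to some reachable broker.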

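If no routing information is found for the Broker, updateTopicRouteInfoFromNameServer asks the NameServer for an update, and findBrokerAddressInSubscribe is then called a second time.

After that, the collected information is packed into the request header, PullMessageRequestHeader.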
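One header field, sysFlag, is a bit mask: pullSyncImpl builds it with PullSysFlag.buildSysFlag(false, block, true, false), and pullKernelImpl clears the commit-offset bit when the target is a slave. The sketch below shows that style of flag handling with the JDK only. The class name and the concrete bit positions are assumptions made for illustration and should be checked against the real PullSysFlag.

```java
// Simplified stand-in for a pull sysFlag bit mask; not RocketMQ code.
class PullSysFlagSketch {
    // Bit positions are assumptions for illustration.
    static final int COMMIT_OFFSET = 1;
    static final int SUSPEND = 1 << 1;
    static final int SUBSCRIPTION = 1 << 2;
    static final int CLASS_FILTER = 1 << 3;

    // Combines the four options into a single int.
    static int build(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) flag |= COMMIT_OFFSET;
        if (suspend) flag |= SUSPEND;
        if (subscription) flag |= SUBSCRIPTION;
        if (classFilter) flag |= CLASS_FILTER;
        return flag;
    }

    // Clears only the commit-offset bit and leaves the others untouched.
    static int clearCommitOffset(int flag) {
        return flag & ~COMMIT_OFFSET;
    }

    static boolean hasClassFilter(int flag) {
        return (flag & CLASS_FILTER) == CLASS_FILTER;
    }

    public static void main(String[] args) {
        int flag = build(false, true, true, false); // a blocking pull with a subscription
        System.out.println(Integer.toBinaryString(flag));
        System.out.println(hasClassFilter(flag));
    }
}
```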

Then pullMessage is called:

public PullResult pullMessage(
    final String addr,
    final PullMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);

    switch (communicationMode) {
        case ONEWAY:
            assert false;
            return null;
        case ASYNC:
            this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
            return null;
        case SYNC:
            return this.pullMessageSync(addr, request, timeoutMillis);
        default:
            assert false;
            break;
    }

    return null;
}

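This is where the two kinds mentioned at the start diverge: synchronous pulling returns the PullResult directly, while asynchronous pulling returns null and delivers the result through the callback.

The pullMessageSync method: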

private PullResult pullMessageSync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis
) throws RemotingException, InterruptedException, MQBrokerException {
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processPullResponse(response);
}

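This simply calls invokeSync, which sends the request to the Broker synchronously over Netty.
For details on how messages are sent, see:

[Source Code Analysis of Producer Message Sending in RocketMQ]

Once the response arrives, it is handled by processPullResponse: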

private PullResult processPullResponse(
    final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
    PullStatus pullStatus = PullStatus.NO_NEW_MSG;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS:
            pullStatus = PullStatus.FOUND;
            break;
        case ResponseCode.PULL_NOT_FOUND:
            pullStatus = PullStatus.NO_NEW_MSG;
            break;
        case ResponseCode.PULL_RETRY_IMMEDIATELY:
            pullStatus = PullStatus.NO_MATCHED_MSG;
            break;
        case ResponseCode.PULL_OFFSET_MOVED:
            pullStatus = PullStatus.OFFSET_ILLEGAL;
            break;

        default:
            throw new MQBrokerException(response.getCode(), response.getRemark());
    }

    PullMessageResponseHeader responseHeader =
        (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

    return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
        responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}

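The PullStatus is set from the response code; any code other than the four handled ones is turned into an MQBrokerException.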
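The mapping is small enough to restate as a standalone sketch. The two enums below are stand-ins written for this example (the real ResponseCode uses int constants, and SYSTEM_ERROR is included only to represent "any other code"); only the mapping itself is taken from processPullResponse.

```java
// Simplified stand-in for the response code -> pull status mapping; not RocketMQ code.
class PullStatusMappingSketch {
    // Stand-in enums; the real ResponseCode is a set of int constants.
    enum Code { SUCCESS, PULL_NOT_FOUND, PULL_RETRY_IMMEDIATELY, PULL_OFFSET_MOVED, SYSTEM_ERROR }
    enum Status { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

    // Maps the four expected codes; anything else is reported as an error.
    static Status toStatus(Code code) {
        switch (code) {
            case SUCCESS:
                return Status.FOUND;
            case PULL_NOT_FOUND:
                return Status.NO_NEW_MSG;
            case PULL_RETRY_IMMEDIATELY:
                return Status.NO_MATCHED_MSG;
            case PULL_OFFSET_MOVED:
                return Status.OFFSET_ILLEGAL;
            default:
                throw new IllegalStateException("unexpected response code: " + code);
        }
    }

    public static void main(String[] args) {
        for (Code c : new Code[] {Code.SUCCESS, Code.PULL_NOT_FOUND}) {
            System.out.println(c + " -> " + toStatus(c));
        }
    }
}
```

For a caller this means only FOUND carries messages; the other three statuses still carry a usable nextBeginOffset for the following pull.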

Then decodeCommandCustomHeader decodes the header of the response,
and the message information is wrapped in a PullResultExt:

public class PullResultExt extends PullResult {
    private final long suggestWhichBrokerId;
    private byte[] messageBinary;

    public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
        List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
        super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
        this.suggestWhichBrokerId = suggestWhichBrokerId;
        this.messageBinary = messageBinary;
    }
    ......
}

public class PullResult {
    private final PullStatus pullStatus;
    private final long nextBeginOffset;
    private final long minOffset;
    private final long maxOffset;
    private List<MessageExt> msgFoundList;

    public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
        List<MessageExt> msgFoundList) {
        super();
        this.pullStatus = pullStatus;
        this.nextBeginOffset = nextBeginOffset;
        this.minOffset = minOffset;
        this.maxOffset = maxOffset;
        this.msgFoundList = msgFoundList;
    }
    ......
}

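A single pull may return several messages. They end up in PullResult's msgFoundList, whose element type MessageExt is a subclass of Message. At this point msgFoundList is still null: the raw bytes sit in messageBinary until processPullResult decodes them.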

Back in pullSyncImpl: once the pull has returned, processPullResult is called:

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
    final SubscriptionData subscriptionData) {
    PullResultExt pullResultExt = (PullResultExt) pullResult;

    this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
    if (PullStatus.FOUND == pullResult.getPullStatus()) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
        List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

        List<MessageExt> msgListFilterAgain = msgList;
        if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
            msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
            for (MessageExt msg : msgList) {
                if (msg.getTags() != null) {
                    if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }
        }

        if (this.hasHook()) {
            FilterMessageContext filterMessageContext = new FilterMessageContext();
            filterMessageContext.setUnitMode(unitMode);
            filterMessageContext.setMsgList(msgListFilterAgain);
            this.executeHook(filterMessageContext);
        }

        for (MessageExt msg : msgListFilterAgain) {
            String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (traFlag != null && Boolean.parseBoolean(traFlag)) {
                msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
            }
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                Long.toString(pullResult.getMinOffset()));
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                Long.toString(pullResult.getMaxOffset()));
        }

        pullResultExt.setMsgFoundList(msgListFilterAgain);
    }

    pullResultExt.setMessageBinary(null);

    return pullResult;
}

It first calls updatePullFromWhichNode:

public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
    AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
    if (null == suggest) {
        this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
    } else {
        suggest.set(brokerId);
    }
}

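This updates the message queue's entry in pullFromWhichNodeTable to the BrokerID suggested by the Broker in its response, so the next pull for this queue goes to the suggested node.
Read together with the previous post, in cluster mode this completes the load balancing on the consumer side.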
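updatePullFromWhichNode uses a get followed by a put. If two threads update the same new queue at once, both may see null and both put, so one AtomicLong is replaced by the other; since the value is only a suggestion this looks harmless. As a design note rather than a description of RocketMQ, the same update can be written with computeIfAbsent so that each key gets exactly one AtomicLong. PullNodeUpdateSketch and the String key are stand-ins for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Alternative formulation of the suggestion update; not RocketMQ code.
class PullNodeUpdateSketch {
    // queue key (a String here for brevity) -> broker id suggested by the last response
    static final ConcurrentMap<String, AtomicLong> table = new ConcurrentHashMap<>();

    // Creates the holder atomically on first use, then overwrites its value.
    static void update(String queueKey, long suggestedBrokerId) {
        table.computeIfAbsent(queueKey, k -> new AtomicLong()).set(suggestedBrokerId);
    }

    public static void main(String[] args) {
        update("TopicA@broker-a@0", 1L); // broker suggests reading from slave 1
        update("TopicA@broker-a@0", 0L); // later it suggests the master again
        System.out.println(table.get("TopicA@broker-a@0").get());
    }
}
```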

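When the status is PullStatus.FOUND, MessageDecoder's decodes method decodes the message data, which arrives in CommitLog format, into readable MessageExt objects.

Next comes the tag check: if the subscription specifies tags and class-filter mode is off, only messages whose tag is contained in the subscription's tag set are kept; messages without a tag are dropped.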
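The client-side tag check can be reproduced with plain collections. In the sketch below, TagFilterSketch and its Msg class are hypothetical stand-ins for the real types; only the filtering rule is taken from processPullResult. To my understanding the Broker matches tags by hash code, which would explain why the client compares the tag strings again, but treat that as an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Simplified stand-in for the second-pass tag filter; not RocketMQ code.
class TagFilterSketch {
    // Hypothetical message type carrying only what the filter needs.
    static final class Msg {
        final String tags;
        final String body;
        Msg(String tags, String body) { this.tags = tags; this.body = body; }
    }

    // Keeps only messages whose tag is in the subscribed set.
    // An empty set means "no tag restriction", so the list is returned unchanged.
    static List<Msg> filterByTags(List<Msg> msgs, Set<String> subscribedTags) {
        if (subscribedTags.isEmpty()) {
            return msgs;
        }
        List<Msg> kept = new ArrayList<>(msgs.size());
        for (Msg m : msgs) {
            if (m.tags != null && subscribedTags.contains(m.tags)) {
                kept.add(m);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Msg> pulled = Arrays.asList(new Msg("TagA", "m1"), new Msg("TagB", "m2"), new Msg(null, "m3"));
        for (Msg m : filterByTags(pulled, Collections.singleton("TagA"))) {
            System.out.println(m.body);
        }
    }
}
```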

After that, if any FilterMessageHook has been registered, executeHook runs each hook's filterMessage method:

public void executeHook(final FilterMessageContext context) {
    if (!this.filterMessageHookList.isEmpty()) {
        for (FilterMessageHook hook : this.filterMessageHookList) {
            try {
                hook.filterMessage(context);
            } catch (Throwable e) {
                log.error("execute hook error. hookName={}", hook.hookName());
            }
        }
    }
}

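Then properties are set on each remaining message: the transaction ID for prepared transactional messages, plus the queue's minimum and maximum offsets.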

Once processPullResult has finished, if any ConsumeMessageHook has been registered, executeHookBefore and executeHookAfter are called; they run the hooks' consumeMessageBefore and consumeMessageAfter methods respectively:

public void executeHookBefore(final ConsumeMessageContext context) {
    if (!this.consumeMessageHookList.isEmpty()) {
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageBefore(context);
            } catch (Throwable ignored) {
            }
        }
    }
}

public void executeHookAfter(final ConsumeMessageContext context) {
    if (!this.consumeMessageHookList.isEmpty()) {
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageAfter(context);
            } catch (Throwable ignored) {
            }
        }
    }
}

That completes the synchronous pull path of PullConsumer.

Asynchronous message pulling

All asynchronous pull APIs are implemented by the pullAsyncImpl method:

private void pullAsyncImpl(
    final MessageQueue mq,
    final SubscriptionData subscriptionData,
    final long offset,
    final int maxNums,
    final PullCallback pullCallback,
    final boolean block,
    final long timeout) throws MQClientException, RemotingException, InterruptedException {
    this.makeSureStateOK();

    if (null == mq) {
        throw new MQClientException("mq is null", null);
    }

    if (offset < 0) {
        throw new MQClientException("offset < 0", null);
    }

    if (maxNums <= 0) {
        throw new MQClientException("maxNums <= 0", null);
    }

    if (null == pullCallback) {
        throw new MQClientException("pullCallback is null", null);
    }

    this.subscriptionAutomatically(mq.getTopic());

    try {
        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
        this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            subscriptionData.getExpressionType(),
            isTagType ? 0L : subscriptionData.getSubVersion(),
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.ASYNC,
            new PullCallback() {

                @Override
                public void onSuccess(PullResult pullResult) {
                    pullCallback
                        .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
                }

                @Override
                public void onException(Throwable e) {
                    pullCallback.onException(e);
                }
            });
    } catch (MQBrokerException e) {
        throw new MQClientException("pullAsync unknow exception", e);
    }
}

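Compared with the synchronous version, there is one extra parameter, a PullCallback, which handles the result once the asynchronous pull completes.

The flow is largely the same as for a synchronous pull, except that a new PullCallback is created when pullKernelImpl is called.
Its onSuccess and onException simply delegate to the corresponding methods of the caller's pullCallback, which is how the asynchronous callback is completed.

As in the synchronous case, the result handed to the caller's onSuccess is first post-processed by processPullResult.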
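The wrapping pattern used here (post-process on success, pass errors straight through) can be shown without RocketMQ. In this sketch the Callback interface and the wrap helper are hypothetical; the UnaryOperator plays the role that processPullResult plays in pullAsyncImpl.

```java
import java.util.function.UnaryOperator;

// Simplified stand-in for the callback wrapping in pullAsyncImpl; not RocketMQ code.
class CallbackWrapSketch {
    // Hypothetical two-method callback, shaped like PullCallback.
    interface Callback<T> {
        void onSuccess(T result);
        void onException(Throwable e);
    }

    // Returns a callback that post-processes the result before handing it to the
    // user's callback, and forwards exceptions unchanged.
    static <T> Callback<T> wrap(Callback<T> user, UnaryOperator<T> postProcess) {
        return new Callback<T>() {
            @Override
            public void onSuccess(T result) {
                user.onSuccess(postProcess.apply(result));
            }

            @Override
            public void onException(Throwable e) {
                user.onException(e);
            }
        };
    }

    public static void main(String[] args) {
        Callback<String> user = new Callback<String>() {
            @Override
            public void onSuccess(String result) {
                System.out.println("got: " + result);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("failed: " + e.getMessage());
            }
        };
        wrap(user, String::trim).onSuccess("  raw payload  ");
    }
}
```

The benefit of this design is that user code sees the same decoded, filtered PullResult on both paths and never has to call processPullResult itself.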

The rest of pullKernelImpl is the same as in the synchronous case,

except that it ends by calling pullMessageAsync:

private void pullMessageAsync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis,
    final PullCallback pullCallback
) throws RemotingException, InterruptedException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (response != null) {
                try {
                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                    assert pullResult != null;
                    pullCallback.onSuccess(pullResult);
                } catch (Exception e) {
                    pullCallback.onException(e);
                }
            } else {
                if (!responseFuture.isSendRequestOK()) {
                    pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                } else if (responseFuture.isTimeout()) {
                    pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                        responseFuture.getCause()));
                } else {
                    pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                }
            }
        }
    });
}

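Here, too, the request is actually sent asynchronously through Netty.
For details, see:

[Source Code Analysis of Producer Message Sending in RocketMQ]

Because the send is asynchronous, another callback is set up here, an InvokeCallback.
Once the request has been sent and the response has arrived, the InvokeCallback's operationComplete method runs. If there is no response, it reports a send failure, a timeout, or an unknown error through onException.

Inside operationComplete, processPullResponse handles the response just as in the synchronous case.
It then calls onSuccess on pullCallback, the callback created a moment ago, which in turn invokes the callback supplied by the user.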
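The branching inside operationComplete amounts to a small decision table, sketched below with JDK types only. Outcome, Callback and the "parsed:" prefix are hypothetical stand-ins for ResponseFuture, PullCallback and processPullResponse; the order of the checks follows pullMessageAsync.

```java
// Simplified stand-in for the completion handling in pullMessageAsync; not RocketMQ code.
class AsyncOutcomeSketch {
    // Hypothetical snapshot of what a response future reports on completion.
    static final class Outcome {
        final String response;   // null when no response arrived
        final boolean sendOk;    // whether the request left the client
        final boolean timedOut;  // whether the wait for a response timed out
        Outcome(String response, boolean sendOk, boolean timedOut) {
            this.response = response;
            this.sendOk = sendOk;
            this.timedOut = timedOut;
        }
    }

    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    // Exactly one callback method is invoked per completed request,
    // unless onSuccess itself throws, in which case onException follows.
    static void complete(Outcome o, Callback cb) {
        if (o.response != null) {
            try {
                cb.onSuccess("parsed:" + o.response); // stand-in for response processing
            } catch (Exception e) {
                cb.onException(e);
            }
        } else if (!o.sendOk) {
            cb.onException(new IllegalStateException("send request failed"));
        } else if (o.timedOut) {
            cb.onException(new IllegalStateException("wait response timeout"));
        } else {
            cb.onException(new IllegalStateException("unknown reason"));
        }
    }

    public static void main(String[] args) {
        Callback printer = new Callback() {
            @Override
            public void onSuccess(String result) {
                System.out.println(result);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("error: " + e.getMessage());
            }
        };
        complete(new Outcome("body", true, false), printer);
        complete(new Outcome(null, true, true), printer);
    }
}
```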

That completes the asynchronous pull path.