Source Code Analysis of PullConsumer Startup in RocketMQ

  • October 3, 2019
  • Notes

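DefaultMQPullConsumer is the default implementation. Its startup process closely resembles the Producer's, though it is somewhat more involved.

[Source Code Analysis of Producer Startup in RocketMQ]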

 

The constructor of DefaultMQPullConsumer:

public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
    this.consumerGroup = consumerGroup;
    defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
}

It wraps a DefaultMQPullConsumerImpl, much as the Producer wraps a DefaultMQProducerImpl.

DefaultMQPullConsumerImpl:

public class DefaultMQPullConsumerImpl implements MQConsumerInner {
    private final InternalLogger log = ClientLogger.getLog();
    private final DefaultMQPullConsumer defaultMQPullConsumer;
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final RPCHook rpcHook;
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private OffsetStore offsetStore;
    private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);

    public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
        this.defaultMQPullConsumer = defaultMQPullConsumer;
        this.rpcHook = rpcHook;
    }
    ......
}

These are the fields it holds; each is covered in detail when it comes up later.

 

The start method of DefaultMQPullConsumer simply delegates to the start method of DefaultMQPullConsumerImpl.

The start method of DefaultMQPullConsumerImpl:

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPullConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPullConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
            }

            this.offsetStore.load();

            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;

                throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            mQClientFactory.start();
            log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

}
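The state handling in start can be isolated into a small sketch. The class below is a hypothetical stand-in, not RocketMQ code: the init callback represents checkConfig, copySubscription, and the rest of the setup, and IllegalStateException takes the place of MQClientException. It also leaves out the special case in which a failed registerConsumer resets the state to CREATE_JUST.

```java
// Simplified sketch of a start-once guard; names are illustrative only.
final class StartGuardSketch {
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private volatile State state = State.CREATE_JUST;

    State state() {
        return state;
    }

    // init stands in for the real setup steps (config check, registration, ...).
    synchronized void start(Runnable init) {
        switch (state) {
            case CREATE_JUST:
                // Mark as failed up front; only a clean run reaches RUNNING.
                state = State.START_FAILED;
                init.run();
                state = State.RUNNING;
                break;
            default:
                // RUNNING, START_FAILED, and SHUTDOWN_ALREADY all reject a restart.
                throw new IllegalStateException("service state not OK: " + state);
        }
    }
}
```

Because the state is set to START_FAILED before any work is done, an exception thrown during setup leaves the instance in START_FAILED, and any later call to start is rejected.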

First, the checkConfig method validates the configuration.

Next comes the copySubscription method:

private void copySubscription() throws MQClientException {
    try {
        Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
        if (registerTopics != null) {
            for (final String topic : registerTopics) {
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                    topic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
    } catch (Exception e) {
        throw new MQClientException("subscription exception", e);
    }
}

Here registerTopics is the set of Topics the user registered by calling setRegisterTopics.
Each Topic in the set is wrapped in a SubscriptionData and stored in rebalanceImpl.

SubscriptionData:

public class SubscriptionData implements Comparable<SubscriptionData> {
    public final static String SUB_ALL = "*";
    private boolean classFilterMode = false;
    private String topic;
    private String subString;
    private Set<String> tagsSet = new HashSet<String>();
    private Set<Integer> codeSet = new HashSet<Integer>();
    private long subVersion = System.currentTimeMillis();
    private String expressionType = ExpressionType.TAG;
    ......
}

RebalanceImpl:

public abstract class RebalanceImpl {
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
        new ConcurrentHashMap<String, Set<MessageQueue>>();
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
        new ConcurrentHashMap<String, SubscriptionData>();
    protected String consumerGroup;
    protected MessageModel messageModel;
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    protected MQClientInstance mQClientFactory;
    ......
}

 

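Back in the start method, an MQClientInstance is obtained through MQClientManager, just as in the Producer.
The properties of rebalanceImpl are then filled in.

Next, a PullAPIWrapper is instantiated and the filter hooks are registered with it. This object is covered in detail later, in the analysis of message pulling.

After that, the message model determines which kind of OffsetStore is used: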

public enum MessageModel {
    /**
     * broadcast
     */
    BROADCASTING("BROADCASTING"),
    /**
     * clustering
     */
    CLUSTERING("CLUSTERING");
    ......
}

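These are broadcasting mode and clustering mode.
Broadcasting mode (BROADCASTING): every Consumer in a ConsumerGroup receives all messages of the subscribed Topic, so each message is delivered multiple times and consumed by multiple Consumers.
Clustering mode (CLUSTERING): each Consumer in a ConsumerGroup consumes only part of the subscribed messages; only the union of what all Consumers in the group consume makes up the full content of the subscribed Topic.

In broadcasting mode the consumer's progress (offset) is stored locally; in clustering mode it is stored remotely, on the broker.
Hence broadcasting mode uses LocalFileOffsetStore and clustering mode uses RemoteBrokerOffsetStore.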
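The selection rule in start can be reduced to a short sketch. Everything here is a hypothetical simplification: the stores are represented by their class names as plain strings, and the enum is a local copy rather than RocketMQ's MessageModel. As in start, an OffsetStore supplied by the user takes precedence over the mode-based default.

```java
// Illustrative only: models which offset store start() would end up using.
final class OffsetStoreChoiceSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    // userProvided is the name of a store the user set explicitly, or null.
    static String chooseStore(String userProvided, MessageModel model) {
        if (userProvided != null) {
            return userProvided; // an explicitly configured store always wins
        }
        switch (model) {
            case BROADCASTING:
                return "LocalFileOffsetStore";    // progress kept on the local disk
            case CLUSTERING:
                return "RemoteBrokerOffsetStore"; // progress kept on the broker
            default:
                throw new IllegalArgumentException("unknown model: " + model);
        }
    }
}
```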

In broadcasting mode, i.e. with LocalFileOffsetStore, calling load reads its local file offsets.json. With RemoteBrokerOffsetStore, load does nothing meaningful.
The load method of LocalFileOffsetStore:

public void load() throws MQClientException {
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

        for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet()) {
            AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
            log.info("load consumer's offset, {} {} {}",
                this.groupName,
                mq,
                offset.get());
        }
    }
}

readLocalOffset reads the JSON string in offsets.json and deserializes it into an OffsetSerializeWrapper object:

public class OffsetSerializeWrapper extends RemotingSerializable {
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

    public ConcurrentMap<MessageQueue, AtomicLong> getOffsetTable() {
        return offsetTable;
    }

    public void setOffsetTable(ConcurrentMap<MessageQueue, AtomicLong> offsetTable) {
        this.offsetTable = offsetTable;
    }
}

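This gives a rough idea of what the JSON file contains: each AtomicLong is the concrete offset for its MessageQueue.
load then copies this map into the offsetTable of LocalFileOffsetStore.

After the consumer has been registered through registerConsumer, the start method of mQClientFactory is called. It was analyzed in [Source Code Analysis of Producer Startup in RocketMQ].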

 

public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified,looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

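First, if no NameServer address has been set, fetchNameServerAddr is called to discover one automatically; see the Producer startup post.

Then the start method of mQClientAPIImpl sets up the Netty client; see the Producer startup post.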

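startScheduledTask then sets up five scheduled tasks:
① If the name server address namesrvAddr is not set, periodically call the fetchNameServerAddr method mentioned above to refresh the name server address
② Periodically update the route information of each Topic
③ Periodically remove offline Brokers and send heartbeats to the Brokers currently online
(See the Producer startup post for the three above)

④ Periodically persist the consumption progress of the consumer's queues
The implementation in DefaultMQPullConsumerImpl: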

public void persistConsumerOffset() {
    try {
        this.makeSureStateOK();
        Set<MessageQueue> mqs = new HashSet<MessageQueue>();
        Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
        mqs.addAll(allocateMq);
        this.offsetStore.persistAll(mqs);
    } catch (Exception e) {
        log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
    }
}

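It first takes from rebalanceImpl the set of all MessageQueues currently being processed,
then calls the persistAll method of offsetStore to handle that set.

Because there are two modes, broadcasting and clustering, there are two implementations.
The persistAll method of LocalFileOffsetStore, for broadcasting mode: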

public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        if (mqs.contains(entry.getKey())) {
            AtomicLong offset = entry.getValue();
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        }
    }

    String jsonString = offsetSerializeWrapper.toJson(true);
    if (jsonString != null) {
        try {
            MixAll.string2File(jsonString, this.storePath);
        } catch (IOException e) {
            log.error("persistAll consumer offset Exception, " + this.storePath, e);
        }
    }
}

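This is the reverse of the earlier load method: the offsets of the given MessageQueues are written out, replacing the previous contents of the JSON file.
That completes the periodic persistence of consumption progress in broadcasting mode.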
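The filtering step in persistAll, which keeps only the offsets of queues this consumer currently handles, can be shown without any file I/O. In this sketch the queue key is a plain String (a hypothetical stand-in for MessageQueue) and the result is returned as a map instead of being serialized to JSON. Unlike the real method, which returns early without writing anything, it returns an empty map when no queues are assigned.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: builds the snapshot that would be written to disk.
final class OffsetSnapshotSketch {
    static Map<String, Long> snapshot(Map<String, AtomicLong> offsetTable, Set<String> assigned) {
        Map<String, Long> out = new HashMap<>();
        if (assigned == null || assigned.isEmpty()) {
            return out; // nothing assigned, nothing to persist
        }
        for (Map.Entry<String, AtomicLong> entry : offsetTable.entrySet()) {
            // Skip offsets of queues that are not in the assigned set.
            if (assigned.contains(entry.getKey())) {
                out.put(entry.getKey(), entry.getValue().get());
            }
        }
        return out;
    }
}
```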

The persistAll method of RemoteBrokerOffsetStore, for clustering mode:

public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
    if (!mqs.isEmpty()) {
        for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            AtomicLong offset = entry.getValue();
            if (offset != null) {
                if (mqs.contains(mq)) {
                    try {
                        this.updateConsumeOffsetToBroker(mq, offset.get());
                        log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                            this.groupName,
                            this.mQClientFactory.getClientId(),
                            mq,
                            offset.get());
                    } catch (Exception e) {
                        log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                    }
                } else {
                    unusedMQ.add(mq);
                }
            }
        }
    }

    if (!unusedMQ.isEmpty()) {
        for (MessageQueue mq : unusedMQ) {
            this.offsetTable.remove(mq);
            log.info("remove unused mq, {}, {}", mq, this.groupName);
        }
    }
}

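As above, it iterates over offsetTable, but instead of saving locally it sends each offset to the Broker through updateConsumeOffsetToBroker. Entries for queues that are no longer in the given set are removed from offsetTable.
The updateConsumeOffsetToBroker method: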

private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    updateConsumeOffsetToBroker(mq, offset, true);
}

public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {

        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);

        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

It first looks up the Broker's address information by BrokerName:

public FindBrokerResult findBrokerAddressInAdmin(final String brokerName) {
    String brokerAddr = null;
    boolean slave = false;
    boolean found = false;

    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        for (Map.Entry<Long, String> entry : map.entrySet()) {
            Long id = entry.getKey();
            brokerAddr = entry.getValue();
            if (brokerAddr != null) {
                found = true;
                if (MixAll.MASTER_ID == id) {
                    slave = false;
                } else {
                    slave = true;
                }
                break;

            }
        } // end of for
    }

    if (found) {
        return new FindBrokerResult(brokerAddr, slave, findBrokerVersion(brokerName, brokerAddr));
    }

    return null;
}

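The Broker addresses in brokerAddrTable are kept up to date by scheduled task ② (periodically updating Topic route information). As soon as one Broker address is found in brokerAddrTable, it is wrapped in a FindBrokerResult and returned.

If nothing is found, updateTopicRouteInfoFromNameServer is executed, i.e. the same method the scheduled task runs, to refresh immediately, and findBrokerAddressInAdmin is then called again.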
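This is a lookup, refresh, retry pattern over a local cache. A minimal sketch follows, assuming a hypothetical in-memory map in place of the NameServer and plain String addresses in place of FindBrokerResult; none of these names come from RocketMQ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: cache lookup with a single forced refresh on a miss.
final class BrokerLookupSketch {
    private final Map<String, String> brokerAddrTable = new ConcurrentHashMap<>();
    private final Map<String, String> nameServer; // hypothetical authoritative source

    BrokerLookupSketch(Map<String, String> nameServer) {
        this.nameServer = nameServer;
    }

    // Stands in for updateTopicRouteInfoFromNameServer: copy fresh data into the cache.
    private void refreshFromNameServer(String brokerName) {
        String addr = nameServer.get(brokerName);
        if (addr != null) {
            brokerAddrTable.put(brokerName, addr);
        }
    }

    String findBrokerAddr(String brokerName) {
        String addr = brokerAddrTable.get(brokerName);
        if (addr == null) {
            // Cache miss: refresh immediately instead of waiting for the scheduled task.
            refreshFromNameServer(brokerName);
            addr = brokerAddrTable.get(brokerName);
        }
        if (addr == null) {
            throw new IllegalStateException("The broker[" + brokerName + "] not exist");
        }
        return addr;
    }
}
```

Only one refresh is attempted per call, so an unknown broker fails fast rather than looping.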

Once found, an UpdateConsumerOffsetRequestHeader is instantiated and filled in. Because Oneway mode is used here, updateConsumerOffsetOneway sends it to the Broker through Netty:

public void updateConsumerOffsetOneway(
    final String addr,
    final UpdateConsumerOffsetRequestHeader requestHeader,
    final long timeoutMillis
) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
    InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);

    this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
}

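This simply calls invokeOneway to send the request to the Broker one-way.

[Source Code Analysis of Producer Message Sending in RocketMQ]

When not in Oneway mode, a synchronous send is used instead.
In this way, in clustering mode the consumption progress is handed over to the Broker to manage, and later load balancing builds on it.

⑤ Periodically adjust the size of the consumer-side thread pool
This applies to PushConsumer and will be covered in a later post.

For PullConsumer, starting the rebalanceService is the most important step.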

RebalanceService:

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

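waitForRunning here works like the Broker's flush-to-disk and master-slave replication services: it blocks with a timeout (20s by default), and it can also be woken early by a NOTIFY_CONSUMER_IDS_CHANGED request sent by the Broker. After that, doRebalance is called.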
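The "block until timeout or early wakeup" behaviour can be sketched with plain wait/notifyAll. This is an assumption-laden illustration, not RocketMQ's ServiceThread: the class name, the notified flag, and the boolean return value are all invented for the example.

```java
// Illustrative only: a timed wait that another thread can cut short.
final class WakeableWaitSketch {
    private final Object lock = new Object();
    private boolean notified = false;

    // Called by whoever wants the waiting loop to run now (e.g. on a change notification).
    void wakeup() {
        synchronized (lock) {
            notified = true;
            lock.notifyAll();
        }
    }

    // Returns true if woken by wakeup(), false if the interval elapsed first.
    boolean waitForRunning(long intervalMillis) {
        synchronized (lock) {
            long deadline = System.currentTimeMillis() + intervalMillis;
            try {
                while (!notified) {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        break; // timed out
                    }
                    lock.wait(remaining);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
            boolean woken = notified;
            notified = false; // reset so the next wait blocks again
            return woken;
        }
    }
}
```

A wakeup that arrives before the wait starts is not lost: the flag is checked first, so the next waitForRunning returns immediately.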

The doRebalance method of RebalanceImpl:

public void doRebalance(final boolean isOrder) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
            final String topic = entry.getKey();
            try {
                this.rebalanceByTopic(topic, isOrder);
            } catch (Throwable e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("rebalanceByTopic Exception", e);
                }
            }
        }
    }

    this.truncateMessageQueueNotMyTopic();
}

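This obtains the set of subscribed Topics built in copySubscription. The message queues of each Topic, which the next step reads from topicSubscribeInfoTable, are refreshed from the NameServer by scheduled task ②.

Each subscribed Topic is handled by rebalanceByTopic: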

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                        consumerGroup,
                        topic,
                        mqSet,
                        mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (null == mqSet) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
            }

            if (null == cidAll) {
                log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                mqAll.addAll(mqSet);

                Collections.sort(mqAll);
                Collections.sort(cidAll);

                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                List<MessageQueue> allocateResult = null;
                try {
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                        e);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info(
                        "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                        strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                        allocateResultSet.size(), allocateResultSet);
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
            break;
        }
        default:
            break;
    }
}

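Broadcasting and clustering modes are handled differently here.

Broadcasting mode:
First, the set of all message queues for the Topic is obtained.

It is then processed by updateProcessQueueTableInRebalance: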

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;

    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) {
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            } else if (pq.isPullExpired()) {
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    this.dispatchPullRequest(pullRequestList);

    return changed;
}

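If the message queues have changed, the while loop first removes the records that are no longer needed from the process queue table.
The for loop then adds records for newly assigned queues to processQueueTable. In PullConsumer, computePullFromWhere returns 0 by default, and that value is used as nextOffset. A new ProcessQueue is stored in processQueueTable as the processing record, and nextOffset is set on a PullRequest as the position of the next pull.

The subsequent dispatchPullRequest is meant for PushConsumer and has no effect here.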
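The two phases, dropping stale entries and then adding new ones, amount to reconciling a table against a target set. The sketch below shows just that reconciliation under simplifying assumptions: queues are plain Strings, the table value is only a starting offset (0, mirroring the PullConsumer default), and the per-topic filter, ordering locks, and pull-expiry handling are all omitted.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Illustrative only: reconcile a process table with the newly assigned queues.
final class QueueTableDiffSketch {
    // Returns true if the table was modified in either phase.
    static boolean update(Map<String, Long> processQueueTable, Set<String> assigned) {
        boolean changed = false;

        // Phase 1: remove entries for queues that are no longer assigned.
        Iterator<Map.Entry<String, Long>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            if (!assigned.contains(it.next().getKey())) {
                it.remove();
                changed = true;
            }
        }

        // Phase 2: add entries for newly assigned queues, starting at offset 0.
        for (String mq : assigned) {
            if (processQueueTable.putIfAbsent(mq, 0L) == null) {
                changed = true;
            }
        }
        return changed;
    }
}
```

Calling update twice with the same assignment returns false the second time, which is why the real method's return value can be used to decide whether listeners need to be notified.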

Back in rebalanceByTopic, if anything changed, messageQueueChanged is called:

public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
    MessageQueueListener messageQueueListener = this.defaultMQPullConsumerImpl.getDefaultMQPullConsumer().getMessageQueueListener();
    if (messageQueueListener != null) {
        try {
            messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
        } catch (Throwable e) {
            log.error("messageQueueChanged exception", e);
        }
    }
}

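This simply hands off to the MessageQueueListener, invoking its messageQueueChanged callback.

Clustering mode:
Again, the set of message queues is obtained by Topic first.
Because this is clustering mode, each consumer receives different messages, so findConsumerIdList is used to obtain the list of consumer IDs: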

public List<String> findConsumerIdList(final String topic, final String group) {
    String brokerAddr = this.findBrokerAddrByTopic(topic);
    if (null == brokerAddr) {
        this.updateTopicRouteInfoFromNameServer(topic);
        brokerAddr = this.findBrokerAddrByTopic(topic);
    }

    if (null != brokerAddr) {
        try {
            return this.mQClientAPIImpl.getConsumerIdListByGroup(brokerAddr, group, 3000);
        } catch (Exception e) {
            log.warn("getConsumerIdListByGroup exception, " + brokerAddr + " " + group, e);
        }
    }

    return null;
}

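findBrokerAddrByTopic picks the address of one Broker in the cluster hosting the Topic (the data is refreshed from the NameServer by scheduled task ②): the master if it exists, otherwise a randomly chosen slave.

If none is found, an update is requested from the NameServer and the lookup is retried.

Once the Broker address is available, getConsumerIdListByGroup sends a request to the Broker: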

public List<String> getConsumerIdListByGroup(
    final String addr,
    final String consumerGroup,
    final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    MQBrokerException, InterruptedException {
    GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            if (response.getBody() != null) {
                GetConsumerListByGroupResponseBody body =
                    GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class);
                return body.getConsumerIdList();
            }
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

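This sends a GET_CONSUMER_LIST_BY_GROUP request to the Broker synchronously. When the response arrives, its data, the consumer IDs wrapped in a List, is returned.

Back in rebalanceByTopic, once the list of consumer IDs has been obtained,
queues are allocated according to the allocation strategy, which defaults to AllocateMessageQueueAveragely.
Its allocate method performs the allocation: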

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    List<String> cidAll) {
    if (currentCID == null || currentCID.length() < 1) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    if (mqAll == null || mqAll.isEmpty()) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if (cidAll == null || cidAll.isEmpty()) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }

    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
            consumerGroup,
            currentCID,
            cidAll);
        return result;
    }

    int index = cidAll.indexOf(currentCID);
    int mod = mqAll.size() % cidAll.size();
    int averageSize =
        mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
            + 1 : mqAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}

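(This ID was introduced in the Producer startup post: it is generated in the getAndCreateMQClientInstance method of MQClientManager and is unique per client.)

Since this is clustering mode, the current Consumer should itself be a member of the group, so the method checks that currentCID is contained in the list.

It then allocates message queues based on the number of consumers and the number of message queues, which achieves load balancing on the consumer side.
The allocation is an even split: from the number of queues and the number of consumers, it computes which queues the current consumer should consume.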
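The index arithmetic is easier to follow with concrete numbers. The sketch below copies the formulas from allocate but works on queue indexes only; the class and parameter names are invented for the example, and the argument validation of the real method is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: the average-allocation arithmetic applied to queue indexes.
final class AverageAllocationSketch {
    // index is the position of the current consumer in the sorted consumer ID list.
    static List<Integer> allocate(int queueCount, int consumerCount, int index) {
        List<Integer> result = new ArrayList<>();
        int mod = queueCount % consumerCount;
        // Consumers with index < mod each take one extra queue.
        int averageSize = queueCount <= consumerCount ? 1
            : (mod > 0 && index < mod ? queueCount / consumerCount + 1 : queueCount / consumerCount);
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // range can be zero or negative when there are more consumers than queues.
        int range = Math.min(averageSize, queueCount - startIndex);
        for (int i = 0; i < range; i++) {
            result.add((startIndex + i) % queueCount);
        }
        return result;
    }
}
```

With 8 queues and 3 consumers, consumer 0 gets queues 0 to 2, consumer 1 gets queues 3 to 5, and consumer 2 gets queues 6 and 7. With 2 queues and 3 consumers, the third consumer gets nothing, so adding consumers beyond the number of queues does not increase throughput under this strategy.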

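Besides this, RocketMQ provides several other allocation strategies that can be used as needed.

Back in rebalanceByTopic, after the queues have been allocated,
updateProcessQueueTableInRebalance updates the message queues and process queues. If anything changed, messageQueueChanged invokes the callback to notify the listener that the message queues have changed.

This completes the startup of PullConsumer.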