Source Code Analysis of Producer Startup in RocketMQ

  • October 3, 2019
  • Notes

In RocketMQ, a Producer is created through DefaultMQProducer.

DefaultMQProducer is defined as follows:

    public class DefaultMQProducer extends ClientConfig implements MQProducer {
        protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

        private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; // "TBW102"

        private volatile int defaultTopicQueueNums = 4;

        private int sendMsgTimeout = 3000;

        private int compressMsgBodyOverHowmuch = 1024 * 4;

        private int retryTimesWhenSendFailed = 2;

        private int retryTimesWhenSendAsyncFailed = 2;

        private boolean retryAnotherBrokerWhenNotStoreOK = false;

        private int maxMessageSize = 1024 * 1024 * 4; // 4M
    }

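The defaultMQProducerImpl member is the actual Producer implementation; the remaining members are configuration parameters:
createTopicKey: a Topic name used when a topic is created; explained later
defaultTopicQueueNums: the default number of queues per Topic
sendMsgTimeout: the timeout for sending a message, in milliseconds
compressMsgBodyOverHowmuch: the message body size threshold; bodies larger than this are compressed
retryTimesWhenSendFailed: the number of retries allowed when a synchronous send fails
retryTimesWhenSendAsyncFailed: the number of retries allowed when an asynchronous send fails
retryAnotherBrokerWhenNotStoreOK: whether to pick another Broker and resend after a send to one Broker fails
maxMessageSize: the maximum message size
These properties can be read and changed through the getters and setters that DefaultMQProducer provides.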

The commonly used constructors are as follows:

    public DefaultMQProducer() {
        this(MixAll.DEFAULT_PRODUCER_GROUP, null);
    }

    public DefaultMQProducer(final String producerGroup) {
        this(producerGroup, null);
    }

    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }

DefaultMQProducer extends ClientConfig, so the lower-level parameters provided by ClientConfig are set first:

    public class ClientConfig {
        public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";

        private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));

        private String clientIP = RemotingUtil.getLocalAddress();

        private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");

        private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();

        private int pollNameServerInterval = 1000 * 30;

        private int heartbeatBrokerInterval = 1000 * 30;

        private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
    }

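namesrvAddr is a very important member: it holds the address of the Name Server. It is initialized at construction time from a system property (with an environment variable as the fallback, as the code above shows). If neither is set, it is null and has to be set later through its setter.
clientIP: the local IP of the Producer
instanceName: the instance name of the Producer
pollNameServerInterval: the interval for polling the NameServer
heartbeatBrokerInterval: the interval for sending heartbeats to the Broker
SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY and vipChannelEnabled: decide whether the VIP channel, that is, the high-priority channel, is used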
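The property-then-environment lookup used for namesrvAddr can be illustrated with a small standalone sketch. The class name is hypothetical, and the two string constants are assumptions about the values behind MixAll.NAMESRV_ADDR_PROPERTY and MixAll.NAMESRV_ADDR_ENV, which the excerpt above does not show.

```java
// Minimal sketch of the lookup order for the name server address.
final class NamesrvAddrLookupSketch {
    // Assumed values; the article only shows the MixAll constant names.
    static final String PROPERTY = "rocketmq.namesrv.addr";
    static final String ENV = "NAMESRV_ADDR";

    // The system property wins; the environment variable is only the default.
    // If neither is set, the result is null, as the text describes.
    static String resolve() {
        return System.getProperty(PROPERTY, System.getenv(ENV));
    }

    public static void main(String[] args) {
        System.setProperty(PROPERTY, "192.168.0.1:9876;192.168.0.2:9876");
        System.out.println(resolve());
    }
}
```

Because the lookup happens in a field initializer, it runs once per ClientConfig instance; anything set afterwards has to go through the setter.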

Back in the DefaultMQProducer constructor, a DefaultMQProducerImpl instance is created:

    private final Random random = new Random();
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
            new ConcurrentHashMap<String, TopicPublishInfo>();
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final RPCHook rpcHook;
    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
    private final ExecutorService defaultAsyncSenderExecutor;
    private ExecutorService asyncSenderExecutor;

    public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
        this.defaultMQProducer = defaultMQProducer;
        this.rpcHook = rpcHook;

        this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
        this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.asyncSenderThreadPoolQueue,
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
                }
            });
    }

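The constructor creates a thread pool that handles the sending of asynchronous messages.
The topicPublishInfoTable member is also important: it is a map that stores the mapping between each topic and its message queues, and is described in detail later.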
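Returning to the thread pool created in the constructor: the following standalone sketch builds a pool of the same shape (fixed size, bounded queue, numbered thread names) using only the JDK. The class and method names are hypothetical, and the pool and queue sizes are chosen for illustration rather than taken from RocketMQ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncSenderPoolSketch {
    // Fixed-size pool (core == max) backed by a bounded queue.
    static ExecutorService newPool(int threads, int queueCapacity) {
        ThreadFactory factory = new ThreadFactory() {
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                // Threads are numbered starting from 1, as in the article's code.
                return new Thread(r, "AsyncSenderExecutor_" + threadIndex.incrementAndGet());
            }
        };
        return new ThreadPoolExecutor(threads, threads, 60_000, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(queueCapacity), factory);
    }

    // Runs one task and reports the name of the thread that executed it.
    static String nameOfFirstWorker() {
        ExecutorService pool = newPool(2, 16);
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(nameOfFirstWorker());
    }
}
```

With a bounded queue, tasks submitted while the queue is full are rejected by the executor's default policy, so a slow Broker cannot make the backlog of asynchronous sends grow without limit.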

Once the DefaultMQProducer has been created, the next thing to look at is its start method:

    public void start() throws MQClientException {
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }

The work is first delegated to the start method of defaultMQProducerImpl.

The start method of DefaultMQProducerImpl:

    public void start() throws MQClientException {
        this.start(true);
    }

    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;

                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }

When DefaultMQProducerImpl is instantiated, serviceState is initialized to CREATE_JUST. This is an enum value; the enum has the following states:

    public enum ServiceState {
        CREATE_JUST,
        RUNNING,
        SHUTDOWN_ALREADY,
        START_FAILED;

        private ServiceState() {
        }
    }

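These states are self-explanatory; they are used again later in MQClientInstance.

Back in the start method: it switches on serviceState and proceeds normally only in the CREATE_JUST state, which prevents start from being called by mistake in any other state.

Look directly at the CREATE_JUST case: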

    this.serviceState = ServiceState.START_FAILED;

    this.checkConfig();

    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
        this.defaultMQProducer.changeInstanceNameToPID();
    }

    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

    boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
    if (!registerOK) {
        this.serviceState = ServiceState.CREATE_JUST;
        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
            null);
    }

    this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

    if (startFactory) {
        mQClientFactory.start();
    }

    log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
        this.defaultMQProducer.isSendMessageWithVIPChannel());
    this.serviceState = ServiceState.RUNNING;
    break;

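It first sets serviceState to START_FAILED, so that if any later step throws midway, the producer is left in the START_FAILED state instead of being marked as running.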
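The "mark as failed first, mark as running last" pattern can be shown in isolation. This is a minimal sketch with hypothetical names (LifecycleSketch, and a Runnable standing in for checkConfig, registration, and the other startup steps); it is not RocketMQ code.

```java
final class LifecycleSketch {
    enum State { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private State state = State.CREATE_JUST;

    State state() {
        return state;
    }

    // init stands in for the real startup steps.
    void start(Runnable init) {
        switch (state) {
            case CREATE_JUST:
                state = State.START_FAILED; // pessimistic: stays here if init throws
                init.run();
                state = State.RUNNING;      // reached only when every step succeeded
                break;
            default:
                // Any other state means start was already attempted.
                throw new IllegalStateException("service state not OK: " + state);
        }
    }

    public static void main(String[] args) {
        LifecycleSketch ok = new LifecycleSketch();
        ok.start(() -> { });
        System.out.println(ok.state());

        LifecycleSketch bad = new LifecycleSketch();
        try {
            bad.start(() -> { throw new IllegalArgumentException("bad config"); });
        } catch (IllegalArgumentException expected) {
            System.out.println(bad.state());
        }
    }
}
```

The design choice is that no try/catch is needed around the startup steps: an exception simply leaves the pessimistic state in place, and a second call to start is rejected.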

The checkConfig method validates the ProducerGroup name:

    private void checkConfig() throws MQClientException {
        Validators.checkGroup(this.defaultMQProducer.getProducerGroup());

        if (null == this.defaultMQProducer.getProducerGroup()) {
            throw new MQClientException("producerGroup is null", null);
        }

        if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
            throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
                null);
        }
    }

It mainly checks that the name is valid and that it does not collide with the default producer group name, DEFAULT_PRODUCER_GROUP:

    public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";

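Next, mQClientFactory is instantiated. This is in fact the producer's client instance. MQClientManager is a singleton, and getInstance returns that singleton; getAndCreateMQClientInstance then creates a client instance whose properties depend on the ClientConfig passed in.

MQClientManager: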

    public class MQClientManager {
        private final static InternalLogger log = ClientLogger.getLog();
        private static MQClientManager instance = new MQClientManager();
        private AtomicInteger factoryIndexGenerator = new AtomicInteger();
        private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
            new ConcurrentHashMap<String, MQClientInstance>();

        private MQClientManager() {
        }

        public static MQClientManager getInstance() {
            return instance;
        }
    }

factoryTable is a map that caches all client instances, and factoryIndexGenerator supplies the serial number of each client instance that is created.

The getAndCreateMQClientInstance method:

    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }

First, the clientId is built by the buildMQClientId method:

    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());

        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }

The clientId consists of the client's IP address and its instance name, with unitName appended if one is set.
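As a concrete illustration of the resulting format, here is a standalone sketch of the same concatenation. The class name and the sample values are hypothetical; the blank check is a plain-JDK stand-in for UtilAll.isBlank.

```java
final class ClientIdSketch {
    // Builds "ip@instanceName" and appends "@unitName" only when a unit name is set.
    static String buildClientId(String clientIP, String instanceName, String unitName) {
        StringBuilder sb = new StringBuilder();
        sb.append(clientIP).append("@").append(instanceName);
        if (unitName != null && !unitName.trim().isEmpty()) {
            sb.append("@").append(unitName);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildClientId("10.0.0.5", "12345", null));
        System.out.println(buildClientId("10.0.0.5", "12345", "unitA"));
    }
}
```

Since start replaces the instance name with the process id (changeInstanceNameToPID), producers in the same JVM with the same IP and unit name end up with the same clientId and therefore share one client instance.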

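With the generated clientId, factoryTable is checked first to see whether a client instance has already been created.
If none is found, a new MQClientInstance has to be instantiated.
Note that clientConfig is not passed in directly when MQClientInstance is instantiated; a copy is made with cloneClientConfig for safety: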

    public ClientConfig cloneClientConfig() {
        ClientConfig cc = new ClientConfig();
        cc.namesrvAddr = namesrvAddr;
        cc.clientIP = clientIP;
        cc.instanceName = instanceName;
        cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
        cc.pollNameServerInterval = pollNameServerInterval;
        cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
        cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
        cc.unitMode = unitMode;
        cc.unitName = unitName;
        cc.vipChannelEnabled = vipChannelEnabled;
        cc.useTLS = useTLS;
        cc.language = language;
        return cc;
    }

Creating the MQClientInstance:

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
        }

        this.clientId = clientId;

        this.mQAdminImpl = new MQAdminImpl(this);

        this.pullMessageService = new PullMessageService(this);

        this.rebalanceService = new RebalanceService(this);

        this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
        this.defaultMQProducer.resetClientConfig(clientConfig);

        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

        log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
            this.instanceIndex,
            this.clientId,
            this.clientConfig,
            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
    }

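The MQClientInstance constructor creates many objects. They are not all covered here; only the important ones are.
nettyClientConfig makes it clear that RocketMQ uses Netty for network I/O; it holds some of the Netty configuration.
clientRemotingProcessor is used for message processing; it is registered below as the handler for several request codes.

mQClientAPIImpl is a very important part; a MQClientAPIImpl object is instantiated directly: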

    public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
        final ClientRemotingProcessor clientRemotingProcessor,
        RPCHook rpcHook, final ClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
        this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
        this.clientRemotingProcessor = clientRemotingProcessor;

        this.remotingClient.registerRPCHook(rpcHook);
        this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);

        this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);

        this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);

        this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);

        this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);

        this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
    }

This constructor first creates a TopAddressing, which is used later to look up the name server address. Its default address is:

    http://jmenv.tbsite.net:8080/rocketmq/nsaddr

It has to be changed through system properties.

Next, a NettyRemotingClient is created; this is the actual Netty client:

    private final Bootstrap bootstrap = new Bootstrap();
    // name server address list
    private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();

    public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
        this.nettyClientConfig = nettyClientConfig;
        this.channelEventListener = channelEventListener;

        int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
            }
        });

        if (nettyClientConfig.isUseTLS()) {
            try {
                sslContext = TlsHelper.buildSslContext(true);
                log.info("SSL enabled for client");
            } catch (IOException e) {
                log.error("Failed to create SSLContext", e);
            } catch (CertificateException e) {
                log.error("Failed to create SSLContext", e);
                throw new RuntimeException("Failed to create SSLContext", e);
            }
        }
    }

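At this point the Netty client has only created the Bootstrap object and set up and initialized the NioEventLoopGroup.

Back in the MQClientInstance constructor: after MQClientAPIImpl has been created, getNamesrvAddr on clientConfig is used to check whether namesrvAddr, the name server address, has been set. If it has, the name server address is updated through the updateNameServerAddressList method of mQClientAPIImpl:

The updateNameServerAddressList method of MQClientAPIImpl: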

    public void updateNameServerAddressList(final String addrs) {
        String[] addrArray = addrs.split(";");
        List<String> list = Arrays.asList(addrArray);
        this.remotingClient.updateNameServerAddressList(list);
    }

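Because the name servers can be deployed as a cluster, the string is split on ";" to obtain all name server addresses, which are then updated by remotingClient. This remotingClient is the NettyRemotingClient that was just created.
The updateNameServerAddressList method of NettyRemotingClient: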

    public void updateNameServerAddressList(List<String> addrs) {
        List<String> old = this.namesrvAddrList.get();
        boolean update = false;

        if (!addrs.isEmpty()) {
            if (null == old) {
                update = true;
            } else if (addrs.size() != old.size()) {
                update = true;
            } else {
                for (int i = 0; i < addrs.size() && !update; i++) {
                    if (!old.contains(addrs.get(i))) {
                        update = true;
                    }
                }
            }

            if (update) {
                Collections.shuffle(addrs);
                log.info("name server address updated. NEW : {} , OLD: {}", addrs, old);
                this.namesrvAddrList.set(addrs);
            }
        }
    }

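The logic here is simple: the name server list is replaced (after being shuffled) only when the new list is non-empty and differs from the old one in size or content.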
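The split-then-compare behaviour of the two updateNameServerAddressList methods can be combined into one standalone sketch. The class name and the boolean return value are additions for illustration; RocketMQ's own methods return void.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class NamesrvListSketch {
    private final AtomicReference<List<String>> current = new AtomicReference<>();

    // Splits "a;b;c" into addresses and stores them if they differ from the
    // current list. Returns true when the stored list was replaced.
    boolean update(String addrs) {
        List<String> fresh = new ArrayList<>(Arrays.asList(addrs.split(";")));
        if (fresh.isEmpty()) {
            return false; // nothing usable, keep the old list
        }
        List<String> old = current.get();
        boolean changed = old == null
            || old.size() != fresh.size()
            || !old.containsAll(fresh);
        if (changed) {
            Collections.shuffle(fresh); // spread clients across the name servers
            current.set(fresh);
        }
        return changed;
    }

    public static void main(String[] args) {
        NamesrvListSketch list = new NamesrvListSketch();
        System.out.println(list.update("10.0.0.1:9876;10.0.0.2:9876"));
        System.out.println(list.update("10.0.0.2:9876;10.0.0.1:9876"));
    }
}
```

The comparison ignores order, so the same set of addresses supplied in a different order does not trigger an update.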

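Back in the MQClientInstance constructor: after the steps above, it goes on to create MQAdminImpl, PullMessageService, RebalanceService, ConsumerStatsManager, and a new DefaultMQProducer. These are introduced later, where they appear.

Back in the getAndCreateMQClientInstance method of MQClientManager: once the MQClientInstance has been created, it is put into the cache.

Back again in the start method of DefaultMQProducerImpl: after the MQClientInstance has been created, registerProducer is called.
The registerProducer method of MQClientInstance: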
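The get-or-create caching in getAndCreateMQClientInstance relies on ConcurrentMap.putIfAbsent to stay correct when two threads race. The sketch below reproduces that pattern with a hypothetical Client class in place of MQClientInstance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ClientCacheSketch {
    // Hypothetical stand-in for MQClientInstance.
    static final class Client {
        final int index;
        final String clientId;

        Client(int index, String clientId) {
            this.index = index;
            this.clientId = clientId;
        }
    }

    private final AtomicInteger indexGenerator = new AtomicInteger();
    private final ConcurrentMap<String, Client> table = new ConcurrentHashMap<>();

    Client getOrCreate(String clientId) {
        Client instance = table.get(clientId);
        if (instance == null) {
            instance = new Client(indexGenerator.getAndIncrement(), clientId);
            // putIfAbsent returns the existing value if another thread got there first.
            Client prev = table.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev; // discard ours and reuse the cached instance
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        ClientCacheSketch cache = new ClientCacheSketch();
        Client a = cache.getOrCreate("10.0.0.5@12345");
        Client b = cache.getOrCreate("10.0.0.5@12345");
        System.out.println(a == b);
    }
}
```

A side effect of this pattern is that a losing thread still consumes a serial number, so the indexes handed out are unique but not necessarily contiguous.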

    public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
        if (null == group || null == producer) {
            return false;
        }

        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
        if (prev != null) {
            log.warn("the producer group[{}] exist already.", group);
            return false;
        }

        return true;
    }

When MQClientInstance is initialized, it creates several important maps: producerTable, consumerTable, topicRouteTable, and brokerAddrTable:

    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
            new ConcurrentHashMap<String, HashMap<Long, String>>();

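MQProducerInner is an interface and DefaultMQProducerImpl is its implementation class; producerTable associates each DefaultMQProducerImpl with its group name as the key.
In other words, DefaultMQProducerImpl instances are cached by group; the same applies to MQConsumerInner.
topicRouteTable records the Broker and message queue information for each Topic.
brokerAddrTable records the list of Broker addresses for each Broker Name.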

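Back in the start method: after registerProducer returns, the return value registerOK determines what happens next.
On failure, serviceState is set back to CREATE_JUST and an exception is thrown, so that a later call to start can proceed normally.

On success, a TopicPublishInfo record keyed by createTopicKey ("TBW102") is first added to topicPublishInfoTable.
TopicPublishInfo: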

    public class TopicPublishInfo {
        private boolean orderTopic = false;
        private boolean haveTopicRouterInfo = false;
        private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
        private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    }

messageQueueList holds the MessageQueue objects, and sendWhichQueue is used to obtain an index into messageQueueList, that is, the specific message queue to send to next.

MessageQueue:

    public class MessageQueue implements Comparable<MessageQueue>, Serializable {
        private static final long serialVersionUID = 6191200464116433425L;
        private String topic;
        private String brokerName;
        private int queueId;

        public MessageQueue() {
        }

        public MessageQueue(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

This is a simple POJO that wraps topic, brokerName, and queueId.

ThreadLocalIndex:

    public class ThreadLocalIndex {
        private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
        private final Random random = new Random();

        public int getAndIncrement() {
            Integer index = this.threadLocalIndex.get();
            if (null == index) {
                index = Math.abs(random.nextInt());
                if (index < 0)
                    index = 0;
                this.threadLocalIndex.set(index);
            }

            index = Math.abs(index + 1);
            if (index < 0)
                index = 0;

            this.threadLocalIndex.set(index);
            return index;
        }

        @Override
        public String toString() {
            return "ThreadLocalIndex{" +
                "threadLocalIndex=" + threadLocalIndex.get() +
                '}';
        }
    }

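Through ThreadLocal, each thread is given a random starting value. Later, this value is taken modulo the size of messageQueueList to choose a MessageQueue, which in turn determines the actual message queue the message is sent to.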
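The increment-and-modulo selection can be sketched as two pure functions, which also makes the overflow guard easier to see. The class and method names are hypothetical, and the queues are plain strings rather than MessageQueue objects.

```java
import java.util.Arrays;
import java.util.List;

final class QueuePickSketch {
    // Same arithmetic as ThreadLocalIndex.getAndIncrement: advance by one and
    // fall back to 0 when the value overflows. Math.abs(Integer.MIN_VALUE) is
    // still negative, which is why the extra check is needed.
    static int nextIndex(int current) {
        int index = Math.abs(current + 1);
        return index < 0 ? 0 : index;
    }

    // Maps a non-negative index onto the queue list by taking the remainder.
    static <T> T pick(List<T> queues, int index) {
        return queues.get(index % queues.size());
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3");
        int index = 5; // stands in for the per-thread random starting value
        for (int i = 0; i < 4; i++) {
            index = nextIndex(index);
            System.out.println(pick(queues, index));
        }
    }
}
```

Starting each thread at a random value spreads the first message of different threads across different queues, while successive sends from one thread still rotate through the list in order.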

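Back once more in the start method of DefaultMQProducerImpl: after the record for the createTopicKey Topic has been added, startFactory decides whether the start method of mQClientFactory is called. Here startFactory defaults to true, so the start method of mQClientFactory is called:

The start method of MQClientInstance: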

    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

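When an MQClientInstance is created, its serviceState is also CREATE_JUST.

It first checks whether the name server address has been set. If not, it tries to obtain the name server address automatically through the fetchNameServerAddr method of MQClientAPIImpl.
The fetchNameServerAddr method of MQClientAPIImpl: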

    public String fetchNameServerAddr() {
        try {
            String addrs = this.topAddressing.fetchNSAddr();
            if (addrs != null) {
                if (!addrs.equals(this.nameSrvAddr)) {
                    log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + addrs);
                    this.updateNameServerAddressList(addrs);
                    this.nameSrvAddr = addrs;
                    return nameSrvAddr;
                }
            }
        } catch (Exception e) {
            log.error("fetchNameServerAddr Exception", e);
        }
        return nameSrvAddr;
    }

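It first obtains the name server address through the fetchNSAddr method of topAddressing. If an address is returned, it checks whether the name server list and the previous nameSrvAddr need to be updated.

As mentioned earlier, the TopAddressing instance is created in the MQClientAPIImpl constructor.
The fetchNSAddr method of TopAddressing: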

    public final String fetchNSAddr() {
        return fetchNSAddr(true, 3000);
    }

    public final String fetchNSAddr(boolean verbose, long timeoutMills) {
        String url = this.wsAddr;
        try {
            if (!UtilAll.isBlank(this.unitName)) {
                url = url + "-" + this.unitName + "?nofix=1";
            }
            HttpTinyClient.HttpResult result = HttpTinyClient.httpGet(url, null, null, "UTF-8", timeoutMills);
            if (200 == result.code) {
                String responseStr = result.content;
                if (responseStr != null) {
                    return clearNewLine(responseStr);
                } else {
                    log.error("fetch nameserver address is null");
                }
            } else {
                log.error("fetch nameserver address failed. statusCode=" + result.code);
            }
        } catch (IOException e) {
            if (verbose) {
                log.error("fetch name server address exception", e);
            }
        }

        if (verbose) {
            String errorMsg =
                "connect to " + url + " failed, maybe the domain name " + MixAll.getWSAddr() + " not bind in /etc/hosts";
            errorMsg += FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL);

            log.warn(errorMsg);
        }
        return null;
    }

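It first builds the url from wsAddr and unitName. As mentioned earlier, wsAddr defaults to http://jmenv.tbsite.net:8080/rocketmq/nsaddr and has to be changed through system properties.

It then calls the httpGet method of HttpTinyClient to open a connection and issue a GET request for the name server address.
The httpGet method of HttpTinyClient: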

    static public HttpResult httpGet(String url, List<String> headers, List<String> paramValues,
        String encoding, long readTimeoutMs) throws IOException {
        String encodedContent = encodingParams(paramValues, encoding);
        url += (null == encodedContent) ? "" : ("?" + encodedContent);

        HttpURLConnection conn = null;
        try {
            conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setRequestMethod("GET");
            conn.setConnectTimeout((int) readTimeoutMs);
            conn.setReadTimeout((int) readTimeoutMs);
            setHeaders(conn, headers, encoding);

            conn.connect();
            int respCode = conn.getResponseCode();
            String resp = null;

            if (HttpURLConnection.HTTP_OK == respCode) {
                resp = IOTinyUtils.toString(conn.getInputStream(), encoding);
            } else {
                resp = IOTinyUtils.toString(conn.getErrorStream(), encoding);
            }
            return new HttpResult(respCode, resp);
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

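Here the JDK's native HttpURLConnection performs a GET request against the given url. The response data and the status code are wrapped in an HttpResult and returned to the caller, the fetchNSAddr method of TopAddressing. There, if the status code is 200, clearNewLine is called to clean up the data (removing unnecessary spaces, line feeds, and carriage returns), which yields the name server address. Finally, control returns to fetchNameServerAddr, which updates the name server list. This completes the automatic lookup of the name server.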
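The two string-handling steps around the HTTP call, building the lookup URL and cleaning the response, can be sketched without any network access. The class and method names are hypothetical. The URL rule follows the fetchNSAddr code shown above; the cleanup rule (trim, then keep only the first line) is an assumption about what clearNewLine does, since its body is not shown in this article.

```java
final class NamesrvLookupSketch {
    // Appends "-<unitName>?nofix=1" only when a unit name is configured.
    static String buildUrl(String wsAddr, String unitName) {
        if (unitName == null || unitName.trim().isEmpty()) {
            return wsAddr;
        }
        return wsAddr + "-" + unitName + "?nofix=1";
    }

    // Assumed cleanup: strip surrounding whitespace, then cut at the first
    // carriage return or line feed so that only one line remains.
    static String firstLine(String response) {
        String s = response.trim();
        int cr = s.indexOf('\r');
        if (cr != -1) {
            return s.substring(0, cr);
        }
        int lf = s.indexOf('\n');
        if (lf != -1) {
            return s.substring(0, lf);
        }
        return s;
    }

    public static void main(String[] args) {
        System.out.println(buildUrl("http://example.invalid/rocketmq/nsaddr", "unitA"));
        System.out.println(firstLine(" 10.0.0.1:9876;10.0.0.2:9876\r\n"));
    }
}
```

The cleaned string is still a ";"-separated list, which is exactly the form that updateNameServerAddressList expects.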

Back in the start method of MQClientInstance:
With the name server address settled, it first calls the start method of mQClientAPIImpl.
The start method of MQClientAPIImpl:

    public void start() {
        this.remotingClient.start();
    }

This actually calls the start method of the Netty client created earlier:
The start method of NettyRemotingClient:

    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

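This binds the Bootstrap to the EventLoopGroup created earlier and installs the channel handlers. It also schedules scanResponseTable to run once per second, after an initial delay of 3 seconds.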

After mQClientAPIImpl's start method completes, startScheduledTask is called to start the scheduled tasks.
The startScheduledTask method:

private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}

As the code shows, five scheduled tasks are set up (the first one only when no name server address is configured).
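Every task body in startScheduledTask is wrapped in try/catch. One likely reason is that ScheduledExecutorService.scheduleAtFixedRate suppresses all subsequent executions once a run throws. The sketch below illustrates that JDK behaviour only; the class GuardedScheduleSketch and its method names are hypothetical and not part of RocketMQ.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not RocketMQ code.
class GuardedScheduleSketch {

    // Schedules a task that fails on every run and returns how often it ran
    // within observeMillis. With guarded == true the failure is caught inside
    // the task, as the RocketMQ tasks do; otherwise it escapes to the scheduler.
    static int countRuns(boolean guarded, long periodMillis, long observeMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated failure");
        };
        Runnable task;
        if (guarded) {
            task = () -> {
                try {
                    failing.run();
                } catch (Exception e) {
                    // A real task would log here; the schedule stays alive.
                }
            };
        } else {
            task = failing;
        }
        scheduler.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(observeMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("guarded runs:   " + countRuns(true, 20, 300));
        System.out.println("unguarded runs: " + countRuns(false, 20, 300));
    }
}
```

The unguarded variant runs exactly once, because the first exception cancels the schedule; the guarded variant keeps firing at every period.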

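① If the name server address namesrvAddr is not configured, the fetchNameServerAddr method described earlier is called periodically (every 2 minutes, after a 10-second initial delay) to refresh the name server address.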

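② The updateTopicRouteInfoFromNameServer method periodically refreshes the route information of every Topic, at the interval given by pollNameServerInterval: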

public void updateTopicRouteInfoFromNameServer() {
    Set<String> topicList = new HashSet<String>();

    // Consumer
    {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                Set<SubscriptionData> subList = impl.subscriptions();
                if (subList != null) {
                    for (SubscriptionData subData : subList) {
                        topicList.add(subData.getTopic());
                    }
                }
            }
        }
    }

    // Producer
    {
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                Set<String> lst = impl.getPublishTopicList();
                topicList.addAll(lst);
            }
        }
    }

    for (String topic : topicList) {
        this.updateTopicRouteInfoFromNameServer(topic);
    }
}

The Topics of all Consumers and Producers are collected into topicList, and updateTopicRouteInfoFromNameServer is then called once for each Topic.
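The collection step can be sketched with plain collections. This is a simplified illustration under the assumption that each consumer or producer group is represented as a map entry from group name to its set of Topic names; the class TopicCollectorSketch and that representation are hypothetical, not RocketMQ's types.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical demo class, not RocketMQ code.
class TopicCollectorSketch {

    // Merges the Topics subscribed by consumers and published by producers.
    // A Set is used so a Topic shared by both sides is refreshed only once.
    static Set<String> collectTopics(Map<String, Set<String>> consumerSubscriptions,
                                     Map<String, Set<String>> producerTopics) {
        Set<String> topics = new HashSet<>();
        for (Set<String> subscribed : consumerSubscriptions.values()) {
            if (subscribed != null) {
                topics.addAll(subscribed);
            }
        }
        for (Set<String> published : producerTopics.values()) {
            if (published != null) {
                topics.addAll(published);
            }
        }
        return topics;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> consumers = new HashMap<>();
        consumers.put("consumerGroup1", new HashSet<>(Arrays.asList("TopicA", "TopicB")));
        Map<String, Set<String>> producers = new HashMap<>();
        producers.put("producerGroup1", new HashSet<>(Arrays.asList("TopicB", "TopicC")));
        // TopicB appears on both sides but is collected once.
        System.out.println(collectTopics(consumers, producers));
    }
}
```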

The updateTopicRouteInfoFromNameServer method:

public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    return updateTopicRouteInfoFromNameServer(topic, false, null);
}

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
                    boolean changed = topicRouteDataIsChange(old, topicRouteData);
                    if (!changed) {
                        changed = this.isNeedUpdateTopicRouteInfo(topic);
                    } else {
                        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                    }

                    if (changed) {
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();

                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }

                        // Update Pub info
                        {
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                            publishInfo.setHaveTopicRouterInfo(true);
                            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQProducerInner> entry = it.next();
                                MQProducerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicPublishInfo(topic, publishInfo);
                                }
                            }
                        }

                        // Update sub info
                        {
                            Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                            while (it.hasNext()) {
                                Entry<String, MQConsumerInner> entry = it.next();
                                MQConsumerInner impl = entry.getValue();
                                if (impl != null) {
                                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                }
                            }
                        }
                        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } else {
                    log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                }
            } catch (Exception e) {
                if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                    log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
        } else {
            log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
        }
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }

    return false;
}

Here, mQClientAPIImpl's getTopicRouteInfoFromNameServer method first fetches the route information for the Topic from the name server.

The route information of a Topic is encapsulated in TopicRouteData:

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

QueueData:

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSynFlag;
}

BrokerData:

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}

The getTopicRouteInfoFromNameServer method:

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
    return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
}

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.TOPIC_NOT_EXIST: {
            if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
            }

            break;
        }
        case ResponseCode.SUCCESS: {
            byte[] body = response.getBody();
            if (body != null) {
                return TopicRouteData.decode(body, TopicRouteData.class);
            }
        }
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}

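This mainly uses the invokeSync method of remotingClient, i.e. the Netty client, to send the prepared request to the name server and obtain the response.
The name server looks up the Broker route information related to the Topic and returns it as the response, which is received here and decoded into a TopicRouteData.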

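The invokeSync method works lazily: it first tries to obtain a Channel whose connection has already been established. If there is none, it first connects through the bootstrap's connect method, which yields a ChannelFuture, and then obtains and caches the Channel.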
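The lazy "look up, otherwise connect and cache" idea can be sketched without Netty. The following is a simplified illustration only: FakeChannel stands in for a Netty Channel, connecting is simulated by constructing an object, and the class and method names are hypothetical. The real client is more involved than this.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not RocketMQ code.
class LazyChannelCacheSketch {

    // Stand-in for a Netty Channel (assumption for illustration).
    static final class FakeChannel {
        final String addr;
        volatile boolean active = true;

        FakeChannel(String addr) {
            this.addr = addr;
        }
    }

    private final ConcurrentMap<String, FakeChannel> channelTable = new ConcurrentHashMap<>();
    final AtomicInteger connectCount = new AtomicInteger();

    // Returns the cached channel if it is still active; otherwise "connects"
    // and caches the new channel for later calls.
    FakeChannel getOrCreateChannel(String addr) {
        FakeChannel cached = channelTable.get(addr);
        if (cached != null && cached.active) {
            return cached;
        }
        synchronized (this) {
            // Re-check under the lock so concurrent callers connect only once.
            cached = channelTable.get(addr);
            if (cached != null && cached.active) {
                return cached;
            }
            connectCount.incrementAndGet(); // simulated connect
            FakeChannel fresh = new FakeChannel(addr);
            channelTable.put(addr, fresh);
            return fresh;
        }
    }

    public static void main(String[] args) {
        LazyChannelCacheSketch cache = new LazyChannelCacheSketch();
        FakeChannel first = cache.getOrCreateChannel("127.0.0.1:9876");
        FakeChannel second = cache.getOrCreateChannel("127.0.0.1:9876");
        System.out.println("same channel reused: " + (first == second));
        System.out.println("connect attempts: " + cache.connectCount.get());
    }
}
```

The first call pays the connection cost; later calls to the same address reuse the cached channel until it becomes inactive.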

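Back in updateTopicRouteInfoFromNameServer: once the route information for the Topic has been obtained from the name server, topicRouteDataIsChange is called to compare it with the route information previously saved in topicRouteTable.
The topicRouteDataIsChange method: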

private boolean topicRouteDataIsChange(TopicRouteData olddata, TopicRouteData nowdata) {
    if (olddata == null || nowdata == null)
        return true;
    TopicRouteData old = olddata.cloneTopicRouteData();
    TopicRouteData now = nowdata.cloneTopicRouteData();
    Collections.sort(old.getQueueDatas());
    Collections.sort(old.getBrokerDatas());
    Collections.sort(now.getQueueDatas());
    Collections.sort(now.getBrokerDatas());
    return !old.equals(now);
}
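topicRouteDataIsChange clones both sides, sorts the clones, and only then calls equals, so a mere difference in element order is not reported as a change and the caller's data is left untouched. A minimal sketch of the same technique on plain string lists follows; the class RouteChangeSketch is hypothetical, and strings stand in for QueueData and BrokerData.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class, not RocketMQ code.
class RouteChangeSketch {

    // Order-insensitive comparison: copy, sort the copies, then compare.
    // A missing side always counts as a change, as in topicRouteDataIsChange.
    static boolean isChanged(List<String> oldBrokers, List<String> newBrokers) {
        if (oldBrokers == null || newBrokers == null) {
            return true;
        }
        List<String> oldCopy = new ArrayList<>(oldBrokers);
        List<String> newCopy = new ArrayList<>(newBrokers);
        Collections.sort(oldCopy);
        Collections.sort(newCopy);
        return !oldCopy.equals(newCopy);
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList("broker-b", "broker-a");
        List<String> reordered = Arrays.asList("broker-a", "broker-b");
        List<String> grown = Arrays.asList("broker-a", "broker-b", "broker-c");
        System.out.println(isChanged(before, reordered)); // false: same elements
        System.out.println(isChanged(before, grown));     // true: broker-c was added
    }
}
```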

Even if nothing has changed, isNeedUpdateTopicRouteInfo is still called to check whether an update is needed.

The isNeedUpdateTopicRouteInfo method:

private boolean isNeedUpdateTopicRouteInfo(final String topic) {
    boolean result = false;
    {
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext() && !result) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                result = impl.isPublishTopicNeedUpdate(topic);
            }
        }
    }

    {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext() && !result) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                result = impl.isSubscribeTopicNeedUpdate(topic);
            }
        }
    }

    return result;
}

It checks every producer and then every consumer, stopping at the first one that needs its route information for this Topic updated.

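When an update is needed, updateTopicRouteInfoFromNameServer proceeds as follows.
First, it takes the BrokerData, i.e. the Broker route information, out of topicRouteData and updates brokerAddrTable with it.
Then it derives the publish information for producers and the subscribe information for consumers from topicRouteData and updates each of them.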

③ Periodically remove offline Brokers and send heartbeats to the Brokers that are currently online.
cleanOfflineBroker removes the offline Brokers:

private void cleanOfflineBroker() {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
            try {
                ConcurrentHashMap<String, HashMap<Long, String>> updatedTable = new ConcurrentHashMap<String, HashMap<Long, String>>();

                Iterator<Entry<String, HashMap<Long, String>>> itBrokerTable = this.brokerAddrTable.entrySet().iterator();
                while (itBrokerTable.hasNext()) {
                    Entry<String, HashMap<Long, String>> entry = itBrokerTable.next();
                    String brokerName = entry.getKey();
                    HashMap<Long, String> oneTable = entry.getValue();

                    HashMap<Long, String> cloneAddrTable = new HashMap<Long, String>();
                    cloneAddrTable.putAll(oneTable);

                    Iterator<Entry<Long, String>> it = cloneAddrTable.entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> ee = it.next();
                        String addr = ee.getValue();
                        if (!this.isBrokerAddrExistInTopicRouteTable(addr)) {
                            it.remove();
                            log.info("the broker addr[{} {}] is offline, remove it", brokerName, addr);
                        }
                    }

                    if (cloneAddrTable.isEmpty()) {
                        itBrokerTable.remove();
                        log.info("the broker[{}] name's host is offline, remove it", brokerName);
                    } else {
                        updatedTable.put(brokerName, cloneAddrTable);
                    }
                }

                if (!updatedTable.isEmpty()) {
                    this.brokerAddrTable.putAll(updatedTable);
                }
            } finally {
                this.lockNamesrv.unlock();
            }
    } catch (InterruptedException e) {
        log.warn("cleanOfflineBroker Exception", e);
    }
}

The brokerAddrTable used here is kept up to date by the scheduled task in ②. The method iterates over all Broker entries in it and checks each address with isBrokerAddrExistInTopicRouteTable:

private boolean isBrokerAddrExistInTopicRouteTable(final String addr) {
    Iterator<Entry<String, TopicRouteData>> it = this.topicRouteTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, TopicRouteData> entry = it.next();
        TopicRouteData topicRouteData = entry.getValue();
        List<BrokerData> bds = topicRouteData.getBrokerDatas();
        for (BrokerData bd : bds) {
            if (bd.getBrokerAddrs() != null) {
                boolean exist = bd.getBrokerAddrs().containsValue(addr);
                if (exist)
                    return true;
            }
        }
    }

    return false;
}

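The address is compared against the BrokerAddrs stored in every TopicRouteData in topicRouteTable. If no route references the Broker address any more, the address is removed, and brokerAddrTable is updated accordingly.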
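The pruning logic can be sketched with plain maps. This is a simplified illustration under two assumptions: the set of addresses still referenced by some route is precomputed as liveAddrs (the original scans topicRouteTable for each address), and the cleaned copy is written back directly through the entry rather than collected in a separate table first. The class OfflineBrokerCleanerSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical demo class, not RocketMQ code.
class OfflineBrokerCleanerSketch {

    // brokerAddrTable maps brokerName -> (brokerId -> address).
    // Addresses not contained in liveAddrs are dropped; a broker name whose
    // addresses are all gone is removed from the table entirely.
    static void clean(Map<String, Map<Long, String>> brokerAddrTable, Set<String> liveAddrs) {
        Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Map<Long, String>> entry = it.next();
            // Work on a copy so the stored map is never seen half-pruned.
            Map<Long, String> copy = new HashMap<>(entry.getValue());
            copy.values().removeIf(addr -> !liveAddrs.contains(addr));
            if (copy.isEmpty()) {
                it.remove();
            } else {
                entry.setValue(copy);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<Long, String>> table = new HashMap<>();
        Map<Long, String> brokerA = new HashMap<>();
        brokerA.put(0L, "10.0.0.1:10911");
        brokerA.put(1L, "10.0.0.2:10911");
        table.put("broker-a", brokerA);
        Map<Long, String> brokerB = new HashMap<>();
        brokerB.put(0L, "10.0.0.3:10911");
        table.put("broker-b", brokerB);

        Set<String> live = new HashSet<>();
        live.add("10.0.0.1:10911");
        clean(table, live);
        System.out.println(table); // only broker-a with brokerId 0 remains
    }
}
```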

sendHeartbeatToAllBrokerWithLock periodically sends heartbeats to the Brokers:

public void sendHeartbeatToAllBrokerWithLock() {
    if (this.lockHeartbeat.tryLock()) {
        try {
            this.sendHeartbeatToAllBroker();
            this.uploadFilterClassSource();
        } catch (final Exception e) {
            log.error("sendHeartbeatToAllBroker exception", e);
        } finally {
            this.lockHeartbeat.unlock();
        }
    } else {
        log.warn("lock heartBeat, but failed.");
    }
}

This part is not covered in detail here; the heartbeat is ultimately sent through the Netty client.
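The tryLock guard in sendHeartbeatToAllBrokerWithLock means that a heartbeat round is skipped, rather than queued, when another thread is already sending one. A minimal sketch of that pattern follows; the class HeartbeatGuardSketch, its counter, and the helper that holds the lock on a second thread are hypothetical and exist only for demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not RocketMQ code.
class HeartbeatGuardSketch {
    final ReentrantLock lockHeartbeat = new ReentrantLock();
    int sent = 0; // only modified while lockHeartbeat is held

    // Sends one (simulated) heartbeat round unless another thread holds the lock.
    boolean sendHeartbeatWithLock() {
        if (lockHeartbeat.tryLock()) {
            try {
                sent++; // stand-in for the real network call
                return true;
            } finally {
                lockHeartbeat.unlock();
            }
        }
        return false; // skipped: a round is already in progress elsewhere
    }

    // Demo helper: holds the lock on another thread while the caller tries to send.
    static boolean sendWhileLockHeldElsewhere(HeartbeatGuardSketch guard) {
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            guard.lockHeartbeat.lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                guard.lockHeartbeat.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            return guard.sendHeartbeatWithLock();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
            try {
                holder.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        HeartbeatGuardSketch guard = new HeartbeatGuardSketch();
        System.out.println("uncontended: " + guard.sendHeartbeatWithLock());
        System.out.println("contended:   " + sendWhileLockHeldElsewhere(guard));
    }
}
```

Skipping is reasonable for a periodic task such as a heartbeat, since the next scheduled run will try again anyway.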

④ Periodically persist the consumption progress of the consumer queues. This is explained in detail when the consumer is analysed.

⑤ Periodically adjust the size of the consumer-side thread pool. This is also explained in detail when the consumer is analysed.

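With the five scheduled tasks created by startScheduledTask in place, control returns to MQClientInstance's start method.
Next, the pullMessageService is started, which pulls messages for consumers.
Then the rebalanceService is started, which balances the message queues.
Both services are covered in the discussion of the consumer.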

Next, the call:

this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

starts the push service.
The defaultMQProducer used here was created earlier, in the MQClientInstance constructor:

this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);

The difference is that its start method is called with false, so mQClientFactory's start method is not invoked again.
Its purpose is covered later.

At this point DefaultMQProducerImpl's start method is essentially complete. As a last step, it calls mQClientFactory's sendHeartbeatToAllBrokerWithLock method to send one heartbeat to all Brokers.

This completes the startup of the Producer.