Source Code Analysis of Broker Startup in RocketMQ (Part 2)

  • October 3, 2019
  • Notes

This continues the previous post, "Source Code Analysis of Broker Startup in RocketMQ (Part 1)".

 

Once the preparation is complete, the start method is called:

    public static BrokerController start(BrokerController controller) {
        try {

            controller.start();

            String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
                + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

            if (null != controller.getBrokerConfig().getNamesrvAddr()) {
                tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
            }

            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

The key step here is the call to BrokerController's start method, which performs the actual startup.

 

BrokerController's start method:

    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }

        if (this.remotingServer != null) {
            this.remotingServer.start();
        }

        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }

        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }

        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }

        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            startProcessorByHa(messageStoreConfig.getBrokerRole());
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        }

        this.registerBrokerAll(true, false, true);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }

        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }
    }

It first starts the message store through the messageStore field.

DefaultMessageStore's start method:

    public void start() throws Exception {
        lock = lockFile.getChannel().tryLock(0, 1, false);
        if (lock == null || lock.isShared() || !lock.isValid()) {
            throw new RuntimeException("Lock failed,MQ already started");
        }

        lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));
        lockFile.getChannel().force(true);
        {
            /**
             * 1. Make sure the fast-forward messages to be truncated during the recovering according to the max physical offset of the commitlog;
             * 2. DLedger committedPos may be missing, so the maxPhysicalPosInLogicQueue maybe bigger that maxOffset returned by DLedgerCommitLog, just let it go;
             * 3. Calculate the reput offset according to the consume queue;
             * 4. Make sure the fall-behind messages to be dispatched before starting the commitlog, especially when the broker role are automatically changed.
             */
            long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
            for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
                for (ConsumeQueue logic : maps.values()) {
                    if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                        maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                    }
                }
            }
            if (maxPhysicalPosInLogicQueue < 0) {
                maxPhysicalPosInLogicQueue = 0;
            }
            if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
                maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
                /**
                 * This happens in following conditions:
                 * 1. If someone removes all the consumequeue files or the disk get damaged.
                 * 2. Launch a new broker, and copy the commitlog from other brokers.
                 *
                 * All the conditions has the same in common that the maxPhysicalPosInLogicQueue should be 0.
                 * If the maxPhysicalPosInLogicQueue is gt 0, there maybe something wrong.
                 */
                log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
            }
            log.info("[SetReputOffset] maxPhysicalPosInLogicQueue={} clMinOffset={} clMaxOffset={} clConfirmedOffset={}",
                maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset(), this.commitLog.getMaxOffset(), this.commitLog.getConfirmOffset());
            this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
            this.reputMessageService.start();

            /**
             *  1. Finish dispatching the messages fall behind, then to start other services.
             *  2. DLedger committedPos may be missing, so here just require dispatchBehindBytes <= 0
             */
            while (true) {
                if (dispatchBehindBytes() <= 0) {
                    break;
                }
                Thread.sleep(1000);
                log.info("Try to finish doing reput the messages fall behind during the starting, reputOffset={} maxOffset={} behind={}", this.reputMessageService.getReputFromOffset(), this.getMaxPhyOffset(), this.dispatchBehindBytes());
            }
            this.recoverTopicQueueTable();
        }

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            this.haService.start();
            this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
        }

        this.flushConsumeQueueService.start();
        this.commitLog.start();
        this.storeStatsService.start();

        this.createTempFile();
        this.addScheduleTask();
        this.shutdown = false;
    }

This first tries to acquire a lock on the …/store/lock file, which ensures that the files on disk are read and written by only one messageStore.
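The locking step can be tried in isolation. The following is a minimal sketch, not RocketMQ code: the class name StoreLockSketch and the helper tryExclusiveLock are made up for illustration, and a temporary file stands in for the real lock file. It uses the same FileChannel.tryLock(0, 1, false) call as the method above.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;

final class StoreLockSketch {
    /** Tries to lock the first byte of the file exclusively; returns null if it is already locked. */
    static FileLock tryExclusiveLock(RandomAccessFile lockFile) throws IOException {
        try {
            // position 0, size 1, shared = false: an exclusive lock on a single byte
            return lockFile.getChannel().tryLock(0, 1, false);
        } catch (OverlappingFileLockException e) {
            // This JVM already holds an overlapping lock on the same file.
            return null;
        }
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("store", ".lock"); // stand-in for the real lock file
        file.deleteOnExit();
        try (RandomAccessFile first = new RandomAccessFile(file, "rw");
             RandomAccessFile second = new RandomAccessFile(file, "rw")) {
            FileLock lock = tryExclusiveLock(first);
            System.out.println("first acquired: " + (lock != null && lock.isValid()));
            System.out.println("second acquired: " + (tryExclusiveLock(second) != null));
        }
    }
}
```

When the lock is held by another process, tryLock returns null; when it is held by the same JVM, it throws OverlappingFileLockException, which the sketch also treats as "already locked".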

It then obtains the minimum offset through commitLog's getMinOffset method.

commitLog persists messages to files. Each file has a default maximum size of 1 GB; once a file is full, a new file is created to continue storing, and so on.
commitLog maps the offsets of these physically separate files onto one logically continuous offset space, which it uses to locate messages.

CommitLog's getMinOffset method:

    public long getMinOffset() {
        MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
        if (mappedFile != null) {
            if (mappedFile.isAvailable()) {
                return mappedFile.getFileFromOffset();
            } else {
                return this.rollNextFile(mappedFile.getFileFromOffset());
            }
        }

        return -1;
    }

The files managed by CommitLog are handled through mappedFileQueue, which maps to each file through its mappedFiles list:

    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

 

MappedFileQueue's getFirstMappedFile method:

    public MappedFile getFirstMappedFile() {
        MappedFile mappedFileFirst = null;

        if (!this.mappedFiles.isEmpty()) {
            try {
                mappedFileFirst = this.mappedFiles.get(0);
            } catch (IndexOutOfBoundsException e) {
                //ignore
            } catch (Exception e) {
                log.error("getFirstMappedFile has exception.", e);
            }
        }

        return mappedFileFirst;
    }

This is simple: if mappedFiles is not empty, it returns the first MappedFile.

MappedFile holds the file-related attributes and operations:

    public class MappedFile extends ReferenceResource {
        protected int fileSize;
        protected FileChannel fileChannel;
        protected ByteBuffer writeBuffer = null;
        private String fileName;
        private long fileFromOffset;
        private File file;
        ......
    }

MappedFile can access and modify the file through fileChannel.

After obtaining the MappedFile for the first file, getFileFromOffset returns the starting offset of that file.

DefaultMessageStore's start method uses this offset as the initial value of maxPhysicalPosInLogicQueue.
It then iterates over every ConsumeQueue in consumeQueueTable; each ConsumeQueue reports, through getMaxPhysicOffset, the largest CommitLog physical offset it has recorded.
When the iteration completes, maxPhysicalPosInLogicQueue holds the largest of these offsets, which can later be mapped to a specific position in a specific file.
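The scan described above can be sketched on plain maps. This is an illustration under assumptions, not the RocketMQ implementation: ReputStartSketch and reputStart are hypothetical names, and each ConsumeQueue is reduced to a single Long holding its maximum physical offset.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ReputStartSketch {
    /**
     * table maps topic -> (queueId -> max physical offset recorded by that queue).
     * Returns the offset from which re-dispatching would start.
     */
    static long reputStart(long commitLogMinOffset,
                           ConcurrentMap<String, ConcurrentMap<Integer, Long>> table) {
        long max = commitLogMinOffset;
        for (ConcurrentMap<Integer, Long> queues : table.values()) {
            for (long queueMax : queues.values()) {
                if (queueMax > max) {
                    max = queueMax;
                }
            }
        }
        // getMinOffset returns -1 when there is no file yet, so clamp to 0.
        return Math.max(max, 0);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, ConcurrentMap<Integer, Long>> table = new ConcurrentHashMap<>();
        table.computeIfAbsent("TopicA", t -> new ConcurrentHashMap<>()).put(0, 100L);
        table.computeIfAbsent("TopicA", t -> new ConcurrentHashMap<>()).put(1, 250L);
        table.computeIfAbsent("TopicB", t -> new ConcurrentHashMap<>()).put(0, 180L);
        System.out.println(reputStart(50L, table));
    }
}
```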

 

Next, reputMessageService's setReputFromOffset method is called:

    public void setReputFromOffset(long reputFromOffset) {
        this.reputFromOffset = reputFromOffset;
    }

This sets reputFromOffset to the offset just obtained.

Then reputMessageService's start method is called to start the ReputMessageService. The service runs on its own thread, so this starts a thread whose run method is:

    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                Thread.sleep(1);
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }

The thread is simple: until it is stopped, it loops, sleeping 1 millisecond and then calling doReput.

ReputMessageService's doReput method:

    private void doReput() {
        if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
            log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
            this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
        }
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                break;
            }

            SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
            if (result != null) {
                try {
                    this.reputFromOffset = result.getStartOffset();

                    for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                        DispatchRequest dispatchRequest =
                            DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                        int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                        if (dispatchRequest.isSuccess()) {
                            if (size > 0) {
                                DefaultMessageStore.this.doDispatch(dispatchRequest);

                                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                }

                                this.reputFromOffset += size;
                                readSize += size;
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                        .addAndGet(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) {
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        } else if (!dispatchRequest.isSuccess()) {

                            if (size > 0) {
                                log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                this.reputFromOffset += size;
                            } else {
                                doNext = false;
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);

                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                } finally {
                    result.release();
                }
            } else {
                doNext = false;
            }
        }
    }

First look at isCommitLogAvailable, the condition that keeps this for loop running.

The isCommitLogAvailable method:

    private boolean isCommitLogAvailable() {
        return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
    }

commitLog's getMaxOffset works much like getMinOffset (the code below reads mappedFiles, the list held by mappedFileQueue):

    public long getMaxOffset() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
        }
        return 0;
    }

    public MappedFile getLastMappedFile() {
        MappedFile mappedFileLast = null;

        while (!this.mappedFiles.isEmpty()) {
            try {
                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
                break;
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getLastMappedFile has exception.", e);
                break;
            }
        }

        return mappedFileLast;
    }

It first obtains the MappedFile of the last file through getLastMappedFile,
then takes its fileFromOffset; fileFromOffset + ReadPosition gives the logical offset of the current read position in that file.

isCommitLogAvailable therefore checks whether reputFromOffset has already reached the end of the readable data in the last file.

 

Back in the for loop, commitLog's getData method is called with reputFromOffset to obtain a SelectMappedBufferResult.

CommitLog's getData method:

    public SelectMappedBufferResult getData(final long offset) {
        return this.getData(offset, offset == 0);
    }

    public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
            return result;
        }

        return null;
    }

Here mappedFileSize is the size of each file, 1 GB by default.

Given reputFromOffset, mappedFileQueue's findMappedFileByOffset method locates the specific MappedFile.

MappedFileQueue's findMappedFileByOffset method:

    public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            MappedFile firstMappedFile = this.getFirstMappedFile();
            MappedFile lastMappedFile = this.getLastMappedFile();
            if (firstMappedFile != null && lastMappedFile != null) {
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                } else {
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    MappedFile targetFile = null;
                    try {
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                    }

                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }

                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }

                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }

        return null;
    }

It first checks that the offset is within the valid range, and then uses:

    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));

This simple calculation gives the index, within the mappedFiles list, of the file that contains the offset, and from that index the MappedFile itself. If the file at that index does not actually cover the offset, the method falls back to scanning the whole list.

Back in getData, once the MappedFile has been found from the offset,
offset modulo mappedFileSize (1 GB) gives the starting position within that file.
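The two calculations (list index and in-file position) can be checked with a few numbers. The sketch below copies the arithmetic from findMappedFileByOffset and getData; the class name OffsetLocateSketch and the example offsets are made up for illustration.

```java
final class OffsetLocateSketch {
    /** Index in the mappedFiles list of the file that should contain the offset. */
    static int fileIndex(long offset, long firstFileFromOffset, int mappedFileSize) {
        return (int) ((offset / mappedFileSize) - (firstFileFromOffset / mappedFileSize));
    }

    /** Position of the offset inside its file. */
    static int positionInFile(long offset, int mappedFileSize) {
        return (int) (offset % mappedFileSize);
    }

    public static void main(String[] args) {
        int size = 1 << 30;                 // 1 GB, the default CommitLog file size
        long firstFileFromOffset = size;    // assume the oldest remaining file starts at 1 GB
        long offset = 3L * size + 100;      // a logical offset inside the file starting at 3 GB

        // Files start at 1 GB, 2 GB, 3 GB, so the target is list index 2, position 100.
        System.out.println(fileIndex(offset, firstFileFromOffset, size));
        System.out.println(positionInFile(offset, size));
    }
}
```

Subtracting the first file's index is what keeps the lookup correct after older files have been deleted and the list no longer starts at offset 0.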

Then mappedFile's selectMappedBuffer method is called to obtain the SelectMappedBufferResult:

    public SelectMappedBufferResult selectMappedBuffer(int pos) {
        int readPosition = getReadPosition();
        if (pos < readPosition && pos >= 0) {
            if (this.hold()) {
                ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
                byteBuffer.position(pos);
                int size = readPosition - pos;
                ByteBuffer byteBufferNew = byteBuffer.slice();
                byteBufferNew.limit(size);
                return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
            }
        }

        return null;
    }

Using JDK NIO operations, this exposes the file data from pos up to readPosition (all the message data in that range) through byteBufferNew, a slice that shares the mapped buffer's content rather than copying it.

That information is then wrapped in a SelectMappedBufferResult.
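The double slice is easy to misread, so here is a small sketch of the same technique on a heap ByteBuffer instead of a memory-mapped one. SliceViewSketch and select are hypothetical names; only ByteBuffer.slice, position, and limit from the JDK are used.

```java
import java.nio.ByteBuffer;

final class SliceViewSketch {
    /** Returns a view over bytes [pos, readPosition) of the buffer, or null if pos is out of range. */
    static ByteBuffer select(ByteBuffer mapped, int pos, int readPosition) {
        if (pos < 0 || pos >= readPosition) {
            return null;
        }
        ByteBuffer whole = mapped.slice(); // own position and limit, shared content
        whole.position(pos);
        ByteBuffer view = whole.slice();   // index 0 of the view is byte `pos` of the buffer
        view.limit(readPosition - pos);
        return view;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        for (int i = 0; i < 16; i++) {
            buffer.put(i, (byte) i);       // absolute put, leaves the position at 0
        }
        ByteBuffer view = select(buffer, 4, 10);
        System.out.println(view.remaining() + " bytes, first = " + view.get(0));
        buffer.put(4, (byte) 99);          // no copy was made: the view sees the change
        System.out.println("first is now " + view.get(0));
    }
}
```

The first slice protects the shared buffer's own position from being moved; the second one rebases index 0 to pos so that callers can read relative to the start of the selected data.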

 

Back in doReput, after obtaining the SelectMappedBufferResult, it first updates the current reputFromOffset.

It then enters the inner for loop, which reads the buffered messages from start to end, turning each one into a DispatchRequest through commitLog's checkMessageAndReturnSize method.

CommitLog's checkMessageAndReturnSize method:

    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
        final boolean readBody) {
        try {
            // 1 TOTAL SIZE
            int totalSize = byteBuffer.getInt();

            // 2 MAGIC CODE
            int magicCode = byteBuffer.getInt();
            switch (magicCode) {
                case MESSAGE_MAGIC_CODE:
                    break;
                case BLANK_MAGIC_CODE:
                    return new DispatchRequest(0, true /* success */);
                default:
                    log.warn("found a illegal magic code 0x" + Integer.toHexString(magicCode));
                    return new DispatchRequest(-1, false /* success */);
            }

            byte[] bytesContent = new byte[totalSize];

            int bodyCRC = byteBuffer.getInt();

            int queueId = byteBuffer.getInt();

            int flag = byteBuffer.getInt();

            long queueOffset = byteBuffer.getLong();

            long physicOffset = byteBuffer.getLong();

            int sysFlag = byteBuffer.getInt();

            long bornTimeStamp = byteBuffer.getLong();

            ByteBuffer byteBuffer1 = byteBuffer.get(bytesContent, 0, 8);

            long storeTimestamp = byteBuffer.getLong();

            ByteBuffer byteBuffer2 = byteBuffer.get(bytesContent, 0, 8);

            int reconsumeTimes = byteBuffer.getInt();

            long preparedTransactionOffset = byteBuffer.getLong();

            int bodyLen = byteBuffer.getInt();
            if (bodyLen > 0) {
                if (readBody) {
                    byteBuffer.get(bytesContent, 0, bodyLen);

                    if (checkCRC) {
                        int crc = UtilAll.crc32(bytesContent, 0, bodyLen);
                        if (crc != bodyCRC) {
                            log.warn("CRC check failed. bodyCRC={}, currentCRC={}", crc, bodyCRC);
                            return new DispatchRequest(-1, false/* success */);
                        }
                    }
                } else {
                    byteBuffer.position(byteBuffer.position() + bodyLen);
                }
            }

            byte topicLen = byteBuffer.get();
            byteBuffer.get(bytesContent, 0, topicLen);
            String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);

            long tagsCode = 0;
            String keys = "";
            String uniqKey = null;

            short propertiesLength = byteBuffer.getShort();
            Map<String, String> propertiesMap = null;
            if (propertiesLength > 0) {
                byteBuffer.get(bytesContent, 0, propertiesLength);
                String properties = new String(bytesContent, 0, propertiesLength, MessageDecoder.CHARSET_UTF8);
                propertiesMap = MessageDecoder.string2messageProperties(properties);

                keys = propertiesMap.get(MessageConst.PROPERTY_KEYS);

                uniqKey = propertiesMap.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);

                String tags = propertiesMap.get(MessageConst.PROPERTY_TAGS);
                if (tags != null && tags.length() > 0) {
                    tagsCode = MessageExtBrokerInner.tagsString2tagsCode(MessageExt.parseTopicFilterType(sysFlag), tags);
                }

                // Timing message processing
                {
                    String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                    if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
                        int delayLevel = Integer.parseInt(t);

                        if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                            delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
                        }

                        if (delayLevel > 0) {
                            tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                                storeTimestamp);
                        }
                    }
                }
            }

            int readLength = calMsgLength(bodyLen, topicLen, propertiesLength);
            if (totalSize != readLength) {
                doNothingForDeadCode(reconsumeTimes);
                doNothingForDeadCode(flag);
                doNothingForDeadCode(bornTimeStamp);
                doNothingForDeadCode(byteBuffer1);
                doNothingForDeadCode(byteBuffer2);
                log.error(
                    "[BUG]read total count not equals msg total size. totalSize={}, readTotalCount={}, bodyLen={}, topicLen={}, propertiesLength={}",
                    totalSize, readLength, bodyLen, topicLen, propertiesLength);
                return new DispatchRequest(totalSize, false/* success */);
            }

            return new DispatchRequest(
                topic,
                queueId,
                physicOffset,
                totalSize,
                tagsCode,
                storeTimestamp,
                queueOffset,
                keys,
                uniqKey,
                sysFlag,
                preparedTransactionOffset,
                propertiesMap
            );
        } catch (Exception e) {
        }

        return new DispatchRequest(-1, false /* success */);
    }

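This operation is not difficult: it takes the message data that was just placed in the ByteBuffer, reads it in the order of CommitLog's message storage structure, and converts the bytes into the corresponding fields of the message.

The message structure, in the order the code above reads it, is: total size (4 bytes), magic code (4), body CRC (4), queue ID (4), flag (4), queue offset (8), physical offset (8), system flag (4), born timestamp (8), born host (8), store timestamp (8), store host (8), reconsume times (4), prepared transaction offset (8), body length (4) followed by the body, topic length (1) followed by the topic, and properties length (2) followed by the properties.

The required information is then wrapped in a DispatchRequest.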
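To show the sequential-read technique without the full field list, here is a sketch over a much smaller record layout. The layout (total size, magic, body length, body, topic length, topic), the MAGIC value, and the names RecordDecodeSketch, encode, and readTopic are all invented for this example and are not the CommitLog format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RecordDecodeSketch {
    static final int MAGIC = 0xCAFE; // hypothetical marker, not RocketMQ's magic code

    /** Writes: totalSize(4) magic(4) bodyLen(4) body topicLen(1) topic. */
    static ByteBuffer encode(String topic, byte[] body) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8); // assumed shorter than 128 bytes
        int total = 4 + 4 + 4 + body.length + 1 + topicBytes.length;
        ByteBuffer buf = ByteBuffer.allocate(total);
        buf.putInt(total).putInt(MAGIC).putInt(body.length).put(body);
        buf.put((byte) topicBytes.length).put(topicBytes);
        buf.flip();
        return buf;
    }

    /** Reads one record in field order, skipping the body; returns the topic or null if invalid. */
    static String readTopic(ByteBuffer buf) {
        int start = buf.position();
        int total = buf.getInt();
        if (buf.getInt() != MAGIC) {
            return null;
        }
        int bodyLen = buf.getInt();
        buf.position(buf.position() + bodyLen); // skip the body, as with readBody == false
        int topicLen = buf.get();
        byte[] topicBytes = new byte[topicLen];
        buf.get(topicBytes);
        // Same sanity check as above: bytes consumed must equal the declared total size.
        if (buf.position() - start != total) {
            return null;
        }
        return new String(topicBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer record = encode("TopicA", new byte[] {1, 2, 3});
        System.out.println(readTopic(record));
    }
}
```

Because every read advances the buffer position, a caller can invoke readTopic repeatedly on a buffer holding several records, which is the pattern the inner loop of doReput relies on.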

With the DispatchRequest in hand, DefaultMessageStore's doDispatch method is called to dispatch the message.

DefaultMessageStore's doDispatch method:

    public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);
        }
    }

When DefaultMessageStore is initialized, instances of CommitLogDispatcherBuildConsumeQueue and CommitLogDispatcherBuildIndex are added to dispatcherList.

So what actually runs here is the dispatch method of CommitLogDispatcherBuildConsumeQueue and of CommitLogDispatcherBuildIndex.

CommitLogDispatcherBuildConsumeQueue's dispatch method:

    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }

When the message's transaction type is TRANSACTION_NOT_TYPE or TRANSACTION_COMMIT_TYPE, putMessagePositionInfo is called; prepared and rolled-back messages are not added to the consume queue.

DefaultMessageStore's putMessagePositionInfo method:

    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }

It first looks up the ConsumeQueue that matches the Topic and QueueId carried by the dispatchRequest.

The findConsumeQueue method:

    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }

The implementation is fairly simple: it looks up consumeQueueTable by topic and queueId, and creates the entry directly if it does not exist. putIfAbsent ensures that if two threads race to create the same entry, both end up using the same instance.
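The get-or-create idiom above can be reproduced on a two-level ConcurrentMap with a placeholder value type. In this sketch QueueTableSketch and its nested Queue class are hypothetical stand-ins for DefaultMessageStore and ConsumeQueue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class QueueTableSketch {
    /** Placeholder for ConsumeQueue; it only remembers what it was created for. */
    static final class Queue {
        final String topic;
        final int queueId;

        Queue(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    private final ConcurrentMap<String, ConcurrentMap<Integer, Queue>> table = new ConcurrentHashMap<>();

    Queue findOrCreate(String topic, int queueId) {
        ConcurrentMap<Integer, Queue> map = table.get(topic);
        if (map == null) {
            ConcurrentMap<Integer, Queue> created = new ConcurrentHashMap<>();
            // If another thread inserted first, putIfAbsent returns its map and ours is dropped.
            ConcurrentMap<Integer, Queue> existing = table.putIfAbsent(topic, created);
            map = existing != null ? existing : created;
        }
        Queue queue = map.get(queueId);
        if (queue == null) {
            Queue created = new Queue(topic, queueId);
            Queue existing = map.putIfAbsent(queueId, created);
            queue = existing != null ? existing : created;
        }
        return queue;
    }

    public static void main(String[] args) {
        QueueTableSketch store = new QueueTableSketch();
        Queue a = store.findOrCreate("TopicA", 0);
        System.out.println(a == store.findOrCreate("TopicA", 0)); // same instance on the second lookup
    }
}
```

On Java 8 and later, computeIfAbsent expresses the same intent more compactly; the explicit putIfAbsent form is kept here because it matches the code being discussed.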

After obtaining the ConsumeQueue, its putMessagePositionInfoWrapper method is called.

ConsumeQueue's putMessagePositionInfoWrapper method:

    public void putMessagePositionInfoWrapper(DispatchRequest request) {
        final int maxRetries = 30;
        boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
        for (int i = 0; i < maxRetries && canWrite; i++) {
            long tagsCode = request.getTagsCode();
            if (isExtWriteEnable()) {
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                cqExtUnit.setFilterBitMap(request.getBitMap());
                cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                cqExtUnit.setTagsCode(request.getTagsCode());

                long extAddr = this.consumeQueueExt.put(cqExtUnit);
                if (isExtAddr(extAddr)) {
                    tagsCode = extAddr;
                } else {
                    log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                        topic, queueId, request.getCommitLogOffset());
                }
            }
            boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
            if (result) {
                this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
                return;
            } else {
                // XXX: warn and notify me
                log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                    + " failed, retry " + i + " times");

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    log.warn("", e);
                }
            }
        }

        // XXX: warn and notify me
        log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
        this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
    }

This method mainly calls putMessagePositionInfo, retrying up to 30 times with a 1-second pause between attempts; if every attempt fails, it marks the logic queue as being in error.

putMessagePositionInfo方法:

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {

    if (offset + size <= this.maxPhysicOffset) {
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
        return true;
    }

    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);

    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) {

        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }

        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

            if (expectLogicOffset < currentLogicOffset) {
                log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            }

            if (expectLogicOffset != currentLogicOffset) {
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        this.maxPhysicOffset = offset + size;
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    }
    return false;
}

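The CommitLogOffset, MsgSize and tagsCode carried by the DispatchRequest, 20 bytes in total (8 + 4 + 8), are written into the byteBufferIndex ByteBuffer.

expectLogicOffset is computed from the ConsumeQueueOffset as cqOffset * CQ_STORE_UNIT_SIZE (20).

A ConsumeQueue file stores one 20-byte entry for each message it maps to in the CommitLog file.
It works on the same principle as the CommitLog.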
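To make the 20-byte layout concrete, here is a minimal sketch that packs one entry in the same field order as putMessagePositionInfo and computes the logical offset of an entry. The class ConsumeQueueEntrySketch and its method names are hypothetical and only illustrate the arithmetic; they are not RocketMQ code.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not part of RocketMQ.
class ConsumeQueueEntrySketch {
    // 8-byte CommitLog offset + 4-byte message size + 8-byte tags code.
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Packs one index entry in the same field order as putMessagePositionInfo.
    static byte[] encode(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(msgSize);
        buf.putLong(tagsCode);
        return buf.array();
    }

    // Byte position at which the cqOffset-th entry starts in the logical queue.
    static long expectLogicOffset(long cqOffset) {
        return cqOffset * CQ_STORE_UNIT_SIZE;
    }

    public static void main(String[] args) {
        byte[] entry = encode(1024L, 256, 42L);
        ByteBuffer view = ByteBuffer.wrap(entry);
        System.out.println(entry.length + " bytes: offset=" + view.getLong()
            + ", size=" + view.getInt() + ", tagsCode=" + view.getLong());
        System.out.println("entry #3 starts at byte " + expectLogicOffset(3));
    }
}
```

Because every entry has the same size, the position of the n-th entry can be found by multiplication alone, which is why cqOffset can serve as a consumer-visible queue offset.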

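expectLogicOffset is the logical offset within the ConsumeQueue files, so getLastMappedFile can use it to locate the corresponding MappedFile.

Once the MappedFile is obtained, appendMessage appends the data in byteBufferIndex to the corresponding ConsumeQueue file.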

MappedFile's appendMessage method:

public boolean appendMessage(final byte[] data) {
    int currentPos = this.wrotePosition.get();

    if ((currentPos + data.length) <= this.fileSize) {
        try {
            this.fileChannel.position(currentPos);
            this.fileChannel.write(ByteBuffer.wrap(data));
        } catch (Throwable e) {
            log.error("Error occurred when append message to mappedFile.", e);
        }
        this.wrotePosition.addAndGet(data.length);
        return true;
    }

    return false;
}

Here the JDK's NIO API appends the 20 bytes of data, starting at position currentPos.
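The same position-then-write pattern can be tried in isolation. The sketch below is a simplified stand-in for MappedFile (the class FileAppendSketch and the temp-file helper are hypothetical). Unlike the original, it rethrows I/O errors instead of logging them and still advancing the write position.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical stand-in for MappedFile, reduced to the append path.
class FileAppendSketch {
    private final FileChannel channel;
    private final int fileSize;
    private int wrotePosition = 0;

    FileAppendSketch(Path path, int fileSize) {
        try {
            this.channel = FileChannel.open(path, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.fileSize = fileSize;
    }

    // Creates an empty temporary file to append to.
    static Path tempFile() {
        try {
            return Files.createTempFile("cq-sketch", ".bin");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Appends data at the current write position; refuses if the file would overflow.
    boolean append(byte[] data) {
        if (wrotePosition + data.length > fileSize) {
            return false;
        }
        try {
            channel.position(wrotePosition);
            ByteBuffer src = ByteBuffer.wrap(data);
            while (src.hasRemaining()) {
                channel.write(src);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        wrotePosition += data.length;
        return true;
    }

    int getWrotePosition() {
        return wrotePosition;
    }

    void close() {
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FileAppendSketch file = new FileAppendSketch(tempFile(), 40);
        System.out.println(file.append(new byte[20]));
        System.out.println(file.append(new byte[20]));
        System.out.println(file.append(new byte[20])); // file is full by now
        file.close();
    }
}
```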

 

CommitLogDispatcherBuildIndex's dispatch method:

public void dispatch(DispatchRequest request) {
    if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
        DefaultMessageStore.this.indexService.buildIndex(request);
    }
}

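Depending on the messageIndexEnable setting, this calls indexService's buildIndex method, which in effect appends an entry to the Index file. The principle is similar, so I won't repeat it.

(IndexFile: another kind of index over the CommitLog, except that it indexes by message key. Each MsgKey is hashed to determine its slot, and the offset is stored in that slot of the IndexFile. To look up a message by msgKey, first query the IndexFile for the offset, then use that offset to fetch the message details from the CommitLog.)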
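The hash-to-slot idea can be modeled in memory. This is only a sketch under simplifying assumptions: the real IndexFile is a file with its own header and entry layout, whereas the hypothetical HashSlotIndexSketch below keeps a list per slot and uses String.hashCode with the sign bit cleared as the key hash.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model of a hash-slot index; names and layout are
// illustrative assumptions, not the on-disk IndexFile format.
class HashSlotIndexSketch {
    // One stored pair: the key's hash and the message's CommitLog offset.
    private static final class Entry {
        final int keyHash;
        final long commitLogOffset;

        Entry(int keyHash, long commitLogOffset) {
            this.keyHash = keyHash;
            this.commitLogOffset = commitLogOffset;
        }
    }

    private final int slotNum;
    private final List<List<Entry>> slots;

    HashSlotIndexSketch(int slotNum) {
        this.slotNum = slotNum;
        this.slots = new ArrayList<>(slotNum);
        for (int i = 0; i < slotNum; i++) {
            slots.add(new ArrayList<>());
        }
    }

    // Non-negative hash of the message key (assumption: String.hashCode).
    static int keyHash(String msgKey) {
        return msgKey.hashCode() & 0x7FFFFFFF;
    }

    // Stores the offset in the slot selected by the key's hash.
    void put(String msgKey, long commitLogOffset) {
        int hash = keyHash(msgKey);
        slots.get(hash % slotNum).add(new Entry(hash, commitLogOffset));
    }

    // Returns candidate offsets. Different keys can share a hash, so a caller
    // would still read the CommitLog at each offset to confirm the key.
    List<Long> query(String msgKey) {
        int hash = keyHash(msgKey);
        List<Long> result = new ArrayList<>();
        for (Entry e : slots.get(hash % slotNum)) {
            if (e.keyHash == hash) {
                result.add(e.commitLogOffset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        HashSlotIndexSketch index = new HashSlotIndexSketch(16);
        index.put("order-1001", 4096L);
        index.put("order-1002", 8192L);
        index.put("order-1001", 12288L);
        System.out.println(index.query("order-1001"));
    }
}
```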

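After doDispatch completes:
If the current broker is a Master and long polling is enabled, messageArrivingListener is used to signal that new messages have arrived in the consume queue; a later post will analyze this.

For a SLAVE, the corresponding statistics are recorded instead.

In other words, doReput keeps dispatching messages based on the data in the CommitLog file.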

 

Back in DefaultMessageStore's start method: after reputMessageService is started, a while loop waits until message dispatching has finished.

That loop relies on the dispatchBehindBytes method:

public long dispatchBehindBytes() {
    return this.reputMessageService.behind();
}

public long behind() {
    return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
}

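This is used to check whether dispatching has finished: behind() returns the number of CommitLog bytes that have not been dispatched yet.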
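The waiting pattern can be sketched as a polling loop over that difference. The class DispatchWaitSketch, the polling interval and the simulated dispatcher thread are assumptions made for illustration; the loop in DefaultMessageStore is not shown in this post.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of "wait until the dispatcher has caught up".
class DispatchWaitSketch {
    final AtomicLong commitLogMaxOffset = new AtomicLong();
    final AtomicLong reputFromOffset = new AtomicLong();

    // Bytes written to the log that the dispatcher has not processed yet.
    long behind() {
        return commitLogMaxOffset.get() - reputFromOffset.get();
    }

    // Polls until nothing is left to dispatch; returns false if interrupted.
    boolean awaitDispatched(long pollMillis) {
        while (behind() > 0) {
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    // Simulates a dispatcher that advances in 100-byte steps and waits for it.
    static long demo() {
        DispatchWaitSketch store = new DispatchWaitSketch();
        store.commitLogMaxOffset.set(1000);
        Thread dispatcher = new Thread(() -> {
            while (store.behind() > 0) {
                store.reputFromOffset.addAndGet(100);
            }
        });
        dispatcher.start();
        store.awaitDispatched(10);
        return store.behind();
    }

    public static void main(String[] args) {
        System.out.println("bytes still behind: " + demo());
    }
}
```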

Then the recoverTopicQueueTable method is called:

public void recoverTopicQueueTable() {
    HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
    long minPhyOffset = this.commitLog.getMinOffset();
    for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
        for (ConsumeQueue logic : maps.values()) {
            String key = logic.getTopic() + "-" + logic.getQueueId();
            table.put(key, logic.getMaxOffsetInQueue());
            logic.correctMinOffset(minPhyOffset);
        }
    }

    this.commitLog.setTopicQueueTable(table);
}

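As a result of the dispatching above, each ConsumeQueue's Topic and QueueId, together with its MaxOffset, are saved in table. At the same time, correctMinOffset is called to correct the logical queue's minimum offset based on the minimum offset of the physical queue (the CommitLog).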

 

Once all ConsumeQueues have been traversed, commitLog's topicQueueTable is updated:

protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);

 

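After this, the HA service is started (when DLeger is not in use); HA will be covered in detail in a later post.
Next, the flushConsumeQueueService is started.
Like reputMessageService, it starts a thread, which uses the doFlush method to flush the ConsumeQueues periodically.

FlushConsumeQueueService's doFlush method: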

private void doFlush(int retryTimes) {
    int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();

    if (retryTimes == RETRY_TIMES_OVER) {
        flushConsumeQueueLeastPages = 0;
    }

    long logicsMsgTimestamp = 0;

    int flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
    long currentTimeMillis = System.currentTimeMillis();
    if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
        this.lastFlushTimestamp = currentTimeMillis;
        flushConsumeQueueLeastPages = 0;
        logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
    }

    ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

    for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
        for (ConsumeQueue cq : maps.values()) {
            boolean result = false;
            for (int i = 0; i < retryTimes && !result; i++) {
                result = cq.flush(flushConsumeQueueLeastPages);
            }
        }
    }

    if (0 == flushConsumeQueueLeastPages) {
        if (logicsMsgTimestamp > 0) {
            DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
        }
        DefaultMessageStore.this.getStoreCheckpoint().flush();
    }
}

This iterates over every ConsumeQueue in consumeQueueTable and calls its flush method.

ConsumeQueue's flush method:

public boolean flush(final int flushLeastPages) {
    boolean result = this.mappedFileQueue.flush(flushLeastPages);
    if (isExtReadEnable()) {
        result = result & this.consumeQueueExt.flush(flushLeastPages);
    }

    return result;
}

This calls mappedFileQueue's flush method:

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}

Based on flushedWhere, findMappedFileByOffset returns the MappedFile that needs to be flushed.

Then that MappedFile's flush method is called:

public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();

            try {
                //We only append data to fileChannel or mappedByteBuffer, never both.
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }

            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

Here NIO's force is used to push the updated data out to the ConsumeQueue file that backs the MappedFile.
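For readers unfamiliar with force, the following sketch maps a temporary file, writes through the mapping, forces it, and reads the file back through the regular file API. The class MappedFlushSketch is hypothetical; only the JDK calls (FileChannel.map, MappedByteBuffer.force) correspond to what the method above uses.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo of writing through a memory mapping and forcing it to disk.
class MappedFlushSketch {
    // Writes data via a MappedByteBuffer, forces it, then reads the file back.
    static byte[] writeAndForce(byte[] data) {
        try {
            Path tmp = Files.createTempFile("mapped-sketch", ".bin");
            try (FileChannel channel = FileChannel.open(tmp,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping in READ_WRITE mode grows the file to the mapped size.
                MappedByteBuffer mapped =
                    channel.map(FileChannel.MapMode.READ_WRITE, 0, data.length);
                mapped.put(data);
                // Ask the OS to write the dirty pages of this mapping to storage.
                mapped.force();
            }
            return Files.readAllBytes(tmp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] readBack = writeAndForce(new byte[] {1, 2, 3, 4});
        System.out.println("bytes on disk: " + readBack.length);
    }
}
```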

Once the write completes, flushedWhere is updated so that the next flush knows where to start.
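The isAbleToFlush check that guards the flush is not shown in this post. As a rough idea of how a "least pages" threshold such as flushLeastPages could be evaluated, here is a sketch that assumes a 4 KiB page size and compares whole pages between the flushed and written positions. The class and the exact rule are assumptions, not a quotation of the RocketMQ source.

```java
// Hypothetical page-threshold check; the 4 KiB page size is an assumption.
class FlushThresholdSketch {
    static final int OS_PAGE_SIZE = 4096;

    // With a positive threshold, flush only when enough whole pages are dirty;
    // with a threshold of 0, flush whenever anything new has been written.
    static boolean isAbleToFlush(int flushedPosition, int writePosition, int flushLeastPages) {
        if (flushLeastPages > 0) {
            int dirtyPages = (writePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE);
            return dirtyPages >= flushLeastPages;
        }
        return writePosition > flushedPosition;
    }

    public static void main(String[] args) {
        System.out.println(isAbleToFlush(0, 4095, 1));
        System.out.println(isAbleToFlush(0, 8192, 2));
        System.out.println(isAbleToFlush(0, 1, 0));
    }
}
```

This also suggests why doFlush resets flushConsumeQueueLeastPages to 0 once the thorough interval has elapsed: a threshold of 0 lets even a small amount of pending data reach disk.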

 

After the ConsumeQueue flush service is started, commitLog is started:

public void start() {
    this.flushCommitLogService.start();

    if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        this.commitLogService.start();
    }
}

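First the CommitLog flush service is started; it runs in one of two modes, synchronous flush or asynchronous flush.

commitLogService only needs to be started when messages are buffered in the memory pool (transientStorePoolEnable). In that case the service periodically commits the data in the memory pool to the FileChannel.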
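The "buffer first, commit to the FileChannel later" idea can be sketched as follows. TransientCommitSketch, its buffer size and its method names are hypothetical; the sketch only shows the general technique of writing the not-yet-committed region of an off-heap buffer to a channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical model: messages land in a direct buffer, commit() copies the
// region written since the last commit into the FileChannel.
class TransientCommitSketch {
    private final ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024);
    private final FileChannel channel;
    private int committedPosition = 0;

    TransientCommitSketch(FileChannel channel) {
        this.channel = channel;
    }

    // Buffers data in memory only; nothing reaches the file yet.
    void append(byte[] data) {
        writeBuffer.put(data);
    }

    // Writes bytes in [committedPosition, writePosition) to the channel.
    int commit() throws IOException {
        int writePosition = writeBuffer.position();
        if (writePosition > committedPosition) {
            ByteBuffer pending = writeBuffer.duplicate();
            pending.position(committedPosition);
            pending.limit(writePosition);
            channel.position(committedPosition);
            while (pending.hasRemaining()) {
                channel.write(pending);
            }
            committedPosition = writePosition;
        }
        return committedPosition;
    }

    // Appends and commits twice, then reports the resulting file size.
    static long demo() {
        try {
            Path tmp = Files.createTempFile("commit-sketch", ".bin");
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                TransientCommitSketch store = new TransientCommitSketch(ch);
                store.append(new byte[20]);
                store.commit();
                store.append(new byte[20]);
                store.commit();
                return ch.size();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("file size after two commits: " + demo());
    }
}
```

A commit in this sense only hands data to the file channel; a separate flush (force) is still needed before the data is durable.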

Both services will be covered in detail in later posts.

 

Next, the storeStatsService is started to monitor the Store.

Then addScheduleTask sets up several scheduled tasks:

private void addScheduleTask() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.checkSelf();
        }
    }, 1, 10, TimeUnit.MINUTES);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
                try {
                    if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
                        long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
                        if (lockTime > 1000 && lockTime < 10000000) {

                            String stack = UtilAll.jstack();
                            final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
                                + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
                            MixAll.string2FileNotSafe(stack, fileName);
                        }
                    }
                } catch (Exception e) {
                }
            }
        }
    }, 1, 1, TimeUnit.SECONDS);
}

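① Periodic file cleanup: CommitLog files that have not been referenced for a long time (72 hours by default) are deleted.

② A periodic self-check of whether CommitLog and ConsumeQueue files are damaged or missing, with the result written to the log.

③ A periodic dump of the JVM thread stacks: when debugLockEnable is on and the CommitLog lock has been held for more than one second, the jstack output is written to a file.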
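All three tasks rely on the JDK's scheduleAtFixedRate. One documented property of that method is that if a run throws, later runs are suppressed, which is a reason to catch exceptions inside run(). The sketch below (the class ScheduledTaskSketch and its parameters are hypothetical) schedules a guarded task and waits until it has run a given number of times.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of a fixed-rate task that guards against exceptions.
class ScheduledTaskSketch {
    // Returns true if the task ran `runs` times within five seconds.
    static boolean runPeriodically(int runs, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    // Placeholder for periodic work such as cleaning files.
                    latch.countDown();
                } catch (Throwable e) {
                    // Swallow and report: an escaping exception would cancel later runs.
                    System.err.println("periodic task failed: " + e);
                }
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran three times: " + runPeriodically(3, 10));
    }
}
```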

 

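With DefaultMessageStore started, we return to BrokerController's start method.
Next, remotingServer and fastRemotingServer are started to set up the network listeners. This was covered in the NameServer posts; remember that a NettyServerHandler is bound to the serverBootstrap, because later posts build on it.

brokerOuterAPI's start simply calls the Netty client's start method: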

public void start() {
    this.remotingClient.start();
}

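This was also covered in detail in the analysis of Producer startup.

After that, pullRequestHoldService is started; it will be discussed when we analyze message pulling.

clientHousekeepingService's start method sets up a scheduled task: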

public void start() {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                ClientHousekeepingService.this.scanExceptionChannel();
            } catch (Throwable e) {
                log.error("Error occurred when scan not active client channels.", e);
            }
        }
    }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
}

private void scanExceptionChannel() {
    this.brokerController.getProducerManager().scanNotActiveChannel();
    this.brokerController.getConsumerManager().scanNotActiveChannel();
    this.brokerController.getFilterServerManager().scanNotActiveChannel();
}

Every 10 seconds, this task scans for cached Channels that have become abnormal (no longer active) and removes them.
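The scanNotActiveChannel implementations are not shown here. A common way to implement such a scan is to keep a last-update timestamp per channel and drop entries older than a timeout. The sketch below does that with hypothetical names and a hypothetical 120-second timeout; it is not the broker's actual code.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cache of client channels keyed by an id, with expiry by age.
class ChannelScanSketch {
    // Assumed expiry threshold, chosen only for this example.
    static final long CHANNEL_EXPIRED_TIMEOUT_MILLIS = 120_000L;

    // channel id -> last time the client was heard from
    final Map<String, Long> lastUpdateTimestamps = new ConcurrentHashMap<>();

    // Removes channels not heard from within the timeout; returns how many.
    int scanNotActiveChannel(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdateTimestamps.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > CHANNEL_EXPIRED_TIMEOUT_MILLIS) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        ChannelScanSketch cache = new ChannelScanSketch();
        cache.lastUpdateTimestamps.put("producer-A", 0L);
        cache.lastUpdateTimestamps.put("producer-B", 100_000L);
        System.out.println("removed: " + cache.scanNotActiveChannel(130_000L));
        System.out.println("remaining: " + cache.lastUpdateTimestamps.keySet());
    }
}
```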

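Next, the filterServerManager service is started.

After that, when not in DLeger mode:
The Master starts the transactional message check, which goes through messages that are neither committed nor rolled back and sends check requests to their producers to obtain the transaction status.
It also checks and computes offsets, and removes the messages that need to be discarded.
The Slave starts its synchronization with the Master.
Details will follow in later posts.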

 

Next, registerBrokerAll is called to register the Broker with the NameServer.
The registerBrokerAll method:

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
            topicConfigTable.put(topicConfig.getTopicName(), tmp);
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }

    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}

Here TopicConfigManager's buildTopicConfigSerializeWrapper method wraps its topicConfigTable, the table that records Topic information, in a TopicConfigSerializeWrapper. The table is declared as:

private final ConcurrentMap<String, TopicConfig> topicConfigTable =
        new ConcurrentHashMap<String, TopicConfig>(1024);

Before registering, needRegister checks whether registration is necessary.
The needRegister method:

private boolean needRegister(final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final int timeoutMills) {

    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);
    boolean needRegister = false;
    for (Boolean changed : changeList) {
        if (changed) {
            needRegister = true;
            break;
        }
    }
    return needRegister;
}

This delegates to brokerOuterAPI's needRegister method:

public List<Boolean> needRegister(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final int timeoutMills) {
    final List<Boolean> changedList = new CopyOnWriteArrayList<>();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();
                        requestHeader.setBrokerAddr(brokerAddr);
                        requestHeader.setBrokerId(brokerId);
                        requestHeader.setBrokerName(brokerName);
                        requestHeader.setClusterName(clusterName);
                        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                        request.setBody(topicConfigWrapper.getDataVersion().encode());
                        RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
                        DataVersion nameServerDataVersion = null;
                        Boolean changed = false;
                        switch (response.getCode()) {
                            case ResponseCode.SUCCESS: {
                                QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                    (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                changed = queryDataVersionResponseHeader.getChanged();
                                byte[] body = response.getBody();
                                if (body != null) {
                                    nameServerDataVersion = DataVersion.decode(body, DataVersion.class);
                                    if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {
                                        changed = true;
                                    }
                                }
                                if (changed == null || changed) {
                                    changedList.add(Boolean.TRUE);
                                }
                            }
                            default:
                                break;
                        }
                        log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);
                    } catch (Exception e) {
                        changedList.add(Boolean.TRUE);
                        log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });

        }
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("query dataversion from nameserver countDownLatch await Exception", e);
        }
    }
    return changedList;
}

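First the list of NameServer addresses is obtained.

Then, for each NameServer address:

A QueryDataVersionRequestHeader is built.
remotingClient's invokeSync method (see [Source Analysis of Producer Message Sending in RocketMQ])

sends a QUERY_DATA_VERSION request to the NameServer synchronously, carrying the Broker's DataVersion. The NameServer compares it with its own and responds accordingly.

On a successful response, the returned nameServerDataVersion is compared with the local DataVersion; if they differ, a true is added to changedList. A query that fails with an exception also adds a true.
This continues until every NameServer has been checked.

If the returned List contains any true, the Broker needs to register with the NameServer.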
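The fan-out pattern used here (one task per NameServer, a CountDownLatch to wait with a timeout, a thread-safe list to collect results) can be sketched without any networking. In the hypothetical NeedRegisterSketch below, a plain map of address to version number stands in for the QUERY_DATA_VERSION round trip.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model: nameServerVersions simulates what each NameServer
// would report as its data version for this broker.
class NeedRegisterSketch {
    static boolean needRegister(long localVersion, Map<String, Long> nameServerVersions,
                                long timeoutMillis) {
        List<Boolean> changedList = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        CountDownLatch latch = new CountDownLatch(nameServerVersions.size());
        for (Map.Entry<String, Long> nameServer : nameServerVersions.entrySet()) {
            executor.execute(() -> {
                try {
                    // Stand-in for the remote query and version comparison.
                    Long remoteVersion = nameServer.getValue();
                    if (remoteVersion == null || remoteVersion != localVersion) {
                        changedList.add(Boolean.TRUE);
                    }
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            // Wait for all queries, but never longer than the timeout.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
        // A single differing NameServer is enough to require registration.
        return changedList.contains(Boolean.TRUE);
    }

    public static void main(String[] args) {
        Map<String, Long> versions = new HashMap<>();
        versions.put("ns1:9876", 5L);
        versions.put("ns2:9876", 4L);
        System.out.println("need register: " + needRegister(5L, versions, 1000));
    }
}
```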

In that case doRegisterBrokerAll is called:

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
    List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());

    if (registerBrokerResultList.size() > 0) {
        RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
        if (registerBrokerResult != null) {
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }

            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
            }
        }
    }
}

As this shows, a good deal of information is registered with the NameServer. The call goes to brokerOuterAPI's registerBrokerAll method:

public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {

    final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);

        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }

                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }

        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
        }
    }

    return registerBrokerResultList;
}

Like the previous method, this builds a RegisterBrokerRequestHeader and then calls registerBroker to register with every NameServer in the list.
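The request header above also carries a bodyCrc32 value, which lets the receiver detect a corrupted body. UtilAll.crc32 itself is not shown in this post; the sketch below uses the JDK's java.util.zip.CRC32 and, as an assumption, clears the sign bit so the checksum fits a non-negative int. The class BodyChecksumSketch and its verify method are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical checksum helper built on the JDK's CRC32.
class BodyChecksumSketch {
    // Assumption: the sign bit is masked off to keep the value non-negative.
    static int crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return (int) (crc.getValue() & 0x7FFFFFFF);
    }

    // What a receiver could do: recompute and compare with the header value.
    static boolean verify(byte[] body, int expectedCrc32) {
        return crc32(body) == expectedCrc32;
    }

    public static void main(String[] args) {
        byte[] body = "topicConfigTable...".getBytes(StandardCharsets.UTF_8);
        int checksum = crc32(body);
        System.out.println("intact body verifies: " + verify(body, checksum));
        body[0] ^= 0x01; // flip one bit to simulate corruption
        System.out.println("corrupted body verifies: " + verify(body, checksum));
    }
}
```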

The registerBroker method:

private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final boolean oneway,
    final int timeoutMills,
    final RegisterBrokerRequestHeader requestHeader,
    final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
    request.setBody(body);

    if (oneway) {
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
    }

    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
            RegisterBrokerResult result = new RegisterBrokerResult();
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());
            if (response.getBody() != null) {
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

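This builds a REGISTER_BROKER request.

Because the oneway argument passed in is false, the synchronous invokeSync is used here as well.

On a successful response, we can see that the NameServer returns the MasterAddr, the HaServerAddr and the KvTable.

Back in doRegisterBrokerAll: once registration with all NameServers has finished, we have a List of results.

Because Master and Slave are in a one-to-many relationship, it is enough to take the first RegisterBrokerResult from the List.

Next comes updateMasterHAServerAddrPeriodically, which I mentioned in the previous post. If it is false at this point, the updateMasterAddress operation has already been done; otherwise updateHaMasterAddress is performed now.

The Master address is also passed to slaveSynchronize, which handles the Slave's synchronization.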

 

Back in start: after registerBrokerAll returns, a scheduled task is set up that calls registerBrokerAll periodically so that the registered information stays up to date.
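The period of that task is the expression Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)) from BrokerController's start method, which clamps the configured value to between 10 and 60 seconds. The helper below (a hypothetical class and method name) isolates the expression so its effect can be checked.

```java
// Hypothetical helper that isolates the clamping expression used in start().
class RegisterPeriodSketch {
    // Keeps the configured period within [10000, 60000] milliseconds.
    static long effectivePeriodMillis(long configuredMillis) {
        return Math.max(10000, Math.min(configuredMillis, 60000));
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriodMillis(5000));   // too small, raised to the minimum
        System.out.println(effectivePeriodMillis(30000));  // within range, unchanged
        System.out.println(effectivePeriodMillis(120000)); // too large, lowered to the maximum
    }
}
```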

Finally, brokerStatsManager and brokerFastFailure are started.

At this point the Broker's startup is complete.