RocketMQ中Broker的HA策略源碼分析
- 2019 年 10 月 3 日
- 筆記
Broker的HA策略分為兩部分
①同步元數據
②同步消息數據
同步元數據
在Slave啟動時,會啟動一個定時任務用來從master同步元數據
1 if (role == BrokerRole.SLAVE) { 2 if (null != slaveSyncFuture) { 3 slaveSyncFuture.cancel(false); 4 } 5 this.slaveSynchronize.setMasterAddr(null); 6 slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 7 @Override 8 public void run() { 9 try { 10 BrokerController.this.slaveSynchronize.syncAll(); 11 } 12 catch (Throwable e) { 13 log.error("ScheduledTask SlaveSynchronize syncAll error.", e); 14 } 15 } 16 }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS); 17 }
這裡設置了定時任務,執行slaveSynchronize的syncAll方法
可以注意在之前會通過setMasterAddr將Master的地址設為null,這是由於在後面會通過另一個定時任務registerBrokerAll來向NameServer獲取Master的地址,詳見:
SlaveSynchronize的syncAll方法:
1 public void syncAll() { 2 this.syncTopicConfig(); 3 this.syncConsumerOffset(); 4 this.syncDelayOffset(); 5 this.syncSubscriptionGroupConfig(); 6 }
這個方法會依次調用四個方法,來同步相應資訊:
syncTopicConfig:同步topic的配置資訊
syncConsumerOffset:同步Consumer的Offset資訊
syncDelayOffset:同步延遲隊列資訊
syncSubscriptionGroupConfig:同步訂閱資訊
由於這幾個方法的實現是類似的,這裡就只看下syncTopicConfig的實現:
syncTopicConfig方法:
1 private void syncTopicConfig() { 2 String masterAddrBak = this.masterAddr; 3 if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) { 4 try { 5 TopicConfigSerializeWrapper topicWrapper = 6 this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); 7 if (!this.brokerController.getTopicConfigManager().getDataVersion() 8 .equals(topicWrapper.getDataVersion())) { 9 10 this.brokerController.getTopicConfigManager().getDataVersion() 11 .assignNewOne(topicWrapper.getDataVersion()); 12 this.brokerController.getTopicConfigManager().getTopicConfigTable().clear(); 13 this.brokerController.getTopicConfigManager().getTopicConfigTable() 14 .putAll(topicWrapper.getTopicConfigTable()); 15 this.brokerController.getTopicConfigManager().persist(); 16 17 log.info("Update slave topic config from master, {}", masterAddrBak); 18 } 19 } catch (Exception e) { 20 log.error("SyncTopicConfig Exception, {}", masterAddrBak, e); 21 } 22 } 23 }
這裡首先獲取master的地址masterAddr,由於registerBrokerAll定時任務的存在,即便這一次沒有獲取到masterAddr,只要節點中有master,總會在後面定時執行時從NameServer中獲取到
當獲取到master地址後,通過BrokerOuterAPI的getAllTopicConfig方法,向master請求
BrokerOuterAPI的getAllTopicConfig方法:
1 public TopicConfigSerializeWrapper getAllTopicConfig( 2 final String addr) throws RemotingConnectException, RemotingSendRequestException, 3 RemotingTimeoutException, InterruptedException, MQBrokerException { 4 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); 5 6 RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000); 7 assert response != null; 8 switch (response.getCode()) { 9 case ResponseCode.SUCCESS: { 10 return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class); 11 } 12 default: 13 break; 14 } 15 16 throw new MQBrokerException(response.getCode(), response.getRemark()); 17 }
首先構建GET_ALL_TOPIC_CONFIG求情指令,然後通過remotingClient的invokeSync進行同步發送,注意這裡會通過MixAll的brokerVIPChannel方法,得到對應的master地址的VIP通道地址,就是埠號減2,這在我之前的部落格中介紹過
有關同步發送在 【RocketMQ中Producer消息的發送源碼分析】 中詳細介紹過
請求發送給master後,來看看master是怎麼處理的
master端在收到請求後會通過AdminBrokerProcessor的processRequest方法判別請求指令:
1 case RequestCode.GET_ALL_TOPIC_CONFIG: 2 return this.getAllTopicConfig(ctx, request);
執行getAllTopicConfig方法:
1 private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) { 2 final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class); 3 // final GetAllTopicConfigResponseHeader responseHeader = 4 // (GetAllTopicConfigResponseHeader) response.readCustomHeader(); 5 6 String content = this.brokerController.getTopicConfigManager().encode(); 7 if (content != null && content.length() > 0) { 8 try { 9 response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); 10 } catch (UnsupportedEncodingException e) { 11 log.error("", e); 12 13 response.setCode(ResponseCode.SYSTEM_ERROR); 14 response.setRemark("UnsupportedEncodingException " + e); 15 return response; 16 } 17 } else { 18 log.error("No topic in this broker, client: {}", ctx.channel().remoteAddress()); 19 response.setCode(ResponseCode.SYSTEM_ERROR); 20 response.setRemark("No topic in this broker"); 21 return response; 22 } 23 24 response.setCode(ResponseCode.SUCCESS); 25 response.setRemark(null); 26 27 return response; 28 }
這裡會將TopicConfigManager中保存的topicConfigTable:
1 private final ConcurrentMap<String, TopicConfig> topicConfigTable = 2 new ConcurrentHashMap<String, TopicConfig>(1024);
將這個map通過encode方法轉換成json字元串,再通過Netty發送給slave
回到slave中,在同步發送的情況下,會等待會送響應,收到響應後:
1 switch (response.getCode()) { 2 case ResponseCode.SUCCESS: { 3 return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class); 4 } 5 default: 6 break; 7 }
通過decode解碼,將json字元串轉換為map封裝在 TopicConfigSerializeWrapper中
回到syncTopicConfig方法中:
得到TopicConfigSerializeWrapper實例後
1 if (!this.brokerController.getTopicConfigManager().getDataVersion() 2 .equals(topicWrapper.getDataVersion())) { 3 4 this.brokerController.getTopicConfigManager().getDataVersion() 5 .assignNewOne(topicWrapper.getDataVersion()); 6 this.brokerController.getTopicConfigManager().getTopicConfigTable().clear(); 7 this.brokerController.getTopicConfigManager().getTopicConfigTable() 8 .putAll(topicWrapper.getTopicConfigTable()); 9 this.brokerController.getTopicConfigManager().persist(); 10 11 log.info("Update slave topic config from master, {}", masterAddrBak); 12 }
判斷版本是否一致,若不一致,會進行替換,這樣slave的Topic配置資訊就和master保持同步了
其他三種資訊的同步同理
同步消息數據
在master啟動時,會通過JDK的NIO方式啟動一個HA服務執行緒,用以處理slave的連接:
1 public void run() { 2 log.info(this.getServiceName() + " service started"); 3 4 while (!this.isStopped()) { 5 try { 6 this.selector.select(1000); 7 Set<SelectionKey> selected = this.selector.selectedKeys(); 8 9 if (selected != null) { 10 for (SelectionKey k : selected) { 11 if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { 12 SocketChannel sc = ((ServerSocketChannel) k.channel()).accept(); 13 14 if (sc != null) { 15 HAService.log.info("HAService receive new connection, " 16 + sc.socket().getRemoteSocketAddress()); 17 18 try { 19 HAConnection conn = new HAConnection(HAService.this, sc); 20 conn.start(); 21 HAService.this.addConnection(conn); 22 } catch (Exception e) { 23 log.error("new HAConnection exception", e); 24 sc.close(); 25 } 26 } 27 } else { 28 log.warn("Unexpected ops in select " + k.readyOps()); 29 } 30 } 31 32 selected.clear(); 33 } 34 } catch (Exception e) { 35 log.error(this.getServiceName() + " service has exception.", e); 36 } 37 } 38 39 log.info(this.getServiceName() + " service end"); 40 }
這裡就是非常典型的JDK NIO的使用,在偵聽到連接取得SocketChannel後,將其封裝為HAConnection
1 public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException { 2 this.haService = haService; 3 this.socketChannel = socketChannel; 4 this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString(); 5 this.socketChannel.configureBlocking(false); 6 this.socketChannel.socket().setSoLinger(false, -1); 7 this.socketChannel.socket().setTcpNoDelay(true); 8 this.socketChannel.socket().setReceiveBufferSize(1024 * 64); 9 this.socketChannel.socket().setSendBufferSize(1024 * 64); 10 this.writeSocketService = new WriteSocketService(this.socketChannel); 11 this.readSocketService = new ReadSocketService(this.socketChannel); 12 this.haService.getConnectionCount().incrementAndGet(); 13 }
在構造方法內進行了對socketChannel的一些配置,還創建了一個WriteSocketService和一個ReadSocketService,這兩個是後續處理消息同步的基礎
在創建完HAConnection後,調用其start方法:
1 public void start() { 2 this.readSocketService.start(); 3 this.writeSocketService.start(); 4 }
這裡會啟動兩個執行緒,分別處理讀取slave發送的數據,以及向slave發送數據
到這裡,先不急著分析master了,來看看slave端
slave在啟動時,會啟動HAClient的執行緒:
1 public void run() { 2 log.info(this.getServiceName() + " service started"); 3 4 while (!this.isStopped()) { 5 try { 6 if (this.connectMaster()) { 7 8 if (this.isTimeToReportOffset()) { 9 boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); 10 if (!result) { 11 this.closeMaster(); 12 } 13 } 14 15 this.selector.select(1000); 16 17 boolean ok = this.processReadEvent(); 18 if (!ok) { 19 this.closeMaster(); 20 } 21 22 if (!reportSlaveMaxOffsetPlus()) { 23 continue; 24 } 25 26 long interval = 27 HAService.this.getDefaultMessageStore().getSystemClock().now() 28 - this.lastWriteTimestamp; 29 if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig() 30 .getHaHousekeepingInterval()) { 31 log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress 32 + "] expired, " + interval); 33 this.closeMaster(); 34 log.warn("HAClient, master not response some time, so close connection"); 35 } 36 } else { 37 this.waitForRunning(1000 * 5); 38 } 39 } catch (Exception e) { 40 log.warn(this.getServiceName() + " service has exception. ", e); 41 this.waitForRunning(1000 * 5); 42 } 43 } 44 45 log.info(this.getServiceName() + " service end"); 46 }
在這個while循環中,首先通過connectMaster檢查是否和master連接了
connectMaster方法:
1 private boolean connectMaster() throws ClosedChannelException { 2 if (null == socketChannel) { 3 String addr = this.masterAddress.get(); 4 if (addr != null) { 5 6 SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr); 7 if (socketAddress != null) { 8 this.socketChannel = RemotingUtil.connect(socketAddress); 9 if (this.socketChannel != null) { 10 this.socketChannel.register(this.selector, SelectionKey.OP_READ); 11 } 12 } 13 } 14 15 this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); 16 17 this.lastWriteTimestamp = System.currentTimeMillis(); 18 } 19 20 return this.socketChannel != null; 21 }
若是socketChannel為null,意味著並沒有產生連接,或者連接斷開
需要重新根據masterAddress建立網路連接
只要是需要建立連接,都需要通過defaultMessageStore的getMaxPhyOffset方法,獲取本地最大的Offset,由currentReportedOffset保存,後續用於向master報告;以及保存了一個時間戳lastWriteTimestamp,用於之後的校對
當確保與master的連接建立成功後,通過isTimeToReportOffset方法,檢查是否需要向master報告當前的最大Offset
isTimeToReportOffset方法:
1 private boolean isTimeToReportOffset() { 2 long interval = 3 HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp; 4 boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig() 5 .getHaSendHeartbeatInterval(); 6 7 return needHeart; 8 }
這裡就通過lastWriteTimestamp和當前時間檢查,判斷是否達到了報告時間間隔HaSendHeartbeatInterval,默認5s
若是達到了,就需要通過reportSlaveMaxOffset方法,將記錄的currentReportedOffset這個最大的offset發送給master
reportSlaveMaxOffset方法:
1 private boolean reportSlaveMaxOffset(final long maxOffset) { 2 this.reportOffset.position(0); 3 this.reportOffset.limit(8); 4 this.reportOffset.putLong(maxOffset); 5 this.reportOffset.position(0); 6 this.reportOffset.limit(8); 7 8 for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) { 9 try { 10 this.socketChannel.write(this.reportOffset); 11 } catch (IOException e) { 12 log.error(this.getServiceName() 13 + "reportSlaveMaxOffset this.socketChannel.write exception", e); 14 return false; 15 } 16 } 17 18 return !this.reportOffset.hasRemaining(); 19 }
其中reportOffset是專門用來快取offset的ByteBuffer
1 private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
將maxOffset存放在reportOffset中,然後通過socketChannel的write方法,完成向master的發送
其中hasRemaining方法用來檢查當前位置是否已經達到緩衝區極限limit,確保reportOffset 中的內容能被完全發送出去
發送成功後,會調用selector的select方法,在超時時間內進行NIO的輪詢,等待master的回送
通過這我們可以看出slave在和master建立連接後,會定時向master報告自己當前的offset
來看看master收到offset後是如何處理的:
在master端會通過前面提到的ReadSocketService執行緒進行處理:
1 public void run() { 2 HAConnection.log.info(this.getServiceName() + " service started"); 3 4 while (!this.isStopped()) { 5 try { 6 this.selector.select(1000); 7 boolean ok = this.processReadEvent(); 8 if (!ok) { 9 HAConnection.log.error("processReadEvent error"); 10 break; 11 } 12 13 long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp; 14 if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) { 15 log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval); 16 break; 17 } 18 } catch (Exception e) { 19 HAConnection.log.error(this.getServiceName() + " service has exception.", e); 20 break; 21 } 22 } 23 24 this.makeStop(); 25 26 writeSocketService.makeStop(); 27 28 haService.removeConnection(HAConnection.this); 29 30 HAConnection.this.haService.getConnectionCount().decrementAndGet(); 31 32 SelectionKey sk = this.socketChannel.keyFor(this.selector); 33 if (sk != null) { 34 sk.cancel(); 35 } 36 37 try { 38 this.selector.close(); 39 this.socketChannel.close(); 40 } catch (IOException e) { 41 HAConnection.log.error("", e); 42 } 43 44 HAConnection.log.info(this.getServiceName() + " service end"); 45 }
這裡的while循環中首先也是通過selector的select方法,在超時時間內進行NIO的輪詢
輪詢結束後的進一步的處理由processReadEvent來完成:
1 private boolean processReadEvent() { 2 int readSizeZeroTimes = 0; 3 4 if (!this.byteBufferRead.hasRemaining()) { 5 this.byteBufferRead.flip(); 6 this.processPostion = 0; 7 } 8 9 while (this.byteBufferRead.hasRemaining()) { 10 try { 11 int readSize = this.socketChannel.read(this.byteBufferRead); 12 if (readSize > 0) { 13 readSizeZeroTimes = 0; 14 this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); 15 if ((this.byteBufferRead.position() - this.processPostion) >= 8) { 16 int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8); 17 long readOffset = this.byteBufferRead.getLong(pos - 8); 18 this.processPostion = pos; 19 20 HAConnection.this.slaveAckOffset = readOffset; 21 if (HAConnection.this.slaveRequestOffset < 0) { 22 HAConnection.this.slaveRequestOffset = readOffset; 23 log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset); 24 } 25 26 HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset); 27 } 28 } else if (readSize == 0) { 29 if (++readSizeZeroTimes >= 3) { 30 break; 31 } 32 } else { 33 log.error("read socket[" + HAConnection.this.clientAddr + "] < 0"); 34 return false; 35 } 36 } catch (IOException e) { 37 log.error("processReadEvent exception", e); 38 return false; 39 } 40 } 41 42 return true; 43 } 44 }
這個方法其實就是通過socketChannel的read方法,將slave發送過來的數據存入byteBufferRead中
在確保發送過來的數據能達到8位元組時,取出long類型的offset值,然後交給HAConnection的slaveAckOffset成員進行保存
其中slaveRequestOffset是用來處理第一次連接時的同步
notifyTransferSome方法是作為同步master時,進行相應的喚醒操作,非同步master則沒有要求,在後面具體分析
也就是說ReadSocketService這個執行緒,只是不斷地讀取並更新slave發送來的offset數據
再來看看WriteSocketService執行緒是如何進行向slave的發送:
1 public void run() { 2 HAConnection.log.info(this.getServiceName() + " service started"); 3 4 while (!this.isStopped()) { 5 try { 6 this.selector.select(1000); 7 8 if (-1 == HAConnection.this.slaveRequestOffset) { 9 Thread.sleep(10); 10 continue; 11 } 12 13 if (-1 == this.nextTransferFromWhere) { 14 if (0 == HAConnection.this.slaveRequestOffset) { 15 long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset(); 16 masterOffset = 17 masterOffset 18 - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() 19 .getMapedFileSizeCommitLog()); 20 21 if (masterOffset < 0) { 22 masterOffset = 0; 23 } 24 25 this.nextTransferFromWhere = masterOffset; 26 } else { 27 this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset; 28 } 29 30 log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr 31 + "], and slave request " + HAConnection.this.slaveRequestOffset); 32 } 33 34 if (this.lastWriteOver) { 35 36 long interval = 37 HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp; 38 39 if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() 40 .getHaSendHeartbeatInterval()) { 41 42 // Build Header 43 this.byteBufferHeader.position(0); 44 this.byteBufferHeader.limit(headerSize); 45 this.byteBufferHeader.putLong(this.nextTransferFromWhere); 46 this.byteBufferHeader.putInt(0); 47 this.byteBufferHeader.flip(); 48 49 this.lastWriteOver = this.transferData(); 50 if (!this.lastWriteOver) 51 continue; 52 } 53 } else { 54 this.lastWriteOver = this.transferData(); 55 if (!this.lastWriteOver) 56 continue; 57 } 58 59 SelectMappedBufferResult selectResult = 60 HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere); 61 if (selectResult != null) { 62 int size = selectResult.getSize(); 63 if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) { 64 size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize(); 65 } 66 67 long thisOffset = this.nextTransferFromWhere; 68 this.nextTransferFromWhere += size; 69 70 selectResult.getByteBuffer().limit(size); 71 this.selectMappedBufferResult = selectResult; 72 73 // Build Header 74 this.byteBufferHeader.position(0); 75 this.byteBufferHeader.limit(headerSize); 76 this.byteBufferHeader.putLong(thisOffset); 77 this.byteBufferHeader.putInt(size); 78 this.byteBufferHeader.flip(); 79 80 this.lastWriteOver = this.transferData(); 81 } else { 82 83 HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100); 84 } 85 } catch (Exception e) { 86 87 HAConnection.log.error(this.getServiceName() + " service has exception.", e); 88 break; 89 } 90 } 91 92 HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable(); 93 94 if (this.selectMappedBufferResult != null) { 95 this.selectMappedBufferResult.release(); 96 } 97 98 this.makeStop(); 99 100 readSocketService.makeStop(); 101 102 haService.removeConnection(HAConnection.this); 103 104 SelectionKey sk = this.socketChannel.keyFor(this.selector); 105 if (sk != null) { 106 sk.cancel(); 107 } 108 109 try { 110 this.selector.close(); 111 this.socketChannel.close(); 112 } catch (IOException e) { 113 HAConnection.log.error("", e); 114 } 115 116 HAConnection.log.info(this.getServiceName() + " service end"); 117 }
這裡一開始會對slaveRequestOffset進行一次判斷,當且僅當slaveRequestOffset初始化的時候是才是-1
也就是說當slave還沒有發送過來offset時,WriteSocketService執行緒只會幹等
當slave發送來offset後
首先對nextTransferFromWhere進行了判斷,nextTransferFromWhere和slaveRequestOffset一樣,在初始化的時候為-1
也就代表著master和slave剛剛建立連接,並沒有進行過一次消息的同步!
此時會對修改了的slaveRequestOffset進行判斷
若是等於0,說明slave沒有任何消息的歷史記錄,那麼此時master會取得自身的MaxOffset,根據這個MaxOffset,通過:
1 masterOffset = masterOffset 2 - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig() 3 .getMapedFileSizeCommitLog() /* 1G */);
計算出最後一個文件開始的offset
也就是說,當slave沒有消息的歷史記錄,master只會從本地最後一個CommitLog文件開始的地方,將消息數據發送給slave
若是slave有數據,就從slave發送來的offset的位置起,進行發送,通過nextTransferFromWhere記錄這個offset值
接著對lastWriteOver進行了判斷,lastWriteOver是一個狀態量,用來表示上次發送是否傳輸完畢,初始化是true
若是true,這裡會進行一次時間檢查,lastWriteTimestamp記錄最後一次發送的時間
一次來判斷是否超過了時間間隔haSendHeartbeatInterval(默認5s)
也就是說至少有5s,master沒有向slave發送任何消息
那麼此時就會發送一個心跳包
其中byteBufferHeader是一個12位元組的ByteBuffer:
1 private final int headerSize = 8 + 4; 2 private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);
這裡就簡單地構造了一個心跳包,後續通過transferData方法來完成數據的發送
若是 lastWriteOver為false,則表示上次數據沒有發送完,就需要通過transferData方法,將剩餘數據繼續發送,只要沒發送完,只會重複循環,直到發完
先繼續往下看,下面就是發送具體的消息數據了:
首先根據nextTransferFromWhere,也就是剛才保存的offset,通過DefaultMessageStore的getCommitLogData方法,其實際上調用的是CommitLog的getData方法,這個方法在
【RocketMQ中Broker的啟動源碼分析(二)】中關於消息調度(ReputMessageService)時詳細介紹過
根據offset找到對應的CommitLog文件,將其從offset對應起始處所有數據讀入ByteBuffer中,由SelectMappedBufferResult封裝
這裡若是master已將將所有本地數據同步給了slave,那麼得到的SelectMappedBufferResult就會為null,會調用:
1 HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
將自身阻塞,超時等待100ms,要麼一直等到超時時間到了,要麼就會在後面所講的同步雙傳中被同步master喚醒
在得到SelectMappedBufferResult後,這裡會對讀取到的數據大小進行一次判斷,若是大於haTransferBatchSize(默認32K),將size改為32K,實際上就是對發送數據大小的限制,大於32K會切割,每次最多只允許發送32k
通過thisOffset記錄nextTransferFromWhere即offset
更新nextTransferFromWhere值,以便下一次定位
還會將讀取到的數據結果selectResult交給selectMappedBufferResult保存
然後構建消息頭,這裡就和心跳包格式一樣,前八位元組存放offset,後四位元組存放數據大小
最後調用transferData方法,進行發送:
1 private boolean transferData() throws Exception { 2 int writeSizeZeroTimes = 0; 3 // Write Header 4 while (this.byteBufferHeader.hasRemaining()) { 5 int writeSize = this.socketChannel.write(this.byteBufferHeader); 6 if (writeSize > 0) { 7 writeSizeZeroTimes = 0; 8 this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); 9 } else if (writeSize == 0) { 10 if (++writeSizeZeroTimes >= 3) { 11 break; 12 } 13 } else { 14 throw new Exception("ha master write header error < 0"); 15 } 16 } 17 18 if (null == this.selectMappedBufferResult) { 19 return !this.byteBufferHeader.hasRemaining(); 20 } 21 22 writeSizeZeroTimes = 0; 23 24 // Write Body 25 if (!this.byteBufferHeader.hasRemaining()) { 26 while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { 27 int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer()); 28 if (writeSize > 0) { 29 writeSizeZeroTimes = 0; 30 this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); 31 } else if (writeSize == 0) { 32 if (++writeSizeZeroTimes >= 3) { 33 break; 34 } 35 } else { 36 throw new Exception("ha master write body error < 0"); 37 } 38 } 39 } 40 41 boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining(); 42 43 if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) { 44 this.selectMappedBufferResult.release(); 45 this.selectMappedBufferResult = null; 46 } 47 48 return result; 49 }
首先將byteBufferHeader中的12位元組消息頭通過socketChannel的write方法發送出去
然後將selectMappedBufferResult中的ByteBuffer的消息數據發送出去
若是selectMappedBufferResult等於null,說明是心跳包,只發送消息頭
無論發送什麼都會將時間記錄在lastWriteTimestamp中,以便後續發送心跳包的判斷
看到這裡其實就會發現WriteSocketService執行緒開啟後,只要slave向master發出了第一個offset後,WriteSocketService執行緒都會不斷地將對應位置自己本地的CommitLog文件中的內容發送給slave,直到完全同步後,WriteSocketService執行緒才會稍微緩緩,進入阻塞100ms以及每隔五秒發一次心跳包的狀態
但是只要當Producer向master發送來消息後,由刷盤執行緒完成持久化後,WriteSocketService執行緒又會忙碌起來,此時也才是體現同步雙寫和非同步複製的時候
先不急著說這個,來看看slave接收到消息是如何處理的:
是在HAClient的執行緒中的processReadEvent方法處理的:
1 private boolean processReadEvent() { 2 int readSizeZeroTimes = 0; 3 while (this.byteBufferRead.hasRemaining()) { 4 try { 5 int readSize = this.socketChannel.read(this.byteBufferRead); 6 if (readSize > 0) { 7 lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now(); 8 readSizeZeroTimes = 0; 9 boolean result = this.dispatchReadRequest(); 10 if (!result) { 11 log.error("HAClient, dispatchReadRequest error"); 12 return false; 13 } 14 } else if (readSize == 0) { 15 if (++readSizeZeroTimes >= 3) { 16 break; 17 } 18 } else { 19 log.info("HAClient, processReadEvent read socket < 0"); 20 return false; 21 } 22 } catch (IOException e) { 23 log.info("HAClient, processReadEvent read socket exception", e); 24 return false; 25 } 26 } 27 28 return true; 29 }
在socketChannel通過read方法將master發送的數據讀取到byteBufferRead緩衝區後,由dispatchReadRequest方法做進一步處理
dispatchReadRequest方法:
1 private boolean dispatchReadRequest() { 2 final int msgHeaderSize = 8 + 4; // phyoffset + size 3 int readSocketPos = this.byteBufferRead.position(); 4 5 while (true) { 6 int diff = this.byteBufferRead.position() - this.dispatchPostion; 7 if (diff >= msgHeaderSize) { 8 long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPostion); 9 int bodySize = this.byteBufferRead.getInt(this.dispatchPostion + 8); 10 11 long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); 12 13 if (slavePhyOffset != 0) { 14 if (slavePhyOffset != masterPhyOffset) { 15 log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " 16 + slavePhyOffset + " MASTER: " + masterPhyOffset); 17 return false; 18 } 19 } 20 21 if (diff >= (msgHeaderSize + bodySize)) { 22 byte[] bodyData = new byte[bodySize]; 23 this.byteBufferRead.position(this.dispatchPostion + msgHeaderSize); 24 this.byteBufferRead.get(bodyData); 25 26 HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData); 27 28 this.byteBufferRead.position(readSocketPos); 29 this.dispatchPostion += msgHeaderSize + bodySize; 30 31 if (!reportSlaveMaxOffsetPlus()) { 32 return false; 33 } 34 35 continue; 36 } 37 } 38 39 if (!this.byteBufferRead.hasRemaining()) { 40 this.reallocateByteBuffer(); 41 } 42 43 break; 44 } 45 46 return true; 47 }
這裡就首先將12位元組的消息頭取出來
masterPhyOffset:8位元組offset ,bodySize :4位元組消息大小
根據master發來的masterPhyOffset會和自己本地的slavePhyOffset進行校驗,以便安全備份
之後就會將byteBufferRead中存放在消息頭後面的消息數據取出來,調用appendToCommitLog方法持久化到的CommitLog中
1 public boolean appendToCommitLog(long startOffset, byte[] data) { 2 if (this.shutdown) { 3 log.warn("message store has shutdown, so appendToPhyQueue is forbidden"); 4 return false; 5 } 6 7 boolean result = this.commitLog.appendData(startOffset, data); 8 if (result) { 9 this.reputMessageService.wakeup(); 10 } else { 11 log.error("appendToPhyQueue failed " + startOffset + " " + data.length); 12 } 13 14 return result; 15 }
實際上調用了commitLog的appendData方法將其寫入磁碟,這個方法我在前面部落格中介紹過
在完成寫入後,需要喚醒reputMessageService消息調度,以便Consumer的消費
關於消息調度詳見 【RocketMQ中Broker的啟動源碼分析(二)】
當然前面說過master還會發送心跳消息,但這裡明顯沒對心跳消息進行處理,只是appendToCommitLog調用時,傳入了一個大小為0的byte數組,顯然有些不合理,想不通
在完成後,還會調用reportSlaveMaxOffsetPlus方法:
1 private boolean reportSlaveMaxOffsetPlus() { 2 boolean result = true; 3 long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset(); 4 if (currentPhyOffset > this.currentReportedOffset) { 5 this.currentReportedOffset = currentPhyOffset; 6 result = this.reportSlaveMaxOffset(this.currentReportedOffset); 7 if (!result) { 8 this.closeMaster(); 9 log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset); 10 } 11 } 12 13 return result; 14 }
由於完成了寫入,那麼此時獲取到的offset肯定比currentReportedOffset中保存的大,然後再次通過reportSlaveMaxOffset方法,將當前的offset報告給master
這其實上已經完成了非同步master的非同步複製過程
再來看看同步雙寫是如何實現的:
和刷盤一樣,都是在Producer發送完消息,Broker進行完消息的存儲後進行的
在CommitLog的handleHA方法:
1 public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { 2 if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { 3 HAService service = this.defaultMessageStore.getHaService(); 4 if (messageExt.isWaitStoreMsgOK()) { 5 // Determine whether to wait 6 if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { 7 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); 8 service.putRequest(request); 9 service.getWaitNotifyObject().wakeupAll(); 10 boolean flushOK = 11 request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); 12 if (!flushOK) { 13 log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " 14 + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); 15 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); 16 } 17 } 18 // Slave problem 19 else { 20 // Tell the producer, slave not available 21 putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); 22 } 23 } 24 } 25 26 }
這裡就會檢查Broker的類型,看以看到只對SYNC_MASTER即同步master進行了操作
這個操作過程其實就和同步刷盤類似
根據Offset+WroteBytes創建一條記錄GroupCommitRequest,然後會將添加在List中
然後調用getWaitNotifyObject的wakeupAll方法,把阻塞中的所有WriteSocketService執行緒喚醒
因為master和slave是一對多的關係,那麼這裡就會有多個slave連接,也就有多個WriteSocketService執行緒,保證消息能同步到所有slave中
在喚醒WriteSocketService執行緒工作後,調用request的waitForFlush方法,將自身阻塞,預示著同步複製的真正開啟
在HAService開啟時,還開啟了一個GroupTransferService執行緒:
1 public void run() { 2 log.info(this.getServiceName() + " service started"); 3 4 while (!this.isStopped()) { 5 try { 6 this.waitForRunning(10); 7 this.doWaitTransfer(); 8 } catch (Exception e) { 9 log.warn(this.getServiceName() + " service has exception. ", e); 10 } 11 } 12 13 log.info(this.getServiceName() + " service end"); 14 }
這裡的工作原理和同步刷盤GroupCommitService基本一致,相似的地方我就不仔細分析了
GroupTransferService同樣保存兩張List:
1 private volatile List<CommitLog.GroupCommitRequest> requestsWrite = new ArrayList<>(); 2 private volatile List<CommitLog.GroupCommitRequest> requestsRead = new ArrayList<>();
由這兩張List做一個類似JVM新生代的複製演算法
在handleHA方法中,就會將創建的GroupCommitRequest記錄添加在requestsWrite這個List中
其中doWaitTransfer方法:
1 private void doWaitTransfer() { 2 synchronized (this.requestsRead) { 3 if (!this.requestsRead.isEmpty()) { 4 for (CommitLog.GroupCommitRequest req : this.requestsRead) { 5 boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); 6 for (int i = 0; !transferOK && i < 5; i++) { 7 this.notifyTransferObject.waitForRunning(1000); 8 transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); 9 } 10 11 if (!transferOK) { 12 log.warn("transfer messsage to slave timeout, " + req.getNextOffset()); 13 } 14 15 req.wakeupCustomer(transferOK); 16 } 17 18 this.requestsRead.clear(); 19 } 20 } 21 }
和刷盤一樣,這裡會通過複製演算法,將requestsWrite和requestsRead進行替換,那麼這裡的requestsRead實際上就存放著剛才添加的記錄
首先取出記錄中的NextOffset和push2SlaveMaxOffset比較
push2SlaveMaxOffset值是通過slave發送過來的,在之前說過的ReadSocketService執行緒中的:
1 HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
notifyTransferSome方法:
1 public void notifyTransferSome(final long offset) { 2 for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) { 3 boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset); 4 if (ok) { 5 this.groupTransferService.notifyTransferSome(); 6 break; 7 } else { 8 value = this.push2SlaveMaxOffset.get(); 9 } 10 } 11 }
即便也多個slave連接,這裡的push2SlaveMaxOffset永遠會記錄最大的那個offset
所以在doWaitTransfer中,根據當前NextOffset(完成寫入後master本地的offset),進行判斷
其實這裡主要要考慮到WriteSocketService執行緒的工作原理,只要本地文件有更新,那麼就會向slave發送數據,所以這裡由於HA同步是發生在刷盤後的,那麼就有可能在這個doWaitTransfer執行前,有slave已經將數據進行了同步,並且向master報告了自己offset,更新了push2SlaveMaxOffset的值
那麼
1 boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); 2 ```
這個判斷就會為真,意味著節點中已經有了備份,所以就會直接調用
1 req.wakeupCustomer(transferOK);
以此來喚醒剛才在handleHA方法中的阻塞
若是判斷為假,就說明沒有一個slave完成同步,就需要
1 for (int i = 0; !transferOK && i < 5; i++) { 2 this.notifyTransferObject.waitForRunning(1000); 3 transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset(); 4 }
通過waitForRunning進行阻塞,超時等待,最多五次等待,超過時間會向Producer發送FLUSH_SLAVE_TIMEOUT
若是在超時時間內,有slave完成了同步,並向master發送了offset後,在notifyTransferSome方法中:
1 public void notifyTransferSome(final long offset) { 2 for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) { 3 boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset); 4 if (ok) { 5 this.groupTransferService.notifyTransferSome(); 6 break; 7 } else { 8 value = this.push2SlaveMaxOffset.get(); 9 } 10 } 11 }
就會更新push2SlaveMaxOffset,並通過notifyTransferSome喚醒上面所說的阻塞
然後再次判斷push2SlaveMaxOffset和getNextOffset
成功後喚醒剛才在handleHA方法中的阻塞,同步master的主從複製也就結束
由於同步master的刷盤是在主從複製前發生的,所以同步雙寫意味著master和slave都會完成消息的持久化
至此,RocketMQ中Broker的HA策略分析到此結束