RocketMQ 同步複製 SLAVE_NOT_AVAILABLE 異常源碼分析
- 2019 年 11 月 11 日
- 筆記
最近在 RocketMQ 釘釘官方群中看到有人反饋說 broker 主從部署,在發佈消息的時候會報 SLAVE_NOT_AVAILABLE 異常,報這個異常的前提 master 的模式一定為 SYNC_MASTER(同步複製),從 異常碼可以直接判斷的一種原因就是因為 slave 掛掉了,導致 slave 不可用,但是他說 slave 一切正常。
於是我決定擼一波源碼。
既然是主從同步的問題,那麼我們直接定位到處理同步複製的方法:
org.apache.rocketmq.store.CommitLog#handleHA
public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { HAService service = this.defaultMessageStore.getHaService(); if (messageExt.isWaitStoreMsgOK()) { // Determine whether to wait if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); service.getWaitNotifyObject().wakeupAll(); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); } } // Slave problem else { // Tell the producer, slave not available putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); } } } }
消息寫入時需要判斷 master 是否為 SYNC_MASTER 模式,從源碼可以看出來,isSlaveOK() 方法決定是否報 SLAVE_NOT_AVAILABLE 異常碼的關鍵邏輯,所以關鍵就是要看這個方法:
org.apache.rocketmq.store.ha.HAService#isSlaveOK
public boolean isSlaveOK(final long masterPutWhere) { boolean result = this.connectionCount.get() > 0; result = result && ((masterPutWhere - this.push2SlaveMaxOffset.get()) < this.defaultMessageStore .getMessageStoreConfig().getHaSlaveFallbehindMax()); return result; }
從源碼的邏輯看,masterPutWhere = result.getWroteOffset() + result.getWroteBytes(),其中 wroteOffset 表示從那個位移開始寫入,wroteBytes 表示寫入的消息量,因此 masterPutWhere 表示 master 最大的消息拉取位移,push2SlaveMaxOffset 表示的是此時 slave 拉取最大的位移,haSlaveFallbehindMax 表示 slave 主從同步同步複製時最多可落後 master 的位移,masterPutWhere – this.push2SlaveMaxOffset.get() 即可表示此時 slave 落後 master 的位移量,如果大於 haSlaveFallbehindMax,則報 SLAVE_NOT_AVAILABLE 給客戶端,不過不用擔心,只要 slave 沒有掛掉,slave 的同步位移肯定能夠追上來。
push2SlaveMaxOffset 參數值 是 slave 與 master 保持一個心跳頻率,定時上報給 master,master 再根據這個值判斷 slave 落後 master 多少位移量。
下面重點分析 slave 如何上報 push2SlaveMaxOffset 給master。
master 收到 slave 的位移量之後,是從以下方法進行更新的:
org.apache.rocketmq.store.ha.HAService#notifyTransferSome
public void notifyTransferSome(final long offset) { for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) { boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset); if (ok) { this.groupTransferService.notifyTransferSome(); break; } else { value = this.push2SlaveMaxOffset.get(); } } }
從調用棧來看,該方法在服務端處理讀請求類中調用了,我們接着往下看:
org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent
if (readSize > 0) { readSizeZeroTimes = 0; this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now(); if ((this.byteBufferRead.position() - this.processPostion) >= 8) { int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8); long readOffset = this.byteBufferRead.getLong(pos - 8); this.processPostion = pos; HAConnection.this.slaveAckOffset = readOffset; if (HAConnection.this.slaveRequestOffset < 0) { HAConnection.this.slaveRequestOffset = readOffset; log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset); } HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset); }
如上源碼邏輯,如果讀取到的位元組大於 0,並且大於等於 8,則說明了收到了 slave 端反饋過來的位移量,於是將其取出並更新到 push2SlaveMaxOffset 中。
接着我們來看 slave 是如何上報位移的。
org.apache.rocketmq.store.ha.HAService.HAClient#run
if (this.isTimeToReportOffset()) { boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset); if (!result) { this.closeMaster(); } }
以上邏輯在 slave 端處理拉取同步消息線程 run 方法中,首先判斷是否到了需要上報位移的時間間隔了,到了直接調用上報位移方法。
org.apache.rocketmq.store.ha.HAService.HAClient#isTimeToReportOffset
private boolean isTimeToReportOffset() { long interval = HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp; boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig() .getHaSendHeartbeatInterval(); return needHeart; }
首先求出距離上次同步消息的時時間間隔的大小,再與 haSendHeartbeatInterval 作對比,若大於 haSendHeartbeatInterval 則發送心跳包上報位移。
org.apache.rocketmq.store.ha.HAService.HAClient#reportSlaveMaxOffset
private boolean reportSlaveMaxOffset(final long maxOffset) { this.reportOffset.position(0); this.reportOffset.limit(8); this.reportOffset.putLong(maxOffset); this.reportOffset.position(0); this.reportOffset.limit(8); for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) { try { this.socketChannel.write(this.reportOffset); } catch (IOException e) { log.error(this.getServiceName() + "reportSlaveMaxOffset this.socketChannel.write exception", e); return false; } } return !this.reportOffset.hasRemaining(); }
該方法向主服務器上報已拉取的位移,具體做法是將 ByteBuffer 讀取位置 position 值為 0,其實調用 flip() 方法也可以,然後調用 putLong() 方法將 maxOffset 寫入 ByteBuffer,將 limit 設置為 8,跟寫入 ByteBuffer 中的 maxOffset(long 型)大小一樣,最後採取 for 循環將 maxOffset 寫入網絡通道中,並調用 hasRemaining() 方法,該方法的邏輯為判斷 position 是否小於 limit,即判斷 ByteBuffer 中的位元組流是否全部寫入到通道中。
最後總結,如果 slave 正常運行,報這個錯是正常的,你可以適當調整 haSendHeartbeatInterval 參數(1000 * 5)的大小,它決定 slave 上報同步位移的心跳頻率,以及 haSlaveFallbehindMax 參數值(默認 1024 * 1024 * 256),它決定允許 slave 最多落後 master 的位移。