RocketMQ主從同步源碼分析

  • 2019 年 10 月 7 日
  • 筆記

之前寫了一篇關於 RocketMQ 隊列與 Kafka 分區副本的區別文章,裡面提到了 RocketMQ 的消息冗餘主要是通過主備同步機制實現的,這跟 Kafka 分區副本的 Leader-Follower 模型不同,HA(High Available) 指的是高可用性,而 RocketMQ 是通過主備同步(HA 機制)實現消息的高可用。

HA 核心類

HA 的實現邏輯放在了 store 存儲模組的ha目錄中,其核心實現類如下:

1.HAService:主從同步的核心實現類2.HAService$AcceptSocketService:主伺服器監聽從伺服器連接實現類3.HAService$GroupTransferService:主從同步通知類,實現同步複製和非同步複製的功能4.HAService$HAClient:從伺服器連接主服務實現類5.HAConnection:主服務端 HA 連接對象的封裝,當主伺服器接收到從伺服器發過來的消息後,會封裝成一個 HAConnection 對象,其中裡面又封裝了讀 Socket 連接實現與 寫 Socket 連接實現:

•HAConnection$ReadSocketService:主伺服器讀實現類•HAConnection$WriteSocketService:主伺服器寫實現類

RocketMQ 主從同步的整體工作機制大致是:

1.從伺服器主動建立 TCP 連接主伺服器,然後每隔 5s 向主伺服器發送 commitLog 文件最大偏移量拉取還未同步的消息;2.主伺服器開啟監聽埠,監聽從伺服器發送過來的資訊,主伺服器收到從伺服器發過來的偏移量進行解析,並返回查找出未同步的消息給從伺服器;3.客戶端收到主伺服器的消息後,將這批消息寫入 commitLog 文件中,然後更新 commitLog 拉取偏移量,接著繼續向主服務拉取未同步的消息。

Slave -> Master 過程

從 HA 實現邏輯可看出,可大致分為兩個過程,分別是從伺服器上報偏移量,以及主伺服器發送未同步消息到從伺服器。

從上面的實現類可知,從伺服器向主伺服器上報偏移量的邏輯在 HAClient 類中,HAClient 類是一個繼承了 ServiceThread 類,即它是一個執行緒服務類,在 Broker 啟動後,Broker 啟動開一條執行緒定時執行從伺服器上報偏移量到主伺服器的任務。

org.apache.rocketmq.store.ha.HAService.HAClient#run:

public void run() {    log.info(this.getServiceName() + " service started");      while (!this.isStopped()) {      try {        // 主動連接主伺服器,獲取socketChannel對象        if (this.connectMaster()) {          if (this.isTimeToReportOffset()) {            // 執行上報偏移量到主伺服器            boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);            if (!result) {              this.closeMaster();            }          }                  // 每隔一秒鐘輪詢一遍          this.selector.select(1000);            // 處理主伺服器發送過來的消息          boolean ok = this.processReadEvent();          if (!ok) {            this.closeMaster();          }            // ......          } else {          this.waitForRunning(1000 * 5);        }      } catch (Exception e) {        log.warn(this.getServiceName() + " service has exception. ", e);        this.waitForRunning(1000 * 5);      }    }      log.info(this.getServiceName() + " service end");  }

以上是 HAClient 執行緒 run 方法邏輯,主要是做了主動連接主伺服器,並上報偏移量到主伺服器,以及處理主伺服器發送過來的消息,並不斷循環執行以上邏輯。

org.apache.rocketmq.store.ha.HAService.HAClient#connectMaster:

private boolean connectMaster() throws ClosedChannelException {    if (null == socketChannel) {      String addr = this.masterAddress.get();      if (addr != null) {        SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);        if (socketAddress != null) {          this.socketChannel = RemotingUtil.connect(socketAddress);          if (this.socketChannel != null) {            this.socketChannel.register(this.selector, SelectionKey.OP_READ);          }        }      }      this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();      this.lastWriteTimestamp = System.currentTimeMillis();    }    return this.socketChannel != null;  }

該方法是從伺服器連接主伺服器的邏輯,拿到主伺服器地址並且連接上以後,會獲取一個 socketChannel 對象,接著還會記錄當前時間戳為上次寫入的時間戳,lastWriteTimestamp 的作用時用來計算主從同步時間間隔,這裡需要注意一點,如果沒有配置主伺服器地址,該方法會返回 false,即不會執行主從複製。

該方法還會調用 DefaultMessageStore 的 getMaxPhyOffset() 方法獲取 commitLog 文件最大偏移量,作為本次上報的偏移量。

org.apache.rocketmq.store.ha.HAService.HAClient#reportSlaveMaxOffset:

private boolean reportSlaveMaxOffset(final long maxOffset) {    this.reportOffset.position(0);    this.reportOffset.limit(8);    this.reportOffset.putLong(maxOffset);    this.reportOffset.position(0);    this.reportOffset.limit(8);      for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {      try {        this.socketChannel.write(this.reportOffset);      } catch (IOException e) {        log.error(this.getServiceName()                  + "reportSlaveMaxOffset this.socketChannel.write exception", e);        return false;      }    }    return !this.reportOffset.hasRemaining();  }

該方法向主伺服器上報已拉取偏移量,具體做法是將 ByteBuffer 讀取位置 position 值為 0,其實跳用 flip() 方法也可以,然後調用 putLong() 方法將 maxOffset 寫入 ByteBuffer,將 limit 設置為 8,跟寫入 ByteBuffer 中的 maxOffset(long 型)大小一樣,最後採取 for 循環將 maxOffset 寫入網路通道中,並調用 hasRemaining() 方法,該方法的邏輯為判斷 position 是否小於 limit,即判斷 ByteBuffer 中的位元組流是否全部寫入到通道中。

Master -> Slave 過程

org.apache.rocketmq.store.ha.HAService.AcceptSocketService#run:

public void run() {    log.info(this.getServiceName() + " service started");      while (!this.isStopped()) {      try {        this.selector.select(1000);        Set<SelectionKey> selected = this.selector.selectedKeys();          if (selected != null) {          for (SelectionKey k : selected) {            if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {              SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();                if (sc != null) {                HAService.log.info("HAService receive new connection, "                                   + sc.socket().getRemoteSocketAddress());                  try {                  HAConnection conn = new HAConnection(HAService.this, sc);                  conn.start();                  HAService.this.addConnection(conn);                } catch (Exception e) {                  log.error("new HAConnection exception", e);                  sc.close();                }              }            } else {              log.warn("Unexpected ops in select " + k.readyOps());            }          }            selected.clear();        }      } catch (Exception e) {        log.error(this.getServiceName() + " service has exception.", e);      }    }      log.info(this.getServiceName() + " service end");  }

主伺服器收到從伺服器的拉取偏移量後,會封裝成一個 HAConnection 對象,前面也說過 HAConnection 封裝主服務端 HA 連接對象的封裝,其中有讀實現類和寫實現類,start() 方法即開啟了讀寫執行緒:

org.apache.rocketmq.store.ha.HAConnection#start:

public void start() {    this.readSocketService.start();    this.writeSocketService.start();  }

org.apache.rocketmq.store.ha.HAConnection.ReadSocketService#processReadEvent:

private boolean processReadEvent() {    int readSizeZeroTimes = 0;      if (!this.byteBufferRead.hasRemaining()) {      this.byteBufferRead.flip();      this.processPostion = 0;    }      while (this.byteBufferRead.hasRemaining()) {      try {        int readSize = this.socketChannel.read(this.byteBufferRead);        if (readSize > 0) {          readSizeZeroTimes = 0;          this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();          if ((this.byteBufferRead.position() - this.processPostion) >= 8) {            int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);            // 從網路通道中讀取從伺服器上報的偏移量            long readOffset = this.byteBufferRead.getLong(pos - 8);            this.processPostion = pos;              // 同步從伺服器偏移量            HAConnection.this.slaveAckOffset = readOffset;            if (HAConnection.this.slaveRequestOffset < 0) {              HAConnection.this.slaveRequestOffset = readOffset;              log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);            }              // 這裡主要是同步後需要喚醒相關消息發送執行緒,實現主從同步是非同步還是同步的功能            HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);          }        } else if (readSize == 0) {          if (++readSizeZeroTimes >= 3) {            break;          }        } else {          log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");          return false;        }      } catch (IOException e) {        log.error("processReadEvent exception", e);        return false;      }    }      return true;  }

從以上源碼可看出,主伺服器接收到從伺服器上報的偏移量後,主要作了兩件事:

1.獲取從伺服器上報的偏移量;2.喚醒主從同步消費者發送消息同步返回的執行緒,該方法實現了主從同步-同步複製的功能。

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

public void run() {    HAConnection.log.info(this.getServiceName() + " service started");      while (!this.isStopped()) {      try {        this.selector.select(1000);          // 如果slaveRequestOffset=-1,說明讀執行緒還沒有獲取從伺服器的偏移量,繼續循環等待        if (-1 == HAConnection.this.slaveRequestOffset) {          Thread.sleep(10);          continue;        }          // 如果nextTransferFromWhere=-1,說明執行緒剛開始執行數據傳輸        if (-1 == this.nextTransferFromWhere) {          // 如果slaveRequestOffset=0,說明從伺服器是第一次上報偏移量          if (0 == HAConnection.this.slaveRequestOffset) {            // 獲取最後一個 commitLog 文件且還未讀取消費的偏移量            long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();            // 求出最後一個commitLog偏移量的初始偏移量            masterOffset =              masterOffset              - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()                 .getMapedFileSizeCommitLog());              if (masterOffset < 0) {              masterOffset = 0;            }              // 更新 nextTransferFromWhere            this.nextTransferFromWhere = masterOffset;          } else {            // 如果slaveRequestOffset!=0,則將該值賦值給nextTransferFromWhere            this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;          }            log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr                   + "], and slave request " + HAConnection.this.slaveRequestOffset);        }          // 判斷上次寫事件是否已全部寫完成        if (this.lastWriteOver) {            // 計算是否已到發送心跳包時間          long interval =            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;          // 發送心跳包,以保持長連接          if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()              .getHaSendHeartbeatInterval()) {            // Build Header            this.byteBufferHeader.position(0);            this.byteBufferHeader.limit(headerSize);            this.byteBufferHeader.putLong(this.nextTransferFromWhere);            this.byteBufferHeader.putInt(0);            this.byteBufferHeader.flip();            this.lastWriteOver = this.transferData();            if (!this.lastWriteOver)              continue;          }        } else {          this.lastWriteOver = this.transferData();          if (!this.lastWriteOver)            continue;        }          // 獲取同步消息數據        SelectMappedBufferResult selectResult =      HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);        if (selectResult != null) {          int size = selectResult.getSize();          if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {            size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();          }            long thisOffset = this.nextTransferFromWhere;          this.nextTransferFromWhere += size;            selectResult.getByteBuffer().limit(size);          this.selectMappedBufferResult = selectResult;            // Build Header          this.byteBufferHeader.position(0);          this.byteBufferHeader.limit(headerSize);          this.byteBufferHeader.putLong(thisOffset);          this.byteBufferHeader.putInt(size);          this.byteBufferHeader.flip();            // 傳輸消息到從伺服器          this.lastWriteOver = this.transferData();        } else {            HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);        }      } catch (Exception e) {          HAConnection.log.error(this.getServiceName() + " service has exception.", e);        break;      }    }      if (this.selectMappedBufferResult != null) {      this.selectMappedBufferResult.release();    }      this.makeStop();      readSocketService.makeStop();      haService.removeConnection(HAConnection.this);      SelectionKey sk = this.socketChannel.keyFor(this.selector);    if (sk != null) {      sk.cancel();    }      try {      this.selector.close();      this.socketChannel.close();    } catch (IOException e) {      HAConnection.log.error("", e);    }      HAConnection.log.info(this.getServiceName() + " service end");  }

讀實現類實現邏輯比較長,但主要做了以下幾件事情:

1.計算需要拉取的偏移量,如果從伺服器第一次拉取,則從最後一個 commitLog 文件的初始偏移量開始同步;2.傳輸消息到從伺服器;3.發送心跳包到從伺服器,保持長連接。

關於第一步,我還需要詳細講解一下,因為之前有想到一個問題:

把 brokerA 的從伺服器去掉,再啟動一台新的從伺服器指向brokerA 主伺服器,這時的主伺服器的消息是否會全量同步到從服務?

org.apache.rocketmq.store.MappedFileQueue#getMaxOffset:

public long getMaxOffset() {      MappedFile mappedFile = getLastMappedFile();      if (mappedFile != null) {          return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();      }      return 0;  }

org.apache.rocketmq.store.ha.HAConnection.WriteSocketService#run:

// 求出最後一個commitLog偏移量的初始偏移量  masterOffset =    masterOffset              - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()                 .getMapedFileSizeCommitLog());

從以上邏輯可找到答案,如果有新的從伺服器同步主伺服器消息,則從最後一個 commitLog 文件的初始偏移量開始同步。

回到最開始開啟 HAClient 執行緒上報偏移量的方法,我們發現裡面還做了一件事:

// 處理主伺服器發送過來的消息  boolean ok = this.processReadEvent();

org.apache.rocketmq.store.ha.HAService.HAClient#processReadEvent:

private boolean processReadEvent() {    int readSizeZeroTimes = 0;    while (this.byteBufferRead.hasRemaining()) {      try {        int readSize = this.socketChannel.read(this.byteBufferRead);        if (readSize > 0) {          lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();          readSizeZeroTimes = 0;          // 讀取消息並寫入commitLog文件中          boolean result = this.dispatchReadRequest();          if (!result) {            log.error("HAClient, dispatchReadRequest error");            return false;          }        } else if (readSize == 0) {          if (++readSizeZeroTimes >= 3) {            break;          }        } else {          // TODO ERROR          log.info("HAClient, processReadEvent read socket < 0");          return false;        }      } catch (IOException e) {        log.info("HAClient, processReadEvent read socket exception", e);        return false;      }    }      return true;  }

該方法用於處理主伺服器發送回來的消息數據,這裡用了 while 循環的處理,不斷地從 byteBuffer 讀取數據到緩衝區中,最後調用 dispatchReadRequest 方法將消息數據寫入 commitLog 文件中,完成主從複製最後一個步驟。

最後貼上《RocketMQ 技術內幕》這本書的一張 RocketMQ HA 交互圖: