RocketMQ主從如何同步消息消費進度?

  • 2019 年 10 月 7 日
  • 筆記

前面我也跟大家講述了 RocketMQ 讀寫分離的規則,但是你可能會問,主從伺服器之間的消費進度是如何保持同步的?下面我來給大家解答一下。

如果消費者消費模式不同,也會有不同的保存方式,消費者端的消息消費進度保存到 OffsetStore 中,他有兩個實現類:

org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore // 本地消費進度保存實現  org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore // 遠程消費進度保存實現

其中,如果是廣播模式消費,消息的消費進度是保存到本地,如果是集群消費模式,消息的消費進度則是保存到 Broker,但無論是保存到本地,還是保存到 Broker,消費者都會在本地留一份快取,我們暫且看看集群消費模式下,消息消費進度的快取是如何保存的:

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateOffset:

public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {    if (mq != null) {      AtomicLong offsetOld = this.offsetTable.get(mq);      if (null == offsetOld) {        offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));      }        if (null != offsetOld) {        if (increaseOnly) {          MixAll.compareAndIncreaseOnly(offsetOld, offset);        } else {          offsetOld.set(offset);        }      }    }  }

消息者在消費完消息後,會調用以上方法,將消費進度放入 offsetTable 快取中,當 Rebalance 負載重新分配生成 PullRequest 對象時,會調用 RemoteBrokerOffsetStore.readOffset 方法從 offsetTable 快取中取出對應的消費進度快取值,再將該值放進 PullRequest 對象中,接下來消息拉取時就會將消息消費進度快取發送到 Broker 端,所以我們繼續看 Broker 端的處理邏輯。

之前整理 Broker 啟動流程時,發現 Broker 啟動時會開啟一個定時任務:

org.apache.rocketmq.broker.BrokerController#initialize:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {      @Override      public void run() {          try {              BrokerController.this.slaveSynchronize.syncAll();          } catch (Throwable e) {              log.error("ScheduledTask syncAll slave exception", e);          }      }  }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

如果 Broker 是從伺服器,則會開啟以上定時任務。

org.apache.rocketmq.broker.slave.SlaveSynchronize#syncAll:

public void syncAll() {    this.syncTopicConfig();    this.syncConsumerOffset();    this.syncDelayOffset();    this.syncSubscriptionGroupConfig();  }

在主伺服器沒有宕機的情況下,從伺服器會定時從主伺服器中同步消息消費進度等資訊,那現在問題來了,由於這個同步是單方面同步,即只會從伺服器同步主伺服器,那如果主伺服器宕機了之後,消費者切換成從伺服器拉取消息進行消費,如果之後主伺服器啟動了,從伺服器在把已經消費過的偏移量同步過來,那豈不是造成同步消費了?

其實消費者在拉取消息的時候,如果消費者的快取中存在消費進度,也會向 Broker 更新消息消費進度,所以即使是主伺服器掛了,在它重新啟動之後,消費者的消費進度沒有丟失,依然會更新主伺服器的消息消費進度,這樣一來,消費端與主伺服器只掛了其中一個,並不會導致消息重新被消費,具體程式碼邏輯如下:

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:

boolean storeOffsetEnable = brokerAllowSuspend;  storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;  storeOffsetEnable = storeOffsetEnable      && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;  if (storeOffsetEnable) {   this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());  }

其中 brokerAllowSuspend 表示 broker 是否允許掛起,該值默認為 true,hasCommitOffsetFlag 表示消費者在記憶體中是否快取了消息消費進度,從程式碼邏輯可看出,如果 Broker 為主伺服器,並且 brokerAllowSuspend 和 hasCommitOffsetFlag 都為true,那麼就會將消費者消費進度更新到本地。