RocketMQ之消費者啟動與消費流程
vivo 互聯網伺服器團隊 – Li Kui
一、簡介
1.1 RocketMQ 簡介
RocketMQ是由阿里巴巴開源的分散式消息中間件,支援順序消息、定時消息、自定義過濾器、負載均衡、pull/push消息等功能。RocketMQ主要由 Producer、Broker、Consumer 、NameServer四部分組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。NameServer充當名字路由服務,整體架構圖如下所示:
- Producer:負責生產消息,一般由業務系統生產消息,可通過集群方式部署。RocketMQ提供多種發送方式,同步發送、非同步發送、順序發送、單向發送。同步和非同步方式均需要Broker返回確認資訊,單向發送不需要。
- Consumer:負責消費消息,一般是後台系統負責非同步消費,可通過集群方式部署。一個消息消費者會從Broker伺服器拉取消息、並將其提供給應用程式。提供pull/push兩者消費模式。
- Broker Server:負責存儲消息、轉發消息。RocketMQ系統中負責接收從生產者發送來的消息並存儲、同時為消費者的拉取請求作準備,存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。
- Name Server:名字服務,充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的Broker IP列表。多個NameServer實例組成集群,相互獨立,沒有資訊交換。
本文基於Apache RocketMQ 最新版本主要講述RocketMQ的消費者機制,分析其啟動流程、pull/push機制,消息ack機制以及定時消息和順序消息的不同。
1.2 工作流程
(1)啟動NameServer。
NameServer起來後監聽埠,等待Broker、Producer、Consumer連上來,相當於一個路由控制中心。
(2)啟動Broker。
跟所有的NameServer保持長連接,定時發送心跳包。心跳包中包含當前Broker資訊(IP+埠等)以及存儲所有Topic資訊。註冊成功後,NameServer集群中就有Topic跟Broker的映射關係。
(3)創建Topic。
創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic。
(4)Producer發送消息。
啟動時先跟NameServer集群中的其中一台建立長連接,並從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然後與隊列所在的Broker建立長連接從而向Broker發消息。
(5)Consumer消費消息。
跟其中一台NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然後直接跟Broker建立連接通道,開始消費消息。
二、消費者啟動流程
官方給出的消費者實現程式碼如下所示:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 實例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");
// 設置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 訂閱一個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe("Test", "*");
// 註冊回調實現類來處理從broker拉取回來的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 標記該消息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者實例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
下面讓我們來分析消費者在啟動中每一階段中做了什麼吧,let』s go.
2.1 實例化消費者
第一步主要是實例化消費者,這裡採取默認的Push消費者模式,構造器中參數為對應的消費者分組,指定同一分組可以消費同一類型的消息,如果沒有指定,將會採取默認的分組模式,這裡實例化了一個DefaultMQPushConsumerImpl對象,它是後面消費功能的主要實現類。
// 實例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestConsumer");
主要通過DefaultMQPushConsumer實例化DefaultMQPushConsumerImpl,它是主要的消費功能實現類。
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
2.2 設置NameServer和訂閱topic過程
// 設置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe("Test", "*");
2.2.1 添加tag
設置NameServer地址後,這個地址為你名字服務集群的地址,類似於zookeeper集群地址,樣例給出的是單機本地地址,搭建集群後,可以設置為集群地址,接下來我們需要訂閱一個主題topic下的消息,設置對應的topic,可以進行分類,通過設置不同的tag來實現,但目前只支援”||”進行連接,如:”tag1 || tag2 || tag3″。歸根在於構造訂閱數據時,源碼通過”||”進行了字元串的分割,如下所示:
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,
String subString) throws Exception {
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionData.setTopic(topic);
subscriptionData.setSubString(subString);
if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
subscriptionData.setSubString(SubscriptionData.SUB_ALL);
} else {
String[] tags = subString.split("\\|\\|");
if (tags.length > 0) {
for (String tag : tags) {
if (tag.length() > 0) {
String trimString = tag.trim();
if (trimString.length() > 0) {
subscriptionData.getTagsSet().add(trimString);
subscriptionData.getCodeSet().add(trimString.hashCode());
}
}
}
} else {
throw new Exception("subString split error");
}
}
return subscriptionData;
}
2.2.2 發送心跳至Broker
前面構造好訂閱主題和分類後,將其放入了一個ConcurrentMap中,並調用sendHeartbeatToAllBrokerWithLock()方法,進行心跳檢測和上傳過濾器類至broker集群(生產者啟動過程也會進行此步驟)。如下所示:
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}
首先會對broker集群進行心跳檢測,在此過程中會施加鎖,它會執行sendHeartbeatToAllBroker方法,構建心跳數據heartbeatData,然後遍歷消費和生產者table,將消費者和生產者資訊加入到heartbeatData中,當都存在消費者和生產者的情況下,會遍歷brokerAddrTable,往每個broker 地址發送心跳,相當於往對應地址發送一次http請求,用於探測當前broker是否存活。
this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
2.2.3上傳過濾器類至FilterServer
之後會執行uploadFilterClassSource()方法,只有push模式才會有此過程,在此模式下,它會循環遍歷訂閱數據SubscriptionData,如果此訂閱數據使用了類模式過濾,會調uploadFilterClassToAllFilterServer()方法:上傳用戶自定義的過濾消息實現類至過濾器伺服器。
private void uploadFilterClassSource() {
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> next = it.next();
MQConsumerInner consumer = next.getValue();
if (ConsumeType.CONSUME_PASSIVELY == consumer.consumeType()) {
Set<SubscriptionData> subscriptions = consumer.subscriptions();
for (SubscriptionData sub : subscriptions) {
if (sub.isClassFilterMode() && sub.getFilterClassSource() != null) {
final String consumerGroup = consumer.groupName();
final String className = sub.getSubString();
final String topic = sub.getTopic();
final String filterClassSource = sub.getFilterClassSource();
try {
this.uploadFilterClassToAllFilterServer(consumerGroup, className, topic, filterClassSource);
} catch (Exception e) {
log.error("uploadFilterClassToAllFilterServer Exception", e);
}
}
}
}
}
}
過濾器類的作用:消費端可以上傳一個Class類文件到 FilterServer,Consumer從FilterServer拉取消息時,FilterServer會把請求轉發給Broker,FilterServer收取到Broker消息後,根據上傳的過濾類中的邏輯做過濾操作,過濾完成後再把消息給到Consumer,用戶可以自定義過濾消息的實現類。
2.3 註冊回調實現類
接下來就是程式碼中的註冊回調實現類了,當然,如果你是pull模式的話就不需要實現它了,push模式需要定義,兩者區別後面會講到,它主要用於從broker實時獲取消息,這裡有兩種消費上下文類型,用於不同的消費類型。
ConsumeConcurrentlyContext:延時類消息上下文,用於延時消息,即定時消息,默認不延遲,可以設置延遲等級,每個等級對應固定時間刻度,RocketMQ中不能自定義延遲時間,延遲等級從1開始,對應的時間間隔如下所示:
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
ConsumeOrderlyContext :順序類消息上下文,控制發送消息的順序,生產者設置分片路由規則後,相同key只落到指定queue上,消費過程中會對順序消息所在的queue加鎖,保證消息的有序性。
2.4 消費者啟動
我們先來看下消費者啟動的過程,如下所示:
(1)this.checkConfig():首先是檢測消費配置項,包括消費分組group、消息模型(集群、廣播)、訂閱數據、消息監聽器等是否存在,如果不存在的話,會拋出異常。
(2)copySubscription():構建主題訂閱資訊SubscriptionData並加入到RebalanceImpl負載均衡方法的訂閱資訊中。
(3)getAndCreateMQClientInstance():初始化MQ客戶端實例。
(4)offsetStore.load():根據不同消息模式創建消費進度offsetStore並載入:BROADCASTING-廣播模式,同一個消費group中的consumer都消費一次,CLUSTERING-集群模式,默認方式,只被消費一次。
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
可以通過setMessageModel方式設置不同模式;廣播模式下同消費組的消費者相互獨立,消費進度在本地單獨進行存儲;集群模式下,同一條消息只會被同一個消費組消費一次,消費進度會參與到負載均衡中,消費進度是共享在整個消費組中的。
(5)consumeMessageService.start():根據不同消息監聽類型實例化並啟動。這裡有延時消息和順序消息。
這裡主要講下順序消息,RocketMQ也幫我們實現了,在啟動時,如果是集群模式並是順序類型,它會啟動定時任務,定時向broker發送批量鎖,鎖住當前順序消費發往的消息隊列,順序消息因為生產者生產消息時指定了分片策略和消息上下文,只會發往一個消費隊列。
定時任務發送批量鎖,鎖住當前順序消息隊列。
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}
發送鎖住隊列的消息至broker,broker端返回鎖住成功的隊列集合lockOKMQSet,順序消息具體實現可查看後面第四節。
(6)mQClientFactory.registerConsumer():MQClientInstance註冊消費者,並啟動MQClientInstance,沒有註冊成功會結束消費服務。
(7)mQClientFactory.start():最後會啟動如下服務:遠程客戶端、定時任務、pull消息服務、負載均衡服務、push消息服務,然後將狀態改為運行中。
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
全部啟動完畢後,整個消費者也就啟動好了,接下來就可以對生產者發送過來的消息進行消費了,那麼是如何進行消息消費的呢?不同的消息模式有何區別呢?
三、pull/push 模式消費
3.1 pull模式-DefaultMQPullConsumer
pull拉取式消費:應用通常主動調用Consumer的拉消息方法從Broker伺服器拉消息、主動權由應用程式控制,可以指定消費的位移,【偽程式碼】如下所示:
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("TestConsumer");
// 設置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 啟動消費者實例
consumer.start();
//獲取主題下所有的消息隊列,這裡根據主題從nameserver獲取的
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Test");
for (MessageQueue queue : mqs) {
//獲取當前隊列的消費位移,指定消費進度offset,fromstore:從broker中獲取還是本地獲取,true-broker
long offset = consumer.fetchConsumeOffset(queue, true);
PullResult pullResult = null;
while (offset < pullResult.getMaxOffset()) {
//第二個參數為tag,獲取指定topic下的tag
//第三個參數表示從哪個位移下開始消費消息
//第四個參數表示一次最大拉取多少個消息
try {
pullResult = consumer.pullBlockIfNotFound(queue, "*", offset, 32);
} catch (Exception e) {
e.printStackTrace();
System.out.println("pull拉取消息失敗");
}
//程式碼省略,記錄消息位移
offset = pullResult.getNextBeginOffset();
//程式碼省略,這裡為消費消息
}
}
可以看到我們是主動拉取topic對應下的消息隊列,然後遍歷它們,獲取當前消費進度並進行消費。
3.2 push模式-DefaultMQPushConsumer
該模式下Broker收到數據後會主動推送給消費端,該消費模式一般實時性較高,現在一般推薦使用該方式,具體示例可以觀看第一章開頭的官方demo。
它也是通過實現pull方式來實現的,首先,前面2.4消費者啟動之後,最後會啟動拉取消息服務pullMessageService和負載均衡rebalanceService服務,它們啟動後會一直有執行緒進行消費。
case CREATE_JUST:
//......
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
//.......
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
這裡面調用doRebalance()方法,進行負載均衡,默認每20s做一次,會輪詢所有訂閱該實例的topic。
public class RebalanceService extends ServiceThread {
//初始化,省略....
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
//做負載均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
@Override
public String getServiceName() {
return RebalanceService.class.getSimpleName();
}
}
然後根據每個topic,以及它是否順序消息模式來做rebalance。
具體做法就是先對Topic下的消息消費隊列、消費者Id進行排序,然後用消息隊列的平均分配演算法,計算出待拉取的消息隊列,將分配到的消息隊列集合與processQueueTable做一個過濾比對,新隊列不包含或已過期,則進行移除 。
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
/根據 /每個topic,以及它是否順序消息模式來做rebalance
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
rebalanceByTopic中廣播和集群模式都會執行updateProcessQueueTableInRebalance()方法,最後會分發請求dispatchPullRequest,通過executePullRequestImmediately()方法將pull請求放入pull請求隊列pullRequestQueue中,注意,pull模式下分發請求方法dispatchPullRequest()實際實現是一個空方法,這裡兩者很大不同,push模式實現如下:
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
然後再PullMessageService中,因為前面consumer啟動成功了,PullMessageService執行緒會實時去取pullRequestQueue中的pull請求。
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
if (pullRequest != null) {
this.pullMessage(pullRequest);
}
} catch (InterruptedException e) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
取出來的pull請求又會經由DefaultMQPushConsumerImpl的消息監聽類,調用pullMessage()方法。
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
pullMessage()中pullKernelImpl()有一個Pullback方法用於執行消息的回調,它會通過submitConsumeRequest()這個方法來處理消息,總而言之就是通過執行緒回調的方式讓push模式下的監聽器能夠感知到。
//Pull回調
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
//省略...消費位移更新
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispathToConsume);
這個方法對應的不同消費模式有著不同實現,但都是會構建一個消費請求ConsumeRequest,裡面有一個run()方法,構建完畢後,會把它放入到listener監聽器中。
//監聽消息
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
還記得前面我們樣例給出的註冊監聽器回調處理方法嗎?
我們可以點擊上面的consumeMessage方法,查看它在源碼中的實現位置,發現它就回到了我們前面的2.3註冊回調實現類裡面了,整個流程是不是通順了呢?這個監聽器中就會收到push的消息,拉取出來進行業務消費邏輯,下面是我們自己定義的消息回調處理方法。
// 註冊回調實現類來處理從broker拉取回來的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 標記該消息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
3.3 小結
push模式相比較於pull模式不同的是,做負載均衡時,pullRequest請求會放入pullRequestQueue,然後PullMessageService執行緒會實時去取出這個請求,將消息存入ProcessQueue,通過執行緒回調的方式讓push模式下的監聽器能夠感知到,這樣消息從分發請求到接收都是實時的,而pull模式是消費端主動去拉取指定消息的,需要指定消費進度。
對於我們開發者來說,選取哪種模式實現我們的業務邏輯比較合適呢?別急,先讓我們總結下他們的特點:
共同點:
兩者底層實際一樣,push模式也是基於pull模式來實現的。
pull模式需要我們通過程式主動通過consumer向broker拉消息,而消息的push模式則只需要我們提供一個listener監聽,實時獲取消息。
優點:
push模式採用長輪詢阻塞的方式獲取消息,實時性非常高;
push模式rocketMQ處理了獲取消息的細節,使用起來比較簡單方便;
pull模式可以指定消費進度,想消費多少就消費多少,靈活性大。
缺點:
push模式當消費者能力遠遠低於生產者能力的時候,會產生一定的消費者消息堆積;
pull模式實時性很低,頻率不好設置;
拉取消息的間隔不好設置,太短則產生很多無效Pull請求的RPC開銷,影響MQ整體的網路性能,太長則實時性差。
適用場景:
對於服務端生產消息數據比較大時,而消費端處理比較複雜,消費能力相對較低時,這種情況就適用pull模式;
對於數據實時性要求高的場景,就比較適用與push模式。
現在的你是否明確業務中該使用哪種模式了呢?
四、順序消息
4.1 實現MQ順序消息發送存在問題
(1)一般消息發送會採取輪詢方式把消息發送到不同的queue(分區隊列);而消費消息的時候從多個queue上拉取消息,broker之間是無感知的,這種情況發送和消費是不能保證順序。
(2)非同步方式發送消息時,發送的時候不是按著一條一條順序發送的,保證不了消息到達Broker的時間也是按照發送的順序來的。
消息發送到存儲,最後到消費要經歷這麼多步驟,我們該如何在業務中使用順序消息呢?讓咱們來一步步拆解下吧。
4.2 實現MQ順序消息關鍵點
既然分散到多個broker上無法追蹤順序,那麼可以控制發送的順序消息只依次發送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。在發送時設置分片路由規則,讓相同key的消息只落到指定queue上,然後消費過程中對順序消息所在的queue加鎖,保證消息的有序性,讓這個queue上的消息就按照FIFO順序來進行消費。因此我們滿足以下三個條件是否就可以呢?
1)消息順序發送:多執行緒發送的消息無法保證有序性,因此,需要業務方在發送時,針對同一個業務編號(如同一筆訂單)的消息需要保證在一個執行緒內順序發送,在上一個消息發送成功後,在進行下一個消息的發送。對應到mq中,消息發送方法就得使用同步發送,非同步發送無法保證順序性。
//採用的同步發送方式,在一個執行緒內順序發送,非同步發送方式為:producer.send(msg, new SendCallback() {...})
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {//…}
2)消息順序存儲:MQ 的topic下會存在多個queue,要保證消息的順序存儲,同一個業務編號的消息需要被發送到一個queue中。對應到mq中,需要使用MessageQueueSelector來選擇要發送的queue。即可以對業務編號設置路由規則,像根據隊列數量對業務欄位hash取余,將消息發送到一個queue中。
//使用"%"操作,使得訂單id取余後相同的數據路由到同一個queue中,也可以自定義路由規則
long index = id % mqs.size();
return mqs.get((int) index);
3)消息順序消費:要保證消息順序消費,同一個queue就只能被一個消費者所消費,因此對broker中消費隊列加鎖是無法避免的。同一時刻,一個消費隊列只能被一個消費者消費,消費者內部,也只能有一個消費執行緒來消費該隊列。這裡RocketMQ已經為我們實現好了。
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
//....省略
}
}
消費者重新負載,並且分配完消費隊列後,需要向mq伺服器發起消息拉取請求,程式碼實現在RebalanceImpl#updateProcessQueueTableInRebalance()中,針對順序消息的消息拉取,mq做了以上判斷,即消費客戶端先向broker端發起對messageQueue的加鎖請求,只有加鎖成功時才創建pullRequest進行消息拉取,這裡的pullRequest就是前面pull和push模式消息體,而updateProcessQueueTableInRebalance這個方法也是在前面消費者啟動過程中有講到過哦。
具體加鎖邏輯如下:
public boolean lock(final MessageQueue mq) {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("the message queue lock {}, {} {}",
lockOK ? "OK" : "Failed",
this.consumerGroup,
mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}
return false;
}
可以看到,就是調用lockBatchMQ方法發送了一個加鎖請求,成功獲取到消息處理隊列就設為獲取到鎖,返回鎖定成功,如果加鎖成功,同一時刻只有一個執行緒進行消息消費。加鎖失敗,會延遲1000ms重新嘗試向broker端申請鎖定messageQueue,鎖定成功後重新提交消費請求。
怎麼樣,這樣的加鎖方式是不是很像我們平時用到的分散式鎖呢?由你來設計實現你會怎麼做呢?
五、消息ack機制
5.1 消息消費失敗處理
消息被消費者消費了,那麼如何保證被消費成功呢?消息消費失敗會出現什麼情況呢?
消息被消費,那麼如何保證被消費成功呢?這裡只有使用方控制,只有使用方確認成功了,才會消費成功,否則會重新投遞。
RocketMQ其實是通過ACK機制來對失敗消息進行重試和通知的,具體流程如下所示:
消息成功與否是由使用方控制,只有使用方確認成功了,才會消費成功,否則會重新投遞,Consumer會通過監聽器監聽回調過來的消息,返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消費成功,如果消費失敗,返回ConsumeConcurrentlyStatus.RECONSUME_LATER狀態(消費重試),RocketMQ就會默認為這條消息失敗了,延遲一定時間後(默認10s,可配置),會再次投送到ConsumerGroup,重試次數與間隔時間關係上圖所示。如果持續這樣,失敗到一定次數(默認16次),就會進入到DLQ死信隊列,不再投遞,此時可以通過監控人工來干預。
5.2 消息重投帶來問題
RocketMQ 消費消息因為消息重投很大一個問題就是無法保證消息只被消費一次,因此需要開發人員在業務裡面自己去處理。
六、總結
本文主要介紹了RocketMQ的消費者啟動流程,結合官方源碼和示例,一步步講述消費者在啟動和消息消費中的的工作原理及內容,並結合平時業務工作中,對我們所熟悉的順序、push/pull模式等進行詳細分析,以及對於消息消費失敗和重投帶來問題去進行分析。
對於自己而言,希望通過主動學習源碼方式,能夠明白其中啟動的原理,學習裡面優秀的方案,像對於pull/push,順序消息這些,學習之後能夠了解到push模式是何如做到實時拉取消息的,順序消息是如何保證的,再就是能夠聯想到平時遇到這種問題該如何處理,像順序消息在消息被消費時保持和存儲的順序一致,這裡自己施加分散式鎖寫能不能實現等,文中也有很多引導性問題,希望能引起讀者自己的思考,能夠對整個消費者啟動和消息消費流程有著較為直觀的認知,但還有著一些技術細節由於篇幅原因沒做出詳細說明,也歡迎大家一起探討交流~
參考資料: