【RocketMQ】消息的拉取
RocketMQ消息的消費以組為單位,有兩種消費模式:
廣播模式:同一個消息隊列可以分配給組內的每個消費者,每條消息可以被組內的消費者進行消費。
集群模式:同一個消費組下,一個消息隊列同一時間只能分配給組內的一個消費者,也就是一條消息只能被組內的一個消費者進行消費。(一般情況下都使用的是集群模式)
消息的獲取也有兩種模式:
拉模式:消費者主動發起拉取消息的請求,獲取消息進行消費。
推模式:消息到達Broker後推送給消費者。RocketMQ對拉模式進行了包裝去實現推模式,本質還是需要消費者去拉取,一個拉取任務完成後繼續下一次拉取。
首先來看一個RocketMQ源碼中基於推模式DefaultMQPushConsumer
進行消費的例子,首先為消費者設置了消費者組名稱,然後註冊了消息監聽器,並設置訂閱的主題,最後調用start方法啟動消費者,接下來就去看看DefaultMQPushConsumer
如何進行消息消費的:
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQPushConsumerTest {
private String consumerGroup;
private String topic = "FooBar";
private String brokerName = "BrokerA";
private MQClientInstance mQClientFactory;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private static DefaultMQPushConsumer pushConsumer;
@Before
public void init() throws Exception {
// ...
// 消費者組
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
// 實例化DefaultMQPushConsumer
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
// 設置拉取間隔
pushConsumer.setPullInterval(60 * 1000);
// 註冊消息監聽器
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return null;
}
});
// ...
// 設置訂閱的主題
pushConsumer.subscribe(topic, "*");
// 啟動消費者
pushConsumer.start();
}
}
消費者的啟動
DefaultMQPushConsumer
實現了MQPushConsumer
介面,它引用了默認的消息推送實現類DefaultMQPushConsumerImpl
,在構造函數中可以看到對其進行了實例化,並在start方法中進行了啟動:
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
/**
* 默認的消息推送實現類
*/
protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
/**
* 構造函數
*/
public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;
this.namespace = namespace;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
// 實例化DefaultMQPushConsumerImpl
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
/**
* 啟動
*/
@Override
public void start() throws MQClientException {
setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
// 啟動消費者
this.defaultMQPushConsumerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
}
DefaultMQPushConsumerImpl的start方法中處理邏輯如下:
- 調用
copySubscription
方法處理消息訂閱,主要是將訂閱資訊包裝成SubscriptionData對象,加入到負載均衡對象rebalanceImpl
中 - 創建客戶端實例對象
mQClientFactory
,對應實現類為MQClientInstance
,拉取服務執行緒、負載均衡執行緒都是通過MQClientInstance
啟動的 - 為負載均衡對象
RebalanceImpl
設置消費組、消費模式、分配策略,RebalanceImpl
是一個抽象類,在實例化時可以看到使用的是RebalancePushImpl
類型的 - 創建消息拉取API對象
PullAPIWrapper
,用於向Broker發送拉取消息的請求 - 根據消費模式,初始化消費進度存儲對象
offsetStore
- 集群模式:消息的消費進度保存在Broker中,使用
LocalFileOffsetStore
。 - 廣播模式:消息的消費進度保存在消費者端,使用
RemoteBrokerOffsetStore
。
- 集群模式:消息的消費進度保存在Broker中,使用
- 調用
MQClientInstance
的registerConsumer
將消費者組的資訊註冊到MQClientInstance
的consumerTable
中 - 調用
mQClientFactory
的start方法啟動MQClientInstance
- 調用
mQClientFactory
的rebalanceImmediately
方法進行負載均衡
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// MQClientInstance
private MQClientInstance mQClientFactory;
// 負載均衡對象,具體使用的是RebalancePushImpl進行實例化
private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
// 消息拉取API對象PullAPIWrapper
private PullAPIWrapper pullAPIWrapper;
// 消費進度存儲對象
private OffsetStore offsetStore;
public synchronized void start() throws MQClientException {
// 判斷狀態
switch (this.serviceState) {
case CREATE_JUST: // 如果是創建未啟動狀態
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
// 先置為失敗狀態
this.serviceState = ServiceState.START_FAILED;
// 檢查配置
this.checkConfig();
// 處理消息訂閱
this.copySubscription();
// 如果是集群模式
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 創建MQClientInstance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 設置消費者組
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
// 設置消費模式
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
// 設置分配策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
// 設置MQClientInstance
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
// 創建消息拉取API對象
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
// 註冊消息過濾鉤子
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
// 消費模式判斷
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING: // 廣播模式
// 消費進度存儲在消費者本地,從本地獲取
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING: // 集群模式
// 消費進度需要從Broker獲取
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
// 設置消費進度
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 載入消費進度
this.offsetStore.load();
// 如果是順序消費
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
// 創建順序消費service:ConsumeMessageOrderlyService
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
// 非順序消費,使用ConsumeMessageConcurrentlyService
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// 啟動消費服務
this.consumeMessageService.start();
// 將消費者資訊註冊到mQClientFactory中,key為消費者組名稱,value為消費者也就是當前對象
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 啟動MQClientInstance
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
// 狀態更改為運行中
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 進行負載均衡
this.mQClientFactory.rebalanceImmediately();
}
}
public class MQClientInstance {
// 註冊消費者
public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) {
if (null == group || null == consumer) {
return false;
}
// 將消費者組資訊添加到consumerTable中
MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
if (prev != null) {
log.warn("the consumer group[" + group + "] exist already.");
return false;
}
return true;
}
}
主題訂閱處理
在copySubscription
方法中,從defaultMQPushConsumer
獲取了設置的主題訂閱資訊,在前面的例子中可以看到向defaultMQPushConsumer中添加了訂閱的主題資訊,所以這裡獲取到了之前添加的主題資訊MAP集合,其中KEY為主題,VALUE為表達式,然後遍歷訂閱資訊集合,將訂閱資訊包裝成SubscriptionData對象,並加入到負載均衡對象rebalanceImpl
中:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// DefaultMQPushConsumer
private final DefaultMQPushConsumer defaultMQPushConsumer;
private void copySubscription() throws MQClientException {
try {
// 獲取訂閱資訊,KEY為主題,VALUE為表達式
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
// 獲取主題
final String topic = entry.getKey();
// 獲取表達式
final String subString = entry.getValue();
// 構建主題資訊對象
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
// 加入到負載均衡實現類中
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
// 獲取重試主題
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
// 訂閱重試主題
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
}
創建MQClientInstance
MQClientInstance
中有以下幾個主要的成員變數:
pullMessageService:對應實現類為PullMessageService
,是用來拉取消息的服務
rebalanceService:對應的實現類為RebalanceService
,是用來進行負載均衡的服務
consumerTable:消費者組資訊,key為消費者組名稱,value為註冊的消費者,上面可知在start方法中調用了registerConsumer
方法進行了消費者註冊
RebalanceService
和PullMessageService
都繼承了ServiceThread,在MQClientInstance
的start方法中,分別調用了pullMessageService和rebalanceService的start方法啟動拉取服務執行緒和負載均衡執行緒:
public class MQClientInstance {
// 拉取消息Service
private final PullMessageService pullMessageService;
// 負載均衡service
private final RebalanceService rebalanceService
// 消費者組資訊,key為消費者組名稱,value為註冊的消費者
private final ConcurrentMap<String, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
// ...
// 創建MQClientAPIImpl
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
// ...
this.mQAdminImpl = new MQAdminImpl(this);
// 創建拉取消息service
this.pullMessageService = new PullMessageService(this);
// 創建負載均衡service,並在構造函數中傳入了當前對象
this.rebalanceService = new RebalanceService(this);
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
// ...
}
// 啟動
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// ...
this.startScheduledTask();
// 啟動拉取消息服務
this.pullMessageService.start();
// 啟動負載均衡服務
this.rebalanceService.start();
// ...
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
}
消息拉取服務啟動
PullMessageService
繼承了ServiceThread
,並且使用了阻塞隊列pullRequestQueue
存儲消息拉取請求,PullMessageService
被啟動後,在run方法中等待pullRequestQueue
中拉取請求的到來,然後調用pullMessage
方法拉取消息, 在pullMessage
中又是調用DefaultMQPushConsumerImpl
的pullMessage
進行消息拉取的:
public class PullMessageService extends ServiceThread {
// 拉取請求阻塞隊列
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 拉取消息
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
// 拉取消息
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
// 轉換為DefaultMQPushConsumerImpl
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
// 調用pullMessage拉取消息
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
}
這裡可能會有一個疑問,既然PullMessageService
在等待拉取請求的到來,那麼什麼時候會往pullRequestQueue
中添加拉取消息的請求?
可以看到在PullMessageService
的executePullRequestImmediately
方法中,將拉取請求添加到了阻塞隊列pullRequestQueue
中:
public class PullMessageService extends ServiceThread {
// 拉取請求阻塞隊列
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
// 向隊列中添加拉取消息的請求資訊
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
}
那麼接下來只需看看哪裡調用了PullMessageService
的executePullRequestImmediately
方法就可以找到在何時向隊列中添加拉取請求的:
可以看到DefaultMQPushConsumerImpl
的executePullRequestImmediately
方法中調用了PullMessageService
的executePullRequestImmediately
方法:
public void executePullRequestImmediately(final PullRequest pullRequest) {
// 調用PullMessageService的executePullRequestImmediately方法
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
接下來再看看哪裡調用了DefaultMQPushConsumerImpl
的executePullRequestImmediately
:
發現有兩處進行了調用:
DefaultMQPushConsumerImpl
的pullMessage
方法RebalancePushImpl
的dispatchPullRequest
方法
前面可知PullMessageService
處理拉取請求的時候就是調用的DefaultMQPushConsumerImpl
的pullMessage
方法進行處理的,所以如果是首次添加拉取請求,一定不是從這個入口添加的,那麼首次大概就是從RebalancePushImpl這個地方添加的,接下來就去看看RebalancePushImpl
如何添加拉取請求的。
負載均衡服務啟動
MQClientInstance
的start方法中,啟動了負責均衡服務的執行緒,在RebalanceService
的run方法中,調用了waitForRunning
方法進行阻塞等待,如果負責均衡服務被喚醒,將會調用MQClientInstance
的doRebalance
進行負載均衡:
public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory; // 引用了MQClientInstance
// 構造函數
public RebalanceService(MQClientInstance mqClientFactory) {
// 設置MQClientInstance
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
// 等待運行
this.waitForRunning(waitInterval);
// 進行負載均衡
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
負載均衡服務的喚醒
前面可知DefaultMQPushConsumerImpl
在啟動的時候調用了MQClientInstance
的rebalanceImmediately
方法,在rebalanceImmediately
方法中可以看到,調用了rebalanceService
的wakeup
方法喚醒負載均衡執行緒,(關於wakeup方法的實現前面在講解消息發送時已經分析過這裡不再贅述):
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void start() throws MQClientException {
// ...
// 喚醒負載均衡服務,也就是調用MQClientInstance的rebalanceImmediately方法
this.mQClientFactory.rebalanceImmediately();
}
}
public class MQClientInstance {
public void rebalanceImmediately() {
// 喚醒負載均衡服務
this.rebalanceService.wakeup();
}
}
負載均衡
負責均衡服務被喚醒後,會調用MQClientInstance
的doRebalance
進行負載均衡,處理邏輯如下:
- 從consumerTable中獲取註冊的消費者組資訊,前面可知consumerTable中存放了註冊的消費者資訊,Key為組名稱,value為消費者
- 對consumerTable進行遍歷,調用消費者的
doRebalance
方法對每一個消費者進行負載均衡,前面可知消費者是DefaultMQPushConsumerImpl
類型的
public class MQClientInstance {
public void doRebalance() {
// 遍歷註冊的消費者
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
// 負載均衡,前面可知消費者是DefaultMQPushConsumerImpl類型的
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
}
接下來進入到DefaultMQPushConsumerImpl
的doRebalance
,可以看到它又調用了rebalanceImpl
的doRebalance
進行負載均衡:
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
@Override
public void doRebalance() {
if (!this.pause) {
// 這裡又調用了rebalanceImpl的doRebalance進行負載均衡
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
}
RebalanceImpl
RebalanceImpl
的doRebalance
處理邏輯如下:
- 獲取訂閱的主題資訊集合,在訂閱處理章節中,可以看到將訂閱的主題資訊封裝成了SubscriptionData並加入到了RebalanceImpl中
- 對獲取到的訂閱主題資訊集合進行遍歷,調用
rebalanceByTopic
對每一個主題進行負載均衡
public abstract class RebalanceImpl {
public void doRebalance(final boolean isOrder) {
// 獲取訂閱的主題資訊
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
// 遍歷所有訂閱的主題
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 根據主題進行負載均衡
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
}
根據主題進行負載均衡
rebalanceByTopic
方法中根據消費模式進行了判斷然後對主題進行負載均衡,這裡我們關注集群模式下的負載均衡:
-
從
topicSubscribeInfoTable
中根據主題獲取對應的消息隊列集合 -
根據主題資訊和消費者組名稱,獲取所有訂閱了該主題的消費者ID集合
-
如果主題對應的消息隊列集合和消費者ID都不為空,對消息隊列集合和消費ID集合進行排序
-
獲取分配策略,根據分配策略,為當前的消費者分配對應的消費隊列,RocketMQ默認提供了以下幾種分配策略:
-
AllocateMessageQueueAveragely:平均分配策略,根據消息隊列的數量和消費者的個數計算每個消費者分配的隊列個數。
-
AllocateMessageQueueAveragelyByCircle:平均輪詢分配策略,將消息隊列逐個分發給每個消費者。
-
AllocateMessageQueueConsistentHash:根據一致性 hash進行分配。
-
llocateMessageQueueByConfig:根據配置,為每一個消費者配置固定的消息隊列 。
-
AllocateMessageQueueByMachineRoom:分配指定機房下的消息隊列給消費者。
-
AllocateMachineRoomNearby:優先分配給同機房的消費者。
-
-
根據最新分配的消息隊列,調用
updateProcessQueueTableInRebalance
更新當前消費者消費的隊列資訊
public abstract class RebalanceImpl {
// 根據主題進行負載均衡
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: { // 廣播模式
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// ...
break;
}
case CLUSTERING: { // 集群模式
// 根據主題獲取訂閱的消息隊列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 獲取所有訂閱了該主題的消費者id
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
// ...
if (mqSet != null && cidAll != null) { // 如果都不為空
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 對消息隊列排序
Collections.sort(mqAll);
// 對消費者排序
Collections.sort(cidAll);
// 獲取分配策略
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 根據分配策略,為當前的消費者分配消費隊列
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
// 分配給當前消費的消費隊列
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
// 將分配結果加入到結果集合中
allocateResultSet.addAll(allocateResult);
}
// 根據分配資訊更新處理隊列
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
// ...
}
break;
}
default:
break;
}
}
}
更新處理隊列
updateProcessQueueTableInRebalance
方法同樣在RebalanceImpl
中,RebalanceImpl
中使用了一個ConcurrentMap
類型的處理隊列表存儲消息隊列及對應的隊列處理資訊,updateProcessQueueTableInRebalance
方法的入參中topic表示當前要進行負載均衡的主題,mqSet中記錄了重新分配給當前消費者的消息隊列,主要處理邏輯如下:
- 獲取處理隊列表
processQueueTable
進行遍歷,處理每一個消息隊列,如果隊列表為空直接進入第2步:- 判斷消息隊列所屬的主題是否與方法中指定的主題一致,如果不一致繼續遍歷下一個消息隊列
- 如果主題一致,判斷mqSet中是否包含當前正在遍歷的隊列,如果不包含,說明此隊列已經不再分配給當前的消費者進行消費,需要將消息隊列置為dropped,表示刪除
- 創建消息拉取請求集合pullRequestList,並遍曆本次分配的消息隊列集合,如果某個消息隊列不在
processQueueTable
中,需要進行如下處理:- 計算消息拉取偏移量,如果消息拉取偏移量大於0,創建ProcessQueue,並放入處理隊列表中
processQueueTable
- 構建
PullRequest
,設置消息的拉取資訊,並加入到拉取消息請求集合pullRequestList
中
- 計算消息拉取偏移量,如果消息拉取偏移量大於0,創建ProcessQueue,並放入處理隊列表中
- 調用dispatchPullRequest處理拉取請求集合中的數據
可以看到,經過這一步,如果分配給當前消費者的消費隊列不在processQueueTable
中,就會構建拉取請求PullRequest
,然後調用dispatchPullRequest處理消息拉取請求。
public abstract class RebalanceImpl {
// 處理隊列表,KEY為消息隊列,VALUE為對應的處理資訊
protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
// 負載均衡,topic表示當前要進行負載均衡的主題,mqSet中記錄了重新分配給當前消費者的消息隊列
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
// 處理隊列表
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
// 獲取消息隊列
MessageQueue mq = next.getKey();
// 獲取處理隊列
ProcessQueue pq = next.getValue();
// 主題是否一致
if (mq.getTopic().equals(topic)) {
// 如果隊列集合中不包含當前的隊列
if (!mqSet.contains(mq)) {
// 設置為dropped
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) { // 是否過期
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true); // 設置為刪除
// ...
break;
default:
break;
}
}
}
}
// 創建拉取請求集合
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// 遍曆本次分配的消息隊列集合
for (MessageQueue mq : mqSet) {
// 如果之前不在processQueueTable中
if (!this.processQueueTable.containsKey(mq)) {
// ...
// 創建ProcessQueue
ProcessQueue pq = new ProcessQueue();
long nextOffset = -1L;
try {
// 計算消息拉取偏移量
nextOffset = this.computePullFromWhereWithException(mq);
} catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
// 如果偏移量大於等於0
if (nextOffset >= 0) {
// 放入處理隊列表中
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
// 如果之前已經存在,不需要進行處理
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
// 如果之前不存在,構建PullRequest,之後會加入到阻塞隊列中,進行消息拉取
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);// 設置消費組
pullRequest.setNextOffset(nextOffset);// 設置拉取偏移量
pullRequest.setMessageQueue(mq);// 設置消息隊列
pullRequest.setProcessQueue(pq);// 設置處理隊列
pullRequestList.add(pullRequest);// 加入到拉取消息請求集合
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// 添加消息拉取請求
this.dispatchPullRequest(pullRequestList);
return changed;
}
}
添加拉取請求
在dispatchPullRequest方法中可以看到,對pullRequestList進行了遍歷,然後將每一個拉取請求調用defaultMQPushConsumerImpl
的executePullRequestImmediately
方法添加到了PullMessageService
的阻塞隊列中等待進行消息拉取:
public class RebalancePushImpl extends RebalanceImpl {
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
for (PullRequest pullRequest : pullRequestList) {
// 加入到阻塞隊列中
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
}
拉取消息
上面可知,如果阻塞隊列中添加了拉取消息的請求,接下來會調用DefaultMQPushConsumerImpl
的pullMessage
方法進行消息拉取,處理邏輯如下:
- 從拉取請求中獲取處理隊列
processQueue
,判斷是否置為Dropped刪除狀態,如果處於刪除狀態不進行處理 - 從處理隊列中獲取消息的數量和大小,判斷是否超過限制,如果超過限制延遲50毫秒後重新加入到拉取請求隊列中進行處理
- 判斷是否是順序消費,這裡先不討論順序消費,如果是非順序消費,判斷
processQueue
中隊列最大偏移量和最小偏移量的間距是否超過ConsumeConcurrentlyMaxSpan
的值,如果超過需要進行流量控制,延遲50毫秒後重新加入隊列中進行處理 - 獲取拉取主題的訂閱資訊,如果為空,延遲3000毫秒後重新進行拉取
- 創建消息拉取後的回調函數PullCallback
- 構建消息拉取系統標記
- 通過
PullAPIWrapper
的pullKernelImpl
方法向Broker發送拉取消息請求
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
/**
* 拉取延遲毫秒數
*/
private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
/**
* 出現異常後的延遲處理毫秒數
*/
private long pullTimeDelayMillsWhenException = 3000;
public void pullMessage(final PullRequest pullRequest) {
// 從請求中獲取處理隊列
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// 如果被置為Dropped,不進行處理
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
// 設置拉取時間
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
// ...
// 獲取消息數量
long cachedMessageCount = processQueue.getMsgCount().get();
// 獲取消息大小
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
// 判斷當前處理的消息條數是否超過限制
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// ...
return;
}
// 判斷當前處理的消息大小是否超過限制
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
// 延遲進行處理
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// ...
return;
}
if (!this.consumeOrderly) {// 非順序消費
// 隊列最大偏移量和最小偏移量的間距是否超過ConsumeConcurrentlyMaxSpan
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
// 延遲處理
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// ...
return;
}
} else {
// 順序消費
// ...
}
// 獲取主題訂閱資訊
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
// 延遲處理
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
// 創建消息拉取成功後的回調函數
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
// ...
}
// ...
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
// 獲取主題的訂閱資訊
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
// 構建消息拉取系統標記
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
// 發送請求拉取消息
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
}
發送拉取消息請求
PullAPIWrapper中主要是獲取了Broker的地址,然後創建拉取請求頭PullMessageRequestHeader,設置拉取的相關資訊,然後調用MQClientAPIImpl
的pullMessage
拉取消息:
public class PullAPIWrapper {
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final String expressionType,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 根據BrokerName獲取Broker資訊
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
// ...
if (findBrokerResult != null) {
// ...
// 創建拉取消息的請求頭
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup); // 設置消費組
requestHeader.setTopic(mq.getTopic()); // 設置主題
requestHeader.setQueueId(mq.getQueueId()); // 設置隊列ID
requestHeader.setQueueOffset(offset); // 設置拉取偏移量
requestHeader.setMaxMsgNums(maxNums); // 設置拉取最大消息個數
requestHeader.setSysFlag(sysFlagInner);// 設置系統標識
requestHeader.setCommitOffset(commitOffset); // 設置commit偏移量
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);// 設置訂閱主題表達式
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
// 獲取Broker地址
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
// 調MQClientAPIImpl的pullMessage拉取消息
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
MQClientAPIImpl
的pullMessage
中首先構建了遠程請求RemotingCommand
,可以看到請求類型設置的是拉取消息PULL_MESSAGE
:
- 非同步調用
pullMessageAsync
方法拉取消息 - 同步調用
pullMessageSync
方法拉取消息
以非同步消息拉取pullMessageAsync
為例,看一下請求的發送:
- 通過
invokeAsync
向Broker發送拉取消息的請求 - 在請求返迴響應的時候,進行判斷,如果響應不為空,調用processPullResponse處理響應內容,然後調用回調函數PullCallback的
onSuccess
方法處理消息
public class MQClientAPIImpl {
/**
* 發送請求拉取消息
*/
public PullResult pullMessage(
final String addr,
final PullMessageRequestHeader requestHeader,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
// 構建請求,這裡可以看到請求類型是拉取消息PULL_MESSAGE
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
switch (communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC: // 非同步
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC: // 同步
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
break;
}
return null;
}
// 非同步發送請求拉取消息
private void pullMessageAsync(
final String addr,
final RemotingCommand request,
final long timeoutMillis,
final PullCallback pullCallback
) throws RemotingException, InterruptedException {
// 發送請求
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
// 獲取響應
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
// 處理響應
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
assert pullResult != null;
// 調用回調函數處理
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {
// ...
}
}
});
}
}
Broker對消息拉取請求處理
Broker在啟動的時候註冊了消息拉取請求處理器PullMessageProcessor
:
public class BrokerController {
private final PullMessageProcessor pullMessageProcessor;
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
// ...
// 創建PullMessageProcessor
this.pullMessageProcessor = new PullMessageProcessor(this);
// ...
}
public void registerProcessor() {
// ...
// 註冊處理器
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
// ...
}
}
拉取請求處理
PullMessageProcessor
的processRequest
方法中用於處理消費者發送的消息拉取請求,處理邏輯如下:
- 調用
MessageStore
的getMessage
方法查找消息 - 設置響應資訊,之後將消息查找結果響應給發送者
- 如果本次消息未查找到(有可能消息還未到達),並且允許將請求掛起,則將拉取請求提交到PullRequestHoldService中進行掛起,稍後重新拉取
public class PullMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
// 根據消費者組獲取訂閱配置
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
if (null == subscriptionGroupConfig) {
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));
return response;
}
// ...
// 拉取消息
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
if (getMessageResult != null) {
// 設置拉取結果
response.setRemark(getMessageResult.getStatus().name());
// 設置下一次的拉取偏移量
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
// 設置最小偏移量
responseHeader.setMinOffset(getMessageResult.getMinOffset());
// 設置最大偏移量
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
// ...
switch (response.getCode()) {
case ResponseCode.SUCCESS:
// ...
break;
case ResponseCode.PULL_NOT_FOUND: // 如果消息未找到
// 如果允許掛起
if (brokerAllowSuspend && hasSuspendFlag) {
// 掛起的超時時間
long pollingTimeMills = suspendTimeoutMillisLong;
// 如果未開啟長輪詢
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
// 從配置中獲取
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
// 構建拉取請求
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
// 提交到PullRequestHoldService進行掛起
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
case ResponseCode.PULL_RETRY_IMMEDIATELY:
break;
case ResponseCode.PULL_OFFSET_MOVED:
// ...
break;
default:
assert false;
}
} else {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store getMessage return null");
}
// ...
return response;
}
}
查找消息
DefaultMessageStore
的getMessage
用於根據消費者的拉取請求查找對應的消息數據,它首先根據主題名稱和隊列ID查找對應的消費隊列,並獲取消息隊列對應的CommitLog文件的最小偏移量minOffset
和最大偏移量maxOffset
,然後校驗本次拉取的偏移量是否在最小偏移量和最大偏移量之間,如果不在,會調用nextOffsetCorrection
進行糾正,所以先來看一下nextOffsetCorrection
方法:
// 糾正下一次拉取消息的偏移量
private long nextOffsetCorrection(long oldOffset, long newOffset) {
long nextOffset = oldOffset;
// 如果當前Broker不是從節點或者是設置了OffsetCheckInSlave校驗
if (this.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE || this.getMessageStoreConfig().isOffsetCheckInSlave()) {
nextOffset = newOffset; // 更新拉取進度
}
// 返回nextOffset
return nextOffset;
}
由nextOffsetCorrection
方法可知,如果當前Broker是主節點或者開啟了OffsetCheckInSlave校驗,才會糾正下次的拉取進度設置,否則依舊使用原來的拉取偏移量。
根據拉取偏移量與CommitLog最大最小偏移量的值的對比,處理結果有以下幾種情況:
-
NO_MESSAGE_IN_QUEUE:對應maxOffset最大偏移量為0的情況,說明當前消息隊列中沒有消息,調用nextOffsetCorrection設置下一次的拉取偏移量為0,從0開始拉取。
nextOffsetCorrection方法糾正拉取偏移量的條件為當前Broker是主節點或者開啟了OffsetCheckInSlave校驗,所以只有在這個條件下,才會更新為新的拉取偏移量,在當前這個情況下也就是會更新為0,下次從0開始拉取, 如果條件不成立,則不會進行更新,依舊使用原來的拉取偏移量。
-
OFFSET_TOO_SMALL:對應請待拉取偏移量offset小於CommitLog文件的最小偏移量的情況,說明拉取進度值過小,調用nextOffsetCorrection設置下一次的拉取偏移量為CommitLog文件的最小偏移量(需要滿足nextOffsetCorrection的更新條件)。
-
OFFSET_OVERFLOW_ONE:對應待拉取偏移量offset等於CommitLog文件的最大偏移量的情況,此時雖然調用了nextOffsetCorrection進行糾正,但是設置的更新偏移量依舊為offset的值,也就是不進行更新。
-
OFFSET_OVERFLOW_BADLY:對應待拉取偏移量offset大於CommitLog文件最大偏移量的情況,說明拉取偏移量越界,此時有以下兩種情況:
- 如果最小偏移量為0,將下一次拉取偏移量設置為最小偏移量的值
- 如果最小偏移量不為0,將下一次拉取偏移量的值設置為最大偏移量
-
NO_MATCHED_LOGIC_QUEUE:如果根據主題未找到消息隊列,返回沒有匹配的隊列
-
FOUND:待拉取消息偏移量介於最大最小偏移量之間,此時根據拉取偏移量和大小從CommitLog中獲取消息數據
public class DefaultMessageStore implements MessageStore {
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
// ...
long beginTime = this.getSystemClock().now();
GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
long nextBeginOffset = offset;
long minOffset = 0;
long maxOffset = 0;
GetMessageResult getResult = null;
// 獲取CommitLog文件的最大偏移量
final long maxOffsetPy = this.commitLog.getMaxOffset();
// 根據主題和隊列ID查找消費隊列
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) { // 如果消費隊列不為空
// 獲取消息隊列最小偏移量
minOffset = consumeQueue.getMinOffsetInQueue();
// 獲取消息隊列最大偏移量
maxOffset = consumeQueue.getMaxOffsetInQueue();
// 如果最大偏移量為0,說明當前消息隊列中沒有消息
if (maxOffset == 0) {
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
// 設置下一次的拉取偏移量為0
nextBeginOffset = nextOffsetCorrection(offset, 0);
} else if (offset < minOffset) { // 如果待拉取偏移量小於最小偏移量
status = GetMessageStatus.OFFSET_TOO_SMALL;
// 設置下一次的拉取偏移量為minOffset
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else if (offset == maxOffset) { // 如果待拉取偏移量等於最大偏移量
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
// 設置下一次的拉取偏移量為offset也就是不進行更新
nextBeginOffset = nextOffsetCorrection(offset, offset);
} else if (offset > maxOffset) { // 如果待拉取偏移量大於最大偏移量
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
// 如果最小偏移量為0
if (0 == minOffset) {
// 更新下次拉取偏移量為minOffset
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
} else {
// 更新下次拉取偏移量為maxOffset
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
}
} else {
// 根據偏移量獲取消息隊列對應的ConsumeQueue
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
// 如果不為空
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
long nextPhyFileStartOffset = Long.MIN_VALUE;
long maxPhyOffsetPulling = 0;
int i = 0;
// ...
// 創建獲取消息結果對象GetMessageResult
getResult = new GetMessageResult(maxMsgNums);
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 獲取偏移量
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
// 獲取大小
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
// ...
// 根據拉取偏移量和大小從CommitLog中獲取消息數據
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
// ...
this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
// 設置消息內容
getResult.addMessage(selectResult);
// 設置查找狀態為FOUND
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
// ...
// 計算下次拉取偏移量
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
} finally {
bufferConsumeQueue.release();
}
} else {
// 未查找到
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+ maxOffset + ", but access logic queue failed.");
}
}
} else {
// 如果未查找到消息隊列
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
// ...
// 設置查找結果
getResult.setStatus(status); // 查找結果狀態
getResult.setNextBeginOffset(nextBeginOffset); // 下一次拉取的偏移量
getResult.setMaxOffset(maxOffset);// CommitLog最大偏移量
getResult.setMinOffset(minOffset);// CommitLog最小偏移量
return getResult;
}
}
消費者對拉取消息的處理
前面知道,非同步拉取消息的時候註冊了回調函數PullCallback,當請求返迴響應結果之後,會執行回調函數,這次進入到DefaultMQPushConsumerImpl
的pullMessage
回調函數中看一下都做了什麼。
在onSuccess
方法中,首先調用了PullAPIWrapper
的processPullResult
方法處理返回的響應資訊,然後根據拉取結果進行處理,拉取結果有以下幾種情況:
FOUND:對應GetMessageResult.FOUND的情況,此時判斷是否拉取到了消息
-
如果未拉取到消息,將拉取請求放入到阻塞隊列中再進行一次拉取
-
如果拉取到了消息,將消息提交到ConsumeMessageService中進行消費(非同步處理),然後判斷拉取間隔PullInterval是否大於0,如果大於0,表示需要等待一段時間後再進行拉取,此時調用
executePullRequestLater
方法延遲下一次拉取,如果PullInterval小於0表示需要立刻進行下一次拉取,此時調用executePullRequestImmediately
將拉取請求加入隊列中進行下一次拉取。
NO_MATCHED_MSG:沒有匹配的消息,此時更新下一次的拉取偏移量,調用executePullRequestImmediately
將拉取請求加入隊列中重新進行拉取。
OFFSET_ILLEGAL:拉取偏移量不合法,此時設置下一次拉取偏移量,並將拉取請求中存放的ProcessQueue置為dropped刪除狀態,然後通過DefaultMQPushConsumerImpl
提交非同步任務,在任務中重新更新拉取偏移量,並將ProcessQueue刪除
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
// 處理拉取結果
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
// 判斷拉取結果
switch (pullResult.getPullStatus()) {
case FOUND:
// 獲取上一次拉取的偏移量
long prevRequestOffset = pullRequest.getNextOffset();
// 更新下一次拉取的偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
// 如果未拉取到消息
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
// 將拉取請求放入到阻塞隊列中再進行一次拉取
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
// 將消息加入到processQueue
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// 將消息提交到ConsumeMessageService中進行消費(非同步處理)
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
// 如果PullInterval大於0,等待PullInterval毫秒後將對象放入到阻塞隊列中
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
// 根據間隔時間稍後再進行拉取
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
// 立刻進行下一次拉取
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
// ...
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG: // 沒有匹配的消息
// 更新下一次的拉取偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
// 再次進行拉取
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL: // 不合法
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
// 設置下一次拉取偏移量
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
// 設置為dropped,進行丟棄
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
// 更新拉取偏移量
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
// 持久化
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
// 移除處理隊列,等待下一次負責均衡
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
// 如果出現異常,稍後再進行拉取
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
總結
參考
丁威、周繼鋒《RocketMQ技術內幕》
RocketMQ版本:4.9.3