【mq】從零開始實現 mq-10-消費者拉取消息回執 pull message ack
- 2022 年 5 月 12 日
- 筆記
前景回顧
【mq】從零開始實現 mq-02-如何實現生產者調用消費者?
【mq】從零開始實現 mq-03-引入 broker 中間人
【mq】從零開始實現 mq-06-消費者心跳檢測 heartbeat
【mq】從零開始實現 mq-07-負載均衡 load balance
【mq】從零開始實現 mq-09-消費者拉取消息 pull message
【mq】從零開始實現 mq-10-消費者拉取消息回執 pull message ack
狀態回執
大家好,我是老馬。
上一節我們只實現了拉取消息的實現,但是缺少了消費狀態回執。
這一節我們一起來學習下如何實現狀態回執。
代碼實現
回執狀態的設計
我們規定如下幾種回執狀態:
package com.github.houbb.mq.common.constant;
/**
* @author binbin.hou
* @since 0.0.3
*/
public final class MessageStatusConst {
private MessageStatusConst(){}
/**
* 待消費
* ps: 生產者推送到 broker 的初始化狀態
*/
public static final String WAIT_CONSUMER = "W";
/**
* 推送給消費端處理中
* ps: broker 準備推送時,首先將狀態更新為 P,等待推送結果
* @since 0.1.0
*/
public static final String TO_CONSUMER_PROCESS = "TCP";
/**
* 推送給消費端成功
* @since 0.1.0
*/
public static final String TO_CONSUMER_SUCCESS = "TCS";
/**
* 推送給消費端失敗
* @since 0.1.0
*/
public static final String TO_CONSUMER_FAILED = "TCF";
/**
* 消費完成
*/
public static final String CONSUMER_SUCCESS = "CS";
/**
* 消費失敗
*/
public static final String CONSUMER_FAILED = "CF";
/**
* 稍後消費
* @since 0.1.0
*/
public static final String CONSUMER_LATER = "CL";
}
消費者狀態回執
我們在消費之後,添加狀態回執:
for(MqMessage mqMessage : mqMessageList) {
IMqConsumerListenerContext context = new MqConsumerListenerContext();
final String messageId = mqMessage.getTraceId();
ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context);
log.info("消息:{} 消費結果 {}", messageId, consumerStatus);
// 狀態同步更新
MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus);
log.info("消息:{} 狀態回執結果 {}", messageId, JSON.toJSON(ackResp));
}
回執實現,根據 messageId 更新對應的消息消費狀態。
public MqCommonResp consumerStatusAck(String messageId, ConsumerStatus consumerStatus) {
final MqConsumerUpdateStatusReq req = new MqConsumerUpdateStatusReq();
req.setMessageId(messageId);
req.setMessageStatus(consumerStatus.getCode());
final String traceId = IdHelper.uuid32();
req.setTraceId(traceId);
req.setMethodType(MethodType.C_CONSUMER_STATUS);
// 重試
return Retryer.<MqCommonResp>newInstance()
.maxAttempt(consumerStatusMaxAttempt)
.callable(new Callable<MqCommonResp>() {
@Override
public MqCommonResp call() throws Exception {
Channel channel = getChannel(null);
MqCommonResp resp = callServer(channel, req, MqCommonResp.class);
if(!MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
throw new MqException(ConsumerRespCode.CONSUMER_STATUS_ACK_FAILED);
}
return resp;
}
}).retryCall();
}
Broker 回執處理
消息分發
// 消費者消費狀態 ACK
if(MethodType.C_CONSUMER_STATUS.equals(methodType)) {
MqConsumerUpdateStatusReq req = JSON.parseObject(json, MqConsumerUpdateStatusReq.class);
final String messageId = req.getMessageId();
final String messageStatus = req.getMessageStatus();
return mqBrokerPersist.updateStatus(messageId, messageStatus);
}
簡單實現
這裡是基於本地 map 更新狀態的,性能比較差。
後續會以 mysql 實現。
public MqCommonResp updateStatus(String messageId, String status) {
// 這裡性能比較差,所以不可以用於生產。僅作為測試驗證
for(List<MqMessagePersistPut> list : map.values()) {
for(MqMessagePersistPut put : list) {
MqMessage mqMessage = put.getMqMessage();
if(mqMessage.getTraceId().equals(messageId)) {
put.setMessageStatus(status);
break;
}
}
}
MqCommonResp commonResp = new MqCommonResp();
commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return commonResp;
}
小結
對於消息狀態的細化,更加便於我們後續的管理,和問題的定位。
希望本文對你有所幫助,如果喜歡,歡迎點贊收藏轉發一波。
我是老馬,期待與你的下次重逢。
開源地址
The message queue in java.(java 簡易版本 mq 實現) //github.com/houbb/mq
拓展閱讀
rpc-從零開始實現 rpc //github.com/houbb/rpc