【mq】從零開始實現 mq-12-消息的批量發送與回執
- 2022 年 5 月 18 日
- 筆記
前景回顧
【mq】從零開始實現 mq-02-如何實現生產者調用消費者?
【mq】從零開始實現 mq-03-引入 broker 中間人
【mq】從零開始實現 mq-06-消費者心跳檢測 heartbeat
【mq】從零開始實現 mq-07-負載均衡 load balance
【mq】從零開始實現 mq-09-消費者拉取消息 pull message
【mq】從零開始實現 mq-10-消費者拉取消息回執 pull message ack
【mq】從零開始實現 mq-11-消費者消息回執添加分組信息 pull message ack groupName
批量消息
對於消息的發送,有時候可能需要一次發送多個,比如日誌消息等。
批量操作可以提升性能。
本節老馬就和大家一起添加一點批量特性。
消息的批量發送
生產者實現
接口定義
/**
* 同步發送消息-批量
* @param mqMessageList 消息類型
* @return 結果
* @since 0.1.3
*/
SendBatchResult sendBatch(final List<MqMessage> mqMessageList);
/**
* 單向發送消息-批量
* @param mqMessageList 消息類型
* @return 結果
* @since 0.1.3
*/
SendBatchResult sendOneWayBatch(final List<MqMessage> mqMessageList);
一次支持發送多個消息。
接口實現
生產者實現如下。
@Override
public SendBatchResult sendBatch(List<MqMessage> mqMessageList) {
final List<String> messageIdList = this.fillMessageList(mqMessageList);
final MqMessageBatchReq batchReq = new MqMessageBatchReq();
batchReq.setMqMessageList(mqMessageList);
String traceId = IdHelper.uuid32();
batchReq.setTraceId(traceId);
batchReq.setMethodType(MethodType.P_SEND_MSG_BATCH);
return Retryer.<SendBatchResult>newInstance()
.maxAttempt(maxAttempt)
.callable(new Callable<SendBatchResult>() {
@Override
public SendBatchResult call() throws Exception {
return doSendBatch(messageIdList, batchReq, false);
}
}).retryCall();
}
@Override
public SendBatchResult sendOneWayBatch(List<MqMessage> mqMessageList) {
List<String> messageIdList = this.fillMessageList(mqMessageList);
MqMessageBatchReq batchReq = new MqMessageBatchReq();
batchReq.setMqMessageList(mqMessageList);
String traceId = IdHelper.uuid32();
batchReq.setTraceId(traceId);
batchReq.setMethodType(MethodType.P_SEND_MSG_ONE_WAY_BATCH);
return doSendBatch(messageIdList, batchReq, true);
}
private SendBatchResult doSendBatch(List<String> messageIdList,
MqMessageBatchReq batchReq,
boolean oneWay) {
log.info("[Producer] 批量發送消息 messageIdList: {}, batchReq: {}, oneWay: {}",
messageIdList, JSON.toJSON(batchReq), oneWay);
// 以第一個 sharding-key 為準。
// 後續的會被忽略
MqMessage mqMessage = batchReq.getMqMessageList().get(0);
Channel channel = getChannel(mqMessage.getShardingKey());
//one-way
if(oneWay) {
log.warn("[Producer] ONE-WAY send, ignore result");
return SendBatchResult.of(messageIdList, SendStatus.SUCCESS);
}
MqCommonResp resp = callServer(channel, batchReq, MqCommonResp.class);
if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
return SendBatchResult.of(messageIdList, SendStatus.SUCCESS);
}
throw new MqException(ProducerRespCode.MSG_SEND_FAILED);
}
ps: 這裡和單個發送有一個區別,那就是對於 channel 的選擇。因為只能選擇一個,所以不能兼顧每一個消息的 sharding-key。
Broker 的處理
消息分發
// 生產者消息發送-批量
if(MethodType.P_SEND_MSG_BATCH.equals(methodType)) {
return handleProducerSendMsgBatch(channelId, json);
}
// 生產者消息發送-ONE WAY-批量
if(MethodType.P_SEND_MSG_ONE_WAY_BATCH.equals(methodType)) {
handleProducerSendMsgBatch(channelId, json);
return null;
}
具體實現
/**
* 處理生產者發送的消息
*
* @param channelId 通道標識
* @param json 消息體
* @since 0.1.3
*/
private MqCommonResp handleProducerSendMsgBatch(String channelId, String json) {
MqMessageBatchReq batchReq = JSON.parseObject(json, MqMessageBatchReq.class);
final ServiceEntry serviceEntry = registerProducerService.getServiceEntry(channelId);
List<MqMessagePersistPut> putList = buildPersistPutList(batchReq, serviceEntry);
MqCommonResp commonResp = mqBrokerPersist.putBatch(putList);
// 遍歷異步推送
for(MqMessagePersistPut persistPut : putList) {
this.asyncHandleMessage(persistPut);
}
return commonResp;
}
這裡對消息列表進行持久化保存。
演示的持久化策略如下:
@Override
public MqCommonResp putBatch(List<MqMessagePersistPut> putList) {
// 構建列表
for(MqMessagePersistPut put : putList) {
this.doPut(put);
}
MqCommonResp commonResp = new MqCommonResp();
commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return commonResp;
}
消息的批量ACK
說明
以前的實現方式是每一個消息消費完成之後,進行一次 ACK。
對於 pull 策略的消息消費,我們可以等當前批次結束,統一進行 ACK 回執。
消費實現
實現調整如下:
for(MqTopicTagDto tagDto : subscribeList) {
final String topicName = tagDto.getTopicName();
final String tagRegex = tagDto.getTagRegex();
MqConsumerPullResp resp = consumerBrokerService.pull(topicName, tagRegex, size);
if(MqCommonRespCode.SUCCESS.getCode().equals(resp.getRespCode())) {
List<MqMessage> mqMessageList = resp.getList();
if(CollectionUtil.isNotEmpty(mqMessageList)) {
List<MqConsumerUpdateStatusDto> statusDtoList = new ArrayList<>(mqMessageList.size());
for(MqMessage mqMessage : mqMessageList) {
IMqConsumerListenerContext context = new MqConsumerListenerContext();
final String messageId = mqMessage.getTraceId();
ConsumerStatus consumerStatus = mqListenerService.consumer(mqMessage, context);
log.info("消息:{} 消費結果 {}", messageId, consumerStatus);
// 狀態同步更新
if(!ackBatchFlag) {
MqCommonResp ackResp = consumerBrokerService.consumerStatusAck(messageId, consumerStatus);
log.info("消息:{} 狀態回執結果 {}", messageId, JSON.toJSON(ackResp));
} else {
// 批量
MqConsumerUpdateStatusDto statusDto = new MqConsumerUpdateStatusDto();
statusDto.setMessageId(messageId);
statusDto.setMessageStatus(consumerStatus.getCode());
statusDto.setConsumerGroupName(groupName);
statusDtoList.add(statusDto);
}
}
// 批量執行
if(ackBatchFlag) {
MqCommonResp ackResp = consumerBrokerService.consumerStatusAckBatch(statusDtoList);
log.info("消息:{} 狀態批量回執結果 {}", statusDtoList, JSON.toJSON(ackResp));
statusDtoList = null;
}
}
} else {
log.error("拉取消息失敗: {}", JSON.toJSON(resp));
}
}
如果 ackBatchFlag = false,則處理邏輯和以前一樣。
如果 ackBatchFlag = true,則首先把消息放到 list 中,結束後統一執行。
broker 實現
消息分發
//消費者消費狀態 ACK-批量
if(MethodType.C_CONSUMER_STATUS_BATCH.equals(methodType)) {
MqConsumerUpdateStatusBatchReq req = JSON.parseObject(json, MqConsumerUpdateStatusBatchReq.class);
final List<MqConsumerUpdateStatusDto> statusDtoList = req.getStatusList();
return mqBrokerPersist.updateStatusBatch(statusDtoList);
}
實現
默認的持久化實現,更新如下:
@Override
public MqCommonResp updateStatusBatch(List<MqConsumerUpdateStatusDto> statusDtoList) {
for(MqConsumerUpdateStatusDto statusDto : statusDtoList) {
this.doUpdateStatus(statusDto.getMessageId(), statusDto.getConsumerGroupName(),
statusDto.getMessageStatus());
}
MqCommonResp commonResp = new MqCommonResp();
commonResp.setRespCode(MqCommonRespCode.SUCCESS.getCode());
commonResp.setRespMessage(MqCommonRespCode.SUCCESS.getMsg());
return commonResp;
}
遍歷每一個元素,進行狀態的更新。
小結
異步和批量,是提升性能最常用的 2 種方式。
批量的實現相關來說是最簡單,也是效果最顯著的。
希望本文對你有所幫助,如果喜歡,歡迎點贊收藏轉發一波。
我是老馬,期待與你的下次重逢。
開源地址
The message queue in java.(java 簡易版本 mq 實現) //github.com/houbb/mq
拓展閱讀
rpc-從零開始實現 rpc //github.com/houbb/rpc