【mq】從零開始實現 mq-06-消費者心跳檢測 heartbeat
- 2022 年 5 月 6 日
- 筆記
前景回顧
【mq】從零開始實現 mq-02-如何實現生產者調用消費者?
【mq】從零開始實現 mq-03-引入 broker 中間人
【mq】從零開始實現 mq-06-消費者心跳檢測 heartbeat
為什麼需要心跳?
心跳(heartbeat ),顧名思義就是心臟的跳動。
醫學上一般通過心跳是否跳動,來判斷一個人是否活著。
那麼,分散式服務中如何判斷一個服務是否還活著呢?
實現思路
比如 mq 中,broker 需要把消息實時推送給在線的消費者。
那麼如何判斷一個消費者是否活著呢?
我們可以讓消費者定時,比如每 5 秒鐘給 broker 發送一個心跳包,考慮到網路延遲等,如果連續 1min 都沒有收到心跳,我們則移除這個消費者,認為服務已經掛了。
消費者實現
上程式碼!
心跳實現
心跳可以是一個很簡單的消息體。
@Override
public void heartbeat() {
final MqHeartBeatReq req = new MqHeartBeatReq();
final String traceId = IdHelper.uuid32();
req.setTraceId(traceId);
req.setMethodType(MethodType.C_HEARTBEAT);
req.setAddress(NetUtil.getLocalHost());
req.setPort(0);
req.setTime(System.currentTimeMillis());
log.debug("[HEARTBEAT] 往服務端發送心跳包 {}", JSON.toJSON(req));
// 通知全部
for(RpcChannelFuture channelFuture : channelFutureList) {
try {
Channel channel = channelFuture.getChannelFuture().channel();
callServer(channel, req, null);
} catch (Exception exception) {
log.error("[HEARTBEAT] 往服務端處理異常", exception);
}
}
}
消費者把心跳通知所有的 broker.
心跳的定時執行
我們啟動一個定時任務,5S 鍾執行一次。
/**
* 初始化心跳
* @since 0.0.6
*/
private void initHeartbeat() {
//5S 發一次心跳
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
heartbeat();
}
}, 5, 5, TimeUnit.SECONDS);
}
心跳是在連接到 broker 之後就開始啟動:
@Override
public void initChannelFutureList(ConsumerBrokerConfig config) {
//1. 配置初始化
//...
//2. 初始化
this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress,
initChannelHandler(), check);
//3. 初始化心跳
this.initHeartbeat();
}
Broker 實現
消費者定時發送消息,生產者肯定是需要接受的。
接收心跳
為了簡單,我們讓心跳是 ONE-WAY 的。
// 消費者心跳
if(MethodType.C_HEARTBEAT.equals(methodType)) {
MqHeartBeatReq req = JSON.parseObject(json, MqHeartBeatReq.class);
registerConsumerService.heartbeat(req, channel);
return null;
}
hearbeat 處理
每次收到消息,我們把請求的 channelId 記錄下來,並保存最新的訪問時間
@Override
public void heartbeat(MqHeartBeatReq mqHeartBeatReq, Channel channel) {
final String channelId = ChannelUtil.getChannelId(channel);
log.info("[HEARTBEAT] 接收消費者心跳 {}, channelId: {}",
JSON.toJSON(mqHeartBeatReq), channelId);
ServiceEntry serviceEntry = new ServiceEntry();
serviceEntry.setAddress(mqHeartBeatReq.getAddress());
serviceEntry.setPort(mqHeartBeatReq.getPort());
BrokerServiceEntryChannel entryChannel = InnerChannelUtils.buildEntryChannel(serviceEntry, channel);
entryChannel.setLastAccessTime(mqHeartBeatReq.getTime());
heartbeatMap.put(channelId, entryChannel);
}
移除消費者
如果一些消費者長時間沒有心跳,我們就認為服務已經掛了。
在 LocalBrokerConsumerService
服務啟動的時候,同時啟用一個定時清理任務。
public LocalBrokerConsumerService() {
//120S 掃描一次
final long limitMills = 2 * 60 * 1000;
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for(Map.Entry<String, BrokerServiceEntryChannel> entry : heartbeatMap.entrySet()) {
String key = entry.getKey();
long lastAccessTime = entry.getValue().getLastAccessTime();
long currentTime = System.currentTimeMillis();
if(currentTime - lastAccessTime > limitMills) {
removeByChannelId(key);
}
}
}
}, 2 * 60, 2 * 60, TimeUnit.SECONDS);
}
這個任務 2min 執行一次,如果 2min 都沒有心跳,這移除對應的消費者。
小結
心跳,是網路傳輸中驗證服務可用性非常簡單,但是有效的方式。
希望本文對你有所幫助,如果喜歡,歡迎點贊收藏轉發一波。
我是老馬,期待與你的下次重逢。
開源地址
The message queue in java.(java 簡易版本 mq 實現) //github.com/houbb/mq
拓展閱讀
rpc-從零開始實現 rpc //github.com/houbb/rpc