【mq】從零開始實現 mq-01-生產者、消費者啟動
- 2022 年 4 月 21 日
- 筆記
MQ 是什麼?
MQ(Message Queue)消息隊列,是基礎數據結構中「先進先出」的一種數據結構。
指把要傳輸的數據(消息)放在隊列中,用隊列機制來實現消息傳遞——生產者產生消息並把消息放入隊列,然後由消費者去處理。
消費者可以到指定隊列拉取消息,或者訂閱相應的隊列,由MQ服務端給其推送消息。
MQ 的作用?
消息隊列中間件是分佈式系統中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。
解耦:一個業務需要多個模塊共同實現,或者一條消息有多個系統需要對應處理,只需要主業務完成以後,發送一條MQ,其餘模塊消費MQ消息,即可實現業務,降低模塊之間的耦合。
異步:主業務執行結束後從屬業務通過MQ,異步執行,減低業務的響應時間,提高用戶體驗。
削峰:高並發情況下,業務異步處理,提供高峰期業務處理能力,避免系統癱瘓。
ps: 以上內容摘選自百科。
實現 mq 的準備工作
maven 引入
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
模塊劃分
The message queue in java. 作為 mq 的從零開始的學習項目,目前已開源。
項目的模塊如下:
模塊 | 說明 |
---|---|
mq-common | 公共代碼 |
mq-broker | 註冊中心 |
mq-producer | 消息生產者 |
mq-consumer | 消息消費者 |
消息消費者
接口定義
package com.github.houbb.mq.consumer.api;
/**
* @author binbin.hou
* @since 1.0.0
*/
public interface IMqConsumer {
/**
* 訂閱
* @param topicName topic 名稱
* @param tagRegex 標籤正則
*/
void subscribe(String topicName, String tagRegex);
/**
* 註冊監聽器
* @param listener 監聽器
*/
void registerListener(final IMqConsumerListener listener);
}
IMqConsumerListener
作為消息監聽類的接口,定義如下:
public interface IMqConsumerListener {
/**
* 消費
* @param mqMessage 消息體
* @param context 上下文
* @return 結果
*/
ConsumerStatus consumer(final MqMessage mqMessage,
final IMqConsumerListenerContext context);
}
ConsumerStatus 代表消息消費的幾種狀態。
消息體
啟動消息體 MqMessage 定義如下:
package com.github.houbb.mq.common.dto;
import java.util.Arrays;
import java.util.List;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class MqMessage {
/**
* 標題名稱
*/
private String topic;
/**
* 標籤
*/
private List<String> tags;
/**
* 內容
*/
private byte[] payload;
/**
* 業務標識
*/
private String bizKey;
/**
* 負載分片標識
*/
private String shardingKey;
// getter&setter&toString
}
push 消費者策略實現
消費者啟動的實現如下:
/**
* 推送消費策略
*
* @author binbin.hou
* @since 1.0.0
*/
public class MqConsumerPush extends Thread implements IMqConsumer {
// 省略...
@Override
public void run() {
// 啟動服務端
log.info("MQ 消費者開始啟動服務端 groupName: {}, port: {}, brokerAddress: {}",
groupName, port, brokerAddress);
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(workerGroup, bossGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new MqConsumerHandler());
}
})
// 這個參數影響的是還沒有被accept 取出的連接
.option(ChannelOption.SO_BACKLOG, 128)
// 這個參數只是過一段時間內客戶端沒有響應,服務端會發送一個 ack 包,以判斷客戶端是否還活着。
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 綁定端口,開始接收進來的鏈接
ChannelFuture channelFuture = serverBootstrap.bind(port).syncUninterruptibly();
log.info("MQ 消費者啟動完成,監聽【" + port + "】端口");
channelFuture.channel().closeFuture().syncUninterruptibly();
log.info("MQ 消費者關閉完成");
} catch (Exception e) {
log.error("MQ 消費者啟動異常", e);
throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
// 省略...
}
ps: 初期我們把 consumer 作為服務端,後續引入 broker 則只有 broker 是服務端。
MqConsumerHandler 處理類
這個類是一個空的實現。
public class MqConsumerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {
//nothing
}
}
測試代碼
MqConsumerPush mqConsumerPush = new MqConsumerPush();
mqConsumerPush.start();
啟動日誌:
[DEBUG] [2022-04-21 19:16:41.343] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 19:16:41.356] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消費者開始啟動服務端 groupName: C_DEFAULT_GROUP_NAME, port: 9527, brokerAddress:
[INFO] [2022-04-21 19:16:43.196] [Thread-0] [c.g.h.m.c.c.MqConsumerPush.run] - MQ 消費者啟動完成,監聽【9527】端口
消息生產者
接口定義
最基本的消息發送接口。
package com.github.houbb.mq.producer.api;
import com.github.houbb.mq.common.dto.MqMessage;
import com.github.houbb.mq.producer.dto.SendResult;
/**
* @author binbin.hou
* @since 1.0.0
*/
public interface IMqProducer {
/**
* 同步發送消息
* @param mqMessage 消息類型
* @return 結果
*/
SendResult send(final MqMessage mqMessage);
/**
* 單向發送消息
* @param mqMessage 消息類型
* @return 結果
*/
SendResult sendOneWay(final MqMessage mqMessage);
}
生產者實現
MqProducer 啟動的實現如下,基於 netty。
package com.github.houbb.mq.producer.core;
/**
* 默認 mq 生產者
* @author binbin.hou
* @since 1.0.0
*/
public class MqProducer extends Thread implements IMqProducer {
//省略...
@Override
public void run() {
// 啟動服務端
log.info("MQ 生產者開始啟動客戶端 GROUP: {}, PORT: {}, brokerAddress: {}",
groupName, port, brokerAddress);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
ChannelFuture channelFuture = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new LoggingHandler(LogLevel.INFO))
.addLast(new MqProducerHandler());
}
})
.connect("localhost", port)
.syncUninterruptibly();
log.info("MQ 生產者啟動客戶端完成,監聽端口:" + port);
channelFuture.channel().closeFuture().syncUninterruptibly();
log.info("MQ 生產者開始客戶端已關閉");
} catch (Exception e) {
log.error("MQ 生產者啟動遇到異常", e);
throw new MqException(ProducerRespCode.RPC_INIT_FAILED);
} finally {
workerGroup.shutdownGracefully();
}
}
//省略...
}
MqProducerHandler 處理類
默認的空實現,什麼都不做。
package com.github.houbb.mq.producer.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* @author binbin.hou
* @since 1.0.0
*/
public class MqProducerHandler extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object object) throws Exception {
//do nothing now
}
}
啟動代碼
MqProducer mqProducer = new MqProducer();
mqProducer.start();
啟動日誌:
[DEBUG] [2022-04-21 19:17:11.960] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2022-04-21 19:17:11.974] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生產者開始啟動客戶端 GROUP: P_DEFAULT_GROUP_NAME, PORT: 9527, brokerAddress:
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x5cb48145] REGISTERED
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler connect
信息: [id: 0x5cb48145] CONNECT: localhost/127.0.0.1:9527
四月 21, 2022 7:17:13 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x5cb48145, L:/127.0.0.1:57740 - R:localhost/127.0.0.1:9527] ACTIVE
[INFO] [2022-04-21 19:17:13.833] [Thread-0] [c.g.h.m.p.c.MqProducer.run] - MQ 生產者啟動客戶端完成,監聽端口:9527
小結
基於 netty 最基本的服務端啟動、客戶端啟動到這裡就結束了。
千里之行,始於足下。
我們下一節將和大家一起學習,如何實現客戶端與服務端之間的交互。
希望本文對你有所幫助,如果喜歡,歡迎點贊收藏轉發一波。
我是老馬,期待與你的下次重逢。
開源地址
The message queue in java.(java 簡易版本 mq 實現) : //github.com/houbb/mq
拓展閱讀
rpc-從零開始實現 rpc: //github.com/houbb/rpc