如何使用java搭建一款高性能的Mqtt集群broker!
- 2021 年 5 月 11 日
- 筆記
- reactor-stream
SMQTT是一款開源的MQTT消息代理Broker,
SMQTT基於Netty開發,底層採用Reactor3反應堆模型,支援單機部署,支援容器化部署,具備低延遲,高吞吐量,支援百萬TCP連接,同時支援多種協議交互,是一款非常優秀的消息中間件!
smqtt目前擁有的功能如下:
- 消息品質等級實現(支援qos0,qos1,qos2)
- 會話消息
- 保留消息
- 遺囑消息
- 客戶端認證
- tls加密
- websocket協議支援
- http協議交互
- SPI介面擴展支援
- 消息管理介面(會話消息/保留消息管理)
- 通道管理介面 (管理系統的客戶端連接)
- 認證介面 (用於自定義外部認證)
- 攔截器 (用戶自定義攔截消息)
- 集群支援(gossip協議實現)
- 容器化支援
後面規劃項目
- 規則引擎
- Web管理系統
- 監控系統
- 協議橋接agent(用戶其他協議與broker之間交互)
main方式啟動
引入依賴
<dependency>
<groupId>io.github.quickmsg</groupId>
<artifactId>smqtt-core</artifactId>
<version>1.0.4</version>
</dependency>
阻塞式啟動服務:
Bootstrap.builder()
.port(8555)
.websocketPort(8999)
.options(channelOptionMap -> {})
.ssl(false)
.reactivePasswordAuth((U,P)->true)
.sslContext(new SslContext("crt","key"))
.isWebsocket(true)
.wiretap(false)
.httpOptions(Bootstrap.HttpOptions.builder().ssl(false).httpPort(62212).accessLog(true).build())
.build()
.startAwait();
非阻塞式啟動服務:
Bootstrap bootstrap =
Bootstrap.builder()
.port(8555)
.websocketPort(8999)
.options(channelOptionMap -> {})
.ssl(false)
.sslContext(new SslContext("crt","key"))
.isWebsocket(true)
.wiretap(false)
.httpOptions(Bootstrap.HttpOptions.builder().ssl(false).httpPort(62212).accessLog(true).build())
.build()
.start().block();
assert bootstrap != null;
// 關閉服務
bootstrap.shutdown();
jar方式
- 下載源碼 mvn compile package -Dmaven.test.skip=true smqtt-bootstrap -P jar
在smqtt-bootstrap/target目錄下生成jar
- 準備配置文件 config.properties
# 開啟tcp埠
smqtt.tcp.port=1883
# 高水位
smqtt.tcp.lowWaterMark=4000000
# 低水位
smqtt.tcp.highWaterMark=80000000
# 開啟ssl加密
smqtt.tcp.ssl=false
# 證書crt smqtt.tcp.ssl.crt =
# 證書key smqtt.tcp.ssl.key =
# 開啟日誌
smqtt.tcp.wiretap=false
# boss執行緒
smqtt.tcp.bossThreadSize=4;
# work執行緒
smqtt.tcp.workThreadSize=8;
# websocket埠
smqtt.websocket.port=8999;
# websocket開啟
smqtt.websocket.enable=true;
# smqtt用戶
smqtt.tcp.username=smqtt;
# smqtt密碼
smqtt.tcp.password=smqtt;
# 開啟http
smqtt.http.enable=true;
# 開啟http埠
smqtt.http.port=1999;
# 開啟http日誌
smqtt.http.accesslog=true;
# 開啟ssl
smqtt.http.ssl.enable=false;
# smqtt.http.ssl.crt =;
# smqtt.http.ssl.key;
- 啟動服務
java -jar smqtt-bootstrap-1.0.1-SNAPSHOT.jar <conf.properties路徑>
docker 方式
拉取鏡像
# 拉取docker鏡像地址
docker pull 1ssqq1lxr/smqtt:latest
啟動鏡像默認配置
# 啟動服務
docker run -it -p 1883:1883 1ssqq1lxr/smqtt
啟動鏡像使用自定義配置( 準備配置文件conf.properties)
# 啟動服務
docker run -it -v <配置文件路徑目錄>:/conf -p 1883:1883 -p 1999:1999 1ssqq1lxr/smqtt
測試服務(啟動http埠)
-
啟動客戶端訂閱主題 test/+
-
使用http介面推送mqtt消息
# 推送消息
curl -H "Content-Type: application/json" -X POST -d '{"topic": "test/teus", "qos":2, "retain":true, "message":"我來測試保留消息3" }' "//localhost:1999/smqtt/publish"
wiki地址
集群類配置參考文檔: