如何使用java搭建一款高性能的Mqtt集群broker!

SMQTT是一款開源的MQTT消息代理Broker,

SMQTT基於Netty開發,底層採用Reactor3反應堆模型,支援單機部署,支援容器化部署,具備低延遲,高吞吐量,支援百萬TCP連接,同時支援多種協議交互,是一款非常優秀的消息中間件!

smqtt目前擁有的功能如下:

  1. 消息品質等級實現(支援qos0,qos1,qos2)
  2. 會話消息
  3. 保留消息
  4. 遺囑消息
  5. 客戶端認證
  6. tls加密
  7. websocket協議支援
  8. http協議交互
  9. SPI介面擴展支援
    • 消息管理介面(會話消息/保留消息管理)
    • 通道管理介面 (管理系統的客戶端連接)
    • 認證介面 (用於自定義外部認證)
    • 攔截器 (用戶自定義攔截消息)
  10. 集群支援(gossip協議實現)
  11. 容器化支援

後面規劃項目

  1. 規則引擎
  2. Web管理系統
  3. 監控系統
  4. 協議橋接agent(用戶其他協議與broker之間交互)

main方式啟動

引入依賴

<dependency>
  <groupId>io.github.quickmsg</groupId>
  <artifactId>smqtt-core</artifactId>
  <version>1.0.4</version>
</dependency>

阻塞式啟動服務:


 Bootstrap.builder()
       .port(8555)
       .websocketPort(8999)
       .options(channelOptionMap -> {})
       .ssl(false)
       .reactivePasswordAuth((U,P)->true)
       .sslContext(new SslContext("crt","key"))
       .isWebsocket(true)
       .wiretap(false)
       .httpOptions(Bootstrap.HttpOptions.builder().ssl(false).httpPort(62212).accessLog(true).build())
       .build()
       .startAwait();

非阻塞式啟動服務:


 
 Bootstrap bootstrap = 
        Bootstrap.builder()
       .port(8555)
       .websocketPort(8999)
       .options(channelOptionMap -> {})
       .ssl(false)
       .sslContext(new SslContext("crt","key"))
       .isWebsocket(true)
       .wiretap(false)
       .httpOptions(Bootstrap.HttpOptions.builder().ssl(false).httpPort(62212).accessLog(true).build())
       .build()
       .start().block();

assert bootstrap != null;
 // 關閉服務
 bootstrap.shutdown();

jar方式

  1. 下載源碼 mvn compile package -Dmaven.test.skip=true smqtt-bootstrap -P jar
  在smqtt-bootstrap/target目錄下生成jar
  1. 準備配置文件 config.properties
    
    # 開啟tcp埠
    smqtt.tcp.port=1883
    # 高水位
    smqtt.tcp.lowWaterMark=4000000
    # 低水位
    smqtt.tcp.highWaterMark=80000000
    # 開啟ssl加密
    smqtt.tcp.ssl=false
    # 證書crt smqtt.tcp.ssl.crt =
    # 證書key smqtt.tcp.ssl.key =
    # 開啟日誌
    smqtt.tcp.wiretap=false
    # boss執行緒
    smqtt.tcp.bossThreadSize=4;
    # work執行緒
    smqtt.tcp.workThreadSize=8;
    # websocket埠
    smqtt.websocket.port=8999;
    # websocket開啟
    smqtt.websocket.enable=true;
    # smqtt用戶
    smqtt.tcp.username=smqtt;
    # smqtt密碼
    smqtt.tcp.password=smqtt;
    # 開啟http
    smqtt.http.enable=true;
    # 開啟http埠
    smqtt.http.port=1999;
    # 開啟http日誌
    smqtt.http.accesslog=true;
    # 開啟ssl
    smqtt.http.ssl.enable=false;
    # smqtt.http.ssl.crt =;
    # smqtt.http.ssl.key;
  1. 啟動服務
  java -jar smqtt-bootstrap-1.0.1-SNAPSHOT.jar <conf.properties路徑>

docker 方式

拉取鏡像

# 拉取docker鏡像地址
docker pull 1ssqq1lxr/smqtt:latest

啟動鏡像默認配置

# 啟動服務
docker run -it  -p 1883:1883 1ssqq1lxr/smqtt

啟動鏡像使用自定義配置( 準備配置文件conf.properties)

# 啟動服務
docker run -it  -v <配置文件路徑目錄>:/conf -p 1883:1883  -p 1999:1999 1ssqq1lxr/smqtt

測試服務(啟動http埠)

  • 啟動客戶端訂閱主題 test/+

  • 使用http介面推送mqtt消息

# 推送消息
curl -H "Content-Type: application/json" -X POST -d '{"topic": "test/teus", "qos":2, "retain":true, "message":"我來測試保留消息3" }' "//localhost:1999/smqtt/publish"

wiki地址

集群類配置參考文檔:

smqtt文檔