SpringBoot WebSocket STOMP 廣播配置

  • 2020 年 3 月 15 日
  • 筆記

1. 前言

WebSocket是一種在單個TCP連接上進行全雙工通訊的協議,常用於實時通訊的場景。在沒有使用高層級線路協議的情況下,直接使用WebSocket是很難實現發布訂閱的功能。而STOMP是在WebSocket之上提供了一個基於幀的線路格式層,STOMP客戶端可以同時作為生產者和消費者兩種模式。為發布訂閱的功能提供了基礎。

2. STOMP協議

STOMP is a simple text-orientated messaging protocol. It defines an interoperable wire format so that any of the available STOMP clients can communicate with any STOMP message broker to provide easy and widespread messaging interoperability among languages and platforms (the STOMP web site has a list of STOMP client and server implementations.

文檔地址:http://jmesnil.net/stomp-websocket/doc/

3. SpringBoot WebSocket集成

SpringBoot集成WebSocket非常方便,只需要簡單的三個步驟:導包、配置、提供介面

3.1 導入websocket包

compile('org.springframework.boot:spring-boot-starter-websocket')

3.2 配置WebSocket

第一步:創建WebSocketConfig類,通過@EnableWebSocketMessageBroker 啟用代理支援的消息傳遞。

第二步:重寫registerStompEndpoints和configureMessageBroker方法。

第三步:註冊對外可訪問的stomp端點、訪問方式和連接跨域設置。

第四步:配置消息代理。可設置廣播模式和點對點通訊。也可以添加訂閱通道的前綴。

package com.itdragon.server.config    import org.springframework.context.annotation.Configuration  import org.springframework.messaging.simp.config.MessageBrokerRegistry  import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker  import org.springframework.web.socket.config.annotation.StompEndpointRegistry  import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer    @Configuration  @EnableWebSocketMessageBroker  class WebSocketConfig : WebSocketMessageBrokerConfigurer {        override fun configureMessageBroker(config: MessageBrokerRegistry) {          // 設置訂閱Broker名稱,/topic為廣播模式          config.enableSimpleBroker("/topic")          // 設置應用程式全局目標前綴          config.setApplicationDestinationPrefixes("/itdragon")      }        override fun registerStompEndpoints(registry: StompEndpointRegistry) {          // 允許使用socketJs方式訪問,訪問端點為socket,並允許跨域          registry.addEndpoint("/socket").setAllowedOrigins("*").withSockJS()      }    }

注意:

若使用了setApplicationDestinationPrefixes方法,則作用主要體現在@SubscribeMapping和@MessageMapping上。如控制層配置@MessageMapping("/sendToServer"),則客戶端發送的地址是 /itdragon/sendToServer

3.3 對外暴露介面

第一步:創建WebSocket的控制層類,並注入用於發送消息的SimpMessagingTemplate。

第二步:配置通過@MessageMapping註解修飾的方法來接收客戶端SEND的操作。

第三步:配置通過@SubscribeMapping註解修飾的方法來接收客戶端SUBSCRIBE的操作。

第四步:配置通過@SendTo註解的方法來直接將消息推送的指定地址上。

package com.itdragon.server.api.rest    import org.springframework.beans.factory.annotation.Autowired  import org.springframework.messaging.handler.annotation.MessageMapping  import org.springframework.messaging.handler.annotation.SendTo  import org.springframework.messaging.simp.SimpMessagingTemplate  import org.springframework.messaging.simp.annotation.SubscribeMapping  import org.springframework.stereotype.Controller  import org.springframework.web.bind.annotation.RequestMapping  import java.time.Instant    @Controller  class WebSocketController {        @Autowired      lateinit var simpMessagingTemplate: SimpMessagingTemplate        /**       * 訂閱廣播,伺服器主動推給連接的客戶端       * 通過Http請求的方式觸發訂閱操作       */      @RequestMapping("/subscribeTopic")      fun subscribeTopicByHttp() {          while (true) {              // 可以靈活設置成通道地址,實現發布訂閱的功能              val channel = "/topic/subscribeTopic"              simpMessagingTemplate.convertAndSend(channel, Instant.now())              Thread.sleep(10*1000)          }      }        /**       * 訂閱廣播,伺服器主動推給連接的客戶端       * 通過Websocket的subscribe操作觸發訂閱操作       */      @SubscribeMapping("/subscribeTopic")      fun subscribeTopicByWebSocket(): Long {          return Instant.now().toEpochMilli()      }        /**       * 服務端接收客戶端發送的消息,類似OnMessage方法       */      @MessageMapping("/sendToServer")      fun handleMessage(message: String) {          println("message:{$message}")      }        /**       * 將客戶端發送的消息廣播出去       */      @MessageMapping("/sendToTopic")      @SendTo("/topic/subscribeTopic")      fun sendToTopic(message: String): String {          return message      }    }

WebSocket的訂閱功能,可以用@SubscribeMapping註解,也可以用HTTP的方式觸發。ITDragon龍 比較傾向HTTP的方式,因為在實現身份驗證的功能上會比較方便。在客戶端發送訂閱操作之前,先發送HTTP請求做身份驗證,驗證成功後再返回指定的訂閱通道地址。

4. 前端對接測試

在做消息通道對接的測試中,最常見的對話就是:連上了嗎?沒連上;收到了嗎?沒收到;收到了嗎?收到了,後端報錯……. 作為技術人員,我們有必要對各個領域的知識都有一定的了解。只有清楚明白了前端和移動端的開發思維,我們才能提供更合適的介面。

4.1 前端程式碼

<!DOCTYPE html>  <html>  <head>  <meta charset="UTF-8" />  <title>WebSocket 發布訂閱</title>  <link rel="stylesheet" type="text/css" href="https://cdn.staticfile.org/antd/3.23.6/antd.min.css">  <script src="https://cdn.bootcss.com/sockjs-client/1.1.4/sockjs.min.js"></script>  <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.min.js"></script>  <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>  </head>  <body>      <h2 id="connStatus"></h2>      <div style="padding-left:20px;">          <p>/topic/subscribeTopic 訂閱廣播通道;/sendToServer 向服務端推送消息;/sendToTopic 將消息廣播出去</p>          <div>              <label>訂閱通道: </label> <input type="text" id="subscribeChannel" value=""/><br>              <label>推送通道: </label> <input type="text" id="sendChannel" value=""/><br>              <button id="subscribe" onclick="subscribe()">訂閱</button>              <button id="connect" onclick="connect()">連接</button>              <button id="disconnect"  onclick="disconnect();">斷開連接</button>          </div>          <br>          <div>              <label>用戶名稱: </label> <input type="text" id="name"/><br>              <label>輸入消息: </label> <input type="text" id="message"/><br>              <button id="send" onclick="sendMsg();">發送</button>              <p id="response"></p>          </div>      </div>        <script type="text/javascript">          var stompClient = null;          var host="http://localhost:8809";          function subscribe() {              $.get(host+'/subscribeTopic');          }            function connect() {              var socket = new SockJS(host+'/socket');              stompClient = Stomp.over(socket);              stompClient.connect({}, function(frame) {                  $('#connStatus').html('Connected:' + frame)                  stompClient.subscribe($('#subscribeChannel').val(), function(response) {                      $('#response').html(response.body);                  });              },function(err){                  console.log(err)              });          }            function disconnect() {              if (stompClient != null) {                  stompClient.disconnect();              }              $('#connStatus').html('Disconnected')          }            function sendMsg() {              var message = $('#message').val();              stompClient.send($('#sendChannel').val(), {}, message);          }      </script>  </body>  </html>

4.2 測試效果

簡單測試了發布和訂閱功能

GIF

5. 原生WebSocket配置

有的特殊場景需要檢測WebSocket的生命周期,還是會用到原生的WebSocket配置,這裡記錄一下對應的坑。

5.1 配置類註冊Bean

在任意一個配置類中添加ServerEndpointExporter的Bean配置

@Bean  fun serverEndpointExporter(): ServerEndpointExporter {      return ServerEndpointExporter()  }

問題:

  • 1)添加後單元測試啟動失敗,服務可以正常啟動。網上說可以移除程式碼,由SpringBoot管理。可是移除後websocket鏈接會出現問題。解決方法目前未找到。

5.2 創建WebSocketServer

第一步:通過@ServerEndpoint註解修飾類,表示該類是WebSocket的Server,並對外暴露連接地址。

第二步:通過@OnOpen、@OnClose、@OnMessage、@OnError註解修飾方法,監控WebSocket的生命周期。

第三步:通過靜態、私有、ConcurrentHashMap 修飾的變數管理客戶端。

第四步:為程式其他類提供發送消息的方法。

package com.itdragon.server.config    import org.slf4j.LoggerFactory  import org.springframework.stereotype.Component  import java.io.IOException  import java.util.concurrent.ConcurrentHashMap  import javax.websocket.*  import javax.websocket.server.PathParam  import javax.websocket.server.ServerEndpoint    @ServerEndpoint("/nativeSocket/{clientKey}")  @Service  class WebSocketServer {        private var logger = LoggerFactory.getLogger(WebSocketServer::class.java)      private var session: Session? = null      private var clientKey = ""        @OnOpen      fun onOpen(session: Session, @PathParam("clientKey") clientKey: String) {          this.session = session          this.clientKey = clientKey          if (webSocketMap.containsKey(clientKey)) {              webSocketMap.remove(clientKey)              webSocketMap[clientKey] = this          } else {              webSocketMap[clientKey] = this          }          logger.info("客戶端:$clientKey 連接成功")        }        @OnClose      fun onClose() {          if (webSocketMap.containsKey(clientKey)) {              webSocketMap.remove(clientKey)          }          logger.warn("客戶端:$clientKey 連接關閉")        }        @OnMessage      fun onMessage(message: String, session: Session) {          logger.info("客戶端:$clientKey 收到消息:$message")      }        @OnError      fun onError(session: Session, error: Throwable) {          logger.error("WebSocket客戶端(${this.clientKey})錯誤: ${error.message}")      }        @Throws(IOException::class)      fun sendMessage(message: String) {          this.session!!.basicRemote.sendText(message)      }        companion object {          private val webSocketMap = ConcurrentHashMap<String, WebSocketServer>()            @Throws(IOException::class)          fun sendMessage(clientKey: String, message: String) {              webSocketMap[clientKey]?.sendMessage(message)          }            fun getStatus(clientKey: String): Boolean? {              return webSocketMap[clientKey]?.session?.isOpen          }        }  }

問題:

5.3 前端測試

<!DOCTYPE html>  <html>  <head>      <meta charset="UTF-8" />      <title>WebSocket 簡單通訊</title>      <link rel="stylesheet" type="text/css" href="https://cdn.staticfile.org/antd/3.23.6/antd.min.css">      <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.js"></script>  </head>  <body>  <div>      <label>訂閱通道: </label> <input type="text" id="clientId" value=""/><br>      <label>推送消息: </label> <input type="text" id="message" value=""/><br>      <button id="subscribe" onclick="openSocket()">開啟socket</button>      <button id="connect" onclick="sendMessage()">發送消息</button>      <button id="disconnect"  onclick="closeSocket();">關閉socket</button>  </div>  <script>      var socket;      function openSocket() {          if(typeof(WebSocket) == "undefined") {              console.log("您的瀏覽器不支援WebSocket");          }else{              console.log("您的瀏覽器支援WebSocket");              var socketUrl="ws://localhost:8809/nativeSocket/"+$("#clientId").val();              if(socket!=null){                  socket.close();                  socket=null;              }              socket = new WebSocket(socketUrl);              socket.onopen = function() {                  console.log("websocket已打開");              };              socket.onmessage = function(msg) {                  console.log(msg.data);              };              socket.onclose = function() {                  console.log("websocket已關閉");              };              socket.onerror = function() {                  console.log("websocket發生了錯誤");              }          }      }      function sendMessage() {          if(socket!=null){              socket.send($("#message").val());          }      }      function closeSocket() {          socket.close();      }  </script>  </body>  </html>

通過stomp客戶端發起的認證操作可以看一下這篇文章:https://www.cnblogs.com/jmcui/p/8999998.html