socket系列(三)——Spring-socket实时通信、推送
- 2019 年 10 月 30 日
- 笔记
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/luo4105/article/details/72695378
Spring-socket实现实时通信
实现
Spring 4.x 发布了 spring-websocket 模块,用于支持 WebSocket,同时兼容 SockJS。
需要把spring所有的包更新到4.x以上版本,并下载spring-websocket包。
所需jar包
<!-- Spring modules; all share the version given by ${org.springframework-version} -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>${org.springframework-version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>${org.springframework-version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webmvc</artifactId>
    <version>${org.springframework-version}</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-websocket</artifactId>
    <version>${org.springframework-version}</version>
</dependency>

<!-- Servlet API, declared with "provided" scope -->
<dependency>
    <groupId>javax.servlet</groupId>
    <artifactId>javax.servlet-api</artifactId>
    <version>3.1.0</version>
    <scope>provided</scope>
</dependency>

<!-- For SockJS -->
<!--http://jira.codehaus.org/browse/JACKSON-884 -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.2.1</version>
</dependency>

<!-- Logging: SLF4J API plus runtime-scoped bridges and log4j -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.6.4</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>jcl-over-slf4j</artifactId>
    <version>1.6.4</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.6.4</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.16</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.1</version>
    <exclusions>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
官方的demo地址:https://github.com/rstoyanchev/spring-websocket-test
项目结构,马赛克的文件不用管
SpringMVC配置文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring MVC configuration: controller scanning, WebSocket endpoint mapping and JSP view resolution. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:websocket="http://www.springframework.org/schema/websocket"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-4.2.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc-4.2.xsd
           http://www.springframework.org/schema/websocket
           http://www.springframework.org/schema/websocket/spring-websocket-4.1.xsd">

    <context:component-scan base-package="com.lc.controller">
        <context:include-filter type="annotation"
                                expression="org.springframework.stereotype.Controller"/>
    </context:component-scan>

    <mvc:annotation-driven />

    <!-- WebSocket configuration -->
    <bean id="HelloHandler" class="com.lc.websocket.MySorketHandle" />

    <websocket:handlers>
        <!-- Maps the message-handling bean to its endpoint path -->
        <websocket:mapping path="/hello" handler="HelloHandler"/>
        <!-- Handshake interceptors -->
        <websocket:handshake-interceptors>
            <bean class="com.lc.websocket.HandshakeInterceptor"/>
        </websocket:handshake-interceptors>
        <!-- Enables SockJS; remove it to disable SockJS -->
        <!-- <websocket:sockjs /> -->
    </websocket:handlers>

    <!-- Maximum buffer sizes for WebSocket messages -->
    <!--
    <bean class="org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean">
        <property name="maxTextMessageBufferSize" value="8192" />
        <property name="maxBinaryMessageBufferSize" value="8192" />
    </bean>
    -->

    <!-- View layer configuration -->
    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">
        <property name="prefix" value="/WEB-INF/jsps/"/>
        <property name="suffix" value=".jsp"/>
    </bean>
</beans>
WebSocket实现类
import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; public classMySorketHandle extends TextWebSocketHandler { // 线上人数 private static int count; private staticCopyOnWriteArraySet<WebSocketSession> set = new CopyOnWriteArraySet<>(); private WebSocketSession session; @Override public voidafterConnectionEstablished(WebSocketSession session) { this.session = session; try{ set.add(this.session); }catch(Exception e) { e.printStackTrace(); } MySorketHandle.addOnlineCount(); System.out.println("目前连接人数:" + getOnlineCount()); } public voidafterConnectionClosed(WebSocketSession session,CloseStatus closeStatus) { this.session = session; set.remove(this.session); subOnlineCount(); System.out.println("目前连接人数:" + getOnlineCount()); } public void handleMessage(WebSocketSessionsession,WebSocketMessage<?>message){ System.out.println("text message: "+ session.getId()+ "-"+ message.getPayload()); for(WebSocketSession ssion : set) { try { ssion.sendMessage(message); }catch(IOException e) { e.printStackTrace(); } } } public static int getOnlineCount() { return count; } public static void addOnlineCount() { count++; } public static void subOnlineCount() { count--; } }
HandshakeInterceptor类
import java.util.Map; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; importorg.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor; public classHandshakeInterceptor extends HttpSessionHandshakeInterceptor { /* * 握手前处理动作 */ @Override public booleanbeforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler handler, Map<String,Object> map)throwsException { System.out.println("握手前"); return super.beforeHandshake(request, response, handler, map); } @Override public void afterHandshake(ServerHttpRequestrequest,ServerHttpResponse response,WebSocketHandler wsHandler,Exception ex) { super.afterHandshake(request, response, wsHandler, ex); } }
Controller页面跳转
@Controller @RequestMapping("/im") public classIMController { @RequestMapping("/page") public Stringpage(HttpServletRequest request, HttpServletResponse response) { return "IMpage"; } }
页面
<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<%-- Chat demo page: opens a WebSocket to the /hello endpoint, sends the text box content and lists received messages. --%>
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>socket</title>
    <script type="text/javascript" src="http://cdn.static.runoob.com/libs/jquery/2.1.1/jquery.min.js"></script>
</head>
<body>
    welcome<br />
    <input id="text" type="text"/>
    <button onclick="sendMsg()">sendMsg</button>
    <hr/>
    <button onclick="closeWebSocket()">close WebSocket connection</button>
    <hr/>
    <div id="message"></div>

    <script type="text/javascript">
        var websocket = null;

        // Check whether the browser supports WebSocket; attach handlers only when a socket exists.
        if ('WebSocket' in window) {
            websocket = new WebSocket("ws://localhost:8080/sdz-web/hello");

            websocket.onopen = function() {
                console.log("websocket连接成功");
            };

            websocket.onclose = function() {
                console.log("websocket连接关闭");
            };

            websocket.onmessage = function(event) {
                console.log("接收消息");
                console.log(event);
                printMsg(event.data);
            };
        } else {
            $("#message").html("该浏览器不支持实时通信功能");
        }

        // Print a message. It is inserted as a text node so that markup sent by
        // other clients is displayed literally instead of being executed.
        function printMsg(msg) {
            $("#message").append(document.createTextNode(msg), "<br/>");
        }

        // Send the current content of the text box, if a socket was created.
        function sendMsg() {
            if (websocket === null) {
                return;
            }
            var msg = $("#text").val();
            websocket.send(msg);
        }

        // Close the connection, if a socket was created.
        function closeWebSocket() {
            if (websocket === null) {
                return;
            }
            websocket.close();
        }
    </script>
</body>
</html>
运行
总结
MySorketHandle类是websocket的实现类,这里我继承TextWebSocketHandler类,也可以实现WebSocketHandler接口,详细可以看看官方文档,这里大致介绍一下几个主要方法。
afterConnectionEstablished(WebSocketSession session):当有新的连接时执行方法,这里执行的是给session赋值,添加该连接对象,增加连接数量,打印连接信息。
afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus):当有连接关闭时执行方法,这里执行的是移除连接对象和打印信息操作。
handleMessage(WebSocketSession session, WebSocketMessage<?> message):当有新消息传到后台时执行方法,这里执行的是给所有连接对象发送该消息。
handleTransportError(WebSocketSession session, Throwable exception):当有连接错误/异常时执行方法。
在spring-websocket中,每次连接,都不会创建新的MySorketHandle,但会创建新的session对象,所以这里用CopyOnWriteArraySet<WebSocketSession> set来存放所有连接对象。
配置方面主要是这些
<!-- WebSocket configuration -->
<bean id="HelloHandler" class="com.lc.websocket.MySorketHandle" />

<websocket:handlers>
    <!-- Maps the message-handling bean to its endpoint path -->
    <websocket:mapping path="/hello" handler="HelloHandler" />
    <!-- Handshake interceptors -->
    <websocket:handshake-interceptors>
        <bean class="com.lc.websocket.HandshakeInterceptor" />
    </websocket:handshake-interceptors>
    <!-- Enables SockJS; remove it to disable SockJS -->
    <!-- <websocket:sockjs /> -->
</websocket:handlers>
<!-- Maximum buffer sizes for WebSocket messages (left commented out) -->
<!--
<bean class="org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean">
    <property name="maxTextMessageBufferSize" value="8192" />
    <property name="maxBinaryMessageBufferSize" value="8192" />
</bean>
-->
当然,别忘了加上
xmlns:websocket="http://www.springframework.org/schema/websocket" xsi:schemaLocation="http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket-4.1.xsd"
实现触发推送
对于上个例子,实现服务器触发推送功能,需求如下:
访问"http://127.0.0.1:8080/sdz-web/im/push"接口时,服务器会向指定的已连接客户端推送信息。参数sessionId是客户端连接的sessionId;如果不带该参数,则向所有已连接的客户端推送信息。
实现
项目结构,马赛克的文件不用管
编写测试方法,分别测试全部推送和指定推送
public classIMControllerPushTest { @Test public void pushAll() throws IOException { URLurl= newURL("http://127.0.0.1:8080/sdz-web/im/push"); HttpURLConnectionconn= (HttpURLConnection) url.openConnection(); conn.setRequestProperty("User-Agent","Mozilla/5.0 (Windows NT 6.1; Win64;x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110Safari/537.36"); conn.setDoOutput(true); conn.setDoInput(true); OutputStreamos = conn.getOutputStream(); Stringparams= ""; os.write(params.getBytes("UTF-8")); InputStreamis = conn.getInputStream(); byte[] res = new byte[1024]; StringBuffersb = new StringBuffer(); int l = 0; while((l = is.read(res)) != -1) { sb.append(new String(res, 0, l, "UTF-8")); } System.out.println(sb); } @Test public void push() throws IOException { URLurl= newURL("http://127.0.0.1:8080/sdz-web/im/push"); HttpURLConnectionconn= (HttpURLConnection) url.openConnection(); conn.setRequestProperty("User-Agent","Mozilla/5.0 (Windows NT 6.1; Win64;x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110Safari/537.36"); conn.setDoOutput(true); conn.setDoInput(true); OutputStreamos = conn.getOutputStream(); Stringparams= "sessionId=0"; os.write(params.getBytes("UTF-8")); InputStreamis = conn.getInputStream(); byte[] res = new byte[1024]; StringBuffersb = new StringBuffer(); int l = 0; while((l = is.read(res)) != -1) { sb.append(new String(res, 0, l, "UTF-8")); } System.out.println(sb); } }
IMController
@Controller @RequestMapping("/im") public classIMController { @Bean public MySorketHandlemySorketHandle() { return new MySorketHandle(); } @RequestMapping("/page") public Stringpage(HttpServletRequest request, HttpServletResponse response) { return "IMpage"; } @ResponseBody @RequestMapping("/push") public String push(@RequestParam(required = false) String sessionId, HttpServletResponseresponse){ Stringmsg= ""; if (StringUtils.isEmpty(sessionId)) { msg =mySorketHandle().pushMsg("服务器推送信息了"); System.out.println(msg); }else{ msg =mySorketHandle().pushMsg(sessionId, "服务器推送信息了"); System.out.println(msg); } return msg; } }
MySorketHandle
/**
 * Pushes a text message to the session whose id equals {@code sessionid}.
 *
 * @param sessionid id of the target session; {@code null} matches no session
 * @param message   text to send
 * @return a success text naming the session, or {@code "推送失败"} when no
 *         session matched or sending failed
 */
public String pushMsg(String sessionid, String message) {
    for (WebSocketSession ssion : set) {
        try {
            // Compare from the session side so a null sessionid cannot throw.
            if (ssion.getId().equals(sessionid)) {
                ssion.sendMessage(new TextMessage(message));
                return "机器:" + sessionid + "推送成功";
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    return "推送失败";
}

/**
 * Pushes a text message to every registered session.
 *
 * @param message text to send
 * @return a summary text containing how many sessions were sent the message
 */
public String pushMsg(String message) {
    int delivered = 0;
    for (WebSocketSession target : set) {
        try {
            target.sendMessage(new TextMessage(message));
            delivered++;
        } catch (IOException e) {
            // Keep going: one failed session must not block the rest.
            e.printStackTrace();
        }
    }
    return "共有" + delivered + "得到推送";
}
运行
总结
关键代码就是socket的功能实现和controller的调用
controller的调用
@Bean public MySorketHandle mySorketHandle() { returnnew MySorketHandle(); }
@ResponseBody @RequestMapping("/push") public String push(@RequestParam(required =false) String sessionId, HttpServletResponseresponse) { Stringmsg = ""; if(StringUtils.isEmpty(sessionId)) { msg= mySorketHandle().pushMsg("服务器推送信息了"); System.out.println(msg); }else { msg= mySorketHandle().pushMsg(sessionId, "服务器推送信息了"); System.out.println(msg); } returnmsg; }
socket的功能
/**
 * Pushes a text message to the session whose id equals {@code sessionid}.
 *
 * @param sessionid id of the target session; {@code null} matches no session
 * @param message   text to send
 * @return a success text naming the session, or {@code "推送失败"} when no
 *         session matched or sending failed
 */
public String pushMsg(String sessionid, String message) {
    for (WebSocketSession ssion : set) {
        try {
            // Compare from the session side so a null sessionid cannot throw.
            if (ssion.getId().equals(sessionid)) {
                ssion.sendMessage(new TextMessage(message));
                return "机器:" + sessionid + "推送成功";
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    return "推送失败";
}
/**
 * Pushes a text message to every registered session.
 *
 * @param message text to send
 * @return a summary text containing how many sessions were sent the message
 */
public String pushMsg(String message) {
    int delivered = 0;
    for (WebSocketSession target : set) {
        boolean sent;
        try {
            target.sendMessage(new TextMessage(message));
            sent = true;
        } catch (IOException e) {
            // Keep going: one failed session must not block the rest.
            e.printStackTrace();
            sent = false;
        }
        if (sent) {
            delivered++;
        }
    }
    return new StringBuilder("共有").append(delivered).append("得到推送").toString();
}
Push是一个触发接口,调用MySorketHandle的pushMsg()方法。
如果"@ResponseBody"返回的数据是乱码,在配置文件中加上
<!-- Registers a String converter that declares text/html with UTF-8, alongside the default converters -->
<mvc:annotation-driven>
    <mvc:message-converters register-defaults="true">
        <bean class="org.springframework.http.converter.StringHttpMessageConverter">
            <property name="supportedMediaTypes" value="text/html;charset=UTF-8"/>
        </bean>
    </mvc:message-converters>
</mvc:annotation-driven>
原因是springMVC对string的处理类StringHttpMessageConverter 中,默认采用的字符集是ISO-8859-1,而且是final的。
// Quoted from StringHttpMessageConverter: the charset it uses by default.
public static final Charset DEFAULT_CHARSET= Charset.forName("ISO-8859-1");