WebSocket原理
- HTML5推出WebSocket标准,使得浏览器和服务器之间任一方都可主动发送数据给对方,用来代替http轮询。
- WebSocket跟HTTP协议一样,也是应用层协议。为兼容HTTP协议,它通过HTTP协议进行一次握手,握手后数据就直接从TCP层的Socket传输,与HTTP协议再无关。
- 在首次的http请求中,浏览器发给服务端的请求会带上跟WebSocket有关的请求头,比如Connection: Upgrade和Upgrade: websocket,服务端回复也带有websocket的响应头。
优点
- 建立在 TCP 协议之上,服务器端的实现比较容易,不用频繁创建TCP请求及销毁请求,减少网络带宽资源的占用,同时也节省服务器资源;
- 与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
- 服务器与客户端之间交换的标头信息很小,大概只有2字节,数据格式比较轻量,性能开销小,通信高效。
- 可以发送文本,也可以发送二进制数据。
- 没有同源限制,客户端可以与任意服务器通信。
- 协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。
代码实现
前端代码
| |
| var websocket = null; |
| |
| |
| |
| |
| |
| |
| |
| function initWebSocket(param) { |
| |
| if ('WebSocket' in window) { |
| websocket = new WebSocket(param.url); |
| } else { |
| alert('当前浏览器 Not support websocket'); |
| return; |
| } |
| |
| websocket.onerror = function (e) { |
| logger.info("WebSocket连接发生错误:" + e); |
| }; |
| |
| |
| websocket.onopen = function (e) { |
| logger.info("WebSocket连接成功:" + JSON.stringify(e)); |
| }; |
| |
| |
| websocket.onmessage = function (event) { |
| logger.info("WebSocket接收到消息["+JSON.stringify(event)+"]"); |
| if(param.onmessage){ |
| param.onmessage(event.data); |
| } |
| }; |
| |
| |
| websocket.onclose = function (event) { |
| var code = event.code; |
| var reason = event.reason; |
| var wasClean = event.wasClean; |
| logger.info("WebSocket连接关闭:code[" + code + "],reason[" + reason + "],wasClean[" + wasClean + "]"); |
| }; |
| |
| |
| window.onbeforeunload = function () { |
| |
| websocket.close(); |
| }; |
| } |
| |
| |
| |
| |
| function sendWebSocketMsg(message) { |
| |
| |
| |
| |
| |
| |
| |
| if(websocket && WebSocket.OPEN == websocket.readyState){ |
| websocket.send(message); |
| }else{ |
| throw "WebSocket对象为空或者关闭"; |
| } |
| } |
| |
| |
| var webSocketParam = { |
| url: "ws://localhost:8080/queue/websocket/" + orgNo + "_" + deviceNo, |
| onmessage: function (data) { |
| console.log(data); |
| } |
| }; |
| initWebSocket(webSocketParam); |
| |
| var heartbeat = setInterval(function(){ |
| try { |
| logger.info("发送心跳连接..."); |
| sendWebSocketMsg("."); |
| } catch (e) { |
| logger.info("心跳连接发生异常,清除定时任务..." + JSON.stringify(e)); |
| clearInterval(heartbeat); |
| } |
| }, 180000); |
| |
复制
maven依赖
| <dependency> |
| <groupId>org.apache.tomcat</groupId> |
| <artifactId>tomcat-websocket</artifactId> |
| <version>9.0.8</version> |
| </dependency> |
| <dependency> |
| <groupId>javax.websocket</groupId> |
| <artifactId>javax.websocket-api</artifactId> |
| <version>1.1</version> |
| <scope>provided</scope> |
| </dependency> |
复制
后端代码
| @ServerEndpoint(value = "/websocket/{paramStr}") |
| public class MyWebSocket { |
| private static Logger logger = Logger.getLogger(MyWebSocket.class); |
| |
| private static final int MAX_SOCKET_MAP = 500; |
| |
| |
| |
| |
| public static final String MES_TYPE_CLASS = "1"; |
| |
| |
| public static ConcurrentMap<String, MyWebSocket> webSocketMap = new ConcurrentHashMap<>(); |
| |
| |
| private static int onlineCount = 0; |
| |
| |
| private Session session; |
| |
| |
| private String id = ""; |
| |
| |
| |
| |
| |
| |
| @OnOpen |
| public void onOpen(@PathParam(value = "paramStr") String paramStr, Session session) throws Exception { |
| this.id = paramStr; |
| this.session = session; |
| |
| webSocketMapPut(id, this); |
| addOnlineCount(); |
| |
| logger.info("客户端:" + id + "加入!当前在线连接数为:" + getOnlineCount()); |
| |
| String classificationNum = "0"; |
| |
| MyWebSocket.sendtoUser(MyWebSocket.MES_TYPE_CLASS + "_" + classificationNum, id, "service"); |
| } |
| |
| |
| |
| |
| |
| |
| @OnClose |
| public void onClose() throws Exception { |
| webSocketMap.remove(this.id); |
| subOnlineCount(); |
| logger.info("有一连接关闭,关闭的连接为:"+ this.id +",当前在线人数为 " + getOnlineCount()); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| @OnMessage |
| public void onMessage(String message, Session session) throws Exception { |
| logger.info("连接:"+ this.id + ",接收到来自客户端的消息:" + message); |
| if(".".equals(message)){ |
| return; |
| } |
| |
| String sendMessage = message.split("[|]")[0]; |
| String sendUserId = message.split("[|]")[1]; |
| if ("0".equals(sendUserId)){ |
| sendtoAll(sendMessage); |
| }else{ |
| sendtoUser(sendMessage, sendUserId, this.id); |
| } |
| } |
| |
| |
| |
| |
| |
| |
| |
| @OnError |
| public void onError(Session session, Throwable error) { |
| logger.info("连接:"+ this.id + ",发生异常", error); |
| } |
| |
| |
| |
| |
| |
| |
| |
| public void sendMessage(String message) throws IOException { |
| this.session.getBasicRemote().sendText(message); |
| } |
| |
| |
| |
| |
| |
| |
| |
| |
| public static void sendtoUser(String message, String sendUserId, String fromId) throws IOException { |
| MyWebSocket myWebSocket = webSocketMap.get(sendUserId); |
| if (myWebSocket != null) { |
| logger.info("用户[" + fromId + "]发送信息给指定用户["+ sendUserId + "],message["+message+"]"); |
| myWebSocket.sendMessage(message); |
| } else { |
| logger.info("用户[" + fromId + "]发送信息给指定用户["+ sendUserId + "],当前用户不在线"); |
| } |
| } |
| |
| |
| |
| |
| |
| |
| |
| public static void sendToOrgAllDev(String orgNo, String message) throws Exception { |
| for (String key : webSocketMap.keySet()) { |
| if(key.indexOf(orgNo + "_") != -1){ |
| MyWebSocket myWebSocket = webSocketMap.get(key); |
| myWebSocket.sendMessage(message); |
| } |
| } |
| } |
| |
| |
| |
| |
| |
| |
| |
| public static void sendtoAll(String message) throws Exception { |
| for (MyWebSocket myWebSocket : webSocketMap.values()) { |
| myWebSocket.sendMessage(message); |
| } |
| } |
| |
| |
| public static synchronized int getOnlineCount() { |
| return onlineCount; |
| } |
| |
| public static synchronized void addOnlineCount() { |
| MyWebSocket.onlineCount++; |
| } |
| |
| public static synchronized void subOnlineCount() { |
| MyWebSocket.onlineCount--; |
| } |
| |
| public static void webSocketMapPut(String key, MyWebSocket value) throws Exception { |
| if(webSocketMap.size() > MAX_SOCKET_MAP){ |
| logger.info("当前在线连接数超过最大值"); |
| throw new Exception("当前在线连接数超过最大值"); |
| } |
| webSocketMap.put(key, value); |
| } |
| } |
复制
后端代码2-node.js
| var ws = require("ws"); |
| |
| |
| var wsServer = new ws.Server({ |
| host: "127.0.0.1", |
| port: 8183, |
| }); |
| console.log('WebSocket sever is listening at port localhost:8183'); |
| |
| |
| function on_server_client_comming (wsObj) { |
| console.log("request comming"); |
| websocket_add_listener(wsObj); |
| } |
| wsServer.on("connection", on_server_client_comming); |
| |
| |
| function websocket_add_listener(wsObj) { |
| wsObj.on("message", function(data) { |
| console.log("request data:"+data); |
| setTimeout(()=>{ |
| wsObj.send("1秒延时,收到了,正在处理"); |
| },1000); |
| |
| |
| |
| setTimeout(()=>{ |
| wsObj.send("3秒延时,返回数据,关闭连接"); |
| wsObj.close() |
| },3000); |
| }); |
| |
| wsObj.on("close", function() { |
| console.log("request close"); |
| }); |
| |
| wsObj.on("error", function(err) { |
| console.log("request error", err); |
| }); |
| } |
复制
所遇问题
场景测试
- 两个用户一对一聊天
- 用户给多个用户群发
- 服务器推送消息给指定用户
- 服务器给多个用户群发
其他
- spring中实现服务器端对websocket的支持时,单例导致的问题
- Spring默认实例化的Bean是单例模式,这就意味着在Spring容器加载时,就注入了MapMapper的实例,不管再调用多少次接口,加载的都是这个Bean同一个实例。
- 而WebSocket是多例模式,在项目启动时第一次初始化实例时,MapMapper的实例的确可以加载成功,但可惜这时WebSocket是无用户连接的。当有第一个用户连接时,WebSocket类会创建第二个实例,但由于Spring的Dao层是单例模式,所以这时MapMapper对应的实例为空。后续每连接一个新的用户,都会再创建新的WebSocket实例,当然MapMapper的实例都为空。
- 突然关闭前端浏览器会触发服务器后端的关闭或者报错方法
- Nginx开启websocket