一、SSE是什么?
SSE技术是基于单工通信模式,只是单纯的客户端向服务端发送请求,服务端不会主动发送给客户端。服务端采取的策略是抓住这个请求不放,等数据更新的时候才返回给客户端,当客户端接收到消息后,再向服务端发送请求,周而复始。
注意:因为EventSource对象是SSE的客户端,可能会有浏览器对其不支持
二、sse 与 websoket
SSE(Server-Sent Events)
是 HTML5 遵循 W3C 标准提出的客户端和服务端之间进行实时通信的协议。
优点
- SSE 客户端可以接收来自服务器的“流”数据,而不需要进行轮询。由于没有浪费的请求,因此 SSE 对于减轻服务器的压力非常有用。
- SSE 使用纯 JavaScript 实现简单,不需要额外的插件或库来处理消息。客户端可以使用 EventSource 接口轻松地与 SSE 服务器通信。
- SSE 天生具有自适应性,由于 SSE 是基于 HTTP 响应使用 EventStream 传递消息,因此它利用了 HTTP 的开销和互联网上的结构。
- SSE 可以与任何服务器语言和平台一起使用,因为 SSE 是一种规定了消息传递方式的技术,不依赖于具体的服务器语言和平台。
缺点
- SSE 是单向通信,只能从服务器推送到客户端。如果应用程序需要双向通信,就需要使用 Websocket。
- SSE无法发送二进制数据,只能发送 UTF-8 编码的文本。如果应用程序需要发送二进制数据,就需要使用 Websocket。
- SSE 不是所有浏览器都支持。虽然 SSE 是 HTML5 的一部分,但具体的浏览器支持性可能会有差异。
Websocket
是 HTML5 的一部分,提供了一种双向通信的机制。
优点
- Websocket 支持双向通信。使用 Websocket 可以同时向客户端发送和接收数据。
- Websocket 协议可以传输二进制数据,这使得 Websocket 更加灵活和强大。
- Websocket 连接长期存在,而不需要仅仅为了接收数据而保持 HTTP 连接打开。
- Websocket 的实现支持跨域的通信,可以方便地进行跨域通信。
缺点
- Websocket 不支持所有浏览器。特别是老浏览器可能不支持 Websocket 协议。
- Websocket 是一种全双工的通信方式。由于 Websocket 长期存在,会占用服务器资源。在高并发场景下,应该考虑使用 SSE。
三、前端示例代码:
// 建立连接 createSseConnect(clientId){ if(window.EventSource){ const eventSource = new EventSource('http://127.0.0.1:8083/sse/createSseConnect?clientId='+clientId); console.log(eventSource) eventSource.onmessage = (event) =>{ console.log("onmessage:"+clientId+": "+event.data) }; eventSource.onopen = (event) =>{ console.log("onopen:"+clientId+": "+event) }; eventSource.onerror = (event) =>{ console.log("onerror :"+clientId+": "+event) }; eventSource.close = (event) =>{ console.log("close :"+clientId+": "+event) }; }else{ console.log("你的浏览器不支持SSE~") } console.log(" 测试 打印") },
复制
四、后端示例代码:
SseController
package com.joker.cloud.linserver.controller; import com.joker.cloud.linserver.conf.sse.sseUtils; import com.joker.common.message.Result; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Map; /** * SseController * * @author joker * @version 1.0 * 2023/8/9 11:18 **/ @RestController @Slf4j @CrossOrigin @RequestMapping("/sse") public class SseController { @Autowired private sseUtils sseUtils; @GetMapping(value = "/createSseConnect", produces="text/event-stream;charset=UTF-8") public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) Long clientId) { return sseUtils.connect(clientId); } @PostMapping("/sendMessage") public void sendMessage(@RequestParam("clientId") Long clientId, @RequestParam("message") String message){ sseUtils.sendMessage(clientId, "123456789", message); } @GetMapping(value = "/listSseConnect") public Result<Map<Long, SseEmitter>> listSseConnect(){ Map<Long, SseEmitter> sseEmitterMap = sseUtils.listSseConnect(); return Result.success(sseEmitterMap); } /** * 关闭SSE连接 * * @param clientId 客户端ID **/ @GetMapping("/closeSseConnect") public Result closeSseConnect(Long clientId) { sseUtils.deleteUser(clientId); return Result.success(); } }
复制
sseUtils工具类
package com.joker.cloud.linserver.conf.sse; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * sseUtils * * @author joker * @version 1.0 * 2023/8/9 11:20 **/ @Slf4j @Component public class sseUtils { private static final Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); /** * 创建连接 */ public SseEmitter connect(Long userId) { if (sseEmitterMap.containsKey(userId)) { SseEmitter sseEmitter =sseEmitterMap.get(userId); sseEmitterMap.remove(userId); sseEmitter.complete(); } try { UUID uuid = UUID.randomUUID(); String str = uuid.toString(); String temp = str.substring(0, 8) + str.substring(9, 13) + str.substring(14, 18) + str.substring(19, 23) + str.substring(24); // 设置超时时间,0表示不过期。默认30秒 SseEmitter sseEmitter = new SseEmitter(30*1000L); sseEmitter.send(SseEmitter.event().id(temp).data("")); // reconnectTime(10*1000L) // 注册回调 sseEmitter.onCompletion(completionCallBack(userId)); // sseEmitter.completeWithError(errorCallBack(userId)); sseEmitter.onTimeout(timeoutCallBack(userId)); sseEmitterMap.put(userId, sseEmitter); log.info("创建sse连接完成,当前用户:{}", userId); return sseEmitter; } catch (Exception e) { log.info("创建sse连接异常,当前用户:{}", userId); } return null; } /** * 给指定用户发送消息 * */ public boolean sendMessage(Long userId,String messageId, String message) { if (sseEmitterMap.containsKey(userId)) { SseEmitter sseEmitter = sseEmitterMap.get(userId); try { sseEmitter.send(SseEmitter.event().id(messageId).data(message)); // reconnectTime(10*1000L) log.info("用户{},消息id:{},推送成功:{}", userId,messageId, message); return true; }catch (Exception e) { sseEmitterMap.remove(userId); log.info("用户{},消息id:{},推送异常:{}", userId,messageId, e.getMessage()); sseEmitter.complete(); return false; } }else { log.info("用户{}未上线", userId); } return false; } /** * 删除连接 * @param userId */ public void deleteUser(Long userId){ removeUser(userId); } private static Runnable completionCallBack(Long userId) { return () -> { log.info("结束sse用户连接:{}", userId); removeUser(userId); }; } private static Throwable errorCallBack(Long userId) { log.info("sse用户连接异常:{}", userId); removeUser(userId); return new Throwable(); } private static Runnable timeoutCallBack(Long userId) { return () -> { log.info("连接sse用户超时:{}", userId); removeUser(userId); }; } /** * 断开 * @param userId */ public static void removeUser(Long userId){ if (sseEmitterMap.containsKey(userId)) { SseEmitter sseEmitter = sseEmitterMap.get(userId); sseEmitterMap.remove(userId); sseEmitter.complete(); }else { log.info("用户{} 连接已关闭",userId); } } public Map<Long, SseEmitter> listSseConnect(){ return sseEmitterMap; } }
复制
五、模拟测试:
模拟浏览器发送建立连接的请求:
切换到时间栏目,可以看到长连接始终保持着的:
切换到eventStream:可以看到后端通信的streams流数据
使用postMan 模拟后端服务器推送给客户端消息
浏览器建立的连接中会看到服务器推送到客户端的消息内容及ID等基础信息
控制台也可以监听到事件的变化并输出