简介
websocket协议的出现,打破http那种只能由客户端发起请求,服务器响应的单项数据传输局面,采用websocket之后,可以由客户端推送数据到服务端,也可以由服务器主动推送数据到客户端,从而同时进行双向通信,便于数据实时更新推送
netty-socketio便是为了方便我们进行websocket开发的框架,底层采用netty,提供了可靠高效的性能
netty-socketio的官方github地址:https://github.com/mrniko/netty-socketio
入门demo
咋们先来一个入门的websocket程序,带大家先简单了解下netty-socketio的使用
demo相关代码地址:https://gitee.com/xumengqq/work-websocketw.git
项目结构图
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.19</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
配置
# 自定义socket服务端配置
websocket:
server:
host: localhost
port: 9999
服务端代码实现
netty-socketio框架采用的数据传输格式是["key","data"],这是一个数组,前面的key是信令,可以自定义,后面是传递的数据,理解为对"key"信令发送了一条"data"数据。也可以类比成对某个接口发送了一条数据。后面的代码编写思路也是监听这个key信令上传的数据,或者主动对这个key信令下发数据
实体类和常量类
@Data
public class Message {
private String userId;
private String room;
private String data;
}
public class EventConstant {
public static final String TEST_EVENT = "test";
public static final String MESSAGE = "message";
public static final String USER_ID = "userId";
public static final String JOIN_ROOM = "joinRoom";
public static final String LEAVE_ROOM = "leaveRoom";
/**
* 获取该用户所有加入的房间
*/
public static final String GET_MY_ROOMS = "getMyRooms";
/**
* 房间内广播消息
* */
public static final String ROOM_BROADCAST = "roomBroadcast";
/**
* 全局广播消息
* */
public static final String GLOBAL_BROADCAST = "globalBroadcast";
public static final String SEND_USER_DATA = "sendUserData";
}
socket配置类编写
@Configuration
@Log4j2
public class WebsocketConfig {
@Value("${websocket.server.host:localhost}")
private String host;
@Value("${websocket.server.port:9999}")
private Integer websocketPort;
/**
* 设置socket service基本配置
* @return
*/
@Bean
public SocketIOServer getSocketIOServer(){
com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
configuration.setPort(websocketPort);
configuration.setHostname(host);
configuration.getSocketConfig().setTcpNoDelay(true);
configuration.getSocketConfig().setReuseAddress(true);
return new SocketIOServer(configuration);
}
/**
* 扫描socket相关的注解,例如@OnConnect、@DisConnect、@OnEvent
* @return
*/
@Bean
public SpringAnnotationScanner springAnnotationScanner(){
return new SpringAnnotationScanner(getSocketIOServer());
}
}
netty-socketio还提供了命名空间的概念,简单来说就是一个netty-socketio应用程序,里面有n个namespace,一个namespace里面又有n个room,用户默认会加入一个空字符""名称 的namespace和一个空字符""名称的room,下面有获取默认的namespace和room的代码和运行结果。绝大多数情况下使用默认的namespace就可以了,这里不做展开。
其它的配置可以参考github上的说明:https://github.com/mrniko/netty-socketio/wiki/Configuration-details
-
setHostname If not set then bind address will be 0.0.0.0 or ::0
-
setPort The port the socket.io server will listen to
-
setJsonTypeFieldName defaults to "@class"
-
setJsonSupport Allows to setup custom implementation of JSON serialization/deserialization. See JsonSupport
-
setAuthorizationListener Authorization listener invoked on every handshake. Accepts all clients by default.
-
setStoreFactory Client store and pubsub factory, used to store session data and implements distributed pubsub. Defaults to MemoryStoreFactory, but RedisStoreFactory and HazelcastStoreFactory also implemented.
-
setPreferDirectBuffer (added in 1.5 version) Buffer allocation method used during packet encoding. Defaults to true.
-
setBossThreads (added in 1.5 version) boss-threads amount for netty
-
setWorkerThreads (added in 1.5 version) worker-threads amount for netty
-
setHeartbeatInterval Heartbeat interval (in seconds), defaults to 25
-
setHeartbeatTimeout Heartbeat timeout (in seconds), defaults to 60. Use 0 to disable it
-
setCloseTimeout Channel close timeout (in seconds) due to inactivity, defaults to 60
-
setContext Namespace, defaults to "/socket.io"
-
setAllowCustomRequests Allow to service custom requests that differ from socket.io protocol, defaults to false.
- If true, add own handler which handle custom requests in order to avoid hang connections.
-
setPollingDuration Polling interval for XHR transport (in seconds), defaults to 20
-
setKeyStorePassword SSL key store password (for secure connections)
-
setKeyStore SSL key store stream, maybe appointed to any source
-
setMaxHttpContentLength Set maximum HTTP content length limit, defaults to 64KB.
- If the length of the aggregated content exceeds this value, a TooLongFrameException will be raised.
-
setTransports Transports supported by server, defaults to [Transport.WEBSOCKET, Transport.FLASHSOCKET, Transport.XHRPOLLING]. Cannot be empty list
socket服务启动和销毁
@SpringBootApplication
@Log4j2
public class SingleApplication implements CommandLineRunner, DisposableBean {
@Autowired
private SocketIOServer socketIOServer;
public static void main(String[] args) {
SpringApplication.run(SingleApplication.class, args);
}
/**
* 启动socket服务器
* @param args
* @throws Exception
*/
@Override
public void run(String... args) throws Exception {
log.info("socket start ...");
socketIOServer.start();
log.info("socket started");
}
/**
* spring容器关闭时将socket服务也停止
* @throws Exception
*/
@Override
public void destroy() throws Exception {
socketIOServer.stop();
log.info("socket server stop");
}
}
客户端连接和断开
@Component
@Log4j2
public class ConnectHandel {
public static final String USER_ID = "userId";
/**
* 客户端socket连接后触发
* @param client
*/
@OnConnect
public void connect(SocketIOClient client){
// 获取登录信息,也可以在这里面进行权限判断
String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
}
/**
* 客户端断开服务器时触发
* @param client
*/
@OnDisconnect
public void disConnect(SocketIOClient client){
// 获取登录信息
String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
log.info("disConnect userId {}, sessionUuid {}",userId,client.getSessionId());
}
}
业务代码
客户端和服务器简单交互
@Autowired
private SocketIOServer socketIOServer;
@OnEvent(MESSAGE)
public void messageEvent(SocketIOClient client,String message) throws InterruptedException {
String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
log.info("testEvent userId {}, sessionUuid {}, message {}",userId,client.getSessionId(),message);
// 业务处理,这里写一个模拟demo
// 向客户端发消息
client.sendEvent(MESSAGE,"服务处理中...");
Thread.sleep(1000);
// 业务处理结束后再向客户端发消息
client.sendEvent(MESSAGE,"服务处理完成!");
}
加入房间
@OnEvent(JOIN_ROOM)
public void joinRoom(SocketIOClient client,String room){
client.joinRoom(room);
}
离开房间
@OnEvent(LEAVE_ROOM)
public void leaveRoom(SocketIOClient client,String room){
client.leaveRoom(room);
}
房间内广播消息
@OnEvent(ROOM_BROADCAST)
public void roomBroadcast(SocketIOClient client, String data){
Message message = JSON.parseObject(data, Message.class);
roomBroadcastData(MESSAGE,message.getRoom(),message.getData(),null);
}
public void roomBroadcastData(String eventKey,String room,Object data,SocketIOClient excludeClient){
if (Objects.isNull(excludeClient)){
socketIOServer.getRoomOperations(room).sendEvent(eventKey,data);
}else {
socketIOServer.getRoomOperations(room).sendEvent(eventKey,excludeClient,data);
}
}
全局广播消息
@OnEvent(GLOBAL_BROADCAST)
public void globalBroadcast(SocketIOClient client, String data){
globalBroadcastData(MESSAGE,data,null);
}
public void globalBroadcastData(String eventKey,Object data,SocketIOClient excludeClient){
if(Objects.isNull(excludeClient)){
socketIOServer.getBroadcastOperations().sendEvent(eventKey,data);
}else {
socketIOServer.getBroadcastOperations().sendEvent(eventKey,excludeClient,data);
}
}
点对点消息发送
这个需要存储userId和client的对应关系,知道需要发送的目标userId,就能获取到client,然后直接发消息既可
我们在@OnConnect和@DisConnect中来绑定和解除userId和client的关系
客户端
客户端代码我是在网上找的,大家可按自己的方式实现既可
js方式
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<title>SocketIO客户端测试环境</title>
<base>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script src="https://cdn.bootcss.com/socket.io/2.1.1/socket.io.js"></script>
<style>
body {
padding: 20px;
}
#console {
height: 450px;
overflow: auto;
}
.connect-msg {
color: green;
}
.disconnect-msg {
color: red;
}
</style>
</head>
<body>
<h1>客户端测试环境</h1>
<hr style="height:1px;border:none;border-top:1px solid black;" />
<div style="width: 700px; float: left">
<h3>SocketClient建立连接</h3>
<div style="border: 1px;">
<label>socketio服务端地址:</label>
<!--
http://localhost 服务端ip
9999 服务端socket端口(服务端提供)
test或socketIO 命名空间(可自定义)如果不定义命名空间,默认是/ 比如:http://localhost:9999?userId=12345
userId 用户id参数(可自定义)
ps:因为我定义了命名空间/test和/socketIO,所以我这里也可以用
http://localhost:9999/test?userId=12345
http://localhost:9999/socketIO?userId=12345
这里我用http://localhost:9999?userId=12345建立连接,因为这里还不涉及到请求不同命名空间的方法
-->
<input type="text" id="url" value="ws://localhost:9999?userId=12345" style="width: 500px;">
<br>
<br>
<button id="connect" style="width: 100px;">建立连接</button>
<button id="disconnect" style="width: 100px;">断开连接</button>
</div>
<hr style="height:1px;border:none;border-top:1px solid black;" />
<h3>SocketClient发送消息</h3>
<div style="border: 1px;">
<label>socketEvent名称:</label><input type="text" id="socketEvent" value="getUserRooms">
<br><br>
<textarea id="content" maxlength="1000" cols="40" rows="5" placeholder="请输入内容"></textarea>
<button id="send" style="width: 100px;">发送消息</button>
</div>
<hr style="height:1px;border:none;border-top:1px solid black;" />
</div>
<div style="float: left;margin-left: 50px;">
<h3>SocketIO互动消息</h3>
<button id="clean" style="width: 100px;">清理输出</button>
<div id="console" class="well"></div>
</div>
</body>
<script type="text/javascript">
var socket ;
var errorCount = 0;
var isConnected = false;
var maxError = 5;
//连接
function connect(url) {
//var opts = {
// query: 'userId='+userId
//};
//socket = io.connect(url, opts);
socket = io.connect(url);
//socket.nsp = "/socketIO";//定义命名空间
console.log(socket)
//监听本次连接回调函数
socket.on('connect', function () {
isConnected =true;
console.log("连接成功");
serverOutput('<span class="connect-msg"><font color="blue">'+getNowTime()+' </font>连接成功</span>');
errorCount=0;
});
//监听消息
socket.on('message', function (data) {
output('<span class="connect-msg"><font color="blue">'+getNowTime()+' </font>' + data + ' </span>');
console.log(data);
});
//监听断开
socket.on('disconnect', function () {
isConnected =false;
console.log("连接断开");
serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '已下线! </span>');
});
//监听断开错误
socket.on('connect_error', function(data){
serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>;' + '连接错误-'+data+' </span>');
errorCount++;
if(errorCount>=maxError){
socket.disconnect();
}
});
//监听连接超时
socket.on('connect_timeout', function(data){
serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '连接超时-'+data+' </span>');
errorCount++;
if(errorCount>=maxError){
socket.disconnect();
}
});
//监听错误
socket.on('error', function(data){
serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '系统错误-'+data+' </span>');
errorCount++;
if(errorCount>=maxError){
socket.disconnect();
}
});
/*socket.on('ack', function(data){
console.log("ack:"+data)
var str = '消息发送失败';
if(data==1){
str = '消息发送成功';
}
serverOutput('<span class="connect-msg"><font color="blue">'+getNowTime()+' </font>' + str+' </span>');
});*/
}
function output(message) {
var element = $("<div>" + " " + message + "</div>");
$('#console').prepend(element);
}
function serverOutput(message) {
var element = $("<div>" + message + "</div>");
$('#console').prepend(element);
}
//连接
$("#connect").click(function(){
if(!isConnected){
var url = $("#url").val();
connect(url);
}else {
serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+' </font>' + '已经成功建立连接,不要重复建立!!! </span>');
}
})
//断开连接
$("#disconnect").click(function(){
if(isConnected){
socket.disconnect();
}
})
//发送消息
$("#send").click(function(){
var socketEvent = $("#socketEvent").val();//自定义的事件名称
var content = $("#content").val();//发送的内容
socket.emit(socketEvent,content,function(data1,data2){
console.log("ack1:"+data1);
console.log("ack2:"+data2);
});
})
//清理消息
$("#clean").click(function(){
$('#console').html("");
})
function getNowTime(){
var date=new Date();
var year=date.getFullYear(); //获取当前年份
var mon=date.getMonth()+1; //获取当前月份
var da=date.getDate(); //获取当前日
var h=date.getHours(); //获取小时
var m=date.getMinutes(); //获取分钟
var s=date.getSeconds(); //获取秒
var ms=date.getMilliseconds();
var d=document.getElementById('Date');
var date =year+'/'+mon+'/'+da+' '+h+':'+m+':'+s+':'+ms;
return date;
}
</script>
</html>
java 实现客户端
package com.corundumstudio.socketio.demo.client;
import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
import java.net.URISyntaxException;
public class DemoSocketClient {
public static void main(String[] args) throws URISyntaxException, InterruptedException {
IO.Options options = new IO.Options();
options.transports = new String[]{"websocket"};
options.reconnectionAttempts = 2;
options.reconnectionDelay = 1000;//失败重连的时间间隔
options.timeout = 500;//连接超时时间(ms)
final Socket socket = IO.socket("http://localhost:9999/?userId=12345", options);
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.send("hello");
}
});
socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
System.out.println("连接关闭");
}
});
socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
System.out.println("sessionId:" + socket.id());
for (Object obj : args) {
System.out.println(obj);
}
System.out.println("收到服务器应答,将要断开连接...");
socket.disconnect();
}
});
socket.connect();
}
}
生产集群方案
上面的demo是单机版本,但是,生产上服务都是集群存在的,至少也要两台,双节点防止单点故障;可是websocket服务搭建集群会有个问题,就是服务间通信的问题;比如,Aclient连接上Aserver,Bclient连接上了Bserver,这时候,Aclient就没法对Bclient进行发送消息和接受 消息。
这里提供几种解决方案
Redis存储方案
redis是官方推荐的解决方案,这个在Github上可以看到。配置之后,利用redis这个中介,可以通过广播的方式对其他客户端进行通信,底层是用了redis的发布订阅功能。有两种通信方式,可以通过全局广播通知到其他客户端,也可以对加入同一个房间内的客户端进行广播。但是这个有局限性,netty-socketio只提供了广播的redis发布订阅方案,点对点方式并没有提供。
pom依赖
<!-- 添加redisson依赖-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.15.4</version>
</dependency>
yaml配置
# redis
spring:
redis:
host: 121.196.111.111 #地址
port: 6379 #端口号
database: 0 #0号库
password: password #密码
配置类
// redisson的配置类
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.port}")
private Integer port;
@Value("${spring.redis.database:0}")
private Integer database;
@Value("${spring.redis.timeout:3000}")
private Integer timeout;
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
// config.useClusterServers().addNodeAddress("redis://").setPassword("");
config.useSingleServer().setAddress("redis://"+ host +":" + port).setPassword(password).setDatabase(database).setTimeout(timeout);
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}
// 在SocketIOServer配置上redisson既可使用
@Autowired
private RedissonClient redissonClient;
@Bean
public SocketIOServer getSocketIOServer(){
com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
configuration.setPort(websocketPort);
configuration.setHostname(host);
configuration.getSocketConfig().setTcpNoDelay(true);
configuration.getSocketConfig().setReuseAddress(true);
// 设置redis存储方式
configuration.setStoreFactory(new RedissonStoreFactory(redissonClient));
return new SocketIOServer(configuration);
}
全局广播和房间内广播
@OnEvent(ROOM_BROADCAST)
public void roomBroadcast(SocketIOClient client, String data){
Message message = JSON.parseObject(data, Message.class);
roomBroadcastData(MESSAGE,message.getRoom(),message.getData(),null);
}
@OnEvent(GLOBAL_BROADCAST)
public void globalBroadcast(SocketIOClient client, String data){
globalBroadcastData(MESSAGE,data,null);
}
public void globalBroadcastData(String eventKey,Object data,SocketIOClient excludeClient){
if(Objects.isNull(excludeClient)){
socketIOServer.getBroadcastOperations().sendEvent(eventKey,data);
}else {
socketIOServer.getBroadcastOperations().sendEvent(eventKey,excludeClient,data);
}
}
public void roomBroadcastData(String eventKey,String room,Object data,SocketIOClient excludeClient){
if (Objects.isNull(excludeClient)){
socketIOServer.getRoomOperations(room).sendEvent(eventKey,data);
}else {
socketIOServer.getRoomOperations(room).sendEvent(eventKey,excludeClient,data);
}
}
生产灰度发布
nginx重新加载配置会影响原有的websocket连接吗?可不可以通过更新路由配置实现websocket服务的灰度发布?
经过测试,在更改nginx配置信息后,比如将其中一个服务设置成down,输入nginx -s reload命令,原有的websocket连接不会受到影响,但是新的连接不会再进入设置为down的服务,随时间推移,连接数就会越来越少,直至为0。因此,我们可以将需要发布的服务设置为down,然后再编写一个获取此服务连接client的数量的接口,当数量为0的时候,代表没有websocket连接了,此时,既可对这台服务器进行发布了。
获取某台服务器连接客户端数量
@Autowired
private SocketIOServer socketIOServer;
@GetMapping("/getClientCount")
public Object getClientCount(){
Collection<SocketIOClient> allClients = socketIOServer.getAllClients();
int size = allClients.size();
return size;
}
nginx配置websocket服务路由
这里需要采用hash的路由方式;不知道是框架还是其他的什么原因,客户端会发起三次的普通http请求和一次http升级成websocket的请求,采用其他的路由方式会导致这些请求不到同一个服务器从而导致websocket连接报错。这需要注意一下
worker_processes 1;
events {
worker_connections 1024;
}
http {
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
upstream websocket {
hash $remote_addr consistent;
server 127.0.0.1:9999 down;
server 127.0.0.1:9998;
}
server {
listen 80;
location / {
proxy_pass http://websocket;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "$connection_upgrade";
#proxy_set_header Host $host;
}
}
}
Redis存储-点对点完善
之前的redis方案是netty-websocketio官方提供的集群解决方案,但此方案平没有解决点对点信息传输的问题,我们现在在此基础上还是通过redis的发布订阅来完善点对点信息推送。
当然,你也可以通过mq的方式来完成,个人觉得用redis的话,中小型项目就足够了,成本也低
思路:用户连上服务器,将用户和用户所连接的机器信息(这里指ip和端口号)保存在redis,断开连接则将redis上的记录清除掉;每台服务器订阅当前ip和端口号的topic,知道用户连接到那台服务器之后,发布消息则是发布到这个用户对应服务器ip和端口号的topic,这样,经过redis的订阅和发布,既可完成点对点的通信。(多台服务器分配多个topic方案是为了优化。如果采用一个topic,所有的机器都订阅同一个topic,然后每台机器再去赛选要发送的目标客户端是否连接在此机器上,这种方式会让其他非目标机器都进行一次赛选处理,略微有些影响性能,照成服务器不必要的开销)
ClientHub类
在之前代码的基础上,编写一个clientHub类,用于存储用户连接之前的信息
@Component
@Log4j2
public class ClientHub implements CommandLineRunner {
@Autowired
private SocketIOServer socketIOServer;
@Autowired
private RedissonClient redissonClient;
public static final String REDIS_TOPIC = "redis_topic";
public static final String ONLINE_USER = "online_user_";
/**
* 本机存储userId与客户端之间的连接关系
*/
private static Map<String, SocketIOClient> userIdToClient = new HashMap<>();
/**
* 集群模式下,点对点发送,一律采取这个方法,不能直接通过SocketClient发送
* @param fromUserId
* @param targetUserId
* @param key
* @param data
*/
public void sendMessageByUserId(String fromUserId,String targetUserId,String key,String data){
Message message = new Message();
message.setFromUserId(fromUserId);
message.setTargetUserId(targetUserId);
message.setKey(key);
message.setData(data);
SocketIOClient socketIOClient = userIdToClient.get(targetUserId);
// 如果是本机直连的,直接发送,不是则通过redis发布对应的机器订阅
if(!Objects.isNull(socketIOClient)){
socketIOClient.sendEvent(message.getKey(),message);
}else {
RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + targetUserId);
if(bucket.isExists()){
String topic = bucket.get();
this.pubMessage(topic,message);
}
}
}
/**
* 用户连接时调用,存储用户连接之间的信息
* @param userId
* @param client
*/
public void addUser(String userId,SocketIOClient client){
userIdToClient.put(userId,client);
// 存储在redis上
RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + userId);
bucket.set(getRedisTopic());
}
/**
* 用户断开连接时调用,清楚用户和客户端之前的关系
* @param userId
*/
public void deleteUser(String userId){
userIdToClient.remove(userId);
// 从redis上删除掉
RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + userId);
bucket.delete();
}
/**
* 获取本socket服务的ip和端口号
* @return
*/
private String getHostAndPort(){
String hostAddress = "";
try {
hostAddress = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
int port = socketIOServer.getConfiguration().getPort();
return hostAddress+"_"+port;
}
/**
* 获取这台机器redis订阅的topic
* @return
*/
private String getRedisTopic(){
String hostAndPort = getHostAndPort();
return REDIS_TOPIC + "_" + hostAndPort;
}
/**
* redis发布消息
* @param topic
* @param message
*/
private void pubMessage(String topic, Message message){
redissonClient.getTopic(topic).publish(message);
log.info("pubMessage topic {} data {}",topic,JSON.toJSONString(message));
}
/**
* redis订阅消息
* @param topic
*/
public void subMessage(String topic){
redissonClient.getTopic(topic).addListener(Message.class,(channel,message)->{
log.info("topic {} submessage {}",topic, JSON.toJSONString(message));
String userId = message.getTargetUserId();
SocketIOClient socketIOClient = userIdToClient.get(userId);
if(Objects.nonNull(socketIOClient)){
socketIOClient.sendEvent(message.getKey(),JSON.toJSONString(message));
}
});
}
/**
* 服务器 对应的组件加载完成后,需要开启redis订阅功能
* @param args
* @throws Exception
*/
@Override
public void run(String... args) throws Exception {
String redisTopic = getRedisTopic();
subMessage(redisTopic);
log.info("topic {} 已订阅",redisTopic);
}
}
连接记录
建立连接和断开连接分别进行记录和清除工作
@Autowired
private ClientHub clientHub;
/**
* 客户端socket连接后触发
* @param client
*/
@OnConnect
public void connect(SocketIOClient client){
// 获取登录信息,也可以在这里面进行权限判断
String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
String url = client.getHandshakeData().getUrl();
log.info("url :{}",url);
// 保存userId和client的对应关系
clientHub.addUser(userId,client);
}
/**
* 客户端断开服务器时触发
* @param client
*/
@OnDisconnect
public void disConnect(SocketIOClient client){
// 获取登录信息
String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
log.info("disConnect userId {}, sessionUuid {}",userId,client.getSessionId());
// 解除userId和client的对应关系
clientHub.deleteUser(userId);
}
点对点发送消息业务使用
/**
* 点对点发送消息
* @param client
* @param data
* 注意:如果接受的参数是一个对象,那么前端页面发送数据的时候也必须是一个对象,不能用json字符串代替
* example:js代码 socket.emit('sendUserData', {data: "你好啊", targetUserId: "2233"});
*/
@OnEvent(SEND_USER_DATA)
public void sendUserData(SocketIOClient client, AckRequest ackRequest,Message data){
// 获取登录信息
String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
log.info("sendUserData {}",JSON.toJSONString(data));
clientHub.sendMessageByUserId(userId,data.getTargetUserId(),MESSAGE,data.getData());
}
这里只是简单的编写集群的思想,实际情况还有一些问题需要注意和解决,比如当某台机器挂了,这台机器连接的用户数据还是在redis上,其它机器就以为这些用户还是在线,但其实状态不对。这个就需要进行额外的处理了,比如存redis的时候设置过期时间,然后通过定时任务不断在redis上刷新在线用户信息等。
特殊场景集群方案
之前的redis方式,是通用的集群解决方案,基本上大部分的场景都适用,但有些时候业务场景比较特殊,也可以采取特殊的方式解决
特殊场景
用户之前的通信只在同一个房间内,比如视频会议,上课直播,游戏房间之类的,用户之间只和在同一个房间内的人进行通信,不需要对跨房间或者对房间外的用户进行实时消息推送。
思路
将房间看做一个单独的整体,只能分配在某一台服务器上,只要是这个房间的人需要连接服务器,就先获取该房间的服务器地址,客户端直接连上对应服务器,再进入房间,那么,该房间内的所有人都是在同一台服务器上,从而避免了跨服务器之间的通信场景,就相当于单机版。不过这个得手动控制房间的负载均衡,每台websocket服务器的访问路径也要自己控制好,生产的灰度发布也需要自行写代码控制。
spring事件模型
简述事件模型
spring提供的事件模型,其实也相当于一个发布订阅的工具,只不过是在单个程序内进行的,这里可以借助spring的事件,配合上多线程异步,可以更方便地进行websocket功能编码开发。从而达到,主要业务逻辑消息迅速实时推送,附加业务逻辑并发执行。
spring事件是spring自带的功能,不需要再额外引入依赖
定义事件
直接继承抽象类ApplicationEvent,然后自定义属性,自定义的属性用于在事件传播过程中记录自己需要的数据
public class ConnectEvent extends ApplicationEvent {
private SocketIOClient socketIOClient;
public ConnectEvent(Object source,SocketIOClient socketIOClient) {
super(source);
this.socketIOClient = socketIOClient;
}
public SocketIOClient getSocketIOClient() {
return socketIOClient;
}
}
事件发布
通过ApplicationEventPublisher的实例既可发布自定义的事件
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@OnConnect
public void connect(SocketIOClient client){
// 获取登录信息,也可以在这里面进行权限判断
String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
String url = client.getHandshakeData().getUrl();
log.info("url :{}",url);
// 也可以在这一步进行权限鉴定,鉴定不通过就发给客户端一个权限鉴定失败的消息,然后再断开连接
// client.sendEvent(MESSAGE,"权限不足");
// client.disconnect();
// 保存userId和client的对应关系
clientHub.addUser(userId,client);
// 发布一个用户连接事件
applicationEventPublisher.publishEvent(new ConnectEvent(this,client));
}
事件监听(订阅)
事件监听有两种书写方式,一种是直接实现ApplicationListener接口,泛型里面放入自定义事件
@Component
@Log4j2
public class UserLoginListenerDemo implements ApplicationListener<ConnectEvent> {
@Override
public void onApplicationEvent(ConnectEvent event) {
SocketIOClient socketIOClient = event.getSocketIOClient();
String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
{
// 业务代码。。。
log.info("UserLoginListenerDemo userId {}",userId);
}
}
}
第二中有点类似于controller的方式,在方法上用@EventListener注解来标识事件监听,在方法的参数中传入自定义的事件,并且如果有顺序需求的话,可以通过@Order来注明那个方法先调用(值小的先调用)
@Component
@Log4j2
public class UserLoginListener {
@EventListener
@Order(3)
public void connectListenerLog(ConnectEvent connectEvent){
SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
{
log.info("user {} 登录的日志操作",userId);
}
}
@EventListener
@Order(2)
public void connectListenerStore(ConnectEvent connectEvent){
SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
{
log.info("user {} 登录的行为存表",userId);
}
}
@EventListener
@Order(1)
public void connectListenerState(ConnectEvent connectEvent){
SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
{
log.info("user {} 登录的状态更改",userId);
}
}
引入多线程异步处理
上面的事件监听的方式是单线程的,如果仅仅是这样,感觉还不如直接定义方法进行调用,事件模型到显得有点多余。我们可以引入线程池和@Async,通过多线程并行处理事件,实现真正的发布订阅
创建一个spring提供的线程池
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor autoLoginExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(20);
threadPoolTaskExecutor.setMaxPoolSize(100);
threadPoolTaskExecutor.setQueueCapacity(200);
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
threadPoolTaskExecutor.setThreadNamePrefix("event-task-");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
启动类上加上@EnableAsync注解
@SpringBootApplication
@Log4j2
@EnableAsync
public class ClusterEventApplication implements CommandLineRunner, DisposableBean {
event监听方法上加上@Async表示异步执行
@Component
@Log4j2
public class UserLoginListener {
@EventListener
// @Order(3)
@Async
public void connectListenerLog(ConnectEvent connectEvent){
SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
{
log.info("user {} 登录的日志操作",userId);
}
}
@EventListener
// @Order(2)
@Async
public void connectListenerStore(ConnectEvent connectEvent){
SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
{
log.info("user {} 登录的行为存表",userId);
}
}
@EventListener
// @Order(1)
@Async
public void connectListenerState(ConnectEvent connectEvent){
SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
{
log.info("user {} 登录的状态更改",userId);
}
}
}
效果截图: