首页 前端知识 websocket之netty-socketio高性能实时消息推送框架

websocket之netty-socketio高性能实时消息推送框架

2024-08-18 22:08:16 前端知识 前端哥 573 430 我要收藏

简介

websocket协议的出现,打破http那种只能由客户端发起请求,服务器响应的单项数据传输局面,采用websocket之后,可以由客户端推送数据到服务端,也可以由服务器主动推送数据到客户端,从而同时进行双向通信,便于数据实时更新推送

netty-socketio便是为了方便我们进行websocket开发的框架,底层采用netty,提供了可靠高效的性能

netty-socketio的官方github地址:https://github.com/mrniko/netty-socketio

入门demo

咋们先来一个入门的websocket程序,带大家先简单了解下netty-socketio的使用

demo相关代码地址:https://gitee.com/xumengqq/work-websocketw.git

项目结构图

依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.19</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>

配置

# 自定义socket服务端配置
websocket:
  server:
    host: localhost
    port: 9999

服务端代码实现

netty-socketio框架采用的数据传输格式是["key","data"],这是一个数组,前面的key是信令,可以自定义,后面是传递的数据,理解为对"key"信令发送了一条"data"数据。也可以类比成对某个接口发送了一条数据。后面的代码编写思路也是监听这个key信令上传的数据,或者主动对这个key信令下发数据

实体类和常量类
@Data
public class Message {
    private String userId;
    private String room;
    private String data;
}

public class EventConstant {

    public static final String TEST_EVENT = "test";

    public static final String MESSAGE = "message";

    public static final String USER_ID = "userId";

    public static final String JOIN_ROOM = "joinRoom";
    public static final String LEAVE_ROOM = "leaveRoom";

    /**
     * 获取该用户所有加入的房间
     */
    public static final String GET_MY_ROOMS = "getMyRooms";

    /**
     * 房间内广播消息
     * */
    public static final String ROOM_BROADCAST = "roomBroadcast";
    /**
     * 全局广播消息
     * */
    public static final String GLOBAL_BROADCAST = "globalBroadcast";

    public static final String SEND_USER_DATA = "sendUserData";
}
socket配置类编写
@Configuration
@Log4j2
public class WebsocketConfig {

    @Value("${websocket.server.host:localhost}")
    private String host;
    @Value("${websocket.server.port:9999}")
    private Integer websocketPort;

    /**
     * 设置socket service基本配置
     * @return
     */
    @Bean
    public SocketIOServer getSocketIOServer(){
        com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
        configuration.setPort(websocketPort);
        configuration.setHostname(host);
        configuration.getSocketConfig().setTcpNoDelay(true);
        configuration.getSocketConfig().setReuseAddress(true);
        return new SocketIOServer(configuration);
    }

    /**
     * 扫描socket相关的注解,例如@OnConnect、@DisConnect、@OnEvent
     * @return
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(){
        return new SpringAnnotationScanner(getSocketIOServer());
    }
}

netty-socketio还提供了命名空间的概念,简单来说就是一个netty-socketio应用程序,里面有n个namespace,一个namespace里面又有n个room,用户默认会加入一个空字符""名称 的namespace和一个空字符""名称的room,下面有获取默认的namespace和room的代码和运行结果。绝大多数情况下使用默认的namespace就可以了,这里不做展开。

 

 其它的配置可以参考github上的说明:https://github.com/mrniko/netty-socketio/wiki/Configuration-details

  • setHostname If not set then bind address will be 0.0.0.0 or ::0

  • setPort The port the socket.io server will listen to

  • setJsonTypeFieldName defaults to "@class"

  • setJsonSupport Allows to setup custom implementation of JSON serialization/deserialization. See JsonSupport

  • setAuthorizationListener Authorization listener invoked on every handshake. Accepts all clients by default.

  • setStoreFactory Client store and pubsub factory, used to store session data and implements distributed pubsub. Defaults to MemoryStoreFactory, but RedisStoreFactory and HazelcastStoreFactory also implemented.

  • setPreferDirectBuffer (added in 1.5 version) Buffer allocation method used during packet encoding. Defaults to true.

  • setBossThreads (added in 1.5 version) boss-threads amount for netty

  • setWorkerThreads (added in 1.5 version) worker-threads amount for netty

  • setHeartbeatInterval Heartbeat interval (in seconds), defaults to 25

  • setHeartbeatTimeout Heartbeat timeout (in seconds), defaults to 60. Use 0 to disable it

  • setCloseTimeout Channel close timeout (in seconds) due to inactivity, defaults to 60

  • setContext Namespace, defaults to "/socket.io"

  • setAllowCustomRequests Allow to service custom requests that differ from socket.io protocol, defaults to false.

    • If true, add own handler which handle custom requests in order to avoid hang connections.
  • setPollingDuration Polling interval for XHR transport (in seconds), defaults to 20

  • setKeyStorePassword SSL key store password (for secure connections)

  • setKeyStore SSL key store stream, maybe appointed to any source

  • setMaxHttpContentLength Set maximum HTTP content length limit, defaults to 64KB.

    • If the length of the aggregated content exceeds this value, a TooLongFrameException will be raised.
  • setTransports Transports supported by server, defaults to [Transport.WEBSOCKET, Transport.FLASHSOCKET, Transport.XHRPOLLING]. Cannot be empty list

socket服务启动和销毁
@SpringBootApplication
@Log4j2
public class SingleApplication implements CommandLineRunner, DisposableBean {

    @Autowired
    private SocketIOServer socketIOServer;

    public static void main(String[] args) {
        SpringApplication.run(SingleApplication.class, args);
    }

    /**
     * 启动socket服务器
     * @param args
     * @throws Exception
     */
    @Override
    public void run(String... args) throws Exception {
        log.info("socket start ...");
        socketIOServer.start();
        log.info("socket started");
    }

    /**
     * spring容器关闭时将socket服务也停止
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        socketIOServer.stop();
        log.info("socket server stop");
    }
}
客户端连接和断开
@Component
@Log4j2
public class ConnectHandel {

    public static final String USER_ID = "userId";

    /**
     * 客户端socket连接后触发
     * @param client
     */
    @OnConnect
    public void connect(SocketIOClient client){
        // 获取登录信息,也可以在这里面进行权限判断
        String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
        log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
    }

    /**
     * 客户端断开服务器时触发
     * @param client
     */
    @OnDisconnect
    public void disConnect(SocketIOClient client){
        // 获取登录信息
        String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
        log.info("disConnect userId {}, sessionUuid {}",userId,client.getSessionId());
    }
}
业务代码
客户端和服务器简单交互
    @Autowired
    private SocketIOServer socketIOServer;

    @OnEvent(MESSAGE)
    public void messageEvent(SocketIOClient client,String message) throws InterruptedException {
        String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
        log.info("testEvent userId {}, sessionUuid {}, message {}",userId,client.getSessionId(),message);

        // 业务处理,这里写一个模拟demo
        // 向客户端发消息
        client.sendEvent(MESSAGE,"服务处理中...");

        Thread.sleep(1000);
        // 业务处理结束后再向客户端发消息
        client.sendEvent(MESSAGE,"服务处理完成!");
    }
加入房间
    @OnEvent(JOIN_ROOM)
    public void joinRoom(SocketIOClient client,String room){
        client.joinRoom(room);
    }
离开房间
    @OnEvent(LEAVE_ROOM)
    public void leaveRoom(SocketIOClient client,String room){
        client.leaveRoom(room);
    }
房间内广播消息
    @OnEvent(ROOM_BROADCAST)
    public void roomBroadcast(SocketIOClient client, String data){
        Message message = JSON.parseObject(data, Message.class);
        roomBroadcastData(MESSAGE,message.getRoom(),message.getData(),null);
    }
    public void roomBroadcastData(String eventKey,String room,Object data,SocketIOClient excludeClient){
        if (Objects.isNull(excludeClient)){
            socketIOServer.getRoomOperations(room).sendEvent(eventKey,data);
        }else {
            socketIOServer.getRoomOperations(room).sendEvent(eventKey,excludeClient,data);
        }
    }
全局广播消息
    @OnEvent(GLOBAL_BROADCAST)
    public void globalBroadcast(SocketIOClient client, String data){
        globalBroadcastData(MESSAGE,data,null);
    }
    public void globalBroadcastData(String eventKey,Object data,SocketIOClient excludeClient){
        if(Objects.isNull(excludeClient)){
            socketIOServer.getBroadcastOperations().sendEvent(eventKey,data);
        }else {
            socketIOServer.getBroadcastOperations().sendEvent(eventKey,excludeClient,data);
        }
    }
 点对点消息发送

这个需要存储userId和client的对应关系,知道需要发送的目标userId,就能获取到client,然后直接发消息既可

我们在@OnConnect和@DisConnect中来绑定和解除userId和client的关系

客户端

客户端代码我是在网上找的,大家可按自己的方式实现既可

js方式
<!DOCTYPE html>
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    <title>SocketIO客户端测试环境</title>
    <base>
    <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
    <script src="https://cdn.bootcss.com/socket.io/2.1.1/socket.io.js"></script>
    <style>
        body {
            padding: 20px;
        }
        #console {
            height: 450px;
            overflow: auto;
        }
        .connect-msg {
            color: green;
        }
        .disconnect-msg {
            color: red;
        }
    </style>
</head>

<body>
<h1>客户端测试环境</h1>
<hr style="height:1px;border:none;border-top:1px solid black;" />

<div style="width: 700px; float: left">
<h3>SocketClient建立连接</h3>
<div style="border: 1px;">

    <label>socketio服务端地址:</label>
    <!--

      http://localhost 服务端ip
                  9999 服务端socket端口(服务端提供)
                test或socketIO 命名空间(可自定义)如果不定义命名空间,默认是/     比如:http://localhost:9999?userId=12345
                userId 用户id参数(可自定义)

       ps:因为我定义了命名空间/test和/socketIO,所以我这里也可以用
          http://localhost:9999/test?userId=12345
          http://localhost:9999/socketIO?userId=12345
         这里我用http://localhost:9999?userId=12345建立连接,因为这里还不涉及到请求不同命名空间的方法
      -->
    <input type="text" id="url" value="ws://localhost:9999?userId=12345" style="width: 500px;">
    <br>
    <br>
    <button id="connect" style="width: 100px;">建立连接</button>
    <button id="disconnect" style="width: 100px;">断开连接</button>


</div>

<hr style="height:1px;border:none;border-top:1px solid black;" />

<h3>SocketClient发送消息</h3>
<div style="border: 1px;">
    <label>socketEvent名称:</label><input type="text" id="socketEvent" value="getUserRooms">
    <br><br>
    <textarea  id="content" maxlength="1000" cols="40" rows="5" placeholder="请输入内容"></textarea>
    <button id="send" style="width: 100px;">发送消息</button>
</div>

<hr style="height:1px;border:none;border-top:1px solid black;" />

</div>
<div style="float: left;margin-left: 50px;">
    <h3>SocketIO互动消息</h3>
    <button id="clean" style="width: 100px;">清理输出</button>
    <div id="console" class="well"></div>
</div>


</body>

<script type="text/javascript">
    var socket ;
    var errorCount = 0;
    var isConnected = false;
    var maxError = 5;

    //连接
    function connect(url) {

        //var opts = {
        //    query: 'userId='+userId
        //};
        //socket = io.connect(url, opts);

        socket = io.connect(url);

        //socket.nsp = "/socketIO";//定义命名空间

        console.log(socket)

        //监听本次连接回调函数
        socket.on('connect', function () {
            isConnected =true;
            console.log("连接成功");
            serverOutput('<span class="connect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>连接成功</span>');
            errorCount=0;
        });
        //监听消息
        socket.on('message', function (data) {
            output('<span class="connect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + data + ' </span>');
            console.log(data);
        });

        //监听断开
        socket.on('disconnect', function () {
            isConnected =false;
            console.log("连接断开");
            serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + '已下线! </span>');
        });
        //监听断开错误
        socket.on('connect_error', function(data){
            serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>;' + '连接错误-'+data+' </span>');
            errorCount++;
            if(errorCount>=maxError){
                socket.disconnect();
            }
        });
        //监听连接超时
        socket.on('connect_timeout', function(data){
            serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + '连接超时-'+data+' </span>');
            errorCount++;
            if(errorCount>=maxError){
                socket.disconnect();
            }
        });
        //监听错误
        socket.on('error', function(data){
            serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + '系统错误-'+data+' </span>');
            errorCount++;
            if(errorCount>=maxError){
                socket.disconnect();
            }
        });

        /*socket.on('ack', function(data){

            console.log("ack:"+data)
            var str = '消息发送失败';
            if(data==1){
                str = '消息发送成功';
            }
            serverOutput('<span class="connect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + str+' </span>');

        });*/
    }

    function output(message) {
        var element = $("<div>" + " " + message + "</div>");
        $('#console').prepend(element);
    }

    function serverOutput(message) {
        var element = $("<div>" + message + "</div>");
        $('#console').prepend(element);
    }

    //连接
    $("#connect").click(function(){
        if(!isConnected){
            var url =  $("#url").val();
            connect(url);
        }else {
            serverOutput('<span class="disconnect-msg"><font color="blue">'+getNowTime()+'&nbsp;</font>' + '已经成功建立连接,不要重复建立!!! </span>');
        }
    })


    //断开连接
    $("#disconnect").click(function(){
        if(isConnected){
            socket.disconnect();
        }
    })

    //发送消息
    $("#send").click(function(){
        var socketEvent =  $("#socketEvent").val();//自定义的事件名称
        var content  = $("#content").val();//发送的内容
        socket.emit(socketEvent,content,function(data1,data2){
            console.log("ack1:"+data1);
            console.log("ack2:"+data2);
        });
    })

    //清理消息
    $("#clean").click(function(){
        $('#console').html("");
    })

    function getNowTime(){
        var date=new Date();
        var year=date.getFullYear(); //获取当前年份
        var mon=date.getMonth()+1; //获取当前月份
        var da=date.getDate(); //获取当前日
        var h=date.getHours(); //获取小时
        var m=date.getMinutes(); //获取分钟
        var s=date.getSeconds(); //获取秒
        var ms=date.getMilliseconds();
        var d=document.getElementById('Date');
        var date =year+'/'+mon+'/'+da+' '+h+':'+m+':'+s+':'+ms;
        return date;
    }
</script>
</html>

java 实现客户端
package com.corundumstudio.socketio.demo.client;

import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;

import java.net.URISyntaxException;


public class DemoSocketClient {

    public static void main(String[] args) throws URISyntaxException, InterruptedException {
        IO.Options options = new IO.Options();
        options.transports = new String[]{"websocket"};
        options.reconnectionAttempts = 2;
        options.reconnectionDelay = 1000;//失败重连的时间间隔
        options.timeout = 500;//连接超时时间(ms)

        final Socket socket = IO.socket("http://localhost:9999/?userId=12345", options);

        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                socket.send("hello");
            }
        });

        socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                System.out.println("连接关闭");
            }
        });

        socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                System.out.println("sessionId:" + socket.id());
                for (Object obj : args) {
                    System.out.println(obj);
                }
                System.out.println("收到服务器应答,将要断开连接...");
                socket.disconnect();
            }
        });
        socket.connect();
    }
}

 

生产集群方案

上面的demo是单机版本,但是,生产上服务都是集群存在的,至少也要两台,双节点防止单点故障;可是websocket服务搭建集群会有个问题,就是服务间通信的问题;比如,Aclient连接上Aserver,Bclient连接上了Bserver,这时候,Aclient就没法对Bclient进行发送消息和接受 消息。

这里提供几种解决方案

Redis存储方案

redis是官方推荐的解决方案,这个在Github上可以看到。配置之后,利用redis这个中介,可以通过广播的方式对其他客户端进行通信,底层是用了redis的发布订阅功能。有两种通信方式,可以通过全局广播通知到其他客户端,也可以对加入同一个房间内的客户端进行广播。但是这个有局限性,netty-socketio只提供了广播的redis发布订阅方案,点对点方式并没有提供。

pom依赖
<!--        添加redisson依赖-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.15.4</version>
        </dependency> 
yaml配置
# redis
spring:
  redis:
    host: 121.196.111.111  #地址
    port: 6379      #端口号
    database: 0     #0号库
    password: password     #密码
配置类
// redisson的配置类
@Configuration
public class RedissonConfig {
    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.password}")
    private String password;

    @Value("${spring.redis.port}")
    private Integer port;

    @Value("${spring.redis.database:0}")
    private Integer database;

    @Value("${spring.redis.timeout:3000}")
    private Integer timeout;

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
//        config.useClusterServers().addNodeAddress("redis://").setPassword("");
        config.useSingleServer().setAddress("redis://"+ host +":" + port).setPassword(password).setDatabase(database).setTimeout(timeout);
        RedissonClient redissonClient = Redisson.create(config);
        return redissonClient;
    }
}


// 在SocketIOServer配置上redisson既可使用
    @Autowired
    private RedissonClient redissonClient;
    @Bean
    public SocketIOServer getSocketIOServer(){
        com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
        configuration.setPort(websocketPort);
        configuration.setHostname(host);
        configuration.getSocketConfig().setTcpNoDelay(true);
        configuration.getSocketConfig().setReuseAddress(true);
        // 设置redis存储方式
        configuration.setStoreFactory(new RedissonStoreFactory(redissonClient));
        return new SocketIOServer(configuration);
    }
全局广播和房间内广播
    @OnEvent(ROOM_BROADCAST)
    public void roomBroadcast(SocketIOClient client, String data){
        Message message = JSON.parseObject(data, Message.class);
        roomBroadcastData(MESSAGE,message.getRoom(),message.getData(),null);
    }

    @OnEvent(GLOBAL_BROADCAST)
    public void globalBroadcast(SocketIOClient client, String data){
        globalBroadcastData(MESSAGE,data,null);
    }

    public void globalBroadcastData(String eventKey,Object data,SocketIOClient excludeClient){
        if(Objects.isNull(excludeClient)){
            socketIOServer.getBroadcastOperations().sendEvent(eventKey,data);
        }else {
            socketIOServer.getBroadcastOperations().sendEvent(eventKey,excludeClient,data);
        }
    }

    public void roomBroadcastData(String eventKey,String room,Object data,SocketIOClient excludeClient){
        if (Objects.isNull(excludeClient)){
            socketIOServer.getRoomOperations(room).sendEvent(eventKey,data);
        }else {
            socketIOServer.getRoomOperations(room).sendEvent(eventKey,excludeClient,data);
        }
    }
生产灰度发布

nginx重新加载配置会影响原有的websocket连接吗?可不可以通过更新路由配置实现websocket服务的灰度发布?

经过测试,在更改nginx配置信息后,比如将其中一个服务设置成down,输入nginx  -s reload命令,原有的websocket连接不会受到影响,但是新的连接不会再进入设置为down的服务,随时间推移,连接数就会越来越少,直至为0。因此,我们可以将需要发布的服务设置为down,然后再编写一个获取此服务连接client的数量的接口,当数量为0的时候,代表没有websocket连接了,此时,既可对这台服务器进行发布了。

获取某台服务器连接客户端数量

    @Autowired
    private SocketIOServer socketIOServer;

    @GetMapping("/getClientCount")
    public Object getClientCount(){
        Collection<SocketIOClient> allClients = socketIOServer.getAllClients();
        int size = allClients.size();
        return size;
    }

nginx配置websocket服务路由

这里需要采用hash的路由方式;不知道是框架还是其他的什么原因,客户端会发起三次的普通http请求和一次http升级成websocket的请求,采用其他的路由方式会导致这些请求不到同一个服务器从而导致websocket连接报错。这需要注意一下

worker_processes  1;

events {
    worker_connections  1024;
}

http {
	map $http_upgrade $connection_upgrade {
		default upgrade;
		'' close;
	}
 
    upstream websocket {
    	hash $remote_addr consistent;
        server 127.0.0.1:9999 down;
        server 127.0.0.1:9998;
    }
 
    server {
        listen 80;
        location / {
            proxy_pass http://websocket;
			proxy_http_version 1.1;
			proxy_set_header Upgrade $http_upgrade;
			proxy_set_header Connection "$connection_upgrade";
            #proxy_set_header Host $host;
        }
    }
}

Redis存储-点对点完善

之前的redis方案是netty-websocketio官方提供的集群解决方案,但此方案平没有解决点对点信息传输的问题,我们现在在此基础上还是通过redis的发布订阅来完善点对点信息推送。

当然,你也可以通过mq的方式来完成,个人觉得用redis的话,中小型项目就足够了,成本也低

思路:用户连上服务器,将用户和用户所连接的机器信息(这里指ip和端口号)保存在redis,断开连接则将redis上的记录清除掉;每台服务器订阅当前ip和端口号的topic,知道用户连接到那台服务器之后,发布消息则是发布到这个用户对应服务器ip和端口号的topic,这样,经过redis的订阅和发布,既可完成点对点的通信。(多台服务器分配多个topic方案是为了优化。如果采用一个topic,所有的机器都订阅同一个topic,然后每台机器再去赛选要发送的目标客户端是否连接在此机器上,这种方式会让其他非目标机器都进行一次赛选处理,略微有些影响性能,照成服务器不必要的开销)

ClientHub类

在之前代码的基础上,编写一个clientHub类,用于存储用户连接之前的信息

@Component
@Log4j2
public class ClientHub implements CommandLineRunner {

    @Autowired
    private SocketIOServer socketIOServer;

    @Autowired
    private RedissonClient redissonClient;

    public static final String REDIS_TOPIC = "redis_topic";

    public static final String ONLINE_USER = "online_user_";

    /**
     * 本机存储userId与客户端之间的连接关系
     */
    private static Map<String, SocketIOClient> userIdToClient = new HashMap<>();

    /**
     * 集群模式下,点对点发送,一律采取这个方法,不能直接通过SocketClient发送
     * @param fromUserId
     * @param targetUserId
     * @param key
     * @param data
     */
    public void sendMessageByUserId(String fromUserId,String targetUserId,String key,String data){
        Message message = new Message();
        message.setFromUserId(fromUserId);
        message.setTargetUserId(targetUserId);
        message.setKey(key);
        message.setData(data);

        SocketIOClient socketIOClient = userIdToClient.get(targetUserId);
        // 如果是本机直连的,直接发送,不是则通过redis发布对应的机器订阅
        if(!Objects.isNull(socketIOClient)){
            socketIOClient.sendEvent(message.getKey(),message);
        }else {
            RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + targetUserId);
            if(bucket.isExists()){
                String topic = bucket.get();
                this.pubMessage(topic,message);
            }
        }
    }

    /**
     * 用户连接时调用,存储用户连接之间的信息
     * @param userId
     * @param client
     */
    public void addUser(String userId,SocketIOClient client){
        userIdToClient.put(userId,client);
        // 存储在redis上
        RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + userId);
        bucket.set(getRedisTopic());
    }

    /**
     * 用户断开连接时调用,清楚用户和客户端之前的关系
     * @param userId
     */
    public void deleteUser(String userId){
        userIdToClient.remove(userId);
        // 从redis上删除掉
        RBucket<String> bucket = redissonClient.getBucket(ONLINE_USER + userId);
        bucket.delete();
    }

    /**
     * 获取本socket服务的ip和端口号
     * @return
     */
    private String getHostAndPort(){
        String hostAddress = "";
        try {
            hostAddress = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
        int port = socketIOServer.getConfiguration().getPort();
        return hostAddress+"_"+port;
    }

    /**
     * 获取这台机器redis订阅的topic
     * @return
     */
    private String getRedisTopic(){
        String hostAndPort = getHostAndPort();
        return REDIS_TOPIC + "_" + hostAndPort;
    }

    /**
     * redis发布消息
     * @param topic
     * @param message
     */
    private void pubMessage(String topic, Message message){
        redissonClient.getTopic(topic).publish(message);
        log.info("pubMessage topic {} data {}",topic,JSON.toJSONString(message));
    }

    /**
     * redis订阅消息
     * @param topic
     */
    public void subMessage(String topic){
        redissonClient.getTopic(topic).addListener(Message.class,(channel,message)->{
            log.info("topic {} submessage {}",topic, JSON.toJSONString(message));
            String userId = message.getTargetUserId();
            SocketIOClient socketIOClient = userIdToClient.get(userId);
            if(Objects.nonNull(socketIOClient)){
                socketIOClient.sendEvent(message.getKey(),JSON.toJSONString(message));
            }
        });
    }

    /**
     * 服务器 对应的组件加载完成后,需要开启redis订阅功能
     * @param args
     * @throws Exception
     */
    @Override
    public void run(String... args) throws Exception {
        String redisTopic = getRedisTopic();
        subMessage(redisTopic);
        log.info("topic {} 已订阅",redisTopic);
    }
}
连接记录

建立连接和断开连接分别进行记录和清除工作

    @Autowired
    private ClientHub clientHub;

    /**
     * 客户端socket连接后触发
     * @param client
     */
    @OnConnect
    public void connect(SocketIOClient client){
        // 获取登录信息,也可以在这里面进行权限判断
        String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
        log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
        String url = client.getHandshakeData().getUrl();
        log.info("url :{}",url);

        // 保存userId和client的对应关系
        clientHub.addUser(userId,client);
    }

    /**
     * 客户端断开服务器时触发
     * @param client
     */
    @OnDisconnect
    public void disConnect(SocketIOClient client){
        // 获取登录信息
        String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
        log.info("disConnect userId {}, sessionUuid {}",userId,client.getSessionId());

        // 解除userId和client的对应关系
        clientHub.deleteUser(userId);
    }
点对点发送消息业务使用
     /**
     * 点对点发送消息
     * @param client
     * @param data
     * 注意:如果接受的参数是一个对象,那么前端页面发送数据的时候也必须是一个对象,不能用json字符串代替
     * example:js代码 socket.emit('sendUserData', {data: "你好啊", targetUserId: "2233"});
     */
    @OnEvent(SEND_USER_DATA)
    public void sendUserData(SocketIOClient client, AckRequest ackRequest,Message data){
        // 获取登录信息
        String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
        log.info("sendUserData {}",JSON.toJSONString(data));
        clientHub.sendMessageByUserId(userId,data.getTargetUserId(),MESSAGE,data.getData());
    }

 这里只是简单的编写集群的思想,实际情况还有一些问题需要注意和解决,比如当某台机器挂了,这台机器连接的用户数据还是在redis上,其它机器就以为这些用户还是在线,但其实状态不对。这个就需要进行额外的处理了,比如存redis的时候设置过期时间,然后通过定时任务不断在redis上刷新在线用户信息等。

特殊场景集群方案

之前的redis方式,是通用的集群解决方案,基本上大部分的场景都适用,但有些时候业务场景比较特殊,也可以采取特殊的方式解决

特殊场景

用户之前的通信只在同一个房间内,比如视频会议,上课直播,游戏房间之类的,用户之间只和在同一个房间内的人进行通信,不需要对跨房间或者对房间外的用户进行实时消息推送。

思路

将房间看做一个单独的整体,只能分配在某一台服务器上,只要是这个房间的人需要连接服务器,就先获取该房间的服务器地址,客户端直接连上对应服务器,再进入房间,那么,该房间内的所有人都是在同一台服务器上,从而避免了跨服务器之间的通信场景,就相当于单机版。不过这个得手动控制房间的负载均衡,每台websocket服务器的访问路径也要自己控制好,生产的灰度发布也需要自行写代码控制。

spring事件模型

简述事件模型

spring提供的事件模型,其实也相当于一个发布订阅的工具,只不过是在单个程序内进行的,这里可以借助spring的事件,配合上多线程异步,可以更方便地进行websocket功能编码开发。从而达到,主要业务逻辑消息迅速实时推送,附加业务逻辑并发执行。

spring事件是spring自带的功能,不需要再额外引入依赖

定义事件

直接继承抽象类ApplicationEvent,然后自定义属性,自定义的属性用于在事件传播过程中记录自己需要的数据

public class ConnectEvent extends ApplicationEvent {

    private SocketIOClient socketIOClient;

    public ConnectEvent(Object source,SocketIOClient socketIOClient) {
        super(source);
        this.socketIOClient = socketIOClient;
    }

    public SocketIOClient getSocketIOClient() {
        return socketIOClient;
    }
}
事件发布

通过ApplicationEventPublisher的实例既可发布自定义的事件

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @OnConnect
    public void connect(SocketIOClient client){
        // 获取登录信息,也可以在这里面进行权限判断
        String userId = client.getHandshakeData().getSingleUrlParam(USER_ID);
        log.info("connect userId {}, sessionUuid {}",userId,client.getSessionId());
        String url = client.getHandshakeData().getUrl();
        log.info("url :{}",url);
        // 也可以在这一步进行权限鉴定,鉴定不通过就发给客户端一个权限鉴定失败的消息,然后再断开连接
//        client.sendEvent(MESSAGE,"权限不足");
//        client.disconnect();

        // 保存userId和client的对应关系
        clientHub.addUser(userId,client);

        // 发布一个用户连接事件
        applicationEventPublisher.publishEvent(new ConnectEvent(this,client));
    }
事件监听(订阅)

事件监听有两种书写方式,一种是直接实现ApplicationListener接口,泛型里面放入自定义事件

@Component
@Log4j2
public class UserLoginListenerDemo implements ApplicationListener<ConnectEvent> {
    @Override
    public void onApplicationEvent(ConnectEvent event) {
        SocketIOClient socketIOClient = event.getSocketIOClient();
        String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
        {
            // 业务代码。。。
            log.info("UserLoginListenerDemo userId {}",userId);
        }
    }
}

 第二中有点类似于controller的方式,在方法上用@EventListener注解来标识事件监听,在方法的参数中传入自定义的事件,并且如果有顺序需求的话,可以通过@Order来注明那个方法先调用(值小的先调用)

@Component
@Log4j2
public class UserLoginListener {

    @EventListener
    @Order(3)
    public void connectListenerLog(ConnectEvent connectEvent){
        SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
        String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
        {
            log.info("user {} 登录的日志操作",userId);
        }
    }

    @EventListener
    @Order(2)
    public void connectListenerStore(ConnectEvent connectEvent){
        SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
        String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
        {
            log.info("user {} 登录的行为存表",userId);
        }
    }

    @EventListener
    @Order(1)
    public void connectListenerState(ConnectEvent connectEvent){
        SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
        String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
        {
            log.info("user {} 登录的状态更改",userId);
        }
    }
引入多线程异步处理

 上面的事件监听的方式是单线程的,如果仅仅是这样,感觉还不如直接定义方法进行调用,事件模型到显得有点多余。我们可以引入线程池和@Async,通过多线程并行处理事件,实现真正的发布订阅

创建一个spring提供的线程池

@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolTaskExecutor autoLoginExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(20);
        threadPoolTaskExecutor.setMaxPoolSize(100);
        threadPoolTaskExecutor.setQueueCapacity(200);
        threadPoolTaskExecutor.setKeepAliveSeconds(60);
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
        threadPoolTaskExecutor.setThreadNamePrefix("event-task-");
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

启动类上加上@EnableAsync注解

@SpringBootApplication
@Log4j2
@EnableAsync
public class ClusterEventApplication implements CommandLineRunner, DisposableBean {

 event监听方法上加上@Async表示异步执行

@Component
@Log4j2
public class UserLoginListener {

    @EventListener
//    @Order(3)
    @Async
    public void connectListenerLog(ConnectEvent connectEvent){
        SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
        String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
        {
            log.info("user {} 登录的日志操作",userId);
        }
    }

    @EventListener
//    @Order(2)
    @Async
    public void connectListenerStore(ConnectEvent connectEvent){
        SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
        String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
        {
            log.info("user {} 登录的行为存表",userId);
        }
    }

    @EventListener
//    @Order(1)
    @Async
    public void connectListenerState(ConnectEvent connectEvent){
        SocketIOClient socketIOClient = connectEvent.getSocketIOClient();
        String userId = socketIOClient.getHandshakeData().getSingleUrlParam(USER_ID);
        {
            log.info("user {} 登录的状态更改",userId);
        }
    }
}

效果截图:

 

转载请注明出处或者链接地址:https://www.qianduange.cn//article/15996.html
评论
发布的文章

安装Nodejs后,npm无法使用

2024-11-30 11:11:38

大家推荐的文章
会员中心 联系我 留言建议 回顶部
复制成功!