20240603 每日通信:Spring Boot 使用 netty-socketio 集成 WebSocket 即时通信

简单效果图

群聊,私聊,广播都可以支持。
(此处原为效果截图,图片未能保留。)

基础概念:

POM文件:

 <?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the spring-boot-demo-websocket-socketio module (packaged as a jar).
  NOTE(review): in this listing the XML declaration above is preceded by a single space. An XML
  declaration must be the very first characters of the file, so drop that space when copying
  this into a real pom.xml.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Module coordinates. No groupId is declared here, so it is taken from the parent below. -->
    <artifactId>spring-boot-demo-websocket-socketio</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-demo-websocket-socketio</name>
    <description>Demo project for Spring Boot</description>

    <!-- Parent project this module inherits from. -->
    <parent>
        <groupId>com.xkcoding</groupId>
        <artifactId>spring-boot-demo</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <!-- Referenced by the netty-socketio dependency below. -->
        <netty-socketio.version>1.7.16</netty-socketio.version>
    </properties>

    <!--
      Only netty-socketio carries an explicit version. The other dependencies declare none;
      presumably their versions are managed by the parent POM (confirm against the parent).
    -->
    <dependencies>
        <!-- socket.io server implementation; version comes from the property above. -->
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>${netty-socketio.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Optional: not passed on to projects that depend on this module. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Test scope only. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
        </dependency>

        <!-- Optional: not passed on to projects that depend on this module. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <!-- Fixes the built artifact's file name, independent of the version. -->
        <finalName>spring-boot-demo-websocket-socketio</finalName>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

websocket服务器配置

/**
 * Spring configuration that assembles the netty-socketio server and the scanner that
 * picks up socket.io annotations on Spring beans.
 */
@Configuration
@EnableConfigurationProperties({WsConfig.class})
public class ServerConfig {

    /**
     * Builds the {@link SocketIOServer} from the host and port held by {@link WsConfig}.
     * The server is only constructed here; this method does not start it.
     *
     * @param wsConfig bound configuration properties supplying host and port
     * @return a configured, not yet started, server instance
     */
    @Bean
    public SocketIOServer server(WsConfig wsConfig) {
        com.corundumstudio.socketio.Configuration socketConfig =
            new com.corundumstudio.socketio.Configuration();
        socketConfig.setPort(wsConfig.getPort());
        socketConfig.setHostname(wsConfig.getHost());

        // Handshake gate. A client connecting with e.g. http://localhost:8081?token=xxxxxxx is
        // accepted when the "token" URL parameter is non-blank. No further validation (expiry,
        // signature, ...) happens here; a real deployment would verify the token at this point.
        socketConfig.setAuthorizationListener(
            handshake -> StrUtil.isNotBlank(handshake.getSingleUrlParam("token")));

        return new SocketIOServer(socketConfig);
    }

    /**
     * Registers the scanner that wires annotated handler methods on Spring beans to the server.
     *
     * @param server the socket.io server the scanner is created for
     * @return the annotation scanner bean
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer server) {
        return new SpringAnnotationScanner(server);
    }
}

核心事件处理类

/**
 * Handles socket.io connection lifecycle events and chat traffic: joining a group,
 * private messages, group messages and server-side broadcasts.
 *
 * <p>The mapping from user id to socket session id is kept in {@code DbTemplate}. In this demo
 * the handshake {@code token} URL parameter doubles as the user id.
 */
@Component
@Slf4j
public class MessageEventHandler {
    @Autowired
    private SocketIOServer server;

    @Autowired
    private DbTemplate dbTemplate;

    /**
     * Invoked when a client connects; records which session belongs to the user.
     *
     * @param client the connecting client
     */
    @OnConnect
    public void onConnect(SocketIOClient client) {
        if (client == null) {
            log.error("客户端为空");
            return;
        }
        String token = client.getHandshakeData().getSingleUrlParam("token");
        // Demo simplification: the user id is the token itself.
        String userId = token;
        UUID sessionId = client.getSessionId();

        dbTemplate.save(userId, sessionId);
        // NOTE(review): the raw token is written to the log. If real credentials are ever passed
        // in this parameter, log a non-secret identifier instead.
        log.info("连接成功,【token】= {},【sessionId】= {}", token, sessionId);
    }

    /**
     * Invoked when a client disconnects; removes the user's session mapping.
     *
     * @param client the disconnecting client
     */
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        if (client == null) {
            log.error("客户端为空");
            return;
        }
        String token = client.getHandshakeData().getSingleUrlParam("token");
        // Demo simplification: the user id is the token itself.
        String userId = token;
        UUID sessionId = client.getSessionId();

        // Remove the mapping only while it still points at THIS session. If the same user has
        // already reconnected, the stored session is the newer one and a late disconnect of the
        // old session must not erase it. (Assumes save() replaces the previous entry for a user;
        // if it does not, this check behaves like the unconditional delete.)
        boolean mappingIsOurs = dbTemplate.findByUserId(userId)
            .filter(sessionId::equals)
            .isPresent();
        if (mappingIsOurs) {
            dbTemplate.deleteByUserId(userId);
        }
        log.info("客户端断开连接,【token】= {},【sessionId】= {}", token, sessionId);
        client.disconnect();
    }

    /**
     * Adds the client to a group room and announces the join to everyone in that room.
     *
     * @param client  the client joining
     * @param request ack handle for this event (unused)
     * @param data    join request carrying the user id and group id
     */
    @OnEvent(value = Event.JOIN)
    public void onJoinEvent(SocketIOClient client, AckRequest request, JoinRequest data) {
        log.info("用户:{} 已加入群聊:{}", data.getUserId(), data.getGroupId());
        client.joinRoom(data.getGroupId());

        server.getRoomOperations(data.getGroupId()).sendEvent(Event.JOIN, data);
    }

    /**
     * Relays a private message to the addressed user and tells the sender whether it was
     * delivered. The sender gets {@code CHAT_REFUSED} when the recipient has no stored session
     * or that session is no longer connected.
     *
     * @param client  the sending client
     * @param request ack handle for this event (unused)
     * @param data    the private message
     */
    @OnEvent(value = Event.CHAT)
    public void onChatEvent(SocketIOClient client, AckRequest request, SingleMessageRequest data) {
        boolean delivered = dbTemplate.findByUserId(data.getToUid())
            .map(targetSession -> trySendToSingle(targetSession, data))
            .orElse(false);

        if (delivered) {
            log.info("用户 {} 刚刚私信了用户 {}:{}", data.getFromUid(), data.getToUid(), data.getMessage());
            client.sendEvent(Event.CHAT_RECEIVED, "发送成功");
        } else {
            client.sendEvent(Event.CHAT_REFUSED, "发送失败,对方不想理你");
        }
    }

    /**
     * Relays a group message, but only if the sender is currently a member of the target room;
     * otherwise the sender is told (via ack) to join first.
     *
     * @param client  the sending client
     * @param request ack handle used to reply when the sender is not in the group
     * @param data    the group message
     */
    @OnEvent(value = Event.GROUP)
    public void onGroupEvent(SocketIOClient client, AckRequest request, GroupMessageRequest data) {
        UUID senderSession = client.getSessionId();
        boolean inGroup = server.getRoomOperations(data.getGroupId()).getClients().stream()
            .anyMatch(member -> ObjectUtil.equal(member.getSessionId(), senderSession));

        if (inGroup) {
            log.info("群号 {} 收到来自 {} 的群聊消息:{}", data.getGroupId(), data.getFromUid(), data.getMessage());
            sendToGroup(data);
        } else {
            request.sendAckData("请先加群!");
        }
    }

    /**
     * Sends a private message to one session. If that session is not connected the message is
     * dropped and a warning is logged instead of failing with a NullPointerException.
     *
     * @param sessionId session id of the recipient
     * @param message   the private message
     */
    public void sendToSingle(UUID sessionId, SingleMessageRequest message) {
        if (!trySendToSingle(sessionId, message)) {
            log.warn("会话 {} 不在线,私信未送达", sessionId);
        }
    }

    /**
     * Sends a broadcast message to every stored session that is still connected.
     *
     * @param message the broadcast message
     */
    public void sendToBroadcast(BroadcastMessageRequest message) {
        log.info("系统紧急广播一条通知:{}", message.getMessage());
        for (UUID clientId : dbTemplate.findAll()) {
            // Look the client up once: a second lookup could return null if the client
            // disconnected between the check and the send.
            SocketIOClient target = server.getClient(clientId);
            if (target != null) {
                target.sendEvent(Event.BROADCAST, message);
            }
        }
    }

    /**
     * Sends a group message to every client in the message's group room.
     *
     * @param message the group message
     */
    public void sendToGroup(GroupMessageRequest message) {
        server.getRoomOperations(message.getGroupId()).sendEvent(Event.GROUP, message);
    }

    /** Sends {@code message} to the session if it is connected; returns whether it was sent. */
    private boolean trySendToSingle(UUID sessionId, SingleMessageRequest message) {
        SocketIOClient target = server.getClient(sessionId);
        if (target == null) {
            return false;
        }
        target.sendEvent(Event.CHAT, message);
        return true;
    }
}

websocket 服务器启动类

/**
 * Starts the socket.io server from the {@link CommandLineRunner} callback and logs a
 * confirmation once {@code start()} has returned.
 *
 * <p>NOTE(review): nothing in this class stops the server again; confirm that shutdown is
 * handled elsewhere.
 *
 * @author yangkai.shen
 * @date Created in 2018-12-18 17:07
 */
@Slf4j
@Component
public class ServerRunner implements CommandLineRunner {

    /** The server instance to start; injected by Spring. */
    @Autowired
    private SocketIOServer server;

    /**
     * Starts the server.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(String... args) {
        this.server.start();
        log.info("websocket 服务器启动成功。。。");
    }
}

最近更新

  1. TCP协议是安全的吗?

    2024-06-08 04:44:01       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-08 04:44:01       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-08 04:44:01       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-08 04:44:01       18 阅读

热门阅读

  1. Android面试题汇总-Handler

    2024-06-08 04:44:01       11 阅读
  2. Mybatis面试系列五

    2024-06-08 04:44:01       9 阅读
  3. Vue3响应式基础——ref()和reactive()

    2024-06-08 04:44:01       7 阅读
  4. Vue封装localStorage设置过期时间

    2024-06-08 04:44:01       8 阅读
  5. 使用 Ant Design Vue 实现动态表头与数据填充

    2024-06-08 04:44:01       9 阅读
  6. learn-vue中template根节点元素Div

    2024-06-08 04:44:01       8 阅读
  7. 2024全国高考作文题解读(文心一言 4.0版本)

    2024-06-08 04:44:01       11 阅读
  8. el-select中下拉数据太多,页面卡顿

    2024-06-08 04:44:01       10 阅读
  9. SEO 中域权限和页面权限之间的区别

    2024-06-08 04:44:01       7 阅读
  10. 如何不用命令创建用户

    2024-06-08 04:44:01       8 阅读
  11. 文件大小格式化为易读的字符串

    2024-06-08 04:44:01       9 阅读
  12. Scalable Membership Inference Attacks via Quantile Regression

    2024-06-08 04:44:01       8 阅读
  13. size of the Undo tablespace metalink 提供的 crontab

    2024-06-08 04:44:01       8 阅读