基于SpringBoot实现WebSocket实时通讯的服务端和客户端

实现功能

服务端注册的客户端的列表;服务端向客户端发送广播消息;服务端向指定客户端发送消息;服务端向多个客户端发送消息;客户端给服务端发送消息;
效果:
请添加图片描述在这里插入图片描述

环境

jdk:1.8
SpringBoot:2.4.17

服务端

1.引入依赖:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2.在启动类上加上开启WebSocket的注解

@EnableWebSocket

3.配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * date created : Created in 2024/3/18 16:57
 * description  : WebSocketConfig 主要解决使用了@ServerEndpoint注解的websocket endpoint不被springboot扫描到的问题
 * class name   : WebSocketConfig
 */
@Configuration
public class WebSocketConfig {

    /**
     * 注入ServerEndpointExporter,
     * 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }


}

4.服务端实现

/**
 * date created : Created in 2024/3/18 16:31
 * description  : 服务端实现,方法的封装
 * class name   : WebSocketServer
 */
@Component
@Slf4j
@ServerEndpoint("/websocket/{applicationName}")
public class WebSocketServer {
    //与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
    // 应用名称
    private String applicationName;

    //虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean,所以可以用一个静态set保存起来。
    private static final CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
    // 用来存在线连接用户信息
    private static final ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();


    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "applicationName") String applicationName) {
        try {
            this.session = session;
            this.applicationName = applicationName;
            webSockets.add(this);
            sessionPool.put(applicationName, session);
            log.info("【websocket消息】有新的连接,总数为:" + webSockets.size());
            log.info("【当前客户端列表】:"+ sessionPool.keySet());
        } catch (Exception e) {
        }
    }

    /**
     * description : 有连接断开之后的处理方法
     * method name : onClose
     * param       : []
     * return      : void
     */
    @OnClose
    public void onClose() {
        try {
            webSockets.remove(this);
            sessionPool.remove(this.applicationName);
            log.info("【websocket消息】连接断开,总数为:" + webSockets.size());
            log.info("【当前客户端列表】:"+ sessionPool.keySet());
        } catch (Exception e) {
        }
    }

    /**
     * description : 收到客户端消息的处理方法
     * method name : onMessage
     * param       : [message]
     * return      : void
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("【websocket消息】收到客户端消息:" + message);
    }

    /**
     * description : 错误处理
     * method name : onError
     * param       : [session, error]
     * return      : void
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误,原因:" + error.getMessage());
        error.printStackTrace();
    }


    /**
     * description : 广播消息 给所有注册的客户端发送消息
     * method name : sendBroadcastMessage
     * param       : [message]
     * return      : void
     */
    public void sendBroadcastMessage(String message) {
        log.info("【websocket消息】广播消息:" + message);
        for (WebSocketServer webSocket : webSockets) {
            try {
                if (webSocket.session.isOpen()) {
                    webSocket.session.getAsyncRemote().sendText(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * description : 给指定的客户端发送消息
     * method name : sendApplicationMessage
     * param       : [applicationName 客户端的应用名称, message 要发送的消息]
     * return      : void
     */
    public void sendApplicationMessage(String applicationName, String message) {
        Session session = sessionPool.get(applicationName);
        if (session != null && session.isOpen()) {
            try {
                log.info("【websocket消息】 单点消息:" + message);
                session.getAsyncRemote().sendText(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * description : 给多个客户端发送消息
     * method name : sendMassApplicationMessage
     * param       : [applicationNames 注册的客户端的应用名称, message 要发送的消息]
     * return      : void
     */
    public void sendMassApplicationMessage(String[] applicationNames, String message) {
        for (String userId : applicationNames) {
            Session session = sessionPool.get(userId);
            if (session != null && session.isOpen()) {
                try {
                    log.info("【websocket消息】 单点消息:" + message);
                    session.getAsyncRemote().sendText(message);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }

}

客户端

1.客户端配置

yaml文件的末尾添加

# websocket的配置
websocket:
  host: localhost
  port: 19022
  prefix: websocket

2.客户端配置类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * date created : Created in 2024/3/19 14:36
 * description  : 注入配置文件中的参数 并生成服务端的对应的url
 * class name   : WebSocketProperties
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Component
@ConfigurationProperties(prefix = "websocket")
@Configuration
public class WebSocketProperties {
    @Value("${spring.application.name}")
    String appName;
    String host;
    String port;
    String prefix;
    public String getUrl() {
        return String.format("ws://%s:%s/%s/%s", host, port, prefix,appName);
    }
}

3.客户端实现

import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.stereotype.Component;

import javax.websocket.ClientEndpoint;
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * date created : Created in 2024/3/18 16:36
 * description  : 客户端接收服务端的实时消息、发送消息等方法的封装
 * class name   : WebSocketClient
 */


@ClientEndpoint
@AutoConfigureBefore(WebSocketProperties.class)
@Component
@Import(WebSocketProperties.class)
@Configuration
public class WebSocketClient {


    private Session session;

    public WebSocketClient() {
        try {
            WebSocketProperties webSocketProperties = SpringUtils.getBean(WebSocketProperties.class);
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            container.connectToServer(this, new URI(webSocketProperties.getUrl()));
        } catch (DeploymentException | URISyntaxException | IOException e) {
            e.printStackTrace();
        }
    }

    @OnOpen
    public void onOpen(Session session) {
        this.session = session;
        System.out.println("Connected to server");
    }

    @OnMessage
    public String onMessage(String message) {
        System.out.println("来自WebSocket的消息: " + message);
        return message;
    }

    @OnClose
    public void onClose() {
        System.out.println("Disconnected from server");
    }

    public void register() {
        try {
            session.getBasicRemote().sendText("register");
            System.out.println("Registered with server");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void unregister() {
        try {
            session.getBasicRemote().sendText("unregister");
            System.out.println("Unregistered from server");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }



}

使用@Autowired注入配置类无法注入,使用工具类获取,工具类:

* Copyright (c) 2020 pig4cloud Authors. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringUtils implements ApplicationContextAware {
	private static ApplicationContext context;

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		context = applicationContext;
	}

	public static Object getBean(String name) {
		return context.getBean(name);
	}

	public static <T> T getBean(Class<T> clazz) {
		return context.getBean(clazz);
	}

	public static <T> T getBean(String name, Class<T> clazz) {
		return context.getBean(name, clazz);
	}
}

相关推荐

最近更新

  1. TCP协议是安全的吗?

    2024-03-21 07:10:02       14 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-21 07:10:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-21 07:10:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-21 07:10:02       18 阅读

热门阅读

  1. Hive自定义UpperGenericUDF函数

    2024-03-21 07:10:02       18 阅读
  2. 3.19号arm

    2024-03-21 07:10:02       17 阅读
  3. 將SqlServer表創建到hive腳本

    2024-03-21 07:10:02       20 阅读
  4. html 转pdf

    2024-03-21 07:10:02       15 阅读
  5. 【讲解Node.js常用的命令】进阶版

    2024-03-21 07:10:02       17 阅读
  6. 富格林:正视安全平台阻挠亏损

    2024-03-21 07:10:02       19 阅读
  7. Wpf-自定义状态控件

    2024-03-21 07:10:02       17 阅读