引入Maven依赖包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>跟随springboot版本</version>
</dependency>
后端代码
/**
 * Spring configuration that turns on WebSocket support.
 *
 * <p>It does so by exposing a {@link ServerEndpointExporter} bean; per the Spring
 * documentation that exporter detects {@code @ServerEndpoint} beans and registers
 * them with the standard WebSocket runtime.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a newly constructed {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
@Component
@Slf4j
@ServerEndpoint("/demand/task/webSocket/{taskId}") // 前端请求URL
public class TaskWebSocketServer {
/**
* 保存每个需求任务对应的服务对象
*/
private static CopyOnWriteArraySet<TaskWebSocketServer> TASK_CACHE = new CopyOnWriteArraySet<>();
private Session session;
private Long taskId;
private static DemandTestTaskService demandTestTaskService;
/**
* 注入依赖业务处理服务
*/
@Autowired
public void setSunPurchasePayService(DemandTestTaskService demandTestTaskService) {
this.demandTestTaskService = demandTestTaskService;
}
public List<TaskWebSocketServer> getTaskSocketServerList(){
List<TaskWebSocketServer> serverList = new ArrayList<>(TASK_CACHE.size());
TASK_CACHE.forEach(server -> serverList.add(server));
return serverList;
}
public Long getTaskId(){
return taskId;
}
public boolean userExist(String userId, String deviceId){
if (CollectionUtils.isEmpty(pulsarList)){
log.info("任务列表为空,请先创建任务 userId:{} deviceId:{}", userId, deviceId);
return false;
}
for (DemandTaskPulsarBO pulsarBO: pulsarList){
if (Long.valueOf(userId).equals(pulsarBO.getUserId())){
return true;
}
if (deviceId.equals(pulsarBO.getDeviceId())){
return true;
}
}
return false;
}
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "taskId") Long taskId) {
if (TASK_CACHE.size() > 10){
throw new BusinessException(CodeEnum.FAIL.getCode(), "测试任务已达到最大上限10个,请稍后重试");
}
if (this.taskId != null && this.taskId.equals(taskId)){
log.info("web socket reconnection taskId:{}", taskId);
}
this.taskId = taskId;
// TODO 补偿你的业务逻辑
// 设置会话超时时间 30 * 60 * 1000
session.setMaxIdleTimeout(1800000L);
this.session = session;
TASK_CACHE.add(this);
try {
session.getBasicRemote().sendText("connect success. taskId=" + taskId);
log.info("web socket connect success taskId:{} pulsarList:{}", taskId, JacksonUtil.toJSONString(pulsarBOS));
} catch (IOException e) {
log.error("websocket IO Exception");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
try{
TASK_CACHE.remove(this);
// TODO 补充关闭连接的逻辑
}catch (Exception e){
log.error("web socket closed error taskId:{}", taskId, e);
}
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
// 高并发情况下,使用websocket出现报错的问题
synchronized (this.session){
this.session.getBasicRemote().sendText(message);
}
}
}
前端代码
<script setup>
import { reactive, toRefs, onBeforeUnmount, onMounted, getCurrentInstance } from 'vue'
import { ElMessage, ElMessageBox } from 'element-plus'
// Component-level state for the task test page: environment, injected helpers,
// WebSocket/heartbeat bookkeeping and the reactive view model.

// API environment name read from the Vite build-time variable VITE_API_ENV
const Env = import.meta.env.VITE_API_ENV
// Component instance proxy, used only to reach the globally registered helpers below
const { proxy } = getCurrentInstance()
const { $axios, $store } = proxy
// Current socket; starts as a plain placeholder object until initWebSocket assigns a real WebSocket
let ws = {}
let heartTime = null // heartbeat timer handle (setInterval id)
let socketHeart = 0 // number of heartbeats sent since the last reset
let HeartTimeOut = 5000 // heartbeat period in ms (passed to setInterval)
let socketError = 0 // reconnect attempts since the last reset (incremented by reconnect)
// Reactive view model
const _data = reactive({
tableHeight: '488px',
demandId: '',
pointData: {},
deviceId: '',
userId: '',
// set to true when the socket opens and false when it closes or is stopped
isContent: false,
// parsed messages received over the socket
list: [],
taskId: '',
testData: {},
// true after a manual stop; the close handler then skips the automatic reconnect
isHandStop: false,
testList: [],
testAppKey: ''
})
// Mounted hook is currently a no-op
onMounted(() => {})
/**
 * Opens a WebSocket for the given task and wires up its lifecycle handlers.
 *
 * - open:    marks the page as connected
 * - message: texts containing "connect" (the server's connect acknowledgement)
 *            restart the heartbeat; everything else is parsed as JSON, appended
 *            to the message list and forwarded to changePointStatus
 * - error:   logged only
 * - close:   marks the page as disconnected and reconnects unless stopped manually
 *
 * @param taskId id of the task whose socket URL is built via MakeWss
 */
const initWebSocket = taskId => {
  _data.isHandStop = false
  const url = MakeWss(taskId)
  const socket = new WebSocket(url)
  ws = socket
  socket.onopen = function (e) {
    _data.isContent = true
    console.log(e)
  }
  socket.onmessage = function (e) {
    console.log(e, e.data)
    if (e.data.indexOf('connect') == -1) {
      let payload
      try {
        payload = JSON.parse(e.data)
      } catch (err) {
        // A malformed frame must not break the handler for later messages
        console.error('socket message is not valid JSON', err)
        return
      }
      _data.list.push(payload)
      changePointStatus(payload)
    } else {
      resetHeart()
    }
  }
  socket.onerror = function (e) {
    // Do not reconnect here: browsers fire "close" right after "error", so
    // reconnecting in both handlers opened two sockets and consumed two retries
    // per failure (and reconnected even after a manual stop).
    console.log(e)
  }
  socket.onclose = function (e) {
    console.log(e)
    // Ignore the close of a socket that has already been replaced by a newer one
    if (ws !== socket) {
      return
    }
    _data.isContent = false
    if (_data.isHandStop == false) {
      reconnect()
    }
  }
}
// Reset the heartbeat: stop the running timer, zero the heartbeat and error
// counters, then start a fresh heartbeat timer.
const resetHeart = () => {
  clearInterval(heartTime)
  socketError = 0
  socketHeart = 0
  sendSocketHeart()
}
// Start the heartbeat timer: every HeartTimeOut ms send one heartbeat frame over
// the current socket and count it.
const sendSocketHeart = () => {
  const beat = () => {
    console.log('心跳发送:', socketHeart)
    const frame = {
      content: '',
      requestId: 'aa9872be-d5b9-478e-aba4-50527cd3ef32',
      type: 'heartbeat'
    }
    ws.send(JSON.stringify(frame))
    socketHeart += 1
  }
  heartTime = setInterval(beat, HeartTimeOut)
}
// Reconnect the socket. The heartbeat timer is stopped in every case; a new
// connection is opened only while the error counter is at most 2.
const reconnect = () => {
  clearInterval(heartTime)
  if (socketError > 2) {
    // Retries exhausted: nothing more to do besides reporting it
    console.log('重试次数已用完的逻辑', socketError)
    return
  }
  initWebSocket(_data.taskId)
  socketError += 1
  console.log('socket重连', socketError)
}
/**
 * Stops the test manually: suppresses auto-reconnect, stops the heartbeat,
 * closes the socket and then reloads the detail data for the current
 * demand/task into pointData and testList.
 */
function stopTest() {
  _data.isHandStop = true
  clearInterval(heartTime)
  // ws is a plain {} placeholder until a connection has been opened; calling
  // close() on it would throw a TypeError.
  if (ws && typeof ws.close === 'function') {
    ws.close()
  }
  _data.isContent = false
  $axios
    .get(`/user/detail`, { demandId: _data.demandId, taskId: _data.taskId })
    .then(res => {
      if (res.success && res.data) {
        _data.pointData = res.data
        _data.testList = res.data.pointList
      }
    })
    .catch(err => {
      // Avoid an unhandled promise rejection when the request fails
      console.error('load detail after stop failed', err)
    })
}
// Clears the received-message list by replacing it with a new empty array.
function clearTestList() {
_data.list = []
}
// Recompute the table height: viewport height minus a fixed 230px offset.
function handleResize() {
  const viewportHeight = document.documentElement.clientHeight
  _data.tableHeight = `${viewportHeight - 230}px`
}
// Keep the table height in sync with the viewport and compute it once up front.
window.addEventListener('resize', handleResize)
handleResize()
// Release everything the component holds before it is unmounted.
onBeforeUnmount(() => {
  window.removeEventListener('resize', handleResize)
  // Also shut the socket down; otherwise the heartbeat timer and the connection
  // outlive the component and the close handler keeps reconnecting.
  _data.isHandStop = true
  clearInterval(heartTime)
  if (ws && typeof ws.close === 'function') {
    ws.close()
  }
})
</script>