基于Pinia的WebSocket管理与优化实践(实现心跳重连机制,异步发送)

WebSocket作为一种全双工通信协议,允许服务器和客户端之间建立持久的连接,提供了比传统HTTP请求更为高效的数据交换方式。本文将探讨如何使用Pinia状态管理库在Vue应用中优雅地管理和优化WebSocket连接,以实现稳定、高效的实时数据传输。

环境与依赖

  • 环境:Vue.js项目
  • 依赖:Pinia (pnpm install pinia) 和 vant (pnpm install vant)

项目结构与初始化

在项目中,我们创建了一个名为useWebsocketStore的Pinia store,专门用于管理WebSocket相关的状态和行为。该store包含了一系列关键的状态变量和动作,旨在简化WebSocket的连接、消息处理、心跳机制以及重连策略。

WebSocket Store设计

核心状态

  • socket: WebSocket实例。
  • ConnectSuccess: 连接成功标志。
  • messageHandlers: 存储注册的消息处理器数组。
  • deviceIP: 设备的IP地址。
  • reconnectNum: 重连次数。
  • heartbeatMessage: 心跳消息内容。
  • strWs: 发送消息的字符串形式。
  • sendNum: 发送消息计数。
  • messageNum: 接收消息计数。
  • message: 最近接收的消息内容。
    动作详解
  • connectWs: 初始化WebSocket连接,包括建立连接、监听事件、启动心跳,并提供Promise接口确保异步操作的完成。
  • registerMessageHandler & unregisterMessageHandler:允许外部组件注册或注销消息处理器,实现了插拔式的事件处理机制。
  • handleMessageFromWebSocket: 处理从WebSocket接收到的消息,更新内部状态并调用所有注册的消息处理器。
  • handleClose: WebSocket关闭时的处理逻辑,包括重连尝试、状态重置及错误通知。
  • **`sendMessage: 发送消息至WebSocket,包含基本的发送逻辑。
  • sendMessageWithRetry: 异步发送消息,自动处理连接重试,增强了消息发送的健壮性。
  • **sendHeartbeat: 发送心跳消息,维护连接活性。
  • **startHeartbeat&stopHeartbeat: 控制心跳定时器的启动与停止,确保资源的有效利用。

为什么需要心跳机制

心跳机制(Heartbeat Mechanism)主要用于保持网络连接的活跃状态,尤其是在TCP/IP连接中,如HTTP长轮询、WebSocket、TCP长连接等场景中。它通过周期性地发送小量数据(通常称为心跳包)来确认连接的双方仍然存活,进而避免因网络层的超时或中间代理(如负载均衡器、防火墙)的会话超时而导致的连接意外断开。

useWebsocketStore.js

import { defineStore } from 'pinia'

import { showToast } from 'vant'

export const useWebsocketStore = defineStore('websocket', {
  state: () => ({
    socket: null,
    ConnectSuccess: false,
    // 新增一个数组用于存储注册的处理器
    messageHandlers: [],
    deviceIP: null,
    reconnectNum: 0,
    heartbeatMessage: JSON.stringify({ cmd: '1111' }), // 心跳消息内容
    strWs: null,
    sendNum: 0,
    messageNum: 0,
    message: 0
  }),
  getters: {},
  actions: {
    async connectWs(deviceIP) {
      if (!this.socket && !this.ConnectSuccess) {
        console.log('connectWs', deviceIP)
        this.deviceIP = deviceIP
        this.socket = new WebSocket(deviceIP)
        // this.socket.onopen = this.handleOpen
        this.socket.onmessage = this.handleMessageFromWebSocket
        this.socket.onclose = this.handleClose
        // this.socket.onerror = this.handleError

        return new Promise((resolve, reject) => {
          this.socket.onopen = (event) => {
            console.log('WebSocket 连接已建立', event)
            this.ConnectSuccess = true
            this.startHeartbeat()
            resolve()
          }
          this.socket.onerror = (event) => {
            console.error('WebSocket 连接错误', event)
            this.ConnectSuccess = false
            reject(event)
          }
        })
      }
    },
    // 新增方法用于注册消息处理器
    registerMessageHandler(handler) {
      this.messageHandlers.push(handler)
    },
    // 新增方法用于移除消息处理器
    unregisterMessageHandler(handler) {
      this.messageHandlers = this.messageHandlers.filter((h) => h !== handler)
    },
    //
    handleMessageFromWebSocket(event) {
      const message = JSON.parse(event.data)
      console.log('消息', message)
      this.message = message.cmd
      this.messageNum += 1
      // 调用所有注册的处理器
      // console.log(message)
      this.messageHandlers.forEach((handler) => handler(message))
    },
    handleOpen(event) {
      console.log('WebSocket 连接已建立', event)
      if (event.target.readyState === 1) {
        this.ConnectSuccess = true
      }
    },
    handleClose(event) {
      console.error('WebSocket 关闭:', event)
      this.socket = null
      this.ConnectSuccess = false
      this.sendNum = 0
      this.messageNum = 0
      this.stopHeartbeat()
      // 添加自动重连逻辑
      this.reconnectWs()
        .then(() => {
          this.ConnectSuccess = true
          showToast('WebSocket 重连成功')
        })
        .catch((error) => {
          this.ConnectSuccess = false
          showToast('最终重连失败', error)
        })
    },
    handleError(error) {
      console.error('WebSocket 错误:', error)

      this.ConnectSuccess = false
    },
    async reconnectWs() {
      return new Promise(async (resolve, reject) => {
        this.reconnectNum++
        try {
          console.log(`尝试重新连接... (${this.reconnectNum})`)
          await this.connectWs(this.deviceIP)
          if (this.socket && this.ConnectSuccess) {
            resolve() // 连接成功,停止重试
          } else {
            throw new Error('连接失败')
          }
        } catch (error) {
          this.ConnectSuccess = false
          console.log(`重连WebSocket失败,稍后重试... (${this.reconnectNum})`)
          reject()
        }
      })
    },

    sendMessage(message) {
      // 发送消息逻辑...
      if (this.socket.readyState === WebSocket.OPEN) {
        // showToast('确定发送', message)
        this.strWs = JSON.parse(message).cmd
        this.sendNum += 1
        this.socket.send(message)
      } else {
        // 如果在此之后仍然无法发送,可以选择抛出错误或继续等待(根据需求)
        showToast('WebSocket 尚未连接')
        console.error('即使重试后,WebSocket仍无法发送消息')
      }
    },
    // 在actions中添加一个新的异步发送方法
    async sendMessageWithRetry(message) {
      if (!this.ConnectSuccess) {
        // 如果连接未建立,尝试重新连接
        try {
          await this.reconnectWs()
        } catch (error) {
          console.error('尝试重新连接WebSocket失败', error)
          throw error
        }
      }
      this.sendMessage(message)
    },

    // 发送心跳消息的方法
    sendHeartbeat() {
      this.sendMessageWithRetry(this.heartbeatMessage)
    },
    // 启动心跳定时器
    startHeartbeat() {
      this.heartbeatInterval = setInterval(() => {
        this.sendHeartbeat()
      }, 5000) // 每隔3秒发送一次心跳
    },

    // 如果需要在适当的时候清理心跳定时器,比如在断开连接时
    stopHeartbeat() {
      if (this.heartbeatInterval) {
        clearInterval(this.heartbeatInterval)
        this.heartbeatInterval = null
      }
    }
  }
})

.vue 文件中使用

useWebsocket.connectWs(deviceIP) 连接socket 只需要连接一次,可以放在main.js 里面初始化的时候连接

<script setup>
import {  onMounted, onUnmounted,  } from 'vue'

import { useWebsocketStore } from '@/stores/useWebsocketStore'
const useWebsocket = useWebsocketStore()

const deviceIP='ws:192.168.89'
const handleMessage= (message)=>{
  // 外部处理消息逻辑
}
const init = () => {
  useWebsocket.connectWs(deviceIP) // 
  useWebsocket.registerMessageHandler(handleMessage)
}
init()

onUnmounted(() => {
  // 组件卸载时移除消息处理器
  useWebsocket.unregisterMessageHandler(handleMessage)
 
})

相关推荐

  1. uniapp封装websocket以及心跳检测、

    2024-07-12 04:12:01       32 阅读
  2. vue封装websocket以及心跳检测、

    2024-07-12 04:12:01       31 阅读
  3. WebSocket 断网心跳检测功能封装

    2024-07-12 04:12:01       27 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-12 04:12:01       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-12 04:12:01       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-12 04:12:01       58 阅读
  4. Python语言-面向对象

    2024-07-12 04:12:01       69 阅读

热门阅读

  1. WVP后端项目文件结构

    2024-07-12 04:12:01       31 阅读
  2. 贪心算法-以学籍管理系统为例

    2024-07-12 04:12:01       26 阅读
  3. RISC-V主要指令集介绍及规则

    2024-07-12 04:12:01       28 阅读
  4. 【ChatGPT】全面解析 ChatGPT:从起源到未来

    2024-07-12 04:12:01       22 阅读
  5. 代码随想录算法训练营第9天

    2024-07-12 04:12:01       25 阅读
  6. 担心插座预留的不够用,家里装修留多少开关插座

    2024-07-12 04:12:01       19 阅读
  7. Vue路由传参和接参如何实现

    2024-07-12 04:12:01       26 阅读
  8. android轮播图入门2——触摸停止与指示器

    2024-07-12 04:12:01       24 阅读
  9. Symfony 是一个用于构建PHP的框架

    2024-07-12 04:12:01       26 阅读