WebSocket
作为一种全双工通信协议,允许服务器和客户端之间建立持久的连接,提供了比传统HTTP请求更为高效的数据交换方式。本文将探讨如何使用Pinia状态管理库在Vue应用中优雅地管理和优化WebSocket连接,以实现稳定、高效的实时数据传输。
环境与依赖
- 环境:Vue.js项目
- 依赖:Pinia (pnpm install pinia) 和 vant (pnpm install vant)
项目结构与初始化
在项目中,我们创建了一个名为useWebsocketStore的Pinia store,专门用于管理WebSocket相关的状态和行为。该store包含了一系列关键的状态变量和动作,旨在简化WebSocket的连接、消息处理、心跳机制以及重连策略。
WebSocket Store设计
核心状态
- socket: WebSocket实例。
- ConnectSuccess: 连接成功标志。
- messageHandlers: 存储注册的消息处理器数组。
- deviceIP: 设备的IP地址。
- reconnectNum: 重连次数。
- heartbeatMessage: 心跳消息内容。
- strWs: 发送消息的字符串形式。
- sendNum: 发送消息计数。
- messageNum: 接收消息计数。
- message: 最近接收的消息内容。
动作详解 connectWs
: 初始化WebSocket连接,包括建立连接、监听事件、启动心跳,并提供Promise接口确保异步操作的完成。registerMessageHandler
&unregisterMessageHandler
:允许外部组件注册或注销消息处理器,实现了插拔式的事件处理机制。handleMessageFromWebSocket
: 处理从WebSocket接收到的消息,更新内部状态并调用所有注册的消息处理器。handleClose
: WebSocket关闭时的处理逻辑,包括重连尝试、状态重置及错误通知。- **`sendMessage: 发送消息至WebSocket,包含基本的发送逻辑。
sendMessageWithRetry
: 异步发送消息,自动处理连接重试,增强了消息发送的健壮性。- **sendHeartbeat: 发送心跳消息,维护连接活性。
- **
startHeartbeat
&stopHeartbeat
: 控制心跳定时器的启动与停止,确保资源的有效利用。
为什么需要心跳机制
心跳机制(Heartbeat Mechanism)主要用于保持网络连接的活跃状态,尤其是在TCP/IP连接中,如HTTP长轮询、WebSocket、TCP长连接等场景中。它通过周期性地发送小量数据(通常称为心跳包)来确认连接的双方仍然存活,进而避免因网络层的超时或中间代理(如负载均衡器、防火墙)的会话超时而导致的连接意外断开。
useWebsocketStore.js
import { defineStore } from 'pinia'
import { showToast } from 'vant'
export const useWebsocketStore = defineStore('websocket', {
state: () => ({
socket: null,
ConnectSuccess: false,
// 新增一个数组用于存储注册的处理器
messageHandlers: [],
deviceIP: null,
reconnectNum: 0,
heartbeatMessage: JSON.stringify({ cmd: '1111' }), // 心跳消息内容
strWs: null,
sendNum: 0,
messageNum: 0,
message: 0
}),
getters: {},
actions: {
async connectWs(deviceIP) {
if (!this.socket && !this.ConnectSuccess) {
console.log('connectWs', deviceIP)
this.deviceIP = deviceIP
this.socket = new WebSocket(deviceIP)
// this.socket.onopen = this.handleOpen
this.socket.onmessage = this.handleMessageFromWebSocket
this.socket.onclose = this.handleClose
// this.socket.onerror = this.handleError
return new Promise((resolve, reject) => {
this.socket.onopen = (event) => {
console.log('WebSocket 连接已建立', event)
this.ConnectSuccess = true
this.startHeartbeat()
resolve()
}
this.socket.onerror = (event) => {
console.error('WebSocket 连接错误', event)
this.ConnectSuccess = false
reject(event)
}
})
}
},
// 新增方法用于注册消息处理器
registerMessageHandler(handler) {
this.messageHandlers.push(handler)
},
// 新增方法用于移除消息处理器
unregisterMessageHandler(handler) {
this.messageHandlers = this.messageHandlers.filter((h) => h !== handler)
},
//
handleMessageFromWebSocket(event) {
const message = JSON.parse(event.data)
console.log('消息', message)
this.message = message.cmd
this.messageNum += 1
// 调用所有注册的处理器
// console.log(message)
this.messageHandlers.forEach((handler) => handler(message))
},
handleOpen(event) {
console.log('WebSocket 连接已建立', event)
if (event.target.readyState === 1) {
this.ConnectSuccess = true
}
},
handleClose(event) {
console.error('WebSocket 关闭:', event)
this.socket = null
this.ConnectSuccess = false
this.sendNum = 0
this.messageNum = 0
this.stopHeartbeat()
// 添加自动重连逻辑
this.reconnectWs()
.then(() => {
this.ConnectSuccess = true
showToast('WebSocket 重连成功')
})
.catch((error) => {
this.ConnectSuccess = false
showToast('最终重连失败', error)
})
},
handleError(error) {
console.error('WebSocket 错误:', error)
this.ConnectSuccess = false
},
async reconnectWs() {
return new Promise(async (resolve, reject) => {
this.reconnectNum++
try {
console.log(`尝试重新连接... (${this.reconnectNum})`)
await this.connectWs(this.deviceIP)
if (this.socket && this.ConnectSuccess) {
resolve() // 连接成功,停止重试
} else {
throw new Error('连接失败')
}
} catch (error) {
this.ConnectSuccess = false
console.log(`重连WebSocket失败,稍后重试... (${this.reconnectNum})`)
reject()
}
})
},
sendMessage(message) {
// 发送消息逻辑...
if (this.socket.readyState === WebSocket.OPEN) {
// showToast('确定发送', message)
this.strWs = JSON.parse(message).cmd
this.sendNum += 1
this.socket.send(message)
} else {
// 如果在此之后仍然无法发送,可以选择抛出错误或继续等待(根据需求)
showToast('WebSocket 尚未连接')
console.error('即使重试后,WebSocket仍无法发送消息')
}
},
// 在actions中添加一个新的异步发送方法
async sendMessageWithRetry(message) {
if (!this.ConnectSuccess) {
// 如果连接未建立,尝试重新连接
try {
await this.reconnectWs()
} catch (error) {
console.error('尝试重新连接WebSocket失败', error)
throw error
}
}
this.sendMessage(message)
},
// 发送心跳消息的方法
sendHeartbeat() {
this.sendMessageWithRetry(this.heartbeatMessage)
},
// 启动心跳定时器
startHeartbeat() {
this.heartbeatInterval = setInterval(() => {
this.sendHeartbeat()
}, 5000) // 每隔3秒发送一次心跳
},
// 如果需要在适当的时候清理心跳定时器,比如在断开连接时
stopHeartbeat() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval)
this.heartbeatInterval = null
}
}
}
})
.vue 文件中使用
useWebsocket.connectWs(deviceIP) 连接socket 只需要连接一次,可以放在main.js 里面初始化的时候连接
<script setup>
import { onMounted, onUnmounted, } from 'vue'
import { useWebsocketStore } from '@/stores/useWebsocketStore'
const useWebsocket = useWebsocketStore()
const deviceIP='ws:192.168.89'
const handleMessage= (message)=>{
// 外部处理消息逻辑
}
const init = () => {
useWebsocket.connectWs(deviceIP) //
useWebsocket.registerMessageHandler(handleMessage)
}
init()
onUnmounted(() => {
// 组件卸载时移除消息处理器
useWebsocket.unregisterMessageHandler(handleMessage)
})