正在加载文档...
文档内容较大,正在处理中,请稍候
正在加载文档...
文档内容较大,正在处理中,请稍候
本文档详细介绍了基于 SSE(Server-Sent Events)技术的前后端实时通信实现方案,包括服务端连接管理、客户端连接处理、消息推送机制以及完整的使用示例。本方案提供了高效、可靠的单向实时通信能力,适用于实时通知、消息推送、状态同步等场景。
┌─────────────────────────────────────────────────────────────┐
│ 客户端 (React) │
├─────────────────────────────────────────────────────────────┤
│ SSEProvider (Context) │
│ ├── useSSE (Hook) │
│ │ ├── EventSourcePolyfill │
│ │ ├── 自动重连机制 │
│ │ └── 心跳检测 │
│ └── useSSEEvents (Hook) │
│ ├── 事件处理 (loginout, error, heartbeat) │
│ └── 通知显示 (notice, announcement) │
└─────────────────────────────────────────────────────────────┘
│
│ HTTP/SSE
│
┌─────────────────────────────────────────────────────────────┐
│ 服务端 (Node.js) │
├─────────────────────────────────────────────────────────────┤
│ Express Router │
│ ├── /sse/connect/auth (认证连接) │
│ ├── /sse/send (发送消息) │
│ ├── /sse/broadcast (广播消息) │
│ └── /sse/stats (统计信息) │
│ │
│ SSEController │
│ ├── connectAuthenticated │
│ ├── sendToUser │
│ └── broadcast │
│ │
│ SSEManager (单例) │
│ ├── connections (Map<connectionId, connection>) │
│ ├── userConnections (Map<userId, Set<connectionId>>) │
│ ├── createConnection │
│ ├── sendMessage │
│ ├── sendToUser │
│ ├── broadcast │
│ └── checkHeartbeats │
└─────────────────────────────────────────────────────────────┘客户端请求连接
│
▼
服务端认证 (JWT)
│
▼
创建 SSE 连接
│
├── 发送欢迎消息 (welcome)
├── 启动心跳机制 (heartbeat)
└── 维护连接状态
│
▼
服务端推送消息
│
├── 向特定用户推送 (sendToUser)
├── 广播消息 (broadcast)
└── 系统事件 (loginout, notice, announcement)
│
▼
客户端接收并处理
│
├── 事件订阅 (subscribe)
├── 自动重连 (reconnect)
└── UI 更新SSEManager 是整个系统的核心,负责管理所有 SSE 连接。
class SSEManager {
connections = new Map(); // 连接ID -> 连接对象
userConnections = new Map(); // 用户ID -> 连接ID集合
connectionUsers = new Map(); // 连接ID -> 用户ID
options = {
heartbeatInterval: 30000, // 心跳间隔(30秒)
defaultEventType: "message", // 默认事件类型
connectionTimeout: 120000, // 连接超时(2分钟)
};
}创建连接
createConnection(req, res, (userId = null), (options = {}));Content-Type: text/event-stream)open 事件)发送消息
sendMessage(connectionId, message);
sendToUser(userId, message);
broadcast(message, (filterFn = null));sendMessage:向指定连接发送消息sendToUser:向指定用户的所有连接发送消息broadcast:向所有连接广播消息(支持过滤器)连接管理
closeConnection(connectionId);
closeUserConnections(userId);
checkHeartbeats();
cleanupInactiveConnections();closeConnection:关闭指定连接closeUserConnections:关闭用户的所有连接checkHeartbeats:检查所有连接的心跳状态cleanupInactiveConnections:清理超时连接SSE 消息采用标准格式:
event: <eventType>
data: <JSON数据>
示例:
event: welcome
data: {"message":"欢迎回来,用户 123!","timestamp":"2025-12-10T12:00:00.000Z","connectionId":"conn_123"}
控制器处理 HTTP 请求,调用 SSEManager 的方法。
static connectAuthenticated(req, res) {
const userId = req.user?.id || req.user?.user_id;
const connectionId = sseManager.createConnection(req, res, userId);
// 发送欢迎消息
sseManager.sendMessage(connectionId, {
type: "welcome",
data: {
message: `欢迎回来,用户 ${userId}!`,
timestamp: new Date().toISOString(),
connectionId,
clientId: req.query.clientId || req.headers["x-client-id"],
},
});
}static sendToUser(req, res) {
const { userId, eventType, data } = req.body;
const sentCount = sseManager.sendToUser(userId, {
type: eventType || "notification",
data,
});
// 返回发送结果
}static broadcast(req, res) {
const { eventType, data, filter } = req.body;
let filterFn = null;
if (filter?.userIds?.length > 0) {
const targetUserIds = new Set(filter.userIds);
filterFn = (connection) =>
connection.userId && targetUserIds.has(connection.userId);
}
const sentCount = sseManager.broadcast(
{ type: eventType || "broadcast", data },
filterFn
);
// 返回广播结果
}// src/routes/sseRoutes.js
router.get("/connect/auth", authenticateToken, SSEController.connectAuthenticated);
router.post("/send", authenticateToken, SSEController.sendToUser);
router.post("/broadcast", authenticateToken, SSEController.broadcast);
router.get("/stats", authenticateToken, SSEController.getStats);
router.get("/users/:userId/connections", authenticateToken, SSEController.getUserConnections);
router.post("/users/:userId/disconnect", authenticateToken, SSEController.disconnectUser);| 端点 | 方法 | 认证 | 说明 |
|---|---|---|---|
/sse/connect/auth |
GET | 是 | 建立认证 SSE 连接 |
/sse/send |
POST | 是 | 发送消息给特定用户 |
/sse/broadcast |
POST | 是 | 广播消息给所有连接 |
/sse/stats |
GET | 是 | 获取连接统计信息 |
/sse/users/:userId/connections |
GET | 是 | 获取用户连接信息 |
/sse/users/:userId/disconnect |
POST | 是 | 断开用户连接 |
useSSE 是核心的 SSE 连接 Hook,提供连接管理、事件订阅、自动重连等功能。
import useSSE from "@/hooks/useSSE";
const {
subscribe,
unsubscribe,
once,
connectionState,
isConnected,
connectionId,
reconnect,
disconnect,
} = useSSE({
url: "http://127.0.0.1:8888/sse/connect/auth",
autoconnect: true,
reconnectDelay: 3000,
maxReconnectAttempts: Infinity,
useAuthHook: true,
enableHeartbeat: true,
clientId: "cid_123",
});| 选项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
url |
string | null | SSE 连接 URL |
autoconnect |
boolean | true | 是否自动连接 |
autoDisconnect |
boolean | true | 是否自动断开 |
reconnectDelay |
number | 3000 | 重连延迟(毫秒) |
maxReconnectAttempts |
number | Infinity | 最大重连次数 |
logEvents |
boolean | true | 是否记录事件日志 |
useAuthHook |
boolean | true | 是否使用认证 Hook |
enableHeartbeat |
boolean | true | 是否启用心跳检测 |
clientId |
string | null | 客户端 ID |
| 属性/方法 | 类型 | 说明 |
|---|---|---|
subscribe |
function | 订阅事件 |
unsubscribe |
function | 取消订阅 |
once |
function | 一次性订阅 |
connectionState |
string | 连接状态(connecting/open/closed/error) |
isConnected |
boolean | 是否已连接 |
connectionId |
string | 连接 ID |
lastEvent |
object | 最后收到的事件 |
reconnect |
function | 手动重连 |
disconnect |
function | 断开连接 |
// 订阅事件
const unsubscribe = subscribe("notice", (event) => {
const data = typeof event.data === "string" ? JSON.parse(event.data) : event.data;
console.log("收到通知:", data);
});
// 组件卸载时取消订阅
useEffect(() => {
return () => unsubscribe();
}, []);useSSEEvents 负责处理 SSE 连接的各种系统事件。
import { useSSEEvents } from "@/hooks/useSSEEvents";
const sseInstance = useSSE({
/* 配置 */
});
useSSEEvents(sseInstance);loginout 事件
sseSubscribe("loginout", (event) => {
const data = JSON.parse(event.data);
// 只处理 Web 端的 loginout 消息
if (data.clientType !== "web") return;
// 显示警告弹窗
modalWarning("登录异常", "您已在其他地方登录,请重新登录!", {
onOk: () => handleLogout(false),
});
});notice 事件
sseSubscribe("notice", (event) => {
const data = JSON.parse(event.data);
// 根据通知级别显示不同类型的通知
if (data.level === "high") {
notificationError("新通知", data.title);
} else if (data.level === "medium") {
notificationWarning("新通知", data.title);
} else {
notificationInfo("新通知", data.title);
}
});SSEProvider 提供全局的 SSE 连接,方便在应用的任何组件中使用。
import { SSEProvider } from "@/contexts/SSEContext";
function App() {
return (
<SSEProvider>
<YourApp />
</SSEProvider>
);
}import { useSSEContext } from "@/contexts/SSEContext";
function MyComponent() {
const { subscribe, isConnected, connectionId } = useSSEContext();
useEffect(() => {
const unsubscribe = subscribe("custom:event", (event) => {
console.log("收到事件:", event.data);
});
return () => unsubscribe();
}, [subscribe]);
return <div>连接状态: {isConnected ? "已连接" : "未连接"}</div>;
}const sseUrl = useMemo(() => {
let base;
if (process.env.NODE_ENV === "production") {
// 生产环境:使用相对路径,由反向代理处理
base = "/api/sse/connect/auth";
} else {
// 开发环境:直接使用后端完整 URL
// 注意:webpack dev server 代理不支持 SSE 长连接
base = "http://127.0.0.1:8888/sse/connect/auth";
}
const sep = base.includes("?") ? "&" : "?";
return `${base}${sep}clientId=${encodeURIComponent(clientId)}`;
}, [clientId]);1. 客户端初始化
├── 生成或获取 clientId
├── 构建 SSE URL
└── 调用 useSSE Hook
2. 建立连接
├── 发送 HTTP GET 请求到 /sse/connect/auth
├── 携带 JWT Token(Authorization Header)
├── 携带 clientId(Query 参数或 Header)
└── 服务端认证用户身份
3. 服务端处理
├── 验证 JWT Token
├── 创建 SSE 连接
├── 关联用户 ID
├── 发送欢迎消息(welcome 事件)
└── 启动心跳机制
4. 客户端接收
├── 监听 open 事件
├── 保存 connectionId
├── 更新连接状态为 "open"
└── 显示连接成功消息1. 服务端推送消息
├── 业务逻辑调用 sseManager.sendToUser(userId, message)
├── SSEManager 查找用户的所有连接
├── 遍历连接,发送消息
└── 记录发送结果
2. 消息格式
├── event: <eventType>
├── data: <JSON数据>
└── \n\n(结束标记)
3. 客户端接收
├── EventSource 触发对应事件
├── 调用订阅的回调函数
├── 解析 JSON 数据
└── 更新 UI 或执行业务逻辑1. 服务端心跳
├── 定时器(每 30 秒)
├── 遍历所有连接
├── 发送 heartbeat 事件
├── 检查连接超时
└── 清理不活跃连接
2. 客户端心跳
├── 监听 heartbeat 事件
├── 更新最后心跳时间戳
└── 用于监控连接健康状态1. 连接断开检测
├── EventSource.onerror 触发
├── readyState 变为 CLOSED
└── 更新连接状态为 "error"
2. 重连逻辑
├── 检查是否超过最大重连次数
├── 检查页面是否可见
├── 使用指数退避策略计算延迟
├── 延迟后调用 connect()
└── 重连成功后处理消息队列
3. 指数退避
├── delay = min(1000 * 2^attempts, 30000)
├── 最多等待 30 秒
└── 避免频繁重连const SSEManager = require("./src/infra/sseManager");
const sseManager = SSEManager.getInstance();
// 发送通知给特定用户
sseManager.sendToUser(userId, {
type: "notice",
data: {
noticeId: "notice_123",
title: "您有一条新消息",
level: "high",
category: "system",
},
});// 广播给所有用户
sseManager.broadcast({
type: "announcement",
data: {
title: "系统维护通知",
content: "系统将于今晚 22:00 进行维护",
level: "medium",
},
});
// 广播给特定用户
sseManager.broadcast(
{
type: "announcement",
data: {
/* ... */
},
},
(connection) => {
// 只发送给 VIP 用户
return connection.userId && isVIPUser(connection.userId);
}
);// 当用户在新设备登录时,通知其他设备
sseManager.sendToUser(userId, {
type: "loginout",
data: {
message: "您已在其他设备登录",
clientType: "web", // 只通知 Web 端
},
});import { useSSEContext } from "@/contexts/SSEContext";
function NotificationComponent() {
const { subscribe, isConnected } = useSSEContext();
const [notifications, setNotifications] = useState([]);
useEffect(() => {
const unsubscribe = subscribe("notice", (event) => {
const data = JSON.parse(event.data);
setNotifications((prev) => [...prev, data]);
});
return () => unsubscribe();
}, [subscribe]);
return (
<div>
<div>连接状态: {isConnected ? "✅" : "❌"}</div>
<div>
{notifications.map((notif) => (
<div key={notif.noticeId}>{notif.title}</div>
))}
</div>
</div>
);
}import { useSSEContext } from "@/contexts/SSEContext";
function CustomEventHandler() {
const { subscribe, once } = useSSEContext();
useEffect(() => {
// 订阅自定义事件
const unsubscribe = subscribe("custom:event", (event) => {
const data = JSON.parse(event.data);
console.log("收到自定义事件:", data);
});
// 一次性订阅(只触发一次)
const unsubscribeOnce = once("one-time:event", (event) => {
console.log("一次性事件:", event.data);
});
return () => {
unsubscribe();
unsubscribeOnce();
};
}, [subscribe, once]);
return <div>自定义事件处理器</div>;
}import { useSSEContext } from "@/contexts/SSEContext";
function ConnectionControl() {
const { reconnect, disconnect, connectionState } = useSSEContext();
return (
<div>
<div>连接状态: {connectionState}</div>
<button onClick={reconnect}>重新连接</button>
<button onClick={disconnect}>断开连接</button>
</div>
);
}| 事件类型 | 说明 | 数据格式 |
|---|---|---|
open |
连接建立 | { connectionId, clientId } |
welcome |
欢迎消息 | { message, timestamp, connectionId, clientId } |
heartbeat |
心跳消息 | { timestamp } |
close |
连接关闭 | { reason } |
error |
错误消息 | { message, status } |
| 事件类型 | 说明 | 数据格式 |
|---|---|---|
loginout |
登录/注销 | { message, clientType } |
notice |
通知消息 | { noticeId, title, level, category } |
announcement |
公告消息 | { noticeId, title, level, category } |
message |
普通消息 | 任意 JSON 数据 |
可以定义任意自定义事件类型,只需在服务端和客户端使用相同的 eventType 即可。
连接管理
消息发送
broadcast 而不是循环 sendToUser错误处理
性能优化
连接管理
事件处理
useSSEEvents 处理系统事件错误处理
error 事件性能优化
once 处理一次性事件认证授权
输入验证
速率限制
问题:客户端无法建立 SSE 连接
可能原因:
解决方法:
问题:服务端发送了消息,但客户端没有收到
可能原因:
解决方法:
问题:客户端频繁重连
可能原因:
解决方法:
问题:长时间运行后内存占用增加
可能原因:
解决方法:
// 获取连接统计
const stats = sseManager.getStats();
console.log("总连接数:", stats.totalConnections);
console.log("总用户数:", stats.totalUsers);
console.log("用户连接数:", stats.connectionsByUser);// 启用事件日志
const sseInstance = useSSE({
logEvents: true, // 开发环境启用
// ...
});
// 监听连接状态变化
useEffect(() => {
console.log("连接状态:", connectionState);
console.log("是否连接:", isConnected);
console.log("连接ID:", connectionId);
}, [connectionState, isConnected, connectionId]);| 版本 | 日期 | 变更内容 |
|---|---|---|
| 1.0.0 | 2025-12-10 | 初始版本发布 |
| - 实现基础 SSE 连接管理 | ||
| - 支持用户关联和消息推送 | ||
| - 实现客户端自动重连 | ||
| - 提供 React Hooks 和 Context |
SSE暂时足够了