fix: 跨端消息同步 — 用户消息广播 + 会话隔离 + 去重

后端: handleChatMessage 将用户消息通过 broadcastToUser 广播给同用户所有设备
协议: ClientMessage 新增 ClientMsgID 字段用于跨端去重
前端: 发送消息时携带 client_msg_id,收到回显时跳过本地已添加的消息
前端: handleServerMessage 新增 session_id 过滤,防止不同会话消息串扰

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-26 13:06:44 +08:00
parent 2574f60823
commit 08687bb13d
5 changed files with 62 additions and 2 deletions
@@ -168,8 +168,12 @@ func (h *ChatHandler) handleChatMessage(client *ws.Client, msg ws.ClientMessage)
} }
// 缓存用户消息(在 goroutine 前完成,避免竞态) // 缓存用户消息(在 goroutine 前完成,避免竞态)
userMsgID := msg.ClientMsgID
if userMsgID == "" {
userMsgID = "msg_" + generateID()
}
userMsg := ws.Message{ userMsg := ws.Message{
ID: "msg_" + generateID(), ID: userMsgID,
Role: "user", Role: "user",
Content: msg.Content, Content: msg.Content,
Timestamp: time.Now().UnixMilli(), Timestamp: time.Now().UnixMilli(),
@@ -183,6 +187,21 @@ func (h *ChatHandler) handleChatMessage(client *ws.Client, msg ws.ClientMessage)
} }
h.hub.CacheMessage(client.UserID, client.SessionID, userMsg) h.hub.CacheMessage(client.UserID, client.SessionID, userMsg)
// 广播用户消息给同用户所有设备(跨端同步)
h.broadcastToUser(client.UserID, ws.ServerMessage{
Type: "response",
MessageID: userMsgID,
Content: msg.Content,
Role: "user",
MsgType: "chat",
SessionID: client.SessionID,
Timestamp: time.Now().UnixMilli(),
ClientInfo: &ws.ClientInfo{
ClientID: client.ClientID,
DeviceName: client.DeviceName,
},
})
// 在 goroutine 中进行 AI-Core 调用和流式发送,避免阻塞 ReadPump // 在 goroutine 中进行 AI-Core 调用和流式发送,避免阻塞 ReadPump
go h.streamResponse(client, mode, reqBody, msg.Content) go h.streamResponse(client, mode, reqBody, msg.Content)
} }
+1
View File
@@ -24,6 +24,7 @@ type ClientMessage struct {
ClientID string `json:"client_id,omitempty"` // 客户端唯一标识 (多端区分) ClientID string `json:"client_id,omitempty"` // 客户端唯一标识 (多端区分)
DeviceName string `json:"device_name,omitempty"` // 设备备注名称 DeviceName string `json:"device_name,omitempty"` // 设备备注名称
UserAgent string `json:"user_agent,omitempty"` // 浏览器 UA UserAgent string `json:"user_agent,omitempty"` // 浏览器 UA
ClientMsgID string `json:"client_msg_id,omitempty"` // 客户端消息ID (跨端去重)
} }
// ReviewMessage 审查后的结构化消息(动作/聊天分离) // ReviewMessage 审查后的结构化消息(动作/聊天分离)
+1
View File
@@ -37,6 +37,7 @@ export function useChat() {
mode, mode,
attachments, attachments,
timestamp: Date.now(), timestamp: Date.now(),
client_msg_id: userMsgId,
}); });
}, },
[addMessage, setTyping, sendMessage] [addMessage, setTyping, sendMessage]
+39 -1
View File
@@ -32,6 +32,28 @@ function getBackoffDelay(attempt: number): number {
let wsInstanceCounter = 0; let wsInstanceCounter = 0;
// ========== 跨端消息去重 ==========
const sentMsgIds = new Set<string>();
const MAX_SENT_IDS = 40;
function trackSentMsgId(id: string) {
sentMsgIds.add(id);
if (sentMsgIds.size > MAX_SENT_IDS) {
const entries = [...sentMsgIds];
sentMsgIds.clear();
for (const entry of entries.slice(-MAX_SENT_IDS / 2)) {
sentMsgIds.add(entry);
}
}
}
// 会话作用域的消息类型:仅处理匹配当前会话的消息
const SESSION_SCOPED_TYPES = new Set([
'response', 'stream_start', 'stream_chunk', 'stream_end',
'stream_segments', 'review', 'multi_message', 'thinking',
'tool_progress', 'system_info',
]);
export function useWebSocket() { export function useWebSocket() {
const [isConnected, setIsConnected] = useState(false); const [isConnected, setIsConnected] = useState(false);
const [reconnectAttempts, setReconnectAttempts] = useState(0); const [reconnectAttempts, setReconnectAttempts] = useState(0);
@@ -162,6 +184,10 @@ export function useWebSocket() {
const sendMessage = useCallback((msg: WSClientMessage) => { const sendMessage = useCallback((msg: WSClientMessage) => {
const instanceId = instanceIdRef.current; const instanceId = instanceIdRef.current;
// 注册 client_msg_id 用于跨端回显去重
if (msg.client_msg_id) {
trackSentMsgId(msg.client_msg_id);
}
if (wsRef.current?.readyState === WebSocket.OPEN) { if (wsRef.current?.readyState === WebSocket.OPEN) {
const sessionID = useSessionStore.getState().currentSessionId; const sessionID = useSessionStore.getState().currentSessionId;
wsRef.current.send( wsRef.current.send(
@@ -200,20 +226,32 @@ function handleServerMessage(msg: WSServerMessage) {
const { setMessages } = useSessionStore.getState(); const { setMessages } = useSessionStore.getState();
const chatState = useChatStore.getState(); const chatState = useChatStore.getState();
// 会话作用域消息:仅处理匹配当前会话的消息,避免跨会话串消息
if (SESSION_SCOPED_TYPES.has(msg.type) && msg.session_id) {
const currentSid = useSessionStore.getState().currentSessionId;
if (currentSid && msg.session_id !== currentSid) {
return;
}
}
switch (msg.type) { switch (msg.type) {
case 'stream_start': case 'stream_start':
setTyping(true); setTyping(true);
break; break;
case 'response': case 'response':
// 支持两种格式: 旧版 (text 字段) 和 审查消息版 (content + role + msg_type 字段)
if (msg.text || msg.content) { if (msg.text || msg.content) {
// 跨端用户消息去重:本端已本地添加,跳过服务端回显
if (msg.role === 'user' && msg.message_id && sentMsgIds.has(msg.message_id)) {
break;
}
addMessage({ addMessage({
id: msg.message_id || '', id: msg.message_id || '',
role: (msg.role as Message['role']) || 'assistant', role: (msg.role as Message['role']) || 'assistant',
content: (msg.text || msg.content) as string, content: (msg.text || msg.content) as string,
timestamp: msg.timestamp, timestamp: msg.timestamp,
msgType: (msg.msg_type as MessageDisplayType) || undefined, msgType: (msg.msg_type as MessageDisplayType) || undefined,
client_info: msg.client_info,
}); });
} }
setTyping(false); setTyping(false);
+1
View File
@@ -112,6 +112,7 @@ export interface WSClientMessage {
client_id?: string; client_id?: string;
device_name?: string; device_name?: string;
user_agent?: string; user_agent?: string;
client_msg_id?: string; // 客户端消息ID (跨端去重)
} }
/** 通知类型 */ /** 通知类型 */