feat: Gateway 消息排队机制 — 同会话串行化处理

同一 session 的消息按顺序处理:当前回复未完成时新消息进入队列,
完成后自动消费下一条。避免并发请求导致上下文竞争和响应交错。
客户端收到 type:"queued" 时可显示排队状态。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-29 21:06:59 +08:00
parent 61284c9c6a
commit 22d7b91cb1
2 changed files with 86 additions and 2 deletions
@@ -12,6 +12,7 @@ import (
"mime/multipart"
"net/http"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
@@ -23,6 +24,14 @@ import (
"github.com/yourname/cyrene-ai/pkg/logger"
)
// queuedMsg is one chat message waiting in a per-session queue. It bundles the
// four arguments that processQueue later hands to streamResponse when the
// message is dequeued.
type queuedMsg struct {
// client is the ws.Client that enqueueOrProcess received together with the message.
client *ws.Client
// mode is forwarded to streamResponse unchanged; its meaning is defined by the caller.
mode string
// reqBody is the request body built by handleChatMessage.
// NOTE(review): presumably the AI-Core request payload — confirm against streamResponse.
reqBody []byte
// content is the Content field of the original client message.
content string
}
// ChatHandler 聊天处理器
type ChatHandler struct {
cfg *config.Config
@@ -30,6 +39,8 @@ type ChatHandler struct {
sessionStore *store.SessionStore
fileStore *store.FileStore
upgrader websocket.Upgrader
pending map[string][]queuedMsg // per-session message queue
pendingMu sync.Mutex
}
// NewChatHandler 创建聊天处理器
@@ -39,6 +50,7 @@ func NewChatHandler(cfg *config.Config, hub *ws.Hub, sessionStore *store.Session
hub: hub,
sessionStore: sessionStore,
fileStore: fileStore,
pending: make(map[string][]queuedMsg),
upgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
@@ -230,8 +242,58 @@ func (h *ChatHandler) handleChatMessage(client *ws.Client, msg ws.ClientMessage)
},
})
// 在 goroutine 中进行 AI-Core 调用和流式发送,避免阻塞 ReadPump
go h.streamResponse(client, mode, reqBody, msg.Content)
// 排队处理:同一会话的消息串行化,避免并发请求导致上下文竞争
h.enqueueOrProcess(client, mode, reqBody, msg.Content)
}
// enqueueOrProcess serializes chat messages per session.
//
// If no message for the client's session is currently being handled, the
// session is marked busy and the message is processed on a new goroutine via
// processQueue. Otherwise the message is appended to the session's queue and
// the client is sent a "queued" notification.
func (h *ChatHandler) enqueueOrProcess(client *ws.Client, mode string, reqBody []byte, content string) {
	sessionID := client.SessionID

	h.pendingMu.Lock()
	backlog, inFlight := h.pending[sessionID]
	if !inFlight {
		// The presence of the key is the "busy" marker; a nil slice means
		// "processing, nothing waiting yet".
		h.pending[sessionID] = nil
		h.pendingMu.Unlock()
		go h.processQueue(client, mode, reqBody, content)
		return
	}

	// The session is busy: park the message behind the ones already waiting.
	backlog = append(backlog, queuedMsg{
		client:  client,
		mode:    mode,
		reqBody: reqBody,
		content: content,
	})
	h.pending[sessionID] = backlog
	position := len(backlog)
	h.pendingMu.Unlock()

	// Log and notify outside the lock so the handler-wide mutex is not held
	// while talking to the client.
	logger.Printf("[chat] 会话 %s 正在处理中,消息已加入队列 (位置 %d)", sessionID, position)
	client.SendMessage(ws.ServerMessage{
		Type:      "queued",
		SessionID: sessionID,
		Timestamp: time.Now().UnixMilli(),
	})
}
// processQueue handles the given message and then drains the queue of the
// client's session, one message at a time, until it is empty.
//
// It is meant to run on its own goroutine after enqueueOrProcess has marked
// the session busy. When the queue is found empty, the session's entry is
// removed from h.pending, which marks the session idle again, and the
// function returns.
func (h *ChatHandler) processQueue(client *ws.Client, mode string, reqBody []byte, content string) {
	// Snapshot the key once. The busy marker must be cleared under the same key
	// it was set with; re-reading client.SessionID on every iteration would
	// leave the original session marked busy forever if the field were
	// reassigned while a response is still streaming.
	sessionID := client.SessionID

	h.streamResponse(client, mode, reqBody, content)

	// Consume follow-up messages that arrived while we were busy.
	for {
		h.pendingMu.Lock()
		queue := h.pending[sessionID]
		if len(queue) == 0 {
			delete(h.pending, sessionID)
			h.pendingMu.Unlock()
			return
		}
		next := queue[0]
		// Clear the consumed slot before reslicing: queue[1:] shares the backing
		// array, so the old element would otherwise keep its client, request
		// body and content reachable for as long as the queue lives.
		queue[0].client, queue[0].reqBody = nil, nil
		queue[0].mode, queue[0].content = "", ""
		h.pending[sessionID] = queue[1:]
		remaining := len(queue) - 1
		h.pendingMu.Unlock()

		logger.Printf("[chat] 会话 %s 从队列取出消息继续处理 (剩余 %d)", sessionID, remaining)
		// Use the queued message's own client: it is the one stored with the
		// message when it was enqueued.
		h.streamResponse(next.client, next.mode, next.reqBody, next.content)
	}
}
// streamResponse 调用 AI-Core SSE 流式接口并逐 delta 转发给客户端