feat: 多功能升级 — 流式逐字渲染、对话缓存、会话组织优化、记忆管理修复、性能仪表盘

- 前端消息流式逐字渲染 (AI-Core ChatStream → SSE → Gateway → WebSocket stream_chunk → fadeInUp + cursorBlink)
- 后端对话缓存 (conversationCache sync.Map, GET /sessions/:id/messages)
- 前端侧边栏历史多轮对话显示
- DevTools 性能监控图标移至首页仪表盘
- DevTools 用户记忆查询/删减功能修复 (补全 DELETE 数据链路)
- 后端和 DevTools 按用户分类组织实时活动会话 (map[userID]map[sessionID]*Client)
- 新增 docs/api-reference/ 路由参考文档
- 新增 docs/message-flow-architecture.md 消息链路架构文档
This commit is contained in:
2026-05-16 17:44:03 +08:00
parent 63513210b7
commit 186513f381
24 changed files with 1024 additions and 216 deletions
+141 -41
View File
@@ -1,19 +1,20 @@
package handler
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/yourname/cyrene-ai/gateway/internal/config"
"github.com/yourname/cyrene-ai/gateway/internal/middleware"
"github.com/yourname/cyrene-ai/gateway/internal/ws"
)
@@ -96,12 +97,14 @@ func (h *ChatHandler) handleMessage(client *ws.Client, msg ws.ClientMessage) {
h.handleChatMessage(client, msg)
case "voice_input":
h.handleVoiceInput(client, msg)
case "history":
h.handleHistoryRequest(client, msg)
default:
log.Printf("[WS] 未知消息类型: %s from user=%s", msg.Type, client.UserID)
}
}
// handleChatMessage 处理文字聊天消息 - 转发到 AI-Core
// handleChatMessage 处理文字聊天消息 - 转发到 AI-Core(流式发送)
func (h *ChatHandler) handleChatMessage(client *ws.Client, msg ws.ClientMessage) {
mode := msg.Mode
if mode == "" {
@@ -134,7 +137,19 @@ func (h *ChatHandler) handleChatMessage(client *ws.Client, msg ws.ClientMessage)
return
}
// 调用 AI-Core
// 缓存用户消息(在 goroutine 前完成,避免竞态)
h.hub.CacheMessage(client.UserID, client.SessionID, ws.Message{
Role: "user",
Content: msg.Content,
Timestamp: time.Now().UnixMilli(),
})
// 在 goroutine 中进行 AI-Core 调用和流式发送,避免阻塞 ReadPump
go h.streamResponse(client, mode, reqBody, msg.Content)
}
// streamResponse 调用 AI-Core SSE 流式接口并逐 delta 转发给客户端
func (h *ChatHandler) streamResponse(client *ws.Client, mode string, reqBody []byte, userMsg string) {
aiCoreURL := h.cfg.AICoreURL + "/api/v1/chat"
httpReq, err := http.NewRequest("POST", aiCoreURL, bytes.NewReader(reqBody))
if err != nil {
@@ -149,6 +164,7 @@ func (h *ChatHandler) handleChatMessage(client *ws.Client, msg ws.ClientMessage)
return
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("Accept", "text/event-stream")
httpClient := &http.Client{Timeout: 120 * time.Second}
resp, err := httpClient.Do(httpReq)
@@ -165,20 +181,8 @@ func (h *ChatHandler) handleChatMessage(client *ws.Client, msg ws.ClientMessage)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Printf("[chat] 读取 AI-Core 响应失败: %v", err)
h.hub.UpdateSessionState(client.SessionID, "error")
client.SendMessage(ws.ServerMessage{
Type: "error",
MessageID: "msg_" + generateID(),
Error: "读取响应失败",
Timestamp: time.Now().UnixMilli(),
})
return
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
log.Printf("[chat] AI-Core 返回错误 [%d]: %s", resp.StatusCode, string(body))
h.hub.UpdateSessionState(client.SessionID, "error")
client.SendMessage(ws.ServerMessage{
@@ -190,42 +194,115 @@ func (h *ChatHandler) handleChatMessage(client *ws.Client, msg ws.ClientMessage)
return
}
// 解析 AI-Core 响应
var aiResp struct {
Text string `json:"text"`
Mode string `json:"mode"`
MessageID string `json:"message_id"`
// 使用 bufio.Scanner 逐行读取 SSE 响应
scanner := bufio.NewScanner(resp.Body)
// 增大 scanner buffer 以处理大块 SSE 数据
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
var fullText string
var msgID string
for scanner.Scan() {
line := scanner.Text()
// 跳过非 data 行
if !strings.HasPrefix(line, "data: ") {
continue
}
data := strings.TrimPrefix(line, "data: ")
// SSE 流结束标记
if data == "[DONE]" {
break
}
// 解析 delta 数据
var chunk struct {
Delta string `json:"delta"`
Error string `json:"error,omitempty"`
MessageID string `json:"message_id,omitempty"`
Done bool `json:"done,omitempty"`
}
if err := json.Unmarshal([]byte(data), &chunk); err != nil {
log.Printf("[chat] 解析 SSE delta 失败: %v, raw=%s", err, data)
continue
}
// 错误处理
if chunk.Error != "" {
log.Printf("[chat] AI-Core 流式错误: %s", chunk.Error)
h.hub.UpdateSessionState(client.SessionID, "error")
client.SendMessage(ws.ServerMessage{
Type: "error",
MessageID: "msg_" + generateID(),
Error: chunk.Error,
Timestamp: time.Now().UnixMilli(),
})
return
}
// 记录 message_id
if chunk.MessageID != "" {
msgID = chunk.MessageID
}
// 如果是结束标记(含 done: true),跳出
if chunk.Done {
break
}
// 逐 delta 转发
if chunk.Delta != "" {
fullText += chunk.Delta
client.SendMessage(ws.ServerMessage{
Type: "stream_chunk",
MessageID: msgID,
Content: chunk.Delta,
Role: "assistant",
SessionID: client.SessionID,
Timestamp: time.Now().UnixMilli(),
})
}
}
if err := json.Unmarshal(body, &aiResp); err != nil {
log.Printf("[chat] 解析 AI-Core 响应失败: %v", err)
if err := scanner.Err(); err != nil {
log.Printf("[chat] SSE 读取错误: %v", err)
h.hub.UpdateSessionState(client.SessionID, "error")
client.SendMessage(ws.ServerMessage{
Type: "error",
MessageID: "msg_" + generateID(),
Error: "解析响应失败",
Error: fmt.Sprintf("流读取错误: %v", err),
Timestamp: time.Now().UnixMilli(),
})
return
}
// 记录助手响应
h.hub.RecordMessage(client.SessionID, "assistant", aiResp.Text)
if msgID == "" {
msgID = "msg_" + generateID()
}
// 发送 stream_end
client.SendMessage(ws.ServerMessage{
Type: "stream_end",
MessageID: msgID,
SessionID: client.SessionID,
Timestamp: time.Now().UnixMilli(),
})
// 缓存完整响应
if fullText != "" {
h.hub.CacheMessage(client.UserID, client.SessionID, ws.Message{
Role: "assistant",
Content: fullText,
Timestamp: time.Now().UnixMilli(),
})
}
h.hub.RecordMessage(client.SessionID, "assistant", fullText)
// 设置会话状态为 idle
h.hub.UpdateSessionState(client.SessionID, "idle")
// 发送响应给客户端
response := ws.ServerMessage{
Type: "response",
MessageID: aiResp.MessageID,
Text: aiResp.Text,
ResponseMode: mode,
Timestamp: time.Now().UnixMilli(),
}
if err := client.SendMessage(response); err != nil {
log.Printf("[WS] 发送响应失败: %v", err)
}
}
// handleVoiceInput 处理语音输入
@@ -241,6 +318,31 @@ func (h *ChatHandler) handleVoiceInput(client *ws.Client, msg ws.ClientMessage)
}
// handleHistoryRequest 处理历史消息请求
func (h *ChatHandler) handleHistoryRequest(client *ws.Client, msg ws.ClientMessage) {
// 优先使用请求中的 session_id,否则使用客户端的 session_id
sessionID := msg.SessionID
if sessionID == "" {
sessionID = client.SessionID
}
messages := h.hub.GetConversation(client.UserID, sessionID)
if messages == nil {
messages = []ws.Message{}
}
response := ws.ServerMessage{
Type: "history_response",
MessageID: "hist_" + generateID(),
Messages: messages,
Timestamp: time.Now().UnixMilli(),
}
if err := client.SendMessage(response); err != nil {
log.Printf("[WS] 发送历史消息失败: %v", err)
}
}
// SendSystemMessage 向用户发送系统消息(用于主动通知)
func (h *ChatHandler) SendSystemMessage(userID, sessionID, text string) error {
msg := ws.ServerMessage{
@@ -272,5 +374,3 @@ func randomStr(n int) string {
return string(b)
}
// 确保未使用变量不报错
var _ = middleware.GetUserID
@@ -127,3 +127,33 @@ func (h *MemoryHandler) Add(c *gin.Context) {
json.Unmarshal(body, &result)
c.JSON(resp.StatusCode, result)
}
// Delete 删除单条记忆 — 代理 DELETE /api/v1/memory?id=...
func (h *MemoryHandler) Delete(c *gin.Context) {
memoryID := c.Query("id")
if memoryID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "缺少 id 参数"})
return
}
url := fmt.Sprintf("%s/api/v1/memory?id=%s", h.aiCoreURL, memoryID)
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "构建请求失败"})
return
}
resp, err := h.client.Do(req)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": fmt.Sprintf("AI-Core 不可达: %v", err)})
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
var result interface{}
json.Unmarshal(body, &result)
c.JSON(resp.StatusCode, result)
}
@@ -19,11 +19,13 @@ type SessionHandler struct {
// SessionInfo 会话信息
type SessionInfo struct {
ID string `json:"id"`
UserID string `json:"user_id"`
Title string `json:"title"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
ID string `json:"id"`
UserID string `json:"user_id"`
Title string `json:"title"`
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
MessageCount int `json:"message_count"`
IsActive bool `json:"is_active"`
}
// NewSessionHandler 创建会话处理器
@@ -108,6 +110,21 @@ func (h *SessionHandler) Get(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"error": "会话不存在"})
}
// GetMessages 获取会话的完整消息列表
func (h *SessionHandler) GetMessages(c *gin.Context) {
userID := middleware.GetUserID(c)
sessionID := c.Param("id")
messages := h.hub.GetConversation(userID, sessionID)
if messages == nil {
messages = []ws.Message{}
}
c.JSON(http.StatusOK, gin.H{
"messages": messages,
})
}
// ========== Admin 端点 ==========
// ListActiveSessions 获取当前所有活跃 WebSocket 会话列表 (管理员)
@@ -123,6 +140,18 @@ func (h *SessionHandler) ListActiveSessions(c *gin.Context) {
})
}
// GetActiveSessions 返回按用户分组的活跃会话列表
func (h *SessionHandler) GetActiveSessions(c *gin.Context) {
sessionsByUser := h.hub.GetActiveSessionsByUser()
if sessionsByUser == nil {
sessionsByUser = make(map[string][]*ws.SessionState)
}
c.JSON(http.StatusOK, gin.H{
"users": sessionsByUser,
})
}
// GetSession 获取指定会话的详细信息 (管理员)
func (h *SessionHandler) GetSession(c *gin.Context) {
sessionID := c.Param("id")
+10 -7
View File
@@ -51,13 +51,15 @@ func Setup(r *gin.Engine, hub *ws.Hub, cfg *config.Config) {
protected.POST("/auth/refresh", authHandler.RefreshToken)
// 会话管理
sessions := protected.Group("/sessions")
{
sessions.POST("", sessionHandler.Create)
sessions.GET("", sessionHandler.List)
sessions.GET("/:id", sessionHandler.Get)
sessions.DELETE("/:id", sessionHandler.Delete)
}
sessions := protected.Group("/sessions")
{
sessions.POST("", sessionHandler.Create)
sessions.GET("", sessionHandler.List)
sessions.GET("/active", sessionHandler.GetActiveSessions)
sessions.GET("/:id", sessionHandler.Get)
sessions.DELETE("/:id", sessionHandler.Delete)
sessions.GET("/:id/messages", sessionHandler.GetMessages)
}
// 记忆管理
memory := protected.Group("/memory")
@@ -65,6 +67,7 @@ func Setup(r *gin.Engine, hub *ws.Hub, cfg *config.Config) {
memory.GET("/search", memoryHandler.Query)
memory.GET("", memoryHandler.List)
memory.POST("", memoryHandler.Add)
memory.DELETE("", memoryHandler.Delete)
}
// Admin 路由 (需要管理员权限)
+93
View File
@@ -1,6 +1,7 @@
package ws
import (
"fmt"
"log"
"sync"
"time"
@@ -24,6 +25,13 @@ type SessionMessage struct {
Timestamp int64 `json:"timestamp"`
}
// Message 完整对话消息(用于缓存)
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
Timestamp int64 `json:"timestamp"`
}
const maxRecentMessages = 20
// Hub WebSocket连接池
@@ -39,6 +47,10 @@ type Hub struct {
// 会话状态追踪 (sessionID -> SessionState)
sessions map[string]*SessionState
// 对话缓存:key = "userID:sessionID", value = []Message
conversationCache sync.Map
convCacheMu sync.Mutex
}
// NewHub 创建WebSocket Hub
@@ -203,6 +215,42 @@ func (h *Hub) GetActiveSessions() []*SessionState {
return result
}
// GetActiveSessionsByUser 返回按用户分组的活跃会话列表
func (h *Hub) GetActiveSessionsByUser() map[string][]*SessionState {
h.mu.RLock()
defer h.mu.RUnlock()
result := make(map[string][]*SessionState)
for _, s := range h.sessions {
cp := *s
cp.RecentMessages = nil
result[s.UserID] = append(result[s.UserID], &cp)
}
return result
}
// GetUserSessions 获取某用户的所有活跃会话
func (h *Hub) GetUserSessions(userID string) []*SessionState {
h.mu.RLock()
defer h.mu.RUnlock()
var result []*SessionState
if clients, ok := h.userClients[userID]; ok {
seen := make(map[string]bool)
for c := range clients {
if !seen[c.SessionID] {
if s, ok := h.sessions[c.SessionID]; ok {
cp := *s
cp.RecentMessages = nil
result = append(result, &cp)
seen[c.SessionID] = true
}
}
}
}
return result
}
// GetSession 返回指定会话的详细信息(含最近消息)
func (h *Hub) GetSession(sessionID string) *SessionState {
h.mu.RLock()
@@ -263,3 +311,48 @@ func (h *Hub) RecordMessage(sessionID, role, content string) {
s.RecentMessages = s.RecentMessages[len(s.RecentMessages)-maxRecentMessages:]
}
}
// ========== 对话缓存方法 ==========
// cacheKey 生成对话缓存 key
func cacheKey(userID, sessionID string) string {
return fmt.Sprintf("%s:%s", userID, sessionID)
}
// CacheMessage 缓存单条消息到对话历史
func (h *Hub) CacheMessage(userID, sessionID string, msg Message) {
key := cacheKey(userID, sessionID)
h.convCacheMu.Lock()
defer h.convCacheMu.Unlock()
existing, _ := h.conversationCache.Load(key)
var messages []Message
if existing != nil {
messages = existing.([]Message)
}
messages = append(messages, msg)
h.conversationCache.Store(key, messages)
}
// GetConversation 获取完整对话历史
func (h *Hub) GetConversation(userID, sessionID string) []Message {
key := cacheKey(userID, sessionID)
val, ok := h.conversationCache.Load(key)
if !ok {
return []Message{}
}
messages, ok := val.([]Message)
if !ok {
return []Message{}
}
return messages
}
// DeleteConversation 删除对话缓存
func (h *Hub) DeleteConversation(userID, sessionID string) {
key := cacheKey(userID, sessionID)
h.conversationCache.Delete(key)
}
+19 -15
View File
@@ -2,25 +2,29 @@ package ws
// 客户端 → 服务端消息
type ClientMessage struct {
Type string `json:"type"` // message | voice_input | ping
SessionID string `json:"session_id"`
Mode string `json:"mode"` // text | voice_msg | voice_assistant
Content string `json:"content"`
AudioData string `json:"audio_data,omitempty"` // base64
Timestamp int64 `json:"timestamp"`
Type string `json:"type"` // message | voice_input | ping | history
SessionID string `json:"session_id"`
Mode string `json:"mode"` // text | voice_msg | voice_assistant
Content string `json:"content"`
AudioData string `json:"audio_data,omitempty"` // base64
Timestamp int64 `json:"timestamp"`
}
// 服务端 → 客户端消息
type ServerMessage struct {
Type string `json:"type"` // response | segment | audio | error | device_update
MessageID string `json:"message_id"`
Text string `json:"text,omitempty"`
Segments []VoiceSegment `json:"segments,omitempty"` // 断句数组
FullAudioURL string `json:"full_audio_url,omitempty"`
ResponseMode string `json:"response_mode"`
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
Error string `json:"error,omitempty"`
Timestamp int64 `json:"timestamp"`
Type string `json:"type"` // response | segment | audio | error | device_update | pong | history_response | stream_chunk | stream_end
MessageID string `json:"message_id"`
Text string `json:"text,omitempty"`
Content string `json:"content,omitempty"` // stream_chunk 的增量文本
Role string `json:"role,omitempty"` // stream 消息的角色
SessionID string `json:"session_id,omitempty"` // 会话 ID
Segments []VoiceSegment `json:"segments,omitempty"` // 断句数组
FullAudioURL string `json:"full_audio_url,omitempty"`
ResponseMode string `json:"response_mode"`
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
Error string `json:"error,omitempty"`
Timestamp int64 `json:"timestamp"`
Messages []Message `json:"messages,omitempty"` // 历史消息列表
}
type VoiceSegment struct {
Binary file not shown.