package context import ( "context" "database/sql" "fmt" "strings" "sync" _ "github.com/lib/pq" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/memory" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/model" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/persona" "git.yeij.top/AskaEth/Cyrene/pkg/logger" ) // IoTDeviceSummary IoT设备摘要接口(避免循环依赖) type IoTDeviceSummary interface { GetName() string GetType() string GetStatus() string } // ConversationStore 会话历史存储接口 type ConversationStore struct { mu sync.RWMutex messages map[string][]model.LLMMessage // key = sessionID maxHistory int databaseURL string // lazy-load from DB on cache miss } // NewConversationStore 创建会话历史存储 func NewConversationStore(maxHistory int) *ConversationStore { return &ConversationStore{ messages: make(map[string][]model.LLMMessage), maxHistory: maxHistory, } } // SetDatabaseURL sets the database URL for lazy-loading history on cache miss. func (cs *ConversationStore) SetDatabaseURL(url string) { cs.mu.Lock() defer cs.mu.Unlock() cs.databaseURL = url } // AddMessage 添加消息到会话历史 func (cs *ConversationStore) AddMessage(sessionID string, msg model.LLMMessage) { cs.mu.Lock() defer cs.mu.Unlock() msgs := cs.messages[sessionID] msgs = append(msgs, msg) // 限制历史长度 if len(msgs) > cs.maxHistory { // 保留 system 消息在开头,只裁剪 user/assistant 消息 cutoff := len(msgs) - cs.maxHistory for cutoff < len(msgs) && msgs[cutoff].Role == model.RoleSystem { cutoff++ } if cutoff > 0 { msgs = msgs[cutoff:] } } cs.messages[sessionID] = msgs } // GetHistory 获取会话历史。 // 如果内存缓存为空且配置了 databaseURL,会尝试从 DB 懒加载历史。 func (cs *ConversationStore) GetHistory(sessionID string, limit int) []model.LLMMessage { cs.mu.RLock() msgs := cs.messages[sessionID] dbURL := cs.databaseURL cs.mu.RUnlock() if len(msgs) == 0 && dbURL != "" { // 懒加载:从 DB 恢复该会话的历史 if err := cs.LoadFromDB(dbURL, sessionID, limit); err == nil { cs.mu.RLock() msgs = cs.messages[sessionID] cs.mu.RUnlock() } } if len(msgs) == 0 { return nil } start := 0 if limit > 0 && len(msgs) > limit { start = len(msgs) - limit } result := make([]model.LLMMessage, len(msgs[start:])) copy(result, msgs[start:]) return result } // LoadFromDB 从数据库的 messages 表恢复会话历史到内存 func (cs *ConversationStore) LoadFromDB(databaseURL, sessionID string, limit int) error { db, err := sql.Open("postgres", databaseURL) if err != nil { return fmt.Errorf("连接数据库失败: %w", err) } defer db.Close() rows, err := db.Query( `SELECT role, content FROM messages WHERE session_id = $1 ORDER BY created_at ASC LIMIT $2`, sessionID, limit, ) if err != nil { return fmt.Errorf("查询消息失败: %w", err) } defer rows.Close() cs.mu.Lock() defer cs.mu.Unlock() var loaded int for rows.Next() { var roleStr, content string if err := rows.Scan(&roleStr, &content); err != nil { return fmt.Errorf("扫描消息行失败: %w", err) } // 将旧数据中的 "action" 角色映射为 "assistant"(LLM 模型不支持自定义角色) role := model.Role(roleStr) if role == "action" { role = model.RoleAssistant } cs.messages[sessionID] = append(cs.messages[sessionID], model.LLMMessage{ Role: role, Content: content, }) loaded++ } if loaded > 0 { logger.Printf("[context] 从数据库恢复会话 %s 历史 %d 条", sessionID, loaded) } return rows.Err() } // Builder 对话上下文构建器 type Builder struct { convStore *ConversationStore } // NewBuilder 创建上下文构建器 func NewBuilder(convStore *ConversationStore) *Builder { return &Builder{convStore: convStore} } type BuildParams struct { UserID string SessionID string UserMessage string Persona *persona.PersonaConfig Memories []memory.MemoryEntry HistoryLimit int DeviceContext string // 注入的设备状态文本 PendingThoughts []string // 待注入的后台思考 PlatformObservationSummary string // 平台观察摘要(中间会话生成) Nickname string // 用户昵称 (昔涟对用户的称呼) } // Build 构建发送给LLM的完整消息列表 func (b *Builder) Build(ctx context.Context, params BuildParams) ([]model.LLMMessage, error) { messages := []model.LLMMessage{} // 1. 系统消息 —— 昔涟的人格Prompt // 使用传入的昵称,如果为空则回退到 userID userName := params.Nickname if userName == "" { userName = params.UserID } systemPrompt := params.Persona.BuildSystemPrompt( userName, 1, ) // 1.1 注入设备上下文到系统消息 if params.DeviceContext != "" { systemPrompt += "\n\n" + params.DeviceContext } // 1.2 注入后台思考到系统消息(不打扰地) if len(params.PendingThoughts) > 0 { systemPrompt += "\n\n【昔涟的内心思考(仅供你参考,不要直接复述,请自然地融入对话)】\n" for _, thought := range params.PendingThoughts { systemPrompt += fmt.Sprintf("- %s\n", thought) } } messages = append(messages, model.LLMMessage{ Role: "system", Content: systemPrompt, }) // 2. 记忆注入 —— 相关记忆以系统消息形式注入,按重要性排序并分类标注 if len(params.Memories) > 0 { // 按 Importance 排序 sortedMems := make([]memory.MemoryEntry, len(params.Memories)) copy(sortedMems, params.Memories) sortMemoriesByImportance(sortedMems) // 分离核心记忆和最近记忆 var coreMems, recentMems, otherMems []memory.MemoryEntry for _, m := range sortedMems { if m.Importance >= 8 { coreMems = append(coreMems, m) } else if m.Importance >= 5 { recentMems = append(recentMems, m) } else { otherMems = append(otherMems, m) } } // 限制每类记忆数量 if len(coreMems) > 5 { coreMems = coreMems[:5] } if len(recentMems) > 8 { recentMems = recentMems[:8] } if len(otherMems) > 3 { otherMems = otherMems[:3] } var memoryPrompt string memoryPrompt += "【以下是关于开拓者的重要记忆,请在合适的时机自然地提及】\n\n" if len(coreMems) > 0 { memoryPrompt += "★ 核心记忆(非常重要,务必优先参考):\n" for _, m := range coreMems { memoryPrompt += formatMemoryLine(m) } memoryPrompt += "\n" } if len(recentMems) > 0 { memoryPrompt += "● 常用记忆:\n" for _, m := range recentMems { memoryPrompt += formatMemoryLine(m) } memoryPrompt += "\n" } if len(otherMems) > 0 { memoryPrompt += "○ 其他记忆:\n" for _, m := range otherMems { memoryPrompt += formatMemoryLine(m) } memoryPrompt += "\n" } messages = append(messages, model.LLMMessage{ Role: "system", Content: memoryPrompt, }) } // 3. 历史对话 history, err := b.loadHistory(ctx, params.SessionID, params.HistoryLimit) if err == nil { messages = append(messages, history...) } // 4. 当前用户消息 messages = append(messages, model.LLMMessage{ Role: "user", Content: params.UserMessage, }) return messages, nil } // loadHistory 从 ConversationStore 加载会话历史 func (b *Builder) loadHistory(_ context.Context, sessionID string, limit int) ([]model.LLMMessage, error) { if b.convStore == nil { logger.Printf("[context] 会话历史存储未初始化,跳过加载") return nil, nil } history := b.convStore.GetHistory(sessionID, limit) if len(history) == 0 { logger.Printf("[context] 会话 %s 无历史记录", sessionID) return nil, nil } logger.Printf("[context] 加载会话 %s 历史 %d 条", sessionID, len(history)) return history, nil } // CacheMessage 缓存消息到会话历史(供chat handler在回复后调用) func (b *Builder) CacheMessage(sessionID string, role model.Role, content string) { if b.convStore == nil { return } b.convStore.AddMessage(sessionID, model.LLMMessage{ Role: role, Content: content, }) } // GetHistory 获取会话历史(供 Orchestrator 使用) func (b *Builder) GetHistory(sessionID string, limit int) []model.LLMMessage { if b.convStore == nil { return nil } return b.convStore.GetHistory(sessionID, limit) } // InjectDeviceContext 将设备状态格式化为简洁的文本注入系统上下文 func InjectDeviceContext(devices []DeviceInfo) string { if len(devices) == 0 { return "" } var sb strings.Builder sb.WriteString("[当前IoT设备状态 — 你已知晓这些设备的状态,无需调用工具查询,直接引用即可]\n") for _, d := range devices { switch d.Type { case "light": if d.Status == "on" { sb.WriteString(fmt.Sprintf("- %s: 开启 (亮度%d%%, %s)\n", d.Name, d.Brightness, d.Color)) } else { sb.WriteString(fmt.Sprintf("- %s: 关闭\n", d.Name)) } case "ac": if d.Status == "on" { modeLabel := acModeLabel(d.Mode) sb.WriteString(fmt.Sprintf("- %s: 运行中 (%s%.0f°C)\n", d.Name, modeLabel, d.Temperature)) } else { sb.WriteString(fmt.Sprintf("- %s: 关闭\n", d.Name)) } case "curtain": statusLabel := "已关闭" if d.Status == "open" { statusLabel = "已打开" } sb.WriteString(fmt.Sprintf("- %s: %s\n", d.Name, statusLabel)) case "sensor": sb.WriteString(fmt.Sprintf("- %s: %.1f%s\n", d.Name, d.Value, d.Unit)) case "lock": statusLabel := "已锁定" if d.Status == "unlocked" { statusLabel = "已解锁" } sb.WriteString(fmt.Sprintf("- %s: %s (电量%d%%)\n", d.Name, statusLabel, d.Battery)) } } return sb.String() } // DeviceInfo 设备信息(避免循环依赖的简化结构体) type DeviceInfo struct { Name string Type string Status string Brightness int Color string Temperature float64 Mode string Value float64 Unit string Battery int } func acModeLabel(mode string) string { switch mode { case "cool": return "制冷" case "heat": return "制热" case "auto": return "自动" default: return mode } } // sortMemoriesByImportance 按 Importance 降序排列记忆 func sortMemoriesByImportance(mems []memory.MemoryEntry) { for i := 0; i < len(mems); i++ { for j := i + 1; j < len(mems); j++ { if mems[j].Importance > mems[i].Importance || (mems[j].Importance == mems[i].Importance && mems[j].Priority > mems[i].Priority) { mems[i], mems[j] = mems[j], mems[i] } } } } // formatMemoryLine 格式化单条记忆为展示行 func formatMemoryLine(m model.MemoryEntry) string { content := m.Content runes := []rune(content) if len(runes) > 80 { content = string(runes[:80]) + "…" } stars := "" for i := 0; i < m.Importance/2; i++ { stars += "★" } if m.Importance%2 != 0 { stars += "☆" } return fmt.Sprintf("- [%s%s] %s\n", m.Category.DisplayName(), stars, content) }