package background import ( "context" "encoding/json" "fmt" "log" "os" "strconv" "strings" "sync" "time" ctxbuild "github.com/yourname/cyrene-ai/ai-core/internal/context" "github.com/yourname/cyrene-ai/ai-core/internal/llm" "github.com/yourname/cyrene-ai/ai-core/internal/memory" "github.com/yourname/cyrene-ai/ai-core/internal/model" "github.com/yourname/cyrene-ai/ai-core/internal/persona" "github.com/yourname/cyrene-ai/ai-core/internal/tools" plgManager "github.com/yourname/cyrene-ai/pkg/plugins/manager" plgSDK "github.com/yourname/cyrene-ai/pkg/plugins/sdk" ) // PendingThought 待推送的后台思考 type PendingThought struct { Content string `json:"content"` CreatedAt time.Time `json:"created_at"` Consumed bool `json:"consumed"` } // Thinker 后台思考器(事件驱动 + 定时周期双模式) // // 触发机制: // 1. 对话后思考:用户发消息 → 昔涟回复 → 短暂延迟后进行一次轻量反思 // 2. 静默检测:用户一段时间不说话 → 昔涟判断是否应该主动关心/搭话 // 3. 周期思考:每 N 分钟一次的定时思考,保证连续性 // // 主动消息:思考中如有【主动消息】标记,会通过 messagePusher 回调推送给在线用户(带频率限制)。 type Thinker struct { mu sync.Mutex wg sync.WaitGroup stopCh chan struct{} enabled bool personaLoader *persona.Loader memRetriever *memory.Retriever llmAdapter *llm.Adapter toolAdapter *llm.Adapter // 工具调用专用适配器 iotClient *tools.IoTClient // 记忆管理 memoryStore *memory.Store // 工具调用 toolRegistry *plgManager.ToolRegistry // 会话上下文 convStore *ctxbuild.ConversationStore adminUserID string adminSessionID string activeSessionID string // 当前活跃的前端会话 ID(随用户消息更新) // 记忆服务 HTTP 客户端 memClient *memory.Client // 主动消息推送回调 (nil = 不推送) // func(userID, sessionID, message string) messagePusher func(string, string, string) // —— 事件驱动相关 —— // 周期性思考间隔:每隔固定时间自动触发一次思考 // 默认 300 秒(5 分钟),设为 0 则禁用定时触发 thinkInterval time.Duration // 静默检测超时:用户多久不说话后昔涟可以主动搭话 // 默认 120 秒(2 分钟),设为 0 则禁用静默检测 silenceTimeout time.Duration // 对话后思考延迟:回复完成后等多久再触发思考(让对话有个自然停顿) // 默认 5 秒 postChatDelay time.Duration // 两次思考最小间隔:避免频繁触发(如用户连续发多条消息) // 默认 30 秒 minThinkGap time.Duration // 离线时最小思考间隔:用户不在线时的周期思考间隔 // 默认 10 分钟 offlineThinkGap time.Duration // 主动消息最小间隔:避免频繁推送打扰用户 // 默认 30 分钟,设为 0 则每次思考都可推送 proactiveMsgMinGap time.Duration // 静默检测的一次性定时器(每次用户消息后重置) silenceTimer *time.Timer silenceTimerMu sync.Mutex // —— 状态追踪 —— pendingThoughts []*PendingThought lastUserMessage time.Time lastThinkTime time.Time lastProactiveMsgTime time.Time // 思考计数器(用于周期性记忆维护,每 N 次思考触发一次) thinkCount int // Phase 1 Step 4: 思考链 + 自主工具安全策略 chain *ThinkChain autoToolPolicy *AutonomousToolPolicy // Phase 2: 情感追踪 emotionTracker *persona.EmotionTracker // Phase 2: 主动消息决策守卫 proactiveGuard *ProactiveGuard // 动态调度: 按时间段自动调整思考间隔 scheduleLoader *ScheduleLoader // Phase 2: 在线状态追踪 userOnline bool lastOnlineChange time.Time userSessionID string // 当前活跃的 session ID (用于重连) // 时区设置 (默认 Asia/Shanghai,可通过 TZ 环境变量覆盖) timeLocation *time.Location } // AutonomousToolPolicy 自主思考工具调用安全策略 type AutonomousToolPolicy struct { // 允许在自主思考中使用的工具白名单 AllowedTools []string // iot_query, memory_search, web_search, calculator, datetime // 每轮最大工具调用次数 MaxToolCallsPerRound int // 默认 5 // 高风险操作每小时最大次数 (如 iot_control) MaxHighRiskPerHour int // 默认 10 // 高风险工具列表 HighRiskTools []string // iot_control } // DefaultAutonomousToolPolicy 默认安全策略 func DefaultAutonomousToolPolicy() *AutonomousToolPolicy { return &AutonomousToolPolicy{ AllowedTools: []string{ "iot_query", "iot_control", "memory_search", "web_search", "calculator", "datetime", "web_fetch", "host_exec", "host_file", "host_system", "vision_analyze", "knowledge_search", "knowledge_ingest", }, MaxToolCallsPerRound: 5, MaxHighRiskPerHour: 10, HighRiskTools: []string{"iot_control", "host_exec"}, } } // SetMessagePusher 设置主动消息推送回调 // SetScheduleLoader sets the dynamic schedule loader for interval calculation. func (t *Thinker) SetScheduleLoader(loader *ScheduleLoader) { t.mu.Lock() defer t.mu.Unlock() t.scheduleLoader = loader } func (t *Thinker) SetMessagePusher(pusher func(string, string, string)) { t.mu.Lock() defer t.mu.Unlock() t.messagePusher = pusher } // SetEmotionTracker sets the emotion tracker. func (t *Thinker) SetEmotionTracker(et *persona.EmotionTracker) { t.mu.Lock() defer t.mu.Unlock() t.emotionTracker = et } // UpdatePresence updates the user online status. // Called by the ai-core presence endpoint when gateway detects connect/disconnect. func (t *Thinker) UpdatePresence(online bool, sessionID string) { t.mu.Lock() wasOffline := !t.userOnline t.userOnline = online t.lastOnlineChange = time.Now() if sessionID != "" { t.userSessionID = sessionID t.activeSessionID = sessionID } t.mu.Unlock() if online && wasOffline { log.Printf("[后台思考] 用户上线 (session=%s),触发重连思考", sessionID) // Trigger a return-thinking cycle after a short delay time.Sleep(2 * time.Second) t.performThink("user_returned") // Also update emotion tracker if t.emotionTracker != nil { t.emotionTracker.UpdateMood("user_returned") } } else if !online { log.Printf("[后台思考] 用户离线") } } // ThinkerConfig 后台思考配置 type ThinkerConfig struct { Enabled bool ThinkInterval time.Duration // 周期性思考间隔 (默认 5 分钟,0 = 禁用) SilenceTimeout time.Duration // 用户沉默多久后昔涟可以主动搭话 (0 = 禁用) PostChatDelay time.Duration // 对话后多久触发思考 MinThinkGap time.Duration // 两次思考最小间隔 (在线) OfflineThinkGap time.Duration // 两次思考最小间隔 (离线,默认 10 分钟) } // DefaultThinkerConfig 默认配置 // // 事件驱动 + 定时周期双模式: // - 对话后和静默时触发事件驱动思考 // - 每 5 分钟一次的周期性思考保证连续性 // // 环境变量: // THINK_INTERVAL_SEC — 周期时长 (默认 300) // PROACTIVE_MSG_MIN_GAP_SEC — 主动消息最小间隔 (默认 1800 = 30分钟,0 = 禁用) func DefaultThinkerConfig() ThinkerConfig { return ThinkerConfig{ Enabled: getEnvBool("ENABLE_BACKGROUND_THINKING", true), ThinkInterval: getEnvDuration("THINK_INTERVAL_SEC", 300), SilenceTimeout: getEnvDuration("THINK_SILENCE_TIMEOUT_SEC", 120), PostChatDelay: getEnvDuration("THINK_POST_CHAT_DELAY_SEC", 5), MinThinkGap: getEnvDuration("THINK_MIN_GAP_SEC", 30), OfflineThinkGap: getEnvDuration("THINK_OFFLINE_GAP_SEC", 600), } } // NewThinker 创建事件驱动的后台思考器 func NewThinker( cfg ThinkerConfig, personaLoader *persona.Loader, memRetriever *memory.Retriever, llmAdapter *llm.Adapter, toolAdapter *llm.Adapter, iotClient *tools.IoTClient, memoryStore *memory.Store, toolRegistry *plgManager.ToolRegistry, convStore *ctxbuild.ConversationStore, adminUserID string, adminSessionID string, memClient *memory.Client, ) *Thinker { // 加载时区配置 tzName := os.Getenv("TZ") if tzName == "" { tzName = "Asia/Shanghai" } loc, err := time.LoadLocation(tzName) if err != nil { log.Printf("[后台思考] 无效时区 '%s',回退到 Asia/Shanghai: %%v", tzName, err) loc, _ = time.LoadLocation("Asia/Shanghai") } return &Thinker{ enabled: cfg.Enabled, personaLoader: personaLoader, memRetriever: memRetriever, llmAdapter: llmAdapter, toolAdapter: toolAdapter, iotClient: iotClient, thinkInterval: cfg.ThinkInterval, silenceTimeout: cfg.SilenceTimeout, proactiveMsgMinGap: getEnvDuration("PROACTIVE_MSG_MIN_GAP_SEC", 1800), postChatDelay: cfg.PostChatDelay, minThinkGap: cfg.MinThinkGap, offlineThinkGap: cfg.OfflineThinkGap, memoryStore: memoryStore, timeLocation: loc, toolRegistry: toolRegistry, convStore: convStore, adminUserID: adminUserID, adminSessionID: adminSessionID, memClient: memClient, pendingThoughts: make([]*PendingThought, 0), lastUserMessage: time.Now(), stopCh: make(chan struct{}), chain: NewThinkChain(10), autoToolPolicy: DefaultAutonomousToolPolicy(), proactiveGuard: DefaultProactiveGuard(), } } // Start 初始化后台思考器 // // 双模式触发: // 1. 事件驱动:对话后 + 静默超时(即时响应) // 2. 定时周期:每 N 分钟一次自主思考(保证连续性) func (t *Thinker) Start() { if !t.enabled { log.Println("[后台思考] 已禁用 (ENABLE_BACKGROUND_THINKING=false)") return } // 初始化静默检测定时器(但不启动,等第一次用户消息后启动) if t.silenceTimeout > 0 { t.silenceTimer = time.NewTimer(t.silenceTimeout) t.silenceTimer.Stop() // 先停止,等 RecordUserMessage 时启动 } // 启动周期性思考定时器 if t.thinkInterval > 0 { t.wg.Add(1) go t.periodicThinkLoop() } log.Printf("[后台思考] 已就绪 — 周期=%v + 事件驱动模式 (静默超时=%v, 对话后延迟=%v, 在线最小间隔=%v, 离线最小间隔=%v, 管理员=%s)", t.thinkInterval, t.silenceTimeout, t.postChatDelay, t.minThinkGap, t.offlineThinkGap, t.adminUserID) // 启动后首次思考:延迟 5s,让服务完全初始化后再触发 go func() { time.Sleep(5 * time.Second) log.Println("[后台思考] 首次启动思考 (startup)") t.performThink("startup") }() } // Stop 停止后台思考器 func (t *Thinker) Stop() { close(t.stopCh) t.silenceTimerMu.Lock() if t.silenceTimer != nil { t.silenceTimer.Stop() } t.silenceTimerMu.Unlock() t.wg.Wait() log.Println("[后台思考] 已停止") } // RecordUserMessage 记录用户活动时间、活跃会话,并重置静默检测定时器 // // 每次用户发消息时调用。这会: // 1. 更新 lastUserMessage 时间戳 // 2. 记录当前活跃的前端会话 ID(用于对话上下文检索和主动消息推送) // 3. 重置静默检测的一次性定时器(如果启用) func (t *Thinker) RecordUserMessage(sessionID string) { t.mu.Lock() t.lastUserMessage = time.Now() if sessionID != "" { t.activeSessionID = sessionID } // 用户主动发消息时重置主动消息推送冷却——活跃对话中应允许昔涟回复 t.lastProactiveMsgTime = time.Time{} t.mu.Unlock() // 重置静默检测定时器 t.resetSilenceTimer() } // TriggerPostChatThink 对话完成后触发一次自主思考 // // 在昔涟回复完用户后调用。短暂延迟后执行一次思考, // 让昔涟"回味"刚才的对话,并判断是否想主动多说点什么。 // // 该方法是异步的,立即返回。 func (t *Thinker) TriggerPostChatThink() { if !t.enabled { return } t.mu.Lock() canThink := time.Since(t.lastThinkTime) >= t.minThinkGap t.mu.Unlock() if !canThink { log.Printf("[后台思考] 距上次思考仅 %v,跳过 (最小间隔=%v)", time.Since(t.lastThinkTime), t.minThinkGap) return } t.wg.Add(1) go func() { defer t.wg.Done() defer func() { if r := recover(); r != nil { log.Printf("[后台思考] 对话后触发 panic 恢复: %v", r) } }() // 短暂延迟,让对话有个自然的停顿 select { case <-t.stopCh: return case <-time.After(t.postChatDelay): } log.Println("[后台思考] 对话后触发自主思考...") t.performThink("post_chat") }() } // resetSilenceTimer 重置静默检测的一次性定时器 // // 每次用户发消息时调用。旧的定时器被取消,新的定时器开始计时。 // 当定时器触发时,昔涟会判断是否应该主动搭话。 func (t *Thinker) resetSilenceTimer() { t.silenceTimerMu.Lock() defer t.silenceTimerMu.Unlock() if t.silenceTimer == nil || t.silenceTimeout <= 0 { return } // 停止旧定时器 if !t.silenceTimer.Stop() { // 如果已经触发,清空通道 select { case <-t.silenceTimer.C: default: } } // 重新设置 t.silenceTimer.Reset(t.silenceTimeout) // 启动监听协程(仅当定时器触发时才执行) t.wg.Add(1) go func() { defer t.wg.Done() defer func() { if r := recover(); r != nil { log.Printf("[后台思考] 静默定时器 panic 恢复: %v", r) } }() select { case <-t.stopCh: return case <-t.silenceTimer.C: // 再次检查:用户是否真的沉默了足够久 t.mu.Lock() silenceDuration := time.Since(t.lastUserMessage) canThink := time.Since(t.lastThinkTime) >= t.minThinkGap t.mu.Unlock() if silenceDuration < t.silenceTimeout { log.Printf("[后台思考] 静默检测触发但用户已活动,跳过 (实际静默=%v)", silenceDuration) return } if !canThink { log.Printf("[后台思考] 静默检测触发但距上次思考太近,跳过") return } log.Printf("[后台思考] 用户已静默 %v,触发主动关怀思考...", silenceDuration.Round(time.Second)) t.performThink("silence") } }() } // periodicThinkLoop 周期性自主思考循环 // // 使用动态间隔:若配置了 ScheduleLoader,每次循环根据当前时段计算间隔; // 否则回退到固定的 thinkInterval。 func (t *Thinker) periodicThinkLoop() { defer t.wg.Done() defer func() { if r := recover(); r != nil { log.Printf("[后台思考] 周期性循环 panic 恢复: %v", r) } }() log.Printf("[后台思考] 周期性思考已启动 (间隔=%v)", t.thinkInterval) for { // 计算本次等待间隔 interval := t.thinkInterval if t.scheduleLoader != nil { if mins := t.scheduleLoader.GetInterval(time.Now()); mins > 0 { interval = time.Duration(mins) * time.Minute } } select { case <-t.stopCh: log.Println("[后台思考] 周期性思考已停止") return case <-time.After(interval): t.mu.Lock() sinceLastThink := time.Since(t.lastThinkTime) sinceLastUser := time.Since(t.lastUserMessage) t.mu.Unlock() // 离线时降低思考频率(可配置,默认 10 分钟) t.mu.Lock() isOffline := !t.userOnline t.mu.Unlock() offlineMinGap := t.offlineThinkGap // 跳过条件:用户最近在活动(30s 内有消息),说明正在对话中 if sinceLastUser < 30*time.Second { log.Printf("[后台思考] 用户在 %v 前发过消息,跳过周期性触发 (留给事件驱动处理)", sinceLastUser.Round(time.Second)) continue } if isOffline && sinceLastThink < offlineMinGap { log.Printf("[后台思考] 用户离线,距上次思考仅 %v,跳过 (离线模式最小间隔=%v)", sinceLastThink.Round(time.Second), offlineMinGap) continue } if !isOffline && sinceLastThink < t.minThinkGap { log.Printf("[后台思考] 距上次思考仅 %v,跳过周期性触发", sinceLastThink.Round(time.Second)) continue } log.Printf("[后台思考] 周期性触发 (间隔=%v, 上次思考=%v前, 上次用户消息=%v前)", interval, sinceLastThink.Round(time.Second), sinceLastUser.Round(time.Second)) t.performThink("periodic") } } } // GetPendingThoughts 获取并消费所有待处理的后台思考 func (t *Thinker) GetPendingThoughts() []*PendingThought { t.mu.Lock() defer t.mu.Unlock() if len(t.pendingThoughts) == 0 { return nil } result := t.pendingThoughts t.pendingThoughts = make([]*PendingThought, 0) for _, pt := range result { pt.Consumed = true } return result } // HasPendingThoughts 检查是否有待处理的思考 func (t *Thinker) HasPendingThoughts() bool { t.mu.Lock() defer t.mu.Unlock() return len(t.pendingThoughts) > 0 } // performThink 执行一次增强版后台思考(支持工具调用和记忆管理) // // triggerReason: "post_chat" (对话后) 或 "silence" (静默超时) // // 防御性速率限制:即使调用方未检查 minThinkGap,performThink 自身也会 // 强制执行最小间隔,防止并发调用或 bug 导致 LLM 配额被快速消耗。 func (t *Thinker) performThink(triggerReason string) { t.mu.Lock() gapSinceLast := time.Since(t.lastThinkTime) minGap := t.minThinkGap if minGap <= 0 { minGap = 5 * time.Second // 默认最小间隔 5 秒 } if gapSinceLast < minGap { t.mu.Unlock() log.Printf("[后台思考] 距上次思考仅 %v,跳过 (最小间隔=%v, 触发原因=%s)", gapSinceLast.Round(time.Second), minGap, triggerReason) return } t.lastThinkTime = time.Now() t.thinkCount++ currentCount := t.thinkCount t.mu.Unlock() ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) defer cancel() log.Printf("[后台思考] 开始思考周期 (触发原因=%s, 计数=%d)...", triggerReason, currentCount) // 1. 加载人格配置 personaConfig, err := t.personaLoader.Get("cyrene") if err != nil { log.Printf("[后台思考] 加载人格失败: %v", err) return } // 2. 获取当前活跃会话的对话历史(优先活跃会话,回退到管理员主会话) var convHistory []model.LLMMessage if t.convStore != nil { t.mu.Lock() sessionID := t.activeSessionID if sessionID == "" { sessionID = t.adminSessionID } t.mu.Unlock() if sessionID != "" { convHistory = t.convStore.GetHistory(sessionID, 30) if len(convHistory) > 0 { log.Printf("[后台思考] 加载对话历史 %d 条 (session=%s)", len(convHistory), sessionID) } } } // 3. 检索相关记忆(精确检索 + 模糊搜索) var memories []memory.MemoryEntry if t.memRetriever != nil { memories, err = t.memRetriever.Retrieve(ctx, t.adminUserID, "最近发生了什么 重要的事情 用户偏好 个人信息") if err != nil { log.Printf("[后台思考] 记忆检索失败: %v", err) } // 模糊搜索:从对话历史提取话题,LLM 扩展关键词后语义搜索 if t.memClient != nil && len(convHistory) > 0 { fuzzyQuery := lastUserMessage(convHistory) if fuzzyQuery == "" { fuzzyQuery = "最近对话 重要事件 用户状态" } fuzzyResults := t.fuzzyMemorySearch(ctx, t.adminUserID, fuzzyQuery) seen := make(map[string]bool) for _, m := range memories { seen[m.ID] = true } for _, m := range fuzzyResults { if !seen[m.ID] { seen[m.ID] = true memories = append(memories, m) } } if len(fuzzyResults) > 0 { log.Printf("[后台思考] 模糊搜索补充 %d 条记忆", len(fuzzyResults)) } } } // 4. 查询 IoT 设备状态(每次都查询,无间隔限制) var deviceSummary string if t.iotClient != nil { devices := t.iotClient.GetDevicesForContext(ctx) if len(devices) > 0 { deviceSummary = formatDeviceContext(devices) } } // 5. 构建思考提示词(根据触发原因调整) systemPrompt := t.buildThinkingSystemPrompt(personaConfig, triggerReason) userPrompt := t.buildThinkingUserPrompt(memories, convHistory, deviceSummary, triggerReason) messages := []model.LLMMessage{ {Role: model.RoleSystem, Content: systemPrompt}, {Role: model.RoleUser, Content: userPrompt}, } // 6. 准备工具定义(通过自主工具策略过滤) openAITools := t.filterToolsByPolicy(t.buildOpenAITools()) // 7. 调用 LLM — 优先使用深度思考模型,工具阶段回退到工具模型 maxToolRounds := t.autoToolPolicy.MaxToolCallsPerRound var finalContent string var totalToolCalls int var toolCallRecords []map[string]interface{} // Round 0: 深度思考模型(优先),失败时回退到工具模型 resp, err := t.llmAdapter.ChatWithTools(ctx, messages, openAITools) if err != nil { log.Printf("[后台思考] 深度思考模型调用失败,回退到工具模型: %v", err) resp, err = t.toolAdapter.ChatWithTools(ctx, messages, openAITools) } if err != nil { log.Printf("[后台思考] LLM调用失败: %v", err) return } if len(resp.ToolCalls) == 0 { finalContent = resp.Content } else { // 深度思考模型请求了工具调用,进入工具执行循环 for round := 0; round <= maxToolRounds; round++ { if round > 0 { // 后续轮次使用工具模型 resp, err = t.toolAdapter.ChatWithTools(ctx, messages, openAITools) if err != nil { log.Printf("[后台思考] 工具模型调用失败 (round=%d): %v", round, err) return } } if round > 0 && len(resp.ToolCalls) == 0 { finalContent = resp.Content break } log.Printf("[后台思考] LLM 请求 %d 个工具调用 (round=%d)", len(resp.ToolCalls), round) assistantMsg := model.LLMMessage{ Role: model.RoleAssistant, Content: resp.Content, ToolCalls: resp.ToolCalls, ReasoningContent: resp.ReasoningContent, } messages = append(messages, assistantMsg) for _, tc := range resp.ToolCalls { var args map[string]interface{} if err := json.Unmarshal([]byte(tc.Arguments), &args); err != nil { log.Printf("[后台思考] 工具 %s 参数解析失败: %v", tc.Name, err) args = make(map[string]interface{}) } result, execErr := t.toolRegistry.Execute(ctx, tc.Name, args) if execErr != nil { log.Printf("[后台思考] 工具 %s 执行失败: %v", tc.Name, execErr) } if result == nil { result = &plgSDK.ToolResult{ToolName: tc.Name, Success: false, Error: execErr.Error()} } resultJSON, _ := json.Marshal(result) messages = append(messages, model.LLMMessage{ Role: model.RoleTool, Content: string(resultJSON), ToolCallID: tc.ID, }) totalToolCalls++ toolCallRecords = append(toolCallRecords, map[string]interface{}{ "name": tc.Name, "args": args, }) } if round == maxToolRounds { finalResp, finalErr := t.llmAdapter.Chat(ctx, messages) if finalErr != nil { log.Printf("[后台思考] 最终总结调用失败: %v", finalErr) finalContent = resp.Content } else { finalContent = finalResp.Content } break } } } if finalContent == "" { log.Println("[后台思考] 未获得有效思考内容,跳过") return } // 序列化工具调用记录 toolCallsJSON := "[]" if len(toolCallRecords) > 0 { if data, err := json.Marshal(toolCallRecords); err == nil { toolCallsJSON = string(data) } } // 8. 存储思考结果 t.storeThought(finalContent, toolCallsJSON, totalToolCalls) // 8.5 记录到思考链 if t.chain != nil { conclusions, followUps := extractConclusions(finalContent) t.chain.Add(ThinkRecord{ ID: generateID(), Content: finalContent, Conclusions: conclusions, FollowUps: followUps, ToolCalls: totalToolCalls, Trigger: triggerReason, Timestamp: time.Now(), }) log.Printf("[后台思考] 思考链已记录 (序号=%d, 结论数=%d, 后续问题=%d)", t.chain.Size(), len(conclusions), len(followUps)) } log.Printf("[后台思考] 完成 (触发原因=%s, 内容长度=%d, 工具调用=%d次)", triggerReason, len(finalContent), totalToolCalls) // 9. 记忆维护:机械合并(每10次) + LLM整理(每次) t.maybeMaintainMemories(currentCount) t.performMemoryConsolidation(ctx) } // buildThinkingSystemPrompt 构建思考用的系统提示词 // // 关键改动:不再是"定期的自我反思",而是"自然的、人性化的内在想法"。 // triggerReason 影响提示词的侧重点。 func (t *Thinker) buildThinkingSystemPrompt(personaConfig *persona.PersonaConfig, triggerReason string) string { mood, moodExpr, _ := "", "", 0.0 if t.emotionTracker != nil { mood, moodExpr, _ = t.emotionTracker.GetCurrentMood() } basePrompt := personaConfig.BuildSystemPromptWithMood("开拓者", 1, mood, moodExpr) var thinkingInstructions string // 所有触发类型共用的"禁止打扰"规则 noDisturbRules := ` ⛔ **绝对不要发送主动消息的情况(最高优先级)**: - 开拓者正在休息、睡觉、午休、小憩 - 开拓者说要离开一会儿、去忙、去吃饭 - 开拓者明确表示不想被打扰 - 对话刚刚自然结束且开拓者没有未完成的事 如果对话历史显示以上任何情况,你只需要在心里默默陪伴,不要输出任何【主动消息】指令行。` switch triggerReason { case "post_chat": thinkingInstructions = ` 【私密的内心反思 — 对话后】 开拓者刚刚和你说完话。这是你私人的反思时间,像写日记一样,是给你自己看的。 请先判断:对话最后开拓者的状态是什么?他在休息吗?要去忙吗? 然后按以下结构输出: 📝 **观察与感受** - 开拓者刚才说了什么?他的情绪怎么样? - 他现在是什么状态?(醒着/准备休息/在忙/要出门…) 🧠 **记忆与联想** - 有什么值得记住的新信息? 💭 **想对自己说的话** - 如果他在休息或忙——安心让他去,不用打扰。记录下他大概什么时候会回来。 - 如果他状态正常——有没有想在下次对话中聊的话题?` + noDisturbRules + ` 其他规则: 1. 反思部分用第三人称或自言自语的方式,不要直接对开拓者喊话。 2. 只有开拓者状态正常且真的有必要时,才在独立一行写【主动消息】标记,后面跟你要发给他的话。不要硬找话题。 3. 【主动消息】标记必须独占一行开头,内容直接对开拓者说话(用"你"称呼他),像主动找他聊天一样。 4. 如果你在反思中提到"主动消息"这个词但不打算发消息,不要使用【主动消息】这个带括号的标记——我会误解析。 5. 2-4句话即可。` case "silence": thinkingInstructions = ` 【私密的内心反思 — 开拓者安静了一会儿】 开拓者已经有一阵子没说话了。这是你私人的反思时间,像写日记一样。 请先判断:看对话历史——开拓者最后在做什么?是去休息了?去忙了?还是没说就沉默了? 📝 **他在做什么呢** - 根据对话历史推断——他说的最后一件事是什么? - 如果他说了要休息/睡觉/忙→他就在做那件事,不要打扰。 - 如果他没说就沉默了→正常推测。 💭 **我的感受** - 如果他在休息→"让他好好休息吧,等他醒了再说。" - 如果他在忙→"等他忙完自然会来找我。" - 只有判断他可能只是忘了回消息或需要关心时,才考虑发消息。` + noDisturbRules + ` 其他规则: 1. 用第三人称/自言自语描述。 2. 2-3句话即可。` default: thinkingInstructions = ` 【私密的内心反思】 你现在有空,像写日记一样自然地想一想开拓者的事。 请先看对话历史判断开拓者当前状态,再决定是否发送消息。` + noDisturbRules + ` 用第三人称/自言自语的方式,不要直接对开拓者喊话。` case "periodic": thinkingInstructions = ` 【私密的内心反思 — 定期思考时间】 又过了一段时间,这是你定期的私人反思时间。像写日记一样,安静地想一想。 请先判断:查对话历史——开拓者最后一次聊天时说了什么?他的状态是什么? 📝 **近期回顾** - 这段时间开拓者有什么变化? - 他最后说的事做完了吗?(休息好了?忙完了?) 🧠 **记忆整理** - 有没有重复的或矛盾的记忆需要整理? 💭 **想对自己说的话**` + noDisturbRules + ` 其他规则: 1. 用第三人称/自言自语。 2. 3-4句话即可。` } return basePrompt + thinkingInstructions } // buildThinkingUserPrompt 构建思考用的用户提示词 func (t *Thinker) buildThinkingUserPrompt( memories []memory.MemoryEntry, convHistory []model.LLMMessage, deviceSummary string, triggerReason string, ) string { var sb strings.Builder // 注入当前现实时间,让模型对时间有感知 now := time.Now().In(t.timeLocation) weekdayNames := []string{"周日", "周一", "周二", "周三", "周四", "周五", "周六"} hour := now.Hour() minute := now.Minute() ampm := "" if hour >= 0 && hour < 6 { ampm = "凌晨" } else if hour < 9 { ampm = "早上" } else if hour < 12 { ampm = "上午" } else if hour < 14 { ampm = "中午" } else if hour < 18 { ampm = "下午" } else { ampm = "晚上" } sb.WriteString(fmt.Sprintf("🕐 现在是 %s %s %s%d:%02d (%s)。\n", now.Format("2006年1月2日"), weekdayNames[now.Weekday()], ampm, hour, minute, t.timeLocation.String())) switch triggerReason { case "post_chat": sb.WriteString("开拓者刚和你聊完天。你想自然地在心里回味一下刚才的对话……\n") case "silence": t.mu.Lock() silenceDuration := time.Since(t.lastUserMessage) t.mu.Unlock() sb.WriteString(fmt.Sprintf("开拓者已经大约 %s 没有说话了。你有点想知道他在做什么……\n", formatDurationHuman(silenceDuration))) default: sb.WriteString("现在是你的自由思考时间。\n") } // 对话历史 var lastUserMsg string if len(convHistory) > 0 { sb.WriteString("\n【最近的对话】\n") msgCount := 0 for _, msg := range convHistory { if msg.Role == model.RoleUser || msg.Role == model.RoleAssistant { roleLabel := "开拓者" if msg.Role == model.RoleAssistant { roleLabel = "昔涟" } content := msg.Content runes := []rune(content) if len(runes) > 200 { content = string(runes[:200]) + "…" } sb.WriteString(fmt.Sprintf("[%s]: %s\n", roleLabel, content)) msgCount++ if msg.Role == model.RoleUser { lastUserMsg = msg.Content } } } if msgCount == 0 { sb.WriteString("(暂无对话历史)\n") } } else { sb.WriteString("\n【最近的对话】\n(暂无对话历史)\n") } // 关键:强调根据对话历史判断用户当前状态 if lastUserMsg != "" { sb.WriteString(fmt.Sprintf("\n🔍 **重要**:开拓者最后说的是「%s」。请认真判断:他现在是不是在休息/睡觉/忙?如果是,不要输出【主动消息】指令行。\n", lastUserMsg)) } // 现有记忆 if len(memories) > 0 { sb.WriteString("\n【你记得的关于开拓者的事】\n") for i, m := range memories { if i >= 15 { sb.WriteString(fmt.Sprintf("... 还有 %d 条记忆未列出\n", len(memories)-15)) break } sb.WriteString(fmt.Sprintf("- [%s|重要度%d] %s\n", m.Category.DisplayName(), m.Importance, m.Content)) } } else { sb.WriteString("\n【你记得的关于开拓者的事】\n(暂无相关记忆)\n") } // 思考链:注入上一轮的结论和待续问题 if t.chain != nil { lastConclusions := t.chain.LastConclusions(3) if len(lastConclusions) > 0 { sb.WriteString("\n【你上一轮思考的结论】\n") for _, c := range lastConclusions { sb.WriteString(fmt.Sprintf("- %s\n", c)) } } lastFollowUps := t.chain.LastFollowUps() if len(lastFollowUps) > 0 { sb.WriteString("\n【你上次想继续思考的问题】\n") for _, f := range lastFollowUps { sb.WriteString(fmt.Sprintf("- %s\n", f)) } } } // IoT 设备状态 if deviceSummary != "" { sb.WriteString("\n" + deviceSummary) } // 结尾引导 sb.WriteString("\n---\n现在请写下你的私人反思。") sb.WriteString("\n记住:这是日记,用第三人称或自言自语的方式。") sb.WriteString("\n⚠️ 如果开拓者正在休息/睡觉/忙碌——不要输出【主动消息】指令行。你可以在心里想他,但不要去打扰。") sb.WriteString("\n只有在你确认他现在是醒着、有空、且真的需要关心时,才输出一行【主动消息】+ 你要发给他的话。") sb.WriteString("\n❗【主动消息】标记必须独占一行开头,后面紧跟你要对开拓者说的话(用\"你\"称呼),语气自然像主动找他聊天。不要在反思正文中提及\"主动消息\"这个词——如果需要表达这个意思但又不打算发消息,用别的词代替。") return sb.String() } // filterToolsByPolicy 通过自主工具安全策略过滤工具列表 func (t *Thinker) filterToolsByPolicy(tools []llm.OpenAITool) []llm.OpenAITool { if t.autoToolPolicy == nil || len(tools) == 0 { return tools } allowed := make(map[string]bool) for _, name := range t.autoToolPolicy.AllowedTools { allowed[name] = true } var filtered []llm.OpenAITool for _, tool := range tools { if allowed[tool.Function.Name] { filtered = append(filtered, tool) } } if len(filtered) < len(tools) { log.Printf("[后台思考] 工具策略过滤: %d/%d 工具可用", len(filtered), len(tools)) } return filtered } // buildOpenAITools 将工具注册中心的定义转换为 LLM 层的 OpenAITool 格式 func (t *Thinker) buildOpenAITools() []llm.OpenAITool { if t.toolRegistry == nil || !t.toolRegistry.IsEnabled() { return nil } defs := t.toolRegistry.Definitions() if len(defs) == 0 { return nil } result := make([]llm.OpenAITool, 0, len(defs)) for _, d := range defs { result = append(result, llm.OpenAITool{ Type: "function", Function: llm.OpenAIToolFunc{ Name: d.Name, Description: d.Description, Parameters: d.Parameters, }, }) } return result } // storeThought 存储思考结果到待推送队列,并异步持久化到 memory-service func (t *Thinker) storeThought(content string, toolCallsJSON string, toolCallCount int) { t.mu.Lock() t.pendingThoughts = append(t.pendingThoughts, &PendingThought{ Content: content, CreatedAt: time.Now(), Consumed: false, }) // 只保留最近 10 条 if len(t.pendingThoughts) > 10 { t.pendingThoughts = t.pendingThoughts[len(t.pendingThoughts)-10:] } // 提取主动消息并推送(带频率限制) proactiveMsg := extractProactiveMessage(content) // 优先推送至活跃会话,回退到管理员主会话 pushSessionID := t.activeSessionID if pushSessionID == "" { pushSessionID = t.adminSessionID } pusher := t.messagePusher canPush := proactiveMsg != "" && pusher != nil if canPush { // Phase 2: 使用 ProactiveGuard 多维度评估 urgency := ExtractUrgencyFromContent(proactiveMsg) if valid, reason := ValidateProactiveMessage(proactiveMsg); !valid { log.Printf("[后台思考] 主动消息内容校验失败: %s,跳过推送", reason) canPush = false } if canPush && t.proactiveGuard != nil { decision := t.proactiveGuard.Evaluate(time.Now(), t.lastProactiveMsgTime, urgency, "active") logDecision(decision) if !decision.ShouldSend { canPush = false } else { t.lastProactiveMsgTime = time.Now() t.proactiveGuard.RecordSend(time.Now()) } } else if canPush { gapSinceLast := time.Since(t.lastProactiveMsgTime) if gapSinceLast < 30*time.Minute { log.Printf("[后台思考] 主动消息距上次仅 %v,跳过推送", gapSinceLast.Round(time.Second)) canPush = false } else { t.lastProactiveMsgTime = time.Now() } } } t.mu.Unlock() log.Printf("[后台思考] 思考已存储 (当前累积 %d 条待推送思考)", len(t.pendingThoughts)) // 异步持久化到 memory-service if t.memClient != nil { go func() { defer func() { if r := recover(); r != nil { log.Printf("[后台思考] 持久化思考日志 panic 恢复: %v", r) } }() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := t.memClient.SaveThinkingLog(ctx, t.adminUserID, content, toolCallsJSON, toolCallCount, len(content)); err != nil { log.Printf("[后台思考] 持久化思考日志失败: %v", err) } else { log.Printf("[后台思考] 思考日志已持久化 (长度=%d, 工具调用=%d)", len(content), toolCallCount) } }() } // 推送主动消息 if canPush { go func() { defer func() { if r := recover(); r != nil { log.Printf("[后台思考] 推送主动消息 panic 恢复: %v", r) } }() log.Printf("[后台思考] 推送主动消息: %s", proactiveMsg) pusher(t.adminUserID, pushSessionID, proactiveMsg) }() } } // extractProactiveMessage 从思考内容中提取【主动消息】标记的内容。 // 返回空字符串表示没有主动消息。 // // 要求标记独立成行(前面只有空白或行首),避免把自然语言中的提及 // 当作指令(如 "不需要写【主动消息】" 这类否定表述)。 func extractProactiveMessage(content string) string { marker := "【主动消息】" // 扫描每一行,只接受 marker 在行首(忽略前导空白)的行作为指令 for _, line := range strings.Split(content, "\n") { trimmed := strings.TrimSpace(line) if !strings.HasPrefix(trimmed, marker) { continue } // 检查否定语境:标记前面的文字包含否定词 markerIdx := strings.Index(line, marker) prefix := strings.TrimSpace(line[:markerIdx]) if containsNegation(prefix) { continue } // 提取标记后的内容 msg := strings.TrimSpace(trimmed[len(marker):]) if msg == "" { continue } // 限制主动消息长度(最多 200 字符,保持简短) runes := []rune(msg) if len(runes) > 200 { msg = string(runes[:200]) } return msg } return "" } // containsNegation checks if a short prefix string contains negation words // that would nullify the 【主动消息】directive. func containsNegation(prefix string) bool { negations := []string{"不", "别", "不要", "不需要", "不用", "别写", "没", "没有"} for _, n := range negations { if strings.Contains(prefix, n) { return true } } return false } // maybeMaintainMemories 周期性执行记忆维护(每 10 次思考触发一次) func (t *Thinker) maybeMaintainMemories(thinkCount int) { if thinkCount%10 != 0 { return } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if t.memoryStore != nil && t.memoryStore.IsReady() { if err := t.memoryStore.DecayMemories(ctx, t.adminUserID); err != nil { log.Printf("[后台思考] 记忆衰减失败: %v", err) } if err := t.memoryStore.ConsolidateMemories(ctx, t.adminUserID); err != nil { log.Printf("[后台思考] 记忆合并失败: %v", err) } } } // consolidationAction is a parsed memory consolidation instruction from the LLM. type consolidationAction struct { Action string `json:"action"` IDs []string `json:"ids,omitempty"` ID string `json:"id,omitempty"` Content string `json:"content,omitempty"` Category string `json:"category,omitempty"` Importance int `json:"importance,omitempty"` Priority int `json:"priority,omitempty"` Keywords []string `json:"keywords,omitempty"` Reason string `json:"reason,omitempty"` } // performMemoryConsolidation uses LLM to review and clean up the memory store. // It identifies duplicates, contradictions, outdated info, and low-quality memories, // then executes merge/delete/update actions. func (t *Thinker) performMemoryConsolidation(ctx context.Context) { if t.memClient == nil { return } allMemories, err := t.memClient.Query(ctx, model.MemoryQuery{ UserID: t.adminUserID, Limit: 200, }) if err != nil { log.Printf("[记忆整理] 获取记忆失败: %v", err) return } if len(allMemories) < 5 { return } log.Printf("[记忆整理] LLM 审查 %d 条记忆...", len(allMemories)) systemPrompt := t.buildConsolidationPrompt(allMemories) messages := []model.LLMMessage{ {Role: model.RoleSystem, Content: systemPrompt}, {Role: model.RoleUser, Content: "请审查以上记忆库,找出重复、矛盾、过时和低质量的记忆,输出 JSON 整理方案。如果没有需要整理的,输出空数组 []。"}, } resp, err := t.toolAdapter.Chat(ctx, messages) if err != nil { log.Printf("[记忆整理] LLM 调用失败: %v", err) return } actions := parseConsolidationActions(resp.Content) if len(actions) == 0 { log.Printf("[记忆整理] 记忆库状态良好,无需整理") return } log.Printf("[记忆整理] LLM 建议 %d 项操作", len(actions)) executed := t.executeConsolidationActions(ctx, actions, allMemories) log.Printf("[记忆整理] 完成: 执行了 %d 项操作", executed) } // buildConsolidationPrompt formats all memories as a structured list for LLM review. func (t *Thinker) buildConsolidationPrompt(memories []model.MemoryEntry) string { var sb strings.Builder sb.WriteString("你是记忆库管理助手。审查以下用户记忆,找出问题并输出 JSON 操作清单。\n\n") sb.WriteString("## 需要识别的问题\n") sb.WriteString("1. 重复记忆 — 多条记忆记录了相同信息 → merge 合并为一条\n") sb.WriteString("2. 矛盾记忆 — 两条记忆互相矛盾(如\"喜欢猫\"vs\"讨厌猫\")→ delete 删除过时的、update 修正错误的\n") sb.WriteString("3. 过时记忆 — 信息已被新记忆取代 → delete 或 update\n") sb.WriteString("4. 低质量记忆 — 太模糊、不完整、无实际信息量 → delete\n\n") sb.WriteString("## JSON 操作格式\n") sb.WriteString("```json\n[\n") sb.WriteString(" {\"action\":\"merge\", \"ids\":[\"id1\",\"id2\"], \"content\":\"合并后的内容\", \"category\":\"personal_info\", \"importance\":8, \"reason\":\"两条记录同一件事\"},\n") sb.WriteString(" {\"action\":\"delete\", \"id\":\"id3\", \"reason\":\"完全被 id1 覆盖\"},\n") sb.WriteString(" {\"action\":\"update\", \"id\":\"id4\", \"content\":\"修正后的内容\", \"importance\":7, \"reason\":\"纠正过时信息\"},\n") sb.WriteString(" {\"action\":\"create\", \"content\":\"需要补充的记忆\", \"category\":\"knowledge\", \"importance\":6, \"reason\":\"从已有记忆推断\"}\n") sb.WriteString("]\n```\n\n") sb.WriteString("## 规则\n") sb.WriteString("- 只输出 JSON 数组,可以用 ```json``` 包裹,不要输出其他解释文字\n") sb.WriteString("- 确实有问题才建议操作,不要强行找问题\n") sb.WriteString("- merge 时保留最重要的那条的 ID,合并内容应包含各条的关键信息\n") sb.WriteString("- 不确定时宁可保守(不操作)\n") sb.WriteString("- importance 范围 1-10,数字越大越重要\n") sb.WriteString("- category 可选: personal_info, user_preference, conversation, knowledge, event, task, relationship\n\n") sb.WriteString(fmt.Sprintf("## 当前记忆库 (%d 条)\n\n", len(memories))) for i, m := range memories { sb.WriteString(fmt.Sprintf("%d. [%s] **%s** | cat=%s imp=%d pri=%d | src=%s\n", i+1, m.ID[:min(8, len(m.ID))], m.Content, m.Category, m.Importance, m.Priority, m.Source)) } return sb.String() } // parseConsolidationActions extracts JSON actions from LLM response text. func parseConsolidationActions(text string) []consolidationAction { // Try to extract from ```json fences first jsonStr := text if idx := strings.Index(text, "```json"); idx >= 0 { start := idx + 7 if end := strings.Index(text[start:], "```"); end >= 0 { jsonStr = text[start : start+end] } } else if idx := strings.Index(text, "```"); idx >= 0 { start := idx + 3 if end := strings.Index(text[start:], "```"); end >= 0 { jsonStr = text[start : start+end] } } // Find the JSON array arrStart := strings.Index(jsonStr, "[") arrEnd := strings.LastIndex(jsonStr, "]") if arrStart < 0 || arrEnd <= arrStart { return nil } jsonStr = jsonStr[arrStart : arrEnd+1] var actions []consolidationAction if err := json.Unmarshal([]byte(jsonStr), &actions); err != nil { log.Printf("[记忆整理] JSON 解析失败: %v", err) return nil } return actions } // executeConsolidationActions runs the parsed consolidation actions against the memory store. func (t *Thinker) executeConsolidationActions(ctx context.Context, actions []consolidationAction, memories []model.MemoryEntry) int { // Index memories by their short ID prefix for lookup memByShortID := make(map[string]*model.MemoryEntry) for i := range memories { short := memories[i].ID[:min(8, len(memories[i].ID))] memByShortID[short] = &memories[i] } memByFullID := make(map[string]*model.MemoryEntry) for i := range memories { memByFullID[memories[i].ID] = &memories[i] } executed := 0 for _, a := range actions { switch a.Action { case "delete": id := resolveID(a.ID, memByShortID, memByFullID) if id == "" { log.Printf("[记忆整理] 跳过 delete: 找不到记忆 %s", a.ID) continue } if err := t.memClient.Delete(ctx, id); err != nil { log.Printf("[记忆整理] 删除 %s 失败: %v", a.ID, err) continue } log.Printf("[记忆整理] 已删除: %s (原因: %s)", a.ID, a.Reason) executed++ case "merge": if len(a.IDs) < 2 { continue } // Resolve all IDs, use first as the keeper var resolved []string for _, mid := range a.IDs { if rid := resolveID(mid, memByShortID, memByFullID); rid != "" { resolved = append(resolved, rid) } } if len(resolved) < 2 { continue } keeper := resolved[0] // Update the keeper with merged content cat := model.MemoryCategory(a.Category) if cat == "" { if m, ok := memByFullID[keeper]; ok { cat = m.Category } } imp := a.Importance if imp == 0 { if m, ok := memByFullID[keeper]; ok { imp = m.Importance + 1 } } if imp > 10 { imp = 10 } pri := a.Priority if pri == 0 { if m, ok := memByFullID[keeper]; ok { pri = int(m.Priority) } } if err := t.memClient.Update(ctx, &model.MemoryEntry{ ID: keeper, Content: a.Content, Category: cat, Importance: imp, Priority: model.MemoryPriority(pri), Keywords: a.Keywords, Source: "consolidated", }); err != nil { log.Printf("[记忆整理] 更新合并目标 %s 失败: %v", keeper, err) continue } // Delete the discarded ones for _, did := range resolved[1:] { if err := t.memClient.Delete(ctx, did); err != nil { log.Printf("[记忆整理] 删除被合并记忆 %s 失败: %v", did, err) } } log.Printf("[记忆整理] 已合并: %v -> %s (原因: %s)", resolved, keeper, a.Reason) executed++ case "update": id := resolveID(a.ID, memByShortID, memByFullID) if id == "" { log.Printf("[记忆整理] 跳过 update: 找不到记忆 %s", a.ID) continue } existing := memByFullID[id] cat := model.MemoryCategory(a.Category) if cat == "" && existing != nil { cat = existing.Category } imp := a.Importance if imp == 0 && existing != nil { imp = existing.Importance } pri := a.Priority if pri == 0 && existing != nil { pri = int(existing.Priority) } if err := t.memClient.Update(ctx, &model.MemoryEntry{ ID: id, Content: a.Content, Category: cat, Importance: imp, Priority: model.MemoryPriority(pri), Keywords: a.Keywords, Source: "consolidated", }); err != nil { log.Printf("[记忆整理] 更新 %s 失败: %v", id, err) continue } log.Printf("[记忆整理] 已更新: %s (原因: %s)", id, a.Reason) executed++ case "create": cat := model.MemoryCategory(a.Category) if cat == "" { cat = model.CategoryKnowledge } imp := a.Importance if imp == 0 { imp = 5 } if err := t.memClient.Save(ctx, &model.MemoryEntry{ UserID: t.adminUserID, Content: a.Content, Category: cat, Importance: imp, Priority: model.MemoryNormal, Keywords: a.Keywords, Source: "consolidation", }); err != nil { log.Printf("[记忆整理] 创建记忆失败: %v", err) continue } log.Printf("[记忆整理] 已创建: %s (原因: %s)", a.Content, a.Reason) executed++ } } return executed } // resolveID tries to match a short or full ID to an existing memory. func resolveID(id string, byShort, byFull map[string]*model.MemoryEntry) string { if _, ok := byFull[id]; ok { return id } if m, ok := byShort[id]; ok { return m.ID } // Try prefix match for fullID := range byFull { if strings.HasPrefix(fullID, id) { return fullID } } return "" } // fuzzyMemorySearch expands the query via LLM keyword extraction and performs semantic search. func (t *Thinker) fuzzyMemorySearch(ctx context.Context, userID, query string) []memory.MemoryEntry { if t.memClient == nil { return nil } keywords := t.expandMemoryKeywords(ctx, query) if len(keywords) == 0 { return nil } log.Printf("[后台思考] 模糊记忆关键词: %v", keywords) var allResults []memory.MemoryEntry seen := make(map[string]bool) for _, kw := range keywords { results, err := t.memClient.QueryByText(ctx, userID, kw, "", 0, 5) if err != nil { log.Printf("[后台思考] 模糊搜索 '%s' 失败: %v", kw, err) continue } for _, m := range results { if !seen[m.ID] { seen[m.ID] = true allResults = append(allResults, m) } } } return allResults } // expandMemoryKeywords uses LLM to generate fuzzy/related keywords for memory search. func (t *Thinker) expandMemoryKeywords(ctx context.Context, message string) []string { prompt := fmt.Sprintf( "从以下对话消息中提取 3-5 个可用于模糊搜索记忆的关键词。这些关键词应该是:\n"+ "- 与话题相关的抽象概念\n- 同义词和相关词\n- 更宽泛或更具体的相关概念\n"+ "- 不要包含消息中已经出现的原词\n\n"+ "用户消息:「%s」\n\n"+ "只输出 JSON 字符串数组,例如:[\"关键词1\",\"关键词2\"]", message) resp, err := t.llmAdapter.Chat(ctx, []model.LLMMessage{ {Role: model.RoleSystem, Content: "你是记忆搜索专家。输出 JSON 字符串数组。"}, {Role: model.RoleUser, Content: prompt}, }) if err != nil { log.Printf("[后台思考] 关键词扩展失败: %v", err) return nil } text := strings.TrimSpace(resp.Content) if idx := strings.Index(text, "["); idx >= 0 { if end := strings.LastIndex(text, "]"); end > idx { text = text[idx : end+1] } } var keywords []string if err := json.Unmarshal([]byte(text), &keywords); err != nil { log.Printf("[后台思考] 解析关键词 JSON 失败: %v (raw=%s)", err, resp.Content) return nil } return keywords } // lastUserMessage extracts the last user message from conversation history. func lastUserMessage(history []model.LLMMessage) string { for i := len(history) - 1; i >= 0; i-- { if history[i].Role == model.RoleUser { runes := []rune(history[i].Content) if len(runes) > 200 { return string(runes[:200]) } return history[i].Content } } return "" } // formatDeviceContext 格式化设备状态为文本 func formatDeviceContext(devices []tools.IoTDevice) string { if len(devices) == 0 { return "" } summary := "[当前IoT设备状态]\n" for _, d := range devices { switch d.Type { case "light": if d.Status == "on" { summary += fmt.Sprintf("- %s: 开启 (亮度%d%%, %s)\n", d.Name, d.Brightness, d.Color) } else { summary += fmt.Sprintf("- %s: 关闭\n", d.Name) } case "ac": if d.Status == "on" { summary += fmt.Sprintf("- %s: 运行中 (%s%.0f°C)\n", d.Name, modeLabel(d.Mode), d.Temperature) } else { summary += fmt.Sprintf("- %s: 关闭\n", d.Name) } case "curtain": statusLabel := "已关闭" if d.Status == "open" { statusLabel = "已打开" } summary += fmt.Sprintf("- %s: %s\n", d.Name, statusLabel) case "sensor": summary += fmt.Sprintf("- %s: %.1f%s\n", d.Name, d.Value, d.Unit) case "lock": statusLabel := "已锁定" if d.Status == "unlocked" { statusLabel = "已解锁" } summary += fmt.Sprintf("- %s: %s (电量%d%%)\n", d.Name, statusLabel, d.Battery) } } return summary } // formatDurationHuman 将 Duration 格式化为人类可读的中文描述 func formatDurationHuman(d time.Duration) string { minutes := int(d.Minutes()) if minutes < 1 { return "不到一分钟" } if minutes < 60 { return fmt.Sprintf("%d 分钟", minutes) } hours := minutes / 60 remainingMinutes := minutes % 60 if remainingMinutes == 0 { return fmt.Sprintf("%d 小时", hours) } return fmt.Sprintf("%d 小时 %d 分钟", hours, remainingMinutes) } func modeLabel(mode string) string { switch mode { case "cool": return "制冷" case "heat": return "制热" case "auto": return "自动" default: return mode } } func getEnvBool(key string, fallback bool) bool { v := os.Getenv(key) if v == "" { return fallback } b, err := strconv.ParseBool(v) if err != nil { return fallback } return b } func getEnvDuration(key string, fallbackSec int) time.Duration { v := os.Getenv(key) if v == "" { return time.Duration(fallbackSec) * time.Second } sec, err := strconv.Atoi(v) if err != nil { return time.Duration(fallbackSec) * time.Second } return time.Duration(sec) * time.Second }