package background import ( "context" "fmt" "log" "os" "strconv" "sync" "time" "github.com/yourname/cyrene-ai/ai-core/internal/llm" "github.com/yourname/cyrene-ai/ai-core/internal/memory" "github.com/yourname/cyrene-ai/ai-core/internal/model" "github.com/yourname/cyrene-ai/ai-core/internal/persona" "github.com/yourname/cyrene-ai/ai-core/internal/tools" ) // PendingThought 待推送的后台思考 type PendingThought struct { Content string `json:"content"` CreatedAt time.Time `json:"created_at"` Consumed bool `json:"consumed"` } // Thinker 后台思考器 type Thinker struct { mu sync.Mutex enabled bool personaLoader *persona.Loader memRetriever *memory.Retriever llmAdapter *llm.Adapter iotClient *tools.IoTClient idleTimeout time.Duration // 闲置超时 thinkInterval time.Duration // 两次思考最小间隔 iotQueryInterval time.Duration // IoT查询最小间隔 pendingThoughts []*PendingThought lastUserMessage time.Time lastThinkTime time.Time lastIoTQuery time.Time stopCh chan struct{} wg sync.WaitGroup } // ThinkerConfig 后台思考配置 type ThinkerConfig struct { Enabled bool IdleTimeout time.Duration ThinkInterval time.Duration IoTQueryInterval time.Duration } // DefaultThinkerConfig 默认配置 func DefaultThinkerConfig() ThinkerConfig { return ThinkerConfig{ Enabled: getEnvBool("ENABLE_BACKGROUND_THINKING", true), IdleTimeout: getEnvDuration("THINK_IDLE_TIMEOUT_SEC", 120), ThinkInterval: getEnvDuration("THINK_INTERVAL_SEC", 300), IoTQueryInterval: getEnvDuration("IOT_QUERY_INTERVAL_SEC", 600), } } // NewThinker 创建后台思考器 func NewThinker( cfg ThinkerConfig, personaLoader *persona.Loader, memRetriever *memory.Retriever, llmAdapter *llm.Adapter, iotClient *tools.IoTClient, ) *Thinker { return &Thinker{ enabled: cfg.Enabled, personaLoader: personaLoader, memRetriever: memRetriever, llmAdapter: llmAdapter, iotClient: iotClient, idleTimeout: cfg.IdleTimeout, thinkInterval: cfg.ThinkInterval, iotQueryInterval: cfg.IoTQueryInterval, pendingThoughts: make([]*PendingThought, 0), lastUserMessage: time.Now(), stopCh: make(chan struct{}), } } // Start 启动后台思考循环 func (t *Thinker) Start() { if !t.enabled { log.Println("[后台思考] 已禁用 (ENABLE_BACKGROUND_THINKING=false)") return } t.wg.Add(1) go t.loop() log.Printf("[后台思考] 已启动 (闲置超时=%v, 思考间隔=%v, IoT查询间隔=%v)", t.idleTimeout, t.thinkInterval, t.iotQueryInterval) } // Stop 停止后台思考 func (t *Thinker) Stop() { close(t.stopCh) t.wg.Wait() log.Println("[后台思考] 已停止") } // RecordUserMessage 记录用户活动时间 func (t *Thinker) RecordUserMessage() { t.mu.Lock() t.lastUserMessage = time.Now() t.mu.Unlock() } // GetPendingThoughts 获取并消费所有待处理的后台思考 func (t *Thinker) GetPendingThoughts() []*PendingThought { t.mu.Lock() defer t.mu.Unlock() if len(t.pendingThoughts) == 0 { return nil } result := t.pendingThoughts t.pendingThoughts = make([]*PendingThought, 0) // 标记已消费 for _, pt := range result { pt.Consumed = true } return result } // HasPendingThoughts 检查是否有待处理的思考 func (t *Thinker) HasPendingThoughts() bool { t.mu.Lock() defer t.mu.Unlock() return len(t.pendingThoughts) > 0 } // loop 后台主循环 func (t *Thinker) loop() { defer t.wg.Done() ticker := time.NewTicker(10 * time.Second) // 每10秒检查一次 defer ticker.Stop() for { select { case <-t.stopCh: return case <-ticker.C: t.checkAndThink() } } } // checkAndThink 检查是否需要触发思考 func (t *Thinker) checkAndThink() { t.mu.Lock() // 检查空闲时间是否超过阈值 idleDuration := time.Since(t.lastUserMessage) if idleDuration < t.idleTimeout { t.mu.Unlock() return } // 检查距离上次思考是否超过最小间隔 if time.Since(t.lastThinkTime) < t.thinkInterval { t.mu.Unlock() return } t.lastThinkTime = time.Now() t.mu.Unlock() // 执行后台思考(不持锁) t.performThink() } // performThink 执行一次后台思考 func (t *Thinker) performThink() { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() // 加载人格配置 personaConfig, err := t.personaLoader.Get("cyrene") if err != nil { log.Printf("[后台思考] 加载人格失败: %v", err) return } // 检索最近的记忆 var memories []memory.MemoryEntry if t.memRetriever != nil { memories, err = t.memRetriever.Retrieve(ctx, "system", "最近发生了什么 重要的事情") if err != nil { log.Printf("[后台思考] 记忆检索失败: %v", err) } } // 查询 IoT 设备状态(节制) var deviceSummary string if t.iotClient != nil && time.Since(t.lastIoTQuery) >= t.iotQueryInterval { devices := t.iotClient.GetDevicesForContext() if len(devices) > 0 { deviceSummary = formatDeviceContext(devices) } t.mu.Lock() t.lastIoTQuery = time.Now() t.mu.Unlock() } // 构建思考提示 systemPrompt := personaConfig.BuildSystemPrompt("开拓者", 1) memoryContext := "" if len(memories) > 0 { memoryContext = "【最近的记忆】\n" for _, m := range memories { if len(memoryContext)+len(m.Content) > 500 { break // 限制记忆上下文长度 } memoryContext += fmt.Sprintf("- %s\n", m.Content) } } userPrompt := "昔涟,现在是你的后台思考时间。开拓者暂时没有说话。" userPrompt += "\n请你基于以下信息进行简短思考:你注意到了什么?有什么想对开拓者说的吗?" userPrompt += "\n注意:这是内部思考,不是直接对话,请以第三人称或自省的方式思考。" if memoryContext != "" { userPrompt += "\n\n" + memoryContext } if deviceSummary != "" { userPrompt += "\n\n" + deviceSummary } // 调用 LLM messages := []model.LLMMessage{ {Role: model.RoleSystem, Content: systemPrompt}, {Role: model.RoleUser, Content: userPrompt}, } resp, err := t.llmAdapter.Chat(ctx, messages) if err != nil { log.Printf("[后台思考] LLM调用失败: %v", err) return } if resp.Content == "" { return } // 存储思考结果 t.mu.Lock() t.pendingThoughts = append(t.pendingThoughts, &PendingThought{ Content: resp.Content, CreatedAt: time.Now(), Consumed: false, }) // 只保留最近5条 if len(t.pendingThoughts) > 5 { t.pendingThoughts = t.pendingThoughts[len(t.pendingThoughts)-5:] } count := len(t.pendingThoughts) t.mu.Unlock() log.Printf("[后台思考] 完成 (当前累积 %d 条待推送思考)", count) } // formatDeviceContext 格式化设备状态为文本 func formatDeviceContext(devices []tools.IoTDevice) string { if len(devices) == 0 { return "" } summary := "[当前IoT设备状态]\n" for _, d := range devices { switch d.Type { case "light": if d.Status == "on" { summary += fmt.Sprintf("- %s: 开启 (亮度%d%%, %s)\n", d.Name, d.Brightness, d.Color) } else { summary += fmt.Sprintf("- %s: 关闭\n", d.Name) } case "ac": if d.Status == "on" { summary += fmt.Sprintf("- %s: 运行中 (%s%.0f°C)\n", d.Name, modeLabel(d.Mode), d.Temperature) } else { summary += fmt.Sprintf("- %s: 关闭\n", d.Name) } case "curtain": statusLabel := "已关闭" if d.Status == "open" { statusLabel = "已打开" } summary += fmt.Sprintf("- %s: %s\n", d.Name, statusLabel) case "sensor": summary += fmt.Sprintf("- %s: %.1f%s\n", d.Name, d.Value, d.Unit) case "lock": statusLabel := "已锁定" if d.Status == "unlocked" { statusLabel = "已解锁" } summary += fmt.Sprintf("- %s: %s (电量%d%%)\n", d.Name, statusLabel, d.Battery) } } return summary } func modeLabel(mode string) string { switch mode { case "cool": return "制冷" case "heat": return "制热" case "auto": return "自动" default: return mode } } func getEnvBool(key string, fallback bool) bool { v := os.Getenv(key) if v == "" { return fallback } b, err := strconv.ParseBool(v) if err != nil { return fallback } return b } func getEnvDuration(key string, fallbackSec int) time.Duration { v := os.Getenv(key) if v == "" { return time.Duration(fallbackSec) * time.Second } sec, err := strconv.Atoi(v) if err != nil { return time.Duration(fallbackSec) * time.Second } return time.Duration(sec) * time.Second }