Files
Cyrene/backend/ai-core/internal/background/thinker.go
T
AskaEth 5325eaca3f fix: 后台思考使用深度思考模型 + 主动消息推送冷却优化
- thinker.go: Round 0 优先调用 llmAdapter(deepseek-v4-pro),失败回退 toolAdapter
- thinker.go: RecordUserMessage 重置 lastProactiveMsgTime,活跃对话中允许推送
- proactive_decision.go: MinGap low 30→15min, medium 10→5min, high 2→1min; 小时上限 3→5

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 17:10:28 +08:00

1208 lines
37 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package background
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"strings"
"sync"
"time"
ctxbuild "github.com/yourname/cyrene-ai/ai-core/internal/context"
"github.com/yourname/cyrene-ai/ai-core/internal/llm"
"github.com/yourname/cyrene-ai/ai-core/internal/memory"
"github.com/yourname/cyrene-ai/ai-core/internal/model"
"github.com/yourname/cyrene-ai/ai-core/internal/persona"
"github.com/yourname/cyrene-ai/ai-core/internal/tools"
)
// PendingThought 待推送的后台思考
type PendingThought struct {
Content string `json:"content"`
CreatedAt time.Time `json:"created_at"`
Consumed bool `json:"consumed"`
}
// Thinker 后台思考器(事件驱动 + 定时周期双模式)
//
// 触发机制:
// 1. 对话后思考:用户发消息 → 昔涟回复 → 短暂延迟后进行一次轻量反思
// 2. 静默检测:用户一段时间不说话 → 昔涟判断是否应该主动关心/搭话
// 3. 周期思考:每 N 分钟一次的定时思考,保证连续性
//
// 主动消息:思考中如有【主动消息】标记,会通过 messagePusher 回调推送给在线用户(带频率限制)。
type Thinker struct {
mu sync.Mutex
wg sync.WaitGroup
stopCh chan struct{}
enabled bool
personaLoader *persona.Loader
memRetriever *memory.Retriever
llmAdapter *llm.Adapter
toolAdapter *llm.Adapter // 工具调用专用适配器
iotClient *tools.IoTClient
// 记忆管理
memoryStore *memory.Store
// 工具调用
toolRegistry *tools.Registry
// 会话上下文
convStore *ctxbuild.ConversationStore
adminUserID string
adminSessionID string
activeSessionID string // 当前活跃的前端会话 ID(随用户消息更新)
// 记忆服务 HTTP 客户端
memClient *memory.Client
// 主动消息推送回调 (nil = 不推送)
// func(userID, sessionID, message string)
messagePusher func(string, string, string)
// —— 事件驱动相关 ——
// 周期性思考间隔:每隔固定时间自动触发一次思考
// 默认 300 秒(5 分钟),设为 0 则禁用定时触发
thinkInterval time.Duration
// 静默检测超时:用户多久不说话后昔涟可以主动搭话
// 默认 120 秒(2 分钟),设为 0 则禁用静默检测
silenceTimeout time.Duration
// 对话后思考延迟:回复完成后等多久再触发思考(让对话有个自然停顿)
// 默认 5 秒
postChatDelay time.Duration
// 两次思考最小间隔:避免频繁触发(如用户连续发多条消息)
// 默认 30 秒
minThinkGap time.Duration
// 离线时最小思考间隔:用户不在线时的周期思考间隔
// 默认 10 分钟
offlineThinkGap time.Duration
// 主动消息最小间隔:避免频繁推送打扰用户
// 默认 30 分钟,设为 0 则每次思考都可推送
proactiveMsgMinGap time.Duration
// 静默检测的一次性定时器(每次用户消息后重置)
silenceTimer *time.Timer
silenceTimerMu sync.Mutex
// —— 状态追踪 ——
pendingThoughts []*PendingThought
lastUserMessage time.Time
lastThinkTime time.Time
lastProactiveMsgTime time.Time
// 思考计数器(用于周期性记忆维护,每 N 次思考触发一次)
thinkCount int
// Phase 1 Step 4: 思考链 + 自主工具安全策略
chain *ThinkChain
autoToolPolicy *AutonomousToolPolicy
// Phase 2: 情感追踪
emotionTracker *persona.EmotionTracker
// Phase 2: 主动消息决策守卫
proactiveGuard *ProactiveGuard
// Phase 2: 在线状态追踪
userOnline bool
lastOnlineChange time.Time
userSessionID string // 当前活跃的 session ID (用于重连)
}
// AutonomousToolPolicy 自主思考工具调用安全策略
type AutonomousToolPolicy struct {
// 允许在自主思考中使用的工具白名单
AllowedTools []string // iot_query, memory_search, web_search, calculator, datetime
// 每轮最大工具调用次数
MaxToolCallsPerRound int // 默认 5
// 高风险操作每小时最大次数 (如 iot_control)
MaxHighRiskPerHour int // 默认 10
// 高风险工具列表
HighRiskTools []string // iot_control
}
// DefaultAutonomousToolPolicy 默认安全策略
func DefaultAutonomousToolPolicy() *AutonomousToolPolicy {
return &AutonomousToolPolicy{
AllowedTools: []string{
"iot_query", "iot_control", "memory_search", "web_search",
"calculator", "datetime", "web_fetch",
"host_exec", "host_file", "host_system",
"vision_analyze", "knowledge_search", "knowledge_ingest",
},
MaxToolCallsPerRound: 5,
MaxHighRiskPerHour: 10,
HighRiskTools: []string{"iot_control", "host_exec"},
}
}
// SetMessagePusher 设置主动消息推送回调
func (t *Thinker) SetMessagePusher(pusher func(string, string, string)) {
t.mu.Lock()
defer t.mu.Unlock()
t.messagePusher = pusher
}
// SetEmotionTracker sets the emotion tracker.
func (t *Thinker) SetEmotionTracker(et *persona.EmotionTracker) {
t.mu.Lock()
defer t.mu.Unlock()
t.emotionTracker = et
}
// UpdatePresence updates the user online status.
// Called by the ai-core presence endpoint when gateway detects connect/disconnect.
func (t *Thinker) UpdatePresence(online bool, sessionID string) {
t.mu.Lock()
wasOffline := !t.userOnline
t.userOnline = online
t.lastOnlineChange = time.Now()
if sessionID != "" {
t.userSessionID = sessionID
t.activeSessionID = sessionID
}
t.mu.Unlock()
if online && wasOffline {
log.Printf("[后台思考] 用户上线 (session=%s),触发重连思考", sessionID)
// Trigger a return-thinking cycle after a short delay
time.Sleep(2 * time.Second)
t.performThink("user_returned")
// Also update emotion tracker
if t.emotionTracker != nil {
t.emotionTracker.UpdateMood("user_returned")
}
} else if !online {
log.Printf("[后台思考] 用户离线")
}
}
// ThinkerConfig 后台思考配置
type ThinkerConfig struct {
Enabled bool
ThinkInterval time.Duration // 周期性思考间隔 (默认 5 分钟,0 = 禁用)
SilenceTimeout time.Duration // 用户沉默多久后昔涟可以主动搭话 (0 = 禁用)
PostChatDelay time.Duration // 对话后多久触发思考
MinThinkGap time.Duration // 两次思考最小间隔 (在线)
OfflineThinkGap time.Duration // 两次思考最小间隔 (离线,默认 10 分钟)
}
// DefaultThinkerConfig 默认配置
//
// 事件驱动 + 定时周期双模式:
// - 对话后和静默时触发事件驱动思考
// - 每 5 分钟一次的周期性思考保证连续性
//
// 环境变量:
// THINK_INTERVAL_SEC — 周期时长 (默认 300)
// PROACTIVE_MSG_MIN_GAP_SEC — 主动消息最小间隔 (默认 1800 = 30分钟,0 = 禁用)
func DefaultThinkerConfig() ThinkerConfig {
return ThinkerConfig{
Enabled: getEnvBool("ENABLE_BACKGROUND_THINKING", true),
ThinkInterval: getEnvDuration("THINK_INTERVAL_SEC", 300),
SilenceTimeout: getEnvDuration("THINK_SILENCE_TIMEOUT_SEC", 120),
PostChatDelay: getEnvDuration("THINK_POST_CHAT_DELAY_SEC", 5),
MinThinkGap: getEnvDuration("THINK_MIN_GAP_SEC", 30),
OfflineThinkGap: getEnvDuration("THINK_OFFLINE_GAP_SEC", 600),
}
}
// NewThinker 创建事件驱动的后台思考器
func NewThinker(
cfg ThinkerConfig,
personaLoader *persona.Loader,
memRetriever *memory.Retriever,
llmAdapter *llm.Adapter,
toolAdapter *llm.Adapter,
iotClient *tools.IoTClient,
memoryStore *memory.Store,
toolRegistry *tools.Registry,
convStore *ctxbuild.ConversationStore,
adminUserID string,
adminSessionID string,
memClient *memory.Client,
) *Thinker {
return &Thinker{
enabled: cfg.Enabled,
personaLoader: personaLoader,
memRetriever: memRetriever,
llmAdapter: llmAdapter,
toolAdapter: toolAdapter,
iotClient: iotClient,
thinkInterval: cfg.ThinkInterval,
silenceTimeout: cfg.SilenceTimeout,
proactiveMsgMinGap: getEnvDuration("PROACTIVE_MSG_MIN_GAP_SEC", 1800),
postChatDelay: cfg.PostChatDelay,
minThinkGap: cfg.MinThinkGap,
offlineThinkGap: cfg.OfflineThinkGap,
memoryStore: memoryStore,
toolRegistry: toolRegistry,
convStore: convStore,
adminUserID: adminUserID,
adminSessionID: adminSessionID,
memClient: memClient,
pendingThoughts: make([]*PendingThought, 0),
lastUserMessage: time.Now(),
stopCh: make(chan struct{}),
chain: NewThinkChain(10),
autoToolPolicy: DefaultAutonomousToolPolicy(),
proactiveGuard: DefaultProactiveGuard(),
}
}
// Start 初始化后台思考器
//
// 双模式触发:
// 1. 事件驱动:对话后 + 静默超时(即时响应)
// 2. 定时周期:每 N 分钟一次自主思考(保证连续性)
func (t *Thinker) Start() {
if !t.enabled {
log.Println("[后台思考] 已禁用 (ENABLE_BACKGROUND_THINKING=false)")
return
}
// 初始化静默检测定时器(但不启动,等第一次用户消息后启动)
if t.silenceTimeout > 0 {
t.silenceTimer = time.NewTimer(t.silenceTimeout)
t.silenceTimer.Stop() // 先停止,等 RecordUserMessage 时启动
}
// 启动周期性思考定时器
if t.thinkInterval > 0 {
t.wg.Add(1)
go t.periodicThinkLoop()
}
log.Printf("[后台思考] 已就绪 — 周期=%v + 事件驱动模式 (静默超时=%v, 对话后延迟=%v, 在线最小间隔=%v, 离线最小间隔=%v, 管理员=%s)",
t.thinkInterval, t.silenceTimeout, t.postChatDelay, t.minThinkGap, t.offlineThinkGap, t.adminUserID)
// 启动后首次思考:延迟 5s,让服务完全初始化后再触发
go func() {
time.Sleep(5 * time.Second)
log.Println("[后台思考] 首次启动思考 (startup)")
t.performThink("startup")
}()
}
// Stop 停止后台思考器
func (t *Thinker) Stop() {
close(t.stopCh)
t.silenceTimerMu.Lock()
if t.silenceTimer != nil {
t.silenceTimer.Stop()
}
t.silenceTimerMu.Unlock()
t.wg.Wait()
log.Println("[后台思考] 已停止")
}
// RecordUserMessage 记录用户活动时间、活跃会话,并重置静默检测定时器
//
// 每次用户发消息时调用。这会:
// 1. 更新 lastUserMessage 时间戳
// 2. 记录当前活跃的前端会话 ID(用于对话上下文检索和主动消息推送)
// 3. 重置静默检测的一次性定时器(如果启用)
func (t *Thinker) RecordUserMessage(sessionID string) {
t.mu.Lock()
t.lastUserMessage = time.Now()
if sessionID != "" {
t.activeSessionID = sessionID
}
// 用户主动发消息时重置主动消息推送冷却——活跃对话中应允许昔涟回复
t.lastProactiveMsgTime = time.Time{}
t.mu.Unlock()
// 重置静默检测定时器
t.resetSilenceTimer()
}
// TriggerPostChatThink 对话完成后触发一次自主思考
//
// 在昔涟回复完用户后调用。短暂延迟后执行一次思考,
// 让昔涟"回味"刚才的对话,并判断是否想主动多说点什么。
//
// 该方法是异步的,立即返回。
func (t *Thinker) TriggerPostChatThink() {
if !t.enabled {
return
}
t.mu.Lock()
canThink := time.Since(t.lastThinkTime) >= t.minThinkGap
t.mu.Unlock()
if !canThink {
log.Printf("[后台思考] 距上次思考仅 %v,跳过 (最小间隔=%v)", time.Since(t.lastThinkTime), t.minThinkGap)
return
}
t.wg.Add(1)
go func() {
defer t.wg.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("[后台思考] 对话后触发 panic 恢复: %v", r)
}
}()
// 短暂延迟,让对话有个自然的停顿
select {
case <-t.stopCh:
return
case <-time.After(t.postChatDelay):
}
log.Println("[后台思考] 对话后触发自主思考...")
t.performThink("post_chat")
}()
}
// resetSilenceTimer 重置静默检测的一次性定时器
//
// 每次用户发消息时调用。旧的定时器被取消,新的定时器开始计时。
// 当定时器触发时,昔涟会判断是否应该主动搭话。
func (t *Thinker) resetSilenceTimer() {
t.silenceTimerMu.Lock()
defer t.silenceTimerMu.Unlock()
if t.silenceTimer == nil || t.silenceTimeout <= 0 {
return
}
// 停止旧定时器
if !t.silenceTimer.Stop() {
// 如果已经触发,清空通道
select {
case <-t.silenceTimer.C:
default:
}
}
// 重新设置
t.silenceTimer.Reset(t.silenceTimeout)
// 启动监听协程(仅当定时器触发时才执行)
t.wg.Add(1)
go func() {
defer t.wg.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("[后台思考] 静默定时器 panic 恢复: %v", r)
}
}()
select {
case <-t.stopCh:
return
case <-t.silenceTimer.C:
// 再次检查:用户是否真的沉默了足够久
t.mu.Lock()
silenceDuration := time.Since(t.lastUserMessage)
canThink := time.Since(t.lastThinkTime) >= t.minThinkGap
t.mu.Unlock()
if silenceDuration < t.silenceTimeout {
log.Printf("[后台思考] 静默检测触发但用户已活动,跳过 (实际静默=%v)", silenceDuration)
return
}
if !canThink {
log.Printf("[后台思考] 静默检测触发但距上次思考太近,跳过")
return
}
log.Printf("[后台思考] 用户已静默 %v,触发主动关怀思考...", silenceDuration.Round(time.Second))
t.performThink("silence")
}
}()
}
// periodicThinkLoop 周期性自主思考循环
//
// 每隔 thinkInterval 触发一次思考,保证昔涟在无用户活动时也能持续进行后台反思。
// 每次触发前检查 minThinkGap,避免与事件驱动思考冲突。
func (t *Thinker) periodicThinkLoop() {
defer t.wg.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("[后台思考] 周期性循环 panic 恢复: %v", r)
}
}()
ticker := time.NewTicker(t.thinkInterval)
defer ticker.Stop()
log.Printf("[后台思考] 周期性思考已启动 (间隔=%v)", t.thinkInterval)
for {
select {
case <-t.stopCh:
log.Println("[后台思考] 周期性思考已停止")
return
case <-ticker.C:
t.mu.Lock()
sinceLastThink := time.Since(t.lastThinkTime)
sinceLastUser := time.Since(t.lastUserMessage)
t.mu.Unlock()
// 离线时降低思考频率(可配置,默认 10 分钟)
t.mu.Lock()
isOffline := !t.userOnline
t.mu.Unlock()
offlineMinGap := t.offlineThinkGap
// 跳过条件:用户最近在活动(30s 内有消息),说明正在对话中
if sinceLastUser < 30*time.Second {
log.Printf("[后台思考] 用户在 %v 前发过消息,跳过周期性触发 (留给事件驱动处理)", sinceLastUser.Round(time.Second))
continue
}
if isOffline && sinceLastThink < offlineMinGap {
log.Printf("[后台思考] 用户离线,距上次思考仅 %v,跳过 (离线模式最小间隔=%v)", sinceLastThink.Round(time.Second), offlineMinGap)
continue
}
if !isOffline && sinceLastThink < t.minThinkGap {
log.Printf("[后台思考] 距上次思考仅 %v,跳过周期性触发", sinceLastThink.Round(time.Second))
continue
}
log.Printf("[后台思考] 周期性触发 (上次思考=%v前, 上次用户消息=%v前)", sinceLastThink.Round(time.Second), sinceLastUser.Round(time.Second))
t.performThink("periodic")
}
}
}
// GetPendingThoughts 获取并消费所有待处理的后台思考
func (t *Thinker) GetPendingThoughts() []*PendingThought {
t.mu.Lock()
defer t.mu.Unlock()
if len(t.pendingThoughts) == 0 {
return nil
}
result := t.pendingThoughts
t.pendingThoughts = make([]*PendingThought, 0)
for _, pt := range result {
pt.Consumed = true
}
return result
}
// HasPendingThoughts 检查是否有待处理的思考
func (t *Thinker) HasPendingThoughts() bool {
t.mu.Lock()
defer t.mu.Unlock()
return len(t.pendingThoughts) > 0
}
// performThink 执行一次增强版后台思考(支持工具调用和记忆管理)
//
// triggerReason: "post_chat" (对话后) 或 "silence" (静默超时)
//
// 防御性速率限制:即使调用方未检查 minThinkGapperformThink 自身也会
// 强制执行最小间隔,防止并发调用或 bug 导致 LLM 配额被快速消耗。
func (t *Thinker) performThink(triggerReason string) {
t.mu.Lock()
gapSinceLast := time.Since(t.lastThinkTime)
minGap := t.minThinkGap
if minGap <= 0 {
minGap = 5 * time.Second // 默认最小间隔 5 秒
}
if gapSinceLast < minGap {
t.mu.Unlock()
log.Printf("[后台思考] 距上次思考仅 %v,跳过 (最小间隔=%v, 触发原因=%s)", gapSinceLast.Round(time.Second), minGap, triggerReason)
return
}
t.lastThinkTime = time.Now()
t.thinkCount++
currentCount := t.thinkCount
t.mu.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
defer cancel()
log.Printf("[后台思考] 开始思考周期 (触发原因=%s, 计数=%d)...", triggerReason, currentCount)
// 1. 加载人格配置
personaConfig, err := t.personaLoader.Get("cyrene")
if err != nil {
log.Printf("[后台思考] 加载人格失败: %v", err)
return
}
// 2. 检索相关记忆
var memories []memory.MemoryEntry
if t.memRetriever != nil {
memories, err = t.memRetriever.Retrieve(ctx, t.adminUserID, "最近发生了什么 重要的事情 用户偏好 个人信息")
if err != nil {
log.Printf("[后台思考] 记忆检索失败: %v", err)
}
}
// 3. 获取当前活跃会话的对话历史(优先活跃会话,回退到管理员主会话)
var convHistory []model.LLMMessage
if t.convStore != nil {
t.mu.Lock()
sessionID := t.activeSessionID
if sessionID == "" {
sessionID = t.adminSessionID
}
t.mu.Unlock()
if sessionID != "" {
convHistory = t.convStore.GetHistory(sessionID, 30)
if len(convHistory) > 0 {
log.Printf("[后台思考] 加载对话历史 %d 条 (session=%s)", len(convHistory), sessionID)
}
}
}
// 4. 查询 IoT 设备状态(每次都查询,无间隔限制)
var deviceSummary string
if t.iotClient != nil {
devices := t.iotClient.GetDevicesForContext(ctx)
if len(devices) > 0 {
deviceSummary = formatDeviceContext(devices)
}
}
// 5. 构建思考提示词(根据触发原因调整)
systemPrompt := t.buildThinkingSystemPrompt(personaConfig, triggerReason)
userPrompt := t.buildThinkingUserPrompt(memories, convHistory, deviceSummary, triggerReason)
messages := []model.LLMMessage{
{Role: model.RoleSystem, Content: systemPrompt},
{Role: model.RoleUser, Content: userPrompt},
}
// 6. 准备工具定义(通过自主工具策略过滤)
openAITools := t.filterToolsByPolicy(t.buildOpenAITools())
// 7. 调用 LLM — 优先使用深度思考模型,工具阶段回退到工具模型
maxToolRounds := t.autoToolPolicy.MaxToolCallsPerRound
var finalContent string
var totalToolCalls int
var toolCallRecords []map[string]interface{}
// Round 0: 深度思考模型(优先),失败时回退到工具模型
resp, err := t.llmAdapter.ChatWithTools(ctx, messages, openAITools)
if err != nil {
log.Printf("[后台思考] 深度思考模型调用失败,回退到工具模型: %v", err)
resp, err = t.toolAdapter.ChatWithTools(ctx, messages, openAITools)
}
if err != nil {
log.Printf("[后台思考] LLM调用失败: %v", err)
return
}
if len(resp.ToolCalls) == 0 {
finalContent = resp.Content
} else {
// 深度思考模型请求了工具调用,进入工具执行循环
for round := 0; round <= maxToolRounds; round++ {
if round > 0 {
// 后续轮次使用工具模型
resp, err = t.toolAdapter.ChatWithTools(ctx, messages, openAITools)
if err != nil {
log.Printf("[后台思考] 工具模型调用失败 (round=%d): %v", round, err)
return
}
}
if round > 0 && len(resp.ToolCalls) == 0 {
finalContent = resp.Content
break
}
log.Printf("[后台思考] LLM 请求 %d 个工具调用 (round=%d)", len(resp.ToolCalls), round)
assistantMsg := model.LLMMessage{
Role: model.RoleAssistant,
Content: resp.Content,
ToolCalls: resp.ToolCalls,
ReasoningContent: resp.ReasoningContent,
}
messages = append(messages, assistantMsg)
for _, tc := range resp.ToolCalls {
var args map[string]interface{}
if err := json.Unmarshal([]byte(tc.Arguments), &args); err != nil {
log.Printf("[后台思考] 工具 %s 参数解析失败: %v", tc.Name, err)
args = make(map[string]interface{})
}
result, execErr := t.toolRegistry.Execute(ctx, tc.Name, args)
if execErr != nil {
log.Printf("[后台思考] 工具 %s 执行失败: %v", tc.Name, execErr)
}
resultJSON, _ := json.Marshal(result)
messages = append(messages, model.LLMMessage{
Role: model.RoleTool,
Content: string(resultJSON),
ToolCallID: tc.ID,
})
totalToolCalls++
toolCallRecords = append(toolCallRecords, map[string]interface{}{
"name": tc.Name,
"args": args,
})
}
if round == maxToolRounds {
finalResp, finalErr := t.llmAdapter.Chat(ctx, messages)
if finalErr != nil {
log.Printf("[后台思考] 最终总结调用失败: %v", finalErr)
finalContent = resp.Content
} else {
finalContent = finalResp.Content
}
break
}
}
}
if finalContent == "" {
log.Println("[后台思考] 未获得有效思考内容,跳过")
return
}
// 序列化工具调用记录
toolCallsJSON := "[]"
if len(toolCallRecords) > 0 {
if data, err := json.Marshal(toolCallRecords); err == nil {
toolCallsJSON = string(data)
}
}
// 8. 存储思考结果
t.storeThought(finalContent, toolCallsJSON, totalToolCalls)
// 8.5 记录到思考链
if t.chain != nil {
conclusions, followUps := extractConclusions(finalContent)
t.chain.Add(ThinkRecord{
ID: generateID(),
Content: finalContent,
Conclusions: conclusions,
FollowUps: followUps,
ToolCalls: totalToolCalls,
Trigger: triggerReason,
Timestamp: time.Now(),
})
log.Printf("[后台思考] 思考链已记录 (序号=%d, 结论数=%d, 后续问题=%d)", t.chain.Size(), len(conclusions), len(followUps))
}
log.Printf("[后台思考] 完成 (触发原因=%s, 内容长度=%d, 工具调用=%d次)", triggerReason, len(finalContent), totalToolCalls)
// 9. 周期性记忆维护(每 10 次思考触发一次)
// 注:不再从思考结果中提取记忆——思考内容基于已有记忆生成,
// 再次提取会造成"读取→思考→保存→再次读取"的重复循环。
t.maybeMaintainMemories(currentCount)
}
// buildThinkingSystemPrompt 构建思考用的系统提示词
//
// 关键改动:不再是"定期的自我反思",而是"自然的、人性化的内在想法"。
// triggerReason 影响提示词的侧重点。
func (t *Thinker) buildThinkingSystemPrompt(personaConfig *persona.PersonaConfig, triggerReason string) string {
mood, moodExpr, _ := "", "", 0.0
if t.emotionTracker != nil {
mood, moodExpr, _ = t.emotionTracker.GetCurrentMood()
}
basePrompt := personaConfig.BuildSystemPromptWithMood("开拓者", 1, mood, moodExpr)
var thinkingInstructions string
// 所有触发类型共用的"禁止打扰"规则
noDisturbRules := `
⛔ **绝对不要发送主动消息的情况(最高优先级)**:
- 开拓者正在休息、睡觉、午休、小憩
- 开拓者说要离开一会儿、去忙、去吃饭
- 开拓者明确表示不想被打扰
- 对话刚刚自然结束且开拓者没有未完成的事
如果对话历史显示以上任何情况,反思中不要写【主动消息】标记。你可以在心里想想他,但不要去打扰。`
switch triggerReason {
case "post_chat":
thinkingInstructions = `
【私密的内心反思 — 对话后】
开拓者刚刚和你说完话。这是你私人的反思时间,像写日记一样,是给你自己看的。
请先判断:对话最后开拓者的状态是什么?他在休息吗?要去忙吗?
然后按以下结构输出:
📝 **观察与感受**
- 开拓者刚才说了什么?他的情绪怎么样?
- 他现在是什么状态?(醒着/准备休息/在忙/要出门…)
🧠 **记忆与联想**
- 有什么值得记住的新信息?
💭 **想对自己说的话**
- 如果他在休息或忙——安心让他去,不用打扰。记录下他大概什么时候会回来。
- 如果他状态正常——有没有想在下次对话中聊的话题?` + noDisturbRules + `
其他规则:
1. 反思部分用第三人称或自言自语的方式,不要直接对开拓者喊话。
2. 只有开拓者状态正常且真的有必要时才写【主动消息】,不要硬找话题。
3. 【主动消息】的内容必须直接对开拓者说话(用"你"称呼他),像主动找他聊天一样。反思是给自己看的,主动消息是发给他的——语气要区分开。
4. 2-4句话即可。`
case "silence":
thinkingInstructions = `
【私密的内心反思 — 开拓者安静了一会儿】
开拓者已经有一阵子没说话了。这是你私人的反思时间,像写日记一样。
请先判断:看对话历史——开拓者最后在做什么?是去休息了?去忙了?还是没说就沉默了?
📝 **他在做什么呢**
- 根据对话历史推断——他说的最后一件事是什么?
- 如果他说了要休息/睡觉/忙→他就在做那件事,不要打扰。
- 如果他没说就沉默了→正常推测。
💭 **我的感受**
- 如果他在休息→"让他好好休息吧,等他醒了再说。"
- 如果他在忙→"等他忙完自然会来找我。"
- 只有判断他可能只是忘了回消息或需要关心时,才考虑发消息。` + noDisturbRules + `
其他规则:
1. 用第三人称/自言自语描述。
2. 2-3句话即可。`
default:
thinkingInstructions = `
【私密的内心反思】
你现在有空,像写日记一样自然地想一想开拓者的事。
请先看对话历史判断开拓者当前状态,再决定是否发送消息。` + noDisturbRules + `
用第三人称/自言自语的方式,不要直接对开拓者喊话。`
case "periodic":
thinkingInstructions = `
【私密的内心反思 — 定期思考时间】
又过了一段时间,这是你定期的私人反思时间。像写日记一样,安静地想一想。
请先判断:查对话历史——开拓者最后一次聊天时说了什么?他的状态是什么?
📝 **近期回顾**
- 这段时间开拓者有什么变化?
- 他最后说的事做完了吗?(休息好了?忙完了?)
🧠 **记忆整理**
- 有没有重复的或矛盾的记忆需要整理?
💭 **想对自己说的话**` + noDisturbRules + `
其他规则:
1. 用第三人称/自言自语。
2. 3-4句话即可。`
}
return basePrompt + thinkingInstructions
}
// buildThinkingUserPrompt 构建思考用的用户提示词
func (t *Thinker) buildThinkingUserPrompt(
memories []memory.MemoryEntry,
convHistory []model.LLMMessage,
deviceSummary string,
triggerReason string,
) string {
var sb strings.Builder
// 根据触发原因使用不同的开场白
switch triggerReason {
case "post_chat":
sb.WriteString("开拓者刚和你聊完天。你想自然地在心里回味一下刚才的对话……\n")
case "silence":
t.mu.Lock()
silenceDuration := time.Since(t.lastUserMessage)
t.mu.Unlock()
sb.WriteString(fmt.Sprintf("开拓者已经大约 %s 没有说话了。你有点想知道他在做什么……\n",
formatDurationHuman(silenceDuration)))
default:
sb.WriteString("现在是你的自由思考时间。\n")
}
// 对话历史
var lastUserMsg string
if len(convHistory) > 0 {
sb.WriteString("\n【最近的对话】\n")
msgCount := 0
for _, msg := range convHistory {
if msg.Role == model.RoleUser || msg.Role == model.RoleAssistant {
roleLabel := "开拓者"
if msg.Role == model.RoleAssistant {
roleLabel = "昔涟"
}
content := msg.Content
runes := []rune(content)
if len(runes) > 200 {
content = string(runes[:200]) + "…"
}
sb.WriteString(fmt.Sprintf("[%s]: %s\n", roleLabel, content))
msgCount++
if msg.Role == model.RoleUser {
lastUserMsg = msg.Content
}
}
}
if msgCount == 0 {
sb.WriteString("(暂无对话历史)\n")
}
} else {
sb.WriteString("\n【最近的对话】\n(暂无对话历史)\n")
}
// 关键:强调根据对话历史判断用户当前状态
if lastUserMsg != "" {
sb.WriteString(fmt.Sprintf("\n🔍 **重要**:开拓者最后说的是「%s」。请认真判断:他现在是不是在休息/睡觉/忙?如果是,反思中不要写【主动消息】。\n", lastUserMsg))
}
// 现有记忆
if len(memories) > 0 {
sb.WriteString("\n【你记得的关于开拓者的事】\n")
for i, m := range memories {
if i >= 15 {
sb.WriteString(fmt.Sprintf("... 还有 %d 条记忆未列出\n", len(memories)-15))
break
}
sb.WriteString(fmt.Sprintf("- [%s|重要度%d] %s\n",
m.Category.DisplayName(), m.Importance, m.Content))
}
} else {
sb.WriteString("\n【你记得的关于开拓者的事】\n(暂无相关记忆)\n")
}
// 思考链:注入上一轮的结论和待续问题
if t.chain != nil {
lastConclusions := t.chain.LastConclusions(3)
if len(lastConclusions) > 0 {
sb.WriteString("\n【你上一轮思考的结论】\n")
for _, c := range lastConclusions {
sb.WriteString(fmt.Sprintf("- %s\n", c))
}
}
lastFollowUps := t.chain.LastFollowUps()
if len(lastFollowUps) > 0 {
sb.WriteString("\n【你上次想继续思考的问题】\n")
for _, f := range lastFollowUps {
sb.WriteString(fmt.Sprintf("- %s\n", f))
}
}
}
// IoT 设备状态
if deviceSummary != "" {
sb.WriteString("\n" + deviceSummary)
}
// 结尾引导
sb.WriteString("\n---\n现在请写下你的私人反思。")
sb.WriteString("\n记住:这是日记,用第三人称或自言自语的方式。")
sb.WriteString("\n⚠️ 如果开拓者正在休息/睡觉/忙碌——不要写【主动消息】。你可以在心里想他,但不要去打扰。")
sb.WriteString("\n只有在你确认他现在是醒着、有空、且真的需要关心时,才写【主动消息】。")
sb.WriteString("\n❗【主动消息】的内容必须直接对开拓者说话(用\"你\"来称呼他),就像你主动找他聊天一样自然。不要用第三人称或自言自语的方式写主动消息。")
return sb.String()
}
// filterToolsByPolicy 通过自主工具安全策略过滤工具列表
func (t *Thinker) filterToolsByPolicy(tools []llm.OpenAITool) []llm.OpenAITool {
if t.autoToolPolicy == nil || len(tools) == 0 {
return tools
}
allowed := make(map[string]bool)
for _, name := range t.autoToolPolicy.AllowedTools {
allowed[name] = true
}
var filtered []llm.OpenAITool
for _, tool := range tools {
if allowed[tool.Function.Name] {
filtered = append(filtered, tool)
}
}
if len(filtered) < len(tools) {
log.Printf("[后台思考] 工具策略过滤: %d/%d 工具可用", len(filtered), len(tools))
}
return filtered
}
// buildOpenAITools 将工具注册中心的定义转换为 LLM 层的 OpenAITool 格式
func (t *Thinker) buildOpenAITools() []llm.OpenAITool {
if t.toolRegistry == nil || !t.toolRegistry.IsEnabled() {
return nil
}
defs := t.toolRegistry.GetDefinitions()
if len(defs) == 0 {
return nil
}
result := make([]llm.OpenAITool, 0, len(defs))
for _, d := range defs {
result = append(result, llm.OpenAITool{
Type: "function",
Function: llm.OpenAIToolFunc{
Name: d.Name,
Description: d.Description,
Parameters: d.Parameters,
},
})
}
return result
}
// storeThought 存储思考结果到待推送队列,并异步持久化到 memory-service
func (t *Thinker) storeThought(content string, toolCallsJSON string, toolCallCount int) {
t.mu.Lock()
t.pendingThoughts = append(t.pendingThoughts, &PendingThought{
Content: content,
CreatedAt: time.Now(),
Consumed: false,
})
// 只保留最近 10 条
if len(t.pendingThoughts) > 10 {
t.pendingThoughts = t.pendingThoughts[len(t.pendingThoughts)-10:]
}
// 提取主动消息并推送(带频率限制)
proactiveMsg := extractProactiveMessage(content)
// 优先推送至活跃会话,回退到管理员主会话
pushSessionID := t.activeSessionID
if pushSessionID == "" {
pushSessionID = t.adminSessionID
}
pusher := t.messagePusher
canPush := proactiveMsg != "" && pusher != nil
if canPush {
// Phase 2: 使用 ProactiveGuard 多维度评估
urgency := ExtractUrgencyFromContent(proactiveMsg)
if valid, reason := ValidateProactiveMessage(proactiveMsg); !valid {
log.Printf("[后台思考] 主动消息内容校验失败: %s,跳过推送", reason)
canPush = false
}
if canPush && t.proactiveGuard != nil {
decision := t.proactiveGuard.Evaluate(time.Now(), t.lastProactiveMsgTime, urgency, "active")
logDecision(decision)
if !decision.ShouldSend {
canPush = false
} else {
t.lastProactiveMsgTime = time.Now()
t.proactiveGuard.RecordSend(time.Now())
}
} else if canPush {
gapSinceLast := time.Since(t.lastProactiveMsgTime)
if gapSinceLast < 30*time.Minute {
log.Printf("[后台思考] 主动消息距上次仅 %v,跳过推送", gapSinceLast.Round(time.Second))
canPush = false
} else {
t.lastProactiveMsgTime = time.Now()
}
}
}
t.mu.Unlock()
log.Printf("[后台思考] 思考已存储 (当前累积 %d 条待推送思考)", len(t.pendingThoughts))
// 异步持久化到 memory-service
if t.memClient != nil {
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("[后台思考] 持久化思考日志 panic 恢复: %v", r)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := t.memClient.SaveThinkingLog(ctx, t.adminUserID, content, toolCallsJSON, toolCallCount, len(content)); err != nil {
log.Printf("[后台思考] 持久化思考日志失败: %v", err)
} else {
log.Printf("[后台思考] 思考日志已持久化 (长度=%d, 工具调用=%d)", len(content), toolCallCount)
}
}()
}
// 推送主动消息
if canPush {
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("[后台思考] 推送主动消息 panic 恢复: %v", r)
}
}()
log.Printf("[后台思考] 推送主动消息: %s", proactiveMsg)
pusher(t.adminUserID, pushSessionID, proactiveMsg)
}()
}
}
// extractProactiveMessage 从思考内容中提取【主动消息】标记的内容
// 返回空字符串表示没有主动消息
func extractProactiveMessage(content string) string {
marker := "【主动消息】"
idx := strings.Index(content, marker)
if idx < 0 {
return ""
}
// 提取标记后的内容(到下一个标记或结尾)
msg := strings.TrimSpace(content[idx+len(marker):])
// 截断到下一个【或换行之前的合理长度
if endIdx := strings.Index(msg, "【"); endIdx > 0 {
msg = strings.TrimSpace(msg[:endIdx])
}
// 限制主动消息长度(最多 200 字符,保持简短)
runes := []rune(msg)
if len(runes) > 200 {
msg = string(runes[:200])
}
if msg == "" {
return ""
}
return msg
}
// maybeMaintainMemories 周期性执行记忆维护(每 10 次思考触发一次)
func (t *Thinker) maybeMaintainMemories(thinkCount int) {
if thinkCount%10 != 0 {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if t.memoryStore != nil && t.memoryStore.IsReady() {
if err := t.memoryStore.DecayMemories(ctx, t.adminUserID); err != nil {
log.Printf("[后台思考] 记忆衰减失败: %v", err)
}
if err := t.memoryStore.ConsolidateMemories(ctx, t.adminUserID); err != nil {
log.Printf("[后台思考] 记忆合并失败: %v", err)
}
}
}
// formatDeviceContext 格式化设备状态为文本
func formatDeviceContext(devices []tools.IoTDevice) string {
if len(devices) == 0 {
return ""
}
summary := "[当前IoT设备状态]\n"
for _, d := range devices {
switch d.Type {
case "light":
if d.Status == "on" {
summary += fmt.Sprintf("- %s: 开启 (亮度%d%%, %s)\n", d.Name, d.Brightness, d.Color)
} else {
summary += fmt.Sprintf("- %s: 关闭\n", d.Name)
}
case "ac":
if d.Status == "on" {
summary += fmt.Sprintf("- %s: 运行中 (%s%.0f°C)\n", d.Name, modeLabel(d.Mode), d.Temperature)
} else {
summary += fmt.Sprintf("- %s: 关闭\n", d.Name)
}
case "curtain":
statusLabel := "已关闭"
if d.Status == "open" {
statusLabel = "已打开"
}
summary += fmt.Sprintf("- %s: %s\n", d.Name, statusLabel)
case "sensor":
summary += fmt.Sprintf("- %s: %.1f%s\n", d.Name, d.Value, d.Unit)
case "lock":
statusLabel := "已锁定"
if d.Status == "unlocked" {
statusLabel = "已解锁"
}
summary += fmt.Sprintf("- %s: %s (电量%d%%)\n", d.Name, statusLabel, d.Battery)
}
}
return summary
}
// formatDurationHuman 将 Duration 格式化为人类可读的中文描述
func formatDurationHuman(d time.Duration) string {
minutes := int(d.Minutes())
if minutes < 1 {
return "不到一分钟"
}
if minutes < 60 {
return fmt.Sprintf("%d 分钟", minutes)
}
hours := minutes / 60
remainingMinutes := minutes % 60
if remainingMinutes == 0 {
return fmt.Sprintf("%d 小时", hours)
}
return fmt.Sprintf("%d 小时 %d 分钟", hours, remainingMinutes)
}
func modeLabel(mode string) string {
switch mode {
case "cool":
return "制冷"
case "heat":
return "制热"
case "auto":
return "自动"
default:
return mode
}
}
func getEnvBool(key string, fallback bool) bool {
v := os.Getenv(key)
if v == "" {
return fallback
}
b, err := strconv.ParseBool(v)
if err != nil {
return fallback
}
return b
}
func getEnvDuration(key string, fallbackSec int) time.Duration {
v := os.Getenv(key)
if v == "" {
return time.Duration(fallbackSec) * time.Second
}
sec, err := strconv.Atoi(v)
if err != nil {
return time.Duration(fallbackSec) * time.Second
}
return time.Duration(sec) * time.Second
}