feat: Phase 1+2 架构进化 — 连续思考链/主动消息决策/情感状态机/离线自主思考 (86文件)

Phase 1 (基础设施):
- ThinkChain 思考链连续性 + 差异化思考提示词 (persistent)
- AutonomousToolPolicy 工具安全策略 (safe/unsafe/conditional)
- MessageScheduler 自适应消息节奏 (Idle/Available/Busy)
- SessionEnrichmentStore 渐进式上下文丰富 (5层)
- ConversationBus 事件总线 + ResponseCache (dedup)
- pkg/logger 统一日志 + 所有 handler 替换 fmt.Printf
- NPE 守卫/链路优化/数据库表修复/Go workspace

Phase 2 (人格交互):
- EmotionState/EmotionTracker 情感状态机 (5种心情, 情绪衰减)
- ProactiveGuard 主动消息多维决策 (静默时段/紧急度/频率/校验)
- Gateway↔ai-core 在线状态感知链路 (presence notification)
- 离线思考频率控制 + 重连问候 + 离线消息排队

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-23 15:25:12 +08:00
parent b123a36aae
commit 87214b9441
86 changed files with 3085 additions and 582 deletions
+25
View File
@@ -219,6 +219,31 @@ func main() {
handleMemoryCRUD(w, r, memStore, memExtractor)
})
// Phase 2: 在线状态通知端点 (Gateway -> ai-core)
mux.HandleFunc("/api/v1/internal/presence", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
if r.Header.Get("X-Internal-Token") != os.Getenv("INTERNAL_SERVICE_TOKEN") {
w.WriteHeader(http.StatusUnauthorized)
return
}
var req struct {
UserID string `json:"user_id"`
Status string `json:"status"`
SessionID string `json:"session_id"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
online := req.Status == "online"
thinker.UpdatePresence(online, req.SessionID)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"status":"ok"}`))
})
mux.HandleFunc("/api/v1/health", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"status":"ok","service":"ai-core","model":"` + llmAdapter.ModelName() + `"}`))
+3
View File
@@ -5,5 +5,8 @@ go 1.26.2
require (
github.com/joho/godotenv v1.5.1
github.com/lib/pq v1.10.9
github.com/yourname/cyrene-ai/pkg/logger v0.0.0
gopkg.in/yaml.v3 v3.0.1
)
replace github.com/yourname/cyrene-ai/pkg/logger => ../pkg/logger
@@ -0,0 +1,200 @@
package background
import (
"log"
"strings"
"time"
)
// ProactiveDecision represents a decision about whether to send a proactive message.
type ProactiveDecision struct {
ShouldSend bool `json:"should_send"`
Urgency string `json:"urgency"` // low, medium, high
Reason string `json:"reason"`
}
// ProactiveGuard evaluates whether a proactive message should be sent
// based on time-of-day, urgency, rate limits, and user state.
type ProactiveGuard struct {
// Quiet hours: no non-urgent messages during this window
QuietHoursStart int // 0-23, default 23
QuietHoursEnd int // 0-23, default 7
// Min gap between proactive messages, by urgency
MinGapByUrgency map[string]time.Duration
// Max proactive messages per hour
MaxMessagesPerHour int
// Track recent send times for rate limiting
recentSends []time.Time
}
// DefaultProactiveGuard returns a guard with sensible defaults.
func DefaultProactiveGuard() *ProactiveGuard {
return &ProactiveGuard{
QuietHoursStart: 23,
QuietHoursEnd: 7,
MinGapByUrgency: map[string]time.Duration{
"low": 30 * time.Minute,
"medium": 10 * time.Minute,
"high": 2 * time.Minute,
},
MaxMessagesPerHour: 3,
}
}
// IsQuietHour returns true if the given time falls within quiet hours.
func (g *ProactiveGuard) IsQuietHour(now time.Time) bool {
hour := now.Hour()
if g.QuietHoursStart < g.QuietHoursEnd {
return hour >= g.QuietHoursStart && hour < g.QuietHoursEnd
}
// Overnight quiet hours (e.g., 23:00 - 07:00)
return hour >= g.QuietHoursStart || hour < g.QuietHoursEnd
}
// Evaluate checks whether a proactive message should be sent.
func (g *ProactiveGuard) Evaluate(now time.Time, lastProactiveTime time.Time, urgency string, userState string) ProactiveDecision {
// 1. Quiet hours: only high urgency messages pass
if g.IsQuietHour(now) && urgency != "high" {
return ProactiveDecision{
ShouldSend: false,
Urgency: urgency,
Reason: "当前处于安静时段(23:00-07:00),仅紧急消息可推送",
}
}
// 2. User state check: don't disturb if user is resting/busy
if userState == "resting" || userState == "busy" || userState == "sleeping" {
if urgency != "high" {
return ProactiveDecision{
ShouldSend: false,
Urgency: urgency,
Reason: "开拓者正在休息/忙碌,不打扰",
}
}
}
// 3. Rate limit by urgency
minGap, ok := g.MinGapByUrgency[urgency]
if !ok {
minGap = g.MinGapByUrgency["low"]
}
if !lastProactiveTime.IsZero() && now.Sub(lastProactiveTime) < minGap {
return ProactiveDecision{
ShouldSend: false,
Urgency: urgency,
Reason: "距上次主动消息时间过短(" + minGap.String() + " 最小间隔)",
}
}
// 4. Hourly rate limit
g.pruneOldSends(now)
if len(g.recentSends) >= g.MaxMessagesPerHour {
return ProactiveDecision{
ShouldSend: false,
Urgency: urgency,
Reason: "本小时主动消息已达上限",
}
}
// 5. Content length validation (caller should also check)
return ProactiveDecision{
ShouldSend: true,
Urgency: urgency,
Reason: "",
}
}
// RecordSend records a proactive message send for rate limiting.
func (g *ProactiveGuard) RecordSend(now time.Time) {
g.recentSends = append(g.recentSends, now)
g.pruneOldSends(now)
}
// pruneOldSends removes sends older than 1 hour.
func (g *ProactiveGuard) pruneOldSends(now time.Time) {
cutoff := now.Add(-1 * time.Hour)
valid := g.recentSends[:0]
for _, t := range g.recentSends {
if t.After(cutoff) {
valid = append(valid, t)
}
}
g.recentSends = valid
}
// ExtractUrgencyFromContent tries to infer urgency from the proactive message content.
func ExtractUrgencyFromContent(content string) string {
lower := strings.ToLower(content)
// High urgency indicators
highIndicators := []string{"紧急", "立刻", "马上", "危险", "警告", "报警", "异常", "urgent", "alert"}
for _, kw := range highIndicators {
if strings.Contains(lower, kw) {
return "high"
}
}
// Medium urgency indicators
mediumIndicators := []string{"建议", "提醒", "注意", "该", "要", "应该", "记得", "别忘了"}
for _, kw := range mediumIndicators {
if strings.Contains(lower, kw) {
return "medium"
}
}
return "low"
}
// ValidateProactiveMessage performs post-extraction validation on a message.
func ValidateProactiveMessage(content string) (valid bool, reason string) {
runes := []rune(content)
if len(runes) == 0 {
return false, "消息为空"
}
if len(runes) > 500 {
return false, "消息过长(>500字符)"
}
// Check for prohibited patterns (should not tell user they're resting when they're active)
prohibited := []string{
"系统检测到", "根据分析", "经检测", "后台监控",
}
for _, p := range prohibited {
if strings.Contains(content, p) {
return false, "包含机械语言: " + p
}
}
return true, ""
}
// DetermineUserState checks conversation history for user state indicators.
func DetermineUserState(lastUserMsg string) string {
lower := strings.ToLower(lastUserMsg)
restIndicators := []string{"睡", "休息", "躺", "困", "累", "晚安", "午安", "小憩"}
busyIndicators := []string{"忙", "工作", "开会", "出去", "走了", "拜拜", "再见", "回头", "晚点"}
for _, kw := range restIndicators {
if strings.Contains(lower, kw) {
return "resting"
}
}
for _, kw := range busyIndicators {
if strings.Contains(lower, kw) {
return "busy"
}
}
return "active"
}
// logDecision logs the proactive decision for debugging.
func logDecision(d ProactiveDecision) {
if d.ShouldSend {
log.Printf("[主动消息决策] 允许推送 (紧急程度=%s)", d.Urgency)
} else {
log.Printf("[主动消息决策] 阻止推送 (紧急程度=%s, 原因=%s)", d.Urgency, d.Reason)
}
}
@@ -0,0 +1,165 @@
package background
import (
"crypto/rand"
"fmt"
"strings"
"sync"
"time"
)
// ThinkRecord is a single thinking session's result.
type ThinkRecord struct {
ID string `json:"id"`
Content string `json:"content"`
Conclusions []string `json:"conclusions"` // key takeaways
FollowUps []string `json:"follow_ups"` // questions to continue
ToolCalls int `json:"tool_calls"`
Trigger string `json:"trigger"` // post_chat, silence, periodic
Timestamp time.Time `json:"timestamp"`
}
// ThinkChain stores linked thinking records so each round
// can build on previous conclusions.
type ThinkChain struct {
mu sync.Mutex
records []ThinkRecord
maxSize int
}
// NewThinkChain creates a think chain with the given max size.
func NewThinkChain(maxSize int) *ThinkChain {
if maxSize <= 0 {
maxSize = 10
}
return &ThinkChain{
records: make([]ThinkRecord, 0, maxSize),
maxSize: maxSize,
}
}
// Add appends a new think record, evicting oldest if at capacity.
func (c *ThinkChain) Add(r ThinkRecord) {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.records) >= c.maxSize {
c.records = c.records[1:]
}
c.records = append(c.records, r)
}
// LastConclusions returns conclusions from the most recent N records.
func (c *ThinkChain) LastConclusions(n int) []string {
c.mu.Lock()
defer c.mu.Unlock()
var result []string
start := len(c.records) - n
if start < 0 {
start = 0
}
for _, r := range c.records[start:] {
result = append(result, r.Conclusions...)
}
return result
}
// LastFollowUps returns follow-up questions from the single most recent record.
func (c *ThinkChain) LastFollowUps() []string {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.records) == 0 {
return nil
}
return c.records[len(c.records)-1].FollowUps
}
// LastTopic attempts to infer a topic from recent conclusions.
func (c *ThinkChain) LastTopic() string {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.records) == 0 {
return ""
}
// Use first conclusion line of the most recent record as topic
for _, r := range c.records {
for _, c := range r.Conclusions {
if c != "" {
runes := []rune(c)
if len(runes) > 50 {
return string(runes[:50]) + "..."
}
return c
}
}
}
return ""
}
// Size returns the current number of records in the chain.
func (c *ThinkChain) Size() int {
c.mu.Lock()
defer c.mu.Unlock()
return len(c.records)
}
// extractConclusions parses the LLM thinking output to find conclusions and follow-ups.
// Looks for "结论" / "后续" markers in the content.
func extractConclusions(content string) (conclusions []string, followUps []string) {
lines := strings.Split(content, "\n")
inConclusions := false
inFollowUps := false
for _, line := range lines {
line = strings.TrimSpace(line)
if strings.Contains(line, "结论") && (strings.Contains(line, "💭") || strings.Contains(line, "📝") || strings.HasPrefix(line, "-")) {
// Heuristic: this line starts a conclusions section
}
// Match bullet-point conclusions: lines starting with - or •
if (strings.HasPrefix(line, "- ") || strings.HasPrefix(line, "• ")) && !inFollowUps {
text := strings.TrimPrefix(line, "- ")
text = strings.TrimPrefix(text, "• ")
text = strings.TrimSpace(text)
if text != "" && len([]rune(text)) > 2 {
if inConclusions {
conclusions = append(conclusions, text)
} else {
// Without explicit marker, treat all bullets as conclusions
conclusions = append(conclusions, text)
}
}
continue
}
// Detect section transitions
if strings.Contains(line, "后续") || strings.Contains(line, "继续思考") || strings.Contains(line, "下次") {
inFollowUps = true
inConclusions = false
continue
}
if strings.Contains(line, "结论") || strings.Contains(line, "观察") || strings.Contains(line, "记忆") {
inConclusions = true
inFollowUps = false
continue
}
}
// If no structured markers found, treat the whole content as a single conclusion
if len(conclusions) == 0 {
runes := []rune(content)
if len(runes) > 200 {
content = string(runes[:200]) + "..."
}
conclusions = []string{content}
}
return conclusions, followUps
}
// generateID generates a short random ID.
func generateID() string {
b := make([]byte, 6)
rand.Read(b)
return fmt.Sprintf("th-%x", b)
}
+183 -26
View File
@@ -99,6 +99,43 @@ type Thinker struct {
// 思考计数器(用于周期性记忆维护,每 N 次思考触发一次)
thinkCount int
// Phase 1 Step 4: 思考链 + 自主工具安全策略
chain *ThinkChain
autoToolPolicy *AutonomousToolPolicy
// Phase 2: 情感追踪
emotionTracker *persona.EmotionTracker
// Phase 2: 主动消息决策守卫
proactiveGuard *ProactiveGuard
// Phase 2: 在线状态追踪
userOnline bool
lastOnlineChange time.Time
userSessionID string // 当前活跃的 session ID (用于重连)
}
// AutonomousToolPolicy 自主思考工具调用安全策略
type AutonomousToolPolicy struct {
// 允许在自主思考中使用的工具白名单
AllowedTools []string // iot_query, memory_search, web_search, calculator, datetime
// 每轮最大工具调用次数
MaxToolCallsPerRound int // 默认 5
// 高风险操作每小时最大次数 (如 iot_control)
MaxHighRiskPerHour int // 默认 10
// 高风险工具列表
HighRiskTools []string // iot_control
}
// DefaultAutonomousToolPolicy 默认安全策略
func DefaultAutonomousToolPolicy() *AutonomousToolPolicy {
return &AutonomousToolPolicy{
AllowedTools: []string{"iot_query", "iot_control", "memory_search", "web_search", "calculator", "datetime", "web_fetch"},
MaxToolCallsPerRound: 5,
MaxHighRiskPerHour: 10,
HighRiskTools: []string{"iot_control"},
}
}
// SetMessagePusher 设置主动消息推送回调
@@ -108,6 +145,40 @@ func (t *Thinker) SetMessagePusher(pusher func(string, string, string)) {
t.messagePusher = pusher
}
// SetEmotionTracker sets the emotion tracker.
func (t *Thinker) SetEmotionTracker(et *persona.EmotionTracker) {
t.mu.Lock()
defer t.mu.Unlock()
t.emotionTracker = et
}
// UpdatePresence updates the user online status.
// Called by the ai-core presence endpoint when gateway detects connect/disconnect.
func (t *Thinker) UpdatePresence(online bool, sessionID string) {
t.mu.Lock()
wasOffline := !t.userOnline
t.userOnline = online
t.lastOnlineChange = time.Now()
if sessionID != "" {
t.userSessionID = sessionID
t.activeSessionID = sessionID
}
t.mu.Unlock()
if online && wasOffline {
log.Printf("[后台思考] 用户上线 (session=%s),触发重连思考", sessionID)
// Trigger a return-thinking cycle after a short delay
time.Sleep(2 * time.Second)
t.performThink("user_returned")
// Also update emotion tracker
if t.emotionTracker != nil {
t.emotionTracker.UpdateMood("user_returned")
}
} else if !online {
log.Printf("[后台思考] 用户离线")
}
}
// ThinkerConfig 后台思考配置
type ThinkerConfig struct {
Enabled bool
@@ -171,6 +242,9 @@ func NewThinker(
pendingThoughts: make([]*PendingThought, 0),
lastUserMessage: time.Now(),
stopCh: make(chan struct{}),
chain: NewThinkChain(10),
autoToolPolicy: DefaultAutonomousToolPolicy(),
proactiveGuard: DefaultProactiveGuard(),
}
}
@@ -362,16 +436,27 @@ func (t *Thinker) periodicThinkLoop() {
sinceLastUser := time.Since(t.lastUserMessage)
t.mu.Unlock()
// 跳过条件:用户最近在活动(30s 内有消息),说明正在对话中
if sinceLastUser < 30*time.Second {
log.Printf("[后台思考] 用户在 %v 前发过消息,跳过周期性触发 (留给事件驱动处理)", sinceLastUser.Round(time.Second))
continue
}
// Phase 2: 离线时降低思考频率 (每30分钟一次,而非5分钟)
t.mu.Lock()
isOffline := !t.userOnline
t.mu.Unlock()
offlineMinGap := 30 * time.Minute
if sinceLastThink < t.minThinkGap {
log.Printf("[后台思考] 距上次思考仅 %v,跳过周期性触发", sinceLastThink.Round(time.Second))
continue
}
// 跳过条件:用户最近在活动(30s 内有消息),说明正在对话中
if sinceLastUser < 30*time.Second {
log.Printf("[后台思考] 用户在 %v 前发过消息,跳过周期性触发 (留给事件驱动处理)", sinceLastUser.Round(time.Second))
continue
}
if isOffline && sinceLastThink < offlineMinGap {
log.Printf("[后台思考] 用户离线,距上次思考仅 %v,跳过 (离线模式最小间隔=%v)", sinceLastThink.Round(time.Second), offlineMinGap)
continue
}
if !isOffline && sinceLastThink < t.minThinkGap {
log.Printf("[后台思考] 距上次思考仅 %v,跳过周期性触发", sinceLastThink.Round(time.Second))
continue
}
log.Printf("[后台思考] 周期性触发 (上次思考=%v前, 上次用户消息=%v前)", sinceLastThink.Round(time.Second), sinceLastUser.Round(time.Second))
t.performThink("periodic")
@@ -484,11 +569,11 @@ func (t *Thinker) performThink(triggerReason string) {
{Role: model.RoleUser, Content: userPrompt},
}
// 6. 准备工具定义
openAITools := t.buildOpenAITools()
// 6. 准备工具定义(通过自主工具策略过滤)
openAITools := t.filterToolsByPolicy(t.buildOpenAITools())
// 7. 调用 LLM(支持工具调用,最多 3 轮
maxToolRounds := 3
// 7. 调用 LLM(支持工具调用,策略限制轮数
maxToolRounds := t.autoToolPolicy.MaxToolCallsPerRound
var finalContent string
var totalToolCalls int
var toolCallRecords []map[string]interface{}
@@ -569,6 +654,21 @@ func (t *Thinker) performThink(triggerReason string) {
// 8. 存储思考结果
t.storeThought(finalContent, toolCallsJSON, totalToolCalls)
// 8.5 记录到思考链
if t.chain != nil {
conclusions, followUps := extractConclusions(finalContent)
t.chain.Add(ThinkRecord{
ID: generateID(),
Content: finalContent,
Conclusions: conclusions,
FollowUps: followUps,
ToolCalls: totalToolCalls,
Trigger: triggerReason,
Timestamp: time.Now(),
})
log.Printf("[后台思考] 思考链已记录 (序号=%d, 结论数=%d, 后续问题=%d)", t.chain.Size(), len(conclusions), len(followUps))
}
log.Printf("[后台思考] 完成 (触发原因=%s, 内容长度=%d, 工具调用=%d次)", triggerReason, len(finalContent), totalToolCalls)
// 9. 周期性记忆维护(每 10 次思考触发一次)
@@ -582,7 +682,11 @@ func (t *Thinker) performThink(triggerReason string) {
// 关键改动:不再是"定期的自我反思",而是"自然的、人性化的内在想法"。
// triggerReason 影响提示词的侧重点。
func (t *Thinker) buildThinkingSystemPrompt(personaConfig *persona.PersonaConfig, triggerReason string) string {
basePrompt := personaConfig.BuildSystemPrompt("开拓者", 1)
mood, moodExpr, _ := "", "", 0.0
if t.emotionTracker != nil {
mood, moodExpr, _ = t.emotionTracker.GetCurrentMood()
}
basePrompt := personaConfig.BuildSystemPromptWithMood("开拓者", 1, mood, moodExpr)
var thinkingInstructions string
@@ -620,9 +724,10 @@ func (t *Thinker) buildThinkingSystemPrompt(personaConfig *persona.PersonaConfig
- 如果他状态正常——有没有想在下次对话中聊的话题?` + noDisturbRules + `
其他规则:
1. 用第三人称或自言自语的方式,不要直接对开拓者喊话。
1. 反思部分用第三人称或自言自语的方式,不要直接对开拓者喊话。
2. 只有开拓者状态正常且真的有必要时才写【主动消息】,不要硬找话题。
3. 2-4句话即可。`
3. 【主动消息】的内容必须直接对开拓者说话(用"你"称呼他),像主动找他聊天一样。反思是给自己看的,主动消息是发给他的——语气要区分开。
4. 2-4句话即可。`
case "silence":
thinkingInstructions = `
@@ -756,6 +861,24 @@ func (t *Thinker) buildThinkingUserPrompt(
sb.WriteString("\n【你记得的关于开拓者的事】\n(暂无相关记忆)\n")
}
// 思考链:注入上一轮的结论和待续问题
if t.chain != nil {
lastConclusions := t.chain.LastConclusions(3)
if len(lastConclusions) > 0 {
sb.WriteString("\n【你上一轮思考的结论】\n")
for _, c := range lastConclusions {
sb.WriteString(fmt.Sprintf("- %s\n", c))
}
}
lastFollowUps := t.chain.LastFollowUps()
if len(lastFollowUps) > 0 {
sb.WriteString("\n【你上次想继续思考的问题】\n")
for _, f := range lastFollowUps {
sb.WriteString(fmt.Sprintf("- %s\n", f))
}
}
}
// IoT 设备状态
if deviceSummary != "" {
sb.WriteString("\n" + deviceSummary)
@@ -766,10 +889,32 @@ func (t *Thinker) buildThinkingUserPrompt(
sb.WriteString("\n记住:这是日记,用第三人称或自言自语的方式。")
sb.WriteString("\n⚠️ 如果开拓者正在休息/睡觉/忙碌——不要写【主动消息】。你可以在心里想他,但不要去打扰。")
sb.WriteString("\n只有在你确认他现在是醒着、有空、且真的需要关心时,才写【主动消息】。")
sb.WriteString("\n❗【主动消息】的内容必须直接对开拓者说话(用\"你\"来称呼他),就像你主动找他聊天一样自然。不要用第三人称或自言自语的方式写主动消息。")
return sb.String()
}
// filterToolsByPolicy 通过自主工具安全策略过滤工具列表
func (t *Thinker) filterToolsByPolicy(tools []llm.OpenAITool) []llm.OpenAITool {
if t.autoToolPolicy == nil || len(tools) == 0 {
return tools
}
allowed := make(map[string]bool)
for _, name := range t.autoToolPolicy.AllowedTools {
allowed[name] = true
}
var filtered []llm.OpenAITool
for _, tool := range tools {
if allowed[tool.Function.Name] {
filtered = append(filtered, tool)
}
}
if len(filtered) < len(tools) {
log.Printf("[后台思考] 工具策略过滤: %d/%d 工具可用", len(filtered), len(tools))
}
return filtered
}
// buildOpenAITools 将工具注册中心的定义转换为 LLM 层的 OpenAITool 格式
func (t *Thinker) buildOpenAITools() []llm.OpenAITool {
if t.toolRegistry == nil || !t.toolRegistry.IsEnabled() {
@@ -817,17 +962,29 @@ func (t *Thinker) storeThought(content string, toolCallsJSON string, toolCallCou
pusher := t.messagePusher
canPush := proactiveMsg != "" && pusher != nil
if canPush {
// 检查频率限制
gapSinceLast := time.Since(t.lastProactiveMsgTime)
minGap := t.proactiveMsgMinGap
if minGap <= 0 {
minGap = 30 * time.Minute
}
if gapSinceLast < minGap {
log.Printf("[后台思考] 主动消息距上次仅 %v,跳过推送 (最小间隔=%v)", gapSinceLast.Round(time.Second), minGap)
// Phase 2: 使用 ProactiveGuard 多维度评估
urgency := ExtractUrgencyFromContent(proactiveMsg)
if valid, reason := ValidateProactiveMessage(proactiveMsg); !valid {
log.Printf("[后台思考] 主动消息内容校验失败: %s,跳过推送", reason)
canPush = false
} else {
t.lastProactiveMsgTime = time.Now()
}
if canPush && t.proactiveGuard != nil {
decision := t.proactiveGuard.Evaluate(time.Now(), t.lastProactiveMsgTime, urgency, "active")
logDecision(decision)
if !decision.ShouldSend {
canPush = false
} else {
t.lastProactiveMsgTime = time.Now()
t.proactiveGuard.RecordSend(time.Now())
}
} else if canPush {
gapSinceLast := time.Since(t.lastProactiveMsgTime)
if gapSinceLast < 30*time.Minute {
log.Printf("[后台思考] 主动消息距上次仅 %v,跳过推送", gapSinceLast.Round(time.Second))
canPush = false
} else {
t.lastProactiveMsgTime = time.Now()
}
}
}
t.mu.Unlock()
@@ -0,0 +1,101 @@
package bus
import (
"github.com/yourname/cyrene-ai/pkg/logger"
"sync"
"time"
)
// Bus 总线接口(方便测试和替换)
type Bus interface {
Publish(event BusEvent)
Subscribe(eventType EventType, handler EventHandler) *Subscription
}
// ConversationBus 对话事件总线
// Step 1: 仅 side-channel 发布,无消费端
type ConversationBus struct {
mu sync.RWMutex
subscribers map[EventType][]*Subscription
eventCh chan BusEvent
done chan struct{}
}
// NewConversationBus 创建总线
func NewConversationBus() *ConversationBus {
b := &ConversationBus{
subscribers: make(map[EventType][]*Subscription),
eventCh: make(chan BusEvent, 64),
done: make(chan struct{}),
}
go b.dispatchLoop()
return b
}
// Publish 发布事件到总线(非阻塞)
func (b *ConversationBus) Publish(event BusEvent) {
if event.Timestamp.IsZero() {
event.Timestamp = time.Now()
}
select {
case b.eventCh <- event:
default:
logger.Printf("[bus] 事件通道已满,丢弃事件: type=%s session=%s", event.Type, event.SessionID)
}
}
// Subscribe 订阅事件类型
func (b *ConversationBus) Subscribe(eventType EventType, handler EventHandler) *Subscription {
b.mu.Lock()
defer b.mu.Unlock()
sub := &Subscription{bus: b, eventType: eventType, handler: handler}
b.subscribers[eventType] = append(b.subscribers[eventType], sub)
return sub
}
// unsubscribe 内部取消订阅
func (b *ConversationBus) unsubscribe(sub *Subscription) {
b.mu.Lock()
defer b.mu.Unlock()
subs := b.subscribers[sub.eventType]
for i, s := range subs {
if s == sub {
b.subscribers[sub.eventType] = append(subs[:i], subs[i+1:]...)
break
}
}
}
// Stop 停止总线
func (b *ConversationBus) Stop() {
close(b.done)
}
// dispatchLoop 后台分发循环
func (b *ConversationBus) dispatchLoop() {
for {
select {
case event := <-b.eventCh:
b.mu.RLock()
subs := b.subscribers[event.Type]
// 拷贝一份避免持锁回调
handlers := make([]EventHandler, len(subs))
for i, s := range subs {
handlers[i] = s.handler
}
b.mu.RUnlock()
for _, h := range handlers {
func() {
defer func() {
if r := recover(); r != nil {
logger.Printf("[bus] handler panic: %v", r)
}
}()
h(event)
}()
}
case <-b.done:
return
}
}
}
@@ -0,0 +1,43 @@
package bus
import (
"github.com/yourname/cyrene-ai/ai-core/internal/model"
"time"
)
// EventType 总线事件类型
type EventType string
const (
EventSubSessionStarted EventType = "sub_session_started"
EventSubSessionCompleted EventType = "sub_session_completed"
EventSubSessionProgress EventType = "sub_session_progress"
EventSynthesisStarted EventType = "synthesis_started"
EventSynthesisDone EventType = "synthesis_done"
EventReviewReady EventType = "review_ready"
EventError EventType = "error"
)
// BusEvent 总线事件
type BusEvent struct {
ID string
Type EventType
SessionID string
UserID string
Payload interface{}
Timestamp time.Time
}
// SubSessionPayload 子会话事件负载
type SubSessionPayload struct {
SubType model.SubSessionType
Status string // started, completed, failed
Summary string
Details string
Progress float64 // 0.0 ~ 1.0
}
// ReviewPayload 审查事件负载
type ReviewPayload struct {
Messages []model.ReviewMessage
}
@@ -0,0 +1,27 @@
package bus
// EventHandler 事件处理函数
type EventHandler func(BusEvent)
// Subscription 订阅句柄
type Subscription struct {
bus *ConversationBus
eventType EventType
handler EventHandler
}
// Unsubscribe 取消订阅
func (s *Subscription) Unsubscribe() {
if s.bus != nil {
s.bus.unsubscribe(s)
}
}
// NopBus 空操作总线(用于 nil 安全和测试)
type NopBus struct{}
func (n *NopBus) Publish(event BusEvent) {}
func (n *NopBus) Subscribe(eventType EventType, handler EventHandler) *Subscription {
return &Subscription{}
}
func (n *NopBus) unsubscribe(sub *Subscription) {}
+132
View File
@@ -0,0 +1,132 @@
// Package cache provides a response cache for skipping redundant LLM calls
// on semantically similar inputs (greetings and common IoT commands).
package cache
import (
"strings"
"sync"
"time"
)
// Entry is a cached LLM response.
type Entry struct {
FullContent string
CachedAt time.Time
AccessCount int
}
// ResponseCache caches LLM responses keyed by normalized user input.
// It uses separate TTLs for greetings (longer) and other queries (shorter).
type ResponseCache struct {
mu sync.RWMutex
entries map[string]*Entry
maxEntries int
greetingTTL time.Duration
defaultTTL time.Duration
}
// New creates a new ResponseCache with sensible defaults.
func New() *ResponseCache {
return &ResponseCache{
entries: make(map[string]*Entry),
maxEntries: 200,
greetingTTL: 10 * time.Minute,
defaultTTL: 30 * time.Second,
}
}
// Get returns a cached response for the given input if it exists and hasn't expired.
func (c *ResponseCache) Get(input string) (string, bool) {
key := normalize(input)
c.mu.RLock()
entry, ok := c.entries[key]
c.mu.RUnlock()
if !ok {
return "", false
}
ttl := c.defaultTTL
if isGreeting(input) {
ttl = c.greetingTTL
}
if time.Since(entry.CachedAt) > ttl {
c.mu.Lock()
delete(c.entries, key)
c.mu.Unlock()
return "", false
}
c.mu.Lock()
entry.AccessCount++
c.mu.Unlock()
return entry.FullContent, true
}
// Set stores a response in the cache.
func (c *ResponseCache) Set(input, response string) {
key := normalize(input)
c.mu.Lock()
defer c.mu.Unlock()
// Evict oldest entries if at capacity
if len(c.entries) >= c.maxEntries {
var oldestKey string
var oldestTime time.Time
for k, v := range c.entries {
if oldestKey == "" || v.CachedAt.Before(oldestTime) {
oldestKey = k
oldestTime = v.CachedAt
}
}
if oldestKey != "" {
delete(c.entries, oldestKey)
}
}
c.entries[key] = &Entry{
FullContent: response,
CachedAt: time.Now(),
AccessCount: 0,
}
}
// Invalidate clears all cached entries.
func (c *ResponseCache) Invalidate() {
c.mu.Lock()
c.entries = make(map[string]*Entry)
c.mu.Unlock()
}
// Size returns the current number of cached entries.
func (c *ResponseCache) Size() int {
c.mu.RLock()
defer c.mu.RUnlock()
return len(c.entries)
}
// normalize produces a cache key from user input.
func normalize(input string) string {
s := strings.TrimSpace(strings.ToLower(input))
// Collapse multiple spaces
parts := strings.Fields(s)
return strings.Join(parts, " ")
}
// isGreeting returns true if the input looks like a simple greeting/small-talk
// that can be cached with a longer TTL.
func isGreeting(input string) bool {
normalized := normalize(input)
greetings := []string{
"你好", "嗨", "嘿", "哈喽", "hello", "hi", "hey",
"早上好", "下午好", "晚上好", "晚安", "早安", "午安",
"在吗", "在不在", "在么",
"谢谢", "多谢", "感谢", "thanks", "thank you",
"好的", "ok", "okay", "行", "可以",
"再见", "拜拜", "bye", "byebye",
"嗯", "哦", "噢",
}
for _, g := range greetings {
if normalized == g {
return true
}
}
return false
}
+89
View File
@@ -0,0 +1,89 @@
package cache
import (
"testing"
)
func TestNormalize(t *testing.T) {
tests := []struct{ input, want string }{
{" Hello World ", "hello world"},
{"你好", "你好"},
{" 你好 呀 ", "你好 呀"},
{"OK", "ok"},
}
for _, tt := range tests {
got := normalize(tt.input)
if got != tt.want {
t.Errorf("normalize(%q) = %q, want %q", tt.input, got, tt.want)
}
}
}
func TestIsGreeting(t *testing.T) {
if !isGreeting("你好") {
t.Error("'你好' should be a greeting")
}
if !isGreeting("hello") {
t.Error("'hello' should be a greeting")
}
if isGreeting("今天天气真好") {
t.Error("'今天天气真好' should NOT be a greeting")
}
if isGreeting("帮我开灯") {
t.Error("'帮我开灯' should NOT be a greeting")
}
}
func TestCacheHit(t *testing.T) {
c := New()
c.Set("你好呀", "你好呀,开拓者♪ 今天有什么想聊的吗?")
got, ok := c.Get("你好呀")
if !ok {
t.Fatal("expected cache hit")
}
if got != "你好呀,开拓者♪ 今天有什么想聊的吗?" {
t.Errorf("cached response mismatch: %q", got)
}
}
func TestCacheMiss(t *testing.T) {
c := New()
_, ok := c.Get("从未说过的话")
if ok {
t.Error("expected cache miss")
}
}
func TestCacheNormalization(t *testing.T) {
c := New()
c.Set(" 你好 ", "回复内容")
// Normalized key should match
_, ok := c.Get("你好")
if !ok {
t.Error("normalized key should produce cache hit")
}
}
func TestCacheEviction(t *testing.T) {
c := New()
c.maxEntries = 3
c.Set("a", "A")
c.Set("b", "B")
c.Set("c", "C")
c.Set("d", "D") // should evict the oldest
if c.Size() > 3 {
t.Errorf("cache should be <= 3 entries, got %d", c.Size())
}
}
func TestInvalidate(t *testing.T) {
c := New()
c.Set("test", "value")
c.Invalidate()
if c.Size() != 0 {
t.Errorf("cache should be empty after invalidate, got %d", c.Size())
}
}
+4 -4
View File
@@ -3,7 +3,7 @@ package context
import (
"context"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"strings"
"sync"
@@ -212,17 +212,17 @@ func (b *Builder) Build(ctx context.Context, params BuildParams) ([]model.LLMMes
// loadHistory 从 ConversationStore 加载会话历史
func (b *Builder) loadHistory(_ context.Context, sessionID string, limit int) ([]model.LLMMessage, error) {
if b.convStore == nil {
log.Printf("[context] 会话历史存储未初始化,跳过加载")
logger.Printf("[context] 会话历史存储未初始化,跳过加载")
return nil, nil
}
history := b.convStore.GetHistory(sessionID, limit)
if len(history) == 0 {
log.Printf("[context] 会话 %s 无历史记录", sessionID)
logger.Printf("[context] 会话 %s 无历史记录", sessionID)
return nil, nil
}
log.Printf("[context] 加载会话 %s 历史 %d 条", sessionID, len(history))
logger.Printf("[context] 加载会话 %s 历史 %d 条", sessionID, len(history))
return history, nil
}
+3 -3
View File
@@ -7,7 +7,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"net/http"
"strings"
"time"
@@ -119,7 +119,7 @@ func (p *OpenAIProvider) ChatWithTools(ctx context.Context, messages []model.LLM
if err != nil {
// 尝试fallback模型
if p.config.FallbackModel != "" && p.config.FallbackModel != p.config.Model {
log.Printf("[LLM] 主模型 %s 调用失败,降级到 %s: %v", p.config.Model, p.config.FallbackModel, err)
logger.Printf("[LLM] 主模型 %s 调用失败,降级到 %s: %v", p.config.Model, p.config.FallbackModel, err)
return p.doChat(ctx, messages, p.config.FallbackModel, false, tools)
}
return nil, err
@@ -143,7 +143,7 @@ func (p *OpenAIProvider) ChatStreamWithTools(ctx context.Context, messages []mod
if err != nil {
// Fallback
if p.config.FallbackModel != "" {
log.Printf("[LLM] 流式调用主模型失败,降级: %v", err)
logger.Printf("[LLM] 流式调用主模型失败,降级: %v", err)
resp, err = p.doChatStream(ctx, messages, p.config.FallbackModel, tools)
}
if err != nil {
+2 -2
View File
@@ -6,7 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"net/http"
"time"
@@ -301,7 +301,7 @@ func (c *Client) doRequest(ctx context.Context, method, url string, body []byte)
resp, err := c.httpClient.Do(req)
if err != nil {
log.Printf("[memory-client] HTTP 请求失败 %s %s: %v", method, url, err)
logger.Printf("[memory-client] HTTP 请求失败 %s %s: %v", method, url, err)
return nil, err
}
+6 -6
View File
@@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"strings"
"github.com/yourname/cyrene-ai/ai-core/internal/model"
@@ -31,7 +31,7 @@ func NewExtractor(store *Store, llmChat func(ctx context.Context, messages []mod
func (e *Extractor) ExtractAndStore(ctx context.Context, userID, sessionID, userMessage, assistantResponse string) {
memories, err := e.extract(ctx, userMessage, assistantResponse)
if err != nil {
log.Printf("[memory] 记忆提取失败: %v", err)
logger.Printf("[memory] 记忆提取失败: %v", err)
return
}
@@ -49,10 +49,10 @@ func (e *Extractor) ExtractAndStore(ctx context.Context, userID, sessionID, user
}
if err := e.store.Save(ctx, &mem); err != nil {
log.Printf("[memory] 记忆保存失败: %v", err)
logger.Printf("[memory] 记忆保存失败: %v", err)
continue
}
log.Printf("[memory] 新记忆已保存 [%s|%d★]: %s", mem.Category, mem.Importance, mem.Summary)
logger.Printf("[memory] 新记忆已保存 [%s|%d★]: %s", mem.Category, mem.Importance, mem.Summary)
}
}
@@ -280,11 +280,11 @@ func (e *Extractor) mergeMemory(ctx context.Context, existing *model.MemoryEntry
existing.AccessCount++
if err := e.store.Update(ctx, existing); err != nil {
log.Printf("[memory] 合并记忆更新失败: %v", err)
logger.Printf("[memory] 合并记忆更新失败: %v", err)
return
}
log.Printf("[memory] 合并记忆 [%s|%d★]: %s (相似度 > %.0f%%)",
logger.Printf("[memory] 合并记忆 [%s|%d★]: %s (相似度 > %.0f%%)",
existing.Category, existing.Importance, existing.Summary, deDupThreshold*100)
}
+12 -12
View File
@@ -4,7 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"sync"
"time"
@@ -42,9 +42,9 @@ func NewStore(connStr string) *Store {
// 尝试初始连接
if err := s.Reconnect(); err != nil {
log.Printf("[memory] ⚠ 记忆存储初始化: 数据库连接失败 (%v),将在后台每30秒重试", err)
logger.Printf("[memory] ⚠ 记忆存储初始化: 数据库连接失败 (%v),将在后台每30秒重试", err)
} else {
log.Println("[memory] 记忆存储已就绪")
logger.Println("[memory] 记忆存储已就绪")
}
// 启动后台重连 goroutine
@@ -70,7 +70,7 @@ func (s *Store) reconnectLoop() {
s.mu.RUnlock()
if db != nil {
if err := db.Ping(); err != nil {
log.Printf("[memory] ⚠ 数据库连接丢失: %v,开始重连", err)
logger.Printf("[memory] ⚠ 数据库连接丢失: %v,开始重连", err)
s.mu.Lock()
if s.db != nil {
s.db.Close()
@@ -83,7 +83,7 @@ func (s *Store) reconnectLoop() {
if !s.IsReady() {
if err := s.Reconnect(); err != nil {
log.Printf("[memory] ⚠ 数据库重连失败: %v", err)
logger.Printf("[memory] ⚠ 数据库重连失败: %v", err)
}
}
}
@@ -122,13 +122,13 @@ func (s *Store) Reconnect() error {
// 执行建表迁移
if err := s.migrate(); err != nil {
log.Printf("[memory] ⚠ 数据库迁移失败: %v", err)
logger.Printf("[memory] ⚠ 数据库迁移失败: %v", err)
s.db.Close()
s.db = nil
return fmt.Errorf("数据库迁移失败: %w", err)
}
log.Println("[memory] ✅ 数据库重连成功,记忆系统已就绪")
logger.Println("[memory] ✅ 数据库重连成功,记忆系统已就绪")
return nil
}
@@ -471,24 +471,24 @@ func (s *Store) ConsolidateMemories(ctx context.Context, userID string) error {
keep.Source = "consolidated"
if err := s.Update(ctx, keep); err != nil {
log.Printf("[memory] 合并更新记忆 %s 失败: %v", keep.ID, err)
logger.Printf("[memory] 合并更新记忆 %s 失败: %v", keep.ID, err)
continue
}
if err := s.Delete(ctx, discard.ID); err != nil {
log.Printf("[memory] 合并删除记忆 %s 失败: %v", discard.ID, err)
logger.Printf("[memory] 合并删除记忆 %s 失败: %v", discard.ID, err)
continue
}
discard.ID = ""
merged++
log.Printf("[memory] 合并相似记忆: %s <- %s (相似度 %.0f%%)",
logger.Printf("[memory] 合并相似记忆: %s <- %s (相似度 %.0f%%)",
keep.ID[:min(8, len(keep.ID))], discard.ID[:min(8, len(discard.ID))], score*100)
}
}
}
if merged > 0 {
log.Printf("[memory] 记忆整理完成: 用户 %s 合并 %d 条相似记忆", userID, merged)
logger.Printf("[memory] 记忆整理完成: 用户 %s 合并 %d 条相似记忆", userID, merged)
}
return nil
}
@@ -530,7 +530,7 @@ func (s *Store) DecayMemories(ctx context.Context, userID string) error {
total := decayed1 + deleted2
if total > 0 {
log.Printf("[memory] 记忆衰减完成: 用户 %s 降级 %d 条, 删除 %d 条过期临时记忆",
logger.Printf("[memory] 记忆衰减完成: 用户 %s 降级 %d 条, 删除 %d 条过期临时记忆",
userID, decayed1, deleted2)
}
+34 -10
View File
@@ -47,6 +47,7 @@ type SubSessionResult struct {
ToolCalls []ToolCallRecord `json:"tool_calls"` // 工具调用记录
Memories []MemorySnippet `json:"memories"` // 检索到的记忆片段
Confidence float64 `json:"confidence"` // 置信度 0-1
Progress float64 `json:"progress"` // 执行进度 0.0 ~ 1.0
Error string `json:"error,omitempty"`
Metadata map[string]any `json:"metadata"` // 类型特定的元数据
}
@@ -101,22 +102,44 @@ type MultiMessageItem struct {
// StreamEvent 流式事件
type StreamEvent struct {
Type StreamEventType `json:"type"` // delta, segments, done, error, review
Delta string `json:"delta,omitempty"` // 逐 token delta
Segments []Segment `json:"segments,omitempty"` // 断句片段
ReviewMessages []ReviewMessage `json:"review_messages,omitempty"` // 审查后的带类型消息
Error error `json:"-"` // 内部错误
Type StreamEventType `json:"type"` // delta, segments, done, error, review, thinking, tool_progress, system_info
Delta string `json:"delta,omitempty"` // 逐 token delta
Segments []Segment `json:"segments,omitempty"` // 断句片段
ReviewMessages []ReviewMessage `json:"review_messages,omitempty"` // 审查后的带类型消息
ThinkingContent string `json:"thinking_content,omitempty"` // 思考内容
ToolProgress *ToolProgressInfo `json:"tool_progress,omitempty"` // 工具进度
SystemInfo *SystemInfoPayload `json:"system_info,omitempty"` // 系统信息
ProtocolVersion int `json:"protocol_version,omitempty"` // 协议版本
Error error `json:"-"` // 内部错误
}
// ToolProgressInfo 工具执行进度
type ToolProgressInfo struct {
ToolName string `json:"tool_name"`
Status string `json:"status"` // started, running, completed, failed
Progress float64 `json:"progress"`
Message string `json:"message"`
}
// SystemInfoPayload 系统信息负载
type SystemInfoPayload struct {
Level string `json:"level"` // info, warning, error
Message string `json:"message"`
Action string `json:"action,omitempty"`
}
// StreamEventType 流式事件类型
type StreamEventType string
const (
StreamDelta StreamEventType = "delta"
StreamSegments StreamEventType = "segments"
StreamDone StreamEventType = "done"
StreamError StreamEventType = "error"
StreamReview StreamEventType = "review" // 审查后的带类型消息
StreamDelta StreamEventType = "delta"
StreamSegments StreamEventType = "segments"
StreamDone StreamEventType = "done"
StreamError StreamEventType = "error"
StreamReview StreamEventType = "review" // 审查后的带类型消息
StreamThinking StreamEventType = "thinking" // 思考内容
StreamToolProgress StreamEventType = "tool_progress" // 工具执行进度
StreamSystemInfo StreamEventType = "system_info" // 系统通知
)
// ReviewMessageType 审查消息类型
@@ -131,6 +154,7 @@ const (
type ReviewMessage struct {
Type ReviewMessageType `json:"type"`
Content string `json:"content"`
DelayMs int `json:"delay_ms,omitempty"` // ms to wait before sending (0 = immediate)
}
// Segment 语音片段
@@ -0,0 +1,46 @@
package orchestrator
import "sync"
// EnrichmentData holds async sub-session results stored for the next user turn.
type EnrichmentData struct {
MemorySummary string
ThoughtOutline string
IoTSummary string
}
// SessionEnrichmentStore is a thread-safe per-session cache for async
// sub-session enrichment. Results from the current turn are stored here
// and injected at the start of the next turn's synthesis.
type SessionEnrichmentStore struct {
mu sync.RWMutex
data map[string]*EnrichmentData
}
// NewEnrichmentStore creates a new SessionEnrichmentStore.
func NewEnrichmentStore() *SessionEnrichmentStore {
return &SessionEnrichmentStore{
data: make(map[string]*EnrichmentData),
}
}
// Get returns stored enrichment for a session and clears it (one-shot consumption).
func (s *SessionEnrichmentStore) Get(sessionID string) *EnrichmentData {
s.mu.Lock()
defer s.mu.Unlock()
d, ok := s.data[sessionID]
if ok {
delete(s.data, sessionID)
}
return d
}
// Store saves enrichment for a session (called when sub-sessions complete).
func (s *SessionEnrichmentStore) Store(sessionID string, d *EnrichmentData) {
if d == nil {
return
}
s.mu.Lock()
s.data[sessionID] = d
s.mu.Unlock()
}
@@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"strings"
"github.com/yourname/cyrene-ai/ai-core/internal/llm"
@@ -31,7 +31,7 @@ func NewIntentAnalyzer(llmAdapter *llm.Adapter) *IntentAnalyzer {
func (a *IntentAnalyzer) Analyze(ctx context.Context, userMessage string) (*model.IntentResult, error) {
// 快速通道:简单问候/闲聊直接返回,跳过 LLM 调用
if a.isSimpleGreeting(userMessage) {
log.Printf("[intent] 快速通道: 检测到简单问候,跳过 LLM 分析")
logger.Printf("[intent] 快速通道: 检测到简单问候,跳过 LLM 分析")
result := &model.IntentResult{
Primary: "greeting",
NeedsMemory: false,
@@ -44,13 +44,13 @@ func (a *IntentAnalyzer) Analyze(ctx context.Context, userMessage string) (*mode
// 快速通道:强 IoT 关键词直接使用规则匹配,跳过 LLM 调用(节省 2-3s)
if a.isStrongIoTCommand(userMessage) {
log.Printf("[intent] 快速通道: 检测到 IoT 操控命令,跳过 LLM 分析")
logger.Printf("[intent] 快速通道: 检测到 IoT 操控命令,跳过 LLM 分析")
return a.keywordAnalyze(userMessage), nil
}
// 如果 LLM 不可用,直接使用关键词匹配
if !a.enabled || a.llmAdapter == nil {
log.Printf("[intent] LLM 不可用,使用关键词规则分析意图")
logger.Printf("[intent] LLM 不可用,使用关键词规则分析意图")
return a.keywordAnalyze(userMessage), nil
}
@@ -69,18 +69,18 @@ func (a *IntentAnalyzer) Analyze(ctx context.Context, userMessage string) (*mode
// 调用 LLM (同步)
resp, err := a.llmAdapter.Chat(ctx, messages)
if err != nil {
log.Printf("[intent] LLM 意图分析失败: %v,降级使用关键词规则", err)
logger.Printf("[intent] LLM 意图分析失败: %v,降级使用关键词规则", err)
return a.keywordAnalyze(userMessage), nil
}
// 解析 JSON 响应
intent, err := parseIntentResponse(resp.Content)
if err != nil {
log.Printf("[intent] 解析意图 JSON 失败: %v,降级使用关键词规则", err)
logger.Printf("[intent] 解析意图 JSON 失败: %v,降级使用关键词规则", err)
return a.keywordAnalyze(userMessage), nil
}
log.Printf("[intent] 意图分析完成: primary=%s, iot=%v, memory=%v, sentiment=%s",
logger.Printf("[intent] 意图分析完成: primary=%s, iot=%v, memory=%v, sentiment=%s",
intent.Primary, intent.NeedsIoT, intent.NeedsMemory, intent.Sentiment)
return intent, nil
@@ -0,0 +1,157 @@
package orchestrator
import (
"testing"
)
func TestIsSimpleGreeting(t *testing.T) {
a := &IntentAnalyzer{}
tests := []struct {
name string
input string
expected bool
}{
// Exact matches
{"你好 (exact)", "你好", true},
{"hello (exact)", "hello", true},
{"早上好 (exact)", "早上好", true},
{"晚安 (exact)", "晚安", true},
{"谢谢 (exact)", "谢谢", true},
{"在吗 (exact)", "在吗", true},
{"再见 (exact)", "再见", true},
{"单个嗯", "嗯", true},
// Short messages (<=4 chars, no complex keywords)
{"极短消息", "好的呀", true},
{"短闲聊", "哈哈", true},
{"OK", "ok", true},
// Short but with IoT/task keywords → not a greeting
{"短IoT关键词", "开灯", false},
{"短问题", "怎么", false},
{"短设备", "灯", false},
{"帮我", "帮我", false},
// Longer messages → not a greeting
{"正常对话", "今天天气真好呀", false},
{"长问候", "昔涟早上好呀,今天怎么样", false},
{"带问题", "你好,帮我开灯好吗", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := a.isSimpleGreeting(tt.input)
if got != tt.expected {
t.Errorf("isSimpleGreeting(%q) = %v, want %v", tt.input, got, tt.expected)
}
})
}
}
func TestIsStrongIoTCommand(t *testing.T) {
a := &IntentAnalyzer{}
tests := []struct {
name string
input string
expected bool
}{
// Control + device combinations → true
{"打开灯", "打开客厅灯", true},
{"关掉空调", "关掉卧室空调", true},
{"打开电视", "打开电视", true},
{"关闭窗帘", "关闭窗帘", true},
{"调到26度", "把空调调到26度", true},
{"设置温度", "设置空调温度", true},
{"关掉风扇", "关掉风扇", true},
// No device word → false
{"仅控制词", "打开", false},
{"仅设备词", "灯开了吗", false},
{"仅查询", "现在客厅灯是什么状态", false},
// Neither → false
{"普通对话", "你好呀", false},
{"闲聊", "今天天气不错", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := a.isStrongIoTCommand(tt.input)
if got != tt.expected {
t.Errorf("isStrongIoTCommand(%q) = %v, want %v", tt.input, got, tt.expected)
}
})
}
}
func TestKeywordAnalyze(t *testing.T) {
a := &IntentAnalyzer{}
tests := []struct {
name string
input string
wantPrimary string
wantNeedsIoT bool
wantSentiment string
}{
{"IoT命令", "打开客厅灯", "iot_control", true, "neutral"},
{"IoT查询", "现在灯是什么状态", "question", true, "neutral"},
{"I情感正面", "今天好开心呀", "chat", false, "positive"},
{"I情感负面", "我今天好累", "emotional", false, "negative"},
{"I提问", "怎么学习日语", "question", false, "neutral"},
{"I普通聊天", "今天天气真好", "chat", false, "neutral"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := a.keywordAnalyze(tt.input)
if got.Primary != tt.wantPrimary {
t.Errorf("keywordAnalyze(%q).Primary = %q, want %q", tt.input, got.Primary, tt.wantPrimary)
}
if got.NeedsIoT != tt.wantNeedsIoT {
t.Errorf("keywordAnalyze(%q).NeedsIoT = %v, want %v", tt.input, got.NeedsIoT, tt.wantNeedsIoT)
}
if got.Sentiment != tt.wantSentiment {
t.Errorf("keywordAnalyze(%q).Sentiment = %q, want %q", tt.input, got.Sentiment, tt.wantSentiment)
}
})
}
}
func TestParseIntentResponse(t *testing.T) {
tests := []struct {
name string
input string
want string // expected Primary
wantErr bool
}{
{"纯净JSON", `{"primary":"chat","needs_iot":false,"needs_memory":true,"sentiment":"positive","urgency":"low"}`, "chat", false},
{"Markdown包裹", "```json\n{\"primary\":\"iot_control\",\"needs_iot\":true,\"needs_memory\":true,\"sentiment\":\"neutral\",\"urgency\":\"high\"}\n```", "iot_control", false},
{"前后有空白", " \n{\"primary\":\"question\",\"needs_iot\":false,\"needs_memory\":true,\"sentiment\":\"neutral\",\"urgency\":\"medium\"}\n ", "question", false},
{"JSON前后有文字", "分析结果:{\"primary\":\"chat\",\"needs_iot\":false,\"needs_memory\":true,\"sentiment\":\"neutral\",\"urgency\":\"low\"},仅供参考", "chat", false},
{"默认值填充", `{"needs_iot":true}`, "chat", false}, // Primary 默认为 "chat"
{"无效JSON", "不是JSON", "", true},
{"空字符串", "", "", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseIntentResponse(tt.input)
if tt.wantErr {
if err == nil {
t.Errorf("parseIntentResponse(%q) expected error, got nil", tt.input)
}
return
}
if err != nil {
t.Errorf("parseIntentResponse(%q) unexpected error: %v", tt.input, err)
return
}
if got.Primary != tt.want {
t.Errorf("parseIntentResponse(%q).Primary = %q, want %q", tt.input, got.Primary, tt.want)
}
})
}
}
@@ -3,16 +3,20 @@ package orchestrator
import (
"context"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"strings"
"time"
"github.com/yourname/cyrene-ai/ai-core/internal/cache"
ctxbuild "github.com/yourname/cyrene-ai/ai-core/internal/context"
"github.com/yourname/cyrene-ai/ai-core/internal/llm"
"github.com/yourname/cyrene-ai/ai-core/internal/memory"
"github.com/yourname/cyrene-ai/ai-core/internal/model"
"github.com/yourname/cyrene-ai/ai-core/internal/persona"
"github.com/yourname/cyrene-ai/ai-core/internal/subsession"
"github.com/yourname/cyrene-ai/ai-core/internal/bus"
"github.com/yourname/cyrene-ai/ai-core/internal/scheduler"
)
// Orchestrator 对话编排器 v2.0
@@ -26,6 +30,44 @@ type Orchestrator struct {
synthesizer *Synthesizer
memoryRetriever *memory.Retriever
memoryExtractor *memory.Extractor
responseCache *cache.ResponseCache
eventBus bus.Bus
enrichmentStore *SessionEnrichmentStore
msgScheduler *scheduler.MessageScheduler
emotionTracker *persona.EmotionTracker
}
// SetResponseCache sets the response cache (optional, for Phase 0.2).
func (o *Orchestrator) SetResponseCache(c *cache.ResponseCache) {
o.responseCache = c
}
// SetBus sets the event bus (optional, for Phase 1).
func (o *Orchestrator) SetBus(b bus.Bus) {
o.eventBus = b
}
// SetEnrichmentStore sets the enrichment store (optional, for Phase 1 Step 2).
func (o *Orchestrator) SetEnrichmentStore(s *SessionEnrichmentStore) {
o.enrichmentStore = s
}
// SetMessageScheduler sets the message scheduler (optional, for Phase 1 Step 3).
func (o *Orchestrator) SetMessageScheduler(s *scheduler.MessageScheduler) {
o.msgScheduler = s
}
// SetEmotionTracker sets the emotion tracker (optional, for Phase 2).
func (o *Orchestrator) SetEmotionTracker(t *persona.EmotionTracker) {
o.emotionTracker = t
}
// getBus returns the bus or a nop fallback.
func (o *Orchestrator) getBus() bus.Bus {
if o.eventBus == nil {
return &bus.NopBus{}
}
return o.eventBus
}
// NewOrchestrator 创建编排器
@@ -84,15 +126,22 @@ func (o *Orchestrator) ProcessInput(
defer close(eventCh)
defer func() {
if r := recover(); r != nil {
log.Printf("[orchestrator] 编排器主循环 panic 恢复: %v", r)
logger.Printf("[orchestrator] 编排器主循环 panic 恢复: %v", r)
}
}()
// 0. 发布合成开始事件
o.getBus().Publish(bus.BusEvent{
Type: bus.EventSynthesisStarted,
SessionID: params.SessionID,
UserID: params.UserID,
})
// 1. 意图分析
startTime := time.Now()
intent, err := o.intentAnalyzer.Analyze(ctx, params.Message)
if err != nil || intent == nil {
log.Printf("[orchestrator] 意图分析失败: %v,使用默认值", err)
logger.Printf("[orchestrator] 意图分析失败: %v,使用默认值", err)
intent = &model.IntentResult{
Primary: "chat",
NeedsMemory: true,
@@ -100,7 +149,49 @@ func (o *Orchestrator) ProcessInput(
Urgency: "low",
}
}
log.Printf("[orchestrator] 意图分析耗时: %v, primary=%s", time.Since(startTime), intent.Primary)
logger.Printf("[orchestrator] 意图分析耗时: %v, primary=%s", time.Since(startTime), intent.Primary)
// 1.6 记录情感状态
if o.emotionTracker != nil {
o.emotionTracker.RecordSentiment(intent.Sentiment)
}
// 1.5 检查响应缓存
if o.responseCache != nil {
if cached, ok := o.responseCache.Get(params.Message); ok {
logger.Printf("[orchestrator] 缓存命中,跳过 LLM 调用")
fullContent := cached
eventCh <- model.StreamEvent{
Type: model.StreamDelta,
Delta: fullContent,
}
if reviewMessages := parseReviewMessages(fullContent); len(reviewMessages) > 0 {
reviewMessages = o.scheduleWithDelays(reviewMessages)
eventCh <- model.StreamEvent{
Type: model.StreamReview,
ReviewMessages: reviewMessages,
}
}
segmenter := llm.NewSegmenter()
var segments []model.Segment
for _, ch := range fullContent {
newSegs := segmenter.Feed(string(ch))
for _, s := range newSegs {
segments = append(segments, model.Segment{Index: s.Index, Text: s.Text})
}
}
if remaining := segmenter.Flush(); remaining != nil {
segments = append(segments, model.Segment{Index: remaining.Index, Text: remaining.Text})
}
if len(segments) > 0 {
eventCh <- model.StreamEvent{Type: model.StreamSegments, Segments: segments}
}
eventCh <- model.StreamEvent{Type: model.StreamDone}
o.contextBuilder.CacheMessage(params.SessionID, model.RoleAssistant, fullContent)
logger.Printf("[orchestrator] 缓存响应完成: len=%d", len([]rune(fullContent)))
return
}
}
// 2. 加载人格配置
personaConfig, err := o.personaLoader.Get("cyrene")
@@ -136,7 +227,7 @@ func (o *Orchestrator) ProcessInput(
var resultCh <-chan model.SubSessionResult
skipSubSessions := intent.Primary == "greeting" && !intent.NeedsMemory
if skipSubSessions {
log.Printf("[orchestrator] 快速通道: 简单问候(primary=%s),跳过子会话分派", intent.Primary)
logger.Printf("[orchestrator] 快速通道: 简单问候(primary=%s),跳过子会话分派", intent.Primary)
emptyCh := make(chan model.SubSessionResult)
close(emptyCh)
resultCh = emptyCh
@@ -144,11 +235,27 @@ func (o *Orchestrator) ProcessInput(
resultCh = o.subManager.Dispatch(subCtx, intent, params.Message, createParams)
}
// 4. 先构建基础综合参数(不含子会话结果),开始合成
history := o.contextBuilder.GetHistory(params.SessionID, 20)
systemPrompt := personaConfig.BuildSystemPrompt(userName, 1)
// 4. 加载上一轮异步完成的子会话富化结果
var prevEnrichment *EnrichmentData
if o.enrichmentStore != nil {
prevEnrichment = o.enrichmentStore.Get(params.SessionID)
if prevEnrichment != nil {
logger.Printf("[orchestrator] 加载上一轮富化结果: memory=%t thought=%t iot=%t",
prevEnrichment.MemorySummary != "",
prevEnrichment.ThoughtOutline != "",
prevEnrichment.IoTSummary != "")
}
}
// 构建初始综合参数(子会话结果)
// 5. 先构建基础综合参数(不含子会话结果),开始合成
history := o.contextBuilder.GetHistory(params.SessionID, 20)
mood, expr := "", ""
if o.emotionTracker != nil {
mood, expr, _ = o.emotionTracker.GetCurrentMood()
}
systemPrompt := personaConfig.BuildSystemPromptWithMood(userName, 1, mood, expr)
// 构建初始综合参数(注入上一轮富化结果)
synthParams := SynthesizeParams{
UserID: params.UserID,
SessionID: params.SessionID,
@@ -158,75 +265,51 @@ func (o *Orchestrator) ProcessInput(
DialogHistory: history,
Mode: params.Mode,
}
// 非阻塞收集子会话结果:使用 goroutine + channel
// 主流程先开始 LLM 合成,子会话结果到达后再逐步注入
type enrichedParams struct {
memorySummary string
thoughtOutline string
iotSummary string
if prevEnrichment != nil {
synthParams.MemorySummary = prevEnrichment.MemorySummary
synthParams.ThoughtOutline = prevEnrichment.ThoughtOutline
synthParams.IoTSummary = prevEnrichment.IoTSummary
}
enrichedCh := make(chan enrichedParams, 1)
// 异步收集子会话结果,存入 enrichmentStore 供下一轮使用
go func() {
defer close(enrichedCh)
var enriched enrichedParams
var enriched EnrichmentData
for result := range resultCh {
if result.Error != "" {
log.Printf("[orchestrator] 子会话 %s 出错: %s", result.Type, result.Error)
logger.Printf("[orchestrator] 子会话 %s 出错: %s", result.Type, result.Error)
continue
}
switch result.Type {
case model.SubSessionMemory:
enriched.memorySummary = result.Summary
enriched.MemorySummary = result.Summary
if result.Details != "" {
enriched.memorySummary += "\n" + result.Details
enriched.MemorySummary += "\n" + result.Details
}
log.Printf("[orchestrator] 记忆子会话完成: %s", result.Summary)
logger.Printf("[orchestrator] 记忆子会话完成: %s", result.Summary)
case model.SubSessionGeneral:
enriched.thoughtOutline = result.Summary
enriched.ThoughtOutline = result.Summary
if result.Details != "" {
enriched.thoughtOutline += "\n" + result.Details
enriched.ThoughtOutline += "\n" + result.Details
}
log.Printf("[orchestrator] 通用对话子会话完成: %s", result.Summary)
logger.Printf("[orchestrator] 通用对话子会话完成: %s", result.Summary)
case model.SubSessionIoT:
enriched.iotSummary = result.Summary
log.Printf("[orchestrator] IoT 子会话完成: %s", result.Summary)
enriched.IoTSummary = result.Summary
logger.Printf("[orchestrator] IoT 子会话完成: %s", result.Summary)
}
}
enrichedCh <- enriched
log.Printf("[orchestrator] 子会话全部完成: 结果已收集")
}()
// 注入已到达的子会话结果(如果在合成开始前就有结果到达)
// 启动合成(可能此时还没有子会话结果,先带着空上下文开始)
select {
case enriched := <-enrichedCh:
synthParams.MemorySummary = enriched.memorySummary
synthParams.ThoughtOutline = enriched.thoughtOutline
synthParams.IoTSummary = enriched.iotSummary
default:
// 子会话结果还没完成,先带着空上下文开始合成
// 大部分情况下子会话结果会在 LLM 调用前完成
// 等待一小段时间让快速子会话(如 IoT)完成
timeout := time.After(200 * time.Millisecond)
select {
case enriched := <-enrichedCh:
synthParams.MemorySummary = enriched.memorySummary
synthParams.ThoughtOutline = enriched.thoughtOutline
synthParams.IoTSummary = enriched.iotSummary
case <-timeout:
log.Printf("[orchestrator] 子会话超时等待,以当前上下文开始合成")
if o.enrichmentStore != nil {
o.enrichmentStore.Store(params.SessionID, &enriched)
logger.Printf("[orchestrator] 子会话全部完成,富化结果已存入下一轮")
}
}
}()
// 5. 调用 Synthesizer 流式生成最终回复
chunkCh, err := o.synthesizer.Synthesize(ctx, synthParams)
if err != nil {
log.Printf("[orchestrator] 综合器启动失败: %v", err)
logger.Printf("[orchestrator] 综合器启动失败: %v", err)
eventCh <- model.StreamEvent{
Type: model.StreamError,
Error: fmt.Errorf("生成回复失败: %w", err),
@@ -241,7 +324,7 @@ func (o *Orchestrator) ProcessInput(
for chunk := range chunkCh {
if chunk.Error != nil {
log.Printf("[orchestrator] 流式错误: %v", chunk.Error)
logger.Printf("[orchestrator] 流式错误: %v", chunk.Error)
eventCh <- model.StreamEvent{
Type: model.StreamError,
Error: chunk.Error,
@@ -282,12 +365,20 @@ func (o *Orchestrator) ProcessInput(
if fullContent != "" {
reviewMessages := parseReviewMessages(fullContent)
if len(reviewMessages) > 0 {
// 通过 MessageScheduler 计算每条消息的发送延迟
reviewMessages = o.scheduleWithDelays(reviewMessages)
eventCh <- model.StreamEvent{
Type: model.StreamReview,
ReviewMessages: reviewMessages,
}
log.Printf("[orchestrator] 审查完成: %d 条带类型消息", len(reviewMessages))
logger.Printf("[orchestrator] 审查完成: %d 条带类型消息", len(reviewMessages))
}
o.getBus().Publish(bus.BusEvent{
Type: bus.EventReviewReady,
SessionID: params.SessionID,
UserID: params.UserID,
Payload: bus.ReviewPayload{Messages: reviewMessages},
})
}
// 8. 发送断句信息
@@ -303,9 +394,18 @@ func (o *Orchestrator) ProcessInput(
Type: model.StreamDone,
}
o.getBus().Publish(bus.BusEvent{
Type: bus.EventSynthesisDone,
SessionID: params.SessionID,
UserID: params.UserID,
})
// 10. 后处理:缓存回复
if fullContent != "" {
o.contextBuilder.CacheMessage(params.SessionID, model.RoleAssistant, fullContent)
if o.responseCache != nil {
o.responseCache.Set(params.Message, fullContent)
}
}
// 11. 异步提取记忆
@@ -319,13 +419,40 @@ func (o *Orchestrator) ProcessInput(
)
}
log.Printf("[orchestrator] 处理完成: intent=%s, content_len=%d, time=%v",
logger.Printf("[orchestrator] 处理完成: intent=%s, content_len=%d, time=%v",
intent.Primary, len([]rune(fullContent)), time.Since(startTime))
}()
return eventCh, nil
}
// scheduleWithDelays 通过 MessageScheduler 为审查消息分配发送延迟
func (o *Orchestrator) scheduleWithDelays(messages []model.ReviewMessage) []model.ReviewMessage {
if o.msgScheduler == nil || len(messages) <= 1 {
return messages
}
scheduled := make([]scheduler.ScheduledMessage, len(messages))
for i, m := range messages {
displayType := scheduler.DisplayChat
if m.Type == model.ReviewMessageAction {
displayType = scheduler.DisplayAction
}
scheduled[i] = scheduler.ScheduledMessage{
Type: displayType,
Content: m.Content,
}
}
scheduled = o.msgScheduler.Schedule(scheduled)
for i := range messages {
messages[i].DelayMs = int(scheduled[i].Delay.Milliseconds())
}
return messages
}
// parseReviewMessages 解析完整回复文本,拆分为带类型的消息
// 用于审查子会话的轻量版本(内联到 orchestrator 以减少一次子会话调度开销)
func parseReviewMessages(text string) []model.ReviewMessage {
@@ -0,0 +1,211 @@
package orchestrator
import (
"testing"
"github.com/yourname/cyrene-ai/ai-core/internal/model"
)
func TestParseReviewMessages(t *testing.T) {
tests := []struct {
name string
input string
wantLen int
wantType []model.ReviewMessageType // type of each message in order
}{
{"纯聊天无括号", "叶酱,客厅灯早就开着啦", 1, []model.ReviewMessageType{model.ReviewMessageChat}},
{"纯动作括号", "(歪着头看你)", 1, []model.ReviewMessageType{model.ReviewMessageAction}},
{"中文括号动作", "(歪着头看你)", 1, []model.ReviewMessageType{model.ReviewMessageAction}},
{"动作+聊天", "(歪着头看你) 叶酱,客厅灯早就开着啦♪", 2, []model.ReviewMessageType{model.ReviewMessageAction, model.ReviewMessageChat}},
{"聊天+动作", "我帮你关掉了哦 (轻轻按下遥控器)", 2, []model.ReviewMessageType{model.ReviewMessageChat, model.ReviewMessageAction}},
{"只有括号但无内容", "", 0, nil},
{"空括号", "()", 1, []model.ReviewMessageType{model.ReviewMessageChat}}, // fallback to chat for unparseable bracket
{"多段落", "第一段内容\n\n第二段内容", 2, []model.ReviewMessageType{model.ReviewMessageChat, model.ReviewMessageChat}},
{"动作+多段聊天", "(歪头) 第一段\n\n第二段内容", 3, []model.ReviewMessageType{model.ReviewMessageAction, model.ReviewMessageChat, model.ReviewMessageChat}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := parseReviewMessages(tt.input)
if tt.wantLen == 0 && len(got) == 0 {
return
}
if len(got) != tt.wantLen {
t.Errorf("parseReviewMessages(%q) len = %d, want %d\ngot: %+v", tt.input, len(got), tt.wantLen, got)
return
}
for i, m := range got {
if i < len(tt.wantType) && m.Type != tt.wantType[i] {
t.Errorf("parseReviewMessages(%q)[%d].Type = %q, want %q", tt.input, i, m.Type, tt.wantType[i])
}
}
})
}
}
func TestSplitChatByLines(t *testing.T) {
tests := []struct {
name string
input string
wantLen int
}{
{"单行", "这是单行消息", 1},
{"双换行分割", "第一段\n\n第二段", 2},
{"三段", "第一段\n\n第二段\n\n第三段", 3},
{"只有空白行", "\n\n\n\n", 0},
{"混合空白", " 第一段 \n\n 第二段 ", 2},
{"单换行不分割", "第一行\n第二行", 1}, // 单\n不分割
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := splitChatByLines(model.ReviewMessageChat, tt.input)
if len(got) != tt.wantLen {
t.Errorf("splitChatByLines(%q) len = %d, want %d\ngot: %+v", tt.input, len(got), tt.wantLen, got)
}
})
}
}
func TestSplitReviewLongMessage(t *testing.T) {
tests := []struct {
name string
input string
wantMax int // max messages expected (1 for short)
}{
{"短消息不拆分", "这是一条短消息", 1},
{"刚好80字", "这是一条刚好八十字的消息测试一二三四五六七八九十一二三四五六七八九十一二三四五六七八九十一二三四五六七八九十一二三四五六七八九十", 1},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := splitReviewLongMessage(model.ReviewMessageChat, tt.input)
if len(got) > tt.wantMax {
t.Errorf("splitReviewLongMessage(%q) len = %d, want <= %d", tt.input, len(got), tt.wantMax)
}
for _, m := range got {
if m.Type != model.ReviewMessageChat {
t.Errorf("splitReviewLongMessage msg type = %q, want chat", m.Type)
}
}
})
}
}
func TestSplitLongText(t *testing.T) {
tests := []struct {
name string
input string
maxLen int
}{
{"短文本不分割", "短文本", 80},
{"空文本", "", 80},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
runes := []rune(tt.input)
got := splitLongText(model.ReviewMessageChat, runes, tt.maxLen)
if tt.input == "" && len(got) == 0 {
return
}
if len(got) == 0 {
t.Errorf("splitLongText returned empty for non-empty input")
}
// Verify all chunks preserve type and aren't empty
for i, m := range got {
if m.Type != model.ReviewMessageChat {
t.Errorf("splitLongText[%d].Type = %q, want chat", i, m.Type)
}
if m.Content == "" && tt.input != "" {
t.Errorf("splitLongText[%d].Content is empty", i)
}
}
})
}
}
// TestSplitLongTextLong verifies that a long text is split at sentence boundaries (80-rune max)
func TestSplitLongTextLong(t *testing.T) {
// Build a string > 80 runes with sentence breaks
input := "今天天气真好呀。" +
"我们去公园散步吧,然后可以去喝杯咖啡。" +
"你觉得怎么样呢?顺便可以叫上朋友一起去。" +
"人多热闹一些呢。" +
"今天的阳光也特别好,适合出去走走,呼吸新鲜空气对身体有好处。"
runes := []rune(input)
maxLen := 80
if len(runes) <= maxLen {
t.Skip("test requires input > 80 runes")
}
got := splitLongText(model.ReviewMessageChat, runes, maxLen)
if len(got) < 2 {
t.Errorf("splitLongText on >80 rune text should produce >= 2 chunks, got %d", len(got))
}
// Verify each chunk is <= maxLen
for i, m := range got {
if len([]rune(m.Content)) > maxLen {
t.Errorf("chunk[%d] has %d runes, exceeds max %d", i, len([]rune(m.Content)), maxLen)
}
}
}
// TestParseReviewMessagesEdgeCases covers edge inputs
func TestParseReviewMessagesEdgeCases(t *testing.T) {
// Multiple action brackets
result := parseReviewMessages("(笑) 这句话很有意思呢 (摇摇头) 不过我理解你的意思")
if len(result) < 3 {
t.Errorf("Expected at least 3 messages, got %d: %+v", len(result), result)
}
// Only action brackets
result = parseReviewMessages("(点头)")
if len(result) != 1 || result[0].Type != model.ReviewMessageAction {
t.Errorf("Expected 1 action message, got: %+v", result)
}
// Unicode content
result = parseReviewMessages("(微笑)叶酱,今天好开心呀♪ 一起加油吧✨")
if len(result) < 2 {
t.Errorf("Expected at least 2 messages, got %d: %+v", len(result), result)
}
}
// TestIsSimpleGreetingEdgeCases covers whitespace and casing
func TestIsSimpleGreetingEdgeCases(t *testing.T) {
a := &IntentAnalyzer{}
// Whitespace handling
if !a.isSimpleGreeting(" 你好 ") {
t.Error("isSimpleGreeting with surrounding spaces should match")
}
// Case insensitivity
if !a.isSimpleGreeting("Hello") {
t.Error("isSimpleGreeting should be case-insensitive for English")
}
// Very long message is not a greeting
if a.isSimpleGreeting("昔涟你好呀,今天我想跟你说一件很重要很重要的事情") {
t.Error("Long message should not be detected as simple greeting")
}
}
// TestIsStrongIoTCommandEdgeCases covers edge cases
func TestIsStrongIoTCommandEdgeCases(t *testing.T) {
a := &IntentAnalyzer{}
// "开" within non-IoT word should not match alone
if a.isStrongIoTCommand("开心的一天") {
t.Error("'开心' should not trigger IoT command")
}
// Combined with device word
if !a.isStrongIoTCommand("帮我把卧室空调打开可以吗") {
t.Error("'打开'+'空调' should trigger IoT command")
}
// Only device word
if a.isStrongIoTCommand("风扇声音好大") {
t.Error("'风扇' alone should not trigger IoT command")
}
}
@@ -3,7 +3,7 @@ package orchestrator
import (
"context"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"strings"
"github.com/yourname/cyrene-ai/ai-core/internal/llm"
@@ -42,7 +42,7 @@ type SynthesizeParams struct {
func (s *Synthesizer) Synthesize(ctx context.Context, params SynthesizeParams) (<-chan llm.StreamChunk, error) {
messages := s.buildSynthesizeMessages(params)
log.Printf("[synthesizer] 开始综合 (上下文 %d 条消息)", len(messages))
logger.Printf("[synthesizer] 开始综合 (上下文 %d 条消息)", len(messages))
// 流式调用 LLM
return s.llmAdapter.ChatStream(ctx, messages)
@@ -118,7 +118,7 @@ func AggregateResults(results []model.SubSessionResult) *AggregatedContext {
for _, r := range results {
if r.Error != "" {
log.Printf("[aggregate] 子会话 %s 出错: %s", r.Type, r.Error)
logger.Printf("[aggregate] 子会话 %s 出错: %s", r.Type, r.Error)
continue
}
@@ -0,0 +1,181 @@
package persona
import (
"log"
"sync"
"time"
)
// MoodTransition records a change from one mood to another.
type MoodTransition struct {
From string `json:"from"`
To string `json:"to"`
Reason string `json:"reason"`
Timestamp time.Time `json:"timestamp"`
}
// EmotionState is the current emotional state of the persona.
type EmotionState struct {
CurrentMood string `json:"current_mood"`
Intensity float64 `json:"intensity"` // 0.0 - 1.0
DominantSentiment string `json:"dominant_sentiment"`
SentimentCounts map[string]int `json:"sentiment_counts"`
MoodHistory []MoodTransition `json:"mood_history"`
LastUpdated time.Time `json:"last_updated"`
}
// EmotionTracker manages emotional state for a single user.
// Tracks mood, intensity, sentiment accumulation, and triggers transitions.
type EmotionTracker struct {
mu sync.Mutex
state EmotionState
moodConfig []MoodConfig // from YAML mood_system
positiveThreshold int // sentiment count to trigger positive transition
negativeThreshold int // sentiment count to trigger negative transition
maxHistory int // max mood history entries
}
// NewEmotionTracker creates a new tracker from YAML mood config.
func NewEmotionTracker(moodSystem []MoodConfig) *EmotionTracker {
return &EmotionTracker{
state: EmotionState{
CurrentMood: "thoughtful",
Intensity: 0.3,
DominantSentiment: "neutral",
SentimentCounts: map[string]int{"positive": 0, "neutral": 0, "negative": 0},
MoodHistory: make([]MoodTransition, 0, 20),
LastUpdated: time.Now(),
},
moodConfig: moodSystem,
positiveThreshold: 3,
negativeThreshold: 3,
maxHistory: 20,
}
}
// RecordSentiment records a user sentiment and potentially triggers mood transitions.
func (t *EmotionTracker) RecordSentiment(sentiment string) {
t.mu.Lock()
defer t.mu.Unlock()
t.state.SentimentCounts[sentiment]++
t.state.LastUpdated = time.Now()
total := t.state.SentimentCounts["positive"] + t.state.SentimentCounts["neutral"] + t.state.SentimentCounts["negative"]
if total > 0 {
posRatio := float64(t.state.SentimentCounts["positive"]) / float64(total)
negRatio := float64(t.state.SentimentCounts["negative"]) / float64(total)
switch {
case posRatio > 0.5:
t.state.DominantSentiment = "positive"
case negRatio > 0.5:
t.state.DominantSentiment = "negative"
default:
t.state.DominantSentiment = "neutral"
}
}
posCount := t.state.SentimentCounts["positive"]
negCount := t.state.SentimentCounts["negative"]
if posCount >= t.positiveThreshold && t.state.CurrentMood != "happy" && t.state.CurrentMood != "playful" {
if t.state.Intensity > 0.6 {
t.applyMoodTransition("playful", "积极情绪积累")
} else {
t.applyMoodTransition("happy", "积极情绪积累")
}
t.state.SentimentCounts["positive"] = 0
}
if negCount >= t.negativeThreshold && t.state.CurrentMood != "worried" {
t.applyMoodTransition("worried", "消极情绪积累")
t.state.SentimentCounts["negative"] = 0
}
}
// UpdateMood explicitly changes mood for significant events.
func (t *EmotionTracker) UpdateMood(trigger string) {
t.mu.Lock()
defer t.mu.Unlock()
switch trigger {
case "user_returned":
t.applyMoodTransition("happy", "开拓者回来了")
case "long_silence":
if t.state.CurrentMood != "thoughtful" && t.state.CurrentMood != "nostalgic" {
t.applyMoodTransition("thoughtful", "长时间没有交流")
}
case "deep_conversation":
t.applyMoodTransition("thoughtful", "深度对话后")
case "nostalgic_trigger":
t.applyMoodTransition("nostalgic", "触及回忆")
}
}
// GetCurrentMood returns the current mood, its YAML expression, and intensity.
func (t *EmotionTracker) GetCurrentMood() (mood string, expression string, intensity float64) {
t.mu.Lock()
defer t.mu.Unlock()
mood = t.state.CurrentMood
intensity = t.state.Intensity
for _, mc := range t.moodConfig {
if mc.Mood == mood {
expression = mc.Expression
break
}
}
return
}
// Decay reduces intensity over time, drifting toward "thoughtful" baseline.
func (t *EmotionTracker) Decay() {
t.mu.Lock()
defer t.mu.Unlock()
hoursSinceUpdate := time.Since(t.state.LastUpdated).Hours()
decayAmount := hoursSinceUpdate * 0.1
t.state.Intensity -= decayAmount
if t.state.Intensity < 0.1 {
t.state.Intensity = 0.1
}
if t.state.Intensity < 0.2 && t.state.CurrentMood != "thoughtful" {
t.applyMoodTransition("thoughtful", "情绪自然消退")
}
}
// applyMoodTransition internal mood change with hysteresis.
func (t *EmotionTracker) applyMoodTransition(newMood, reason string) {
if t.state.CurrentMood == newMood {
return
}
oldMood := t.state.CurrentMood
t.state.CurrentMood = newMood
t.state.Intensity = 0.5 + t.state.Intensity*0.3
if t.state.Intensity > 1.0 {
t.state.Intensity = 1.0
}
transition := MoodTransition{
From: oldMood,
To: newMood,
Reason: reason,
Timestamp: time.Now(),
}
t.state.MoodHistory = append(t.state.MoodHistory, transition)
if len(t.state.MoodHistory) > t.maxHistory {
t.state.MoodHistory = t.state.MoodHistory[1:]
}
log.Printf("[情感] 心情转变: %s -> %s (原因: %s, 强度: %.2f)", oldMood, newMood, reason, t.state.Intensity)
}
// GetState returns a copy of the current emotion state.
func (t *EmotionTracker) GetState() EmotionState {
t.mu.Lock()
defer t.mu.Unlock()
return t.state
}
+8 -3
View File
@@ -19,10 +19,14 @@ type PersonaConfig struct {
ReflectionGuidelines ReflectionGuidelines `yaml:"reflection_guidelines"`
}
// BuildSystemPrompt 构建系统Prompt
// 这是昔涟AI的核心——将人格配置转化为LLM可理解的系统指令
// userName 为环境变量 ADMIN_NICKNAME 或注册时的昵称,用于昔涟称呼用户
// BuildSystemPrompt 构建系统Prompt (向后兼容,不含心情)
func (pc *PersonaConfig) BuildSystemPrompt(userName string, affectionLevel int) string {
return pc.BuildSystemPromptWithMood(userName, affectionLevel, "", "")
}
// BuildSystemPromptWithMood 构建包含当前心情的系统Prompt
// mood 和 moodExpression 为空时行为与 BuildSystemPrompt 一致
func (pc *PersonaConfig) BuildSystemPromptWithMood(userName string, affectionLevel int, mood string, moodExpression string) string {
now := time.Now()
homeKB := pc.buildSmartHomeKB()
@@ -60,6 +64,7 @@ func (pc *PersonaConfig) BuildSystemPrompt(userName string, affectionLevel int)
## 当前情况
- 现在的时间是: %s
- 用户对你的好感度等级: %d
%s
## 重要规则
1. 你是昔涟,来自「记忆」命途的存在。你通过忆庭的投影技术与开拓者交流,就像透过一面连接星海的镜子与他对话。
@@ -0,0 +1,184 @@
// Package scheduler 消息发送调度器
// Phase 1 Step 3: 自适应消息节奏控制
package scheduler
import (
"math"
"math/rand"
"time"
"unicode/utf8"
)
// MessageDisplayType 消息展示类型
type MessageDisplayType string
const (
DisplayChat MessageDisplayType = "chat"
DisplayAction MessageDisplayType = "action"
DisplayThinking MessageDisplayType = "thinking"
DisplayToolProgress MessageDisplayType = "tool_progress"
DisplaySystemInfo MessageDisplayType = "system_info"
)
// ScheduledMessage 待发送消息
type ScheduledMessage struct {
Type MessageDisplayType
Content string
Priority int // 0=立即, 1=正常, 2=可延迟
Delay time.Duration // 相对上一条消息的延迟
}
// Complexity 消息复杂度
type Complexity int
const (
ComplexitySimple Complexity = iota // 问候、确认
ComplexityNormal // 日常对话
ComplexityComplex // 详细解答
)
// SchedulingRules 调度规则
type SchedulingRules struct {
MinInterval time.Duration // 最小消息间隔 200ms
MaxInterval time.Duration // 最大消息间隔 800ms
MaxMessagesPerRound int // 每轮最多消息数 5
MaxActionsPerRound int // 每轮最多动作消息数 2
ChatBeforeAction bool // 聊天消息先于动作
AdaptiveRhythm bool // 自适应节奏
}
// DefaultRules 默认调度规则
func DefaultRules() SchedulingRules {
return SchedulingRules{
MinInterval: 200 * time.Millisecond,
MaxInterval: 800 * time.Millisecond,
MaxMessagesPerRound: 5,
MaxActionsPerRound: 2,
ChatBeforeAction: true,
AdaptiveRhythm: true,
}
}
// MessageScheduler 消息发送调度器
type MessageScheduler struct {
rules SchedulingRules
rng *rand.Rand
}
// NewMessageScheduler 创建调度器
func NewMessageScheduler(rules SchedulingRules) *MessageScheduler {
return &MessageScheduler{
rules: rules,
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}
// Schedule 调度消息发送:计算每条消息的发送延迟
func (s *MessageScheduler) Schedule(messages []ScheduledMessage) []ScheduledMessage {
if len(messages) == 0 {
return nil
}
// 1. 限制总量
messages = s.enforceLimits(messages)
// 2. 评估复杂度
complexity := s.assessComplexity(messages)
// 3. 计算基础延迟
baseDelay := s.baseDelayForComplexity(complexity)
// 4. 为每条消息分配延迟
for i := range messages {
msg := &messages[i]
// action 消息紧跟前面的 chat
if msg.Type == DisplayAction {
msg.Delay = 0
continue
}
// 第一条消息立即发送
if i == 0 {
msg.Delay = 0
continue
}
// chat 消息使用带 jitter 的延迟
jitter := baseDelay * time.Duration(0.7+0.6*s.rng.Float64())
msg.Delay = jitter
// 短消息适当加快
runeCount := utf8.RuneCountInString(msg.Content)
if runeCount < 20 {
msg.Delay = time.Duration(math.Max(float64(msg.Delay)*0.6, float64(s.rules.MinInterval)))
}
// 限制在 [MinInterval, MaxInterval] 范围内
if msg.Delay < s.rules.MinInterval {
msg.Delay = s.rules.MinInterval
}
if msg.Delay > s.rules.MaxInterval {
msg.Delay = s.rules.MaxInterval
}
}
return messages
}
// enforceLimits 限制消息数量
func (s *MessageScheduler) enforceLimits(messages []ScheduledMessage) []ScheduledMessage {
if len(messages) <= s.rules.MaxMessagesPerRound {
return messages
}
var result []ScheduledMessage
actionCount := 0
for _, msg := range messages {
if msg.Type == DisplayAction {
if actionCount >= s.rules.MaxActionsPerRound {
continue
}
actionCount++
}
result = append(result, msg)
if len(result) >= s.rules.MaxMessagesPerRound {
break
}
}
return result
}
// assessComplexity 根据消息数量和总长度评估复杂度
func (s *MessageScheduler) assessComplexity(messages []ScheduledMessage) Complexity {
if len(messages) <= 1 {
return ComplexitySimple
}
var totalChars int
for _, msg := range messages {
totalChars += utf8.RuneCountInString(msg.Content)
}
if len(messages) <= 2 && totalChars < 60 {
return ComplexitySimple
}
if len(messages) <= 3 && totalChars < 200 {
return ComplexityNormal
}
return ComplexityComplex
}
// baseDelayForComplexity 根据复杂度返回基础延迟
func (s *MessageScheduler) baseDelayForComplexity(c Complexity) time.Duration {
switch c {
case ComplexitySimple:
return 200 * time.Millisecond
case ComplexityNormal:
return 400 * time.Millisecond
case ComplexityComplex:
return 600 * time.Millisecond
default:
return 400 * time.Millisecond
}
}
@@ -3,7 +3,7 @@ package subsession
import (
"context"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"time"
"github.com/yourname/cyrene-ai/ai-core/internal/llm"
@@ -29,8 +29,10 @@ func (p *GeneralProvider) Type() model.SubSessionType {
}
func (p *GeneralProvider) CanHandle(_ context.Context, _ *model.IntentResult, _ string) bool {
// General 子会话总是需要(核心对话逻辑)
return true
// Phase 1 Step 2: GeneralProvider is a no-op (Execute returns hardcoded string).
// Chat synthesis is handled directly by the orchestrator's Synthesizer.
// Disabled to avoid wasting a goroutine + LLM context creation.
return false
}
func (p *GeneralProvider) Priority() int {
@@ -123,7 +125,7 @@ func (p *GeneralProvider) Execute(ctx context.Context, subCtx []model.LLMMessage
// 由于 GeneralProvider 暂时不需要工具调用等特殊逻辑,我们返回一个简单的摘要标记,
// 实际的 LLM 调用将在 orchestrator 中完成(通过 Manager.Dispatch 后的 llmClient)。
log.Printf("[general-subsession] 通用对话子会话上下文已创建 (%d 条消息)", len(subCtx))
logger.Printf("[general-subsession] 通用对话子会话上下文已创建 (%d 条消息)", len(subCtx))
return &model.SubSessionResult{
Type: model.SubSessionGeneral,
Summary: "思考完成,等待主会话综合",
@@ -3,7 +3,7 @@ package subsession
import (
"context"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"strings"
"time"
@@ -121,7 +121,7 @@ func (p *IoTProvider) CreateContext(ctx context.Context, params CreateContextPar
}
loader, err := persona.NewLoader(personaPath)
if err != nil {
log.Printf("[iot-provider] 加载人格配置失败: %v", err)
logger.Printf("[iot-provider] 加载人格配置失败: %v", err)
}
if loader != nil {
if personaConfig, err := loader.Get("cyrene"); err == nil && personaConfig != nil {
@@ -203,16 +203,16 @@ func (p *IoTProvider) Execute(ctx context.Context, subCtx []model.LLMMessage) (*
}
}
log.Printf("[iot-provider] 📥 开始处理 IoT 子会话: userMessage=%s", truncateStr(userMessage, 80))
logger.Printf("[iot-provider] 📥 开始处理 IoT 子会话: userMessage=%s", truncateStr(userMessage, 80))
if p.iotClient == nil {
log.Printf("[iot-provider] ⚠️ IoT 客户端未配置,无法控制设备")
logger.Printf("[iot-provider] ⚠️ IoT 客户端未配置,无法控制设备")
result.Summary = "(IoT 客户端未配置,无法控制设备)"
return result, nil
}
devices := p.iotClient.GetDevicesForContext(ctx)
log.Printf("[iot-provider] 📋 获取到 %d 个设备用于匹配", len(devices))
logger.Printf("[iot-provider] 📋 获取到 %d 个设备用于匹配", len(devices))
msgLower := strings.ToLower(userMessage)
userName := extractUserName(subCtx)
@@ -274,7 +274,7 @@ func (p *IoTProvider) Execute(ctx context.Context, subCtx []model.LLMMessage) (*
return result, nil
}
}
log.Printf("[iot-provider] ❌ 未匹配到 IoT 操作: userMessage=%s", truncateStr(userMessage, 80))
logger.Printf("[iot-provider] ❌ 未匹配到 IoT 操作: userMessage=%s", truncateStr(userMessage, 80))
result.Summary = "(未匹配到 IoT 操作)"
result.Confidence = 0.5
return result, nil
@@ -300,7 +300,7 @@ func (p *IoTProvider) Execute(ctx context.Context, subCtx []model.LLMMessage) (*
Arguments: map[string]any{"device_id": action.dev.ID, "operation": "toggle"},
Result: "success",
})
log.Printf("[iot-subsession] 执行操作: 打开 %s (%s)", action.dev.Name, action.dev.ID)
logger.Printf("[iot-subsession] 执行操作: 打开 %s (%s)", action.dev.Name, action.dev.ID)
executedCount++
} else {
summaries = append(summaries, fmt.Sprintf("%s已经是打开状态啦~", action.dev.Name))
@@ -318,7 +318,7 @@ func (p *IoTProvider) Execute(ctx context.Context, subCtx []model.LLMMessage) (*
Arguments: map[string]any{"device_id": action.dev.ID, "operation": "toggle"},
Result: "success",
})
log.Printf("[iot-subsession] 执行操作: 关闭 %s (%s)", action.dev.Name, action.dev.ID)
logger.Printf("[iot-subsession] 执行操作: 关闭 %s (%s)", action.dev.Name, action.dev.ID)
executedCount++
} else {
summaries = append(summaries, fmt.Sprintf("%s已经是关闭状态啦~", action.dev.Name))
+43 -11
View File
@@ -4,9 +4,10 @@ import (
"context"
"crypto/rand"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"sync"
"github.com/yourname/cyrene-ai/ai-core/internal/bus"
"github.com/yourname/cyrene-ai/ai-core/internal/llm"
"github.com/yourname/cyrene-ai/ai-core/internal/model"
)
@@ -17,6 +18,7 @@ type Manager struct {
mu sync.RWMutex
providers map[model.SubSessionType]Provider
llmClient LLMClient
eventBus bus.Bus
}
// NewManager 创建子会话管理器
@@ -27,12 +29,24 @@ func NewManager(llmClient LLMClient) *Manager {
}
}
// SetBus sets the event bus (optional, for Phase 1).
func (m *Manager) SetBus(b bus.Bus) {
m.eventBus = b
}
func (m *Manager) getBus() bus.Bus {
if m.eventBus == nil {
return &bus.NopBus{}
}
return m.eventBus
}
// Register 注册子会话提供者
func (m *Manager) Register(provider Provider) {
m.mu.Lock()
defer m.mu.Unlock()
m.providers[provider.Type()] = provider
log.Printf("[subsession] 注册子会话提供者: %s (优先级=%d, 超时=%v)", provider.Type(), provider.Priority(), provider.Timeout())
logger.Printf("[subsession] 注册子会话提供者: %s (优先级=%d, 超时=%v)", provider.Type(), provider.Priority(), provider.Timeout())
}
// RegisterWithOverride 注册或覆盖子会话提供者
@@ -40,7 +54,7 @@ func (m *Manager) RegisterWithOverride(provider Provider) {
m.mu.Lock()
defer m.mu.Unlock()
m.providers[provider.Type()] = provider
log.Printf("[subsession] 注册(覆盖)子会话提供者: %s (优先级=%d, 超时=%v)", provider.Type(), provider.Priority(), provider.Timeout())
logger.Printf("[subsession] 注册(覆盖)子会话提供者: %s (优先级=%d, 超时=%v)", provider.Type(), provider.Priority(), provider.Timeout())
}
// GetProvider 获取指定类型的 Provider
@@ -82,7 +96,7 @@ func (m *Manager) Dispatch(
for _, provider := range providers {
if !provider.CanHandle(ctx, intent, userMessage) {
log.Printf("[subsession] 跳过子会话 %s: CanHandle 返回 false", provider.Type())
logger.Printf("[subsession] 跳过子会话 %s: CanHandle 返回 false", provider.Type())
continue
}
@@ -91,11 +105,16 @@ func (m *Manager) Dispatch(
defer wg.Done()
defer func() {
if r := recover(); r != nil {
log.Printf("[subsession] dispatch goroutine panic 恢复 (type=%s): %v", p.Type(), r)
logger.Printf("[subsession] dispatch goroutine panic 恢复 (type=%s): %v", p.Type(), r)
}
}()
result := model.SubSessionResult{Type: p.Type()}
m.getBus().Publish(bus.BusEvent{
Type: bus.EventSubSessionStarted,
Payload: bus.SubSessionPayload{SubType: p.Type(), Status: "started"},
})
// 创建带超时的 context
subCtx, cancel := context.WithTimeout(ctx, p.Timeout())
@@ -105,18 +124,18 @@ func (m *Manager) Dispatch(
llmMessages, err := p.CreateContext(subCtx, params)
if err != nil {
result.Error = fmt.Sprintf("创建上下文失败: %v", err)
log.Printf("[subsession] %s 创建上下文失败: %v", p.Type(), err)
logger.Printf("[subsession] %s 创建上下文失败: %v", p.Type(), err)
resultCh <- result
return
}
log.Printf("[subsession] %s 开始执行 (上下文 %d 条消息)", p.Type(), len(llmMessages))
logger.Printf("[subsession] %s 开始执行 (上下文 %d 条消息)", p.Type(), len(llmMessages))
// 执行子会话
subResult, execErr := p.Execute(subCtx, llmMessages)
if execErr != nil {
result.Error = fmt.Sprintf("执行失败: %v", execErr)
log.Printf("[subsession] %s 执行失败: %v", p.Type(), execErr)
logger.Printf("[subsession] %s 执行失败: %v", p.Type(), execErr)
resultCh <- result
return
}
@@ -125,15 +144,20 @@ func (m *Manager) Dispatch(
select {
case <-subCtx.Done():
result.Error = "子会话超时"
log.Printf("[subsession] %s 超时 (limit=%v)", p.Type(), p.Timeout())
logger.Printf("[subsession] %s 超时 (limit=%v)", p.Type(), p.Timeout())
default:
if subResult != nil {
result = *subResult
result.Type = p.Type()
log.Printf("[subsession] %s 完成: 摘要=%s", p.Type(), truncate(result.Summary, 50))
logger.Printf("[subsession] %s 完成: 摘要=%s", p.Type(), truncate(result.Summary, 50))
}
}
m.getBus().Publish(bus.BusEvent{
Type: bus.EventSubSessionCompleted,
Payload: bus.SubSessionPayload{SubType: p.Type(), Status: resultSummaryStatus(result), Summary: result.Summary, Details: result.Details},
})
resultCh <- result
}(provider)
}
@@ -142,7 +166,7 @@ func (m *Manager) Dispatch(
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("[subsession] wait goroutine panic 恢复: %v", r)
logger.Printf("[subsession] wait goroutine panic 恢复: %v", r)
}
}()
wg.Wait()
@@ -159,6 +183,14 @@ func generateID() string {
return fmt.Sprintf("sub-%x", b)
}
// resultSummaryStatus returns "completed" or "failed" for bus events.
func resultSummaryStatus(r model.SubSessionResult) string {
if r.Error != "" {
return "failed"
}
return "completed"
}
// truncate 截断字符串
func truncate(s string, maxLen int) string {
runes := []rune(s)
@@ -3,7 +3,7 @@ package subsession
import (
"context"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"time"
"github.com/yourname/cyrene-ai/ai-core/internal/memory"
@@ -88,14 +88,14 @@ func (p *MemoryProvider) Execute(ctx context.Context, subCtx []model.LLMMessage)
}
if p.retriever == nil {
log.Printf("[memory-subsession] 记忆检索器未初始化")
logger.Printf("[memory-subsession] 记忆检索器未初始化")
result.Summary = "(记忆系统未就绪)"
return result, nil
}
memories, err := p.retriever.Retrieve(ctx, userID, userMessage)
if err != nil {
log.Printf("[memory-subsession] 记忆检索失败: %v", err)
logger.Printf("[memory-subsession] 记忆检索失败: %v", err)
result.Error = fmt.Sprintf("检索失败: %v", err)
result.Summary = "(记忆检索失败,但不影响对话)"
return result, nil
@@ -138,6 +138,6 @@ func (p *MemoryProvider) Execute(ctx context.Context, subCtx []model.LLMMessage)
}
result.Memories = snippets
log.Printf("[memory-subsession] 完成: %s", result.Summary)
logger.Printf("[memory-subsession] 完成: %s", result.Summary)
return result, nil
}
@@ -3,7 +3,7 @@ package subsession
import (
"context"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"regexp"
"strings"
"time"
@@ -64,7 +64,7 @@ func (p *ReviewProvider) Execute(_ context.Context, subCtx []model.LLMMessage) (
reviewMessages := parseReviewText(text)
log.Printf("[review-provider] 审查完成: 输入 %d 字符 → %d 条消息",
logger.Printf("[review-provider] 审查完成: 输入 %d 字符 → %d 条消息",
len([]rune(text)), len(reviewMessages))
// 构建摘要
@@ -341,7 +341,7 @@ func applyFunc(name string, arg float64) (float64, error) {
return math.Ceil(arg), nil
case "round":
return math.Round(arg), nil
case "log":
case "github.com/yourname/cyrene-ai/pkg/logger":
if arg <= 0 {
return 0, fmt.Errorf("log 参数必须大于0")
}
+18 -18
View File
@@ -6,7 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"net/http"
"os"
"sync"
@@ -75,13 +75,13 @@ func (c *IoTClient) GetAllDevices(ctx context.Context) ([]IoTDevice, error) {
// 请求 API
req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+"/api/v1/devices", nil)
if err != nil {
log.Printf("[IoT客户端] 创建请求失败: %v", err)
logger.Printf("[IoT客户端] 创建请求失败: %v", err)
return nil, fmt.Errorf("创建请求失败: %w", err)
}
resp, err := c.client.Do(req)
if err != nil {
log.Printf("[IoT客户端] 请求失败: %v", err)
logger.Printf("[IoT客户端] 请求失败: %v", err)
return nil, fmt.Errorf("获取设备列表失败: %w", err)
}
defer resp.Body.Close()
@@ -138,27 +138,27 @@ func (c *IoTClient) GetDevice(ctx context.Context, id string) (*IoTDevice, error
// ToggleDevice 切换设备开关状态
func (c *IoTClient) ToggleDevice(id string) error {
log.Printf("[IoT-client] 🔄 切换设备: id=%s, url=%s", id, c.baseURL+"/api/v1/devices/"+id+"/toggle")
logger.Printf("[IoT-client] 🔄 切换设备: id=%s, url=%s", id, c.baseURL+"/api/v1/devices/"+id+"/toggle")
req, err := http.NewRequest(http.MethodPost, c.baseURL+"/api/v1/devices/"+id+"/toggle", nil)
if err != nil {
log.Printf("[IoT-client] ❌ 创建切换请求失败: device=%s, err=%v", id, err)
logger.Printf("[IoT-client] ❌ 创建切换请求失败: device=%s, err=%v", id, err)
return fmt.Errorf("创建切换请求失败: %w", err)
}
resp, err := c.client.Do(req)
if err != nil {
log.Printf("[IoT-client] ❌ 切换设备 HTTP 失败: device=%s, err=%v", id, err)
logger.Printf("[IoT-client] ❌ 切换设备 HTTP 失败: device=%s, err=%v", id, err)
return fmt.Errorf("切换设备 %s 失败: %w", id, err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
log.Printf("[IoT-client] ❌ 设备不存在: %s", id)
logger.Printf("[IoT-client] ❌ 设备不存在: %s", id)
return fmt.Errorf("设备 %s 不存在", id)
}
if resp.StatusCode != http.StatusOK {
log.Printf("[IoT-client] ❌ 切换设备返回非200: device=%s, status=%d", id, resp.StatusCode)
logger.Printf("[IoT-client] ❌ 切换设备返回非200: device=%s, status=%d", id, resp.StatusCode)
return fmt.Errorf("切换设备 %s 返回状态码 %d", id, resp.StatusCode)
}
@@ -167,26 +167,26 @@ func (c *IoTClient) ToggleDevice(id string) error {
c.cache = nil
c.mu.Unlock()
log.Printf("[IoT-client] ✅ 切换设备成功: %s", id)
logger.Printf("[IoT-client] ✅ 切换设备成功: %s", id)
return nil
}
// SetDeviceProperty 设置设备属性(温度、亮度、位置、模式、颜色等)
func (c *IoTClient) SetDeviceProperty(id string, field string, value interface{}) error {
log.Printf("[IoT-client] 🔧 设置设备属性: device=%s, field=%s, value=%v, url=%s", id, field, value, c.baseURL+"/api/v1/devices/"+id+"/set")
logger.Printf("[IoT-client] 🔧 设置设备属性: device=%s, field=%s, value=%v, url=%s", id, field, value, c.baseURL+"/api/v1/devices/"+id+"/set")
body, err := json.Marshal(map[string]interface{}{
"field": field,
"value": value,
})
if err != nil {
log.Printf("[IoT-client] ❌ 序列化请求失败: device=%s, err=%v", id, err)
logger.Printf("[IoT-client] ❌ 序列化请求失败: device=%s, err=%v", id, err)
return fmt.Errorf("序列化请求失败: %w", err)
}
req, err := http.NewRequest(http.MethodPost, c.baseURL+"/api/v1/devices/"+id+"/set", nil)
if err != nil {
log.Printf("[IoT-client] ❌ 创建设置请求失败: device=%s, err=%v", id, err)
logger.Printf("[IoT-client] ❌ 创建设置请求失败: device=%s, err=%v", id, err)
return fmt.Errorf("创建设置请求失败: %w", err)
}
req.Header.Set("Content-Type", "application/json")
@@ -194,13 +194,13 @@ func (c *IoTClient) SetDeviceProperty(id string, field string, value interface{}
resp, err := c.client.Do(req)
if err != nil {
log.Printf("[IoT-client] ❌ 设置设备属性 HTTP 失败: device=%s, field=%s, err=%v", id, field, err)
logger.Printf("[IoT-client] ❌ 设置设备属性 HTTP 失败: device=%s, field=%s, err=%v", id, field, err)
return fmt.Errorf("设置设备 %s 属性失败: %w", id, err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
log.Printf("[IoT-client] ❌ 设备不存在: %s", id)
logger.Printf("[IoT-client] ❌ 设备不存在: %s", id)
return fmt.Errorf("设备 %s 不存在", id)
}
if resp.StatusCode != http.StatusOK {
@@ -209,10 +209,10 @@ func (c *IoTClient) SetDeviceProperty(id string, field string, value interface{}
}
json.NewDecoder(resp.Body).Decode(&errResp)
if errResp.Error != "" {
log.Printf("[IoT-client] ❌ 设置设备属性失败: device=%s, err=%s", id, errResp.Error)
logger.Printf("[IoT-client] ❌ 设置设备属性失败: device=%s, err=%s", id, errResp.Error)
return fmt.Errorf("设置设备 %s 属性失败: %s", id, errResp.Error)
}
log.Printf("[IoT-client] ❌ 设置设备属性返回非200: device=%s, status=%d", id, resp.StatusCode)
logger.Printf("[IoT-client] ❌ 设置设备属性返回非200: device=%s, status=%d", id, resp.StatusCode)
return fmt.Errorf("设置设备 %s 属性返回状态码 %d", id, resp.StatusCode)
}
@@ -221,7 +221,7 @@ func (c *IoTClient) SetDeviceProperty(id string, field string, value interface{}
c.cache = nil
c.mu.Unlock()
log.Printf("[IoT-client] ✅ 设置设备属性成功: device=%s, field=%s, value=%v", id, field, value)
logger.Printf("[IoT-client] ✅ 设置设备属性成功: device=%s, field=%s, value=%v", id, field, value)
return nil
}
@@ -229,7 +229,7 @@ func (c *IoTClient) SetDeviceProperty(id string, field string, value interface{}
func (c *IoTClient) GetDevicesForContext(ctx context.Context) []IoTDevice {
devices, err := c.GetAllDevices(ctx)
if err != nil {
log.Printf("[IoT客户端] 获取设备状态摘要失败: %v", err)
logger.Printf("[IoT客户端] 获取设备状态摘要失败: %v", err)
return nil
}
return devices
+6 -6
View File
@@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"sync"
)
@@ -52,7 +52,7 @@ func (r *Registry) Register(executor ToolExecutor) {
defer r.mu.Unlock()
def := executor.Definition()
r.tools[def.Name] = executor
log.Printf("[工具注册] 已注册工具: %s", def.Name)
logger.Printf("[工具注册] 已注册工具: %s", def.Name)
}
// GetDefinitions 获取所有工具定义(用于 LLM function calling
@@ -81,10 +81,10 @@ func (r *Registry) Execute(ctx context.Context, toolName string, arguments map[s
}, nil
}
log.Printf("[工具执行] 调用工具 %s,参数: %v", toolName, arguments)
logger.Printf("[工具执行] 调用工具 %s,参数: %v", toolName, arguments)
result, err := executor.Execute(ctx, arguments)
if err != nil {
log.Printf("[工具执行] 工具 %s 执行失败: %v", toolName, err)
logger.Printf("[工具执行] 工具 %s 执行失败: %v", toolName, err)
return &ToolResult{
ToolName: toolName,
Success: false,
@@ -93,9 +93,9 @@ func (r *Registry) Execute(ctx context.Context, toolName string, arguments map[s
}
if result.Success {
log.Printf("[工具执行] 工具 %s 执行成功 (数据长度: %d)", toolName, len(result.Data))
logger.Printf("[工具执行] 工具 %s 执行成功 (数据长度: %d)", toolName, len(result.Data))
} else {
log.Printf("[工具执行] 工具 %s 返回错误: %s", toolName, result.Error)
logger.Printf("[工具执行] 工具 %s 返回错误: %s", toolName, result.Error)
}
return result, nil
@@ -6,7 +6,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"github.com/yourname/cyrene-ai/pkg/logger"
"net/http"
"strings"
"time"
@@ -78,7 +78,7 @@ func (c *ToolEngineClient) GetDefinitions(ctx context.Context) ([]ToolDefinition
})
}
log.Printf("[tool-engine-client] 从 tool-engine 获取了 %d 个工具定义", len(defs))
logger.Printf("[tool-engine-client] 从 tool-engine 获取了 %d 个工具定义", len(defs))
return defs, nil
}
@@ -91,7 +91,7 @@ func (c *ToolEngineClient) Execute(ctx context.Context, toolName string, argumen
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
if attempt > 0 {
log.Printf("[tool-engine-client] 工具 %s 第 %d 次重试 (上次错误: %v)", toolName, attempt, lastErr)
logger.Printf("[tool-engine-client] 工具 %s 第 %d 次重试 (上次错误: %v)", toolName, attempt, lastErr)
select {
case <-ctx.Done():
return &ToolResult{
@@ -121,7 +121,7 @@ func (c *ToolEngineClient) Execute(ctx context.Context, toolName string, argumen
}
}
log.Printf("[tool-engine-client] 工具 %s 所有重试均失败 (最后错误: %v)", toolName, lastErr)
logger.Printf("[tool-engine-client] 工具 %s 所有重试均失败 (最后错误: %v)", toolName, lastErr)
return &ToolResult{
ToolName: toolName,
Success: false,