Files
Cyrene/backend/ai-core/internal/bus/conversation_bus.go
T
AskaEth 87214b9441 feat: Phase 1+2 架构进化 — 连续思考链/主动消息决策/情感状态机/离线自主思考 (86文件)
Phase 1 (基础设施):
- ThinkChain 思考链连续性 + 差异化思考提示词 (persistent)
- AutonomousToolPolicy 工具安全策略 (safe/unsafe/conditional)
- MessageScheduler 自适应消息节奏 (Idle/Available/Busy)
- SessionEnrichmentStore 渐进式上下文丰富 (5层)
- ConversationBus 事件总线 + ResponseCache (dedup)
- pkg/logger 统一日志 + 所有 handler 替换 fmt.Printf
- NPE 守卫/链路优化/数据库表修复/Go workspace

Phase 2 (人格交互):
- EmotionState/EmotionTracker 情感状态机 (5种心情, 情绪衰减)
- ProactiveGuard 主动消息多维决策 (静默时段/紧急度/频率/校验)
- Gateway↔ai-core 在线状态感知链路 (presence notification)
- 离线思考频率控制 + 重连问候 + 离线消息排队

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-23 15:25:12 +08:00

102 lines
2.3 KiB
Go

package bus
import (
"github.com/yourname/cyrene-ai/pkg/logger"
"sync"
"time"
)
// Bus 总线接口(方便测试和替换)
type Bus interface {
Publish(event BusEvent)
Subscribe(eventType EventType, handler EventHandler) *Subscription
}
// ConversationBus 对话事件总线
// Step 1: 仅 side-channel 发布,无消费端
type ConversationBus struct {
mu sync.RWMutex
subscribers map[EventType][]*Subscription
eventCh chan BusEvent
done chan struct{}
}
// NewConversationBus 创建总线
func NewConversationBus() *ConversationBus {
b := &ConversationBus{
subscribers: make(map[EventType][]*Subscription),
eventCh: make(chan BusEvent, 64),
done: make(chan struct{}),
}
go b.dispatchLoop()
return b
}
// Publish 发布事件到总线(非阻塞)
func (b *ConversationBus) Publish(event BusEvent) {
if event.Timestamp.IsZero() {
event.Timestamp = time.Now()
}
select {
case b.eventCh <- event:
default:
logger.Printf("[bus] 事件通道已满,丢弃事件: type=%s session=%s", event.Type, event.SessionID)
}
}
// Subscribe 订阅事件类型
func (b *ConversationBus) Subscribe(eventType EventType, handler EventHandler) *Subscription {
b.mu.Lock()
defer b.mu.Unlock()
sub := &Subscription{bus: b, eventType: eventType, handler: handler}
b.subscribers[eventType] = append(b.subscribers[eventType], sub)
return sub
}
// unsubscribe 内部取消订阅
func (b *ConversationBus) unsubscribe(sub *Subscription) {
b.mu.Lock()
defer b.mu.Unlock()
subs := b.subscribers[sub.eventType]
for i, s := range subs {
if s == sub {
b.subscribers[sub.eventType] = append(subs[:i], subs[i+1:]...)
break
}
}
}
// Stop 停止总线
func (b *ConversationBus) Stop() {
close(b.done)
}
// dispatchLoop 后台分发循环
func (b *ConversationBus) dispatchLoop() {
for {
select {
case event := <-b.eventCh:
b.mu.RLock()
subs := b.subscribers[event.Type]
// 拷贝一份避免持锁回调
handlers := make([]EventHandler, len(subs))
for i, s := range subs {
handlers[i] = s.handler
}
b.mu.RUnlock()
for _, h := range handlers {
func() {
defer func() {
if r := recover(); r != nil {
logger.Printf("[bus] handler panic: %v", r)
}
}()
h(event)
}()
}
case <-b.done:
return
}
}
}