package bus

import (
	"sync"
	"time"

	"github.com/yourname/cyrene-ai/pkg/logger"
)

// Bus is the event bus interface. It exists so the bus can be mocked in
// tests or swapped for another implementation.
type Bus interface {
	Publish(event BusEvent)
	Subscribe(eventType EventType, handler EventHandler) *Subscription
}

// ConversationBus is the conversation event bus.
//
// Step 1: side-channel publishing only; no consumers yet.
type ConversationBus struct {
	mu          sync.RWMutex                  // guards subscribers
	subscribers map[EventType][]*Subscription // subscriptions grouped by event type
	eventCh     chan BusEvent                 // buffered queue between Publish and dispatchLoop
	done        chan struct{}                 // closed by Stop to end dispatchLoop
	stopOnce    sync.Once                     // makes Stop safe to call more than once
}

// NewConversationBus creates a bus with a 64-event buffer and starts its
// background dispatch goroutine. Call Stop to terminate that goroutine.
func NewConversationBus() *ConversationBus {
	b := &ConversationBus{
		subscribers: make(map[EventType][]*Subscription),
		eventCh:     make(chan BusEvent, 64),
		done:        make(chan struct{}),
	}
	go b.dispatchLoop()
	return b
}

// Publish queues an event for delivery without blocking the caller.
//
// If event.Timestamp is zero it is set to the current time. If the event
// buffer is full the event is dropped and a log line is written.
func (b *ConversationBus) Publish(event BusEvent) {
	if event.Timestamp.IsZero() {
		event.Timestamp = time.Now()
	}
	select {
	case b.eventCh <- event:
	default:
		logger.Printf("[bus] 事件通道已满,丢弃事件: type=%s session=%s", event.Type, event.SessionID)
	}
}

// Subscribe registers handler for events of type eventType and returns the
// Subscription that represents the registration.
func (b *ConversationBus) Subscribe(eventType EventType, handler EventHandler) *Subscription {
	b.mu.Lock()
	defer b.mu.Unlock()

	sub := &Subscription{bus: b, eventType: eventType, handler: handler}
	b.subscribers[eventType] = append(b.subscribers[eventType], sub)
	return sub
}

// unsubscribe removes sub from the subscriber list of its event type.
// It does nothing if sub is not currently registered.
func (b *ConversationBus) unsubscribe(sub *Subscription) {
	b.mu.Lock()
	defer b.mu.Unlock()

	subs := b.subscribers[sub.eventType]
	for i, s := range subs {
		if s != sub {
			continue
		}
		// Shift the tail left, then clear the vacated last slot so the
		// backing array no longer references the removed subscription
		// (and, through it, the handler).
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		b.subscribers[sub.eventType] = subs[:last]
		return
	}
}

// Stop signals the dispatch goroutine to exit. It is safe to call Stop more
// than once; only the first call has an effect. Events still sitting in the
// buffer when Stop is called may not be delivered.
func (b *ConversationBus) Stop() {
	// Closing an already-closed channel panics, so guard the close.
	b.stopOnce.Do(func() {
		close(b.done)
	})
}

// dispatchLoop is the background loop that takes events off eventCh and
// invokes every handler subscribed to the event's type, in registration
// order, on this goroutine. It returns once done is closed.
func (b *ConversationBus) dispatchLoop() {
	for {
		select {
		case event := <-b.eventCh:
			// Snapshot the handlers under the read lock so that callbacks
			// run without the lock held.
			b.mu.RLock()
			subs := b.subscribers[event.Type]
			handlers := make([]EventHandler, len(subs))
			for i, s := range subs {
				handlers[i] = s.handler
			}
			b.mu.RUnlock()

			for _, h := range handlers {
				// Run each handler in its own function so a panic in one
				// is recovered and logged without stopping the loop or
				// skipping the remaining handlers.
				func() {
					defer func() {
						if r := recover(); r != nil {
							logger.Printf("[bus] handler panic: %v", r)
						}
					}()
					h(event)
				}()
			}
		case <-b.done:
			return
		}
	}
}