package handler import ( "encoding/json" "fmt" "strings" "sync" "time" "github.com/gorilla/websocket" "git.yeij.top/AskaEth/Cyrene/gateway/internal/ws" "git.yeij.top/AskaEth/Cyrene/pkg/logger" ) // voiceStreamSession manages a proxied WebSocket connection to voice-service // for real-time streaming speech-to-text during a single voice input. type voiceStreamSession struct { client *ws.Client voiceConn *websocket.Conn language string format string mu sync.Mutex done chan struct{} interimBuf strings.Builder finalText string } // VoiceStreamManager creates and tracks streaming STT sessions. type VoiceStreamManager struct { voiceServiceURL string sessions map[string]*voiceStreamSession // key: clientID+sessionID mu sync.Mutex } // NewVoiceStreamManager creates a voice stream manager. func NewVoiceStreamManager(voiceServiceURL string) *VoiceStreamManager { return &VoiceStreamManager{ voiceServiceURL: voiceServiceURL, sessions: make(map[string]*voiceStreamSession), } } func (m *VoiceStreamManager) sessionKey(clientID, sessionID string) string { return clientID + ":" + sessionID } // StartStream begins a streaming STT session by connecting to voice-service. func (m *VoiceStreamManager) StartStream(client *ws.Client, format, language string) error { m.mu.Lock() key := m.sessionKey(client.ClientID, client.SessionID) if _, exists := m.sessions[key]; exists { m.mu.Unlock() return fmt.Errorf("voice stream already active for this session") } if format == "" { format = "webm" } if language == "" { language = "zh" } voiceURL := strings.TrimRight(m.voiceServiceURL, "/") wsURL := "ws" + strings.TrimPrefix(voiceURL, "http") + "/api/v1/stt/stream" wsURL += "?language=" + language + "&format=" + format dialer := websocket.Dialer{HandshakeTimeout: 10 * time.Second} voiceConn, _, err := dialer.Dial(wsURL, nil) if err != nil { m.mu.Unlock() return fmt.Errorf("connect to voice-service stream: %w", err) } session := &voiceStreamSession{ client: client, voiceConn: voiceConn, language: language, format: format, done: make(chan struct{}), } m.sessions[key] = session m.mu.Unlock() // Read results from voice-service in background go session.readResults(m, key) logger.Printf("[voice-stream] 流式 STT 会话已建立: client=%s, lang=%s, fmt=%s", client.ClientID, language, format) return nil } // SendChunk forwards an audio chunk (already decoded bytes) to voice-service. func (m *VoiceStreamManager) SendChunk(clientID, sessionID string, audioData []byte, seq int) error { m.mu.Lock() key := m.sessionKey(clientID, sessionID) session, exists := m.sessions[key] m.mu.Unlock() if !exists { return fmt.Errorf("no active voice stream for this session") } session.mu.Lock() defer session.mu.Unlock() if err := session.voiceConn.WriteMessage(websocket.BinaryMessage, audioData); err != nil { return fmt.Errorf("send audio chunk: %w", err) } return nil } // EndStream signals voice-service that the audio stream is complete, // waits for final result, then cleans up. func (m *VoiceStreamManager) EndStream(clientID, sessionID string) (string, error) { m.mu.Lock() key := m.sessionKey(clientID, sessionID) session, exists := m.sessions[key] m.mu.Unlock() if !exists { return "", fmt.Errorf("no active voice stream for this session") } // Send stop action to voice-service session.mu.Lock() stopMsg, _ := json.Marshal(map[string]interface{}{"action": "stop"}) session.voiceConn.WriteMessage(websocket.TextMessage, stopMsg) session.mu.Unlock() // Wait for result processing to finish select { case <-session.done: case <-time.After(15 * time.Second): logger.Printf("[voice-stream] 等待最终结果超时: client=%s", clientID) } // Cleanup session.close() m.mu.Lock() delete(m.sessions, key) m.mu.Unlock() text := session.finalText if text == "" { text = session.interimBuf.String() } logger.Printf("[voice-stream] 流式 STT 结束: client=%s, text=%q", clientID, text) return text, nil } // CancelStream forcibly terminates a voice stream. func (m *VoiceStreamManager) CancelStream(clientID, sessionID string) { m.mu.Lock() key := m.sessionKey(clientID, sessionID) session, exists := m.sessions[key] if exists { delete(m.sessions, key) } m.mu.Unlock() if exists { session.close() logger.Printf("[voice-stream] 流式 STT 已取消: client=%s", clientID) } } // readResults reads STT results from voice-service and forwards them to the client. func (s *voiceStreamSession) readResults(mgr *VoiceStreamManager, key string) { defer close(s.done) voiceConn := s.voiceConn for { msgType, data, err := voiceConn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { logger.Printf("[voice-stream] voice-service 读取错误: %v", err) } return } if msgType != websocket.TextMessage { continue } var result struct { Type string `json:"type"` Text string `json:"text"` IsFinal bool `json:"isFinal"` Error string `json:"error"` } if err := json.Unmarshal(data, &result); err != nil { logger.Printf("[voice-stream] 解析结果失败: %v", err) continue } if result.Error != "" { logger.Printf("[voice-stream] voice-service 错误: %s", result.Error) s.client.SendMessage(ws.ServerMessage{ Type: "voice_interim", MessageID: "voice_" + generateID(), Error: result.Error, Timestamp: time.Now().UnixMilli(), }) return } if result.Text != "" { if result.IsFinal { s.finalText = result.Text s.client.SendMessage(ws.ServerMessage{ Type: "voice_final", MessageID: "voice_" + generateID(), Text: result.Text, Timestamp: time.Now().UnixMilli(), }) return } // Interim result — accumulate and forward s.interimBuf.Reset() s.interimBuf.WriteString(result.Text) s.client.SendMessage(ws.ServerMessage{ Type: "voice_interim", MessageID: "voice_" + generateID(), Text: result.Text, Timestamp: time.Now().UnixMilli(), }) } // "done" type from voice-service signals end of results if result.Type == "done" { return } } } func (s *voiceStreamSession) close() { if s.voiceConn != nil { s.voiceConn.Close() } } // HasActiveStream checks if a client already has an active voice stream. func (m *VoiceStreamManager) HasActiveStream(clientID, sessionID string) bool { m.mu.Lock() defer m.mu.Unlock() _, exists := m.sessions[m.sessionKey(clientID, sessionID)] return exists } // CleanupClient removes all streams for a client. func (m *VoiceStreamManager) CleanupClient(clientID string) { m.mu.Lock() var toRemove []string for key, session := range m.sessions { if session.client.ClientID == clientID { toRemove = append(toRemove, key) } } for _, key := range toRemove { delete(m.sessions, key) } m.mu.Unlock() for _, key := range toRemove { // Close connection if session exists (we already deleted from map) logger.Printf("[voice-stream] 清理客户端流: key=%s", key) } }