package service

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net/url"
	"strings"
	"sync"
	"time"

	"git.yeij.top/AskaEth/Cyrene/pkg/audio"
	"github.com/gorilla/websocket"
)

const (
	// dashScopeRealtimeURL is the realtime WebSocket endpoint; the model is
	// passed in the "model" query parameter.
	dashScopeRealtimeURL = "wss://dashscope.aliyuncs.com/api-ws/v1/realtime"

	// dashScopeIOTimeout bounds the WebSocket dial handshake, each read during
	// the session handshake, and each individual write.
	dashScopeIOTimeout = 10 * time.Second

	// dashScopeChunkSize is the number of PCM bytes per
	// input_audio_buffer.append message (3200 bytes = 100 ms of 16 kHz,
	// 16-bit mono audio).
	dashScopeChunkSize = 3200

	// dashScopeTranscribeMaxWait caps how long Transcribe waits for results
	// after all audio has been sent.
	dashScopeTranscribeMaxWait = 60 * time.Second

	// dashScopeTranscribeSilenceWait: once Transcribe has some text, it returns
	// if no further server message arrives within this window.
	dashScopeTranscribeSilenceWait = 3 * time.Second

	// dashScopeCloseGrace is how long StreamingSession.Close waits for the
	// session.finish write and for the server to deliver remaining results.
	dashScopeCloseGrace = 5 * time.Second
)

// DashScopeSTT performs speech recognition with Alibaba Cloud Model Studio
// (DashScope) Qwen ASR models.
// The realtime model (qwen3-asr-flash-realtime) is driven over the WebSocket
// realtime endpoint using a session / server-VAD protocol similar to the
// OpenAI Realtime API.
type DashScopeSTT struct {
	apiKey string
	model  string
	// NOTE(review): timeout is set by NewDashScopeSTT but not read anywhere in
	// this file; the effective limits are the dashScope* constants above.
	timeout time.Duration
}

// NewDashScopeSTT creates a DashScope STT client. An empty model selects
// "qwen3-asr-flash-realtime".
func NewDashScopeSTT(apiKey, model string) *DashScopeSTT {
	if model == "" {
		model = "qwen3-asr-flash-realtime"
	}
	return &DashScopeSTT{
		apiKey:  apiKey,
		model:   model,
		timeout: 30 * time.Second,
	}
}

// IsAvailable reports whether an API key has been configured.
func (d *DashScopeSTT) IsAvailable() bool {
	return d.apiKey != ""
}

// Model returns the configured model name.
func (d *DashScopeSTT) Model() string {
	return d.model
}

// --- Realtime endpoint protocol message types ---

// rtClientMsg is an event sent from the client to the realtime endpoint.
type rtClientMsg struct {
	EventID string      `json:"event_id,omitempty"`
	Type    string      `json:"type"`
	Session interface{} `json:"session,omitempty"`
	Audio   string      `json:"audio,omitempty"`
}

// rtServerMsg is the union of the server events this file inspects; only the
// fields relevant to a given Type are populated.
type rtServerMsg struct {
	EventID string          `json:"event_id,omitempty"`
	Type    string          `json:"type"`
	Session json.RawMessage `json:"session,omitempty"`
	Error   *rtError        `json:"error,omitempty"`

	// response.audio_transcript.delta
	Delta string `json:"delta,omitempty"`

	// response.done
	Response *struct {
		Output []struct {
			Transcript string `json:"transcript,omitempty"`
		} `json:"output,omitempty"`
	} `json:"response,omitempty"`

	// conversation.item.input_audio_transcription.completed (top-level form)
	Transcript string `json:"transcript,omitempty"`

	// conversation.item.input_audio_transcription.completed (item form)
	Item *struct {
		Content []struct {
			Transcript string `json:"transcript,omitempty"`
		} `json:"content,omitempty"`
	} `json:"item,omitempty"`
}

// rtError is the error object carried by an "error" server event.
type rtError struct {
	Type    string `json:"type"`
	Code    string `json:"code"`
	Message string `json:"message"`
	Param   string `json:"param,omitempty"`
}

// completedTranscripts returns the text of a transcription-completed event:
// the top-level transcript when present, otherwise every non-empty transcript
// in item.content, in order. It returns nil when there is no text.
func (m *rtServerMsg) completedTranscripts() []string {
	if m.Transcript != "" {
		return []string{m.Transcript}
	}
	if m.Item == nil {
		return nil
	}
	var out []string
	for _, c := range m.Item.Content {
		if c.Transcript != "" {
			out = append(out, c.Transcript)
		}
	}
	return out
}

// errorMessage returns the message of an "error" event, or "未知错误"
// ("unknown error") when the event carries no error object.
func (m *rtServerMsg) errorMessage() string {
	if m.Error != nil {
		return m.Error.Message
	}
	return "未知错误"
}

// openRealtimeSession dials the realtime endpoint for d.model and runs the
// session handshake shared by Transcribe and StartStreaming. On failure the
// connection is closed before returning.
func (d *DashScopeSTT) openRealtimeSession(ctx context.Context, language string) (*websocket.Conn, error) {
	endpoint := dashScopeRealtimeURL + "?model=" + url.QueryEscape(d.model)
	header := map[string][]string{
		"Authorization": {"Bearer " + d.apiKey},
	}
	dialer := websocket.Dialer{HandshakeTimeout: dashScopeIOTimeout}
	conn, _, err := dialer.DialContext(ctx, endpoint, header)
	if err != nil {
		return nil, fmt.Errorf("连接 DashScope STT 失败: %w", err)
	}
	if err := dashScopeHandshake(conn, language); err != nil {
		conn.Close()
		return nil, err
	}
	return conn, nil
}

// dashScopeHandshake performs the session setup exchange:
//
//  1. wait for session.created;
//  2. send session.update (PCM input at 16000 Hz, server VAD, transcription
//     language; "" or "auto" falls back to "zh");
//  3. read the reply and fail if it is an "error" event.
//
// On success the read deadline is cleared. Later reads are bounded by the
// caller (timers, ctx, Close) rather than by the handshake deadline, which
// would otherwise make every later read fail once it expires.
// Deadline-setter errors are ignored: they only occur on a dead connection,
// and the next read/write reports that.
func dashScopeHandshake(conn *websocket.Conn, language string) error {
	conn.SetReadDeadline(time.Now().Add(dashScopeIOTimeout))
	var created rtServerMsg
	if err := conn.ReadJSON(&created); err != nil {
		return fmt.Errorf("等待 session.created 失败: %w", err)
	}
	if created.Type != "session.created" {
		return fmt.Errorf("预期 session.created 但收到: %s", created.Type)
	}

	if language == "" || language == "auto" {
		language = "zh"
	}
	update := rtClientMsg{
		Type: "session.update",
		Session: map[string]interface{}{
			"modalities":         []string{"text"},
			"input_audio_format": "pcm",
			"sample_rate":        16000,
			"input_audio_transcription": map[string]interface{}{
				"language": language,
			},
			"turn_detection": map[string]interface{}{
				"type": "server_vad",
			},
		},
	}
	conn.SetWriteDeadline(time.Now().Add(dashScopeIOTimeout))
	if err := conn.WriteJSON(update); err != nil {
		return fmt.Errorf("发送 session.update 失败: %w", err)
	}

	conn.SetReadDeadline(time.Now().Add(dashScopeIOTimeout))
	var updated rtServerMsg
	if err := conn.ReadJSON(&updated); err != nil {
		return fmt.Errorf("等待 session.updated 失败: %w", err)
	}
	if updated.Type == "error" {
		return fmt.Errorf("session.update 失败: %s", updated.errorMessage())
	}

	conn.SetReadDeadline(time.Time{})
	return nil
}

// dashScopeSendPCM streams pcm as base64 input_audio_buffer.append messages of
// at most dashScopeChunkSize bytes. It stops early with ctx.Err() if ctx is
// cancelled between chunks.
func dashScopeSendPCM(ctx context.Context, conn *websocket.Conn, pcm []byte) error {
	for off := 0; off < len(pcm); off += dashScopeChunkSize {
		if err := ctx.Err(); err != nil {
			return err
		}
		end := off + dashScopeChunkSize
		if end > len(pcm) {
			end = len(pcm)
		}
		msg := rtClientMsg{
			Type:  "input_audio_buffer.append",
			Audio: base64.StdEncoding.EncodeToString(pcm[off:end]),
		}
		conn.SetWriteDeadline(time.Now().Add(dashScopeIOTimeout))
		if err := conn.WriteJSON(msg); err != nil {
			return fmt.Errorf("发送音频数据失败: %w", err)
		}
	}
	return nil
}

// dashScopeCollectTranscript reads server events until the transcript is
// complete and returns it.
//
// Completed utterances are joined with "\n". Streamed deltas are kept only
// until a completed transcript for the utterance arrives, so the same text is
// not counted twice.
//
// The function returns when one of these happens:
//   - session.finished arrives;
//   - response.done arrives with text available;
//   - the connection closes, normally or with a non-close read error
//     (best effort: returns what was collected);
//   - the silence window elapses after text has been received;
//   - dashScopeTranscribeMaxWait passes.
//
// An "error" event, an abnormal close frame, or ctx cancellation returns an
// error instead.
//
// The caller must close conn after this returns; that is what unblocks the
// reader goroutine if it is still inside ReadJSON.
func dashScopeCollectTranscript(ctx context.Context, conn *websocket.Conn) (string, error) {
	type readResult struct {
		msg rtServerMsg
		err error
	}

	// Reads run in their own goroutine so the loop below can also watch the
	// timer and ctx. gorilla/websocket must not be read again after a read
	// error, so the goroutine stops at the first error.
	msgCh := make(chan readResult, 1)
	stop := make(chan struct{})
	defer close(stop)
	go func() {
		for {
			select {
			case <-stop:
				return
			default:
			}
			var m rtServerMsg
			err := conn.ReadJSON(&m)
			select {
			case msgCh <- readResult{m, err}:
			case <-stop:
				return
			}
			if err != nil {
				return
			}
		}
	}()

	var finals []string         // completed utterances, in arrival order
	var partial strings.Builder // deltas not yet superseded by a completed transcript
	hasText := func() bool { return len(finals) > 0 || partial.Len() > 0 }
	text := func() string {
		s := strings.Join(finals, "\n")
		if p := partial.String(); p != "" {
			if s != "" {
				s += "\n"
			}
			s += p
		}
		return s
	}

	timer := time.NewTimer(dashScopeTranscribeMaxWait)
	defer timer.Stop()
	// resetTimer follows the Stop/drain/Reset pattern so a fire that raced
	// with the last message cannot end the wait early.
	resetTimer := func(dur time.Duration) {
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(dur)
	}

	for {
		select {
		case <-ctx.Done():
			return "", ctx.Err()

		case <-timer.C:
			return text(), nil

		case res := <-msgCh:
			if res.err != nil {
				// Only close frames other than normal/going-away are failures;
				// any other read error ends the wait with what we have.
				if websocket.IsUnexpectedCloseError(res.err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
					return "", fmt.Errorf("连接异常关闭: %w", res.err)
				}
				return text(), nil
			}

			msg := res.msg
			switch msg.Type {
			case "conversation.item.input_audio_transcription.completed":
				if t := strings.Join(msg.completedTranscripts(), ""); t != "" {
					finals = append(finals, t)
					partial.Reset()
				}
			case "response.audio_transcript.delta":
				partial.WriteString(msg.Delta)
			case "response.done":
				if !hasText() && msg.Response != nil {
					for _, o := range msg.Response.Output {
						partial.WriteString(o.Transcript)
					}
				}
				if hasText() {
					return text(), nil
				}
			case "session.finished":
				return text(), nil
			case "error":
				if msg.Error != nil {
					return "", fmt.Errorf("DashScope 识别失败: %s", msg.Error.Message)
				}
				return "", fmt.Errorf("DashScope 返回未知错误")
			}

			if hasText() {
				resetTimer(dashScopeTranscribeSilenceWait)
			}
		}
	}
}

// Transcribe sends audioData to DashScope and returns the recognized text.
//
// audioData is converted to PCM16 via audio.ConvertToPCM16 using format. The
// conversion happens before dialing, so undecodable input fails without a
// network round trip.
//
// Audio is streamed to the realtime endpoint, where server VAD segments it.
// After the last chunk, session.finish is sent (the same message
// StreamingSession.Close sends).
// NOTE(review): this presumably makes the server flush trailing speech that is
// not followed by enough silence for VAD to close the turn; confirm against the
// DashScope realtime ASR docs.
//
// language "" or "auto" falls back to "zh".
func (d *DashScopeSTT) Transcribe(ctx context.Context, audioData []byte, format string, language string) (string, error) {
	if !d.IsAvailable() {
		return "", fmt.Errorf("DashScope API Key 未配置")
	}

	pcmData, err := audio.ConvertToPCM16(audioData, format)
	if err != nil {
		return "", fmt.Errorf("音频格式转换失败: %w", err)
	}

	conn, err := d.openRealtimeSession(ctx, language)
	if err != nil {
		return "", err
	}
	defer conn.Close()

	if err := dashScopeSendPCM(ctx, conn, pcmData); err != nil {
		return "", err
	}

	conn.SetWriteDeadline(time.Now().Add(dashScopeIOTimeout))
	if err := conn.WriteJSON(rtClientMsg{Type: "session.finish"}); err != nil {
		return "", fmt.Errorf("发送 session.finish 失败: %w", err)
	}

	return dashScopeCollectTranscript(ctx, conn)
}

// --- Streaming recognition (StreamingSession) ---

// StreamingSession holds one persistent DashScope realtime WebSocket
// connection for incremental recognition. Audio is pushed with SendAudio, and
// results are read from Results until readLoop ends. When readLoop ends
// (error, session.finished, or Close), the Results channel is closed.
type StreamingSession struct {
	conn    *websocket.Conn
	results chan StreamingResult // written and closed only by readLoop
	done    chan struct{}        // closed once readLoop has exited
	stop    chan struct{}        // closed by Close to abandon pending result sends
	mu      sync.Mutex           // serializes writes to conn and guards closed
	closed  bool
}

// StreamingResult is one realtime recognition result. Exactly one of Text or
// Error is meaningful; IsFinal marks a completed utterance as opposed to a
// streamed delta.
type StreamingResult struct {
	Text    string `json:"text"`
	IsFinal bool   `json:"is_final"`
	Error   string `json:"error,omitempty"`
}

// StartStreaming opens a DashScope realtime WebSocket session and returns a
// StreamingSession whose reader goroutine is already running.
//
// format is currently not used: audio passed to SendAudio must already be
// 16-bit PCM at 16000 Hz, matching the session configuration.
// language "" or "auto" falls back to "zh".
func (d *DashScopeSTT) StartStreaming(ctx context.Context, format, language string) (*StreamingSession, error) {
	if !d.IsAvailable() {
		return nil, fmt.Errorf("DashScope API Key 未配置")
	}

	conn, err := d.openRealtimeSession(ctx, language)
	if err != nil {
		return nil, err
	}

	session := &StreamingSession{
		conn:    conn,
		results: make(chan StreamingResult, 64),
		done:    make(chan struct{}),
		stop:    make(chan struct{}),
	}
	go session.readLoop()
	return session, nil
}

// SendAudio sends one frame of PCM audio to DashScope.
// data must be 16-bit little-endian PCM, 16000 Hz, mono.
// It returns an error once Close has been called.
func (s *StreamingSession) SendAudio(data []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return fmt.Errorf("session 已关闭")
	}

	msg := rtClientMsg{
		Type:  "input_audio_buffer.append",
		Audio: base64.StdEncoding.EncodeToString(data),
	}
	s.conn.SetWriteDeadline(time.Now().Add(dashScopeIOTimeout))
	return s.conn.WriteJSON(msg)
}

// Results returns the channel of recognition results. It is closed when the
// session's reader stops (server error, session.finished, or Close).
func (s *StreamingSession) Results() <-chan StreamingResult {
	return s.results
}

// Close ends the session. It proceeds as follows:
//
//  1. send session.finish (best effort);
//  2. wait up to dashScopeCloseGrace for the reader to finish delivering
//     results;
//  3. close the WebSocket connection and wait for the reader to exit.
//
// When Close returns, Results has been closed. Calling Close more than once is
// safe; later calls return nil.
func (s *StreamingSession) Close() error {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return nil
	}
	s.closed = true
	// Best effort: if this write fails the connection is already unusable, and
	// closing it below is the only thing left to do.
	s.conn.SetWriteDeadline(time.Now().Add(dashScopeCloseGrace))
	_ = s.conn.WriteJSON(rtClientMsg{Type: "session.finish"})
	s.mu.Unlock()

	select {
	case <-s.done:
	case <-time.After(dashScopeCloseGrace):
	}

	// Unblock readLoop whether it is stuck in ReadJSON (conn.Close) or in a
	// send to a consumer that stopped reading (stop), then wait for it so that
	// nothing can send on results after it is closed.
	close(s.stop)
	err := s.conn.Close()
	<-s.done
	return err
}

// isClosed reports whether Close has been called.
func (s *StreamingSession) isClosed() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.closed
}

// emit delivers r on results. It returns false without sending once Close has
// closed s.stop, so a consumer that stopped reading cannot wedge readLoop.
func (s *StreamingSession) emit(r StreamingResult) bool {
	select {
	case s.results <- r:
		return true
	case <-s.stop:
		return false
	}
}

// readLoop reads server events and forwards them as StreamingResult values:
//   - a completed transcription becomes a final result (one per non-empty
//     transcript);
//   - a non-empty transcript delta becomes a non-final result;
//   - an "error" event or a read error becomes an Error result and ends the
//     loop.
//
// Read errors after Close has started are expected teardown and are not
// reported. session.finished also ends the loop. On exit readLoop closes
// results and then done.
func (s *StreamingSession) readLoop() {
	defer close(s.done)
	defer close(s.results)

	for {
		var msg rtServerMsg
		if err := s.conn.ReadJSON(&msg); err != nil {
			if !s.isClosed() {
				s.emit(StreamingResult{Error: fmt.Sprintf("读取响应失败: %v", err)})
			}
			return
		}

		switch msg.Type {
		case "conversation.item.input_audio_transcription.completed":
			for _, t := range msg.completedTranscripts() {
				if !s.emit(StreamingResult{Text: t, IsFinal: true}) {
					return
				}
			}
		case "response.audio_transcript.delta":
			if msg.Delta != "" && !s.emit(StreamingResult{Text: msg.Delta, IsFinal: false}) {
				return
			}
		case "session.finished":
			return
		case "error":
			s.emit(StreamingResult{Error: fmt.Sprintf("DashScope 识别失败: %s", msg.errorMessage())})
			return
		default:
			// response.done, response.created, input_audio_buffer.*,
			// conversation.item.created,
			// conversation.item.input_audio_transcription.text,
			// response.audio_transcript.done and unknown events carry
			// nothing to forward.
		}
	}
}

// GetStatus returns the DashScope STT status: availability, model and
// provider name.
func (d *DashScopeSTT) GetStatus() map[string]interface{} {
	return map[string]interface{}{
		"available": d.IsAvailable(),
		"model":     d.model,
		"provider":  "dashscope",
	}
}