package handler

import (
	"bufio"
	"bytes"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"github.com/gin-gonic/gin"

	"github.com/yourname/cyrene-ai/gateway/internal/config"
	"github.com/yourname/cyrene-ai/gateway/internal/ws"
	"github.com/yourname/cyrene-ai/pkg/logger"
)

// webhookAICoreClient is shared by every AI-Core call made from this file so
// that the underlying transport can reuse connections. The timeout bounds the
// whole exchange, including reading the streamed response body.
var webhookAICoreClient = &http.Client{Timeout: 120 * time.Second}

// WebhookHandler serves the secondary conversation entry points used by
// third-party platforms.
type WebhookHandler struct {
	cfg *config.Config
	hub *ws.Hub
}

// NewWebhookHandler returns a WebhookHandler backed by cfg and hub.
func NewWebhookHandler(cfg *config.Config, hub *ws.Hub) *WebhookHandler {
	return &WebhookHandler{cfg: cfg, hub: hub}
}

// ============================================================
// Generic webhook format
// ============================================================

// GenericWebhookRequest is the request body of the generic webhook.
type GenericWebhookRequest struct {
	UserID    string `json:"user_id"`    // user identifier on the third-party platform
	SessionID string `json:"session_id"` // optional; generated when empty
	Message   string `json:"message"`    // user message; must not be empty
	Mode      string `json:"mode"`       // text | voice_msg (defaults to text)
	Platform  string `json:"platform"`   // platform tag (e.g. discord, telegram, generic)
}

// GenericWebhookResponse is the response body of the generic webhook.
type GenericWebhookResponse struct {
	Reply        string `json:"reply"`
	SessionID    string `json:"session_id"`
	MessageID    string `json:"message_id"`
	FinishReason string `json:"finish_reason,omitempty"`
	Error        string `json:"error,omitempty"`
}

// HandleGenericWebhook handles a generic webhook request.
//
//	POST /api/v1/webhook/generic
//	Authorization: Bearer <api-key>
//
// It validates the JSON body, fills in defaults for the optional fields,
// forwards the message to AI-Core and returns the collected reply. A non-empty
// reply is also written to the hub cache together with the user message.
func (h *WebhookHandler) HandleGenericWebhook(c *gin.Context) {
	var req GenericWebhookRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, GenericWebhookResponse{Error: "请求格式错误: " + err.Error()})
		return
	}
	if req.Message == "" {
		c.JSON(http.StatusBadRequest, GenericWebhookResponse{Error: "消息不能为空"})
		return
	}

	userID := req.UserID
	if userID == "" {
		// No user supplied: fall back to a fixed shared identity.
		userID = "webhook_generic"
	}
	// Prefix third-party users so they cannot collide with main-system users.
	if !strings.HasPrefix(userID, "ext_") {
		userID = "ext_" + userID
	}

	mode := req.Mode
	if mode == "" {
		mode = "text"
	}

	sessionID := req.SessionID
	if sessionID == "" {
		sessionID = "webhook_" + randomID(12)
	}

	platform := req.Platform
	if platform == "" {
		platform = "generic"
	}

	// Ask AI-Core for the reply.
	resp, err := h.callAICore(userID, sessionID, req.Message, mode, platform)
	if err != nil {
		logger.Printf("[webhook] AI-Core 调用失败 (platform=%s): %v", platform, err)
		c.JSON(http.StatusBadGateway, GenericWebhookResponse{Error: "昔涟暂时无法回应喵...(请稍后再试)"})
		return
	}

	// There is no WebSocket connection on this path, so the exchange is
	// recorded through the hub cache instead.
	if resp.Reply != "" {
		h.hub.CacheMessage(userID, sessionID, ws.Message{
			Role:      "user",
			Content:   req.Message,
			Timestamp: time.Now().UnixMilli(),
		})
		h.hub.CacheMessage(userID, sessionID, ws.Message{
			Role:      "assistant",
			Content:   resp.Reply,
			Timestamp: time.Now().UnixMilli(),
		})
	}

	c.JSON(http.StatusOK, resp)
}

// ============================================================
// Discord webhook format
// ============================================================

// DiscordInteraction is an incoming Discord interaction request.
type DiscordInteraction struct {
	Type      int                 `json:"type"`   // 1=PING, 2=APPLICATION_COMMAND
	Token     string              `json:"token"`  // interaction token (for follow-up replies)
	Member    *DiscordMember      `json:"member"` // user information
	User      *DiscordUser        `json:"user"`   // user information in the DM case
	Data      *DiscordCommandData `json:"data"`   // command data
	GuildID   string              `json:"guild_id"`
	ChannelID string              `json:"channel_id"`
}

// DiscordMember is Discord member information.
type DiscordMember struct {
	User *DiscordUser `json:"user"`
}

// DiscordUser is Discord user information.
type DiscordUser struct {
	ID       string `json:"id"`
	Username string `json:"username"`
}

// DiscordCommandData is the data of a Discord command.
type DiscordCommandData struct {
	Name    string                 `json:"name"`
	Options []DiscordCommandOption `json:"options,omitempty"`
}

// DiscordCommandOption is a single command argument.
//
// NOTE(review): Value only decodes JSON strings; an option carrying a number
// or boolean makes the whole interaction fail to unmarshal. Confirm that the
// registered commands use string options only.
type DiscordCommandOption struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

// DiscordResponse is a Discord interaction response (immediate or deferred).
type DiscordResponse struct {
	Type int                  `json:"type"` // 4=CHANNEL_MESSAGE_WITH_SOURCE, 5=DEFERRED_CHANNEL_MESSAGE_WITH_SOURCE
	Data *DiscordResponseData `json:"data,omitempty"`
}

// DiscordResponseData is the payload of a Discord response.
type DiscordResponseData struct {
	Content string `json:"content"`
}

// HandleDiscordWebhook handles a Discord interaction.
//
//	POST /api/v1/webhook/discord
//
// Note: Discord requires ed25519 signature verification; this handler does not
// perform it and is simplified to API key verification (presumably applied by
// middleware in front of this route — confirm against the router setup).
func (h *WebhookHandler) HandleDiscordWebhook(c *gin.Context) {
	// Discord signature verification (not implemented):
	// signature := c.GetHeader("X-Signature-Ed25519")
	// timestamp := c.GetHeader("X-Signature-Timestamp")

	var interaction DiscordInteraction

	body, err := io.ReadAll(c.Request.Body)
	if err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "无法读取请求体"})
		return
	}
	// Restore the body so later readers can consume it again.
	c.Request.Body = io.NopCloser(bytes.NewReader(body))

	if err := json.Unmarshal(body, &interaction); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "无效的 Discord 交互数据"})
		return
	}

	// PING (Discord endpoint verification) is answered with PONG.
	if interaction.Type == 1 {
		c.JSON(http.StatusOK, gin.H{"type": 1})
		return
	}

	// Only APPLICATION_COMMAND is supported beyond this point.
	if interaction.Type != 2 {
		c.JSON(http.StatusOK, DiscordResponse{Type: 4, Data: &DiscordResponseData{Content: "暂不支持的交互类型"}})
		return
	}

	// Resolve the caller: guild interactions carry Member.User, DMs carry User.
	userID := "discord_unknown"
	username := "unknown"
	switch {
	case interaction.Member != nil && interaction.Member.User != nil:
		userID = "discord_" + interaction.Member.User.ID
		username = interaction.Member.User.Username
	case interaction.User != nil:
		userID = "discord_" + interaction.User.ID
		username = interaction.User.Username
	}

	// NOTE(review): the session is keyed by username and channel, so a renamed
	// user starts a new session — confirm this is intended.
	sessionID := "discord_" + username + "_" + interaction.ChannelID

	var message string
	if data := interaction.Data; data != nil {
		switch data.Name {
		case "chat":
			// The last option named "message" wins.
			for _, opt := range data.Options {
				if opt.Name == "message" {
					message = opt.Value
				}
			}
		default:
			// Any other command is forwarded as plain text: "/name v1 v2 ...".
			parts := make([]string, 0, len(data.Options)+1)
			parts = append(parts, "/"+data.Name)
			for _, opt := range data.Options {
				parts = append(parts, opt.Value)
			}
			message = strings.Join(parts, " ")
		}
	}

	if message == "" {
		c.JSON(http.StatusOK, DiscordResponse{
			Type: 4,
			Data: &DiscordResponseData{Content: "请提供消息内容!用法: `/chat message:你好,昔涟`"},
		})
		return
	}

	// Discord expects a response within 3 seconds; the proper flow is to reply
	// "deferred" first and send the result later through the webhook URL.
	// This is simplified to a synchronous AI-Core call, so Discord shows a
	// timeout when the call takes longer than 3 seconds.
	resp, err := h.callAICore("ext_"+userID, sessionID, message, "text", "discord")
	if err != nil {
		logger.Printf("[webhook:discord] AI-Core 调用失败: %v", err)
		c.JSON(http.StatusOK, DiscordResponse{
			Type: 4,
			Data: &DiscordResponseData{Content: "昔涟暂时无法回应喵...(遇到了一些问题,请稍后再试)"},
		})
		return
	}

	c.JSON(http.StatusOK, DiscordResponse{
		Type: 4,
		Data: &DiscordResponseData{Content: resp.Reply},
	})
}

// ============================================================
// Core: call AI-Core and collect the complete response
// ============================================================

// callAICore posts the message to the AI-Core SSE chat endpoint and collects
// the streamed deltas into a single reply.
//
// The stream ends at a "[DONE]" payload, at a chunk with done=true, or at end
// of body. A chunk carrying a non-empty error aborts the call. Payloads that
// are not valid JSON are skipped. When the stream never reports a message ID,
// one is generated locally.
//
// It returns an error when the request cannot be built or sent, when AI-Core
// answers with a status other than 200, when the stream reports an error, or
// when reading the stream fails.
func (h *WebhookHandler) callAICore(userID, sessionID, message, mode, platform string) (*GenericWebhookResponse, error) {
	// Upper bound on how much of a non-200 body is copied into the error.
	const maxErrorBody = 4 << 10

	// sseChunk is the JSON payload of one "data:" event.
	type sseChunk struct {
		Delta        string `json:"delta"`
		Error        string `json:"error,omitempty"`
		MessageID    string `json:"message_id,omitempty"`
		Done         bool   `json:"done,omitempty"`
		FinishReason string `json:"finish_reason,omitempty"`
	}

	reqBody, err := json.Marshal(map[string]string{
		"user_id":    userID,
		"session_id": sessionID,
		"message":    message,
		"mode":       mode,
		"platform":   platform,
	})
	if err != nil {
		return nil, fmt.Errorf("序列化请求失败: %w", err)
	}

	aiCoreURL := h.cfg.AICoreURL + "/api/v1/chat"
	httpReq, err := http.NewRequest(http.MethodPost, aiCoreURL, bytes.NewReader(reqBody))
	if err != nil {
		return nil, fmt.Errorf("创建请求失败: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")
	httpReq.Header.Set("Accept", "text/event-stream")

	resp, err := webhookAICoreClient.Do(httpReq)
	if err != nil {
		return nil, fmt.Errorf("请求 AI-Core 失败: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body only enriches the error message, so a failed
		// or truncated read is deliberately ignored.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		return nil, fmt.Errorf("AI-Core 返回 %d: %s", resp.StatusCode, string(body))
	}

	scanner := bufio.NewScanner(resp.Body)
	// Allow single SSE lines of up to 1 MiB (the scanner default is 64 KiB).
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	var (
		reply        strings.Builder
		msgID        string
		finishReason string
	)

	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		// The space after the colon is optional in SSE; strip at most one.
		payload := strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " ")
		if payload == "[DONE]" {
			break
		}

		// A fresh value per event so fields never leak between chunks.
		var chunk sseChunk
		if err := json.Unmarshal([]byte(payload), &chunk); err != nil {
			continue
		}

		if chunk.Error != "" {
			return nil, fmt.Errorf("AI-Core 流式错误: %s", chunk.Error)
		}
		if chunk.MessageID != "" {
			msgID = chunk.MessageID
		}
		if chunk.FinishReason != "" {
			finishReason = chunk.FinishReason
		}
		if chunk.Done {
			// Any delta carried by the final chunk is intentionally not appended.
			break
		}
		reply.WriteString(chunk.Delta)
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("读取 SSE 流失败: %w", err)
	}

	if msgID == "" {
		msgID = "msg_" + randomID(16)
	}

	return &GenericWebhookResponse{
		Reply:        reply.String(),
		SessionID:    sessionID,
		MessageID:    msgID,
		FinishReason: finishReason,
	}, nil
}

// ============================================================
// Webhook API key verification middleware
// ============================================================

// WebhookAuth returns middleware that verifies the webhook API key.
//
// The key is taken from the X-Webhook-Key header or, when that is empty, from
// an "Authorization: Bearer <api-key>" header. Requests without a key, or with
// a key that differs from the configured one, are rejected with 401.
func (h *WebhookHandler) WebhookAuth() gin.HandlerFunc {
	return func(c *gin.Context) {
		apiKey := c.GetHeader("X-Webhook-Key")
		if apiKey == "" {
			// Authorization: Bearer is supported as an alternative.
			if auth := c.GetHeader("Authorization"); strings.HasPrefix(auth, "Bearer ") {
				apiKey = strings.TrimPrefix(auth, "Bearer ")
			}
		}

		if apiKey == "" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "缺少 webhook API key (X-Webhook-Key 或 Authorization: Bearer )"})
			return
		}

		// Compare against the key from the configuration.
		expectedKey := h.cfg.WebhookAPIKey
		if expectedKey == "" {
			// NOTE(review): with no key configured, any non-empty key is
			// accepted (fail-open). Confirm this is acceptable in production.
			c.Next()
			return
		}

		if !hmacEqual(apiKey, expectedKey) {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "无效的 webhook API key"})
			return
		}

		c.Next()
	}
}

// hmacEqual reports whether a and b are equal, comparing in constant time to
// guard against timing attacks.
//
// Both inputs are reduced to fixed-size HMAC-SHA256 digests before comparison,
// so neither the content nor the length of either string influences how long
// the comparison takes.
func hmacEqual(a, b string) bool {
	key := []byte("cyrene_webhook_salt")

	macA := hmac.New(sha256.New, key)
	macA.Write([]byte(a))

	macB := hmac.New(sha256.New, key)
	macB.Write([]byte(b))

	return hmac.Equal(macA.Sum(nil), macB.Sum(nil))
}