Files
Cyrene/backend/platform-bridge/cmd/main.go
T
AskaEth 4954c1e58b feat: 消息并行处理 + QQ卡片完整解析 + 视觉OCR融合格式修复
- platform-bridge: 8-worker per-session 并行分发,同会话保序跨会话并行
- platform-bridge: 静默消息 fire-and-forget,不阻塞同用户后续消息
- QQ卡片: html.UnescapeString 解码 NapCat HTML实体,正确解析卡片JSON
- QQ卡片: 输出含应用名/简介/来源/封面URL,封面注入图片管线走视觉
- ai-core: 视觉+OCR结果融合为单句,单图不编号,避免LLM误解为多张图

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-06 09:08:03 +08:00

1065 lines
32 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"hash/fnv"
"io"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
"time"
discordstub "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/discord"
feishustub "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/feishu"
qqadapter "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/qq"
telegramadapter "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/telegram"
wechatstub "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/wechat"
webhookadapter "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/webhook"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/bridge"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/config"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/handler"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/logging"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/permissions"
)
// main boots the platform bridge: it loads configuration and persistent
// stores, registers and connects the platform adapters, installs the message
// handler that decides how each incoming message is routed to AI-Core, starts
// the QQ reader pipelines and the HTTP server, and finally blocks until
// SIGINT/SIGTERM to shut everything down.
func main() {
	cfg := config.Load()

	// Config store for platform adapter configs.
	configStore, err := config.NewStore("platform_configs.json")
	if err != nil {
		fmt.Printf("FATAL: config store: %v\n", err)
		os.Exit(1)
	}

	// Blocklist settings.
	blocklistStore, err := config.NewBlocklistStore("platform_blocklist.json")
	if err != nil {
		fmt.Printf("FATAL: blocklist store: %v\n", err)
		os.Exit(1)
	}

	// Message logger.
	msgLogger, err := logging.NewLogger("logs")
	if err != nil {
		fmt.Printf("FATAL: logger: %v\n", err)
		os.Exit(1)
	}
	defer msgLogger.Close()

	// Core components.
	mapper := bridge.NewIdentityMapper()
	checker := permissions.NewChecker()
	router := bridge.NewPlatformRouter(mapper, checker)

	// lastDisplayNames maps a platform UID (or "uid:channelID" for groups) to
	// the last display name seen for it. startQQReaders calls
	// router.RouteMessage from several worker goroutines, and the handler
	// below presumably runs on the calling goroutine, so every access goes
	// through lastDisplayNamesMu.
	var lastDisplayNamesMu sync.Mutex
	lastDisplayNames := make(map[string]string)

	// Seed default identities from environment.
	seedIdentities(mapper, configStore, cfg.AdminNickname)

	// Register platform adapters based on stored configs or defaults.
	adapters := createAdapters(cfg, configStore)
	for _, a := range adapters {
		router.RegisterAdapter(a)
	}

	// Set message handler with logging.
	router.SetMessageHandler(func(msg *bridge.UnifiedMessage) (*bridge.UnifiedResponse, error) {
		// Log incoming.
		msgLogger.Log(logging.LogEntry{
			Timestamp:   time.Now(),
			Direction:   "incoming",
			Platform:    msg.Platform,
			ChannelID:   msg.ChannelID,
			SenderID:    msg.OriginalSenderUID,
			SenderName:  msg.OriginalSenderName,
			GroupName:   msg.GroupName,
			Content:     msg.Content,
			ContentType: msg.ContentType,
			MessageID:   msg.MessageID,
			Success:     true,
		})

		// Routing decisions.
		isAdmin := mapper.IsAdmin(msg.Platform, msg.OriginalSenderUID)
		adminNick := cfg.AdminNickname
		if isAdmin {
			if id := mapper.ResolveOrNil(msg.Platform, msg.OriginalSenderUID); id != nil && id.Nickname != "" {
				adminNick = id.Nickname
			}

			// Track per-group display names (a group card name can differ across groups).
			nameKey := msg.OriginalSenderUID
			if msg.ChannelType == "group" {
				nameKey = msg.OriginalSenderUID + ":" + msg.ChannelID
			}

			// Read the previous name and store the new one in one critical
			// section so concurrent handlers never touch the map unguarded.
			lastDisplayNamesMu.Lock()
			prevName, known := lastDisplayNames[nameKey]
			if msg.OriginalSenderName != "" {
				lastDisplayNames[nameKey] = msg.OriginalSenderName
			}
			lastDisplayNamesMu.Unlock()

			if known && prevName != msg.OriginalSenderName && msg.OriginalSenderName != "" {
				location := msg.ChannelID
				if msg.GroupName != "" {
					location = truncateString(msg.GroupName, 8)
				}
				msg.Content = fmt.Sprintf("【昵称更新:该用户在%s(%s)上的昵称已从\"%s\"变更为\"%s\"】\n%s", location, msg.Platform, prevName, msg.OriginalSenderName, msg.Content)
			}
		}

		isMentioned, _ := detectAdminMention(msg, mapper, cfg)
		isBotMentioned := msg.BotUID != "" && containsString(msg.Mentions, msg.BotUID)
		isSilent := cfg.PlatformSilentEnabled && !isAdmin && !isBotMentioned

		// Add message timestamp for AI context.
		if !msg.Timestamp.IsZero() {
			timeAgo := time.Since(msg.Timestamp)
			timeLabel := fmt.Sprintf("【消息时间: %s (%s前)】\n", msg.Timestamp.Format("15:04:05"), formatDuration(timeAgo))
			msg.Content = timeLabel + msg.Content
		}

		// Enrich group messages with group name and sender info.
		if msg.ChannelType == "group" {
			groupLabel := msg.ChannelID
			if msg.GroupName != "" {
				groupLabel = truncateString(msg.GroupName, 8) + " " + msg.ChannelID
			}
			senderLabel := msg.OriginalSenderName
			if senderLabel == "" {
				senderLabel = msg.SenderName
			}
			if isAdmin {
				senderLabel = adminNick + "/" + msg.OriginalSenderName
			}
			msg.Content = fmt.Sprintf("[群聊 %s] %s (%s)\n%s", groupLabel, senderLabel, msg.OriginalSenderUID, msg.Content)
		} else if msg.ChannelType == "private" {
			// NOTE(review): adminNick is used in the label even when the sender
			// is not an admin — confirm this is intended.
			msg.Content = fmt.Sprintf("【私聊 %s】%s/%s (%s)\n%s", msg.Platform, adminNick, msg.OriginalSenderName, msg.OriginalSenderUID, msg.Content)
		}

		// Blocklist/whitelist check; isAdmin is passed so the store can exempt admins.
		if blocked := blocklistStore.IsBlocked(msg.ChannelType, msg.ChannelID, msg.OriginalSenderUID, isAdmin); blocked {
			msgLogger.Log(logging.LogEntry{
				Timestamp:  time.Now(),
				Direction:  "outgoing",
				Platform:   msg.Platform,
				ChannelID:  msg.ChannelID,
				SenderID:   msg.OriginalSenderUID,
				SenderName: "Cyrene",
				Content:    "[blocked]",
				Success:    true,
			})
			return &bridge.UnifiedResponse{
				Messages: []bridge.ResponseMessage{
					{DisplayType: "silent", Content: "", FormatMode: "plain"},
				},
				Platform: msg.Platform,
			}, nil
		}

		var response *bridge.UnifiedResponse
		var routeErr error

		// Collect media URLs. Images are withheld on the historical and
		// default routes below; videos and voice clips are always forwarded.
		imageURLs := getImageURLs(msg)
		videoURLs := getShortVideoURLs(msg)
		voiceURLs := getVoiceURLs(msg)

		// For group chats, use a channel-based user ID to share context between admin and regular users.
		chatUserID := msg.SenderID
		if msg.ChannelType == "group" {
			chatUserID = fmt.Sprintf("platform_%s_group_%s", msg.Platform, msg.ChannelID)
		}
		groupSessionID := fmt.Sprintf("platform_%s_%s", msg.Platform, msg.ChannelID)

		// Helper: fire-and-forget a silent observation. Silent paths don't
		// produce visible responses, so we can background them to avoid
		// blocking the worker for the next message from the same user.
		// Only failures are logged.
		fireSilent := func(namespace string, imgs, vids, voices []string) {
			go func() {
				_, err := forwardToAICore(cfg, msg, "platform_silent", namespace, namespace, imgs, vids, voices, isAdmin)
				if err != nil {
					msgLogger.Log(logging.LogEntry{
						Timestamp: time.Now(),
						Direction: "outgoing",
						Platform:  msg.Platform,
						ChannelID: msg.ChannelID,
						SenderID:  msg.OriginalSenderUID,
						Success:   false,
						Error:     err.Error(),
					})
				}
			}()
		}

		// Case order matters: the first matching route wins.
		switch {
		case isMessageHistorical(msg, router):
			msg.RouteType = "silent"
			namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
			fireSilent(namespace, nil, videoURLs, voiceURLs)
			response = &bridge.UnifiedResponse{Messages: []bridge.ResponseMessage{{DisplayType: "silent"}}, Platform: msg.Platform}
		case isAdmin && !isBotMentioned && shouldAdminBeSilent(msg, router):
			msg.RouteType = "silent"
			namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
			fireSilent(namespace, imageURLs, videoURLs, voiceURLs)
			response = &bridge.UnifiedResponse{Messages: []bridge.ResponseMessage{{DisplayType: "silent"}}, Platform: msg.Platform}
		case isAdmin:
			msg.RouteType = "normal"
			response, routeErr = forwardToAICore(cfg, msg, "text", chatUserID, groupSessionID, imageURLs, videoURLs, voiceURLs, isAdmin)
		case isBotMentioned:
			msg.RouteType = "normal"
			response, routeErr = forwardToAICore(cfg, msg, "text", chatUserID, groupSessionID, imageURLs, videoURLs, voiceURLs, isAdmin)
		case isMentioned:
			msg.RouteType = "silent"
			namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
			fireSilent(namespace, imageURLs, videoURLs, voiceURLs)
			response = &bridge.UnifiedResponse{Messages: []bridge.ResponseMessage{{DisplayType: "silent"}}, Platform: msg.Platform}
		case isSilent:
			msg.RouteType = "silent"
			namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
			fireSilent(namespace, imageURLs, videoURLs, voiceURLs)
			response = &bridge.UnifiedResponse{Messages: []bridge.ResponseMessage{{DisplayType: "silent"}}, Platform: msg.Platform}
		default:
			msg.RouteType = "normal"
			response, routeErr = forwardToAICore(cfg, msg, "text", chatUserID, groupSessionID, nil, videoURLs, voiceURLs, isAdmin)
		}

		if routeErr != nil {
			msgLogger.Log(logging.LogEntry{
				Timestamp: time.Now(),
				Direction: "outgoing",
				Platform:  msg.Platform,
				ChannelID: msg.ChannelID,
				SenderID:  msg.OriginalSenderUID,
				Success:   false,
				Error:     routeErr.Error(),
			})
			return nil, routeErr
		}

		// Log outgoing messages (skip for silent route).
		if msg.RouteType != "silent" {
			for _, rm := range response.Messages {
				msgLogger.Log(logging.LogEntry{
					Timestamp:   time.Now(),
					Direction:   "outgoing",
					Platform:    msg.Platform,
					ChannelID:   msg.ChannelID,
					SenderID:    msg.BotUID,
					SenderName:  "Cyrene",
					GroupName:   msg.GroupName,
					Content:     rm.Content,
					ContentType: "text",
					Success:     true,
				})
			}
		}
		return response, nil
	})

	// Connect all adapters. A failed connect is only a warning so the
	// remaining adapters still come up.
	ctx := context.Background()
	for _, a := range adapters {
		if err := a.Connect(ctx); err != nil {
			fmt.Printf("WARN: connect %s failed: %v\n", a.PlatformName(), err)
		} else {
			fmt.Printf("Platform adapter connected: %s\n", a.PlatformName())
		}
	}

	// Setup HTTP server.
	mux := http.NewServeMux()
	bh := handler.NewBridgeHandler(router)
	bh.RegisterRoutes(mux)

	// Config and log handlers.
	ch := handler.NewConfigHandler(configStore, router)

	// Hot-reload: on config save/delete, dynamically replace adapters.
	ch.SetOnConfigChanged(func(name, platform string, enabled bool, fields map[string]string) {
		if enabled {
			a := createSingleAdapter(cfg, platform, name, fields)
			if a == nil {
				fmt.Printf("WARN: cannot create adapter for %s (platform=%s)\n", name, platform)
				return
			}
			if err := router.ReplaceAdapter(a); err != nil {
				fmt.Printf("WARN: hot-reload connect %s failed: %v\n", name, err)
			} else {
				fmt.Printf("Platform adapter hot-reloaded: %s\n", name)
			}
			// Sync admin identities from config fields.
			syncAdminUIDs(mapper, platform, fields, cfg.AdminNickname)
			// Restart QQ reader when QQ config changes.
			if platform == "qq" {
				startQQReaders(router)
			}
		} else {
			router.RemoveAdapter(name)
			fmt.Printf("Platform adapter removed: %s\n", name)
			// Restarting the readers cancels the goroutines of the removed adapter.
			if platform == "qq" {
				startQQReaders(router)
			}
		}
	})
	ch.RegisterRoutes(mux)

	lh := handler.NewLogHandler(msgLogger, configStore)
	lh.RegisterRoutes(mux)

	// Log WebSocket for real-time log streaming to ethend.
	logWS := handler.NewLogWSHub(msgLogger)
	mux.HandleFunc("/ws/logs", logWS.ServeWS)

	// Blocklist settings.
	blh := handler.NewBlocklistHandler(blocklistStore)
	blh.RegisterRoutes(mux)

	// Start QQ message reader loop.
	startQQReaders(router)

	addr := ":" + cfg.Port
	srv := &http.Server{Addr: addr, Handler: mux}
	go func() {
		fmt.Printf("Platform Bridge listening on port %s\n", cfg.Port)
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			fmt.Printf("FATAL: %v\n", err)
			os.Exit(1)
		}
	}()

	// Block until an interrupt or termination signal arrives.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	fmt.Println("Shutting down Platform Bridge...")
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	for _, a := range adapters {
		a.Disconnect(shutdownCtx)
	}
	if err := srv.Shutdown(shutdownCtx); err != nil {
		fmt.Printf("WARN: http server shutdown: %v\n", err)
	}
	fmt.Println("Platform Bridge stopped")
}
var (
	// qqReaderCancels maps adapter config name to the cancel function of its
	// reader pipeline.
	qqReaderCancels = make(map[string]context.CancelFunc)
	// qqReaderCancelsMu guards qqReaderCancels and serializes startQQReaders.
	qqReaderCancelsMu sync.Mutex
)

// startQQReaders cancels any existing QQ readers and starts one pipeline per
// registered QQ adapter. Each pipeline is: one reader goroutine feeding rawCh,
// one dispatcher that routes messages to a worker chosen by hashQQSession, and
// numWorkers workers that route each message and send the replies back.
//
// The whole function runs under qqReaderCancelsMu so that two concurrent
// restarts (e.g. two config saves) cannot interleave and overwrite each
// other's cancel functions.
func startQQReaders(router *bridge.PlatformRouter) {
	qqReaderCancelsMu.Lock()
	defer qqReaderCancelsMu.Unlock()

	// Cancel all existing readers.
	for _, cancel := range qqReaderCancels {
		cancel()
	}
	qqReaderCancels = make(map[string]context.CancelFunc)

	for _, qqa := range router.GetAdaptersByPlatform("qq") {
		qqAdapter, ok := qqa.(*qqadapter.Adapter)
		if !ok {
			continue
		}
		adapterKey := qqAdapter.ConfigName()
		ctx, cancel := context.WithCancel(context.Background())
		qqReaderCancels[adapterKey] = cancel

		adapter := qqAdapter // capture for goroutines
		const numWorkers = 8

		// Single reader pumps raw messages from the adapter.
		rawCh := make(chan *qqadapter.OBv11Message, 100)
		go adapter.ReadMessages(ctx, rawCh)

		// Per-worker channels.
		var workerChs [numWorkers]chan *qqadapter.OBv11Message
		for i := 0; i < numWorkers; i++ {
			workerChs[i] = make(chan *qqadapter.OBv11Message, 50)
		}

		// Dispatcher: route to a worker by session hash so messages with the
		// same hash stay ordered. It stops when rawCh is closed or ctx is
		// cancelled, and always closes the worker channels on exit so the
		// workers terminate. The send is also guarded by ctx because a worker
		// may already have returned and stopped draining its channel.
		go func() {
			defer func() {
				for i := range workerChs {
					close(workerChs[i])
				}
			}()
			for {
				select {
				case <-ctx.Done():
					return
				case msg, ok := <-rawCh:
					if !ok {
						return
					}
					select {
					case workerChs[hashQQSession(msg)%numWorkers] <- msg:
					case <-ctx.Done():
						return
					}
				}
			}
		}()

		// Worker pool: each worker processes messages sequentially; workers run in parallel.
		for _, wch := range workerChs {
			wch := wch // capture (needed before Go 1.22)
			go func() {
				for msg := range wch {
					response, err := router.RouteMessage(adapterKey, msg)
					if err != nil {
						fmt.Printf("[qq:%s] route error: %v\n", adapterKey, err)
						continue
					}
					if response == nil || len(response.Messages) == 0 || hasOnlySilentMessages(response.Messages) {
						continue
					}

					messageType := msg.MessageType
					userID := msg.UserID
					groupID := msg.GroupID

					// Keep only non-empty messages, converted to QQ formatting.
					var toSend []bridge.ResponseMessage
					for _, rm := range response.Messages {
						if rm.Content != "" {
							rm.Content = qqadapter.ConvertMarkdownToQQ(rm.Content)
							toSend = append(toSend, rm)
						}
					}

					// Pause between consecutive parts of one reply; fall back
					// to 2s when the adapter reports no positive interval.
					interval := time.Duration(adapter.SendIntervalMs()) * time.Millisecond
					if interval <= 0 {
						interval = 2 * time.Second
					}

					for i, rm := range toSend {
						if i > 0 {
							select {
							case <-ctx.Done():
								return
							case <-time.After(interval):
							}
						}

						// Look the adapter up again for every part so a
						// hot-reloaded instance is used if it was replaced.
						cur, err := router.GetAdapter(adapterKey)
						if err != nil {
							continue
						}
						curQQ, ok := cur.(*qqadapter.Adapter)
						if !ok {
							continue
						}

						// Other message types are not sent anywhere.
						var sendErr error
						switch messageType {
						case "private":
							sendErr = curQQ.SendMessage("private", userID, 0, rm.Content)
						case "group":
							sendErr = curQQ.SendMessage("group", 0, groupID, rm.Content)
						}
						if sendErr != nil {
							fmt.Printf("[qq:%s] send msg error: %v\n", adapterKey, sendErr)
						}
					}
				}
			}()
		}
	}
}
// createAdapters builds platform adapters from stored configs and then seeds a
// default adapter for every built-in platform that has no stored config.
//
// A stored config marks both its name and its platform as configured whether
// or not it is enabled. This keeps a disabled platform from being brought back
// by the default-seeding loop, and keeps a custom-named config from getting a
// second, default adapter for the same platform.
func createAdapters(cfg *config.Config, store *config.Store) []bridge.PlatformAdapter {
	var adapters []bridge.PlatformAdapter

	// Build adapters from stored configs. Each config is a separate adapter instance.
	configured := make(map[string]bool)
	for _, stored := range store.List() {
		platform := stored.Platform
		if platform == "" {
			// An empty platform means the config name doubles as the platform type.
			platform = stored.Name
		}
		configured[stored.Name] = true
		configured[platform] = true

		if !stored.Enabled {
			fmt.Printf("Platform %s is disabled in config, skipping\n", stored.Name)
			continue
		}
		fields := mergeFields(cfg, platform, &stored)
		if a := createSingleAdapter(cfg, platform, stored.Name, fields); a != nil {
			adapters = append(adapters, a)
		}
	}

	// Seed default adapters for platforms that have no stored config.
	defaultPlatforms := []string{"qq", "telegram", "webhook", "wechat", "feishu", "discord"}
	for _, name := range defaultPlatforms {
		if configured[name] {
			continue
		}
		fields := mergeFields(cfg, name, nil)
		if a := createSingleAdapter(cfg, name, name, fields); a != nil {
			adapters = append(adapters, a)
		}
	}
	return adapters
}
// createSingleAdapter instantiates the adapter for one platform config.
// platform selects the adapter type ("qq", "telegram", "webhook", "wechat",
// "feishu", "discord"); configName is the instance key handed to adapters that
// accept one. Non-empty entries in fields override the corresponding values
// from cfg. It returns nil for an unknown platform.
func createSingleAdapter(cfg *config.Config, platform, configName string, fields map[string]string) bridge.PlatformAdapter {
	switch platform {
	case "qq":
		port := fields["bot_port"]
		if port == "" {
			port = cfg.QQBotPort
		}
		token := fields["access_token"]
		remoteURL := fields["remote_url"]

		mode := fields["mode"]
		if mode == "" {
			mode = "server"
			if remoteURL != "" {
				mode = "client" // backward compat: old configs with remote_url
			}
		}

		// Only a positive parsed value replaces the configured default.
		sendIntervalMs := cfg.MessageSendIntervalMs
		if raw := fields["send_interval_ms"]; raw != "" {
			if parsed := parseIntOr(raw, cfg.MessageSendIntervalMs); parsed > 0 {
				sendIntervalMs = parsed
			}
		}
		return qqadapter.NewAdapter(configName, mode, port, token, remoteURL, sendIntervalMs)

	case "telegram":
		token := fields["bot_token"]
		if token == "" {
			token = cfg.TelegramToken
		}
		webhookURL := fields["webhook_url"]
		if webhookURL == "" {
			webhookURL = cfg.TelegramWebhookURL
		}
		return telegramadapter.NewAdapter(token, webhookURL)

	case "webhook":
		return webhookadapter.NewAdapter(configName)
	case "wechat":
		return wechatstub.NewAdapter()
	case "feishu":
		return feishustub.NewAdapter()
	case "discord":
		return discordstub.NewAdapter()
	default:
		return nil
	}
}
// mergeFields copies the stored config's fields (if any) into a fresh map and
// fills in platform-specific values from cfg for keys that are still empty:
// bot_token and webhook_url for telegram, bot_port for qq.
func mergeFields(cfg *config.Config, platform string, stored *config.PlatformConfig) map[string]string {
	merged := map[string]string{}
	if stored != nil {
		for key, value := range stored.Fields {
			merged[key] = value
		}
	}

	switch platform {
	case "telegram":
		if merged["bot_token"] == "" && cfg.TelegramToken != "" {
			merged["bot_token"] = cfg.TelegramToken
		}
		if merged["webhook_url"] == "" && cfg.TelegramWebhookURL != "" {
			merged["webhook_url"] = cfg.TelegramWebhookURL
		}
	case "qq":
		if merged["bot_port"] == "" && cfg.QQBotPort != "" {
			merged["bot_port"] = cfg.QQBotPort
		}
	}
	return merged
}
// containsString reports whether val occurs in list.
func containsString(list []string, val string) bool {
	for i := 0; i < len(list); i++ {
		if list[i] == val {
			return true
		}
	}
	return false
}
// shouldAdminBeSilent reports whether an admin's group message looks like it
// is addressed to other users, in which case the bot should observe silently
// instead of replying. It is always false outside group channels.
func shouldAdminBeSilent(msg *bridge.UnifiedMessage, router *bridge.PlatformRouter) bool {
	if msg.ChannelType != "group" {
		return false
	}

	// Rule 1: any @mention that is not the bot means the admin is talking to
	// someone else.
	for i := range msg.Mentions {
		if msg.Mentions[i] != msg.BotUID {
			return true
		}
	}

	// Rule 2: the previous speaker was neither this admin nor the bot.
	// updateContext runs before the handler, so the last entry of
	// RecentSenders is the current message; look at the one before it.
	chatCtx := router.GetContext(msg.Platform, msg.ChannelID)
	if chatCtx == nil {
		return false
	}
	senders := chatCtx.RecentSenders
	if len(senders) < 2 {
		return false
	}
	previous := senders[len(senders)-2]
	return previous != msg.OriginalSenderUID && previous != msg.BotUID
}
// getImageURLs collects the non-empty URLs of all "image" attachments on msg.
// It returns nil when there are none.
func getImageURLs(msg *bridge.UnifiedMessage) []string {
	var found []string
	for i := range msg.Attachments {
		att := msg.Attachments[i]
		if att.Type != "image" || att.URL == "" {
			continue
		}
		found = append(found, att.URL)
	}
	return found
}
// getVoiceURLs collects the non-empty URLs of all "voice" attachments on msg.
// It returns nil when there are none.
func getVoiceURLs(msg *bridge.UnifiedMessage) []string {
	var found []string
	for i := range msg.Attachments {
		att := msg.Attachments[i]
		if att.Type != "voice" || att.URL == "" {
			continue
		}
		found = append(found, att.URL)
	}
	return found
}
// getShortVideoURLs collects the non-empty URLs of "video" attachments whose
// Duration is positive and at most 20. Videos with an unknown (zero) duration
// are skipped. It returns nil when nothing qualifies.
func getShortVideoURLs(msg *bridge.UnifiedMessage) []string {
	var found []string
	for i := range msg.Attachments {
		att := msg.Attachments[i]
		if att.Type != "video" || att.URL == "" {
			continue
		}
		if att.Duration <= 0 || att.Duration > 20 {
			continue
		}
		found = append(found, att.URL)
	}
	return found
}
// forwardToAICore posts msg to AI-Core's /api/v1/chat endpoint and converts
// the reply into a UnifiedResponse.
//
// mode is sent as-is; "platform_silent" requests a JSON acknowledgement and
// yields a single silent message, any other mode asks for an event stream.
// userID and sessionID identify the conversation on the AI-Core side. images,
// videoURLs and voiceURLs are attached only when non-empty.
//
// It returns an error when the request cannot be built or sent, when a silent
// forward answers with status >= 400, when the body cannot be read, or when no
// content can be extracted from the reply. An "error" field in a JSON reply is
// not an error here: it is returned as a system_info message.
func forwardToAICore(cfg *config.Config, msg *bridge.UnifiedMessage, mode, userID, sessionID string, images []string, videoURLs []string, voiceURLs []string, isAdmin bool) (*bridge.UnifiedResponse, error) {
	bodyMap := map[string]interface{}{
		"user_id":    userID,
		"session_id": sessionID,
		"message":    msg.Content,
		"mode":       mode,
		"routing":    msg.RouteType,
		"nickname":   fmt.Sprintf("%s (%s)", msg.SenderName, msg.OriginalSenderUID),
		"is_admin":   isAdmin,
		"source": map[string]string{
			"platform":     msg.Platform,
			"channel_id":   msg.ChannelID,
			"channel_type": msg.ChannelType,
			"sender_name":  msg.SenderName,
			"original_uid": msg.OriginalSenderUID,
		},
	}
	if len(images) > 0 {
		bodyMap["images"] = images
	}
	if len(videoURLs) > 0 {
		bodyMap["video_urls"] = videoURLs
	}
	if len(voiceURLs) > 0 {
		bodyMap["voice_urls"] = voiceURLs
	}

	reqBody, err := json.Marshal(bodyMap)
	if err != nil {
		return nil, fmt.Errorf("encode ai-core request: %w", err)
	}

	// A malformed AICoreURL makes NewRequest fail; surface that instead of
	// dereferencing a nil request below.
	url := cfg.AICoreURL + "/api/v1/chat"
	req, err := http.NewRequest("POST", url, bytes.NewReader(reqBody))
	if err != nil {
		return nil, fmt.Errorf("build ai-core request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	if mode == "platform_silent" {
		req.Header.Set("Accept", "application/json")
	} else {
		req.Header.Set("Accept", "text/event-stream")
	}
	if cfg.InternalToken != "" {
		req.Header.Set("X-Internal-Token", cfg.InternalToken)
	}

	client := &http.Client{Timeout: 120 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("forward to ai-core failed: %w", err)
	}
	defer resp.Body.Close()

	// Silent mode: only check status, no reply expected.
	if mode == "platform_silent" {
		if resp.StatusCode >= 400 {
			return nil, fmt.Errorf("silent forward returned status %d", resp.StatusCode)
		}
		return &bridge.UnifiedResponse{
			Messages: []bridge.ResponseMessage{
				{DisplayType: "silent", Content: "", FormatMode: "plain"},
			},
			Platform: msg.Platform,
		}, nil
	}

	bodyBytes, readErr := ioReadAll(resp.Body)
	if readErr != nil {
		return nil, fmt.Errorf("read ai-core response: %w", readErr)
	}

	// Try JSON first (non-streaming or already-complete response).
	var result struct {
		Content string `json:"content"`
		Error   string `json:"error"`
	}
	if json.Unmarshal(bodyBytes, &result) == nil {
		if result.Error != "" {
			return &bridge.UnifiedResponse{
				Messages: []bridge.ResponseMessage{
					{DisplayType: "system_info", Content: result.Error, FormatMode: "plain"},
				},
				Platform: msg.Platform,
			}, nil
		}
		if result.Content != "" {
			return &bridge.UnifiedResponse{
				Messages: splitContent(filterActions(result.Content)),
				Platform: msg.Platform,
			}, nil
		}
	}

	// Not usable JSON — parse as SSE (text/event-stream).
	content := parseSSEAndAccumulate(string(bodyBytes))
	if content == "" {
		if resp.StatusCode >= 400 {
			return nil, fmt.Errorf("ai-core returned status %d with no usable content", resp.StatusCode)
		}
		return nil, fmt.Errorf("ai-core returned empty response")
	}
	return &bridge.UnifiedResponse{
		Messages: splitContent(filterActions(content)),
		Platform: msg.Platform,
	}, nil
}
// ioReadAll drains r into memory and returns everything that was read. On a
// read error it returns the bytes collected so far together with the error.
// It stands in for io.ReadAll so the file also builds with older Go releases.
func ioReadAll(r io.Reader) ([]byte, error) {
	var sink bytes.Buffer
	if _, err := sink.ReadFrom(r); err != nil {
		return sink.Bytes(), err
	}
	return sink.Bytes(), nil
}
// parseSSEAndAccumulate extracts the reply text from a Server-Sent Events body.
//
// Only "data:" lines are considered; the space after the colon is optional, as
// the SSE format allows. Each payload is decoded as a JSON object; payloads
// that are not valid JSON (including "[DONE]") are skipped. For chunks without
// done=true, the string fields "delta" and "content" are appended in order.
// A chunk with done=true contributes the "text" of its first "segments" entry,
// and when such a final text exists it is returned instead of the accumulated
// pieces.
func parseSSEAndAccumulate(body string) string {
	var (
		accumulated strings.Builder
		finalText   string
	)
	for _, line := range strings.Split(body, "\n") {
		line = strings.TrimSpace(line)
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if payload == "" || payload == "[DONE]" {
			continue
		}

		var chunk map[string]interface{}
		if err := json.Unmarshal([]byte(payload), &chunk); err != nil {
			continue
		}

		// Final chunk: use the full text if available.
		if done, _ := chunk["done"].(bool); done {
			if segs, ok := chunk["segments"].([]interface{}); ok && len(segs) > 0 {
				if seg, ok := segs[0].(map[string]interface{}); ok {
					if t, ok := seg["text"].(string); ok && t != "" {
						finalText = t
					}
				}
			}
			continue
		}

		// Accumulate deltas.
		if delta, ok := chunk["delta"].(string); ok {
			accumulated.WriteString(delta)
		}
		// Also accumulate content if present (some APIs send full content).
		if content, ok := chunk["content"].(string); ok {
			accumulated.WriteString(content)
		}
	}
	if finalText != "" {
		return finalText
	}
	return accumulated.String()
}
// splitContent breaks a reply into separate chat messages.
//
// The text is split on blank lines ("\n\n") and each paragraph is split again
// on the "♪" marker; pieces are trimmed and empty ones dropped. A piece is
// then joined (with "\n") onto the previous message whenever either of the two
// is shorter than 8 runes, so that tiny fragments are not sent on their own.
// If nothing survives, a single message carrying the original text is
// returned.
func splitContent(text string) []bridge.ResponseMessage {
	const minRunes = 8
	isShort := func(s string) bool { return len([]rune(s)) < minRunes }

	// Steps 1+2: paragraphs, then "♪" pieces. Splitting a paragraph without
	// "♪" yields the paragraph itself, so no separate branch is needed.
	var segments []string
	for _, para := range strings.Split(text, "\n\n") {
		para = strings.TrimSpace(para)
		if para == "" {
			continue
		}
		for _, piece := range strings.Split(para, "♪") {
			if piece = strings.TrimSpace(piece); piece != "" {
				segments = append(segments, piece)
			}
		}
	}

	// Step 3: fold short segments into their predecessor.
	var merged []string
	for _, seg := range segments {
		last := len(merged) - 1
		if last >= 0 && (isShort(merged[last]) || isShort(seg)) {
			merged[last] = merged[last] + "\n" + seg
			continue
		}
		merged = append(merged, seg)
	}

	if len(merged) == 0 {
		return []bridge.ResponseMessage{
			{DisplayType: "chat", Content: text, FormatMode: "plain"},
		}
	}

	var msgs []bridge.ResponseMessage
	for _, body := range merged {
		msgs = append(msgs, bridge.ResponseMessage{
			DisplayType: "chat",
			Content:     body,
			FormatMode:  "plain",
		})
	}
	return msgs
}
// filterActions strips <action>…</action> and <app>…</app> spans, including
// their content, from text and trims surrounding whitespace. An opening tag
// with no matching closing tag is removed on its own and the text after it is
// kept.
func filterActions(text string) string {
	pairs := []struct{ opening, closing string }{
		{"<action>", "</action>"},
		{"<app>", "</app>"},
	}
	for _, p := range pairs {
		for start := strings.Index(text, p.opening); start != -1; start = strings.Index(text, p.opening) {
			rest := text[start:]
			// Default: drop just the opening tag; extend to the closing tag
			// when one follows.
			cut := len(p.opening)
			if end := strings.Index(rest, p.closing); end != -1 {
				cut = end + len(p.closing)
			}
			text = text[:start] + rest[cut:]
		}
	}
	return strings.TrimSpace(text)
}
// buildMemoryNamespace returns the identifier
// "platform_<platform>_<channelType>_<channelID>", which the silent routes use
// as both user ID and session ID.
func buildMemoryNamespace(platform, channelType, channelID string) string {
	return "platform_" + platform + "_" + channelType + "_" + channelID
}
// detectAdminMention reports whether msg refers to an admin and, if so, a
// short reason. Checks run in this order:
//  1. any UID in msg.Mentions that the mapper recognises as an admin;
//  2. any configured admin nickname occurring in the content (case-sensitive);
//  3. any configured mention keyword occurring in the content (case-insensitive).
//
// Blank nicknames and keywords are skipped: an empty string is a substring of
// every message and would otherwise flag all traffic as an admin mention.
func detectAdminMention(msg *bridge.UnifiedMessage, mapper *bridge.IdentityMapper, cfg *config.Config) (bool, string) {
	for _, mentionUID := range msg.Mentions {
		if mapper.IsAdmin(msg.Platform, mentionUID) {
			return true, fmt.Sprintf("@mention of admin UID %s", mentionUID)
		}
	}

	for _, nickname := range cfg.AdminNicknames {
		if strings.TrimSpace(nickname) == "" {
			continue
		}
		if strings.Contains(msg.Content, nickname) {
			return true, fmt.Sprintf("admin nickname mention: %s", nickname)
		}
	}

	lowerContent := strings.ToLower(msg.Content)
	for _, kw := range cfg.AdminMentionKeywords {
		if strings.TrimSpace(kw) == "" {
			continue
		}
		if strings.Contains(lowerContent, strings.ToLower(kw)) {
			return true, fmt.Sprintf("admin keyword mention: %s", kw)
		}
	}
	return false, ""
}
// hasOnlySilentMessages reports whether nothing in messages needs to be sent:
// every entry is either of display type "silent" or has empty content. It is
// true for an empty slice.
func hasOnlySilentMessages(messages []bridge.ResponseMessage) bool {
	for i := range messages {
		if messages[i].DisplayType == "silent" || messages[i].Content == "" {
			continue
		}
		return false
	}
	return true
}
// hashQQSession returns the FNV-1a hash used to pick a worker for a QQ message.
// The hashed key is "group:<groupID>:<userID>" for group messages and
// "<messageType>:<userID>" otherwise. Messages from the same user in the same
// group, or in the same private chat, therefore share a hash and keep their
// order, while different users in one group may be handled in parallel.
func hashQQSession(msg *qqadapter.OBv11Message) uint32 {
	key := msg.MessageType + ":"
	if msg.MessageType == "group" {
		key += strconv.FormatInt(msg.GroupID, 10) + ":" + strconv.FormatInt(msg.UserID, 10)
	} else {
		key += strconv.FormatInt(msg.UserID, 10)
	}
	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	return hasher.Sum32()
}
// parseIntOr parses s as a non-negative decimal integer made up solely of the
// ASCII digits 0-9. It returns defaultVal when s is empty, contains any other
// character (including a sign or whitespace), or does not fit in an int.
func parseIntOr(s string, defaultVal int) int {
	if s == "" {
		return defaultVal
	}
	// Reject signs and anything else strconv.Atoi would otherwise accept.
	for i := 0; i < len(s); i++ {
		if s[i] < '0' || s[i] > '9' {
			return defaultVal
		}
	}
	n, err := strconv.Atoi(s)
	if err != nil {
		// Only a range error is possible here; do not wrap around silently.
		return defaultVal
	}
	return n
}
// seedIdentities registers the initial admin identities on m from two sources:
//
//   - the QQ_ADMIN_UID and TELEGRAM_ADMIN_UID environment variables, each a
//     comma-separated list of platform UIDs;
//   - the admin_uids field of every stored platform config.
//
// Stored configs are walked with store.List and keyed by their platform (the
// config name is used when the platform is empty). This matches how the
// hot-reload path calls syncAdminUIDs, so a config whose name differs from its
// platform gets its admins registered at startup as well.
func seedIdentities(m *bridge.IdentityMapper, store *config.Store, adminNickname string) {
	// From environment variables.
	for _, entry := range []struct{ envKey, platform string }{
		{"QQ_ADMIN_UID", "qq"},
		{"TELEGRAM_ADMIN_UID", "telegram"},
	} {
		raw := os.Getenv(entry.envKey)
		if raw == "" {
			continue
		}
		for _, uid := range strings.Split(raw, ",") {
			uid = strings.TrimSpace(uid)
			if uid == "" {
				continue
			}
			m.Register(permissions.PlatformIdentity{
				Platform:        entry.platform,
				PlatformUID:     uid,
				CyreneUser:      "admin",
				Nickname:        adminNickname,
				PermissionLevel: "admin",
			})
		}
	}

	// From stored platform configs (admin_uids field).
	for _, stored := range store.List() {
		platform := stored.Platform
		if platform == "" {
			platform = stored.Name
		}
		syncAdminUIDs(m, platform, stored.Fields, adminNickname)
	}
}
// syncAdminUIDs registers one admin identity on m for every UID in the
// comma-separated fields["admin_uids"] value. UIDs are trimmed and blank
// entries skipped. It does nothing when the field is missing or empty;
// otherwise it prints the raw list once after registering.
func syncAdminUIDs(m *bridge.IdentityMapper, platform string, fields map[string]string, adminNickname string) {
	raw := fields["admin_uids"]
	if raw == "" {
		return
	}

	entries := strings.Split(raw, ",")
	for i := range entries {
		uid := strings.TrimSpace(entries[i])
		if uid != "" {
			m.Register(permissions.PlatformIdentity{
				Platform:        platform,
				PlatformUID:     uid,
				CyreneUser:      "admin",
				Nickname:        adminNickname,
				PermissionLevel: "admin",
			})
		}
	}
	fmt.Printf("Synced admin identities for %s from config: %s\n", platform, raw)
}
// formatDuration renders d as a compact label with at most two units:
// "45s" below a minute, "3m" or "3m20s" below an hour, and "2h" or "2h5m"
// from an hour upwards (seconds are dropped). Negative durations, which occur
// when a message timestamp is ahead of the local clock, are shown as "0s".
func formatDuration(d time.Duration) string {
	if d < 0 {
		d = 0
	}
	switch {
	case d < time.Minute:
		return fmt.Sprintf("%ds", int(d.Seconds()))
	case d < time.Hour:
		m := int(d.Minutes())
		if s := int(d.Seconds()) % 60; s > 0 {
			return fmt.Sprintf("%dm%ds", m, s)
		}
		return fmt.Sprintf("%dm", m)
	default:
		h := int(d.Hours())
		if m := int(d.Minutes()) % 60; m > 0 {
			return fmt.Sprintf("%dh%dm", h, m)
		}
		return fmt.Sprintf("%dh", h)
	}
}
// truncateString returns s unchanged when it has at most maxRunes runes;
// otherwise it returns the first maxRunes runes followed by "…".
func truncateString(s string, maxRunes int) string {
	if rs := []rune(s); len(rs) > maxRunes {
		return string(rs[:maxRunes]) + "…"
	}
	return s
}
// isMessageHistorical reports whether msg carries a timestamp earlier than the
// ConnectedAt time of any adapter registered for msg.Platform; such messages
// are treated as replayed history and only observed silently. Messages without
// a timestamp, and adapters that do not expose ConnectedAt, never count.
func isMessageHistorical(msg *bridge.UnifiedMessage, router *bridge.PlatformRouter) bool {
	if msg.Timestamp.IsZero() {
		return false
	}
	type connectionTimer interface {
		ConnectedAt() time.Time
	}
	for _, candidate := range router.GetAdaptersByPlatform(msg.Platform) {
		timed, ok := candidate.(connectionTimer)
		if !ok {
			continue
		}
		if msg.Timestamp.Before(timed.ConnectedAt()) {
			return true
		}
	}
	return false
}