Files
Cyrene/backend/platform-bridge/cmd/main.go
T

933 lines
28 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
discordstub "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/discord"
feishustub "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/feishu"
qqadapter "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/qq"
telegramadapter "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/telegram"
wechatstub "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/wechat"
webhookadapter "git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/adapter/webhook"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/bridge"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/config"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/handler"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/logging"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/permissions"
)
func main() {
cfg := config.Load()
// Config store for platform adapter configs.
configStore, err := config.NewStore("platform_configs.json")
if err != nil {
fmt.Printf("FATAL: config store: %v\n", err)
os.Exit(1)
}
// Blocklist settings.
blocklistStore, err := config.NewBlocklistStore("platform_blocklist.json")
if err != nil {
fmt.Printf("FATAL: blocklist store: %v\n", err)
os.Exit(1)
}
// Message logger.
msgLogger, err := logging.NewLogger("logs")
if err != nil {
fmt.Printf("FATAL: logger: %v\n", err)
os.Exit(1)
}
defer msgLogger.Close()
// Core components.
mapper := bridge.NewIdentityMapper()
checker := permissions.NewChecker()
router := bridge.NewPlatformRouter(mapper, checker)
// Seed default identities from environment.
seedIdentities(mapper, configStore)
// Register platform adapters based on stored configs or defaults.
adapters := createAdapters(cfg, configStore)
for _, a := range adapters {
router.RegisterAdapter(a)
}
// Set message handler with logging.
router.SetMessageHandler(func(msg *bridge.UnifiedMessage) (*bridge.UnifiedResponse, error) {
// Log incoming.
msgLogger.Log(logging.LogEntry{
Timestamp: time.Now(),
Direction: "incoming",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.OriginalSenderUID,
SenderName: msg.OriginalSenderName,
GroupName: msg.GroupName,
Content: msg.Content,
ContentType: msg.ContentType,
MessageID: msg.MessageID,
Success: true,
})
// Routing decisions.
isAdmin := mapper.IsAdmin(msg.Platform, msg.OriginalSenderUID)
isMentioned, _ := detectAdminMention(msg, mapper, cfg)
isBotMentioned := msg.BotUID != "" && containsString(msg.Mentions, msg.BotUID)
isSilent := cfg.PlatformSilentEnabled && !isAdmin && !isBotMentioned
// Add message timestamp for AI context.
if !msg.Timestamp.IsZero() {
timeAgo := time.Since(msg.Timestamp)
timeLabel := fmt.Sprintf("【消息时间: %s (%s前)】\n", msg.Timestamp.Format("15:04:05"), formatDuration(timeAgo))
msg.Content = timeLabel + msg.Content
}
// Enrich group messages with group name and sender info.
if msg.ChannelType == "group" {
groupLabel := msg.ChannelID
if msg.GroupName != "" {
groupLabel = truncateString(msg.GroupName, 8) + " " + msg.ChannelID
}
senderLabel := msg.OriginalSenderName
if senderLabel == "" {
senderLabel = msg.SenderName
}
if isAdmin {
senderLabel = "【管理员】" + msg.OriginalSenderName
}
msg.Content = fmt.Sprintf("[群聊 %s] %s (%s)\n%s", groupLabel, senderLabel, msg.OriginalSenderUID, msg.Content)
}
// Blocklist/whitelist check (admin always bypasses).
if blocked := blocklistStore.IsBlocked(msg.ChannelType, msg.ChannelID, msg.OriginalSenderUID, isAdmin); blocked {
msgLogger.Log(logging.LogEntry{
Timestamp: time.Now(),
Direction: "outgoing",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.OriginalSenderUID,
SenderName: "Cyrene",
Content: "[blocked]",
Success: true,
})
return &bridge.UnifiedResponse{
Messages: []bridge.ResponseMessage{
{DisplayType: "silent", Content: "", FormatMode: "plain"},
},
Platform: msg.Platform,
}, nil
}
var response *bridge.UnifiedResponse
var routeErr error
// Extract image URLs for vision/OCR processing (admin + bot-mentioned + admin-mentioned only).
imageURLs := getImageURLs(msg)
// For group chats, use a channel-based user ID to share context between admin and regular users.
chatUserID := msg.SenderID
if msg.ChannelType == "group" {
chatUserID = fmt.Sprintf("platform_%s_group_%s", msg.Platform, msg.ChannelID)
}
groupSessionID := fmt.Sprintf("platform_%s_%s", msg.Platform, msg.ChannelID)
switch {
case isMessageHistorical(msg, router):
msg.RouteType = "silent"
namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
response, routeErr = forwardToAICore(cfg, msg, "platform_silent", namespace, namespace, nil)
case isAdmin && !isBotMentioned && shouldAdminBeSilent(msg, router):
msg.RouteType = "silent"
namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
response, routeErr = forwardToAICore(cfg, msg, "platform_silent", namespace, namespace, nil)
case isAdmin:
msg.RouteType = "normal"
response, routeErr = forwardToAICore(cfg, msg, "text", chatUserID, groupSessionID, imageURLs)
case isBotMentioned:
msg.RouteType = "normal"
response, routeErr = forwardToAICore(cfg, msg, "text", chatUserID, groupSessionID, imageURLs)
case isMentioned:
// Non-admin user mentioned an admin. Don't respond in channel —
// the admin already gets QQ's native @notification. Observe silently.
msg.RouteType = "silent"
namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
response, routeErr = forwardToAICore(cfg, msg, "platform_silent", namespace, namespace, nil)
case isSilent:
msg.RouteType = "silent"
namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
silentResponse, silentErr := forwardToAICore(cfg, msg, "platform_silent", namespace, namespace, nil)
if silentErr != nil {
msgLogger.Log(logging.LogEntry{
Timestamp: time.Now(),
Direction: "outgoing",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.OriginalSenderUID,
Success: false,
Error: silentErr.Error(),
})
return nil, silentErr
}
response = silentResponse
routeErr = nil
default:
msg.RouteType = "normal"
response, routeErr = forwardToAICore(cfg, msg, "text", chatUserID, groupSessionID, nil)
}
if routeErr != nil {
msgLogger.Log(logging.LogEntry{
Timestamp: time.Now(),
Direction: "outgoing",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.OriginalSenderUID,
Success: false,
Error: routeErr.Error(),
})
return nil, routeErr
}
// Log outgoing messages (skip for silent route).
if msg.RouteType != "silent" {
for _, rm := range response.Messages {
msgLogger.Log(logging.LogEntry{
Timestamp: time.Now(),
Direction: "outgoing",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.BotUID,
SenderName: "Cyrene",
GroupName: msg.GroupName,
Content: rm.Content,
ContentType: "text",
Success: true,
})
}
}
return response, nil
})
// Connect all adapters.
ctx := context.Background()
for _, a := range adapters {
if err := a.Connect(ctx); err != nil {
fmt.Printf("WARN: connect %s failed: %v\n", a.PlatformName(), err)
} else {
fmt.Printf("Platform adapter connected: %s\n", a.PlatformName())
}
}
// Setup HTTP server.
mux := http.NewServeMux()
bh := handler.NewBridgeHandler(router)
bh.RegisterRoutes(mux)
// Config and log handlers.
ch := handler.NewConfigHandler(configStore, router)
// Hot-reload: on config save/delete, dynamically replace adapters.
ch.SetOnConfigChanged(func(name, platform string, enabled bool, fields map[string]string) {
if enabled {
a := createSingleAdapter(cfg, platform, name, fields)
if a == nil {
fmt.Printf("WARN: cannot create adapter for %s (platform=%s)\n", name, platform)
return
}
if err := router.ReplaceAdapter(a); err != nil {
fmt.Printf("WARN: hot-reload connect %s failed: %v\n", name, err)
} else {
fmt.Printf("Platform adapter hot-reloaded: %s\n", name)
}
// Sync admin identities from config fields.
syncAdminUIDs(mapper, platform, fields)
// Restart QQ reader when QQ config changes.
if platform == "qq" {
startQQReaders(router)
}
} else {
router.RemoveAdapter(name)
fmt.Printf("Platform adapter removed: %s\n", name)
// Cancel reader goroutines for removed adapter.
if platform == "qq" {
startQQReaders(router)
}
}
})
ch.RegisterRoutes(mux)
lh := handler.NewLogHandler(msgLogger, configStore)
lh.RegisterRoutes(mux)
// Log WebSocket for real-time log streaming to ethend.
logWS := handler.NewLogWSHub(msgLogger)
mux.HandleFunc("/ws/logs", logWS.ServeWS)
// Blocklist settings.
blh := handler.NewBlocklistHandler(blocklistStore)
blh.RegisterRoutes(mux)
// Start QQ message reader loop.
startQQReaders(router)
addr := ":" + cfg.Port
srv := &http.Server{Addr: addr, Handler: mux}
go func() {
fmt.Printf("Platform Bridge listening on port %s\n", cfg.Port)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Printf("FATAL: %v\n", err)
os.Exit(1)
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
fmt.Println("Shutting down Platform Bridge...")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, a := range adapters {
a.Disconnect(shutdownCtx)
}
srv.Shutdown(shutdownCtx)
fmt.Println("Platform Bridge stopped")
}
// qqReaderCancels maps adapter config name to its cancel function.
var qqReaderCancels = make(map[string]context.CancelFunc)
var qqReaderCancelsMu sync.Mutex
// startQQReaders cancels any existing QQ readers and starts one per registered QQ adapter.
func startQQReaders(router *bridge.PlatformRouter) {
// Cancel all existing readers.
qqReaderCancelsMu.Lock()
for _, cancel := range qqReaderCancels {
cancel()
}
qqReaderCancels = make(map[string]context.CancelFunc)
qqReaderCancelsMu.Unlock()
for _, qqa := range router.GetAdaptersByPlatform("qq") {
qqAdapter, ok := qqa.(*qqadapter.Adapter)
if !ok {
continue
}
adapterKey := qqAdapter.ConfigName()
ctx, cancel := context.WithCancel(context.Background())
qqReaderCancelsMu.Lock()
qqReaderCancels[adapterKey] = cancel
qqReaderCancelsMu.Unlock()
adapter := qqAdapter // capture for goroutine
qqMsgCh := make(chan *qqadapter.OBv11Message, 100)
go adapter.ReadMessages(ctx, qqMsgCh)
go func() {
for msg := range qqMsgCh {
response, err := router.RouteMessage(adapterKey, msg)
if err != nil {
fmt.Printf("[qq:%s] route error: %v\n", adapterKey, err)
continue
}
if response != nil && len(response.Messages) > 0 && !hasOnlySilentMessages(response.Messages) {
messageType := msg.MessageType
userID := msg.UserID
groupID := msg.GroupID
// Filter non-empty messages.
var toSend []bridge.ResponseMessage
for _, rm := range response.Messages {
if rm.Content != "" {
rm.Content = qqadapter.ConvertMarkdownToQQ(rm.Content)
toSend = append(toSend, rm)
}
}
interval := time.Duration(adapter.SendIntervalMs()) * time.Millisecond
if interval <= 0 {
interval = 2 * time.Second
}
for i, rm := range toSend {
if i > 0 && interval > 0 {
select {
case <-ctx.Done():
return
case <-time.After(interval):
}
}
// Re-acquire current adapter for hot-reload safety.
cur, err := router.GetAdapter(adapterKey)
if err != nil {
continue
}
curQQ, ok := cur.(*qqadapter.Adapter)
if !ok {
continue
}
var sendErr error
switch messageType {
case "private":
sendErr = curQQ.SendMessage("private", userID, 0, rm.Content)
case "group":
sendErr = curQQ.SendMessage("group", 0, groupID, rm.Content)
}
if sendErr != nil {
fmt.Printf("[qq:%s] send msg error: %v\n", adapterKey, sendErr)
}
}
}
}
}()
}
}
// createAdapters builds platform adapters from stored configs.
func createAdapters(cfg *config.Config, store *config.Store) []bridge.PlatformAdapter {
var adapters []bridge.PlatformAdapter
// Build adapters from stored configs. Each config is a separate adapter instance.
seen := make(map[string]bool)
for _, stored := range store.List() {
if !stored.Enabled {
fmt.Printf("Platform %s is disabled in config, skipping\n", stored.Name)
continue
}
platform := stored.Platform
if platform == "" {
platform = stored.Name
}
fields := mergeFields(cfg, platform, &stored)
a := createSingleAdapter(cfg, platform, stored.Name, fields)
if a != nil {
adapters = append(adapters, a)
seen[stored.Name] = true
}
}
// Seed default adapters for platforms that have no stored config.
defaultPlatforms := []string{"qq", "telegram", "webhook", "wechat", "feishu", "discord"}
for _, name := range defaultPlatforms {
if seen[name] {
continue
}
fields := mergeFields(cfg, name, nil)
a := createSingleAdapter(cfg, name, name, fields)
if a != nil {
adapters = append(adapters, a)
}
}
return adapters
}
// createSingleAdapter creates one platform adapter from config fields.
// platform is the base platform type ("qq", "telegram", etc.), configName is the instance key.
func createSingleAdapter(cfg *config.Config, platform, configName string, fields map[string]string) bridge.PlatformAdapter {
switch platform {
case "qq":
port := cfg.QQBotPort
if p, ok := fields["bot_port"]; ok && p != "" {
port = p
}
token := ""
if t, ok := fields["access_token"]; ok {
token = t
}
remoteURL := ""
if r, ok := fields["remote_url"]; ok {
remoteURL = r
}
mode := "server"
if m, ok := fields["mode"]; ok && m != "" {
mode = m
} else if fields["remote_url"] != "" {
mode = "client" // backward compat: old configs with remote_url
}
sendIntervalMs := cfg.MessageSendIntervalMs
if s, ok := fields["send_interval_ms"]; ok && s != "" {
if n := parseIntOr(s, cfg.MessageSendIntervalMs); n > 0 {
sendIntervalMs = n
}
}
return qqadapter.NewAdapter(configName, mode, port, token, remoteURL, sendIntervalMs)
case "telegram":
token := cfg.TelegramToken
if t, ok := fields["bot_token"]; ok && t != "" {
token = t
}
webhookURL := cfg.TelegramWebhookURL
if w, ok := fields["webhook_url"]; ok && w != "" {
webhookURL = w
}
return telegramadapter.NewAdapter(token, webhookURL)
case "webhook":
return webhookadapter.NewAdapter(configName)
case "wechat":
return wechatstub.NewAdapter()
case "feishu":
return feishustub.NewAdapter()
case "discord":
return discordstub.NewAdapter()
}
return nil
}
// mergeFields returns fields from stored config, falling back to env defaults.
func mergeFields(cfg *config.Config, platform string, stored *config.PlatformConfig) map[string]string {
fields := make(map[string]string)
if stored != nil {
for k, v := range stored.Fields {
fields[k] = v
}
}
if fields["bot_token"] == "" && cfg.TelegramToken != "" && platform == "telegram" {
fields["bot_token"] = cfg.TelegramToken
}
if fields["webhook_url"] == "" && cfg.TelegramWebhookURL != "" && platform == "telegram" {
fields["webhook_url"] = cfg.TelegramWebhookURL
}
if fields["bot_port"] == "" && cfg.QQBotPort != "" && platform == "qq" {
fields["bot_port"] = cfg.QQBotPort
}
return fields
}
// containsString checks whether a string slice contains a specific value.
func containsString(list []string, val string) bool {
for _, v := range list {
if v == val {
return true
}
}
return false
}
// shouldAdminBeSilent checks if admin is talking to other users in a group.
// Returns true if 昔涟 should not interrupt (route as silent observation instead).
func shouldAdminBeSilent(msg *bridge.UnifiedMessage, router *bridge.PlatformRouter) bool {
if msg.ChannelType != "group" {
return false
}
// Rule 1: Admin @mentions someone other than the bot → talking to them, don't interrupt.
for _, m := range msg.Mentions {
if m != msg.BotUID {
return true
}
}
// Rule 2: Recent context shows a conversation with non-admin users.
// Note: updateContext runs before this handler, so RecentSenders already
// includes the current message. Check the second-to-last sender instead.
ctx := router.GetContext(msg.Platform, msg.ChannelID)
if ctx != nil && len(ctx.RecentSenders) >= 2 {
prevSender := ctx.RecentSenders[len(ctx.RecentSenders)-2]
if prevSender != msg.OriginalSenderUID && prevSender != msg.BotUID {
return true
}
}
return false
}
// getImageURLs extracts image attachment URLs from a UnifiedMessage.
func getImageURLs(msg *bridge.UnifiedMessage) []string {
if len(msg.Attachments) == 0 {
return nil
}
var urls []string
for _, a := range msg.Attachments {
if a.Type == "image" && a.URL != "" {
urls = append(urls, a.URL)
}
}
return urls
}
// forwardToAICore sends a unified message to AI-Core's chat endpoint and returns the response.
// If images is non-empty, they are passed as URL strings for AI-Core to download and process.
func forwardToAICore(cfg *config.Config, msg *bridge.UnifiedMessage, mode, userID, sessionID string, images []string) (*bridge.UnifiedResponse, error) {
bodyMap := map[string]interface{}{
"user_id": userID,
"session_id": sessionID,
"message": msg.Content,
"mode": mode,
"routing": msg.RouteType,
"nickname": fmt.Sprintf("%s (%s)", msg.SenderName, msg.OriginalSenderUID),
"source": map[string]string{
"platform": msg.Platform,
"channel_id": msg.ChannelID,
"channel_type": msg.ChannelType,
"sender_name": msg.SenderName,
"original_uid": msg.OriginalSenderUID,
},
}
if len(images) > 0 {
bodyMap["images"] = images
}
reqBody, _ := json.Marshal(bodyMap)
url := cfg.AICoreURL + "/api/v1/chat"
req, _ := http.NewRequest("POST", url, bytes.NewReader(reqBody))
req.Header.Set("Content-Type", "application/json")
if mode == "platform_silent" {
req.Header.Set("Accept", "application/json")
} else {
req.Header.Set("Accept", "text/event-stream")
}
if cfg.InternalToken != "" {
req.Header.Set("X-Internal-Token", cfg.InternalToken)
}
client := &http.Client{Timeout: 120 * time.Second}
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("forward to ai-core failed: %w", err)
}
defer resp.Body.Close()
// Silent mode: only check status, no reply expected.
if mode == "platform_silent" {
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("silent forward returned status %d", resp.StatusCode)
}
return &bridge.UnifiedResponse{
Messages: []bridge.ResponseMessage{
{DisplayType: "silent", Content: "", FormatMode: "plain"},
},
Platform: msg.Platform,
}, nil
}
// Try JSON first (non-streaming or already-complete response).
var result struct {
Content string `json:"content"`
Error string `json:"error"`
}
bodyBytes, readErr := ioReadAll(resp.Body)
if readErr != nil {
return nil, fmt.Errorf("read ai-core response: %w", readErr)
}
if json.Unmarshal(bodyBytes, &result) == nil {
if result.Error != "" {
return &bridge.UnifiedResponse{
Messages: []bridge.ResponseMessage{
{DisplayType: "system_info", Content: result.Error, FormatMode: "plain"},
},
Platform: msg.Platform,
}, nil
}
if result.Content != "" {
return &bridge.UnifiedResponse{
Messages: splitContent(filterActions(result.Content)),
Platform: msg.Platform,
}, nil
}
}
// Not JSON — parse as SSE (text/event-stream).
content := parseSSEAndAccumulate(string(bodyBytes))
if content == "" {
return nil, fmt.Errorf("ai-core returned empty response")
}
return &bridge.UnifiedResponse{
Messages: splitContent(filterActions(content)),
Platform: msg.Platform,
}, nil
}
// ioReadAll reads all bytes from a reader (replaces io.ReadAll for older Go compat).
func ioReadAll(r io.Reader) ([]byte, error) {
buf := new(bytes.Buffer)
_, err := buf.ReadFrom(r)
return buf.Bytes(), err
}
// parseSSEAndAccumulate parses a Server-Sent Events stream and accumulates the full text.
// It handles both "delta" chunks and the final "segments" structure.
func parseSSEAndAccumulate(body string) string {
lines := strings.Split(body, "\n")
var deltas []string
var finalText string
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" || line == "[DONE]" {
continue
}
if !strings.HasPrefix(line, "data: ") {
continue
}
payload := strings.TrimPrefix(line, "data: ")
var chunk map[string]interface{}
if err := json.Unmarshal([]byte(payload), &chunk); err != nil {
continue
}
// Final segment: use the full text if available.
if done, _ := chunk["done"].(bool); done {
if segs, ok := chunk["segments"].([]interface{}); ok && len(segs) > 0 {
if seg, ok := segs[0].(map[string]interface{}); ok {
if t, ok := seg["text"].(string); ok && t != "" {
finalText = t
}
}
}
continue
}
// Accumulate deltas.
if delta, ok := chunk["delta"].(string); ok && delta != "" {
deltas = append(deltas, delta)
}
// Also accumulate content if present (some APIs send full content).
if content, ok := chunk["content"].(string); ok && content != "" {
deltas = append(deltas, content)
}
}
if finalText != "" {
return finalText
}
return strings.Join(deltas, "")
}
// splitContent splits text by ♪ (sentence-break marker), then by \n\n within each segment.
// Non-empty segments are each wrapped as a chat message; empty input returns a single empty message.
func splitContent(text string) []bridge.ResponseMessage {
// First split by ♪ sentence-break marker.
var rawParts []string
if strings.Contains(text, "♪") {
rawParts = strings.Split(text, "♪")
} else {
rawParts = strings.Split(text, "\n\n")
}
var parts []string
for _, p := range rawParts {
p = strings.TrimSpace(p)
if p != "" {
parts = append(parts, p)
}
}
var msgs []bridge.ResponseMessage
for _, part := range parts {
msgs = append(msgs, bridge.ResponseMessage{
DisplayType: "chat",
Content: part,
FormatMode: "plain",
})
}
if len(msgs) == 0 {
return []bridge.ResponseMessage{
{DisplayType: "chat", Content: text, FormatMode: "plain"},
}
}
return msgs
}
// filterActions removes <action>...</action> tags and their content from text.
// Also handles unescaped <action> tags from SSE (e.g. <action>).
func filterActions(text string) string {
// Remove standard <action>...</action> tags.
for {
start := strings.Index(text, "<action>")
if start == -1 {
break
}
end := strings.Index(text[start:], "</action>")
if end == -1 {
// Unclosed action tag — just remove the opening tag.
text = text[:start] + text[start+len("<action>"):]
continue
}
text = text[:start] + text[start+end+len("</action>"):]
}
// Remove SSE-escaped action tags (<action>...</action>).
for {
start := strings.Index(text, `<action>`)
if start == -1 {
break
}
end := strings.Index(text[start:], `</action>`)
if end == -1 {
text = text[:start] + text[start+len(`<action>`):]
continue
}
text = text[:start] + text[start+end+len(`</action>`):]
}
return strings.TrimSpace(text)
}
// buildMemoryNamespace creates a memory-isolated user_id for a platform channel.
func buildMemoryNamespace(platform, channelType, channelID string) string {
return fmt.Sprintf("platform_%s_%s_%s", platform, channelType, channelID)
}
// detectAdminMention checks whether a message mentions the admin.
func detectAdminMention(msg *bridge.UnifiedMessage, mapper *bridge.IdentityMapper, cfg *config.Config) (bool, string) {
for _, mentionUID := range msg.Mentions {
if mapper.IsAdmin(msg.Platform, mentionUID) {
return true, fmt.Sprintf("@mention of admin UID %s", mentionUID)
}
}
lowerContent := strings.ToLower(msg.Content)
for _, nickname := range cfg.AdminNicknames {
if strings.Contains(msg.Content, nickname) {
return true, fmt.Sprintf("admin nickname mention: %s", nickname)
}
}
for _, kw := range cfg.AdminMentionKeywords {
if strings.Contains(lowerContent, strings.ToLower(kw)) {
return true, fmt.Sprintf("admin keyword mention: %s", kw)
}
}
return false, ""
}
// hasOnlySilentMessages checks if all response messages are silent (no reply needed).
func hasOnlySilentMessages(messages []bridge.ResponseMessage) bool {
for _, m := range messages {
if m.DisplayType != "silent" && m.Content != "" {
return false
}
}
return true
}
func parseIntOr(s string, defaultVal int) int {
n := 0
for _, c := range s {
if c >= '0' && c <= '9' {
n = n*10 + int(c-'0')
} else {
return defaultVal
}
}
if n == 0 && s != "0" {
return defaultVal
}
return n
}
// seedIdentities loads default identity mappings from env vars and stored platform configs.
func seedIdentities(m *bridge.IdentityMapper, store *config.Store) {
// From environment variables.
for _, entry := range []struct{ envKey, platform string }{
{"QQ_ADMIN_UID", "qq"},
{"TELEGRAM_ADMIN_UID", "telegram"},
} {
if raw := os.Getenv(entry.envKey); raw != "" {
for _, uid := range strings.Split(raw, ",") {
uid = strings.TrimSpace(uid)
if uid == "" {
continue
}
m.Register(permissions.PlatformIdentity{
Platform: entry.platform,
PlatformUID: uid,
CyreneUser: "admin",
Nickname: "开拓者",
PermissionLevel: "admin",
})
}
}
}
// From stored platform configs (admin_uids field).
for _, name := range []string{"qq", "telegram", "webhook", "wechat", "feishu", "discord"} {
stored, _ := store.Get(name)
if stored == nil {
continue
}
syncAdminUIDs(m, name, stored.Fields)
}
}
// syncAdminUIDs registers admin identities from a platform config's admin_uids field.
// Comma-separated list of platform UIDs.
func syncAdminUIDs(m *bridge.IdentityMapper, platform string, fields map[string]string) {
raw, ok := fields["admin_uids"]
if !ok || raw == "" {
return
}
for _, uid := range strings.Split(raw, ",") {
uid = strings.TrimSpace(uid)
if uid == "" {
continue
}
m.Register(permissions.PlatformIdentity{
Platform: platform,
PlatformUID: uid,
CyreneUser: "admin",
Nickname: "开拓者",
PermissionLevel: "admin",
})
}
fmt.Printf("Synced admin identities for %s from config: %s\n", platform, raw)
}
// formatDuration returns a human-readable duration string like "1h2m3s".
func formatDuration(d time.Duration) string {
if d < time.Minute {
return fmt.Sprintf("%ds", int(d.Seconds()))
}
if d < time.Hour {
m := int(d.Minutes())
s := int(d.Seconds()) % 60
if s > 0 {
return fmt.Sprintf("%dm%ds", m, s)
}
return fmt.Sprintf("%dm", m)
}
h := int(d.Hours())
m := int(d.Minutes()) % 60
if m > 0 {
return fmt.Sprintf("%dh%dm", h, m)
}
return fmt.Sprintf("%dh", h)
}
// truncateString truncates a string to maxRunes runes, appending "…" if truncated.
func truncateString(s string, maxRunes int) string {
runes := []rune(s)
if len(runes) <= maxRunes {
return s
}
return string(runes[:maxRunes]) + "…"
}
// isMessageHistorical returns true if the message timestamp is before the adapter's connection time,
// indicating it is a replayed historical message that should be silently observed.
func isMessageHistorical(msg *bridge.UnifiedMessage, router *bridge.PlatformRouter) bool {
if msg.Timestamp.IsZero() {
return false
}
for _, a := range router.GetAdaptersByPlatform(msg.Platform) {
if connectedAt, ok := a.(interface{ ConnectedAt() time.Time }); ok {
if msg.Timestamp.Before(connectedAt.ConnectedAt()) {
return true
}
}
}
return false
}