fix: platform_silent记忆提取 + 群聊上下文整合 + 多QQ实例支持

- platform_silent模式接入Orchestrator记忆提取:被动观察群聊时提取值得记住的信息到对应命名空间
- post_chat后台思考注入平台观察:对话后思考也能看到群聊摘要
- QQ适配器:OneBot v11 self_id动态捕获、CQ图片URL提取、视觉+OCR并行处理
- Router解耦:ConfigName/PlatformName分离,支持多QQ实例独立连接
- 黑白名单功能:后端API + Ethend代理 + UI面板
- \n\n双换行断句:AI回复按双换行分割为多条消息按间隔发送
- @提及修复:bot自感知UID进行@检测
- 群聊上下文共享:channel-based userID避免记忆碎片化
- 消息日志显示处理后内容而非原始SSE数据
- platform-bridge Dockerfile + docker-compose.yml更新

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-31 09:37:18 +08:00
parent 71f0a1abdb
commit 47dce276a4
22 changed files with 2375 additions and 313 deletions
+35
View File
@@ -0,0 +1,35 @@
# ========== 构建阶段 ==========
FROM golang:1.26-alpine AS builder
RUN apk add --no-cache git ca-certificates
WORKDIR /app
COPY backend/platform-bridge/ ./backend/platform-bridge/
WORKDIR /app/backend/platform-bridge
ENV GOPROXY=https://goproxy.cn,direct
RUN go mod download
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /platform-bridge ./cmd/main.go
# ========== 运行阶段 ==========
FROM alpine:3.20
RUN apk add --no-cache ca-certificates tzdata && \
cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
echo "Asia/Shanghai" > /etc/timezone
WORKDIR /app
COPY --from=builder /platform-bridge .
RUN mkdir -p logs && adduser -D -H cyrene && chown -R cyrene:cyrene /app
USER cyrene
EXPOSE 8095
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD wget --no-verbose --tries=1 --spider http://localhost:8095/health || exit 1
ENTRYPOINT ["./platform-bridge"]
+625 -128
View File
@@ -5,9 +5,12 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
@@ -34,6 +37,13 @@ func main() {
os.Exit(1)
}
// Blocklist settings.
blocklistStore, err := config.NewBlocklistStore("platform_blocklist.json")
if err != nil {
fmt.Printf("FATAL: blocklist store: %v\n", err)
os.Exit(1)
}
// Message logger.
msgLogger, err := logging.NewLogger("logs")
if err != nil {
@@ -48,7 +58,7 @@ func main() {
router := bridge.NewPlatformRouter(mapper, checker)
// Seed default identities from environment.
seedIdentities(mapper)
seedIdentities(mapper, configStore)
// Register platform adapters based on stored configs or defaults.
adapters := createAdapters(cfg, configStore)
@@ -64,7 +74,7 @@ func main() {
Direction: "incoming",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.SenderID,
SenderID: msg.OriginalSenderUID,
SenderName: msg.SenderName,
Content: msg.Content,
ContentType: msg.ContentType,
@@ -72,33 +82,122 @@ func main() {
Success: true,
})
response, err := forwardToAICore(cfg, msg)
if err != nil {
// Routing decisions.
isAdmin := mapper.IsAdmin(msg.Platform, msg.OriginalSenderUID)
isMentioned, mentionReason := detectAdminMention(msg, mapper, cfg)
isBotMentioned := msg.BotUID != "" && containsString(msg.Mentions, msg.BotUID)
isSilent := cfg.PlatformSilentEnabled && !isAdmin && !isBotMentioned
// Blocklist/whitelist check (admin always bypasses).
if blocked := blocklistStore.IsBlocked(msg.ChannelType, msg.ChannelID, msg.OriginalSenderUID, isAdmin); blocked {
msgLogger.Log(logging.LogEntry{
Timestamp: time.Now(),
Direction: "outgoing",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.SenderID,
Success: false,
Error: err.Error(),
SenderID: msg.OriginalSenderUID,
SenderName: "Cyrene",
Content: "[blocked]",
Success: true,
})
return nil, err
return &bridge.UnifiedResponse{
Messages: []bridge.ResponseMessage{
{DisplayType: "silent", Content: "", FormatMode: "plain"},
},
Platform: msg.Platform,
}, nil
}
// Log outgoing.
for _, rm := range response.Messages {
var response *bridge.UnifiedResponse
var routeErr error
// Extract image URLs for vision/OCR processing (admin + bot-mentioned + admin-mentioned only).
imageURLs := getImageURLs(msg)
includeImages := isAdmin || isBotMentioned || isMentioned
// For group chats, use a channel-based user ID to share context between admin and regular users.
chatUserID := msg.SenderID
if msg.ChannelType == "group" {
chatUserID = fmt.Sprintf("platform_%s_group_%s", msg.Platform, msg.ChannelID)
}
groupSessionID := fmt.Sprintf("platform_%s_%s", msg.Platform, msg.ChannelID)
switch {
case isAdmin:
msg.RouteType = "normal"
response, routeErr = forwardToAICore(cfg, msg, "text", chatUserID, groupSessionID, imageURLs)
case isBotMentioned:
msg.RouteType = "normal"
response, routeErr = forwardToAICore(cfg, msg, "text", chatUserID, groupSessionID, imageURLs)
case isMentioned:
msg.RouteType = "admin_mention"
enhancedContent := fmt.Sprintf(
"[来自%s平台 %s 频道 %s 的用户 %s 说]\n%s\n\n[系统提示:此消息提及了管理员(原因:%s)。请参考以上消息内容判断是否需要关注。]",
msg.Platform, msg.ChannelType, msg.ChannelID, msg.SenderName, msg.Content, mentionReason,
)
originalContent := msg.Content
msg.Content = enhancedContent
if includeImages {
response, routeErr = forwardToAICore(cfg, msg, "text", "admin", "admin-session-main", imageURLs)
} else {
response, routeErr = forwardToAICore(cfg, msg, "text", "admin", "admin-session-main", nil)
}
msg.Content = originalContent
case isSilent:
msg.RouteType = "silent"
namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
silentResponse, silentErr := forwardToAICore(cfg, msg, "platform_silent", namespace, namespace, nil)
if silentErr != nil {
msgLogger.Log(logging.LogEntry{
Timestamp: time.Now(),
Direction: "outgoing",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.OriginalSenderUID,
Success: false,
Error: silentErr.Error(),
})
return nil, silentErr
}
response = silentResponse
routeErr = nil
default:
msg.RouteType = "normal"
response, routeErr = forwardToAICore(cfg, msg, "text", chatUserID, groupSessionID, nil)
}
if routeErr != nil {
msgLogger.Log(logging.LogEntry{
Timestamp: time.Now(),
Direction: "outgoing",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.SenderID,
SenderName: "Cyrene",
Content: rm.Content,
ContentType: "text",
Success: true,
Timestamp: time.Now(),
Direction: "outgoing",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.OriginalSenderUID,
Success: false,
Error: routeErr.Error(),
})
return nil, routeErr
}
// Log outgoing messages (skip for silent route).
if msg.RouteType != "silent" {
for _, rm := range response.Messages {
msgLogger.Log(logging.LogEntry{
Timestamp: time.Now(),
Direction: "outgoing",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.OriginalSenderUID,
SenderName: "Cyrene",
Content: rm.Content,
ContentType: "text",
Success: true,
})
}
}
return response, nil
@@ -121,31 +220,50 @@ func main() {
// Config and log handlers.
ch := handler.NewConfigHandler(configStore, router)
// Hot-reload: on config save/delete, dynamically replace adapters.
ch.SetOnConfigChanged(func(name, platform string, enabled bool, fields map[string]string) {
if enabled {
a := createSingleAdapter(cfg, platform, name, fields)
if a == nil {
fmt.Printf("WARN: cannot create adapter for %s (platform=%s)\n", name, platform)
return
}
if err := router.ReplaceAdapter(a); err != nil {
fmt.Printf("WARN: hot-reload connect %s failed: %v\n", name, err)
} else {
fmt.Printf("Platform adapter hot-reloaded: %s\n", name)
}
// Sync admin identities from config fields.
syncAdminUIDs(mapper, platform, fields)
// Restart QQ reader when QQ config changes.
if platform == "qq" {
startQQReaders(router)
}
} else {
router.RemoveAdapter(name)
fmt.Printf("Platform adapter removed: %s\n", name)
// Cancel reader goroutines for removed adapter.
if platform == "qq" {
startQQReaders(router)
}
}
})
ch.RegisterRoutes(mux)
lh := handler.NewLogHandler(msgLogger)
lh := handler.NewLogHandler(msgLogger, configStore)
lh.RegisterRoutes(mux)
// Log WebSocket for real-time log streaming to ethend.
logWS := handler.NewLogWSHub(msgLogger)
mux.HandleFunc("/ws/logs", logWS.ServeWS)
// Blocklist settings.
blh := handler.NewBlocklistHandler(blocklistStore)
blh.RegisterRoutes(mux)
// Start QQ message reader loop.
qq, _ := router.GetAdapter("qq")
if qqa, ok := qq.(*qqadapter.Adapter); ok {
qqMsgCh := make(chan *qqadapter.OBv11Message, 100)
go qqa.ReadMessages(ctx, qqMsgCh)
go func() {
for msg := range qqMsgCh {
response, err := router.RouteMessage("qq", msg)
if err != nil {
fmt.Printf("[qq] route error: %v\n", err)
continue
}
msgs, err := router.SendResponse(response)
if err != nil {
fmt.Printf("[qq] send error: %v\n", err)
continue
}
_ = msgs
}
}()
}
startQQReaders(router)
addr := ":" + cfg.Port
srv := &http.Server{Addr: addr, Handler: mux}
@@ -171,48 +289,121 @@ func main() {
fmt.Println("Platform Bridge stopped")
}
// createAdapters builds platform adapters, preferring stored configs over defaults.
func createAdapters(cfg *config.Config, store *config.Store) []bridge.PlatformAdapter {
allNames := []string{"qq", "telegram", "webhook", "wechat", "feishu", "discord"}
var adapters []bridge.PlatformAdapter
// qqReaderCancels maps adapter config name to its cancel function.
var qqReaderCancels = make(map[string]context.CancelFunc)
var qqReaderCancelsMu sync.Mutex
for _, name := range allNames {
stored, _ := store.Get(name)
if stored != nil && !stored.Enabled {
fmt.Printf("Platform %s is disabled in config, skipping\n", name)
// startQQReaders cancels any existing QQ readers and starts one per registered QQ adapter.
func startQQReaders(router *bridge.PlatformRouter) {
// Cancel all existing readers.
qqReaderCancelsMu.Lock()
for _, cancel := range qqReaderCancels {
cancel()
}
qqReaderCancels = make(map[string]context.CancelFunc)
qqReaderCancelsMu.Unlock()
for _, qqa := range router.GetAdaptersByPlatform("qq") {
qqAdapter, ok := qqa.(*qqadapter.Adapter)
if !ok {
continue
}
adapterKey := qqAdapter.ConfigName()
ctx, cancel := context.WithCancel(context.Background())
qqReaderCancelsMu.Lock()
qqReaderCancels[adapterKey] = cancel
qqReaderCancelsMu.Unlock()
var a bridge.PlatformAdapter
fields := mergeFields(cfg, name, stored)
adapter := qqAdapter // capture for goroutine
qqMsgCh := make(chan *qqadapter.OBv11Message, 100)
go adapter.ReadMessages(ctx, qqMsgCh)
go func() {
for msg := range qqMsgCh {
response, err := router.RouteMessage(adapterKey, msg)
if err != nil {
fmt.Printf("[qq:%s] route error: %v\n", adapterKey, err)
continue
}
if response != nil && len(response.Messages) > 0 && !hasOnlySilentMessages(response.Messages) {
messageType := msg.MessageType
userID := msg.UserID
groupID := msg.GroupID
// Filter non-empty messages.
var toSend []bridge.ResponseMessage
for _, rm := range response.Messages {
if rm.Content != "" {
toSend = append(toSend, rm)
}
}
interval := time.Duration(adapter.SendIntervalMs()) * time.Millisecond
if interval <= 0 {
interval = 2 * time.Second
}
for i, rm := range toSend {
if i > 0 && interval > 0 {
select {
case <-ctx.Done():
return
case <-time.After(interval):
}
}
// Re-acquire current adapter for hot-reload safety.
cur, err := router.GetAdapter(adapterKey)
if err != nil {
continue
}
curQQ, ok := cur.(*qqadapter.Adapter)
if !ok {
continue
}
var sendErr error
switch messageType {
case "private":
sendErr = curQQ.SendMessage("private", userID, 0, rm.Content)
case "group":
sendErr = curQQ.SendMessage("group", 0, groupID, rm.Content)
}
if sendErr != nil {
fmt.Printf("[qq:%s] send msg error: %v\n", adapterKey, sendErr)
}
}
}
}
}()
}
}
switch name {
case "qq":
port := cfg.QQBotPort
if p, ok := fields["bot_port"]; ok && p != "" {
port = p
}
a = qqadapter.NewAdapter(port)
case "telegram":
token := cfg.TelegramToken
if t, ok := fields["bot_token"]; ok && t != "" {
token = t
}
webhookURL := cfg.TelegramWebhookURL
if w, ok := fields["webhook_url"]; ok && w != "" {
webhookURL = w
}
a = telegramadapter.NewAdapter(token, webhookURL)
case "webhook":
a = webhookadapter.NewAdapter("webhook")
case "wechat":
a = wechatstub.NewAdapter()
case "feishu":
a = feishustub.NewAdapter()
case "discord":
a = discordstub.NewAdapter()
// createAdapters builds platform adapters from stored configs.
func createAdapters(cfg *config.Config, store *config.Store) []bridge.PlatformAdapter {
var adapters []bridge.PlatformAdapter
// Build adapters from stored configs. Each config is a separate adapter instance.
seen := make(map[string]bool)
for _, stored := range store.List() {
if !stored.Enabled {
fmt.Printf("Platform %s is disabled in config, skipping\n", stored.Name)
continue
}
platform := stored.Platform
if platform == "" {
platform = stored.Name
}
fields := mergeFields(cfg, platform, &stored)
a := createSingleAdapter(cfg, platform, stored.Name, fields)
if a != nil {
adapters = append(adapters, a)
seen[stored.Name] = true
}
}
// Seed default adapters for platforms that have no stored config.
defaultPlatforms := []string{"qq", "telegram", "webhook", "wechat", "feishu", "discord"}
for _, name := range defaultPlatforms {
if seen[name] {
continue
}
fields := mergeFields(cfg, name, nil)
a := createSingleAdapter(cfg, name, name, fields)
if a != nil {
adapters = append(adapters, a)
}
@@ -220,46 +411,132 @@ func createAdapters(cfg *config.Config, store *config.Store) []bridge.PlatformAd
return adapters
}
// createSingleAdapter creates one platform adapter from config fields.
// platform is the base platform type ("qq", "telegram", etc.), configName is the instance key.
func createSingleAdapter(cfg *config.Config, platform, configName string, fields map[string]string) bridge.PlatformAdapter {
switch platform {
case "qq":
port := cfg.QQBotPort
if p, ok := fields["bot_port"]; ok && p != "" {
port = p
}
token := ""
if t, ok := fields["access_token"]; ok {
token = t
}
remoteURL := ""
if r, ok := fields["remote_url"]; ok {
remoteURL = r
}
mode := "server"
if m, ok := fields["mode"]; ok && m != "" {
mode = m
} else if fields["remote_url"] != "" {
mode = "client" // backward compat: old configs with remote_url
}
sendIntervalMs := cfg.MessageSendIntervalMs
if s, ok := fields["send_interval_ms"]; ok && s != "" {
if n := parseIntOr(s, cfg.MessageSendIntervalMs); n > 0 {
sendIntervalMs = n
}
}
return qqadapter.NewAdapter(configName, mode, port, token, remoteURL, sendIntervalMs)
case "telegram":
token := cfg.TelegramToken
if t, ok := fields["bot_token"]; ok && t != "" {
token = t
}
webhookURL := cfg.TelegramWebhookURL
if w, ok := fields["webhook_url"]; ok && w != "" {
webhookURL = w
}
return telegramadapter.NewAdapter(token, webhookURL)
case "webhook":
return webhookadapter.NewAdapter(configName)
case "wechat":
return wechatstub.NewAdapter()
case "feishu":
return feishustub.NewAdapter()
case "discord":
return discordstub.NewAdapter()
}
return nil
}
// mergeFields returns fields from stored config, falling back to env defaults.
func mergeFields(cfg *config.Config, name string, stored *config.PlatformConfig) map[string]string {
func mergeFields(cfg *config.Config, platform string, stored *config.PlatformConfig) map[string]string {
fields := make(map[string]string)
if stored != nil {
for k, v := range stored.Fields {
fields[k] = v
}
}
// Apply env var defaults if fields are missing.
if fields["bot_token"] == "" && cfg.TelegramToken != "" && name == "telegram" {
if fields["bot_token"] == "" && cfg.TelegramToken != "" && platform == "telegram" {
fields["bot_token"] = cfg.TelegramToken
}
if fields["webhook_url"] == "" && cfg.TelegramWebhookURL != "" && name == "telegram" {
if fields["webhook_url"] == "" && cfg.TelegramWebhookURL != "" && platform == "telegram" {
fields["webhook_url"] = cfg.TelegramWebhookURL
}
if fields["bot_port"] == "" && cfg.QQBotPort != "" && name == "qq" {
if fields["bot_port"] == "" && cfg.QQBotPort != "" && platform == "qq" {
fields["bot_port"] = cfg.QQBotPort
}
return fields
}
// containsString checks whether a string slice contains a specific value.
func containsString(list []string, val string) bool {
for _, v := range list {
if v == val {
return true
}
}
return false
}
// getImageURLs extracts image attachment URLs from a UnifiedMessage.
func getImageURLs(msg *bridge.UnifiedMessage) []string {
if len(msg.Attachments) == 0 {
return nil
}
var urls []string
for _, a := range msg.Attachments {
if a.Type == "image" && a.URL != "" {
urls = append(urls, a.URL)
}
}
return urls
}
// forwardToAICore sends a unified message to AI-Core's chat endpoint and returns the response.
func forwardToAICore(cfg *config.Config, msg *bridge.UnifiedMessage) (*bridge.UnifiedResponse, error) {
reqBody, _ := json.Marshal(map[string]interface{}{
"user_id": msg.SenderID,
"session_id": fmt.Sprintf("platform_%s_%s", msg.Platform, msg.ChannelID),
// If images is non-empty, they are passed as URL strings for AI-Core to download and process.
func forwardToAICore(cfg *config.Config, msg *bridge.UnifiedMessage, mode, userID, sessionID string, images []string) (*bridge.UnifiedResponse, error) {
bodyMap := map[string]interface{}{
"user_id": userID,
"session_id": sessionID,
"message": msg.Content,
"mode": "text",
"mode": mode,
"routing": msg.RouteType,
"source": map[string]string{
"platform": msg.Platform,
"channel_id": msg.ChannelID,
"channel_type": msg.ChannelType,
"sender_name": msg.SenderName,
"platform": msg.Platform,
"channel_id": msg.ChannelID,
"channel_type": msg.ChannelType,
"sender_name": msg.SenderName,
"original_uid": msg.OriginalSenderUID,
},
})
}
if len(images) > 0 {
bodyMap["images"] = images
}
reqBody, _ := json.Marshal(bodyMap)
url := cfg.AICoreURL + "/api/v1/chat"
req, _ := http.NewRequest("POST", url, bytes.NewReader(reqBody))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "text/event-stream")
if mode == "platform_silent" {
req.Header.Set("Accept", "application/json")
} else {
req.Header.Set("Accept", "text/event-stream")
}
if cfg.InternalToken != "" {
req.Header.Set("X-Internal-Token", cfg.InternalToken)
}
@@ -271,56 +548,276 @@ func forwardToAICore(cfg *config.Config, msg *bridge.UnifiedMessage) (*bridge.Un
}
defer resp.Body.Close()
// Silent mode: only check status, no reply expected.
if mode == "platform_silent" {
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("silent forward returned status %d", resp.StatusCode)
}
return &bridge.UnifiedResponse{
Messages: []bridge.ResponseMessage{
{DisplayType: "silent", Content: "", FormatMode: "plain"},
},
Platform: msg.Platform,
}, nil
}
// Try JSON first (non-streaming or already-complete response).
var result struct {
Content string `json:"content"`
Error string `json:"error"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
return &bridge.UnifiedResponse{
Messages: []bridge.ResponseMessage{
{DisplayType: "chat", Content: buf.String(), FormatMode: "plain"},
},
Platform: msg.Platform,
}, nil
bodyBytes, readErr := ioReadAll(resp.Body)
if readErr != nil {
return nil, fmt.Errorf("read ai-core response: %w", readErr)
}
if result.Error != "" {
return &bridge.UnifiedResponse{
Messages: []bridge.ResponseMessage{
{DisplayType: "system_info", Content: result.Error, FormatMode: "plain"},
},
Platform: msg.Platform,
}, nil
if json.Unmarshal(bodyBytes, &result) == nil {
if result.Error != "" {
return &bridge.UnifiedResponse{
Messages: []bridge.ResponseMessage{
{DisplayType: "system_info", Content: result.Error, FormatMode: "plain"},
},
Platform: msg.Platform,
}, nil
}
if result.Content != "" {
return &bridge.UnifiedResponse{
Messages: splitContent(filterActions(result.Content)),
Platform: msg.Platform,
}, nil
}
}
// Not JSON — parse as SSE (text/event-stream).
content := parseSSEAndAccumulate(string(bodyBytes))
if content == "" {
return nil, fmt.Errorf("ai-core returned empty response")
}
return &bridge.UnifiedResponse{
Messages: []bridge.ResponseMessage{
{DisplayType: "chat", Content: result.Content, FormatMode: "plain"},
},
Messages: splitContent(filterActions(content)),
Platform: msg.Platform,
}, nil
}
// seedIdentities loads default identity mappings.
func seedIdentities(m *bridge.IdentityMapper) {
if qqAdmin := os.Getenv("QQ_ADMIN_UID"); qqAdmin != "" {
m.Register(permissions.PlatformIdentity{
Platform: "qq",
PlatformUID: qqAdmin,
CyreneUser: "admin",
Nickname: "开拓者",
PermissionLevel: "admin",
})
// ioReadAll reads all bytes from a reader (replaces io.ReadAll for older Go compat).
func ioReadAll(r io.Reader) ([]byte, error) {
buf := new(bytes.Buffer)
_, err := buf.ReadFrom(r)
return buf.Bytes(), err
}
// parseSSEAndAccumulate parses a Server-Sent Events stream and accumulates the full text.
// It handles both "delta" chunks and the final "segments" structure.
func parseSSEAndAccumulate(body string) string {
lines := strings.Split(body, "\n")
var deltas []string
var finalText string
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" || line == "[DONE]" {
continue
}
if !strings.HasPrefix(line, "data: ") {
continue
}
payload := strings.TrimPrefix(line, "data: ")
var chunk map[string]interface{}
if err := json.Unmarshal([]byte(payload), &chunk); err != nil {
continue
}
// Final segment: use the full text if available.
if done, _ := chunk["done"].(bool); done {
if segs, ok := chunk["segments"].([]interface{}); ok && len(segs) > 0 {
if seg, ok := segs[0].(map[string]interface{}); ok {
if t, ok := seg["text"].(string); ok && t != "" {
finalText = t
}
}
}
continue
}
// Accumulate deltas.
if delta, ok := chunk["delta"].(string); ok && delta != "" {
deltas = append(deltas, delta)
}
// Also accumulate content if present (some APIs send full content).
if content, ok := chunk["content"].(string); ok && content != "" {
deltas = append(deltas, content)
}
}
if tgAdmin := os.Getenv("TELEGRAM_ADMIN_UID"); tgAdmin != "" {
m.Register(permissions.PlatformIdentity{
Platform: "telegram",
PlatformUID: tgAdmin,
CyreneUser: "admin",
Nickname: "开拓者",
PermissionLevel: "admin",
})
if finalText != "" {
return finalText
}
return strings.Join(deltas, "")
}
// splitContent splits text by \n\n into multiple ResponseMessage segments.
// Non-empty segments are each wrapped as a chat message; empty input returns a single empty message.
func splitContent(text string) []bridge.ResponseMessage {
parts := strings.Split(text, "\n\n")
var msgs []bridge.ResponseMessage
for _, part := range parts {
part = strings.TrimSpace(part)
if part != "" {
msgs = append(msgs, bridge.ResponseMessage{
DisplayType: "chat",
Content: part,
FormatMode: "plain",
})
}
}
if len(msgs) == 0 {
return []bridge.ResponseMessage{
{DisplayType: "chat", Content: text, FormatMode: "plain"},
}
}
return msgs
}
// filterActions removes <action>...</action> tags and their content from text.
// Also handles unescaped <action> tags from SSE (e.g. <action>).
func filterActions(text string) string {
// Remove standard <action>...</action> tags.
for {
start := strings.Index(text, "<action>")
if start == -1 {
break
}
end := strings.Index(text[start:], "</action>")
if end == -1 {
// Unclosed action tag — just remove the opening tag.
text = text[:start] + text[start+len("<action>"):]
continue
}
text = text[:start] + text[start+end+len("</action>"):]
}
// Remove SSE-escaped action tags (<action>...</action>).
for {
start := strings.Index(text, `<action>`)
if start == -1 {
break
}
end := strings.Index(text[start:], `</action>`)
if end == -1 {
text = text[:start] + text[start+len(`<action>`):]
continue
}
text = text[:start] + text[start+end+len(`</action>`):]
}
return strings.TrimSpace(text)
}
// buildMemoryNamespace creates a memory-isolated user_id for a platform channel.
func buildMemoryNamespace(platform, channelType, channelID string) string {
return fmt.Sprintf("platform_%s_%s_%s", platform, channelType, channelID)
}
// detectAdminMention checks whether a message mentions the admin.
func detectAdminMention(msg *bridge.UnifiedMessage, mapper *bridge.IdentityMapper, cfg *config.Config) (bool, string) {
for _, mentionUID := range msg.Mentions {
if mapper.IsAdmin(msg.Platform, mentionUID) {
return true, fmt.Sprintf("@mention of admin UID %s", mentionUID)
}
}
lowerContent := strings.ToLower(msg.Content)
for _, nickname := range cfg.AdminNicknames {
if strings.Contains(msg.Content, nickname) {
return true, fmt.Sprintf("admin nickname mention: %s", nickname)
}
}
for _, kw := range cfg.AdminMentionKeywords {
if strings.Contains(lowerContent, strings.ToLower(kw)) {
return true, fmt.Sprintf("admin keyword mention: %s", kw)
}
}
return false, ""
}
// hasOnlySilentMessages checks if all response messages are silent (no reply needed).
func hasOnlySilentMessages(messages []bridge.ResponseMessage) bool {
for _, m := range messages {
if m.DisplayType != "silent" && m.Content != "" {
return false
}
}
return true
}
func parseIntOr(s string, defaultVal int) int {
n := 0
for _, c := range s {
if c >= '0' && c <= '9' {
n = n*10 + int(c-'0')
} else {
return defaultVal
}
}
if n == 0 && s != "0" {
return defaultVal
}
return n
}
// seedIdentities loads default identity mappings from env vars and stored platform configs.
func seedIdentities(m *bridge.IdentityMapper, store *config.Store) {
// From environment variables.
for _, entry := range []struct{ envKey, platform string }{
{"QQ_ADMIN_UID", "qq"},
{"TELEGRAM_ADMIN_UID", "telegram"},
} {
if raw := os.Getenv(entry.envKey); raw != "" {
for _, uid := range strings.Split(raw, ",") {
uid = strings.TrimSpace(uid)
if uid == "" {
continue
}
m.Register(permissions.PlatformIdentity{
Platform: entry.platform,
PlatformUID: uid,
CyreneUser: "admin",
Nickname: "开拓者",
PermissionLevel: "admin",
})
}
}
}
// From stored platform configs (admin_uids field).
for _, name := range []string{"qq", "telegram", "webhook", "wechat", "feishu", "discord"} {
stored, _ := store.Get(name)
if stored == nil {
continue
}
syncAdminUIDs(m, name, stored.Fields)
}
}
// syncAdminUIDs registers admin identities from a platform config's admin_uids field.
// Comma-separated list of platform UIDs.
func syncAdminUIDs(m *bridge.IdentityMapper, platform string, fields map[string]string) {
raw, ok := fields["admin_uids"]
if !ok || raw == "" {
return
}
for _, uid := range strings.Split(raw, ",") {
uid = strings.TrimSpace(uid)
if uid == "" {
continue
}
m.Register(permissions.PlatformIdentity{
Platform: platform,
PlatformUID: uid,
CyreneUser: "admin",
Nickname: "开拓者",
PermissionLevel: "admin",
})
}
fmt.Printf("Synced admin identities for %s from config: %s\n", platform, raw)
}
@@ -5,6 +5,8 @@ import (
"encoding/json"
"fmt"
"net/http"
"regexp"
"strings"
"sync"
"time"
@@ -17,31 +19,51 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
// Adapter implements PlatformAdapter for QQ via OBv11 WebSocket.
// Adapter implements PlatformAdapter for QQ via OneBot v11 WebSocket.
// Supports two modes:
// - "server" (正向 WS): adapter starts a WS server, NapCat connects as client.
// - "client" (反向 WS): adapter connects to NapCat's WS server as a client.
type Adapter struct {
port string
conn *websocket.Conn
connMu sync.Mutex
connected bool
configName string // instance name, e.g. "qq-home"
mode string // "client" or "server"
port string
accessToken string
remoteURL string // NapCat OneBot WS server URL, used in client mode
sendIntervalMs int // minimum interval between consecutive messages
selfID string // bot's own QQ number, populated from incoming messages
conn *websocket.Conn
connMu sync.Mutex
connected bool
srv *http.Server // HTTP server for WS upgrades (server mode only)
// Pending API call responses.
pendingResponses map[string]chan *OBv11APIResponse
respMu sync.Mutex
}
func NewAdapter(port string) *Adapter {
func NewAdapter(configName, mode, port, accessToken, remoteURL string, sendIntervalMs int) *Adapter {
if mode == "" {
mode = "server"
}
return &Adapter{
configName: configName,
mode: mode,
port: port,
accessToken: accessToken,
remoteURL: remoteURL,
sendIntervalMs: sendIntervalMs,
pendingResponses: make(map[string]chan *OBv11APIResponse),
}
}
func (a *Adapter) PlatformName() string { return "qq" }
func (a *Adapter) PlatformName() string { return "qq" }
func (a *Adapter) ConfigName() string { return a.configName }
func (a *Adapter) SendIntervalMs() int { return a.sendIntervalMs }
func (a *Adapter) SelfID() string { return a.selfID }
func (a *Adapter) Capabilities() bridge.PlatformCapabilities {
return bridge.PlatformCapabilities{
MaxMessageLength: 200,
SupportsMarkdown: true, // QQ supports basic markdown
SupportsMarkdown: true,
SupportsImage: true,
SupportsVoice: false,
SupportsEmoji: true,
@@ -51,38 +73,101 @@ func (a *Adapter) Capabilities() bridge.PlatformCapabilities {
}
}
// checkAuth 验证 WebSocket 升级请求的 access_token。
// NapCat 通过两种方式传递: query 参数 ?access_token=xxx 或 Authorization: Bearer xxx 头.
func (a *Adapter) checkAuth(r *http.Request) bool {
if a.accessToken == "" {
return true // 未配置 token,允许所有连接
}
// 1) query 参数
if r.URL.Query().Get("access_token") == a.accessToken {
return true
}
// 2) Authorization: Bearer <token>
auth := r.Header.Get("Authorization")
if strings.HasPrefix(auth, "Bearer ") && strings.TrimPrefix(auth, "Bearer ") == a.accessToken {
return true
}
return false
}
// wsHandler 统一的 WebSocket 连接处理 — 单连接承载 API 调用 + 事件推送 (OneBot 正向 WS).
func (a *Adapter) wsHandler(w http.ResponseWriter, r *http.Request) {
if !a.checkAuth(r) {
http.Error(w, "Forbidden: invalid access_token", http.StatusForbidden)
return
}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Printf("[qq] upgrade error: %v\n", err)
return
}
a.connMu.Lock()
// 关闭旧连接 (NapCat 重连)
if a.conn != nil {
a.conn.Close()
}
a.conn = conn
a.connected = true
a.connMu.Unlock()
fmt.Println("[qq] NapCat/OneBot connected (正向WS)")
}
// legacyHandler 兼容旧的路径 /ws/qq 和 /ws/qq/event.
func (a *Adapter) legacyHandler(w http.ResponseWriter, r *http.Request) {
fmt.Printf("[qq] legacy WS path %s connected (consider changing NapCat URL to root /)\n", r.URL.Path)
a.wsHandler(w, r)
}
func (a *Adapter) Connect(ctx context.Context) error {
if a.mode == "client" {
return a.connectClient(ctx)
}
return a.connectServer()
}
func (a *Adapter) connectClient(ctx context.Context) error {
url := a.remoteURL
if a.accessToken != "" {
sep := "?"
if strings.Contains(url, "?") {
sep = "&"
}
url += sep + "access_token=" + a.accessToken
}
dialer := websocket.DefaultDialer
conn, _, err := dialer.DialContext(ctx, url, nil)
if err != nil {
return fmt.Errorf("dial NapCat WS %s: %w", url, err)
}
a.connMu.Lock()
if a.conn != nil {
a.conn.Close()
}
a.conn = conn
a.connected = true
a.connMu.Unlock()
fmt.Printf("[qq] connected to NapCat OneBot WS (client mode): %s\n", a.remoteURL)
return nil
}
func (a *Adapter) connectServer() error {
mux := http.NewServeMux()
mux.HandleFunc("/ws/qq", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Printf("[qq] upgrade error: %v\n", err)
return
}
a.connMu.Lock()
a.conn = conn
a.connected = true
a.connMu.Unlock()
fmt.Println("[qq] bot connected")
})
mux.HandleFunc("/ws/qq/event", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Printf("[qq] event upgrade error: %v\n", err)
return
}
a.connMu.Lock()
a.conn = conn
a.connected = true
a.connMu.Unlock()
fmt.Println("[qq] event WebSocket connected")
})
mux.HandleFunc("/", a.wsHandler) // NapCat 正向 WS 标准路径
mux.HandleFunc("/ws/qq", a.legacyHandler) // 向下兼容旧配置
mux.HandleFunc("/ws/qq/event", a.legacyHandler) // 向下兼容旧配置
addr := ":" + a.port
srv := &http.Server{Addr: addr, Handler: mux}
a.srv = &http.Server{Addr: addr, Handler: mux}
go func() {
fmt.Printf("[qq] listening on %s (waiting for bot WebSocket connection)\n", addr)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Printf("[qq] WebSocket server on %s (waiting for NapCat forward WS connection)\n", addr)
if a.accessToken != "" {
fmt.Println("[qq] access_token 已配置,将验证连接请求")
}
if err := a.srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
fmt.Printf("[qq] server error: %v\n", err)
}
}()
@@ -91,12 +176,22 @@ func (a *Adapter) Connect(ctx context.Context) error {
func (a *Adapter) Disconnect(ctx context.Context) error {
a.connMu.Lock()
defer a.connMu.Unlock()
if a.conn != nil {
a.conn.Close()
a.conn = nil
}
a.connected = false
srv := a.srv
a.srv = nil
a.connMu.Unlock()
if srv != nil {
shutdownCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
return fmt.Errorf("qq server shutdown: %w", err)
}
}
return nil
}
@@ -120,10 +215,8 @@ func (a *Adapter) ToUnified(rawMessage interface{}) (*bridge.UnifiedMessage, err
return nil, fmt.Errorf("expected *OBv11Message, got %T", rawMessage)
}
// Extract text content.
content := extractText(msg)
// Determine sender.
senderID := ""
senderName := "unknown"
channelType := "direct"
@@ -150,7 +243,6 @@ func (a *Adapter) ToUnified(rawMessage interface{}) (*bridge.UnifiedMessage, err
}
}
// Extract mentions.
var mentions []string
if segments, ok := msg.Message.([]interface{}); ok {
for _, s := range segments {
@@ -166,6 +258,8 @@ func (a *Adapter) ToUnified(rawMessage interface{}) (*bridge.UnifiedMessage, err
}
}
attachments := extractAttachments(msg)
return &bridge.UnifiedMessage{
SenderID: senderID,
SenderName: senderName,
@@ -176,6 +270,7 @@ func (a *Adapter) ToUnified(rawMessage interface{}) (*bridge.UnifiedMessage, err
ContentType: "text",
MessageID: fmt.Sprintf("%d", msg.MessageID),
Mentions: mentions,
Attachments: attachments,
RawData: rawMessage,
Timestamp: time.Unix(msg.Time, 0),
}, nil
@@ -189,7 +284,6 @@ func (a *Adapter) FromUnified(response *bridge.UnifiedResponse) ([]bridge.Platfo
if rm.FormatMode == "markdown" {
content = convertMarkdownToQQ(rm.Content)
}
// QQ prefers shorter messages — split if needed.
runes := []rune(content)
if len(runes) > 200 {
content = string(runes[:200])
@@ -240,6 +334,14 @@ func (a *Adapter) ReadMessages(ctx context.Context, msgCh chan<- *OBv11Message)
conn := a.conn
a.connMu.Unlock()
if conn == nil {
// Client mode: auto-reconnect when connection is lost.
if a.mode == "client" {
if err := a.connectClient(ctx); err != nil {
fmt.Printf("[qq] reconnect failed: %v, retrying in 3s...\n", err)
time.Sleep(3 * time.Second)
}
continue
}
time.Sleep(time.Second)
continue
}
@@ -248,6 +350,7 @@ func (a *Adapter) ReadMessages(ctx context.Context, msgCh chan<- *OBv11Message)
if err != nil {
fmt.Printf("[qq] read error: %v\n", err)
a.connMu.Lock()
a.conn.Close()
a.conn = nil
a.connected = false
a.connMu.Unlock()
@@ -258,7 +361,6 @@ func (a *Adapter) ReadMessages(ctx context.Context, msgCh chan<- *OBv11Message)
// Try to parse as OBv11 message (event from QQ).
var msg OBv11Message
if err := json.Unmarshal(raw, &msg); err != nil {
// Might be an API response.
var resp OBv11APIResponse
if err := json.Unmarshal(raw, &resp); err != nil {
fmt.Printf("[qq] unknown message: %s\n", string(raw))
@@ -275,7 +377,12 @@ func (a *Adapter) ReadMessages(ctx context.Context, msgCh chan<- *OBv11Message)
continue
}
// Only handle message events.
// Capture bot's own QQ number from incoming messages.
if msg.SelfID != 0 && a.selfID == "" {
a.selfID = fmt.Sprintf("%d", msg.SelfID)
fmt.Printf("[qq:%s] self ID captured: %s\n", a.configName, a.selfID)
}
if msg.PostType == "message" {
select {
case msgCh <- &msg:
@@ -312,13 +419,64 @@ func extractText(msg *OBv11Message) string {
return ""
}
var cqImageRegex = regexp.MustCompile(`\[CQ:image,[^\]]*\]`)
var cqURLRegex = regexp.MustCompile(`\burl=([^,\]]+)`)
// extractAttachments extracts image URLs from OBv11Message.
// Handles both string format (CQ codes in raw_message) and array format (parsed segments).
func extractAttachments(msg *OBv11Message) []bridge.Attachment {
var attachments []bridge.Attachment
// Array format: iterate segments looking for type="image".
if segments, ok := msg.Message.([]interface{}); ok {
for _, s := range segments {
if seg, ok := s.(map[string]interface{}); ok {
if seg["type"] != "image" {
continue
}
data, _ := seg["data"].(map[string]interface{})
if data == nil {
continue
}
url, _ := data["url"].(string)
file, _ := data["file"].(string)
if url == "" {
continue
}
attachments = append(attachments, bridge.Attachment{
Type: "image",
URL: url,
FileName: file,
})
}
}
return attachments
}
// String format: parse CQ codes from RawMessage or string Message.
raw := msg.RawMessage
if raw == "" {
if s, ok := msg.Message.(string); ok {
raw = s
}
}
matches := cqImageRegex.FindAllString(raw, -1)
for _, m := range matches {
urlMatch := cqURLRegex.FindStringSubmatch(m)
if len(urlMatch) >= 2 {
attachments = append(attachments, bridge.Attachment{
Type: "image",
URL: urlMatch[1],
})
}
}
return attachments
}
// convertMarkdownToQQ converts common markdown to QQ-supported format.
func convertMarkdownToQQ(md string) string {
// QQ supports basic markdown: **bold**, *italic*, ~~strikethrough~~
// Remove unsupported elements (headings, code blocks, links).
md = removeHeadings(md)
md = removeCodeBlocks(md)
// Preserve bold, italic, strikethrough which QQ supports.
return md
}
@@ -332,7 +490,6 @@ func removeHeadings(s string) string {
}
func removeCodeBlocks(s string) string {
// Simple: remove ``` markers.
result := ""
inCode := false
for _, line := range splitLines(s) {
@@ -376,7 +533,6 @@ func stripPrefix(s, prefix string) string {
}
func replaceLine(s, old, new string) string {
// Simple: find old line and replace with new.
idx := indexOf(s, old)
if idx < 0 {
return s
@@ -44,6 +44,23 @@ func (m *IdentityMapper) Resolve(platform, platformUID string) (*permissions.Pla
return id, nil
}
// ResolveOrNil finds the Cyrene user for a platform identity, returning nil for unknown users.
func (m *IdentityMapper) ResolveOrNil(platform, platformUID string) *permissions.PlatformIdentity {
m.mu.RLock()
defer m.mu.RUnlock()
plat, ok := m.byPlatform[platform]
if !ok {
return nil
}
return plat[platformUID]
}
// IsAdmin returns true if the given platform user is a registered admin.
func (m *IdentityMapper) IsAdmin(platform, platformUID string) bool {
id := m.ResolveOrNil(platform, platformUID)
return id != nil && id.PermissionLevel == "admin"
}
// List returns all identities for a platform.
func (m *IdentityMapper) List(platform string) []permissions.PlatformIdentity {
m.mu.RLock()
@@ -1,12 +1,22 @@
package bridge
import (
"context"
"fmt"
"sync"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/permissions"
)
// adapterKey returns the unique key for an adapter in the router map.
// Uses ConfigName() if the adapter implements it, otherwise PlatformName().
func adapterKey(a PlatformAdapter) string {
if named, ok := a.(interface{ ConfigName() string }); ok {
return named.ConfigName()
}
return a.PlatformName()
}
// PlatformRouter manages all platform adapters and routes messages.
type PlatformRouter struct {
mu sync.RWMutex
@@ -37,11 +47,37 @@ func NewPlatformRouter(mapper *IdentityMapper, checker *permissions.Checker) *Pl
}
}
// RegisterAdapter adds a platform adapter.
// RegisterAdapter adds a platform adapter, keyed by its config name.
func (r *PlatformRouter) RegisterAdapter(a PlatformAdapter) {
r.mu.Lock()
defer r.mu.Unlock()
r.adapters[a.PlatformName()] = a
r.adapters[adapterKey(a)] = a
}
// RemoveAdapter disconnects and removes a platform adapter.
func (r *PlatformRouter) RemoveAdapter(platform string) {
r.mu.Lock()
a, ok := r.adapters[platform]
if ok {
delete(r.adapters, platform)
}
r.mu.Unlock()
if ok {
a.Disconnect(context.Background())
}
}
// ReplaceAdapter disconnects the old adapter (if present), registers the new one,
// and connects it. Returns an error if the new adapter fails to connect.
func (r *PlatformRouter) ReplaceAdapter(a PlatformAdapter) error {
key := adapterKey(a)
r.mu.Lock()
if old, ok := r.adapters[key]; ok {
old.Disconnect(context.Background())
}
r.adapters[key] = a
r.mu.Unlock()
return a.Connect(context.Background())
}
// GetAdapter returns the adapter for a platform.
@@ -55,7 +91,7 @@ func (r *PlatformRouter) GetAdapter(platform string) (PlatformAdapter, error) {
return a, nil
}
// ListAdapters returns all registered adapter names.
// ListAdapters returns all registered adapter names (config names).
func (r *PlatformRouter) ListAdapters() []string {
r.mu.RLock()
defer r.mu.RUnlock()
@@ -66,14 +102,28 @@ func (r *PlatformRouter) ListAdapters() []string {
return names
}
// GetAdaptersByPlatform returns all registered adapters for a given platform type.
func (r *PlatformRouter) GetAdaptersByPlatform(platform string) []PlatformAdapter {
r.mu.RLock()
defer r.mu.RUnlock()
var result []PlatformAdapter
for _, a := range r.adapters {
if a.PlatformName() == platform {
result = append(result, a)
}
}
return result
}
// SetMessageHandler sets the callback for processing unified messages.
func (r *PlatformRouter) SetMessageHandler(h MessageHandler) {
r.handler = h
}
// RouteMessage converts a platform message to unified, checks permissions, and dispatches.
func (r *PlatformRouter) RouteMessage(platform string, rawMsg interface{}) (*UnifiedResponse, error) {
a, err := r.GetAdapter(platform)
// adapterKey is the config name (e.g., "qq", "qq-home") used to look up the adapter instance.
func (r *PlatformRouter) RouteMessage(adapterKey string, rawMsg interface{}) (*UnifiedResponse, error) {
a, err := r.GetAdapter(adapterKey)
if err != nil {
return nil, err
}
@@ -83,18 +133,22 @@ func (r *PlatformRouter) RouteMessage(platform string, rawMsg interface{}) (*Uni
return nil, fmt.Errorf("convert to unified: %w", err)
}
// Resolve identity.
identity, err := r.mapper.Resolve(platform, unified.SenderID)
if err != nil {
return nil, fmt.Errorf("identity not found: %w", err)
// Preserve original platform UID before identity mapping.
unified.OriginalSenderUID = unified.SenderID
unified.OriginalRawMessage = rawMsg
// Capture bot's own UID for @mention detection.
if selfAware, ok := a.(interface{ SelfID() string }); ok {
unified.BotUID = selfAware.SelfID()
}
// Merge identity info into the unified message.
unified.SenderID = identity.CyreneUser
unified.SenderName = identity.Nickname
// Apply permission-based filtering.
_ = identity // used by permission checks on tools
// Resolve identity (nil for unknown users; caller decides routing).
// Use platform type (e.g. "qq") for identity resolution, not adapter key.
identity := r.mapper.ResolveOrNil(a.PlatformName(), unified.SenderID)
if identity != nil {
unified.SenderID = identity.CyreneUser
unified.SenderName = identity.Nickname
}
// Update channel context.
r.updateContext(unified)
@@ -108,8 +162,9 @@ func (r *PlatformRouter) RouteMessage(platform string, rawMsg interface{}) (*Uni
return nil, err
}
response.Platform = platform
response.PlatformHints = r.platformHints(platform)
// Use adapter key for response routing so SendResponse finds the correct instance.
response.Platform = adapterKey
response.PlatformHints = r.platformHints(adapterKey)
return response, nil
}
@@ -21,6 +21,12 @@ type UnifiedMessage struct {
RawData interface{} `json:"raw_data,omitempty"`
Timestamp time.Time `json:"timestamp"`
// Routing metadata.
RouteType string `json:"route_type,omitempty"` // "normal", "silent", "admin_mention"
OriginalSenderUID string `json:"original_sender_uid,omitempty"` // preserved before identity mapping
OriginalRawMessage interface{} `json:"-"` // preserved for SendMessage wiring
BotUID string `json:"-"` // bot's own platform UID, set by router
}
// Attachment represents a file/image/voice attachment.
@@ -0,0 +1,150 @@
package config
import (
"encoding/json"
"fmt"
"os"
"sync"
)
// BlocklistMode is either "blacklist" or "whitelist".
type BlocklistSettings struct {
Mode string `json:"mode"` // "blacklist" (default) or "whitelist"
GroupIDs []string `json:"group_ids"` // group IDs to block/allow
UserIDs []string `json:"user_ids"` // private chat user IDs to block/allow
}
// BlocklistStore manages persistence of blocklist settings.
type BlocklistStore struct {
mu sync.RWMutex
path string
settings BlocklistSettings
}
// NewBlocklistStore loads or creates blocklist settings file.
func NewBlocklistStore(path string) (*BlocklistStore, error) {
s := &BlocklistStore{
path: path,
settings: BlocklistSettings{
Mode: "blacklist",
GroupIDs: []string{},
UserIDs: []string{},
},
}
if err := s.load(); err != nil {
return nil, err
}
return s, nil
}
func (s *BlocklistStore) load() error {
data, err := os.ReadFile(s.path)
if err != nil {
if os.IsNotExist(err) {
return s.save() // write defaults
}
return fmt.Errorf("read blocklist file: %w", err)
}
if len(data) == 0 {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
if err := json.Unmarshal(data, &s.settings); err != nil {
return fmt.Errorf("parse blocklist file: %w", err)
}
if s.settings.Mode == "" {
s.settings.Mode = "blacklist"
}
if s.settings.GroupIDs == nil {
s.settings.GroupIDs = []string{}
}
if s.settings.UserIDs == nil {
s.settings.UserIDs = []string{}
}
return nil
}
func (s *BlocklistStore) save() error {
s.mu.RLock()
data, err := json.MarshalIndent(s.settings, "", " ")
s.mu.RUnlock()
if err != nil {
return fmt.Errorf("marshal blocklist: %w", err)
}
tmpPath := s.path + ".tmp"
if err := os.WriteFile(tmpPath, data, 0640); err != nil {
return fmt.Errorf("write blocklist: %w", err)
}
return os.Rename(tmpPath, s.path)
}
// Get returns current blocklist settings.
func (s *BlocklistStore) Get() BlocklistSettings {
s.mu.RLock()
defer s.mu.RUnlock()
cp := BlocklistSettings{
Mode: s.settings.Mode,
GroupIDs: make([]string, len(s.settings.GroupIDs)),
UserIDs: make([]string, len(s.settings.UserIDs)),
}
copy(cp.GroupIDs, s.settings.GroupIDs)
copy(cp.UserIDs, s.settings.UserIDs)
return cp
}
// Set updates and persists blocklist settings.
func (s *BlocklistStore) Set(bs BlocklistSettings) error {
if bs.Mode != "blacklist" && bs.Mode != "whitelist" {
return fmt.Errorf("invalid mode: %s (must be blacklist or whitelist)", bs.Mode)
}
if bs.GroupIDs == nil {
bs.GroupIDs = []string{}
}
if bs.UserIDs == nil {
bs.UserIDs = []string{}
}
s.mu.Lock()
s.settings = bs
s.mu.Unlock()
return s.save()
}
// IsBlocked checks whether a message should be blocked based on channel type and ID.
// In blacklist mode: returns true if the id is IN the list.
// In whitelist mode: returns true if the id is NOT in the list.
// Admin users should call this with isAdmin=true to always bypass.
func (s *BlocklistStore) IsBlocked(channelType, channelID, senderID string, isAdmin bool) bool {
if isAdmin {
return false
}
s.mu.RLock()
defer s.mu.RUnlock()
switch s.settings.Mode {
case "whitelist":
// Block if NOT in the whitelist.
if channelType == "group" {
return !contains(s.settings.GroupIDs, channelID)
}
return !contains(s.settings.UserIDs, senderID)
case "blacklist":
fallthrough
default:
// Block if IN the blacklist.
if channelType == "group" {
return contains(s.settings.GroupIDs, channelID)
}
return contains(s.settings.UserIDs, senderID)
}
}
func contains(list []string, val string) bool {
for _, v := range list {
if v == val {
return true
}
}
return false
}
@@ -1,6 +1,9 @@
package config
import "os"
import (
"os"
"strings"
)
// Config holds Platform Bridge configuration.
type Config struct {
@@ -11,9 +14,17 @@ type Config struct {
InternalToken string
// Platform-specific.
QQBotPort string // port for QQ OBv11 reverse WebSocket
TelegramToken string // Telegram Bot API token
TelegramWebhookURL string // public webhook URL for Telegram
QQBotPort string // port for QQ OBv11 reverse WebSocket
TelegramToken string // Telegram Bot API token
TelegramWebhookURL string // public webhook URL for Telegram
// Silent observation mode.
PlatformSilentEnabled bool // PLATFORM_SILENT_ENABLED, default true
AdminNicknames []string // ADMIN_NICKNAMES, default ["开拓者"]
AdminMentionKeywords []string // ADMIN_MENTION_KEYWORDS, default ["昔涟","Cyrene","管理员"]
// Message sending.
MessageSendIntervalMs int // MSG_SEND_INTERVAL_MS, minimum interval between platform messages (default 2000)
}
func Load() *Config {
@@ -48,5 +59,54 @@ func Load() *Config {
if v := os.Getenv("TELEGRAM_WEBHOOK_URL"); v != "" {
cfg.TelegramWebhookURL = v
}
// Silent observation defaults.
cfg.PlatformSilentEnabled = getEnvBool("PLATFORM_SILENT_ENABLED", true)
cfg.AdminNicknames = getEnvList("ADMIN_NICKNAMES", []string{"开拓者"})
cfg.AdminMentionKeywords = getEnvList("ADMIN_MENTION_KEYWORDS", []string{"昔涟", "Cyrene", "管理员"})
cfg.MessageSendIntervalMs = getEnvInt("MSG_SEND_INTERVAL_MS", 2000)
return cfg
}
func getEnvBool(key string, defaultVal bool) bool {
v := os.Getenv(key)
if v == "" {
return defaultVal
}
return v == "true" || v == "1" || v == "yes"
}
func getEnvInt(key string, defaultVal int) int {
v := os.Getenv(key)
if v == "" {
return defaultVal
}
n := 0
for _, c := range v {
if c >= '0' && c <= '9' {
n = n*10 + int(c-'0')
} else {
return defaultVal
}
}
return n
}
func getEnvList(key string, defaultVal []string) []string {
v := os.Getenv(key)
if v == "" {
return defaultVal
}
parts := strings.Split(v, ",")
result := make([]string, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" {
result = append(result, p)
}
}
if len(result) == 0 {
return defaultVal
}
return result
}
@@ -11,6 +11,7 @@ import (
// PlatformConfig holds persistent configuration for one platform adapter.
type PlatformConfig struct {
Name string `json:"name"`
Platform string `json:"platform"` // base platform type: "qq", "telegram", etc.
Enabled bool `json:"enabled"`
Label string `json:"label"`
Fields map[string]string `json:"fields"`
@@ -51,6 +52,12 @@ func (s *Store) load() error {
if err := json.Unmarshal(data, &s.configs); err != nil {
return fmt.Errorf("parse config file: %w", err)
}
// Backward compat: old configs without platform field default to Name.
for _, c := range s.configs {
if c.Platform == "" {
c.Platform = c.Name
}
}
return nil
}
@@ -0,0 +1,44 @@
package handler
import (
"encoding/json"
"net/http"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/config"
)
// BlocklistHandler exposes CRUD for blocklist settings.
type BlocklistHandler struct {
store *config.BlocklistStore
}
func NewBlocklistHandler(store *config.BlocklistStore) *BlocklistHandler {
return &BlocklistHandler{store: store}
}
func (h *BlocklistHandler) RegisterRoutes(mux *http.ServeMux) {
mux.HandleFunc("/api/v1/settings/blocklist", h.handleBlocklist)
}
func (h *BlocklistHandler) handleBlocklist(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
writeJSON(w, http.StatusOK, h.store.Get())
case "POST", "PUT":
var bs config.BlocklistSettings
if err := json.NewDecoder(r.Body).Decode(&bs); err != nil {
writeJSON(w, http.StatusBadRequest, errResp("invalid JSON: "+err.Error()))
return
}
if err := h.store.Set(bs); err != nil {
writeJSON(w, http.StatusBadRequest, errResp(err.Error()))
return
}
writeJSON(w, http.StatusOK, map[string]interface{}{
"status": "saved",
"settings": h.store.Get(),
})
default:
writeJSON(w, http.StatusMethodNotAllowed, errResp("method not allowed"))
}
}
@@ -8,21 +8,27 @@ import (
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/config"
)
var knownPlatforms = map[string]bool{
var validPlatformTypes = map[string]bool{
"qq": true, "telegram": true, "webhook": true,
"wechat": true, "feishu": true, "discord": true,
}
// ConfigHandler exposes CRUD endpoints for platform configs.
type ConfigHandler struct {
store *config.Store
router *bridge.PlatformRouter
store *config.Store
router *bridge.PlatformRouter
onChanged func(name, platform string, enabled bool, fields map[string]string)
}
func NewConfigHandler(store *config.Store, router *bridge.PlatformRouter) *ConfigHandler {
return &ConfigHandler{store: store, router: router}
}
// SetOnConfigChanged sets a callback invoked after config is saved or deleted.
func (h *ConfigHandler) SetOnConfigChanged(fn func(name, platform string, enabled bool, fields map[string]string)) {
h.onChanged = fn
}
func (h *ConfigHandler) RegisterRoutes(mux *http.ServeMux) {
mux.HandleFunc("/api/v1/configs", h.listConfigs)
mux.HandleFunc("/api/v1/configs/", h.handleConfig)
@@ -33,6 +39,7 @@ func (h *ConfigHandler) listConfigs(w http.ResponseWriter, r *http.Request) {
type configSummary struct {
Name string `json:"name"`
Platform string `json:"platform"`
Enabled bool `json:"enabled"`
Label string `json:"label,omitempty"`
Fields map[string]string `json:"fields"`
@@ -46,8 +53,13 @@ func (h *ConfigHandler) listConfigs(w http.ResponseWriter, r *http.Request) {
if a, err := h.router.GetAdapter(c.Name); err == nil {
connected = a.IsConnected()
}
platform := c.Platform
if platform == "" {
platform = c.Name
}
result = append(result, configSummary{
Name: c.Name,
Platform: platform,
Enabled: c.Enabled,
Label: c.Label,
Fields: c.Fields,
@@ -71,6 +83,7 @@ func (h *ConfigHandler) listConfigs(w http.ResponseWriter, r *http.Request) {
}
result = append(result, configSummary{
Name: name,
Platform: name,
Enabled: false,
Fields: map[string]string{},
Connected: connected,
@@ -92,10 +105,6 @@ func (h *ConfigHandler) handleConfig(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusBadRequest, errResp("missing config name"))
return
}
if !knownPlatforms[name] {
writeJSON(w, http.StatusBadRequest, errResp("unknown platform: "+name))
return
}
switch r.Method {
case "GET":
@@ -120,26 +129,37 @@ func (h *ConfigHandler) getConfig(w http.ResponseWriter, r *http.Request, name s
connected = a.IsConnected()
}
writeJSON(w, http.StatusOK, map[string]interface{}{
"name": cfg.Name,
"enabled": cfg.Enabled,
"label": cfg.Label,
"fields": cfg.Fields,
"updated_at": cfg.UpdatedAt.Format("2006-01-02T15:04:05Z07:00"),
"connected": connected,
"name": cfg.Name,
"platform": cfg.Platform,
"enabled": cfg.Enabled,
"label": cfg.Label,
"fields": cfg.Fields,
"updated_at": cfg.UpdatedAt.Format("2006-01-02T15:04:05Z07:00"),
"connected": connected,
})
}
func (h *ConfigHandler) saveConfig(w http.ResponseWriter, r *http.Request, name string) {
var body struct {
Enabled *bool `json:"enabled"`
Label string `json:"label"`
Fields map[string]string `json:"fields"`
Platform *string `json:"platform"`
Enabled *bool `json:"enabled"`
Label string `json:"label"`
Fields map[string]string `json:"fields"`
}
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
writeJSON(w, http.StatusBadRequest, errResp("invalid JSON: "+err.Error()))
return
}
platform := name
if body.Platform != nil && *body.Platform != "" {
platform = *body.Platform
}
if !validPlatformTypes[platform] {
writeJSON(w, http.StatusBadRequest, errResp("unknown or missing platform type: "+platform))
return
}
enabled := true
if body.Enabled != nil {
enabled = *body.Enabled
@@ -151,29 +171,48 @@ func (h *ConfigHandler) saveConfig(w http.ResponseWriter, r *http.Request, name
}
cfg := config.PlatformConfig{
Name: name,
Enabled: enabled,
Label: body.Label,
Fields: fields,
Name: name,
Platform: platform,
Enabled: enabled,
Label: body.Label,
Fields: fields,
}
if err := h.store.Set(cfg); err != nil {
writeJSON(w, http.StatusInternalServerError, errResp(err.Error()))
return
}
// Trigger hot-reload.
if h.onChanged != nil {
h.onChanged(name, platform, enabled, fields)
}
writeJSON(w, http.StatusOK, map[string]interface{}{
"name": name,
"enabled": enabled,
"label": body.Label,
"fields": fields,
"status": "saved",
"name": name,
"platform": platform,
"enabled": enabled,
"label": body.Label,
"fields": fields,
"status": "saved",
})
}
func (h *ConfigHandler) deleteConfig(w http.ResponseWriter, r *http.Request, name string) {
// Get platform type before deleting (needed for onChanged callback).
platform := name
if cfg, err := h.store.Get(name); err == nil && cfg.Platform != "" {
platform = cfg.Platform
}
if err := h.store.Delete(name); err != nil {
writeJSON(w, http.StatusNotFound, errResp(err.Error()))
return
}
// Trigger hot-reload: disable and clear fields.
if h.onChanged != nil {
h.onChanged(name, platform, false, nil)
}
writeJSON(w, http.StatusOK, map[string]string{"status": "deleted", "name": name})
}
@@ -4,16 +4,18 @@ import (
"net/http"
"strconv"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/config"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/logging"
)
// LogHandler exposes message log retrieval endpoints.
type LogHandler struct {
logger *logging.Logger
store *config.Store
}
func NewLogHandler(logger *logging.Logger) *LogHandler {
return &LogHandler{logger: logger}
func NewLogHandler(logger *logging.Logger, store *config.Store) *LogHandler {
return &LogHandler{logger: logger, store: store}
}
func (h *LogHandler) RegisterRoutes(mux *http.ServeMux) {
@@ -27,6 +29,14 @@ func (h *LogHandler) handleLogs(w http.ResponseWriter, r *http.Request) {
return
}
// Resolve platform type from config name (e.g. "qq-home" → "qq").
platform := name
if h.store != nil {
if cfg, err := h.store.Get(name); err == nil && cfg.Platform != "" {
platform = cfg.Platform
}
}
limit := 100
if l := r.URL.Query().Get("limit"); l != "" {
if n, err := strconv.Atoi(l); err == nil && n > 0 && n <= 1000 {
@@ -34,7 +44,7 @@ func (h *LogHandler) handleLogs(w http.ResponseWriter, r *http.Request) {
}
}
entries, err := h.logger.ReadLogs(name, limit)
entries, err := h.logger.ReadLogs(platform, limit)
if err != nil {
writeJSON(w, http.StatusInternalServerError, errResp(err.Error()))
return
@@ -43,7 +53,7 @@ func (h *LogHandler) handleLogs(w http.ResponseWriter, r *http.Request) {
entries = []logging.LogEntry{}
}
writeJSON(w, http.StatusOK, map[string]interface{}{
"platform": name,
"platform": platform,
"total": len(entries),
"logs": entries,
})
@@ -0,0 +1,86 @@
package handler
import (
"encoding/json"
"net/http"
"sync"
"github.com/gorilla/websocket"
"git.yeij.top/AskaEth/Cyrene/platform-bridge/internal/logging"
)
var wsUpgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
// LogWSHub broadcasts log entries to connected WebSocket clients.
type LogWSHub struct {
mu sync.Mutex
clients map[*websocket.Conn]chan logging.LogEntry
}
// NewLogWSHub creates a LogWSHub and subscribes to the logger.
func NewLogWSHub(logger *logging.Logger) *LogWSHub {
h := &LogWSHub{
clients: make(map[*websocket.Conn]chan logging.LogEntry),
}
logger.OnLog(func(entry logging.LogEntry) {
h.broadcast(entry)
})
return h
}
func (h *LogWSHub) broadcast(entry logging.LogEntry) {
h.mu.Lock()
defer h.mu.Unlock()
for _, ch := range h.clients {
select {
case ch <- entry:
default:
}
}
}
// ServeWS handles WebSocket upgrade and streams log entries to the client.
func (h *LogWSHub) ServeWS(w http.ResponseWriter, r *http.Request) {
conn, err := wsUpgrader.Upgrade(w, r, nil)
if err != nil {
return
}
ch := make(chan logging.LogEntry, 64)
h.mu.Lock()
h.clients[conn] = ch
h.mu.Unlock()
// Write goroutine: drains ch until it is closed.
done := make(chan struct{})
go func() {
defer close(done)
for entry := range ch {
data, _ := json.Marshal(entry)
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
return
}
}
}()
// Read goroutine: detect client disconnect.
// (websocket requires a reader to detect close frames.)
go func() {
for {
if _, _, err := conn.ReadMessage(); err != nil {
break
}
}
// Client disconnected — stop broadcasting, close channel.
h.mu.Lock()
delete(h.clients, conn)
h.mu.Unlock()
close(ch)
}()
<-done
conn.Close()
}
@@ -25,11 +25,23 @@ type LogEntry struct {
Error string `json:"error,omitempty"`
}
// LogListener receives log entries as they are written.
type LogListener func(LogEntry)
// Logger writes message logs to per-platform JSONL files.
type Logger struct {
mu sync.Mutex
dir string
files map[string]*os.File
mu sync.Mutex
dir string
files map[string]*os.File
listeners []LogListener
}
// OnLog registers a listener that is called for every log entry written.
// The listener is called synchronously; avoid heavy work in the callback.
func (l *Logger) OnLog(fn LogListener) {
l.mu.Lock()
defer l.mu.Unlock()
l.listeners = append(l.listeners, fn)
}
// NewLogger creates a Logger, ensuring the log directory exists.
@@ -60,12 +72,23 @@ func (l *Logger) Log(entry LogEntry) error {
}
l.mu.Lock()
defer l.mu.Unlock()
if _, err := f.Write(append(data, '\n')); err != nil {
l.mu.Unlock()
return fmt.Errorf("write log: %w", err)
}
return f.Sync()
if err := f.Sync(); err != nil {
l.mu.Unlock()
return err
}
listeners := make([]LogListener, len(l.listeners))
copy(listeners, l.listeners)
l.mu.Unlock()
// Notify listeners outside the lock.
for _, fn := range listeners {
fn(entry)
}
return nil
}
// ReadLogs reads the last N log entries for a platform, newest first.