feat: 消息并行处理 + QQ卡片完整解析 + 视觉OCR融合格式修复

- platform-bridge: 8-worker per-session 并行分发,同会话保序跨会话并行
- platform-bridge: 静默消息 fire-and-forget,不阻塞同用户后续消息
- QQ卡片: html.UnescapeString 解码 NapCat HTML实体,正确解析卡片JSON
- QQ卡片: 输出含应用名/简介/来源/封面URL,封面注入图片管线走视觉
- ai-core: 视觉+OCR结果融合为单句,单图不编号,避免LLM误解为多张图

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-06 09:08:03 +08:00
parent 67b204b23c
commit 4954c1e58b
3 changed files with 354 additions and 105 deletions
@@ -804,7 +804,7 @@ func (o *Orchestrator) PreprocessImages(ctx context.Context, message string, ima
var combined string
switch {
case visionDesc != "" && ocrDesc != "":
combined = fmt.Sprintf("视觉描述%s\n(图中文字:%s", visionDesc, ocrDesc)
combined = fmt.Sprintf("这张图片的内容%s(图中包含的文字:%s", visionDesc, ocrDesc)
case visionDesc != "":
combined = visionDesc
case ocrDesc != "":
@@ -826,7 +826,11 @@ func (o *Orchestrator) PreprocessImages(ctx context.Context, message string, ima
augmented := message
for i, desc := range descriptions {
augmented += fmt.Sprintf("\n\n[图片%d分析结果]: %s", i+1, desc)
label := "图片分析结果"
if len(descriptions) > 1 {
label = fmt.Sprintf("图片%d分析结果", i+1)
}
augmented += fmt.Sprintf("\n\n[%s]: %s", label, desc)
}
return augmented
}
+126 -73
View File
@@ -5,10 +5,12 @@ import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"io"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
@@ -171,16 +173,38 @@ func main() {
}
groupSessionID := fmt.Sprintf("platform_%s_%s", msg.Platform, msg.ChannelID)
// Helper: fire-and-forget a silent observation. Silent paths don't
// produce visible responses, so we can background them to avoid
// blocking the worker for the next message from the same user.
fireSilent := func(namespace string, imgs, vids, voices []string) {
go func() {
_, err := forwardToAICore(cfg, msg, "platform_silent", namespace, namespace, imgs, vids, voices, isAdmin)
if err != nil {
msgLogger.Log(logging.LogEntry{
Timestamp: time.Now(),
Direction: "outgoing",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.OriginalSenderUID,
Success: false,
Error: err.Error(),
})
}
}()
}
switch {
case isMessageHistorical(msg, router):
msg.RouteType = "silent"
namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
response, routeErr = forwardToAICore(cfg, msg, "platform_silent", namespace, namespace, nil, videoURLs, voiceURLs, isAdmin)
fireSilent(namespace, nil, videoURLs, voiceURLs)
response = &bridge.UnifiedResponse{Messages: []bridge.ResponseMessage{{DisplayType: "silent"}}, Platform: msg.Platform}
case isAdmin && !isBotMentioned && shouldAdminBeSilent(msg, router):
msg.RouteType = "silent"
namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
response, routeErr = forwardToAICore(cfg, msg, "platform_silent", namespace, namespace, imageURLs, videoURLs, voiceURLs, isAdmin)
fireSilent(namespace, imageURLs, videoURLs, voiceURLs)
response = &bridge.UnifiedResponse{Messages: []bridge.ResponseMessage{{DisplayType: "silent"}}, Platform: msg.Platform}
case isAdmin:
msg.RouteType = "normal"
@@ -191,30 +215,16 @@ func main() {
response, routeErr = forwardToAICore(cfg, msg, "text", chatUserID, groupSessionID, imageURLs, videoURLs, voiceURLs, isAdmin)
case isMentioned:
// Non-admin user mentioned an admin. Don't respond in channel —
// the admin already gets QQ's native @notification. Observe silently.
msg.RouteType = "silent"
namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
response, routeErr = forwardToAICore(cfg, msg, "platform_silent", namespace, namespace, imageURLs, videoURLs, voiceURLs, isAdmin)
fireSilent(namespace, imageURLs, videoURLs, voiceURLs)
response = &bridge.UnifiedResponse{Messages: []bridge.ResponseMessage{{DisplayType: "silent"}}, Platform: msg.Platform}
case isSilent:
msg.RouteType = "silent"
namespace := buildMemoryNamespace(msg.Platform, msg.ChannelType, msg.ChannelID)
silentResponse, silentErr := forwardToAICore(cfg, msg, "platform_silent", namespace, namespace, imageURLs, videoURLs, voiceURLs, isAdmin)
if silentErr != nil {
msgLogger.Log(logging.LogEntry{
Timestamp: time.Now(),
Direction: "outgoing",
Platform: msg.Platform,
ChannelID: msg.ChannelID,
SenderID: msg.OriginalSenderUID,
Success: false,
Error: silentErr.Error(),
})
return nil, silentErr
}
response = silentResponse
routeErr = nil
fireSilent(namespace, imageURLs, videoURLs, voiceURLs)
response = &bridge.UnifiedResponse{Messages: []bridge.ResponseMessage{{DisplayType: "silent"}}, Platform: msg.Platform}
default:
msg.RouteType = "normal"
@@ -367,62 +377,86 @@ func startQQReaders(router *bridge.PlatformRouter) {
qqReaderCancelsMu.Unlock()
adapter := qqAdapter // capture for goroutine
qqMsgCh := make(chan *qqadapter.OBv11Message, 100)
go adapter.ReadMessages(ctx, qqMsgCh)
const numWorkers = 8
// Single reader pumps raw messages from the adapter.
rawCh := make(chan *qqadapter.OBv11Message, 100)
go adapter.ReadMessages(ctx, rawCh)
// Per-worker channels.
var workerChs [numWorkers]chan *qqadapter.OBv11Message
for i := 0; i < numWorkers; i++ {
workerChs[i] = make(chan *qqadapter.OBv11Message, 50)
}
// Dispatcher: route to worker by session hash so same conversation stays ordered.
go func() {
for msg := range qqMsgCh {
response, err := router.RouteMessage(adapterKey, msg)
if err != nil {
fmt.Printf("[qq:%s] route error: %v\n", adapterKey, err)
continue
}
if response != nil && len(response.Messages) > 0 && !hasOnlySilentMessages(response.Messages) {
messageType := msg.MessageType
userID := msg.UserID
groupID := msg.GroupID
// Filter non-empty messages.
var toSend []bridge.ResponseMessage
for _, rm := range response.Messages {
if rm.Content != "" {
rm.Content = qqadapter.ConvertMarkdownToQQ(rm.Content)
toSend = append(toSend, rm)
}
}
interval := time.Duration(adapter.SendIntervalMs()) * time.Millisecond
if interval <= 0 {
interval = 2 * time.Second
}
for i, rm := range toSend {
if i > 0 && interval > 0 {
select {
case <-ctx.Done():
return
case <-time.After(interval):
}
}
// Re-acquire current adapter for hot-reload safety.
cur, err := router.GetAdapter(adapterKey)
if err != nil {
continue
}
curQQ, ok := cur.(*qqadapter.Adapter)
if !ok {
continue
}
var sendErr error
switch messageType {
case "private":
sendErr = curQQ.SendMessage("private", userID, 0, rm.Content)
case "group":
sendErr = curQQ.SendMessage("group", 0, groupID, rm.Content)
}
if sendErr != nil {
fmt.Printf("[qq:%s] send msg error: %v\n", adapterKey, sendErr)
}
}
}
for msg := range rawCh {
idx := hashQQSession(msg) % numWorkers
workerChs[idx] <- msg
}
for i := 0; i < numWorkers; i++ {
close(workerChs[i])
}
}()
// Worker pool: each worker processes messages sequentially; workers run in parallel.
for _, wch := range workerChs {
wch := wch // capture
go func() {
for msg := range wch {
response, err := router.RouteMessage(adapterKey, msg)
if err != nil {
fmt.Printf("[qq:%s] route error: %v\n", adapterKey, err)
continue
}
if response != nil && len(response.Messages) > 0 && !hasOnlySilentMessages(response.Messages) {
messageType := msg.MessageType
userID := msg.UserID
groupID := msg.GroupID
// Filter non-empty messages.
var toSend []bridge.ResponseMessage
for _, rm := range response.Messages {
if rm.Content != "" {
rm.Content = qqadapter.ConvertMarkdownToQQ(rm.Content)
toSend = append(toSend, rm)
}
}
interval := time.Duration(adapter.SendIntervalMs()) * time.Millisecond
if interval <= 0 {
interval = 2 * time.Second
}
for i, rm := range toSend {
if i > 0 && interval > 0 {
select {
case <-ctx.Done():
return
case <-time.After(interval):
}
}
cur, err := router.GetAdapter(adapterKey)
if err != nil {
continue
}
curQQ, ok := cur.(*qqadapter.Adapter)
if !ok {
continue
}
var sendErr error
switch messageType {
case "private":
sendErr = curQQ.SendMessage("private", userID, 0, rm.Content)
case "group":
sendErr = curQQ.SendMessage("group", 0, groupID, rm.Content)
}
if sendErr != nil {
fmt.Printf("[qq:%s] send msg error: %v\n", adapterKey, sendErr)
}
}
}
}
}()
}
}
}
@@ -892,6 +926,25 @@ func hasOnlySilentMessages(messages []bridge.ResponseMessage) bool {
return true
}
// hashQQSession returns a hash for dispatching a QQ message to a worker.
// Messages from the same conversation (private or group) get the same hash,
// preserving ordering within a session while allowing cross-session parallelism.
func hashQQSession(msg *qqadapter.OBv11Message) uint32 {
h := fnv.New32a()
h.Write([]byte(msg.MessageType))
h.Write([]byte(":"))
if msg.MessageType == "group" {
// Hash by group+user: same user's messages in a group stay ordered,
// but different users in the same group can be processed in parallel.
h.Write([]byte(strconv.FormatInt(msg.GroupID, 10)))
h.Write([]byte(":"))
h.Write([]byte(strconv.FormatInt(msg.UserID, 10)))
} else {
h.Write([]byte(strconv.FormatInt(msg.UserID, 10)))
}
return h.Sum32()
}
func parseIntOr(s string, defaultVal int) int {
n := 0
for _, c := range s {
@@ -4,8 +4,9 @@ import (
"context"
"encoding/json"
"fmt"
"html"
"log"
"net/http"
"net/url"
"regexp"
"strings"
"sync"
@@ -373,6 +374,7 @@ func (a *Adapter) ToUnified(rawMessage interface{}) (*bridge.UnifiedMessage, err
}
attachments := extractAttachments(msg)
attachments = append(attachments, extractCardImageURLs(msg)...)
return &bridge.UnifiedMessage{
SenderID: senderID,
@@ -507,8 +509,7 @@ func (a *Adapter) ReadMessages(ctx context.Context, msgCh chan<- *OBv11Message)
}
}
// parseJSONCardTitle extracts a human-readable title from a QQ JSON card.
// The data is the raw JSON string from the "data" field of a json-type message segment.
// parseJSONCardTitle extracts human-readable content from a QQ JSON card.
func parseJSONCardTitle(data string) string {
var card struct {
App string `json:"app"`
@@ -519,41 +520,155 @@ func parseJSONCardTitle(data string) string {
Detail1 struct {
Title string `json:"title"`
Desc string `json:"desc"`
Tag string `json:"tag"`
} `json:"detail_1"`
News struct {
Title string `json:"title"`
Desc string `json:"desc"`
Title string `json:"title"`
Desc string `json:"desc"`
Tag string `json:"tag"`
Preview string `json:"preview"`
} `json:"news"`
Music struct {
Title string `json:"title"`
Desc string `json:"desc"`
Title string `json:"title"`
Desc string `json:"desc"`
Tag string `json:"tag"`
Preview string `json:"preview"`
} `json:"music"`
} `json:"meta"`
}
if err := json.Unmarshal([]byte(data), &card); err != nil {
return parseJSONCardFallback(data)
}
var parts []string
push := func(s string) { if s != "" { parts = append(parts, s) } }
// Build header: [音乐分享] or [卡片]
label := cardAppLabel(card.App)
if card.Prompt != "" {
// Strip redundant QQ-specific prefix from prompt (e.g. "[QQ小程序]" when we already say "[小程序]").
prompt := card.Prompt
for _, pfx := range qqPromptPrefixes {
if strings.HasPrefix(prompt, pfx) {
prompt = strings.TrimPrefix(prompt, pfx)
break
}
}
push(label + " " + prompt)
} else if t := firstNonEmpty(card.Meta.Detail1.Title, card.Meta.News.Title, card.Meta.Music.Title, card.Title); t != "" {
push(label + " " + t)
}
// Append description and source tag if available.
desc := firstNonEmpty(card.Meta.Detail1.Desc, card.Meta.Music.Desc, card.Meta.News.Desc, card.Desc)
if desc != "" && desc != card.Prompt && desc != card.Title &&
(len(parts) == 0 || !strings.Contains(parts[0], desc)) {
push("简介:" + desc)
}
tag := firstNonEmpty(card.Meta.Music.Tag, card.Meta.News.Tag, card.Meta.Detail1.Tag)
if tag != "" && (len(parts) == 0 || !strings.Contains(parts[len(parts)-1], tag)) {
push("来源:" + tag)
}
preview := firstNonEmpty(card.Meta.Music.Preview, card.Meta.News.Preview)
if preview != "" {
push("封面:" + preview)
}
if len(parts) == 0 {
return parseJSONCardFallback(data)
}
return strings.Join(parts, "\n")
}
func firstNonEmpty(ss ...string) string {
for _, s := range ss {
if s != "" { return s }
}
return ""
}
// cardAppLabel maps QQ card app IDs to human-readable Chinese labels.
func cardAppLabel(app string) string {
if label, ok := cardAppMap[app]; ok {
return "[" + label + "]"
}
return "[卡片]"
}
var cardAppMap = map[string]string{
"com.tencent.music.lua": "音乐分享",
"com.tencent.structmsg": "结构化消息",
"com.tencent.qun.pro": "群Pro卡片",
"com.tencent.miniapp_01": "小程序",
"com.tencent.contact.lua": "联系人分享",
"com.tencent.groupphoto": "群相册",
"com.tencent.qqfiles": "文件分享",
"com.tencent.announcement": "群公告",
"com.tencent.mobileqq.ark": "ARK消息",
"com.tencent.tuwen.lua": "图文消息",
}
// QQ card prompts often include a self-describing prefix that overlaps with our label.
var qqPromptPrefixes = []string{
"[QQ小程序] ", "[QQ小程序]",
"[QQ音乐] ", "[QQ音乐]",
}
// parseJSONCardFallback tries a generic approach for unknown card structures.
func parseJSONCardFallback(data string) string {
var raw map[string]interface{}
if err := json.Unmarshal([]byte(data), &raw); err != nil {
return "[卡片消息]"
}
// Prefer prompt (e.g. "[分享]标题"), then meta titles, then top-level title.
if card.Prompt != "" {
return "[卡片] " + card.Prompt
var parts []string
push := func(s string) { if s != "" { parts = append(parts, s) } }
label := "[卡片]"
if app, ok := raw["app"].(string); ok {
label = cardAppLabel(app)
}
if card.Meta.Detail1.Title != "" {
return "[卡片] " + card.Meta.Detail1.Title
if s, _ := raw["prompt"].(string); s != "" {
push(label + " " + s)
} else if s, _ := raw["title"].(string); s != "" {
push(label + " " + s)
}
if card.Meta.News.Title != "" {
return "[卡片] " + card.Meta.News.Title
// Search meta for title and desc.
if meta, ok := raw["meta"].(map[string]interface{}); ok {
for _, v := range meta {
if sub, ok := v.(map[string]interface{}); ok {
if s, _ := sub["title"].(string); s != "" && len(parts) == 0 {
push(label + " " + s)
}
}
}
for _, v := range meta {
if sub, ok := v.(map[string]interface{}); ok {
if s, _ := sub["desc"].(string); s != "" {
push("简介:" + s)
break
}
}
}
for _, v := range meta {
if sub, ok := v.(map[string]interface{}); ok {
if s, _ := sub["tag"].(string); s != "" {
push("来源:" + s)
break
}
}
}
}
if card.Meta.Music.Title != "" {
return "[卡片] " + card.Meta.Music.Title
if s, _ := raw["desc"].(string); s != "" && len(parts) < 2 {
push("简介:" + s)
}
if card.Title != "" {
return "[卡片] " + card.Title
if len(parts) == 0 {
log.Printf("[qq] 卡片解析失败,原始JSON: %s", data)
return "[卡片消息]"
}
if card.Desc != "" {
return "[卡片] " + card.Desc
}
return "[卡片消息]"
return strings.Join(parts, "\n")
}
// cqSimplifyMap maps CQ code types to simplified Chinese labels.
@@ -579,11 +694,10 @@ func simplifyCQCodes(s string) string {
}
}
if typ == "json" {
// Parse data= field from [CQ:json,data=URL_ENCODED_JSON]
// Parse data= field from [CQ:json,data=HTML_ENCODED_JSON]
if dataVal := extractCQParam(match, "data"); dataVal != "" {
if decoded, err := url.QueryUnescape(dataVal); err == nil {
return parseJSONCardTitle(decoded)
}
decoded := html.UnescapeString(dataVal)
return parseJSONCardTitle(decoded)
}
return "[卡片消息]"
}
@@ -653,11 +767,24 @@ func extractText(msg *OBv11Message) string {
case "reply":
// Reply is handled separately in ToUnified with reply text.
text += "[回复]"
case "json":
case "json", "card":
if data, ok := s["data"].(map[string]interface{}); ok {
if inner, ok := data["data"].(string); ok && inner != "" {
text += parseJSONCardTitle(inner)
} else {
inner := data["data"]
switch v := inner.(type) {
case string:
if v != "" {
text += parseJSONCardTitle(v)
} else {
text += "[卡片消息]"
}
case map[string]interface{}:
// Already parsed — re-marshal to JSON string for parsing.
if b, err := json.Marshal(v); err == nil {
text += parseJSONCardTitle(string(b))
} else {
text += "[卡片消息]"
}
default:
text += "[卡片消息]"
}
} else {
@@ -678,6 +805,7 @@ var cqRecordRegex = regexp.MustCompile(`\[CQ:record,[^\]]*\]`)
var cqURLRegex = regexp.MustCompile(`\burl=([^,\]]+)`)
var cqDurationRegex = regexp.MustCompile(`\bduration=(\d+)`)
var cqAllRegex = regexp.MustCompile(`\[CQ:[^\]]+\]`)
var cqJSONRegex = regexp.MustCompile(`\[CQ:(?:json|card),[^\]]*data=[^\]]*\]`)
var boldRegex = regexp.MustCompile(`\*\*(.+?)\*\*`)
var italicRegex = regexp.MustCompile(`\*(.+?)\*`)
var strikethroughRegex = regexp.MustCompile(`~~(.+?)~~`)
@@ -779,6 +907,70 @@ func extractAttachments(msg *OBv11Message) []bridge.Attachment {
return attachments
}
// extractCardImageURLs finds json/card CQ codes in the raw message and extracts
// preview/cover image URLs so they can be fed to the vision pipeline.
func extractCardImageURLs(msg *OBv11Message) []bridge.Attachment {
raw := msg.RawMessage
if raw == "" {
if s, ok := msg.Message.(string); ok {
raw = s
}
}
// Match [CQ:json,...] or [CQ:card,...]
matches := cqJSONRegex.FindAllString(raw, -1)
if len(matches) == 0 {
return nil
}
var attachments []bridge.Attachment
seen := map[string]bool{}
for _, m := range matches {
dataVal := extractCQParam(m, "data")
if dataVal == "" {
continue
}
decoded := html.UnescapeString(dataVal)
urls := extractPreviewURLs(decoded)
for _, u := range urls {
if !seen[u] {
seen[u] = true
attachments = append(attachments, bridge.Attachment{Type: "image", URL: u})
}
}
}
return attachments
}
// extractPreviewURLs parses a QQ card JSON and returns any preview/cover image URLs.
func extractPreviewURLs(data string) []string {
var card struct {
Meta struct {
Music struct {
Preview string `json:"preview"`
} `json:"music"`
News struct {
Preview string `json:"preview"`
} `json:"news"`
} `json:"meta"`
// Some card types have preview at top level or in other locations.
Preview string `json:"preview"`
}
if err := json.Unmarshal([]byte(data), &card); err != nil {
return nil
}
var urls []string
if card.Meta.Music.Preview != "" {
urls = append(urls, card.Meta.Music.Preview)
}
if card.Meta.News.Preview != "" {
urls = append(urls, card.Meta.News.Preview)
}
if card.Preview != "" {
urls = append(urls, card.Preview)
}
return urls
}
// ConvertMarkdownToQQ converts markdown to QQ plain-text format.
func ConvertMarkdownToQQ(md string) string {
md = boldRegex.ReplaceAllString(md, "$1")