Files
Cyrene/backend/ai-core/internal/memory/client.go
T
AskaEth 91c9ee4b2d fix: 修复 AI 回复无法送达发送者 + 重复消息 + action角色泄露 + OS环境支持
广播逻辑重构:
- AI 回复 (stream_start/response/stream_segments/multi_message/stream_end) 改用 broadcastToUser 发送给所有客户端
- 用户消息回显保持 broadcastToUserExcept 排除发送者

消息去重与角色修复:
- CacheMessage(user) 移至回复生成后,避免本轮 LLM 调用出现重复用户消息
- action 角色消息在 DB 存储时映射为 assistant,DeepSeek 等模型不支持自定义角色
- stream_end defer 机制确保错误路径也会终止客户端思考指示器

OS 完整环境支持:
- host 包重构为 HostBackend 接口 + Direct/WSL/Docker 三种后端
- 新增 os_exec/os_file/os_system 工具供 AI 在完整 Linux 环境中自由操作

其他:
- 视觉模型注入 + 图片预处理后清空 Images 避免传给 Chat 模型
- 图片 URL 相对路径→绝对 URL 转换
- DevTools 链路追踪页面 + 重启修复
- 记忆搜索模糊匹配增强
- 后台思考定时调度支持
- 管理后台页面 (模型配置/用户管理等)
- docs/api 更新广播机制说明

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 12:46:17 +08:00

335 lines
9.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package memory
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"github.com/yourname/cyrene-ai/pkg/logger"
"net/http"
"time"
"github.com/yourname/cyrene-ai/ai-core/internal/model"
)
// Client 记忆服务 HTTP 客户端
// ai-core 通过此客户端调用独立的 memory-service
type Client struct {
baseURL string
httpClient *http.Client
}
// NewClient 创建记忆服务客户端
func NewClient(baseURL string) *Client {
return &Client{
baseURL: baseURL,
httpClient: &http.Client{
Timeout: 15 * time.Second,
},
}
}
// Ping 检查记忆服务是否可用
func (c *Client) Ping(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/v1/health", nil)
if err != nil {
return err
}
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("记忆服务健康检查失败: %d", resp.StatusCode)
}
return nil
}
// Save 保存记忆
func (c *Client) Save(ctx context.Context, entry *model.MemoryEntry) error {
body, _ := json.Marshal(map[string]interface{}{
"user_id": entry.UserID,
"content": entry.Content,
"summary": entry.Summary,
"category": string(entry.Category),
"priority": int(entry.Priority),
"importance": entry.Importance,
"keywords": entry.Keywords,
"session_id": entry.SessionID,
"source": entry.Source,
})
resp, err := c.doRequest(ctx, http.MethodPost, c.baseURL+"/api/v1/memories", body)
if err != nil {
return fmt.Errorf("保存记忆失败: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
respBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("保存记忆失败 (%d): %s", resp.StatusCode, string(respBody))
}
// 解析返回以获取 ID 和 CreatedAt
var result struct {
Memory *model.MemoryEntry `json:"memory"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err == nil && result.Memory != nil {
entry.ID = result.Memory.ID
entry.CreatedAt = result.Memory.CreatedAt
}
return nil
}
// Query 按条件查询记忆
func (c *Client) Query(ctx context.Context, q model.MemoryQuery) ([]model.MemoryEntry, error) {
url := fmt.Sprintf("%s/api/v1/memories?user_id=%s", c.baseURL, q.UserID)
if q.Category != "" {
url += "&category=" + string(q.Category)
}
if q.MinImportance > 0 {
url += fmt.Sprintf("&min_importance=%d", q.MinImportance)
}
if q.Limit > 0 {
url += fmt.Sprintf("&limit=%d", q.Limit)
}
resp, err := c.doRequest(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("查询记忆失败: %w", err)
}
defer resp.Body.Close()
var result struct {
Memories []model.MemoryEntry `json:"memories"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解析查询结果失败: %w", err)
}
return result.Memories, nil
}
// QueryByText 语义查询(POST /api/v1/memories/query
func (c *Client) QueryByText(ctx context.Context, userID, queryText, category string, minImportance, limit int) ([]model.MemoryEntry, error) {
body, _ := json.Marshal(map[string]interface{}{
"user_id": userID,
"query_text": queryText,
"category": category,
"min_importance": minImportance,
"limit": limit,
})
resp, err := c.doRequest(ctx, http.MethodPost, c.baseURL+"/api/v1/memories/query", body)
if err != nil {
return nil, fmt.Errorf("语义查询失败: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
respBody, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("语义查询失败 (%d): %s", resp.StatusCode, string(respBody))
}
var result struct {
Memories []model.MemoryEntry `json:"memories"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解析查询结果失败: %w", err)
}
return result.Memories, nil
}
// GetByID 根据ID获取记忆
func (c *Client) GetByID(ctx context.Context, id string) (*model.MemoryEntry, error) {
resp, err := c.doRequest(ctx, http.MethodGet, c.baseURL+"/api/v1/memories/"+id, nil)
if err != nil {
return nil, fmt.Errorf("获取记忆失败: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return nil, nil
}
if resp.StatusCode >= 300 {
respBody, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("获取记忆失败 (%d): %s", resp.StatusCode, string(respBody))
}
var result struct {
Memory model.MemoryEntry `json:"memory"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解析获取结果失败: %w", err)
}
return &result.Memory, nil
}
// Update 更新记忆
func (c *Client) Update(ctx context.Context, entry *model.MemoryEntry) error {
body, _ := json.Marshal(map[string]interface{}{
"content": entry.Content,
"summary": entry.Summary,
"category": string(entry.Category),
"priority": int(entry.Priority),
"importance": entry.Importance,
"keywords": entry.Keywords,
"source": entry.Source,
})
resp, err := c.doRequest(ctx, http.MethodPut, c.baseURL+"/api/v1/memories/"+entry.ID, body)
if err != nil {
return fmt.Errorf("更新记忆失败: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
respBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("更新记忆失败 (%d): %s", resp.StatusCode, string(respBody))
}
return nil
}
// Delete 删除记忆
func (c *Client) Delete(ctx context.Context, id string) error {
resp, err := c.doRequest(ctx, http.MethodDelete, c.baseURL+"/api/v1/memories/"+id, nil)
if err != nil {
return fmt.Errorf("删除记忆失败: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
respBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("删除记忆失败 (%d): %s", resp.StatusCode, string(respBody))
}
return nil
}
// GetMemoriesByCategory 按分类获取记忆
func (c *Client) GetMemoriesByCategory(ctx context.Context, userID string, category model.MemoryCategory) ([]model.MemoryEntry, error) {
return c.Query(ctx, model.MemoryQuery{
UserID: userID,
Category: category,
Limit: 50,
})
}
// ConsolidateMemories 合并相似记忆
func (c *Client) ConsolidateMemories(ctx context.Context, userID string) (int, error) {
body, _ := json.Marshal(map[string]interface{}{
"user_id": userID,
})
resp, err := c.doRequest(ctx, http.MethodPost, c.baseURL+"/api/v1/memories/consolidate", body)
if err != nil {
return 0, fmt.Errorf("合并记忆失败: %w", err)
}
defer resp.Body.Close()
var result struct {
Merged int `json:"merged"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return 0, fmt.Errorf("解析合并结果失败: %w", err)
}
return result.Merged, nil
}
// DecayMemories 衰减旧记忆
func (c *Client) DecayMemories(ctx context.Context, userID string) (int, int, error) {
body, _ := json.Marshal(map[string]interface{}{
"user_id": userID,
})
resp, err := c.doRequest(ctx, http.MethodPost, c.baseURL+"/api/v1/memories/decay", body)
if err != nil {
return 0, 0, fmt.Errorf("衰减记忆失败: %w", err)
}
defer resp.Body.Close()
var result struct {
Decayed int `json:"decayed"`
Deleted int `json:"deleted"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return 0, 0, fmt.Errorf("解析衰减结果失败: %w", err)
}
return result.Decayed, result.Deleted, nil
}
// GetCategories 获取用户类别统计
func (c *Client) GetCategories(ctx context.Context, userID string) (map[string]int, error) {
url := fmt.Sprintf("%s/api/v1/memories/categories?user_id=%s", c.baseURL, userID)
resp, err := c.doRequest(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, fmt.Errorf("获取类别统计失败: %w", err)
}
defer resp.Body.Close()
var result struct {
Categories map[string]int `json:"categories"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解析类别统计失败: %w", err)
}
return result.Categories, nil
}
// SaveThinkingLog 持久化自主思考日志到 memory-service
func (c *Client) SaveThinkingLog(ctx context.Context, userID, content, toolCalls string, toolCallCount, contentLength int) error {
body, _ := json.Marshal(map[string]interface{}{
"user_id": userID,
"content": content,
"tool_calls": toolCalls,
"tool_call_count": toolCallCount,
"content_length": contentLength,
})
resp, err := c.doRequest(ctx, http.MethodPost, c.baseURL+"/api/v1/thinking", body)
if err != nil {
return fmt.Errorf("保存思考日志失败: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
respBody, _ := io.ReadAll(resp.Body)
return fmt.Errorf("保存思考日志失败 (%d): %s", resp.StatusCode, string(respBody))
}
return nil
}
// IsReady 检查记忆服务是否就绪
func (c *Client) IsReady() bool {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
return c.Ping(ctx) == nil
}
// doRequest 内部 HTTP 请求辅助方法
func (c *Client) doRequest(ctx context.Context, method, url string, body []byte) (*http.Response, error) {
var reqBody io.Reader
if body != nil {
reqBody = bytes.NewReader(body)
}
req, err := http.NewRequestWithContext(ctx, method, url, reqBody)
if err != nil {
return nil, err
}
if body != nil {
req.Header.Set("Content-Type", "application/json")
}
resp, err := c.httpClient.Do(req)
if err != nil {
logger.Printf("[memory-client] HTTP 请求失败 %s %s: %v", method, url, err)
return nil, err
}
return resp, nil
}