Files
Cyrene/backend/ai-core/internal/memory/client.go
T
AskaEth 87214b9441 feat: Phase 1+2 架构进化 — 连续思考链/主动消息决策/情感状态机/离线自主思考 (86文件)
Phase 1 (基础设施):
- ThinkChain 思考链连续性 + 差异化思考提示词 (persistent)
- AutonomousToolPolicy 工具安全策略 (safe/unsafe/conditional)
- MessageScheduler 自适应消息节奏 (Idle/Available/Busy)
- SessionEnrichmentStore 渐进式上下文丰富 (5层)
- ConversationBus 事件总线 + ResponseCache (dedup)
- pkg/logger 统一日志 + 所有 handler 替换 fmt.Printf
- NPE 守卫/链路优化/数据库表修复/Go workspace

Phase 2 (人格交互):
- EmotionState/EmotionTracker 情感状态机 (5种心情, 情绪衰减)
- ProactiveGuard 主动消息多维决策 (静默时段/紧急度/频率/校验)
- Gateway↔ai-core 在线状态感知链路 (presence notification)
- 离线思考频率控制 + 重连问候 + 离线消息排队

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-23 15:25:12 +08:00

310 lines
8.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package memory
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"

	"github.com/yourname/cyrene-ai/ai-core/internal/model"
	"github.com/yourname/cyrene-ai/pkg/logger"
)
// Client is an HTTP client for the memory service.
// ai-core uses it to call the standalone memory-service.
type Client struct {
	// baseURL is prepended to every endpoint path (e.g. baseURL+"/api/v1/health").
	baseURL string
	// httpClient is the HTTP client used to send every request.
	httpClient *http.Client
}
// NewClient returns a memory-service Client that sends its requests to
// baseURL. The underlying HTTP client is created with a 15-second timeout.
func NewClient(baseURL string) *Client {
	hc := &http.Client{Timeout: 15 * time.Second}

	client := new(Client)
	client.baseURL = baseURL
	client.httpClient = hc
	return client
}
// Ping checks whether the memory service is available by issuing
// GET {baseURL}/api/v1/health.
//
// It returns nil only when the response status is 200 OK. Errors from
// building or sending the request are returned unchanged; any other status
// code is reported as an error containing that code.
func (c *Client) Ping(ctx context.Context) error {
	healthURL := c.baseURL + "/api/v1/health"

	req, reqErr := http.NewRequestWithContext(ctx, http.MethodGet, healthURL, nil)
	if reqErr != nil {
		return reqErr
	}

	resp, doErr := c.httpClient.Do(req)
	if doErr != nil {
		return doErr
	}
	defer resp.Body.Close()

	if status := resp.StatusCode; status != http.StatusOK {
		return fmt.Errorf("记忆服务健康检查失败: %d", status)
	}
	return nil
}
// Save stores entry in the memory service via POST {baseURL}/api/v1/memories.
//
// On a successful response the body is decoded on a best-effort basis: when
// it contains a "memory" object, entry.ID and entry.CreatedAt are overwritten
// with the values from the response. A body that cannot be decoded is not
// reported as an error, because the service has already accepted the save.
//
// It returns an error when entry is nil, when the payload cannot be encoded
// as JSON, when the request cannot be sent, or when the service answers with
// a status code >= 300 (the response body is included in the message).
func (c *Client) Save(ctx context.Context, entry *model.MemoryEntry) error {
	// Guard against a nil entry instead of panicking on the field accesses below.
	if entry == nil {
		return fmt.Errorf("保存记忆失败: entry 为 nil")
	}

	body, err := json.Marshal(map[string]interface{}{
		"user_id":    entry.UserID,
		"content":    entry.Content,
		"summary":    entry.Summary,
		"category":   string(entry.Category),
		"priority":   int(entry.Priority),
		"importance": entry.Importance,
		"keywords":   entry.Keywords,
		"session_id": entry.SessionID,
		"source":     entry.Source,
	})
	if err != nil {
		// Never send a request with a missing body when encoding failed.
		return fmt.Errorf("保存记忆失败: %w", err)
	}

	resp, err := c.doRequest(ctx, http.MethodPost, c.baseURL+"/api/v1/memories", body)
	if err != nil {
		return fmt.Errorf("保存记忆失败: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		// Best effort: the body is only used to enrich the error message.
		respBody, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("保存记忆失败 (%d): %s", resp.StatusCode, string(respBody))
	}

	// Parse the response to pick up the ID and CreatedAt assigned by the service.
	var result struct {
		Memory *model.MemoryEntry `json:"memory"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err == nil && result.Memory != nil {
		entry.ID = result.Memory.ID
		entry.CreatedAt = result.Memory.CreatedAt
	}
	return nil
}
// Query lists memories that match q via GET {baseURL}/api/v1/memories.
//
// user_id is always sent. category is added when q.Category is non-empty,
// and min_importance / limit are added when they are greater than zero.
// All values are query-escaped, so identifiers containing characters such as
// '&', '#', '=' or spaces cannot corrupt the request URL.
//
// It returns the "memories" array from the response. An error is returned
// when the request cannot be sent, when the service answers with a status
// code >= 300 (the response body is included in the message), or when the
// response body cannot be decoded.
func (c *Client) Query(ctx context.Context, q model.MemoryQuery) ([]model.MemoryEntry, error) {
	params := url.Values{}
	params.Set("user_id", q.UserID)
	if q.Category != "" {
		params.Set("category", string(q.Category))
	}
	if q.MinImportance > 0 {
		params.Set("min_importance", fmt.Sprintf("%d", q.MinImportance))
	}
	if q.Limit > 0 {
		params.Set("limit", fmt.Sprintf("%d", q.Limit))
	}
	endpoint := c.baseURL + "/api/v1/memories?" + params.Encode()

	resp, err := c.doRequest(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("查询记忆失败: %w", err)
	}
	defer resp.Body.Close()

	// Without this check an error response carrying a JSON body would decode
	// into an empty result and be reported as "no memories".
	if resp.StatusCode >= 300 {
		// Best effort: the body is only used to enrich the error message.
		respBody, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("查询记忆失败 (%d): %s", resp.StatusCode, string(respBody))
	}

	var result struct {
		Memories []model.MemoryEntry `json:"memories"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("解析查询结果失败: %w", err)
	}
	return result.Memories, nil
}
// QueryByText runs a semantic query (POST {baseURL}/api/v1/memories/query).
//
// userID, queryText, category, minImportance and limit are sent as the JSON
// fields user_id, query_text, category, min_importance and limit. It returns
// the "memories" array from the response. An error is returned when the
// request cannot be sent, when the service answers with a status code >= 300
// (the response body is included in the message), or when the response body
// cannot be decoded.
func (c *Client) QueryByText(ctx context.Context, userID, queryText, category string, minImportance, limit int) ([]model.MemoryEntry, error) {
	payload := map[string]any{
		"user_id":        userID,
		"query_text":     queryText,
		"category":       category,
		"min_importance": minImportance,
		"limit":          limit,
	}
	// The marshal error is discarded: the payload holds only strings and ints.
	encoded, _ := json.Marshal(payload)

	endpoint := c.baseURL + "/api/v1/memories/query"
	resp, reqErr := c.doRequest(ctx, http.MethodPost, endpoint, encoded)
	if reqErr != nil {
		return nil, fmt.Errorf("语义查询失败: %w", reqErr)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code >= 300 {
		// Best effort: the body is only used to enrich the error message.
		raw, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("语义查询失败 (%d): %s", code, raw)
	}

	var decoded struct {
		Memories []model.MemoryEntry `json:"memories"`
	}
	if decErr := json.NewDecoder(resp.Body).Decode(&decoded); decErr != nil {
		return nil, fmt.Errorf("解析查询结果失败: %w", decErr)
	}
	return decoded.Memories, nil
}
// GetByID fetches a single memory via GET {baseURL}/api/v1/memories/{id}.
//
// id is path-escaped before it is placed in the URL, so characters such as
// '/', '?' or '#' cannot redirect the request to a different endpoint.
//
// It returns (nil, nil) when the service answers 404 Not Found. An error is
// returned when id is empty, when the request cannot be sent, when the
// service answers with any other status code >= 300 (the response body is
// included in the message), or when the response body cannot be decoded.
func (c *Client) GetByID(ctx context.Context, id string) (*model.MemoryEntry, error) {
	// An empty id would address the collection endpoint instead of one memory.
	if id == "" {
		return nil, fmt.Errorf("获取记忆失败: id 为空")
	}

	endpoint := c.baseURL + "/api/v1/memories/" + url.PathEscape(id)
	resp, err := c.doRequest(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("获取记忆失败: %w", err)
	}
	defer resp.Body.Close()

	// A missing memory is reported as "no result" rather than as an error.
	if resp.StatusCode == http.StatusNotFound {
		return nil, nil
	}
	if resp.StatusCode >= 300 {
		// Best effort: the body is only used to enrich the error message.
		respBody, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("获取记忆失败 (%d): %s", resp.StatusCode, string(respBody))
	}

	var result struct {
		Memory model.MemoryEntry `json:"memory"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("解析获取结果失败: %w", err)
	}
	return &result.Memory, nil
}
// Delete removes a memory via DELETE {baseURL}/api/v1/memories/{id}.
//
// id is path-escaped before it is placed in the URL, so characters such as
// '/', '?' or '#' cannot redirect the request to a different endpoint.
//
// An error is returned when id is empty, when the request cannot be sent, or
// when the service answers with a status code >= 300 (the response body is
// included in the message).
func (c *Client) Delete(ctx context.Context, id string) error {
	// An empty id would send DELETE to the collection endpoint; refuse it.
	if id == "" {
		return fmt.Errorf("删除记忆失败: id 为空")
	}

	endpoint := c.baseURL + "/api/v1/memories/" + url.PathEscape(id)
	resp, err := c.doRequest(ctx, http.MethodDelete, endpoint, nil)
	if err != nil {
		return fmt.Errorf("删除记忆失败: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		// Best effort: the body is only used to enrich the error message.
		respBody, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("删除记忆失败 (%d): %s", resp.StatusCode, string(respBody))
	}
	return nil
}
// GetMemoriesByCategory returns the memories of userID that belong to
// category. It delegates to Query with a fixed limit of 50.
func (c *Client) GetMemoriesByCategory(ctx context.Context, userID string, category model.MemoryCategory) ([]model.MemoryEntry, error) {
	var query model.MemoryQuery
	query.UserID = userID
	query.Category = category
	query.Limit = 50
	return c.Query(ctx, query)
}
// ConsolidateMemories asks the memory service to merge similar memories of
// userID via POST {baseURL}/api/v1/memories/consolidate.
//
// It returns the "merged" count from the response. An error is returned when
// the request cannot be sent, when the service answers with a status code
// >= 300 (the response body is included in the message), or when the
// response body cannot be decoded.
func (c *Client) ConsolidateMemories(ctx context.Context, userID string) (int, error) {
	// The marshal error is discarded: the payload holds a single string.
	body, _ := json.Marshal(map[string]interface{}{
		"user_id": userID,
	})

	resp, err := c.doRequest(ctx, http.MethodPost, c.baseURL+"/api/v1/memories/consolidate", body)
	if err != nil {
		return 0, fmt.Errorf("合并记忆失败: %w", err)
	}
	defer resp.Body.Close()

	// Without this check an error response carrying a JSON body would be
	// reported as "0 merged" with a nil error.
	if resp.StatusCode >= 300 {
		// Best effort: the body is only used to enrich the error message.
		respBody, _ := io.ReadAll(resp.Body)
		return 0, fmt.Errorf("合并记忆失败 (%d): %s", resp.StatusCode, string(respBody))
	}

	var result struct {
		Merged int `json:"merged"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return 0, fmt.Errorf("解析合并结果失败: %w", err)
	}
	return result.Merged, nil
}
// DecayMemories asks the memory service to decay old memories of userID via
// POST {baseURL}/api/v1/memories/decay.
//
// It returns the "decayed" and "deleted" counts from the response, in that
// order. An error is returned when the request cannot be sent, when the
// service answers with a status code >= 300 (the response body is included
// in the message), or when the response body cannot be decoded.
func (c *Client) DecayMemories(ctx context.Context, userID string) (int, int, error) {
	// The marshal error is discarded: the payload holds a single string.
	body, _ := json.Marshal(map[string]interface{}{
		"user_id": userID,
	})

	resp, err := c.doRequest(ctx, http.MethodPost, c.baseURL+"/api/v1/memories/decay", body)
	if err != nil {
		return 0, 0, fmt.Errorf("衰减记忆失败: %w", err)
	}
	defer resp.Body.Close()

	// Without this check an error response carrying a JSON body would be
	// reported as "0 decayed, 0 deleted" with a nil error.
	if resp.StatusCode >= 300 {
		// Best effort: the body is only used to enrich the error message.
		respBody, _ := io.ReadAll(resp.Body)
		return 0, 0, fmt.Errorf("衰减记忆失败 (%d): %s", resp.StatusCode, string(respBody))
	}

	var result struct {
		Decayed int `json:"decayed"`
		Deleted int `json:"deleted"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return 0, 0, fmt.Errorf("解析衰减结果失败: %w", err)
	}
	return result.Decayed, result.Deleted, nil
}
// GetCategories fetches the per-category statistics of userID via
// GET {baseURL}/api/v1/memories/categories.
//
// userID is query-escaped before it is placed in the URL. The function
// returns the "categories" object from the response as a map from category
// name to count. An error is returned when the request cannot be sent, when
// the service answers with a status code >= 300 (the response body is
// included in the message), or when the response body cannot be decoded.
func (c *Client) GetCategories(ctx context.Context, userID string) (map[string]int, error) {
	params := url.Values{}
	params.Set("user_id", userID)
	endpoint := c.baseURL + "/api/v1/memories/categories?" + params.Encode()

	resp, err := c.doRequest(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("获取类别统计失败: %w", err)
	}
	defer resp.Body.Close()

	// Without this check an error response carrying a JSON body would be
	// reported as an empty statistics map with a nil error.
	if resp.StatusCode >= 300 {
		// Best effort: the body is only used to enrich the error message.
		respBody, _ := io.ReadAll(resp.Body)
		return nil, fmt.Errorf("获取类别统计失败 (%d): %s", resp.StatusCode, string(respBody))
	}

	var result struct {
		Categories map[string]int `json:"categories"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, fmt.Errorf("解析类别统计失败: %w", err)
	}
	return result.Categories, nil
}
// SaveThinkingLog persists an autonomous-thinking log entry to the memory
// service via POST {baseURL}/api/v1/thinking.
//
// The arguments are sent as the JSON fields user_id, content, tool_calls,
// tool_call_count and content_length. An error is returned when the request
// cannot be sent or when the service answers with a status code >= 300 (the
// response body is included in the message).
func (c *Client) SaveThinkingLog(ctx context.Context, userID, content, toolCalls string, toolCallCount, contentLength int) error {
	payload := map[string]any{
		"user_id":         userID,
		"content":         content,
		"tool_calls":      toolCalls,
		"tool_call_count": toolCallCount,
		"content_length":  contentLength,
	}
	// The marshal error is discarded: the payload holds only strings and ints.
	encoded, _ := json.Marshal(payload)

	endpoint := c.baseURL + "/api/v1/thinking"
	resp, reqErr := c.doRequest(ctx, http.MethodPost, endpoint, encoded)
	if reqErr != nil {
		return fmt.Errorf("保存思考日志失败: %w", reqErr)
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code >= 300 {
		// Best effort: the body is only used to enrich the error message.
		raw, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("保存思考日志失败 (%d): %s", code, raw)
	}
	return nil
}
// IsReady reports whether the memory service is ready. It calls Ping with a
// fresh background context limited to 3 seconds and returns true only when
// Ping returns no error.
func (c *Client) IsReady() bool {
	probeCtx, stop := context.WithTimeout(context.Background(), 3*time.Second)
	defer stop()

	if err := c.Ping(probeCtx); err != nil {
		return false
	}
	return true
}
// doRequest is the internal helper that builds and sends an HTTP request.
//
// When body is non-nil it is sent as the request body and the Content-Type
// header is set to application/json; when body is nil the request carries no
// body and no Content-Type header. A failure to send the request is logged
// and returned. The response is returned as-is regardless of its status
// code, and its body is left open for the caller.
func (c *Client) doRequest(ctx context.Context, method, url string, body []byte) (*http.Response, error) {
	hasBody := body != nil

	// Keep payload a nil interface when there is no body, so the request is
	// built without one.
	var payload io.Reader
	if hasBody {
		payload = bytes.NewReader(body)
	}

	req, err := http.NewRequestWithContext(ctx, method, url, payload)
	if err != nil {
		return nil, err
	}
	if hasBody {
		req.Header.Set("Content-Type", "application/json")
	}

	resp, err := c.httpClient.Do(req)
	if err == nil {
		return resp, nil
	}
	logger.Printf("[memory-client] HTTP 请求失败 %s %s: %v", method, url, err)
	return nil, err
}