cd83eec39e
- rag.Embedder: LLM API 文本向量化 (OpenAI-compatible) - rag.KnowledgeStore: 文档分块 + 重叠窗口 + 余弦相似度搜索 - rag.Retriever: 高级知识检索 + 格式化摘要 - KnowledgeProvider: 子会话提供者,整合入编排管线 - knowledge_search / knowledge_ingest 工具 - EnrichmentData 管线全线支持 KnowledgeInfo Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
810 lines
24 KiB
Go
810 lines
24 KiB
Go
package main
|
||
|
||
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/joho/godotenv"

	"github.com/yourname/cyrene-ai/ai-core/internal/background"
	aiConfig "github.com/yourname/cyrene-ai/ai-core/internal/config"
	ctxbuild "github.com/yourname/cyrene-ai/ai-core/internal/context"
	"github.com/yourname/cyrene-ai/ai-core/internal/host"
	"github.com/yourname/cyrene-ai/ai-core/internal/llm"
	"github.com/yourname/cyrene-ai/ai-core/internal/memory"
	"github.com/yourname/cyrene-ai/ai-core/internal/model"
	"github.com/yourname/cyrene-ai/ai-core/internal/orchestrator"
	"github.com/yourname/cyrene-ai/ai-core/internal/persona"
	"github.com/yourname/cyrene-ai/ai-core/internal/rag"
	"github.com/yourname/cyrene-ai/ai-core/internal/subsession"
	"github.com/yourname/cyrene-ai/ai-core/internal/tools"
)
|
||
|
||
// cfg is the process-wide configuration. It is assigned once in main from
// loadConfig and read by the request handlers (for example handleChat uses
// cfg.AdminNickname as the default nickname).
var cfg Config
|
||
|
||
func main() {
|
||
// 自动加载 .env 文件(来自 backend/.env)
|
||
if err := godotenv.Load("../.env"); err != nil {
|
||
log.Println("ℹ 未找到 .env 文件,将使用环境变量或默认值")
|
||
}
|
||
|
||
log.SetFlags(log.LstdFlags | log.Lshortfile)
|
||
log.Println("🧠 AI-Core 服务启动中...")
|
||
|
||
// 加载配置
|
||
cfg = loadConfig()
|
||
|
||
// 初始化人格加载器
|
||
personaDir := cfg.PersonaDir
|
||
if personaDir == "" {
|
||
personaDir = "./internal/persona"
|
||
}
|
||
personaLoader, err := persona.NewLoader(personaDir)
|
||
if err != nil {
|
||
log.Fatalf("加载人格配置失败: %v", err)
|
||
}
|
||
log.Printf("已加载 %d 个人格: %v", len(personaLoader.List()), personaLoader.List())
|
||
|
||
// 初始化模型配置加载器 (Phase 6: 多模型配置)
|
||
configPath := getEnv("MODELS_CONFIG_PATH", "../models.json")
|
||
configLoader, err := aiConfig.NewLoader(configPath)
|
||
if err != nil {
|
||
log.Printf("⚠ 模型配置加载失败,回退到 .env: %v", err)
|
||
configLoader = nil
|
||
}
|
||
|
||
// 构建 .env 回退配置
|
||
envFallback := llm.OpenAIConfig{
|
||
BaseURL: cfg.LLMBaseURL,
|
||
APIKey: cfg.LLMAPIKey,
|
||
Model: cfg.LLMModel,
|
||
FallbackModel: cfg.LLMFallbackModel,
|
||
Timeout: 120 * time.Second,
|
||
}
|
||
|
||
// 创建 ModelSelector (优先使用 models.json,回退到 .env)
|
||
modelSelector := llm.NewModelSelector(configLoader, envFallback)
|
||
|
||
// 为不同用途创建独立的适配器,支持目的路由
|
||
// 在 .env 回退模式下,所有适配器共享同一 provider
|
||
chatAdapter := modelSelector.DefaultAdapter()
|
||
provider, _ := modelSelector.Select(context.Background(), llm.PurposeIntentAnalysis)
|
||
intentAdapter := llm.NewAdapter(provider)
|
||
provider, _ = modelSelector.Select(context.Background(), llm.PurposeDeepThinking)
|
||
thinkerAdapter := llm.NewAdapter(provider)
|
||
provider, _ = modelSelector.Select(context.Background(), llm.PurposeMemoryExtraction)
|
||
memoryAdapter := llm.NewAdapter(provider)
|
||
|
||
if configLoader != nil && configLoader.HasConfig() {
|
||
log.Printf("LLM适配器已就绪: models.json 驱动 (chat=%s, intent=%s, think=%s, memory=%s)",
|
||
chatAdapter.ModelName(), intentAdapter.ModelName(), thinkerAdapter.ModelName(), memoryAdapter.ModelName())
|
||
} else {
|
||
log.Printf("LLM适配器已就绪: .env 驱动 (模型=%s)", chatAdapter.ModelName())
|
||
}
|
||
|
||
// 初始化记忆系统
|
||
var memStore *memory.Store
|
||
var memRetriever *memory.Retriever
|
||
var memExtractor *memory.Extractor
|
||
|
||
if cfg.DatabaseURL != "" {
|
||
memStore = memory.NewStore(cfg.DatabaseURL)
|
||
defer memStore.Close()
|
||
|
||
memRetriever = memory.NewRetriever(memStore, nil)
|
||
|
||
// 记忆提取器使用 memory purpose 适配器
|
||
memExtractor = memory.NewExtractor(memStore, func(ctx context.Context, messages []model.LLMMessage) (*model.LLMResponse, error) {
|
||
return memoryAdapter.Chat(ctx, messages)
|
||
})
|
||
log.Println("记忆提取器已就绪")
|
||
}
|
||
|
||
// 初始化会话历史存储
|
||
convStore := ctxbuild.NewConversationStore(50)
|
||
log.Println("会话历史存储已就绪 (上限50条)")
|
||
|
||
// 初始化上下文构建器
|
||
ctxBuilder := ctxbuild.NewBuilder(convStore)
|
||
|
||
// 初始化 IoT 客户端
|
||
var iotClient *tools.IoTClient
|
||
if cfg.IoTServiceURL != "" {
|
||
iotClient = tools.NewIoTClient(cfg.IoTServiceURL)
|
||
log.Printf("IoT 客户端已就绪: %s", cfg.IoTServiceURL)
|
||
} else {
|
||
log.Println("IoT 客户端未配置 (IOT_SERVICE_URL 和 IOT_DEBUG_SERVICE_URL 均为空)")
|
||
}
|
||
|
||
// 初始化主机操控管理器 (Phase 6.2: 沙箱执行 + 文件系统隔离)
|
||
hostSandbox := host.NewSandbox(host.DefaultSandboxConfig())
|
||
hostManager := host.NewManager(hostSandbox)
|
||
dataDir := getEnv("DATA_DIR", "/tmp/cyrene_data")
|
||
hostManager.SetAllowedDirs([]string{dataDir, os.TempDir(), "."})
|
||
log.Printf("主机操控管理器已就绪: 沙箱执行 + 文件隔离 (数据目录=%s)", dataDir)
|
||
|
||
// 初始化 RAG 知识库 (Phase 6.6: 知识库 RAG 增强)
|
||
knowledgeDir := getEnv("KNOWLEDGE_DIR", "./data/knowledge")
|
||
ragEmbedder := rag.NewEmbedder(cfg.LLMBaseURL, cfg.LLMAPIKey, "text-embedding-3-small")
|
||
knowledgeStore := rag.NewKnowledgeStore(ragEmbedder, knowledgeDir)
|
||
knowledgeRetriever := rag.NewRetriever(knowledgeStore)
|
||
log.Printf("RAG 知识库已就绪: 目录=%s, 嵌入模型=text-embedding-3-small", knowledgeDir)
|
||
|
||
// 初始化工具注册中心
|
||
toolRegistry := tools.NewRegistry()
|
||
if getEnvBool("ENABLE_TOOLS", true) {
|
||
toolRegistry.Register(tools.NewWebFetchTool())
|
||
toolRegistry.Register(tools.NewWebSearchTool())
|
||
toolRegistry.Register(tools.NewCalculatorTool())
|
||
toolRegistry.Register(tools.NewDateTimeTool())
|
||
toolRegistry.Register(tools.NewHTTPTool())
|
||
toolRegistry.Register(tools.NewJSONTool())
|
||
toolRegistry.Register(tools.NewTextTool())
|
||
toolRegistry.Register(tools.NewRandomTool())
|
||
toolRegistry.Register(tools.NewCryptoTool())
|
||
toolRegistry.Register(tools.NewMarkdownTool())
|
||
|
||
// File tool uses DATA_DIR or defaults to /tmp/cyrene_data
|
||
toolRegistry.Register(tools.NewFileTool(dataDir))
|
||
|
||
if iotClient != nil {
|
||
toolRegistry.Register(tools.NewIoTQueryTool(iotClient))
|
||
toolRegistry.Register(tools.NewIoTControlTool(iotClient))
|
||
}
|
||
|
||
// Phase 6.2: 主机操控工具
|
||
if hostManager != nil {
|
||
toolRegistry.Register(tools.NewHostExecTool(hostManager))
|
||
toolRegistry.Register(tools.NewHostFileTool(hostManager))
|
||
toolRegistry.Register(tools.NewHostSystemTool(hostManager))
|
||
}
|
||
|
||
// Phase 6.3: 视觉理解工具
|
||
toolRegistry.Register(tools.NewVisionTool())
|
||
|
||
// Phase 6.6: 知识库 RAG 工具
|
||
if knowledgeRetriever != nil {
|
||
toolRegistry.Register(tools.NewKnowledgeSearchTool(knowledgeRetriever))
|
||
toolRegistry.Register(tools.NewKnowledgeIngestTool(knowledgeStore))
|
||
}
|
||
log.Printf("工具注册中心已就绪: %d 个工具 (%v)", len(toolRegistry.ListTools()), toolRegistry.ListTools())
|
||
}
|
||
|
||
// 初始化后台思考器(增强版:支持工具调用和记忆管理)
|
||
thinkerCfg := background.DefaultThinkerConfig()
|
||
adminUserID := "admin"
|
||
adminSessionID := "admin-session-main"
|
||
|
||
// 创建记忆服务 HTTP 客户端(用于持久化思考日志到 memory-service)
|
||
memServiceURL := getEnv("MEMORY_SERVICE_URL", "http://localhost:8091")
|
||
memClient := memory.NewClient(memServiceURL)
|
||
log.Printf("记忆服务客户端已就绪: %s", memServiceURL)
|
||
|
||
thinker := background.NewThinker(
|
||
thinkerCfg,
|
||
personaLoader,
|
||
memRetriever,
|
||
thinkerAdapter,
|
||
iotClient,
|
||
memStore,
|
||
toolRegistry,
|
||
convStore,
|
||
adminUserID,
|
||
adminSessionID,
|
||
memClient,
|
||
)
|
||
thinker.Start()
|
||
defer thinker.Stop()
|
||
|
||
// 设置主动消息推送回调(调用 Gateway 内部 API)
|
||
gatewayURL := getEnv("GATEWAY_URL", "http://localhost:8080")
|
||
internalToken := os.Getenv("INTERNAL_SERVICE_TOKEN")
|
||
if internalToken != "" {
|
||
proactiveClient := &http.Client{Timeout: 5 * time.Second}
|
||
thinker.SetMessagePusher(func(userID, sessionID, message string) {
|
||
reqBody, _ := json.Marshal(map[string]string{
|
||
"user_id": userID,
|
||
"session_id": sessionID,
|
||
"content": message,
|
||
})
|
||
req, _ := http.NewRequest("POST", gatewayURL+"/api/v1/internal/proactive-message", strings.NewReader(string(reqBody)))
|
||
req.Header.Set("Content-Type", "application/json")
|
||
req.Header.Set("X-Internal-Token", internalToken)
|
||
resp, err := proactiveClient.Do(req)
|
||
if err != nil {
|
||
log.Printf("[主动消息] 推送失败: %v", err)
|
||
return
|
||
}
|
||
resp.Body.Close()
|
||
if resp.StatusCode == http.StatusOK {
|
||
log.Printf("[主动消息] 已推送到 Gateway: user=%s, len=%d", userID, len(message))
|
||
} else {
|
||
log.Printf("[主动消息] Gateway 返回 %d", resp.StatusCode)
|
||
}
|
||
})
|
||
log.Printf("[主动消息] 推送已启用 (Gateway=%s)", gatewayURL)
|
||
} else {
|
||
log.Println("[主动消息] 未配置 INTERNAL_SERVICE_TOKEN,主动消息推送已禁用")
|
||
}
|
||
|
||
// 健康检查与对话API的HTTP mux
|
||
mux := http.NewServeMux()
|
||
|
||
// 初始化子会话管理器
|
||
subManager := subsession.NewManager(chatAdapter)
|
||
|
||
// 注册子会话提供者
|
||
subManager.Register(subsession.NewGeneralProvider(personaLoader))
|
||
if memRetriever != nil {
|
||
subManager.Register(subsession.NewMemoryProvider(memRetriever))
|
||
}
|
||
if iotClient != nil {
|
||
subManager.Register(subsession.NewIoTProvider(iotClient, personaDir))
|
||
}
|
||
subManager.Register(subsession.NewReviewProvider())
|
||
if knowledgeRetriever != nil {
|
||
subManager.Register(subsession.NewKnowledgeProvider(knowledgeRetriever))
|
||
}
|
||
log.Printf("子会话管理器已就绪: %d 个提供者 (%v)", len(subManager.ListProviders()), subManager.ListProviders())
|
||
|
||
// 构建新的 Orchestrator (v2.0) — 传入 purpose 专用适配器
|
||
orch := orchestrator.NewOrchestrator(
|
||
personaLoader,
|
||
ctxBuilder,
|
||
chatAdapter,
|
||
intentAdapter,
|
||
subManager,
|
||
memRetriever,
|
||
memExtractor,
|
||
)
|
||
log.Println("对话编排器 v2.0 已就绪")
|
||
_ = orch
|
||
|
||
// 注册对话API端点
|
||
mux.HandleFunc("/api/v1/chat", func(w http.ResponseWriter, r *http.Request) {
|
||
handleChat(w, r, orch, ctxBuilder, personaLoader, memRetriever, memExtractor, iotClient, thinker, toolRegistry)
|
||
})
|
||
|
||
// 注册记忆API端点
|
||
mux.HandleFunc("/api/v1/memory/search", func(w http.ResponseWriter, r *http.Request) {
|
||
handleMemorySearch(w, r, memRetriever)
|
||
})
|
||
mux.HandleFunc("/api/v1/memory", func(w http.ResponseWriter, r *http.Request) {
|
||
handleMemoryCRUD(w, r, memStore, memExtractor)
|
||
})
|
||
|
||
// Phase 2: 在线状态通知端点 (Gateway -> ai-core)
|
||
mux.HandleFunc("/api/v1/internal/presence", func(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodPost {
|
||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
if r.Header.Get("X-Internal-Token") != os.Getenv("INTERNAL_SERVICE_TOKEN") {
|
||
w.WriteHeader(http.StatusUnauthorized)
|
||
return
|
||
}
|
||
var req struct {
|
||
UserID string `json:"user_id"`
|
||
Status string `json:"status"`
|
||
SessionID string `json:"session_id"`
|
||
}
|
||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||
w.WriteHeader(http.StatusBadRequest)
|
||
return
|
||
}
|
||
online := req.Status == "online"
|
||
thinker.UpdatePresence(online, req.SessionID)
|
||
w.Header().Set("Content-Type", "application/json")
|
||
w.Write([]byte(`{"status":"ok"}`))
|
||
})
|
||
|
||
mux.HandleFunc("/api/v1/health", func(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Content-Type", "application/json")
|
||
w.Write([]byte(`{"status":"ok","service":"ai-core","model":"` + chatAdapter.ModelName() + `"}`))
|
||
})
|
||
|
||
// 启动HTTP服务
|
||
srv := &http.Server{
|
||
Addr: ":" + cfg.Port,
|
||
Handler: mux,
|
||
}
|
||
|
||
go func() {
|
||
log.Printf("🚀 AI-Core 服务已启动在端口 %s", cfg.Port)
|
||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||
log.Fatalf("服务启动失败: %v", err)
|
||
}
|
||
}()
|
||
|
||
// 优雅关闭
|
||
quit := make(chan os.Signal, 1)
|
||
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||
<-quit
|
||
log.Println("正在关闭 AI-Core 服务...")
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
defer cancel()
|
||
srv.Shutdown(ctx)
|
||
log.Println("AI-Core 服务已关闭")
|
||
}
|
||
|
||
// Config holds the AI-Core runtime settings that loadConfig reads from the
// environment.
type Config struct {
	// HTTP listener.
	Port string // AI_CORE_PORT; main listens on ":" + Port

	// Persona files.
	PersonaDir string // PERSONA_DIR; directory handed to persona.NewLoader

	// LLM access used as the .env fallback when models.json is unavailable.
	LLMBaseURL       string // LLM_API_URL
	LLMAPIKey        string // LLM_API_KEY
	LLMModel         string // LLM_MODEL
	LLMFallbackModel string // LLM_FALLBACK_MODEL

	// Backing services.
	DatabaseURL   string // connection URL produced by buildDatabaseURL
	IoTServiceURL string // IOT_SERVICE_URL, else IOT_DEBUG_SERVICE_URL; empty leaves the IoT client unset

	// Presentation.
	AdminNickname string // ADMIN_NICKNAME; Cyrene's basic form of address for the admin user
}
|
||
|
||
func loadConfig() Config {
|
||
return Config{
|
||
Port: getEnv("AI_CORE_PORT", "8081"),
|
||
PersonaDir: getEnv("PERSONA_DIR", "./internal/persona"),
|
||
LLMBaseURL: getEnv("LLM_API_URL", "https://api.openai.com/v1"),
|
||
LLMAPIKey: getEnv("LLM_API_KEY", ""),
|
||
LLMModel: getEnv("LLM_MODEL", "gpt-4o"),
|
||
LLMFallbackModel: getEnv("LLM_FALLBACK_MODEL", "gpt-4o-mini"),
|
||
DatabaseURL: buildDatabaseURL(),
|
||
IoTServiceURL: getEnvWithFallback("IOT_SERVICE_URL", "IOT_DEBUG_SERVICE_URL", ""),
|
||
AdminNickname: getEnv("ADMIN_NICKNAME", "管理员"),
|
||
}
|
||
}
|
||
|
||
func buildDatabaseURL() string {
|
||
host := getEnv("POSTGRES_HOST", "localhost")
|
||
port := getEnv("POSTGRES_PORT", "5432")
|
||
user := getEnv("POSTGRES_USER", "cyrene")
|
||
password := getEnv("POSTGRES_PASSWORD", "cyrene_pass")
|
||
dbname := getEnv("POSTGRES_DB", "cyrene_ai")
|
||
sslmode := getEnv("POSTGRES_SSLMODE", "disable")
|
||
|
||
return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=%s",
|
||
user, password, host, port, dbname, sslmode)
|
||
}
|
||
|
||
// getEnv returns the value of the environment variable key, or fallback when
// the variable is unset or set to the empty string.
func getEnv(key, fallback string) string {
	value := os.Getenv(key)
	if value == "" {
		return fallback
	}
	return value
}
|
||
|
||
// getEnvWithFallback returns the first non-empty value among the environment
// variables primaryKey and fallbackKey, checked in that order, or defaultVal
// when both are unset or empty.
func getEnvWithFallback(primaryKey, fallbackKey, defaultVal string) string {
	for _, name := range [...]string{primaryKey, fallbackKey} {
		if value := os.Getenv(name); value != "" {
			return value
		}
	}
	return defaultVal
}
|
||
|
||
// getEnvBool interprets the environment variable key as a boolean.
// "true", "1", "yes" and "on" (case-insensitive) yield true; "false", "0",
// "no" and "off" yield false. An unset, empty or unrecognised value yields
// fallback.
func getEnvBool(key string, fallback bool) bool {
	// An empty value lowercases to "" and matches neither list, so it falls
	// through to the fallback just like any unrecognised value.
	normalized := strings.ToLower(os.Getenv(key))

	if normalized == "true" || normalized == "1" || normalized == "yes" || normalized == "on" {
		return true
	}
	if normalized == "false" || normalized == "0" || normalized == "no" || normalized == "off" {
		return false
	}
	return fallback
}
|
||
|
||
// buildOpenAITools 将工具注册中心的定义转换为 LLM 层的 OpenAITool 格式
|
||
func buildOpenAITools(registry *tools.Registry) []llm.OpenAITool {
|
||
if registry == nil || !registry.IsEnabled() {
|
||
return nil
|
||
}
|
||
defs := registry.GetDefinitions()
|
||
if len(defs) == 0 {
|
||
return nil
|
||
}
|
||
result := make([]llm.OpenAITool, 0, len(defs))
|
||
for _, d := range defs {
|
||
result = append(result, llm.OpenAITool{
|
||
Type: "function",
|
||
Function: llm.OpenAIToolFunc{
|
||
Name: d.Name,
|
||
Description: d.Description,
|
||
Parameters: d.Parameters,
|
||
},
|
||
})
|
||
}
|
||
return result
|
||
}
|
||
|
||
// handleChat 处理对话请求(SSE 流式响应)— 使用新 Orchestrator v2.0
|
||
func handleChat(
|
||
w http.ResponseWriter,
|
||
r *http.Request,
|
||
orch *orchestrator.Orchestrator,
|
||
ctxBuilder *ctxbuild.Builder,
|
||
_ *persona.Loader,
|
||
_ *memory.Retriever,
|
||
_ *memory.Extractor,
|
||
iotClient *tools.IoTClient,
|
||
thinker *background.Thinker,
|
||
_ *tools.Registry,
|
||
) {
|
||
if r.Method != http.MethodPost {
|
||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
|
||
// 解析请求
|
||
var req struct {
|
||
UserID string `json:"user_id"`
|
||
SessionID string `json:"session_id"`
|
||
Message string `json:"message"`
|
||
Images []string `json:"images,omitempty"` // 图片 base64 data URL
|
||
Mode string `json:"mode"`
|
||
Nickname string `json:"nickname,omitempty"`
|
||
}
|
||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||
http.Error(w, "无效的请求体", http.StatusBadRequest)
|
||
return
|
||
}
|
||
|
||
if req.Mode == "" {
|
||
req.Mode = "text"
|
||
}
|
||
|
||
ctx := r.Context()
|
||
|
||
// 0. 记录用户活动(重置闲置计时器)
|
||
if thinker != nil {
|
||
thinker.RecordUserMessage(req.SessionID)
|
||
}
|
||
|
||
// 确定用户昵称
|
||
userNickname := req.Nickname
|
||
if userNickname == "" {
|
||
userNickname = cfg.AdminNickname
|
||
}
|
||
|
||
// 0.1 缓存用户消息到会话历史
|
||
ctxBuilder.CacheMessage(req.SessionID, model.RoleUser, req.Message)
|
||
|
||
// 1. 设置 SSE 响应头
|
||
w.Header().Set("Content-Type", "text/event-stream")
|
||
w.Header().Set("Cache-Control", "no-cache")
|
||
w.Header().Set("Connection", "keep-alive")
|
||
w.Header().Set("X-Accel-Buffering", "no")
|
||
|
||
flusher, ok := w.(http.Flusher)
|
||
if !ok {
|
||
http.Error(w, "Streaming not supported", http.StatusInternalServerError)
|
||
return
|
||
}
|
||
|
||
// 2. 调用 Orchestrator 处理(替代原有的线性处理流程)
|
||
// Orchestrator 内部处理:意图分析 → 子会话分派 → 结果汇总 → 综合生成回复
|
||
eventCh, err := orch.ProcessInput(ctx, orchestrator.ProcessParams{
|
||
UserID: req.UserID,
|
||
SessionID: req.SessionID,
|
||
Message: req.Message,
|
||
Images: req.Images,
|
||
Mode: req.Mode,
|
||
Nickname: userNickname,
|
||
})
|
||
if err != nil {
|
||
errData, _ := json.Marshal(map[string]string{"delta": "", "error": fmt.Sprintf("处理失败: %v", err)})
|
||
fmt.Fprintf(w, "data: %s\n\n", errData)
|
||
flusher.Flush()
|
||
fmt.Fprintf(w, "data: [DONE]\n\n")
|
||
flusher.Flush()
|
||
return
|
||
}
|
||
|
||
messageID := fmt.Sprintf("msg-%d", time.Now().UnixNano())
|
||
|
||
// 3. 流式输出 SSE
|
||
var fullContent string
|
||
for event := range eventCh {
|
||
switch event.Type {
|
||
case model.StreamError:
|
||
log.Printf("[chat] 流式错误: %v", event.Error)
|
||
errData, _ := json.Marshal(map[string]string{"delta": "", "error": event.Error.Error()})
|
||
fmt.Fprintf(w, "data: %s\n\n", errData)
|
||
flusher.Flush()
|
||
fmt.Fprintf(w, "data: [DONE]\n\n")
|
||
flusher.Flush()
|
||
return
|
||
|
||
case model.StreamDelta:
|
||
fullContent += event.Delta
|
||
deltaData, _ := json.Marshal(map[string]string{
|
||
"delta": event.Delta,
|
||
"message_id": messageID,
|
||
})
|
||
fmt.Fprintf(w, "data: %s\n\n", deltaData)
|
||
flusher.Flush()
|
||
|
||
case model.StreamSegments:
|
||
// 发送断句信息
|
||
segData, _ := json.Marshal(map[string]interface{}{
|
||
"message_id": messageID,
|
||
"mode": req.Mode,
|
||
"segments": event.Segments,
|
||
})
|
||
fmt.Fprintf(w, "data: %s\n\n", segData)
|
||
flusher.Flush()
|
||
|
||
case model.StreamReview:
|
||
// 发送审查后的结构化消息(动作消息 + 聊天消息)
|
||
reviewData, _ := json.Marshal(map[string]interface{}{
|
||
"message_id": messageID,
|
||
"review_messages": event.ReviewMessages,
|
||
})
|
||
fmt.Fprintf(w, "data: %s\n\n", reviewData)
|
||
flusher.Flush()
|
||
|
||
case model.StreamDone:
|
||
// 下发结束标记
|
||
endData, _ := json.Marshal(map[string]interface{}{
|
||
"message_id": messageID,
|
||
"mode": req.Mode,
|
||
"done": true,
|
||
})
|
||
fmt.Fprintf(w, "data: %s\n\n", endData)
|
||
flusher.Flush()
|
||
fmt.Fprintf(w, "data: [DONE]\n\n")
|
||
flusher.Flush()
|
||
}
|
||
}
|
||
|
||
// 4. 对话完成后触发昔涟的自主思考(事件驱动,非定时)
|
||
if thinker != nil {
|
||
thinker.TriggerPostChatThink()
|
||
}
|
||
|
||
}
|
||
|
||
|
||
// handleMemorySearch 处理记忆搜索请求
|
||
func handleMemorySearch(
|
||
w http.ResponseWriter,
|
||
r *http.Request,
|
||
memRetriever *memory.Retriever,
|
||
) {
|
||
if r.Method != http.MethodGet {
|
||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
|
||
userID := r.URL.Query().Get("user_id")
|
||
if userID == "" {
|
||
http.Error(w, "缺少 user_id 参数", http.StatusBadRequest)
|
||
return
|
||
}
|
||
|
||
query := r.URL.Query().Get("q")
|
||
if query == "" {
|
||
http.Error(w, "缺少 q 参数", http.StatusBadRequest)
|
||
return
|
||
}
|
||
|
||
if memRetriever == nil {
|
||
log.Printf("[memory] 记忆检索器未初始化: 数据库不可用")
|
||
w.Header().Set("Content-Type", "application/json")
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"user_id": userID,
|
||
"query": query,
|
||
"memories": []interface{}{},
|
||
"error": "记忆系统未就绪",
|
||
"errorType": "memory_store_unavailable",
|
||
"hint": "PostgreSQL 数据库不可用,请检查数据库连接配置",
|
||
})
|
||
return
|
||
}
|
||
|
||
ctx := r.Context()
|
||
memories, err := memRetriever.Retrieve(ctx, userID, query)
|
||
if err != nil {
|
||
log.Printf("[memory] 检索失败: %v", err)
|
||
w.Header().Set("Content-Type", "application/json")
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"user_id": userID,
|
||
"query": query,
|
||
"memories": []interface{}{},
|
||
"error": fmt.Sprintf("检索失败: %v", err),
|
||
"errorType": "retrieve_failed",
|
||
})
|
||
return
|
||
}
|
||
|
||
if memories == nil {
|
||
memories = []memory.MemoryEntry{}
|
||
}
|
||
|
||
w.Header().Set("Content-Type", "application/json")
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"user_id": userID,
|
||
"query": query,
|
||
"memories": memories,
|
||
"total": len(memories),
|
||
})
|
||
}
|
||
|
||
// handleMemoryCRUD 处理记忆的 CRUD 操作
|
||
func handleMemoryCRUD(
|
||
w http.ResponseWriter,
|
||
r *http.Request,
|
||
memStore *memory.Store,
|
||
memExtractor *memory.Extractor,
|
||
) {
|
||
switch r.Method {
|
||
case http.MethodGet:
|
||
// 列出用户的所有记忆
|
||
userID := r.URL.Query().Get("user_id")
|
||
if userID == "" {
|
||
http.Error(w, "缺少 user_id 参数", http.StatusBadRequest)
|
||
return
|
||
}
|
||
|
||
if memStore == nil {
|
||
log.Printf("[memory] 记忆存储未初始化: 数据库不可用")
|
||
w.Header().Set("Content-Type", "application/json")
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"user_id": userID,
|
||
"memories": []interface{}{},
|
||
"error": "记忆系统未就绪",
|
||
"errorType": "memory_store_unavailable",
|
||
"hint": "PostgreSQL 数据库不可用,请检查数据库连接配置",
|
||
})
|
||
return
|
||
}
|
||
|
||
ctx := r.Context()
|
||
memories, err := memStore.Query(ctx, model.MemoryQuery{
|
||
UserID: userID,
|
||
Limit: 50,
|
||
})
|
||
if err != nil {
|
||
log.Printf("[memory] 查询失败: %v", err)
|
||
w.Header().Set("Content-Type", "application/json")
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"user_id": userID,
|
||
"memories": []interface{}{},
|
||
"error": fmt.Sprintf("查询失败: %v", err),
|
||
"errorType": "query_failed",
|
||
})
|
||
return
|
||
}
|
||
|
||
if memories == nil {
|
||
memories = []model.MemoryEntry{}
|
||
}
|
||
|
||
w.Header().Set("Content-Type", "application/json")
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"user_id": userID,
|
||
"memories": memories,
|
||
"total": len(memories),
|
||
})
|
||
|
||
case http.MethodDelete:
|
||
// 删除单条记忆: DELETE /api/v1/memory?id=xxx
|
||
memoryID := r.URL.Query().Get("id")
|
||
if memoryID == "" {
|
||
http.Error(w, "缺少 id 参数", http.StatusBadRequest)
|
||
return
|
||
}
|
||
|
||
if memStore == nil {
|
||
log.Printf("[memory] 记忆存储未初始化: 无法删除")
|
||
w.Header().Set("Content-Type", "application/json")
|
||
w.WriteHeader(http.StatusServiceUnavailable)
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"error": "记忆系统未就绪",
|
||
"errorType": "memory_store_unavailable",
|
||
"hint": "PostgreSQL 数据库不可用,请检查数据库连接配置",
|
||
})
|
||
return
|
||
}
|
||
|
||
ctx := r.Context()
|
||
if err := memStore.Delete(ctx, memoryID); err != nil {
|
||
log.Printf("[memory] 删除失败: %v", err)
|
||
w.Header().Set("Content-Type", "application/json")
|
||
w.WriteHeader(http.StatusInternalServerError)
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"error": fmt.Sprintf("删除失败: %v", err),
|
||
"errorType": "delete_failed",
|
||
})
|
||
return
|
||
}
|
||
|
||
w.Header().Set("Content-Type", "application/json")
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"status": "deleted",
|
||
"memory_id": memoryID,
|
||
})
|
||
|
||
case http.MethodPost:
|
||
// 手动添加记忆
|
||
var req struct {
|
||
UserID string `json:"user_id"`
|
||
Content string `json:"content"`
|
||
Category string `json:"category"`
|
||
Priority int `json:"priority"`
|
||
}
|
||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||
http.Error(w, "无效的请求体", http.StatusBadRequest)
|
||
return
|
||
}
|
||
|
||
if req.UserID == "" || req.Content == "" {
|
||
http.Error(w, "缺少 user_id 或 content", http.StatusBadRequest)
|
||
return
|
||
}
|
||
|
||
if req.Category == "" {
|
||
req.Category = "other"
|
||
}
|
||
if req.Priority <= 0 {
|
||
req.Priority = 1
|
||
}
|
||
|
||
if memStore == nil {
|
||
log.Printf("[memory] 记忆存储未初始化: 无法保存")
|
||
w.Header().Set("Content-Type", "application/json")
|
||
w.WriteHeader(http.StatusServiceUnavailable)
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"error": "记忆系统未就绪",
|
||
"errorType": "memory_store_unavailable",
|
||
"hint": "PostgreSQL 数据库不可用,请检查数据库连接配置",
|
||
})
|
||
return
|
||
}
|
||
|
||
entry := &model.MemoryEntry{
|
||
UserID: req.UserID,
|
||
Content: req.Content,
|
||
Category: model.MemoryCategory(req.Category),
|
||
Priority: model.MemoryPriority(req.Priority),
|
||
}
|
||
|
||
ctx := r.Context()
|
||
if err := memStore.Save(ctx, entry); err != nil {
|
||
log.Printf("[memory] 保存失败: %v", err)
|
||
w.Header().Set("Content-Type", "application/json")
|
||
w.WriteHeader(http.StatusInternalServerError)
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"error": fmt.Sprintf("保存失败: %v", err),
|
||
"errorType": "save_failed",
|
||
})
|
||
return
|
||
}
|
||
|
||
w.Header().Set("Content-Type", "application/json")
|
||
w.WriteHeader(http.StatusCreated)
|
||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||
"status": "saved",
|
||
"memory": entry,
|
||
})
|
||
|
||
default:
|
||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||
}
|
||
|
||
}
|