package main import ( "context" "encoding/json" "fmt" "log" "net/http" "os" "os/signal" "path/filepath" "strconv" "strings" "syscall" "time" "github.com/joho/godotenv" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/background" aiConfig "git.yeij.top/AskaEth/Cyrene/ai-core/internal/config" ctxbuild "git.yeij.top/AskaEth/Cyrene/ai-core/internal/context" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/host" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/llm" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/memory" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/model" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/orchestrator" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/persona" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/rag" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/subsession" "git.yeij.top/AskaEth/Cyrene/ai-core/internal/tools" plgManager "git.yeij.top/AskaEth/Cyrene/pkg/plugins/manager" plgSDK "git.yeij.top/AskaEth/Cyrene/pkg/plugins/sdk" pluginCalc "git.yeij.top/AskaEth/Cyrene/pkg/plugins/calculator" pluginCrypto "git.yeij.top/AskaEth/Cyrene/pkg/plugins/crypto" pluginDate "git.yeij.top/AskaEth/Cyrene/pkg/plugins/datetime" pluginFile "git.yeij.top/AskaEth/Cyrene/pkg/plugins/file" pluginHTTP "git.yeij.top/AskaEth/Cyrene/pkg/plugins/http" pluginJSON "git.yeij.top/AskaEth/Cyrene/pkg/plugins/json" pluginMD "git.yeij.top/AskaEth/Cyrene/pkg/plugins/markdown" pluginRand "git.yeij.top/AskaEth/Cyrene/pkg/plugins/random" pluginText "git.yeij.top/AskaEth/Cyrene/pkg/plugins/text" pluginWF "git.yeij.top/AskaEth/Cyrene/pkg/plugins/web_fetch" pluginWS "git.yeij.top/AskaEth/Cyrene/pkg/plugins/web_search" ) var cfg Config func main() { // 自动加载 .env 文件(优先从可执行文件位置反推仓库根目录) _ = godotenv.Load() // 先尝试当前目录 if exe, err := os.Executable(); err == nil { _ = godotenv.Load(filepath.Join(filepath.Dir(exe), "..", "..", ".env")) } // 兜底:如果 LLM_MODEL 仍未设置,打印提示 if os.Getenv("LLM_MODEL") == "" { log.Println("ℹ 未找到 .env 文件,将使用环境变量或默认值") } log.SetFlags(log.LstdFlags | log.Lshortfile) log.Println("🧠 AI-Core 服务启动中...") // 加载配置 cfg = loadConfig() // 初始化人格加载器 personaDir := cfg.PersonaDir if personaDir == "" { personaDir = "./internal/persona" } personaLoader, err := persona.NewLoader(personaDir) if err != nil { log.Fatalf("加载人格配置失败: %v", err) } log.Printf("已加载 %d 个人格: %v", len(personaLoader.List()), personaLoader.List()) // 初始化模型配置加载器 (Phase 6: 多模型配置) configPath := getEnv("MODELS_CONFIG_PATH", "../models.json") configLoader, err := aiConfig.NewLoader(configPath) if err != nil { log.Printf("⚠ 模型配置加载失败,回退到 .env: %v", err) configLoader = nil } // 构建 .env 回退配置 envFallback := llm.OpenAIConfig{ BaseURL: cfg.LLMBaseURL, APIKey: cfg.LLMAPIKey, Model: cfg.LLMModel, FallbackModel: cfg.LLMFallbackModel, MaxRetries: 3, Timeout: 180 * time.Second, } // 创建 ModelSelector (优先使用 models.json,回退到 .env) modelSelector := llm.NewModelSelector(configLoader, envFallback) // 为不同用途创建独立的适配器,支持目的路由 // 在 .env 回退模式下,所有适配器共享同一 provider chatAdapter := modelSelector.DefaultAdapter() provider, _ := modelSelector.Select(context.Background(), llm.PurposeIntentAnalysis) intentAdapter := llm.NewAdapter(provider) provider, _ = modelSelector.Select(context.Background(), llm.PurposeDeepThinking) thinkerAdapter := llm.NewAdapter(provider) provider, _ = modelSelector.Select(context.Background(), llm.PurposeMemoryExtraction) memoryAdapter := llm.NewAdapter(provider) provider, _ = modelSelector.Select(context.Background(), llm.PurposeToolCalling) toolAdapter := llm.NewAdapter(provider) if configLoader != nil && configLoader.HasConfig() { log.Printf("LLM适配器已就绪: models.json 驱动 (chat=%s, intent=%s, think=%s, memory=%s, tool=%s)", chatAdapter.ModelName(), intentAdapter.ModelName(), thinkerAdapter.ModelName(), memoryAdapter.ModelName(), toolAdapter.ModelName()) } else { log.Printf("LLM适配器已就绪: .env 驱动 (模型=%s)", chatAdapter.ModelName()) } // 初始化记忆系统 var memStore *memory.Store var memRetriever *memory.Retriever var memExtractor *memory.Extractor if cfg.DatabaseURL != "" { memStore = memory.NewStore(cfg.DatabaseURL) defer memStore.Close() memRetriever = memory.NewRetriever(memStore, nil) // 记忆提取器使用 memory purpose 适配器 memExtractor = memory.NewExtractor(memStore, func(ctx context.Context, messages []model.LLMMessage) (*model.LLMResponse, error) { return memoryAdapter.Chat(ctx, messages) }) log.Println("记忆提取器已就绪") } // 初始化会话历史存储 convStore := ctxbuild.NewConversationStore(50) // 从数据库恢复主会话历史(避免重启丢失上下文) adminUserID := "admin" adminSessionID := "admin-session-main" if cfg.DatabaseURL != "" { convStore.SetDatabaseURL(cfg.DatabaseURL) if err := convStore.LoadFromDB(cfg.DatabaseURL, adminSessionID, 50); err != nil { log.Printf("⚠ 从数据库恢复会话历史失败(不影响服务启动): %v", err) } } log.Println("会话历史存储已就绪 (上限50条)") // 初始化上下文构建器 ctxBuilder := ctxbuild.NewBuilder(convStore) // 初始化 IoT 客户端 var iotClient *tools.IoTClient if cfg.IoTServiceURL != "" { iotClient = tools.NewIoTClient(cfg.IoTServiceURL) log.Printf("IoT 客户端已就绪: %s", cfg.IoTServiceURL) } else { log.Println("IoT 客户端未配置 (IOT_SERVICE_URL 和 IOT_DEBUG_SERVICE_URL 均为空)") } // 初始化主机操控管理器 (沙箱执行 + 文件系统隔离) hostSandbox := host.NewSandbox(host.DefaultSandboxConfig()) directBackend := host.NewDirectBackend(hostSandbox) hostManager := host.NewManager(directBackend) dataDir := getEnv("DATA_DIR", "/tmp/cyrene_data") hostManager.SetAllowedDirs([]string{dataDir, os.TempDir(), "."}) log.Printf("主机操控管理器已就绪: 沙箱执行 + 文件隔离 (数据目录=%s)", dataDir) // 初始化完整OS环境管理器 (WSL/Docker,无沙箱限制,供 os_* 工具使用) osManager := createOSManager() if osManager != nil { log.Printf("完整OS环境管理器已就绪: backend=%s", osManager.BackendName()) } else { log.Println("完整OS环境管理器未配置 (设置 HOST_EXEC_BACKEND=wsl 或 docker 以启用)") } // 初始化 RAG 知识库 (Phase 6.6: 知识库 RAG 增强) knowledgeDir := getEnv("KNOWLEDGE_DIR", "./data/knowledge") ragEmbedder := rag.NewEmbedder(cfg.LLMBaseURL, cfg.LLMAPIKey, "text-embedding-3-small") knowledgeStore := rag.NewKnowledgeStore(ragEmbedder, knowledgeDir) knowledgeRetriever := rag.NewRetriever(knowledgeStore) log.Printf("RAG 知识库已就绪: 目录=%s, 嵌入模型=text-embedding-3-small", knowledgeDir) // 初始化工具注册中心 (使用共享插件模块) toolRegistry := plgManager.NewToolRegistry() var visionProvider llm.LLMProvider var ocrProvider llm.LLMProvider if getEnvBool("ENABLE_TOOLS", true) { // 11 个共享通用插件 — 注册其工具到统一注册中心 registerPluginTools(toolRegistry, &pluginCalc.CalculatorPlugin{}) registerPluginTools(toolRegistry, &pluginDate.DatetimePlugin{}) registerPluginTools(toolRegistry, &pluginText.TextPlugin{}) registerPluginTools(toolRegistry, &pluginCrypto.CryptoPlugin{}) registerPluginTools(toolRegistry, &pluginRand.RandomPlugin{}) registerPluginTools(toolRegistry, &pluginMD.MarkdownPlugin{}) registerPluginTools(toolRegistry, &pluginJSON.JSONPlugin{}) registerPluginTools(toolRegistry, pluginFile.NewFilePlugin(dataDir)) registerPluginTools(toolRegistry, pluginHTTP.NewHTTPPlugin()) searxngURL := getEnv("SEARXNG_URL", "") if searxngURL != "" { registerPluginTools(toolRegistry, pluginWS.NewWebSearchPluginWithURL(searxngURL)) } else { registerPluginTools(toolRegistry, pluginWS.NewWebSearchPlugin()) } registerPluginTools(toolRegistry, pluginWF.NewWebFetchPlugin()) // ai-core 专属工具 — 通过 sdk.Tool 适配器注册 if iotClient != nil { toolRegistry.Register(wrapTool(tools.NewIoTQueryTool(iotClient), "iot_query", "Query IoT Devices", "iot")) toolRegistry.Register(wrapTool(tools.NewIoTControlTool(iotClient), "iot_control", "Control IoT Devices", "iot")) } if hostManager != nil { toolRegistry.Register(wrapTool(tools.NewHostExecTool(hostManager), "host_exec", "Host Command Execution", "system")) toolRegistry.Register(wrapTool(tools.NewHostFileTool(hostManager), "host_file", "Host File Operations", "system")) toolRegistry.Register(wrapTool(tools.NewHostSystemTool(hostManager), "host_system", "Host System Info", "system")) } if osManager != nil { toolRegistry.Register(wrapTool(tools.NewOSExecTool(osManager), "os_exec", "OS Command Execution", "system")) toolRegistry.Register(wrapTool(tools.NewOSFileTool(osManager), "os_file", "OS File Operations", "system")) toolRegistry.Register(wrapTool(tools.NewOSSystemTool(osManager), "os_system", "OS System Info", "system")) } visionProvider = nil if configLoader != nil && configLoader.HasConfig() { cfg := configLoader.GetConfig() if route, ok := cfg.Routing["vision"]; ok && len(route.FallbackChain) > 0 { for _, mid := range route.FallbackChain { if _, ok := cfg.Models[mid]; ok { visionProvider, _ = modelSelector.Select(context.Background(), llm.PurposeVision) log.Printf("视觉模型已启用: %s", visionProvider.ModelName()) break } } } } if visionProvider == nil { log.Println("视觉模型未配置,vision_analyze 将使用 base64 模式") } // 初始化 OCR 模型(与视觉模型并行调用,提供文字提取结果给会话模型自主判断) ocrProvider = nil if configLoader != nil && configLoader.HasConfig() { cfg := configLoader.GetConfig() if route, ok := cfg.Routing["ocr"]; ok && len(route.FallbackChain) > 0 { for _, mid := range route.FallbackChain { if _, ok := cfg.Models[mid]; ok { ocrProvider, _ = modelSelector.Select(context.Background(), llm.PurposeOCR) log.Printf("OCR模型已启用: %s", ocrProvider.ModelName()) break } } } } if ocrProvider == nil { log.Println("OCR模型未配置,图片文字提取将复用视觉模型") } toolRegistry.Register(wrapTool(tools.NewVisionTool(visionProvider), "vision_analyze", "Image Vision Analysis & OCR", "multimodal")) if knowledgeRetriever != nil { toolRegistry.Register(wrapTool(tools.NewKnowledgeSearchTool(knowledgeRetriever), "knowledge_search", "Search Knowledge Base", "knowledge")) toolRegistry.Register(wrapTool(tools.NewKnowledgeIngestTool(knowledgeStore), "knowledge_ingest", "Ingest Knowledge Document", "knowledge")) } log.Printf("工具注册中心已就绪: %d 个工具 (%v)", len(toolRegistry.DefinitionNames()), toolRegistry.DefinitionNames()) } // 初始化后台思考器(增强版:支持工具调用和记忆管理) thinkerCfg := background.DefaultThinkerConfig() // 创建记忆服务 HTTP 客户端(用于持久化思考日志到 memory-service) memServiceURL := getEnv("MEMORY_SERVICE_URL", "http://localhost:8091") memClient := memory.NewClient(memServiceURL) log.Printf("记忆服务客户端已就绪: %s", memServiceURL) thinker := background.NewThinker( thinkerCfg, personaLoader, memRetriever, thinkerAdapter, toolAdapter, iotClient, memStore, toolRegistry, convStore, adminUserID, adminSessionID, memClient, ) // 初始化动态调度加载器 (Phase: thinking-schedule) schedulePath := getEnv("THINKING_SCHEDULE_PATH", "../thinking_schedule.json") if scheduleLoader, err := background.NewScheduleLoader(schedulePath); err != nil { log.Printf("⚠ 思考调度配置加载失败,使用固定间隔: %v", err) } else if scheduleLoader.HasConfig() { thinker.SetScheduleLoader(scheduleLoader) log.Println("[后台思考] 动态调度已启用 (thinking_schedule.json)") } else { log.Println("[后台思考] 调度配置文件不存在,使用 .env 固定间隔") } thinker.Start() defer thinker.Stop() // 设置主动消息推送回调(调用 Gateway 内部 API) gatewayURL := getEnv("GATEWAY_URL", "http://localhost:8080") internalToken := os.Getenv("INTERNAL_SERVICE_TOKEN") if internalToken != "" { proactiveClient := &http.Client{Timeout: 5 * time.Second} thinker.SetMessagePusher(func(userID, sessionID, message string) { reqBody, _ := json.Marshal(map[string]string{ "user_id": userID, "session_id": sessionID, "content": message, }) req, _ := http.NewRequest("POST", gatewayURL+"/api/v1/internal/proactive-message", strings.NewReader(string(reqBody))) req.Header.Set("Content-Type", "application/json") req.Header.Set("X-Internal-Token", internalToken) resp, err := proactiveClient.Do(req) if err != nil { log.Printf("[主动消息] 推送失败: %v", err) return } resp.Body.Close() if resp.StatusCode == http.StatusOK { log.Printf("[主动消息] 已推送到 Gateway: user=%s, len=%d", userID, len(message)) } else { log.Printf("[主动消息] Gateway 返回 %d", resp.StatusCode) } }) log.Printf("[主动消息] 推送已启用 (Gateway=%s)", gatewayURL) } else { log.Println("[主动消息] 未配置 INTERNAL_SERVICE_TOKEN,主动消息推送已禁用") } // 健康检查与对话API的HTTP mux mux := http.NewServeMux() // 初始化子会话管理器 subManager := subsession.NewManager(chatAdapter) // 注册子会话提供者 subManager.Register(subsession.NewGeneralProvider(personaLoader)) if memRetriever != nil { memProvider := subsession.NewMemoryProvider(memRetriever) memProvider.SetFuzzySearch(memoryAdapter, memClient) subManager.Register(memProvider) } if iotClient != nil { subManager.Register(subsession.NewIoTProvider(iotClient, personaDir)) } subManager.Register(subsession.NewReviewProvider()) if knowledgeRetriever != nil { subManager.Register(subsession.NewKnowledgeProvider(knowledgeRetriever)) } log.Printf("子会话管理器已就绪: %d 个提供者 (%v)", len(subManager.ListProviders()), subManager.ListProviders()) // 构建新的 Orchestrator (v2.0) — 传入 purpose 专用适配器 orch := orchestrator.NewOrchestrator( personaLoader, ctxBuilder, chatAdapter, intentAdapter, subManager, memRetriever, memExtractor, ) orch.SetToolRegistry(toolRegistry) if visionProvider != nil { orch.SetVisionProvider(visionProvider) log.Printf("对话编排器: 视觉模型已注入 (%s)", visionProvider.ModelName()) } if ocrProvider != nil { orch.SetOCRProvider(ocrProvider) log.Printf("对话编排器: OCR模型已注入 (%s)", ocrProvider.ModelName()) } log.Println("对话编排器 v2.0 已就绪") _ = orch // 注册对话API端点 mux.HandleFunc("/api/v1/chat", func(w http.ResponseWriter, r *http.Request) { handleChat(w, r, orch, ctxBuilder, personaLoader, memRetriever, memExtractor, iotClient, thinker, toolRegistry) }) // 注册记忆API端点 mux.HandleFunc("/api/v1/memory/search", func(w http.ResponseWriter, r *http.Request) { handleMemorySearch(w, r, memRetriever) }) mux.HandleFunc("/api/v1/memory", func(w http.ResponseWriter, r *http.Request) { handleMemoryCRUD(w, r, memStore, memExtractor) }) // Phase 2: 在线状态通知端点 (Gateway -> ai-core) mux.HandleFunc("/api/v1/internal/presence", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } if r.Header.Get("X-Internal-Token") != os.Getenv("INTERNAL_SERVICE_TOKEN") { w.WriteHeader(http.StatusUnauthorized) return } var req struct { UserID string `json:"user_id"` Status string `json:"status"` SessionID string `json:"session_id"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { w.WriteHeader(http.StatusBadRequest) return } online := req.Status == "online" thinker.UpdatePresence(online, req.SessionID) w.Header().Set("Content-Type", "application/json") w.Write([]byte(`{"status":"ok"}`)) }) mux.HandleFunc("/api/v1/health", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.Write([]byte(`{"status":"ok","service":"ai-core","model":"` + chatAdapter.ModelName() + `"}`)) }) // LLM 调用日志(调试用) mux.HandleFunc("/api/v1/llm-calls", func(w http.ResponseWriter, r *http.Request) { limit := 50 if n, err := fmt.Sscanf(r.URL.Query().Get("limit"), "%d", &limit); n != 1 || err != nil || limit <= 0 { limit = 50 } if limit > 500 { limit = 500 } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "calls": llm.GetCalls(limit), "total": len(llm.GetCalls(0)), }) }) // LLM 调用 SSE 实时推送 mux.HandleFunc("/api/v1/llm-calls/stream", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming not supported", http.StatusInternalServerError) return } ch, done := llm.SubscribeCalls() defer llm.UnsubscribeCalls(ch) for { select { case rec := <-ch: data, _ := json.Marshal(rec) fmt.Fprintf(w, "data: %s\n\n", data) flusher.Flush() case <-done: return case <-r.Context().Done(): return } } }) // 工具调用记录 mux.HandleFunc("/api/v1/tools/calls", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.WriteHeader(http.StatusMethodNotAllowed) return } toolName := r.URL.Query().Get("tool_name") limit := 50 if n, err := fmt.Sscanf(r.URL.Query().Get("limit"), "%d", &limit); n != 1 || err != nil || limit <= 0 { limit = 50 } if limit > 500 { limit = 500 } page, _ := strconv.Atoi(r.URL.Query().Get("page")) if page < 1 { page = 1 } offset := (page - 1) * limit calls, total := toolRegistry.GetCallLogs(toolName, limit, offset) if calls == nil { calls = []plgManager.CallLogRecord{} } totalPages := total / limit if total%limit != 0 { totalPages++ } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "calls": calls, "total": total, "total_pages": totalPages, "page": page, "limit": limit, }) }) // 工具调用统计 mux.HandleFunc("/api/v1/tools/calls/stats", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.WriteHeader(http.StatusMethodNotAllowed) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(toolRegistry.GetCallStats()) }) // OS 环境监控端点 mux.HandleFunc("/api/v1/system/info", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { w.WriteHeader(http.StatusMethodNotAllowed) return } result := map[string]interface{}{ "os_enabled": osManager != nil, } if osManager != nil { result["backend"] = osManager.BackendName() result["system"] = osManager.SystemInfo() if disk, err := osManager.DiskUsage("/"); err == nil { result["disk"] = disk } } if hostManager != nil { result["host"] = map[string]interface{}{ "backend": hostManager.BackendName(), "system": hostManager.SystemInfo(), } } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(result) }) // 启动HTTP服务 srv := &http.Server{ Addr: ":" + cfg.Port, Handler: mux, } go func() { log.Printf("🚀 AI-Core 服务已启动在端口 %s", cfg.Port) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatalf("服务启动失败: %v", err) } }() // 优雅关闭 quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit log.Println("正在关闭 AI-Core 服务...") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() srv.Shutdown(ctx) log.Println("AI-Core 服务已关闭") } // Config AI-Core配置 type Config struct { Port string PersonaDir string LLMBaseURL string LLMAPIKey string LLMModel string LLMFallbackModel string DatabaseURL string IoTServiceURL string AdminNickname string // 昔涟对管理员用户的基本称呼 } func loadConfig() Config { return Config{ Port: getEnv("AI_CORE_PORT", "8081"), PersonaDir: getEnv("PERSONA_DIR", "./internal/persona"), LLMBaseURL: getEnv("LLM_API_URL", "https://api.openai.com/v1"), LLMAPIKey: getEnv("LLM_API_KEY", ""), LLMModel: getEnv("LLM_MODEL", "gpt-4o"), LLMFallbackModel: getEnv("LLM_FALLBACK_MODEL", "gpt-4o-mini"), DatabaseURL: buildDatabaseURL(), IoTServiceURL: getEnvWithFallback("IOT_SERVICE_URL", "IOT_DEBUG_SERVICE_URL", ""), AdminNickname: getEnv("ADMIN_NICKNAME", "管理员"), } } func buildDatabaseURL() string { host := getEnv("POSTGRES_HOST", "localhost") port := getEnv("POSTGRES_PORT", "5432") user := getEnv("POSTGRES_USER", "cyrene") password := getEnv("POSTGRES_PASSWORD", "cyrene_pass") dbname := getEnv("POSTGRES_DB", "cyrene_ai") sslmode := getEnv("POSTGRES_SSLMODE", "disable") return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=%s", user, password, host, port, dbname, sslmode) } func getEnv(key, fallback string) string { if v := os.Getenv(key); v != "" { return v } return fallback } // getEnvWithFallback 获取环境变量,优先使用 primaryKey,如果为空则回退到 fallbackKey func getEnvWithFallback(primaryKey, fallbackKey, defaultVal string) string { if v := os.Getenv(primaryKey); v != "" { return v } if v := os.Getenv(fallbackKey); v != "" { return v } return defaultVal } func getEnvBool(key string, fallback bool) bool { v := os.Getenv(key) if v == "" { return fallback } switch strings.ToLower(v) { case "true", "1", "yes", "on": return true case "false", "0", "no", "off": return false default: return fallback } } // createOSManager 根据 HOST_EXEC_BACKEND 环境变量创建完整OS环境管理器。 // 支持 "wsl" 和 "docker" 两种后端。返回 nil 表示未配置或配置无效。 func createOSManager() *host.Manager { backend := strings.ToLower(os.Getenv("HOST_EXEC_BACKEND")) switch backend { case "wsl": distro := getEnv("WSL_DISTRO", "Ubuntu-22.04") username := getEnv("WSL_USER", "cyrene") password := os.Getenv("WSL_USER_PASSWORD") maxTimeout := time.Duration(getEnvInt("HOST_EXEC_MAX_TIMEOUT", 300)) * time.Second wslBackend := host.NewWSLBackend(distro, username, password, maxTimeout) return host.NewManager(wslBackend) case "docker": container := getEnv("SANDBOX_CONTAINER", "cyrene-sandbox") image := getEnv("SANDBOX_IMAGE", "ubuntu:22.04") maxTimeout := time.Duration(getEnvInt("HOST_EXEC_MAX_TIMEOUT", 300)) * time.Second dockerBackend := host.NewDockerBackend(container, image, maxTimeout) return host.NewManager(dockerBackend) default: return nil } } // getEnvInt 获取整数类型的环境变量 func getEnvInt(key string, fallback int) int { v := os.Getenv(key) if v == "" { return fallback } n, err := strconv.Atoi(v) if err != nil || n <= 0 { return fallback } return n } // registerPluginTools 从插件实例注册其所有工具到注册中心 func registerPluginTools(registry *plgManager.ToolRegistry, plugin plgSDK.Plugin) { for _, t := range plugin.Tools() { if err := registry.Register(t); err != nil { log.Printf("⚠ 注册工具失败: %v", err) } } } // wrapTool 包装 ai-core 旧 ToolExecutor 为 sdk.Tool func wrapTool(executor tools.ToolExecutor, id, displayName, category string) plgSDK.Tool { return &toolAdapter{ executor: executor, def: plgSDK.ToolDefinition{ ID: id, Name: id, DisplayName: displayName, Description: executor.Definition().Description, Category: category, Complexity: plgSDK.ComplexitySimple, Parameters: executor.Definition().Parameters, }, } } type toolAdapter struct { plgSDK.BaseTool executor tools.ToolExecutor def plgSDK.ToolDefinition } func (a *toolAdapter) Definition() plgSDK.ToolDefinition { return a.def } func (a *toolAdapter) Execute(ctx context.Context, args map[string]interface{}) (*plgSDK.ToolResult, error) { result, err := a.executor.Execute(ctx, args) if err != nil { return nil, err } return &plgSDK.ToolResult{ ToolName: result.ToolName, Success: result.Success, Output: result.Data, Error: result.Error, }, nil } // buildOpenAITools 将工具注册中心的定义转换为 LLM 层的 OpenAITool 格式 func buildOpenAITools(registry *plgManager.ToolRegistry) []llm.OpenAITool { if registry == nil || !registry.IsEnabled() { return nil } defs := registry.Definitions() if len(defs) == 0 { return nil } result := make([]llm.OpenAITool, 0, len(defs)) for _, d := range defs { result = append(result, llm.OpenAITool{ Type: "function", Function: llm.OpenAIToolFunc{ Name: d.Name, Description: d.Description, Parameters: d.Parameters, }, }) } return result } // handleChat 处理对话请求(SSE 流式响应)— 使用新 Orchestrator v2.0 func handleChat( w http.ResponseWriter, r *http.Request, orch *orchestrator.Orchestrator, ctxBuilder *ctxbuild.Builder, _ *persona.Loader, _ *memory.Retriever, _ *memory.Extractor, iotClient *tools.IoTClient, thinker *background.Thinker, _ *plgManager.ToolRegistry, ) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // 解析请求 var req struct { UserID string `json:"user_id"` SessionID string `json:"session_id"` Message string `json:"message"` Images []string `json:"images,omitempty"` // 图片 base64 data URL Mode string `json:"mode"` Nickname string `json:"nickname,omitempty"` Source struct { Platform string `json:"platform"` ChannelID string `json:"channel_id"` ChannelType string `json:"channel_type"` SenderName string `json:"sender_name"` OriginalUID string `json:"original_uid"` } `json:"source,omitempty"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "无效的请求体", http.StatusBadRequest) return } if req.Mode == "" { req.Mode = "text" } // 平台静默观察模式:只记录消息、提取记忆、触发后台思考,不生成回复。 if req.Mode == "platform_silent" { if thinker != nil { thinker.RecordUserMessage(req.SessionID) } ctxBuilder.CacheMessage(req.SessionID, model.RoleUser, req.Message) // 从观察到的群聊消息中提取记忆。 orch.ExtractMemoriesOnly(r.Context(), req.UserID, req.SessionID, req.Message) if thinker != nil { thinker.TriggerPostChatThink() } w.Header().Set("Content-Type", "application/json") w.Write([]byte(`{"status":"silent_processed"}`)) return } ctx := r.Context() // 0. 记录用户活动(重置闲置计时器) if thinker != nil { thinker.RecordUserMessage(req.SessionID) } // 确定用户昵称 userNickname := req.Nickname if userNickname == "" { userNickname = cfg.AdminNickname } // 1. 设置 SSE 响应头 w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming not supported", http.StatusInternalServerError) return } // 1.5 缓存用户消息到会话历史(在 Orchestrator 之前,确保顺序正确:user → assistant) ctxBuilder.CacheMessage(req.SessionID, model.RoleUser, req.Message) // 2. 调用 Orchestrator 处理(替代原有的线性处理流程) // Orchestrator 内部处理:意图分析 → 子会话分派 → 结果汇总 → 综合生成回复 eventCh, err := orch.ProcessInput(ctx, orchestrator.ProcessParams{ UserID: req.UserID, SessionID: req.SessionID, Message: req.Message, Images: req.Images, Mode: req.Mode, Nickname: userNickname, ChannelType: req.Source.ChannelType, }) if err != nil { errData, _ := json.Marshal(map[string]string{"delta": "", "error": fmt.Sprintf("处理失败: %v", err)}) fmt.Fprintf(w, "data: %s\n\n", errData) flusher.Flush() fmt.Fprintf(w, "data: [DONE]\n\n") flusher.Flush() return } messageID := fmt.Sprintf("msg-%d", time.Now().UnixNano()) // 3. 流式输出 SSE var fullContent string for event := range eventCh { switch event.Type { case model.StreamToolProgress: tp := event.ToolProgress progressData, _ := json.Marshal(map[string]interface{}{ "type": "tool_progress", "tool_name": tp.ToolName, "status": tp.Status, "progress": tp.Progress, "message": tp.Message, "message_id": messageID, }) fmt.Fprintf(w, "data: %s\n\n", progressData) flusher.Flush() case model.StreamError: log.Printf("[chat] 流式错误: %v", event.Error) errData, _ := json.Marshal(map[string]string{"delta": "", "error": event.Error.Error()}) fmt.Fprintf(w, "data: %s\n\n", errData) flusher.Flush() fmt.Fprintf(w, "data: [DONE]\n\n") flusher.Flush() return case model.StreamDelta: fullContent += event.Delta deltaData, _ := json.Marshal(map[string]string{ "delta": event.Delta, "message_id": messageID, }) fmt.Fprintf(w, "data: %s\n\n", deltaData) flusher.Flush() case model.StreamSegments: // 发送断句信息 segData, _ := json.Marshal(map[string]interface{}{ "message_id": messageID, "mode": req.Mode, "segments": event.Segments, }) fmt.Fprintf(w, "data: %s\n\n", segData) flusher.Flush() case model.StreamReview: // 发送审查后的结构化消息(动作消息 + 聊天消息) reviewData, _ := json.Marshal(map[string]interface{}{ "message_id": messageID, "review_messages": event.ReviewMessages, }) fmt.Fprintf(w, "data: %s\n\n", reviewData) flusher.Flush() case model.StreamDone: // 下发结束标记 endData, _ := json.Marshal(map[string]interface{}{ "message_id": messageID, "mode": req.Mode, "done": true, }) fmt.Fprintf(w, "data: %s\n\n", endData) flusher.Flush() fmt.Fprintf(w, "data: [DONE]\n\n") flusher.Flush() } } // 4. 对话完成后触发昔涟的自主思考(事件驱动,非定时) if thinker != nil { thinker.TriggerPostChatThink() } } // handleMemorySearch 处理记忆搜索请求 func handleMemorySearch( w http.ResponseWriter, r *http.Request, memRetriever *memory.Retriever, ) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } userID := r.URL.Query().Get("user_id") if userID == "" { http.Error(w, "缺少 user_id 参数", http.StatusBadRequest) return } query := r.URL.Query().Get("q") if query == "" { http.Error(w, "缺少 q 参数", http.StatusBadRequest) return } if memRetriever == nil { log.Printf("[memory] 记忆检索器未初始化: 数据库不可用") w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "user_id": userID, "query": query, "memories": []interface{}{}, "error": "记忆系统未就绪", "errorType": "memory_store_unavailable", "hint": "PostgreSQL 数据库不可用,请检查数据库连接配置", }) return } ctx := r.Context() memories, err := memRetriever.Retrieve(ctx, userID, query) if err != nil { log.Printf("[memory] 检索失败: %v", err) w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "user_id": userID, "query": query, "memories": []interface{}{}, "error": fmt.Sprintf("检索失败: %v", err), "errorType": "retrieve_failed", }) return } if memories == nil { memories = []memory.MemoryEntry{} } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "user_id": userID, "query": query, "memories": memories, "total": len(memories), }) } // handleMemoryCRUD 处理记忆的 CRUD 操作 func handleMemoryCRUD( w http.ResponseWriter, r *http.Request, memStore *memory.Store, memExtractor *memory.Extractor, ) { switch r.Method { case http.MethodGet: // 列出用户的所有记忆 userID := r.URL.Query().Get("user_id") if userID == "" { http.Error(w, "缺少 user_id 参数", http.StatusBadRequest) return } if memStore == nil { log.Printf("[memory] 记忆存储未初始化: 数据库不可用") w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "user_id": userID, "memories": []interface{}{}, "error": "记忆系统未就绪", "errorType": "memory_store_unavailable", "hint": "PostgreSQL 数据库不可用,请检查数据库连接配置", }) return } ctx := r.Context() memories, err := memStore.Query(ctx, model.MemoryQuery{ UserID: userID, Limit: 50, }) if err != nil { log.Printf("[memory] 查询失败: %v", err) w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "user_id": userID, "memories": []interface{}{}, "error": fmt.Sprintf("查询失败: %v", err), "errorType": "query_failed", }) return } if memories == nil { memories = []model.MemoryEntry{} } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "user_id": userID, "memories": memories, "total": len(memories), }) case http.MethodDelete: // 删除单条记忆: DELETE /api/v1/memory?id=xxx memoryID := r.URL.Query().Get("id") if memoryID == "" { http.Error(w, "缺少 id 参数", http.StatusBadRequest) return } if memStore == nil { log.Printf("[memory] 记忆存储未初始化: 无法删除") w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusServiceUnavailable) json.NewEncoder(w).Encode(map[string]interface{}{ "error": "记忆系统未就绪", "errorType": "memory_store_unavailable", "hint": "PostgreSQL 数据库不可用,请检查数据库连接配置", }) return } ctx := r.Context() if err := memStore.Delete(ctx, memoryID); err != nil { log.Printf("[memory] 删除失败: %v", err) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(map[string]interface{}{ "error": fmt.Sprintf("删除失败: %v", err), "errorType": "delete_failed", }) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ "status": "deleted", "memory_id": memoryID, }) case http.MethodPost: // 手动添加记忆 var req struct { UserID string `json:"user_id"` Content string `json:"content"` Category string `json:"category"` Priority int `json:"priority"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { http.Error(w, "无效的请求体", http.StatusBadRequest) return } if req.UserID == "" || req.Content == "" { http.Error(w, "缺少 user_id 或 content", http.StatusBadRequest) return } if req.Category == "" { req.Category = "other" } if req.Priority <= 0 { req.Priority = 1 } if memStore == nil { log.Printf("[memory] 记忆存储未初始化: 无法保存") w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusServiceUnavailable) json.NewEncoder(w).Encode(map[string]interface{}{ "error": "记忆系统未就绪", "errorType": "memory_store_unavailable", "hint": "PostgreSQL 数据库不可用,请检查数据库连接配置", }) return } entry := &model.MemoryEntry{ UserID: req.UserID, Content: req.Content, Category: model.MemoryCategory(req.Category), Priority: model.MemoryPriority(req.Priority), } ctx := r.Context() if err := memStore.Save(ctx, entry); err != nil { log.Printf("[memory] 保存失败: %v", err) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(map[string]interface{}{ "error": fmt.Sprintf("保存失败: %v", err), "errorType": "save_failed", }) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusCreated) json.NewEncoder(w).Encode(map[string]interface{}{ "status": "saved", "memory": entry, }) default: http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) } }