0717928496
## 调试日志
### 1. 插件管理器启动失败
- **症状**: DevTools 显示插件管理器一直"已停止",手动启动正常
- **排查**: 对比 process-manager.js 传入的环境变量 vs plugin-manager config.go 读取的变量
- **根因**: config.js 传入 PLUGIN_MANAGER_PORT=8094,但 config.go 读取 os.Getenv("PORT"),env 名不匹配。且 process.env 中 PORT 泄露时被误读为 9090,与 DevTools 端口冲突
- **修复**: config.js 将 PLUGIN_MANAGER_PORT → PORT,使 env 名与代码一致 (c3055f4)
### 2. 历史消息刷新后消失
- **症状**: 浏览器刷新后聊天历史清空
- **排查**: WebSocket history_response handler 中 if (msg.messages) 对空数组 [] 为 truthy
- **根因**: 后端返回空的 history_response (缓存为空) 时,空数组覆盖了 HTTP 已加载的消息
- **修复**: useWebSocket.ts 改为 if (msg.messages && msg.messages.length > 0),空数组走 else-if 分支仅打日志,不覆盖已有消息
### 3. Phase 6 多模型配置系统
- Gateway: ModelsConfigStore (JSON文件持久化) + Admin CRUD API (providers/models/routing)
- ai-core: ModelSelector 支持按 purpose 选择 + fallback_chain,无配置时回退 .env
- DevTools: 模型配置管理面板 (Providers/Models/Routing 三Tab)、在线模型查询代理、路由表单 checkbox 多选、关键词搜索过滤
- .gitignore: models.json + platform_configs.json
### 4. 多端客户端追踪
- Hub 新增 knownClients 映射 (clientID → KnownClient),在线/离线状态追踪
- 客户端备注持久化到 PostgreSQL
- DevTools 客户端管理面板
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
423 lines
12 KiB
Go
423 lines
12 KiB
Go
package store
|
|
|
|
import (
|
|
"database/sql"
|
|
"fmt"
|
|
"github.com/yourname/cyrene-ai/pkg/logger"
|
|
"time"
|
|
|
|
_ "github.com/lib/pq"
|
|
)
|
|
|
|
// Session is the in-memory form of one row of the sessions table.
// All fields are exported with snake_case JSON names.
type Session struct {
	// ID is the session's primary key.
	ID string `json:"id"`
	// UserID identifies the owner of the session.
	UserID string `json:"user_id"`
	// Title is the display title; the schema defaults it to an empty string.
	Title string `json:"title"`
	// IsMain mirrors the is_main column.
	// NOTE(review): presumably marks the user's primary session — confirm.
	IsMain bool `json:"is_main"`
	// CreatedAt is the row's creation timestamp.
	CreatedAt time.Time `json:"created_at"`
	// UpdatedAt is the timestamp of the most recent update; listings sort on it.
	UpdatedAt time.Time `json:"updated_at"`
}
|
|
|
|
// Message is the in-memory form of one row of the messages table.
type Message struct {
	// ID is the auto-incremented (SERIAL) primary key.
	ID int `json:"id"`
	// SessionID references the owning session.
	SessionID string `json:"session_id"`
	// Role is the value of the role column (at most 16 characters in the schema).
	Role string `json:"role"`
	// MsgType is the message category; the schema defaults it to 'chat'.
	MsgType string `json:"msg_type"`
	// Content is the message body.
	Content string `json:"content"`
	// ClientID is the originating client; the schema defaults it to ''.
	ClientID string `json:"client_id"`
	// CreatedAt is the row's creation timestamp.
	CreatedAt time.Time `json:"created_at"`
}
|
|
|
|
// ClientRecord is the persisted description of a client, i.e. one row of
// the clients table.
type ClientRecord struct {
	// ClientID is the primary key of the clients table.
	ClientID string `json:"client_id"`
	// UserID identifies the user the client belongs to.
	UserID string `json:"user_id"`
	// DeviceName is the device name stored for the client.
	DeviceName string `json:"device_name"`
	// UserAgent is the user-agent string stored for the client.
	UserAgent string `json:"user_agent"`
	// Note is a free-form, user-defined remark.
	Note string `json:"note"`
	// FirstSeenAt is the timestamp stored when the row was first inserted.
	FirstSeenAt time.Time `json:"first_seen_at"`
	// LastSeenAt is refreshed on every upsert of the client.
	LastSeenAt time.Time `json:"last_seen_at"`
}
|
|
|
|
// SessionStore persists sessions, messages and client records through a
// database/sql connection pool.
type SessionStore struct {
	// db is the underlying connection pool; every method issues its SQL through it.
	db *sql.DB
}
|
|
|
|
// NewSessionStore 初始化数据库连接并自动建表
|
|
// 如果连接失败,返回 nil 和错误(调用方可以选择降级为仅内存模式)
|
|
func NewSessionStore(databaseURL string) (*SessionStore, error) {
|
|
db, err := sql.Open("postgres", databaseURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("无法打开数据库连接: %w", err)
|
|
}
|
|
|
|
// 配置连接池
|
|
db.SetMaxOpenConns(25)
|
|
db.SetMaxIdleConns(5)
|
|
db.SetConnMaxLifetime(5 * time.Minute)
|
|
|
|
// 验证连接
|
|
if err := db.Ping(); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("数据库连接验证失败: %w", err)
|
|
}
|
|
|
|
store := &SessionStore{db: db}
|
|
|
|
// 自动建表
|
|
if err := store.migrate(); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("数据库迁移失败: %w", err)
|
|
}
|
|
|
|
logger.Println("[SessionStore] PostgreSQL 持久化存储已初始化")
|
|
return store, nil
|
|
}
|
|
|
|
// migrate 自动创建表结构
|
|
func (s *SessionStore) migrate() error {
|
|
queries := []string{
|
|
`CREATE TABLE IF NOT EXISTS sessions (
|
|
id VARCHAR(64) PRIMARY KEY,
|
|
user_id VARCHAR(128) NOT NULL,
|
|
title VARCHAR(256) DEFAULT '',
|
|
is_main BOOLEAN DEFAULT FALSE,
|
|
created_at TIMESTAMP DEFAULT NOW(),
|
|
updated_at TIMESTAMP DEFAULT NOW()
|
|
)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_sessions_user_id ON sessions(user_id)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_sessions_updated_at ON sessions(updated_at DESC)`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS messages (
|
|
id SERIAL PRIMARY KEY,
|
|
session_id VARCHAR(64) REFERENCES sessions(id) ON DELETE CASCADE,
|
|
role VARCHAR(16) NOT NULL,
|
|
msg_type VARCHAR(16) DEFAULT 'chat',
|
|
content TEXT NOT NULL,
|
|
created_at TIMESTAMP DEFAULT NOW()
|
|
)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_messages_session_id ON messages(session_id)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_messages_created_at ON messages(session_id, created_at)`,
|
|
|
|
// 为已存在的数据库添加 msg_type 列 (Phase 0.1)
|
|
`ALTER TABLE messages ADD COLUMN IF NOT EXISTS msg_type VARCHAR(16) DEFAULT 'chat'`,
|
|
// 为已存在的数据库添加 client_id 列 (Phase 5: 多端客户端追踪)
|
|
`ALTER TABLE messages ADD COLUMN IF NOT EXISTS client_id VARCHAR(128) DEFAULT ''`,
|
|
|
|
`CREATE TABLE IF NOT EXISTS clients (
|
|
client_id VARCHAR(128) PRIMARY KEY,
|
|
user_id VARCHAR(128) NOT NULL,
|
|
device_name VARCHAR(256) DEFAULT '',
|
|
user_agent VARCHAR(512) DEFAULT '',
|
|
note VARCHAR(256) DEFAULT '',
|
|
first_seen_at TIMESTAMP DEFAULT NOW(),
|
|
last_seen_at TIMESTAMP DEFAULT NOW()
|
|
)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_clients_user_id ON clients(user_id)`,
|
|
}
|
|
|
|
for _, q := range queries {
|
|
if _, err := s.db.Exec(q); err != nil {
|
|
return fmt.Errorf("迁移SQL执行失败: %w\nSQL: %s", err, q)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CreateSession 创建新会话
|
|
func (s *SessionStore) CreateSession(userID, sessionID, title string, isMain bool) error {
|
|
now := time.Now()
|
|
_, err := s.db.Exec(
|
|
`INSERT INTO sessions (id, user_id, title, is_main, created_at, updated_at)
|
|
VALUES ($1, $2, $3, $4, $5, $5)
|
|
ON CONFLICT (id) DO UPDATE SET updated_at = $5`,
|
|
sessionID, userID, title, isMain, now,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("创建会话失败: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetUserSessions 获取用户的所有会话(按 updated_at DESC 排序)
|
|
func (s *SessionStore) GetUserSessions(userID string) ([]Session, error) {
|
|
rows, err := s.db.Query(
|
|
`SELECT id, user_id, title, is_main, created_at, updated_at
|
|
FROM sessions WHERE user_id = $1
|
|
ORDER BY updated_at DESC`,
|
|
userID,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("查询用户会话失败: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var sessions []Session
|
|
for rows.Next() {
|
|
var sess Session
|
|
if err := rows.Scan(&sess.ID, &sess.UserID, &sess.Title, &sess.IsMain, &sess.CreatedAt, &sess.UpdatedAt); err != nil {
|
|
return nil, fmt.Errorf("扫描会话行失败: %w", err)
|
|
}
|
|
sessions = append(sessions, sess)
|
|
}
|
|
|
|
if sessions == nil {
|
|
sessions = []Session{}
|
|
}
|
|
return sessions, rows.Err()
|
|
}
|
|
|
|
// GetSession 获取单个会话
|
|
func (s *SessionStore) GetSession(sessionID string) (*Session, error) {
|
|
var sess Session
|
|
err := s.db.QueryRow(
|
|
`SELECT id, user_id, title, is_main, created_at, updated_at
|
|
FROM sessions WHERE id = $1`,
|
|
sessionID,
|
|
).Scan(&sess.ID, &sess.UserID, &sess.Title, &sess.IsMain, &sess.CreatedAt, &sess.UpdatedAt)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("查询会话失败: %w", err)
|
|
}
|
|
return &sess, nil
|
|
}
|
|
|
|
// UpdateSessionTitle 更新会话标题
|
|
func (s *SessionStore) UpdateSessionTitle(sessionID, title string) error {
|
|
_, err := s.db.Exec(
|
|
`UPDATE sessions SET title = $1, updated_at = NOW() WHERE id = $2`,
|
|
title, sessionID,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("更新会话标题失败: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateSessionTime 更新会话的 updated_at 时间戳
|
|
func (s *SessionStore) UpdateSessionTime(sessionID string) error {
|
|
_, err := s.db.Exec(
|
|
`UPDATE sessions SET updated_at = NOW() WHERE id = $1`,
|
|
sessionID,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("更新会话时间失败: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteSession 删除会话(级联删除消息,但不删除记忆)
|
|
func (s *SessionStore) DeleteSession(sessionID string) error {
|
|
_, err := s.db.Exec(`DELETE FROM sessions WHERE id = $1`, sessionID)
|
|
if err != nil {
|
|
return fmt.Errorf("删除会话失败: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteAllUserSessions 删除用户的所有会话(但不删除记忆)
|
|
func (s *SessionStore) DeleteAllUserSessions(userID string) error {
|
|
_, err := s.db.Exec(`DELETE FROM sessions WHERE user_id = $1`, userID)
|
|
if err != nil {
|
|
return fmt.Errorf("删除用户所有会话失败: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AddMessage 添加一条消息到会话
|
|
func (s *SessionStore) AddMessage(sessionID, role, msgType, content, clientID string) error {
|
|
_, err := s.db.Exec(
|
|
`INSERT INTO messages (session_id, role, msg_type, content, client_id) VALUES ($1, $2, $3, $4, $5)`,
|
|
sessionID, role, msgType, content, clientID,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("添加消息失败: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetMessages 获取会话的消息列表(按时间正序,支持分页)
|
|
func (s *SessionStore) GetMessages(sessionID string, limit, offset int) ([]Message, error) {
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
if offset < 0 {
|
|
offset = 0
|
|
}
|
|
|
|
rows, err := s.db.Query(
|
|
`SELECT id, session_id, role, COALESCE(msg_type, 'chat'), content, COALESCE(client_id, ''), created_at
|
|
FROM messages WHERE session_id = $1
|
|
ORDER BY created_at ASC
|
|
LIMIT $2 OFFSET $3`,
|
|
sessionID, limit, offset,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("查询消息失败: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var messages []Message
|
|
for rows.Next() {
|
|
var msg Message
|
|
if err := rows.Scan(&msg.ID, &msg.SessionID, &msg.Role, &msg.MsgType, &msg.Content, &msg.ClientID, &msg.CreatedAt); err != nil {
|
|
return nil, fmt.Errorf("扫描消息行失败: %w", err)
|
|
}
|
|
messages = append(messages, msg)
|
|
}
|
|
|
|
if messages == nil {
|
|
messages = []Message{}
|
|
}
|
|
return messages, rows.Err()
|
|
}
|
|
|
|
// ClearSessionMessages 清空会话的所有消息但不删除会话本身
|
|
func (s *SessionStore) ClearSessionMessages(sessionID string) error {
|
|
_, err := s.db.Exec(`DELETE FROM messages WHERE session_id = $1`, sessionID)
|
|
if err != nil {
|
|
return fmt.Errorf("清空会话消息失败: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SearchResult is one hit returned by SearchMessages: a message joined with
// the title of the session it belongs to.
type SearchResult struct {
	// MessageID is the id of the matching message.
	MessageID int `json:"message_id"`
	// SessionID is the id of the session containing the message.
	SessionID string `json:"session_id"`
	// SessionTitle is the title of that session ('' when the title is NULL).
	SessionTitle string `json:"session_title"`
	// Role is the role column of the matching message.
	Role string `json:"role"`
	// MsgType is the message category ('chat' when the column is NULL).
	MsgType string `json:"msg_type"`
	// Content is the full body of the matching message.
	Content string `json:"content"`
	// CreatedAt is the creation timestamp of the matching message.
	CreatedAt time.Time `json:"created_at"`
}
|
|
|
|
// SearchMessages 全文搜索消息 (使用 ILIKE 进行模糊匹配)
|
|
// 返回搜索结果列表、总数和可能的错误
|
|
func (s *SessionStore) SearchMessages(userID, query string, limit, offset int) ([]SearchResult, int, error) {
|
|
if limit <= 0 {
|
|
limit = 50
|
|
}
|
|
if offset < 0 {
|
|
offset = 0
|
|
}
|
|
|
|
// 获取匹配总数
|
|
var total int
|
|
countSQL := `SELECT COUNT(*) FROM messages m
|
|
JOIN sessions s ON m.session_id = s.id
|
|
WHERE s.user_id = $1 AND m.content ILIKE '%' || $2 || '%'`
|
|
if err := s.db.QueryRow(countSQL, userID, query).Scan(&total); err != nil {
|
|
return nil, 0, fmt.Errorf("搜索计数失败: %w", err)
|
|
}
|
|
|
|
// 分页查询,关联 sessions 获取会话标题
|
|
rows, err := s.db.Query(
|
|
`SELECT m.id, m.session_id, COALESCE(s.title, '') AS session_title, m.role, COALESCE(m.msg_type, 'chat'), m.content, m.created_at
|
|
FROM messages m
|
|
JOIN sessions s ON m.session_id = s.id
|
|
WHERE s.user_id = $1 AND m.content ILIKE '%' || $2 || '%'
|
|
ORDER BY m.created_at DESC
|
|
LIMIT $3 OFFSET $4`,
|
|
userID, query, limit, offset,
|
|
)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("搜索消息失败: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var results []SearchResult
|
|
for rows.Next() {
|
|
var r SearchResult
|
|
if err := rows.Scan(&r.MessageID, &r.SessionID, &r.SessionTitle, &r.Role, &r.MsgType, &r.Content, &r.CreatedAt); err != nil {
|
|
return nil, 0, fmt.Errorf("扫描搜索结果行失败: %w", err)
|
|
}
|
|
results = append(results, r)
|
|
}
|
|
|
|
if results == nil {
|
|
results = []SearchResult{}
|
|
}
|
|
return results, total, rows.Err()
|
|
}
|
|
|
|
// DB 返回底层数据库连接,供其他 store 复用
|
|
func (s *SessionStore) DB() *sql.DB {
|
|
return s.db
|
|
}
|
|
|
|
// Close 关闭数据库连接
|
|
func (s *SessionStore) Close() error {
|
|
if s.db != nil {
|
|
return s.db.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IsAvailable 检查存储是否可用(数据库连接正常)
|
|
func (s *SessionStore) IsAvailable() bool {
|
|
if s.db == nil {
|
|
return false
|
|
}
|
|
return s.db.Ping() == nil
|
|
}
|
|
|
|
// ========== 多端客户端追踪 ==========
|
|
|
|
// UpsertClient inserts or updates a client record.
|
|
func (s *SessionStore) UpsertClient(clientID, userID, deviceName, userAgent string) error {
|
|
_, err := s.db.Exec(
|
|
`INSERT INTO clients (client_id, user_id, device_name, user_agent, first_seen_at, last_seen_at)
|
|
VALUES ($1, $2, $3, $4, NOW(), NOW())
|
|
ON CONFLICT (client_id) DO UPDATE SET
|
|
device_name = EXCLUDED.device_name,
|
|
user_agent = EXCLUDED.user_agent,
|
|
last_seen_at = NOW()`,
|
|
clientID, userID, deviceName, userAgent,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("upsert client failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetClients returns all known clients for a user.
|
|
func (s *SessionStore) GetClients(userID string) ([]ClientRecord, error) {
|
|
rows, err := s.db.Query(
|
|
`SELECT client_id, user_id, device_name, user_agent, note, first_seen_at, last_seen_at
|
|
FROM clients WHERE user_id = $1 ORDER BY last_seen_at DESC`,
|
|
userID,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query clients failed: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var result []ClientRecord
|
|
for rows.Next() {
|
|
var cr ClientRecord
|
|
if err := rows.Scan(&cr.ClientID, &cr.UserID, &cr.DeviceName, &cr.UserAgent, &cr.Note, &cr.FirstSeenAt, &cr.LastSeenAt); err != nil {
|
|
return nil, fmt.Errorf("scan client row failed: %w", err)
|
|
}
|
|
result = append(result, cr)
|
|
}
|
|
if result == nil {
|
|
result = []ClientRecord{}
|
|
}
|
|
return result, rows.Err()
|
|
}
|
|
|
|
// UpdateClientNote sets the user-defined note for a client.
|
|
func (s *SessionStore) UpdateClientNote(clientID, note string) error {
|
|
_, err := s.db.Exec(`UPDATE clients SET note = $1 WHERE client_id = $2`, note, clientID)
|
|
if err != nil {
|
|
return fmt.Errorf("update client note failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|