fix: 第二轮修复 — 数据库启动检查、会话持久化、URL路由、设备排序等
1. DevTools 启动前检查数据库状态,失败时自动尝试启动 2. ai-core 添加数据库断线重连机制 (30秒间隔) 3. Dashboard 添加数据库状态卡片 (启动/停止/重启) 4. Gateway 会话空闲超时管理 (30分钟标记空闲) 5. 会话/消息 PostgreSQL 持久化 (SessionStore + REST API) 6. 前端服务端会话持久化 + URL hash 路由 + 侧边栏管理 7. 管理员回到主对话按钮 8. IoT 设备卡片固定排序 9. 更新相关文档
This commit is contained in:
@@ -64,21 +64,16 @@ func main() {
|
||||
var memExtractor *memory.Extractor
|
||||
|
||||
if cfg.DatabaseURL != "" {
|
||||
memStore, err = memory.NewStore(cfg.DatabaseURL)
|
||||
if err != nil {
|
||||
log.Printf("⚠ 记忆存储初始化失败 (将跳过记忆功能): %v", err)
|
||||
} else {
|
||||
defer memStore.Close()
|
||||
log.Println("记忆存储已就绪")
|
||||
memStore = memory.NewStore(cfg.DatabaseURL)
|
||||
defer memStore.Close()
|
||||
|
||||
memRetriever = memory.NewRetriever(memStore, nil)
|
||||
memRetriever = memory.NewRetriever(memStore, nil)
|
||||
|
||||
// 记忆提取器使用LLM
|
||||
memExtractor = memory.NewExtractor(memStore, func(ctx context.Context, messages []model.LLMMessage) (*model.LLMResponse, error) {
|
||||
return llmAdapter.Chat(ctx, messages)
|
||||
})
|
||||
log.Println("记忆提取器已就绪")
|
||||
}
|
||||
// 记忆提取器使用LLM
|
||||
memExtractor = memory.NewExtractor(memStore, func(ctx context.Context, messages []model.LLMMessage) (*model.LLMResponse, error) {
|
||||
return llmAdapter.Chat(ctx, messages)
|
||||
})
|
||||
log.Println("记忆提取器已就绪")
|
||||
}
|
||||
|
||||
// 初始化会话历史存储
|
||||
|
||||
@@ -4,22 +4,100 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/yourname/cyrene-ai/ai-core/internal/model"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
const reconnectInterval = 30 * time.Second
|
||||
|
||||
// Store 记忆持久化存储(PostgreSQL + pgvector)
|
||||
type Store struct {
|
||||
db *sql.DB
|
||||
databaseURL string
|
||||
mu sync.RWMutex
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// errDBNotReady 数据库未就绪时返回的友好错误
|
||||
var errDBNotReady = fmt.Errorf("记忆系统未就绪: 数据库连接不可用,正在后台重试连接")
|
||||
|
||||
// NewStore 创建记忆存储
|
||||
func NewStore(connStr string) (*Store, error) {
|
||||
db, err := sql.Open("postgres", connStr)
|
||||
// 连接失败时不返回 error,而是启动后台重连循环
|
||||
func NewStore(connStr string) *Store {
|
||||
s := &Store{
|
||||
databaseURL: connStr,
|
||||
}
|
||||
|
||||
// 尝试初始连接
|
||||
if err := s.Reconnect(); err != nil {
|
||||
log.Printf("[memory] ⚠ 记忆存储初始化: 数据库连接失败 (%v),将在后台每30秒重试", err)
|
||||
} else {
|
||||
log.Println("[memory] 记忆存储已就绪")
|
||||
}
|
||||
|
||||
// 启动后台重连 goroutine
|
||||
go s.reconnectLoop()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// reconnectLoop 后台重连循环
|
||||
func (s *Store) reconnectLoop() {
|
||||
ticker := time.NewTicker(reconnectInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
s.mu.RLock()
|
||||
ready := s.db != nil
|
||||
s.mu.RUnlock()
|
||||
|
||||
if ready {
|
||||
// 数据库已连接,检查连接是否仍然有效
|
||||
s.mu.RLock()
|
||||
db := s.db
|
||||
s.mu.RUnlock()
|
||||
if db != nil {
|
||||
if err := db.Ping(); err != nil {
|
||||
log.Printf("[memory] ⚠ 数据库连接丢失: %v,开始重连", err)
|
||||
s.mu.Lock()
|
||||
if s.db != nil {
|
||||
s.db.Close()
|
||||
s.db = nil
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !s.IsReady() {
|
||||
if err := s.Reconnect(); err != nil {
|
||||
log.Printf("[memory] ⚠ 数据库重连失败: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reconnect 尝试重连数据库并执行迁移
|
||||
func (s *Store) Reconnect() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// 如果已有有效连接,先检查
|
||||
if s.db != nil {
|
||||
if err := s.db.Ping(); err == nil {
|
||||
return nil // 仍然有效
|
||||
}
|
||||
// 连接已失效,关闭旧连接
|
||||
s.db.Close()
|
||||
s.db = nil
|
||||
}
|
||||
|
||||
db, err := sql.Open("postgres", s.databaseURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("连接数据库失败: %w", err)
|
||||
return fmt.Errorf("连接数据库失败: %w", err)
|
||||
}
|
||||
|
||||
db.SetMaxOpenConns(25)
|
||||
@@ -27,15 +105,36 @@ func NewStore(connStr string) (*Store, error) {
|
||||
db.SetConnMaxLifetime(5 * time.Minute)
|
||||
|
||||
if err := db.Ping(); err != nil {
|
||||
return nil, fmt.Errorf("数据库ping失败: %w", err)
|
||||
db.Close()
|
||||
return fmt.Errorf("数据库ping失败: %w", err)
|
||||
}
|
||||
|
||||
s := &Store{db: db}
|
||||
s.db = db
|
||||
|
||||
// 执行建表迁移
|
||||
if err := s.migrate(); err != nil {
|
||||
return nil, fmt.Errorf("数据库迁移失败: %w", err)
|
||||
log.Printf("[memory] ⚠ 数据库迁移失败: %v", err)
|
||||
s.db.Close()
|
||||
s.db = nil
|
||||
return fmt.Errorf("数据库迁移失败: %w", err)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
log.Println("[memory] ✅ 数据库重连成功,记忆系统已就绪")
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsReady 返回数据库是否可用
|
||||
func (s *Store) IsReady() bool {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.db != nil
|
||||
}
|
||||
|
||||
// getDB 获取当前数据库连接(带读锁保护)
|
||||
func (s *Store) getDB() *sql.DB {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.db
|
||||
}
|
||||
|
||||
// migrate 创建表结构
|
||||
@@ -73,6 +172,11 @@ func (s *Store) migrate() error {
|
||||
|
||||
// Save 保存记忆
|
||||
func (s *Store) Save(ctx context.Context, entry *model.MemoryEntry) error {
|
||||
db := s.getDB()
|
||||
if db == nil {
|
||||
return errDBNotReady
|
||||
}
|
||||
|
||||
query := `INSERT INTO memories (user_id, content, summary, category, priority, session_id, source, embedding, expires_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
RETURNING id, created_at`
|
||||
@@ -86,7 +190,7 @@ func (s *Store) Save(ctx context.Context, entry *model.MemoryEntry) error {
|
||||
embedding = fmt.Sprintf("[%s]", joinFloats(vec))
|
||||
}
|
||||
|
||||
return s.db.QueryRowContext(ctx, query,
|
||||
return db.QueryRowContext(ctx, query,
|
||||
entry.UserID, entry.Content, entry.Summary,
|
||||
string(entry.Category), int(entry.Priority),
|
||||
entry.SessionID, entry.Source, embedding, entry.ExpiresAt,
|
||||
@@ -95,13 +199,18 @@ func (s *Store) Save(ctx context.Context, entry *model.MemoryEntry) error {
|
||||
|
||||
// GetByID 根据ID获取记忆
|
||||
func (s *Store) GetByID(ctx context.Context, id string) (*model.MemoryEntry, error) {
|
||||
db := s.getDB()
|
||||
if db == nil {
|
||||
return nil, errDBNotReady
|
||||
}
|
||||
|
||||
query := `SELECT id, user_id, content, summary, category, priority, session_id, source,
|
||||
access_count, last_access, created_at, expires_at
|
||||
FROM memories WHERE id = $1`
|
||||
|
||||
entry := &model.MemoryEntry{}
|
||||
var category string
|
||||
err := s.db.QueryRowContext(ctx, query, id).Scan(
|
||||
err := db.QueryRowContext(ctx, query, id).Scan(
|
||||
&entry.ID, &entry.UserID, &entry.Content, &entry.Summary,
|
||||
&category, &entry.Priority, &entry.SessionID, &entry.Source,
|
||||
&entry.AccessCount, &entry.LastAccess, &entry.CreatedAt, &entry.ExpiresAt,
|
||||
@@ -122,6 +231,11 @@ func (s *Store) GetByID(ctx context.Context, id string) (*model.MemoryEntry, err
|
||||
|
||||
// Query 按条件查询记忆
|
||||
func (s *Store) Query(ctx context.Context, q model.MemoryQuery) ([]model.MemoryEntry, error) {
|
||||
db := s.getDB()
|
||||
if db == nil {
|
||||
return nil, errDBNotReady
|
||||
}
|
||||
|
||||
if q.Limit <= 0 {
|
||||
q.Limit = 10
|
||||
}
|
||||
@@ -147,7 +261,7 @@ func (s *Store) Query(ctx context.Context, q model.MemoryQuery) ([]model.MemoryE
|
||||
query += fmt.Sprintf(" ORDER BY priority DESC, created_at DESC LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
|
||||
args = append(args, q.Limit, q.Offset)
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query, args...)
|
||||
rows, err := db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("查询记忆失败: %w", err)
|
||||
}
|
||||
@@ -173,13 +287,21 @@ func (s *Store) Query(ctx context.Context, q model.MemoryQuery) ([]model.MemoryE
|
||||
|
||||
// Delete 删除记忆
|
||||
func (s *Store) Delete(ctx context.Context, id string) error {
|
||||
_, err := s.db.ExecContext(ctx, `DELETE FROM memories WHERE id = $1`, id)
|
||||
db := s.getDB()
|
||||
if db == nil {
|
||||
return errDBNotReady
|
||||
}
|
||||
_, err := db.ExecContext(ctx, `DELETE FROM memories WHERE id = $1`, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// PurgeExpired 清理过期记忆
|
||||
func (s *Store) PurgeExpired(ctx context.Context) (int64, error) {
|
||||
result, err := s.db.ExecContext(ctx,
|
||||
db := s.getDB()
|
||||
if db == nil {
|
||||
return 0, errDBNotReady
|
||||
}
|
||||
result, err := db.ExecContext(ctx,
|
||||
`DELETE FROM memories WHERE expires_at IS NOT NULL AND expires_at < NOW()`)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@@ -189,6 +311,11 @@ func (s *Store) PurgeExpired(ctx context.Context) (int64, error) {
|
||||
|
||||
// SearchByVector 向量相似度搜索
|
||||
func (s *Store) SearchByVector(ctx context.Context, userID string, embedding []float64, limit int) ([]model.MemoryEntry, error) {
|
||||
db := s.getDB()
|
||||
if db == nil {
|
||||
return nil, errDBNotReady
|
||||
}
|
||||
|
||||
if limit <= 0 {
|
||||
limit = 5
|
||||
}
|
||||
@@ -202,7 +329,7 @@ func (s *Store) SearchByVector(ctx context.Context, userID string, embedding []f
|
||||
ORDER BY embedding <=> $1
|
||||
LIMIT $3`
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query, vecStr, userID, limit)
|
||||
rows, err := db.QueryContext(ctx, query, vecStr, userID, limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("向量搜索失败: %w", err)
|
||||
}
|
||||
@@ -229,13 +356,23 @@ func (s *Store) SearchByVector(ctx context.Context, userID string, embedding []f
|
||||
}
|
||||
|
||||
func (s *Store) incrementAccess(ctx context.Context, id string) {
|
||||
s.db.ExecContext(ctx,
|
||||
db := s.getDB()
|
||||
if db == nil {
|
||||
return
|
||||
}
|
||||
db.ExecContext(ctx,
|
||||
`UPDATE memories SET access_count = access_count + 1, last_access = NOW() WHERE id = $1`, id)
|
||||
}
|
||||
|
||||
// Close 关闭数据库连接
|
||||
func (s *Store) Close() error {
|
||||
return s.db.Close()
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.db != nil {
|
||||
return s.db.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// joinFloats 将 float64 切片转为逗号分隔字符串
|
||||
|
||||
Reference in New Issue
Block a user