Files
Cyrene/backend/gateway/internal/engine/rule_engine.go
T
AskaEth 87214b9441 feat: Phase 1+2 架构进化 — 连续思考链/主动消息决策/情感状态机/离线自主思考 (86文件)
Phase 1 (基础设施):
- ThinkChain 思考链连续性 + 差异化思考提示词 (persistent)
- AutonomousToolPolicy 工具安全策略 (safe/unsafe/conditional)
- MessageScheduler 自适应消息节奏 (Idle/Available/Busy)
- SessionEnrichmentStore 渐进式上下文丰富 (5层)
- ConversationBus 事件总线 + ResponseCache (dedup)
- pkg/logger 统一日志 + 所有 handler 替换 fmt.Printf
- NPE 守卫/链路优化/数据库表修复/Go workspace

Phase 2 (人格交互):
- EmotionState/EmotionTracker 情感状态机 (5种心情, 情绪衰减)
- ProactiveGuard 主动消息多维决策 (静默时段/紧急度/频率/校验)
- Gateway↔ai-core 在线状态感知链路 (presence notification)
- 离线思考频率控制 + 重连问候 + 离线消息排队

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-23 15:25:12 +08:00

506 lines
12 KiB
Go

package engine
import (
"bytes"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/yourname/cyrene-ai/pkg/logger"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/yourname/cyrene-ai/gateway/internal/store"
"github.com/yourname/cyrene-ai/gateway/internal/ws"
)
// TriggerConfig 触发器配置
type TriggerConfig struct {
Cron string `json:"cron,omitempty"`
Time string `json:"time,omitempty"`
Days []string `json:"days,omitempty"`
DeviceID string `json:"device_id,omitempty"`
Property string `json:"property,omitempty"`
Operator string `json:"operator,omitempty"`
Value float64 `json:"value,omitempty"`
}
// Condition 条件定义
type Condition struct {
Type string `json:"type"`
Start string `json:"start,omitempty"`
End string `json:"end,omitempty"`
DeviceID string `json:"device_id,omitempty"`
Property string `json:"property,omitempty"`
Operator string `json:"operator,omitempty"`
Value float64 `json:"value,omitempty"`
}
// Action 动作定义
type Action struct {
Type string `json:"type"`
DeviceID string `json:"device_id,omitempty"`
Property string `json:"property,omitempty"`
Value interface{} `json:"value,omitempty"`
Title string `json:"title,omitempty"`
Body string `json:"body,omitempty"`
}
// RuleEngine 规则引擎
type RuleEngine struct {
store *store.AutomationStore
hub *ws.Hub
iotServiceURL string
httpClient *http.Client
lastTriggered map[string]time.Time
mu sync.RWMutex
stopCh chan struct{}
running bool
}
// NewRuleEngine 创建规则引擎
func NewRuleEngine(as *store.AutomationStore, hub *ws.Hub) *RuleEngine {
iotServiceURL := os.Getenv("IOT_SERVICE_URL")
if iotServiceURL == "" {
iotServiceURL = "http://localhost:8083"
}
return &RuleEngine{
store: as,
hub: hub,
iotServiceURL: iotServiceURL,
httpClient: &http.Client{
Timeout: 5 * time.Second,
},
lastTriggered: make(map[string]time.Time),
stopCh: make(chan struct{}),
}
}
// Start 启动后台规则评估 goroutine
func (e *RuleEngine) Start() {
e.mu.Lock()
if e.running {
e.mu.Unlock()
return
}
e.running = true
e.mu.Unlock()
go e.loop()
logger.Printf("[RuleEngine] 规则引擎已启动 (IoT服务地址: %s)", e.iotServiceURL)
}
// Stop 停止规则引擎
func (e *RuleEngine) Stop() {
e.mu.Lock()
defer e.mu.Unlock()
if !e.running {
return
}
close(e.stopCh)
e.running = false
logger.Println("[RuleEngine] 规则引擎已停止")
}
// loop 规则引擎主循环
func (e *RuleEngine) loop() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
// 首次立即评估
e.evaluateAllRules()
for {
select {
case <-e.stopCh:
return
case <-ticker.C:
e.evaluateAllRules()
}
}
}
// evaluateAllRules 评估所有启用的规则
func (e *RuleEngine) evaluateAllRules() {
rules, err := e.store.GetEnabledRules()
if err != nil {
logger.Printf("[RuleEngine] 获取启用的规则失败: %v", err)
return
}
if len(rules) == 0 {
return
}
for _, rule := range rules {
if e.evaluateRule(&rule) {
e.ExecuteRuleActions(&rule)
e.store.MarkRuleTriggered(rule.ID)
e.mu.Lock()
e.lastTriggered[rule.ID] = time.Now()
e.mu.Unlock()
}
}
}
// evaluateRule 评估单条规则是否应触发
func (e *RuleEngine) evaluateRule(rule *store.AutomationRule) bool {
// 防重复触发:同一规则在 1 分钟内不重复触发
e.mu.RLock()
lastTime, exists := e.lastTriggered[rule.ID]
e.mu.RUnlock()
if exists && time.Since(lastTime) < 1*time.Minute {
return false
}
// 解析 trigger_config
var triggerCfg TriggerConfig
if rule.TriggerConfig != nil {
if err := json.Unmarshal(*rule.TriggerConfig, &triggerCfg); err != nil {
logger.Printf("[RuleEngine] 解析触发器配置失败: rule=%s err=%v", rule.ID, err)
return false
}
}
// 评估触发器
triggered := false
switch rule.TriggerType {
case "schedule":
triggered = e.evaluateScheduleTrigger(triggerCfg)
case "device_state":
triggered = e.evaluateDeviceStateTrigger(triggerCfg)
case "manual":
// 不自动触发
return false
default:
return false
}
if !triggered {
return false
}
// 评估 conditions
var conditions []Condition
if rule.Conditions != nil {
if err := json.Unmarshal(*rule.Conditions, &conditions); err != nil {
logger.Printf("[RuleEngine] 解析条件失败: rule=%s err=%v", rule.ID, err)
return false
}
}
for _, cond := range conditions {
if !e.evaluateCondition(cond) {
return false
}
}
return true
}
// evaluateScheduleTrigger 评估定时触发器
func (e *RuleEngine) evaluateScheduleTrigger(cfg TriggerConfig) bool {
now := time.Now()
// 检查 days (星期)
if len(cfg.Days) > 0 {
weekday := strings.ToLower(now.Weekday().String()[:3])
found := false
for _, d := range cfg.Days {
if strings.ToLower(strings.TrimSpace(d)) == weekday {
found = true
break
}
}
if !found {
return false
}
}
// 检查 time
if cfg.Time != "" {
currentTime := now.Format("15:04")
return currentTime == cfg.Time
}
return false
}
// evaluateDeviceStateTrigger 评估设备状态触发器
func (e *RuleEngine) evaluateDeviceStateTrigger(cfg TriggerConfig) bool {
if cfg.DeviceID == "" || cfg.Property == "" || cfg.Operator == "" {
return false
}
// 从 IoT 服务获取设备状态
devices, err := e.fetchIoTDevices()
if err != nil {
logger.Printf("[RuleEngine] 获取设备状态失败: %v", err)
return false
}
// 查找目标设备
for _, d := range devices {
if d.ID != cfg.DeviceID {
continue
}
var actualValue float64
switch cfg.Property {
case "temperature":
actualValue = d.Temperature
case "value":
actualValue = d.Value
case "brightness":
actualValue = float64(d.Brightness)
case "position":
actualValue = float64(d.Position)
case "battery":
actualValue = float64(d.Battery)
default:
// 尝试从 properties 中获取
if props, ok := d.Properties[cfg.Property]; ok {
if v, ok := props.(float64); ok {
actualValue = v
}
} else {
return false
}
}
return compareValues(actualValue, cfg.Operator, cfg.Value)
}
return false
}
// evaluateCondition 评估单个条件
func (e *RuleEngine) evaluateCondition(cond Condition) bool {
switch cond.Type {
case "time_range":
if cond.Start == "" || cond.End == "" {
return true
}
now := time.Now()
currentTime := now.Format("15:04")
return currentTime >= cond.Start && currentTime <= cond.End
case "device_state":
if cond.DeviceID == "" || cond.Property == "" || cond.Operator == "" {
return true
}
devices, err := e.fetchIoTDevices()
if err != nil {
return true // 无法获取设备状态时不阻塞
}
for _, d := range devices {
if d.ID != cond.DeviceID {
continue
}
var actualValue float64
switch cond.Property {
case "temperature":
actualValue = d.Temperature
case "value":
actualValue = d.Value
case "brightness":
actualValue = float64(d.Brightness)
case "position":
actualValue = float64(d.Position)
case "battery":
actualValue = float64(d.Battery)
default:
if props, ok := d.Properties[cond.Property]; ok {
if v, ok := props.(float64); ok {
actualValue = v
}
} else {
return true
}
}
return compareValues(actualValue, cond.Operator, cond.Value)
}
return true
}
return true
}
// ExecuteRuleActions 执行规则的动作
func (e *RuleEngine) ExecuteRuleActions(rule *store.AutomationRule) {
var actions []Action
if rule.Actions != nil {
if err := json.Unmarshal(*rule.Actions, &actions); err != nil {
logger.Printf("[RuleEngine] 解析动作失败: rule=%s err=%v", rule.ID, err)
return
}
}
logger.Printf("[RuleEngine] 执行规则 %s (%s) 的 %d 个动作", rule.ID, rule.Name, len(actions))
for _, action := range actions {
switch action.Type {
case "set_device":
e.executeSetDevice(action)
case "notify":
e.executeNotify(action, rule.UserID)
default:
logger.Printf("[RuleEngine] 未知动作类型: %s", action.Type)
}
}
}
// ExecuteScene 手动触发场景
func (e *RuleEngine) ExecuteScene(sceneID, userID string) error {
rules, err := e.store.GetSceneRules(sceneID)
if err != nil {
return fmt.Errorf("获取场景规则失败: %w", err)
}
logger.Printf("[RuleEngine] 执行场景 %s,共 %d 条关联规则", sceneID, len(rules))
for _, rule := range rules {
if rule.Enabled {
e.ExecuteRuleActions(&rule)
e.store.MarkRuleTriggered(rule.ID)
e.mu.Lock()
e.lastTriggered[rule.ID] = time.Now()
e.mu.Unlock()
}
}
return nil
}
// executeSetDevice 执行设备控制动作
func (e *RuleEngine) executeSetDevice(action Action) {
url := fmt.Sprintf("%s/api/v1/devices/%s/set", e.iotServiceURL, action.DeviceID)
body := map[string]interface{}{
"property": action.Property,
"value": action.Value,
}
bodyBytes, _ := json.Marshal(body)
resp, err := e.httpClient.Post(url, "application/json", bytes.NewReader(bodyBytes))
if err != nil {
logger.Printf("[RuleEngine] 设备控制请求失败: device=%s property=%s err=%v",
action.DeviceID, action.Property, err)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
logger.Printf("[RuleEngine] 设备控制成功: device=%s property=%s value=%v",
action.DeviceID, action.Property, action.Value)
} else {
logger.Printf("[RuleEngine] 设备控制失败: device=%s property=%s status=%d",
action.DeviceID, action.Property, resp.StatusCode)
}
}
// executeNotify 执行通知动作
func (e *RuleEngine) executeNotify(action Action, userID string) {
notif := ws.NotificationInfo{
ID: "notif_" + randomID(),
Type: "info",
Title: action.Title,
Body: action.Body,
Timestamp: time.Now().UTC().Format(time.RFC3339),
}
msg := ws.ServerMessage{
Type: "notification",
MessageID: "notif_" + randomID(),
Timestamp: time.Now().UnixMilli(),
Notification: &notif,
}
data, err := json.Marshal(msg)
if err != nil {
logger.Printf("[RuleEngine] 序列化通知失败: %v", err)
return
}
e.hub.SendToUser(userID, data)
logger.Printf("[RuleEngine] 通知已发送: user=%s title=%s", userID, action.Title)
}
// ========== 辅助方法 ==========
// IotDevice 设备信息(从 IoT 服务返回)
type IotDevice struct {
ID string `json:"id"`
Name string `json:"name"`
Type string `json:"type"`
Status string `json:"status"`
Brightness int `json:"brightness,omitempty"`
Color string `json:"color,omitempty"`
Temperature float64 `json:"temperature,omitempty"`
Mode string `json:"mode,omitempty"`
Position int `json:"position,omitempty"`
Value float64 `json:"value,omitempty"`
Unit string `json:"unit,omitempty"`
Battery int `json:"battery,omitempty"`
Properties map[string]interface{} `json:"properties,omitempty"`
}
// fetchIoTDevices 从 IoT 调试服务获取设备列表
func (e *RuleEngine) fetchIoTDevices() ([]IotDevice, error) {
resp, err := e.httpClient.Get(e.iotServiceURL + "/api/v1/devices")
if err != nil {
return nil, fmt.Errorf("请求IoT服务失败: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("IoT服务返回状态码 %d", resp.StatusCode)
}
var result struct {
Devices []IotDevice `json:"devices"`
Total int `json:"total"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解析IoT设备列表失败: %w", err)
}
return result.Devices, nil
}
// compareValues 比较两个值
func compareValues(actual float64, operator string, expected float64) bool {
switch operator {
case "eq":
return actual == expected
case "neq":
return actual != expected
case "gt":
return actual > expected
case "gte":
return actual >= expected
case "lt":
return actual < expected
case "lte":
return actual <= expected
default:
return false
}
}
// randomID 使用 crypto/rand 生成随机 ID
func randomID() string {
b := make([]byte, 8)
rand.Read(b)
return hex.EncodeToString(b)
}