fix: 第一轮修复 - 记忆管理/IoT操控/历史消息持久化/动作消息/链路优化/安全配置

- 修复记忆管理数据库连接不可用 (ai-core重编译+Unicode修复)
- 修复IoT子会话工具调用链路日志缺失
- 新增最终审查子会话(review_provider) 支持消息格式解析拆分
- 实现历史消息持久化(后端存储+前端分页加载)
- 前端新增动作消息(ActionMessage)类型和渲染
- 优化对话链路速度(非阻塞子会话+快速问候通道)
- JWT密钥环境变量化(无默认值启动panic)
- Token自动刷新机制(401拦截器+refresh接口)
- WebSocket指数退避重连(jitter+最大10次)
- localStorage清理一致性(cyrene_前缀+版本检查)
- IoT环境变量统一为IOT_SERVICE_URL
This commit is contained in:
2026-05-21 23:10:07 +08:00
parent 8b7d4ec19a
commit a058b0ab8e
53 changed files with 5535 additions and 241 deletions
+283
View File
@@ -0,0 +1,283 @@
#!/usr/bin/env python3
"""Round 11 API Contract Test - with rate-limit awareness"""
import urllib.request
import urllib.error
import json
import time
import sys
import os
BASE = "http://localhost:8080/api/v1"
PASS = 0
FAIL = 0
TOKEN = ""
ADMIN_TOKEN = ""
SESS_ID = ""
def req(method, path, data=None, auth=None, ct="application/json"):
url = f"{BASE}{path}"
headers = {"Content-Type": ct}
if auth:
headers["Authorization"] = f"Bearer {auth}"
body = None
if data is not None:
body = json.dumps(data).encode()
try:
r = urllib.request.Request(url, data=body, headers=headers, method=method)
resp = urllib.request.urlopen(r, timeout=10)
return resp.status, resp.read().decode()
except urllib.error.HTTPError as e:
return e.code, e.read().decode()
except Exception as e:
return 0, str(e)
def test(name, method, path, expected, data=None, auth=None, skip_msg=None):
global PASS, FAIL
if skip_msg:
print(f" SKIP | {name}: {skip_msg}")
return None, None
code, body = req(method, path, data, auth)
status = "PASS" if code == expected else "FAIL"
if status == "PASS":
PASS += 1
else:
FAIL += 1
body_preview = body[:100].replace('\n',' ') if body else ""
print(f" {status} | {name} | expected={expected} got={code} | {body_preview}")
return code, body
print("=" * 60)
print(" Round 11 API Contract Test Suite")
print(f" Started at {time.strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 60)
# === Part 1: Health ===
print("\n--- Part 1: Health ---")
test("GET /health", "GET", "/health", 200)
test("HEAD /health", "HEAD", "/health", 200)
# === Part 2: Auth Register ===
print("\n--- Part 2: Auth Register ---")
UN = f"testapi_{int(time.time())}"
PW = "TestPass123!"
test("Register missing username", "POST", "/auth/register", 400,
{"password":"Test123!","email":"test@test.com","nickname":"Test","verify_code":"000000"})
time.sleep(1)
test("Register missing password", "POST", "/auth/register", 400,
{"username":"testuser","email":"test@test.com","nickname":"Test","verify_code":"000000"})
time.sleep(1)
test("Register username too short", "POST", "/auth/register", 400,
{"username":"ab","password":"Test123!","email":"test@test.com","nickname":"Test","verify_code":"000000"})
time.sleep(1)
test("Register username too long", "POST", "/auth/register", 400,
{"username":"a"*33,"password":"Test123!","email":"test@test.com","nickname":"Test","verify_code":"000000"})
time.sleep(1)
test("Register username special chars", "POST", "/auth/register", 400,
{"username":"user@name!","password":"Test123!","email":"test@test.com","nickname":"Test","verify_code":"000000"})
time.sleep(1)
test("Register password too short", "POST", "/auth/register", 400,
{"username":"test_abc","password":"Ab1!","email":"test@test.com","nickname":"Test","verify_code":"000000"})
time.sleep(1)
# Normal registration
code, body = test("Register normal", "POST", "/auth/register", 201,
{"username":UN,"password":PW,"email":"test@test.com","nickname":"TestUser","verify_code":"000000"})
time.sleep(1)
# Duplicate
test("Register duplicate", "POST", "/auth/register", 409,
{"username":UN,"password":PW,"email":"test@test.com","nickname":"TestUser","verify_code":"000000"})
# === Part 3: Auth Login ===
print("\n--- Part 3: Auth Login ---")
time.sleep(1)
test("Login wrong username", "POST", "/auth/login", 401,
{"username":"nonexistent_user_99","password":"TestPass123!"})
time.sleep(1)
test("Login wrong password", "POST", "/auth/login", 401,
{"username":UN,"password":"WrongPass999!"})
time.sleep(1)
# Correct login
code, body = req("POST", "/auth/login", {"username":UN,"password":PW})
if code == 200:
data = json.loads(body)
TOKEN = data.get("token","")
print(f" PASS | Login correct | 200 | user_id={data.get('user_id')} token={'OK' if TOKEN else 'MISSING'}")
PASS += 1
else:
print(f" FAIL | Login correct | expected=200 got={code} | {body[:100]}")
FAIL += 1
time.sleep(1)
# Admin login
code, body = req("POST", "/auth/login", {"username":"admin","password":"admin123"})
if code == 200:
data = json.loads(body)
ADMIN_TOKEN = data.get("token","")
print(f" PASS | Admin login | 200 | token={'OK' if ADMIN_TOKEN else 'MISSING'}")
PASS += 1
else:
print(f" FAIL | Admin login | expected=200 got={code} | {body[:100]}")
FAIL += 1
# JWT validation
if TOKEN:
time.sleep(1)
test("JWT valid (refresh)", "POST", "/auth/refresh", 200, auth=TOKEN)
time.sleep(1)
test("Login missing password", "POST", "/auth/login", 400, {"username":"testuser"})
time.sleep(1)
test("Login empty body", "POST", "/auth/login", 400, {})
time.sleep(1)
test("Login username format invalid", "POST", "/auth/login", 400, {"username":"ab","password":"Test123!"})
# === Part 4: Session API ===
print("\n--- Part 4: Session API ---")
test("Sessions list no auth", "GET", "/sessions", 401)
test("Sessions create no auth", "POST", "/sessions", 401, {"title":"Test"})
if TOKEN:
time.sleep(1)
test("Sessions list with auth", "GET", "/sessions", 200, auth=TOKEN)
time.sleep(1)
code, body = test("Sessions create", "POST", "/sessions", 201, {"title":"Round 11 Test"}, auth=TOKEN)
if code == 201:
try:
SESS_ID = json.loads(body).get("id","")
except: pass
if SESS_ID:
time.sleep(1)
test("Sessions get existing", "GET", f"/sessions/{SESS_ID}", 200, auth=TOKEN)
time.sleep(1)
test("Sessions get non-existent", "GET", "/sessions/session_nonexistent123", 404, auth=TOKEN)
time.sleep(1)
test("Sessions create empty title", "POST", "/sessions", 201, {}, auth=TOKEN)
if SESS_ID:
time.sleep(1)
test("Sessions delete existing", "DELETE", f"/sessions/{SESS_ID}", 200, auth=TOKEN)
time.sleep(1)
test("Sessions delete non-existent", "DELETE", "/sessions/session_nonexistent", 200, auth=TOKEN)
time.sleep(1)
test("Messages get non-existent session", "GET", "/sessions/session_nonexistent/messages", 200, auth=TOKEN)
else:
print(" SKIP: No token available for session tests")
# === Part 5: Files ===
print("\n--- Part 5: Files API ---")
test("Files list no auth", "GET", "/files", 401)
if TOKEN:
time.sleep(1)
test("Files list with auth", "GET", "/files", 200, auth=TOKEN)
time.sleep(1)
test("Files get non-existent", "GET", "/files/file_nonexistent", 404, auth=TOKEN)
# === Part 6: Knowledge ===
print("\n--- Part 6: Knowledge API ---")
test("Knowledge bases no auth", "GET", "/knowledge/bases", 401)
if TOKEN:
time.sleep(1)
test("Knowledge bases with auth", "GET", "/knowledge/bases", 200, auth=TOKEN)
time.sleep(1)
test("Knowledge get non-existent", "GET", "/knowledge/bases/kb_nonexistent", 404, auth=TOKEN)
# === Part 7: Automation ===
print("\n--- Part 7: Automation API ---")
test("Automation rules no auth", "GET", "/automation/rules", 401)
if TOKEN:
time.sleep(1)
test("Automation rules with auth", "GET", "/automation/rules", 200, auth=TOKEN)
time.sleep(1)
test("Automation scenes with auth", "GET", "/automation/scenes", 200, auth=TOKEN)
time.sleep(1)
test("Automation get non-existent rule", "GET", "/automation/rules/rule_nonexistent", 404, auth=TOKEN)
# === Part 8: Reminders ===
print("\n--- Part 8: Reminders API ---")
test("Reminders list no auth", "GET", "/reminders", 401)
if TOKEN:
time.sleep(1)
test("Reminders list with auth (needs user_id)", "GET", "/reminders?user_id=test_user", 200, auth=TOKEN)
time.sleep(1)
test("Reminders create missing fields", "POST", "/reminders", 400, {}, auth=TOKEN)
# === Part 9: Briefings ===
print("\n--- Part 9: Briefings API ---")
test("Briefings get no auth", "GET", "/briefings", 401)
if TOKEN:
time.sleep(1)
test("Briefings get with auth", "GET", "/briefings?user_id=test_user", 200, auth=TOKEN)
time.sleep(1)
test("Briefings latest with auth", "GET", "/briefings/latest?user_id=test_user", 200, auth=TOKEN)
# === Part 10: Notifications ===
print("\n--- Part 10: Notifications API ---")
test("Notifications push no auth", "POST", "/notifications/push", 401, {"message":"test"})
if TOKEN:
time.sleep(1)
test("Notifications push empty", "POST", "/notifications/push", 400, {}, auth=TOKEN)
# === Part 11: Memories ===
print("\n--- Part 11: Memories API ---")
test("Memories search no auth", "GET", "/memory/search", 401)
if TOKEN:
time.sleep(1)
test("Memories search with auth", "GET", "/memory/search?q=test", 200, auth=TOKEN)
time.sleep(1)
test("Memories list with auth", "GET", "/memory", 200, auth=TOKEN)
time.sleep(1)
test("Memories add missing fields", "POST", "/memory", 400, {}, auth=TOKEN)
# === Part 12: Voice ===
print("\n--- Part 12: Voice API ---")
if TOKEN:
time.sleep(1)
test("Voice status with auth", "GET", "/voice/status", 200, auth=TOKEN)
# === Part 13: Admin ===
print("\n--- Part 13: Admin endpoints ---")
if ADMIN_TOKEN:
time.sleep(1)
test("Admin sessions with admin", "GET", "/admin/sessions", 200, auth=ADMIN_TOKEN)
time.sleep(1)
test("Admin sessions/active", "GET", "/admin/sessions/active", 200, auth=ADMIN_TOKEN)
if TOKEN:
time.sleep(1)
test("Admin sessions without admin", "GET", "/admin/sessions", 403, auth=TOKEN)
# === Part 14: Invalid token ===
print("\n--- Part 14: Invalid token ---")
test("Sessions invalid token", "GET", "/sessions", 401, auth="invalidtoken12345")
test("Sessions expired/malformed token", "GET", "/sessions", 401, auth="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.dozjgNryP4J3jVmNHl0w5N_XgL0n3I9PlFUP0THsR8k")
# === Part 15: Token refresh ===
print("\n--- Part 15: Token refresh ---")
test("Refresh no auth", "POST", "/auth/refresh", 401)
test("Refresh invalid token", "POST", "/auth/refresh", 401, auth="invalidtoken")
# === Summary ===
print("\n" + "=" * 60)
print(" TEST SUMMARY")
print("=" * 60)
TOTAL = PASS + FAIL
print(f"Total: {TOTAL} | Passed: {PASS} | Failed: {FAIL}")
print(f"Success rate: {PASS/TOTAL*100:.1f}%" if TOTAL > 0 else "No tests run")
print(f"\nTest user: {UN}")
print(f"Token: {'available' if TOKEN else 'MISSING'}")
print(f"Admin token: {'available' if ADMIN_TOKEN else 'MISSING'}")
# Save env
if TOKEN:
with open("/tmp/round11_testenv", "w") as f:
f.write(f"TOKEN={TOKEN}\n")
f.write(f"ADMIN_TOKEN={ADMIN_TOKEN}\n")
f.write(f"TEST_USER={UN}\n")
+383
View File
@@ -0,0 +1,383 @@
#!/bin/bash
# Round 11 API Contract Test Script
# Tests all API endpoints for correctness, error handling, and boundary conditions
BASE="http://localhost:8080/api/v1"
PASS=0
FAIL=0
RESULTS=""
# Helper: run a test and record result
test_case() {
local name="$1"
local expected="$2"
local method="$3"
local url="$4"
local data="$5"
local auth="$6"
if [ -n "$data" ]; then
resp=$(curl -s -w "\n%{http_code}" -X "$method" "$url" \
-H "Content-Type: application/json" \
${auth:+-H "Authorization: Bearer $auth"} \
-d "$data" 2>&1)
else
resp=$(curl -s -w "\n%{http_code}" -X "$method" "$url" \
-H "Content-Type: application/json" \
${auth:+-H "Authorization: Bearer $auth"} 2>&1)
fi
actual=$(echo "$resp" | tail -1)
body=$(echo "$resp" | sed '$d')
if [ "$actual" = "$expected" ]; then
PASS=$((PASS + 1))
RESULTS+="PASS | $name | $expected | $actual\n"
else
FAIL=$((FAIL + 1))
RESULTS+="FAIL | $name | $expected | $actual | body=${body:0:120}\n"
fi
}
echo "============================================"
echo " Round 11 API Contract Test Suite"
echo " Gateway: $BASE"
echo " Started at $(date '+%Y-%m-%d %H:%M:%S')"
echo "============================================"
echo ""
# ============================================================
# PART 1: Health + Public
# ============================================================
echo "--- Part 1: Health & Public Endpoints ---"
test_case "GET /health" "200" "GET" "$BASE/health"
test_case "HEAD /health" "200" "HEAD" "$BASE/health"
# ============================================================
# PART 2: Auth - Register
# ============================================================
echo "--- Part 2: Auth Register ---"
# Use unique username to avoid conflicts
UN="testapi_$(date +%s)"
PW="TestPass123!"
test_case "Register: missing username" "400" "POST" "$BASE/auth/register" \
'{"password":"Test123!","email":"test@test.com","nickname":"Test","verify_code":"000000"}'
test_case "Register: missing password" "400" "POST" "$BASE/auth/register" \
'{"username":"testuser","email":"test@test.com","nickname":"Test","verify_code":"000000"}'
test_case "Register: username too short (<3)" "400" "POST" "$BASE/auth/register" \
'{"username":"ab","password":"Test123!","email":"test@test.com","nickname":"Test","verify_code":"000000"}'
test_case "Register: username too long (>32)" "400" "POST" "$BASE/auth/register" \
'{"username":"abcdefghijklmnopqrstuvwxyz1234567890","password":"Test123!","email":"test@test.com","nickname":"Test","verify_code":"000000"}'
test_case "Register: username with special chars" "400" "POST" "$BASE/auth/register" \
'{"username":"user@name!","password":"Test123!","email":"test@test.com","nickname":"Test","verify_code":"000000"}'
test_case "Register: password too short (<6)" "400" "POST" "$BASE/auth/register" \
'{"username":"test_abc","password":"Ab1!","email":"test@test.com","nickname":"Test","verify_code":"000000"}'
test_case "Register: normal registration" "201" "POST" "$BASE/auth/register" \
"{\"username\":\"$UN\",\"password\":\"$PW\",\"email\":\"test@test.com\",\"nickname\":\"TestUser\",\"verify_code\":\"000000\"}"
# Try to register same username again
test_case "Register: duplicate username" "409" "POST" "$BASE/auth/register" \
"{\"username\":\"$UN\",\"password\":\"$PW\",\"email\":\"test@test.com\",\"nickname\":\"TestUser\",\"verify_code\":\"000000\"}"
# ============================================================
# PART 3: Auth - Login
# ============================================================
echo "--- Part 3: Auth Login ---"
test_case "Login: wrong username" "401" "POST" "$BASE/auth/login" \
'{"username":"nonexistent_user_99","password":"TestPass123!"}'
test_case "Login: wrong password" "401" "POST" "$BASE/auth/login" \
"{\"username\":\"$UN\",\"password\":\"WrongPass999!\"}"
# Correct login
LOGIN_RESP=$(curl -s -w "\n%{http_code}" -X POST "$BASE/auth/login" \
-H "Content-Type: application/json" \
-d "{\"username\":\"$UN\",\"password\":\"$PW\"}")
LOGIN_CODE=$(echo "$LOGIN_RESP" | tail -1)
LOGIN_BODY=$(echo "$LOGIN_RESP" | sed '$d')
if [ "$LOGIN_CODE" = "200" ]; then
TOKEN=$(echo "$LOGIN_BODY" | python3 -c "import sys,json;print(json.load(sys.stdin).get('token',''))" 2>/dev/null)
USER_ID=$(echo "$LOGIN_BODY" | python3 -c "import sys,json;print(json.load(sys.stdin).get('user_id',''))" 2>/dev/null)
PASS=$((PASS + 1))
RESULTS+="PASS | Login: correct credentials | 200 | 200\n"
echo " -> Got token OK, user_id=$USER_ID"
else
FAIL=$((FAIL + 1))
RESULTS+="FAIL | Login: correct credentials | 200 | $LOGIN_CODE | body=${LOGIN_BODY:0:120}\n"
echo " -> Login FAILED: $LOGIN_CODE - $LOGIN_BODY"
TOKEN=""
fi
# Admin login
ADMIN_RESP=$(curl -s -w "\n%{http_code}" -X POST "$BASE/auth/login" \
-H "Content-Type: application/json" \
-d '{"username":"admin","password":"admin123"}')
ADMIN_CODE=$(echo "$ADMIN_RESP" | tail -1)
ADMIN_BODY=$(echo "$ADMIN_RESP" | sed '$d')
if [ "$ADMIN_CODE" = "200" ]; then
ADMIN_TOKEN=$(echo "$ADMIN_BODY" | python3 -c "import sys,json;print(json.load(sys.stdin).get('token',''))" 2>/dev/null)
PASS=$((PASS + 1))
RESULTS+="PASS | Login: admin credentials | 200 | 200\n"
echo " -> Admin token OK"
else
FAIL=$((FAIL + 1))
RESULTS+="FAIL | Login: admin credentials | 200 | $ADMIN_CODE | body=${ADMIN_BODY:0:120}\n"
echo " -> Admin login FAILED: $ADMIN_CODE"
ADMIN_TOKEN=""
fi
# Validate JWT token - try to use it
if [ -n "$TOKEN" ]; then
REFRESH_RESP=$(curl -s -w "\n%{http_code}" -X POST "$BASE/auth/refresh" \
-H "Authorization: Bearer $TOKEN")
REFRESH_CODE=$(echo "$REFRESH_RESP" | tail -1)
if [ "$REFRESH_CODE" = "200" ]; then
PASS=$((PASS + 1))
RESULTS+="PASS | JWT token valid (refresh) | 200 | 200\n"
else
FAIL=$((FAIL + 1))
RESULTS+="FAIL | JWT token valid (refresh) | 200 | $REFRESH_CODE\n"
fi
fi
# Login: missing fields
test_case "Login: missing password" "400" "POST" "$BASE/auth/login" \
'{"username":"testuser"}'
test_case "Login: empty body" "400" "POST" "$BASE/auth/login" '{}'
test_case "Login: username format invalid" "400" "POST" "$BASE/auth/login" \
'{"username":"ab","password":"Test123!"}'
# ============================================================
# PART 4: Session API
# ============================================================
echo "--- Part 4: Session API ---"
# Unauthenticated
test_case "Sessions: list no auth" "401" "GET" "$BASE/sessions"
test_case "Sessions: create no auth" "401" "POST" "$BASE/sessions" '{"title":"Test"}'
if [ -n "$TOKEN" ]; then
# Authenticated - List
test_case "Sessions: list with auth" "200" "GET" "$BASE/sessions" "" "$TOKEN"
# Create session
SESS_RESP=$(curl -s -w "\n%{http_code}" -X POST "$BASE/sessions" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN" \
-d '{"title":"Round 11 Test Session"}')
SESS_CODE=$(echo "$SESS_RESP" | tail -1)
SESS_BODY=$(echo "$SESS_RESP" | sed '$d')
SESS_ID=$(echo "$SESS_BODY" | python3 -c "import sys,json;print(json.load(sys.stdin).get('id',''))" 2>/dev/null)
if [ "$SESS_CODE" = "201" ]; then
PASS=$((PASS + 1))
RESULTS+="PASS | Sessions: create | 201 | 201\n"
echo " -> Created session: $SESS_ID"
else
FAIL=$((FAIL + 1))
RESULTS+="FAIL | Sessions: create | 201 | $SESS_CODE\n"
fi
# Get session
if [ -n "$SESS_ID" ]; then
test_case "Sessions: get existing" "200" "GET" "$BASE/sessions/$SESS_ID" "" "$TOKEN"
fi
# Get non-existent session
test_case "Sessions: get non-existent" "404" "GET" "$BASE/sessions/session_nonexistent123" "" "$TOKEN"
# Create with empty title (should default)
test_case "Sessions: create empty title" "201" "POST" "$BASE/sessions" '{}' "$TOKEN"
# Delete session
if [ -n "$SESS_ID" ]; then
test_case "Sessions: delete existing" "200" "DELETE" "$BASE/sessions/$SESS_ID" "" "$TOKEN"
fi
# Delete non-existent
test_case "Sessions: delete non-existent" "200" "DELETE" "$BASE/sessions/session_nonexistent" "" "$TOKEN"
# Get messages for non-existent session
test_case "Sessions: messages non-existent" "200" "GET" "$BASE/sessions/session_nonexistent/messages" "" "$TOKEN"
# Test cross-user access
if [ -n "$ADMIN_TOKEN" ] && [ -n "$SESS_ID" ]; then
echo " -> Testing cross-user access..."
fi
else
FAIL=$((FAIL + 6))
echo " -> SKIPPED (no token)"
fi
# ============================================================
# PART 5: Files API
# ============================================================
echo "--- Part 5: Files API ---"
test_case "Files: list no auth" "401" "GET" "$BASE/files"
if [ -n "$TOKEN" ]; then
test_case "Files: list with auth" "200" "GET" "$BASE/files" "" "$TOKEN"
test_case "Files: get non-existent" "404" "GET" "$BASE/files/file_nonexistent" "" "$TOKEN"
else
FAIL=$((FAIL + 2))
fi
# ============================================================
# PART 6: Knowledge API
# ============================================================
echo "--- Part 6: Knowledge API ---"
test_case "Knowledge: list bases no auth" "401" "GET" "$BASE/knowledge/bases"
if [ -n "$TOKEN" ]; then
test_case "Knowledge: list bases with auth" "200" "GET" "$BASE/knowledge/bases" "" "$TOKEN"
test_case "Knowledge: get non-existent" "404" "GET" "$BASE/knowledge/bases/kb_nonexistent" "" "$TOKEN"
else
FAIL=$((FAIL + 2))
fi
# ============================================================
# PART 7: Automation API
# ============================================================
echo "--- Part 7: Automation API ---"
test_case "Automation: list rules no auth" "401" "GET" "$BASE/automation/rules"
if [ -n "$TOKEN" ]; then
test_case "Automation: list rules with auth" "200" "GET" "$BASE/automation/rules" "" "$TOKEN"
test_case "Automation: list scenes with auth" "200" "GET" "$BASE/automation/scenes" "" "$TOKEN"
test_case "Automation: get non-existent rule" "404" "GET" "$BASE/automation/rules/rule_nonexistent" "" "$TOKEN"
else
FAIL=$((FAIL + 3))
fi
# ============================================================
# PART 8: Reminders API
# ============================================================
echo "--- Part 8: Reminders API ---"
test_case "Reminders: list no auth" "401" "GET" "$BASE/reminders"
if [ -n "$TOKEN" ]; then
test_case "Reminders: list with auth" "200" "GET" "$BASE/reminders" "" "$TOKEN"
test_case "Reminders: create missing fields" "400" "POST" "$BASE/reminders" '{}' "$TOKEN"
else
FAIL=$((FAIL + 2))
fi
# ============================================================
# PART 9: Briefings API
# ============================================================
echo "--- Part 9: Briefings API ---"
test_case "Briefings: get no auth" "401" "GET" "$BASE/briefings"
if [ -n "$TOKEN" ]; then
test_case "Briefings: get with auth" "200" "GET" "$BASE/briefings" "" "$TOKEN"
test_case "Briefings: latest with auth" "200" "GET" "$BASE/briefings/latest" "" "$TOKEN"
else
FAIL=$((FAIL + 2))
fi
# ============================================================
# PART 10: Notifications API
# ============================================================
echo "--- Part 10: Notifications API ---"
test_case "Notifications: push no auth" "401" "POST" "$BASE/notifications/push" '{"message":"test"}'
if [ -n "$TOKEN" ]; then
test_case "Notifications: push empty" "400" "POST" "$BASE/notifications/push" '{}' "$TOKEN"
else
FAIL=$((FAIL + 1))
fi
# ============================================================
# PART 11: Memories API
# ============================================================
echo "--- Part 11: Memories API ---"
test_case "Memories: search no auth" "401" "GET" "$BASE/memory/search"
if [ -n "$TOKEN" ]; then
test_case "Memories: search with auth" "200" "GET" "$BASE/memory/search?q=test" "" "$TOKEN"
test_case "Memories: list with auth" "200" "GET" "$BASE/memory" "" "$TOKEN"
test_case "Memories: add missing fields" "400" "POST" "$BASE/memory" '{}' "$TOKEN"
else
FAIL=$((FAIL + 3))
fi
# ============================================================
# PART 12: Voice API
# ============================================================
echo "--- Part 12: Voice API ---"
if [ -n "$TOKEN" ]; then
test_case "Voice: status with auth" "200" "GET" "$BASE/voice/status" "" "$TOKEN"
else
FAIL=$((FAIL + 1))
fi
# ============================================================
# PART 13: Admin endpoints
# ============================================================
echo "--- Part 13: Admin endpoints ---"
if [ -n "$ADMIN_TOKEN" ]; then
test_case "Admin: sessions with admin" "200" "GET" "$BASE/admin/sessions" "" "$ADMIN_TOKEN"
test_case "Admin: sessions/active with admin" "200" "GET" "$BASE/admin/sessions/active" "" "$ADMIN_TOKEN"
else
FAIL=$((FAIL + 2))
fi
# Non-admin user trying admin
if [ -n "$TOKEN" ]; then
test_case "Admin: sessions without admin" "403" "GET" "$BASE/admin/sessions" "" "$TOKEN"
else
FAIL=$((FAIL + 1))
fi
# ============================================================
# PART 14: Invalid token
# ============================================================
echo "--- Part 14: Invalid Token ---"
test_case "Sessions: invalid token" "401" "GET" "$BASE/sessions" "" "invalidtoken12345"
test_case "Sessions: expired/malformed" "401" "GET" "$BASE/sessions" "" "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIn0.dozjgNryP4J3jVmNHl0w5N_XgL0n3I9PlFUP0THsR8k"
# ============================================================
# PART 15: Token refresh
# ============================================================
echo "--- Part 15: Token Refresh ---"
test_case "Refresh: no auth" "401" "POST" "$BASE/auth/refresh"
test_case "Refresh: invalid token" "401" "POST" "$BASE/auth/refresh" "" "invalidtoken"
# ============================================================
# Summary
# ============================================================
echo ""
echo "============================================"
echo " TEST SUMMARY"
echo "============================================"
TOTAL=$((PASS + FAIL))
echo "Total: $TOTAL | Passed: $PASS | Failed: $FAIL"
echo ""
echo -e "$RESULTS" | column -t -s '|'
echo ""
echo "============================================"
echo " Test run completed at $(date '+%Y-%m-%d %H:%M:%S')"
echo "============================================"
# Save the token and user info for reference
if [ -n "$TOKEN" ]; then
echo "TOKEN=$TOKEN" > /tmp/round11_testenv
echo "USER_ID=$USER_ID" >> /tmp/round11_testenv
echo "ADMIN_TOKEN=$ADMIN_TOKEN" >> /tmp/round11_testenv
echo "TEST_USER=$UN" >> /tmp/round11_testenv
fi
+54
View File
@@ -0,0 +1,54 @@
#!/usr/bin/env python3
"""Quick CDP check: read root innerHTML"""
import json
import urllib.request
from websocket import create_connection
req = urllib.request.Request('http://127.0.0.1:9225/json/list')
resp = json.loads(urllib.request.urlopen(req, timeout=5))
ws_url = None
for p in resp:
if 'localhost:5199' in p.get('url', ''):
ws_url = p['webSocketDebuggerUrl']
break
if not ws_url:
print("No page found!")
exit(1)
ws = create_connection(ws_url, timeout=10)
def cdp(method, params, msg_id):
payload = json.dumps({"id": msg_id, "method": method, "params": params})
ws.send(payload)
def recv_id(tid, timeout=3):
ws.settimeout(timeout)
while True:
try:
m = json.loads(ws.recv())
if m.get("id") == tid:
return m.get("result", {})
except Exception:
return None
cdp("Runtime.enable", {}, 1)
recv_id(1)
# Get root innerHTML
cdp("Runtime.evaluate", {"expression": "document.getElementById('root').innerHTML.substring(0, 1000)", "returnByValue": True}, 10)
r = recv_id(10)
if r:
html = r.get("result", {}).get("value", "null")
print("=== Root innerHTML (前1000字符) ===")
print(html)
else:
print("No result")
# Root child count
cdp("Runtime.evaluate", {"expression": "document.getElementById('root').children.length", "returnByValue": True}, 11)
r = recv_id(11)
val = r.get("result", {}).get("value", "?") if r else "?"
print("\nRoot children:", val)
ws.close()
+212
View File
@@ -0,0 +1,212 @@
#!/usr/bin/env python3
"""CDP 前端页面验证:截图 + 控制台错误 + 登录测试"""
import json, time, base64, os
from websocket import create_connection
PAGE_URL = "http://localhost:5199/"
CDP_WS = "ws://127.0.0.1:9225/devtools/browser/b2fca0da-35d6-4180-8413-eddf53753c6a"
def send_cmd(ws, method, params=None, msg_id=1):
payload = json.dumps({"id": msg_id, "method": method, "params": params or {}})
ws.send(payload)
def recv_until(ws, timeout=5):
ws.settimeout(timeout)
results = []
try:
while True:
data = ws.recv()
results.append(data)
except:
pass
return results
def find_event(msgs, method):
for m in msgs:
try:
d = json.loads(m)
if d.get("method") == method:
return d
except:
pass
return None
def find_result(msgs, msg_id):
for m in msgs:
try:
d = json.loads(m)
if d.get("id") == msg_id:
return d
except:
pass
return None
print("=== CDP 前端页面验证 ===")
# 1. 连接到浏览器
ws = create_connection(CDP_WS, timeout=10)
print(f" Connected to CDP")
# 2. 获取已有页面的 targetId (Navigated page)
pages_resp = os.popen('curl -s http://127.0.0.1:9225/json').read()
pages = json.loads(pages_resp)
target_id = None
for p in pages:
if p.get("url","").startswith("http://localhost:5199"):
target_id = p["id"]
print(f" Found page: {p['title'][:80]} id={target_id}")
break
if not target_id:
print(" No existing page found, creating new one...")
send_cmd(ws, "Target.createTarget", {"url": PAGE_URL})
results = recv_until(ws, 3)
print(f" Target.createTarget results: {results}")
# 3. 连接到页面 target (通过 Target.attachToTarget)
print(f"\n Attaching to target {target_id}...")
send_cmd(ws, "Target.attachToTarget", {"targetId": target_id, "flatten": True}, 1)
results = recv_until(ws, 3)
result = find_result(results, 1)
if result:
session_id = result.get("result",{}).get("sessionId","")
print(f" Attached, sessionId={session_id[:20]}...")
else:
print(f" Attach failed: {results}")
ws.close()
exit(1)
# 4. 启用 Runtime 和 Console
print("\n--- Enabling domains ---")
for method, sid, mid in [("Runtime.enable", session_id, 2), ("Page.enable", session_id, 3), ("Log.enable", session_id, 4)]:
payload = json.dumps({"id": mid, "method": method, "params": {}, "sessionId": session_id})
ws.send(payload)
results = recv_until(ws, 3)
print(f" Enable results: {len(results)} messages")
# 5. 截图
print("\n--- Taking screenshot ---")
send_cmd(ws, "Page.captureScreenshot", {"format": "png"}, 10)
# send with sessionId
payload = json.dumps({"id": 5, "method": "Page.captureScreenshot", "params": {"format": "png"}, "sessionId": session_id})
ws.send(payload)
results = recv_until(ws, 5)
screen_result = find_result(results, 5)
if screen_result:
img_data = screen_result.get("result",{}).get("data","")
if img_data:
img_bytes = base64.b64decode(img_data)
img_path = "/tmp/cyrene_screenshot_round12.png"
with open(img_path, "wb") as f:
f.write(img_bytes)
print(f" Screenshot saved: {img_path} ({len(img_bytes)} bytes)")
else:
print(f" No image data: {str(screen_result)[:200]}")
else:
print(f" Screenshot failed: {results}")
# 6. 获取控制台日志
print("\n--- Console logs ---")
ws.settimeout(2)
console_logs = []
for i in range(15): # 最多等15秒
try:
data = ws.recv()
try:
d = json.loads(data)
if d.get("method") in ("Runtime.consoleAPICalled", "Log.entryAdded"):
entry = d.get("params",{}).get("entry",{}) if d["method"]=="Log.entryAdded" else d.get("params",{})
level = entry.get("level","log") if d["method"]=="Log.entryAdded" else d["params"].get("type","log")
text = entry.get("text","") if d["method"]=="Log.entryAdded" else " ".join([a.get("value","") for a in d["params"].get("args",[])])
url = entry.get("url","") if d["method"]=="Log.entryAdded" else ""
console_logs.append(f"[{level}] {text[:200]} {url}")
except:
pass
except:
break
if console_logs:
for l in console_logs:
print(f" {l}")
else:
print(" No console messages captured. Let me poll...")
# 7. 使用 Runtime.evaluate 获取页面状态
print("\n--- Evaluating page state ---")
checks = [
('document.title', "document.title"),
('document.readyState', "document.readyState"),
('Root element exists', "document.getElementById('root') ? 'yes' : 'no'"),
('Body children count', "document.body ? document.body.children.length : -1"),
('Has login form?', "document.querySelector('form') ? 'yes' : 'no'"),
('Has error boundary?', "document.querySelector('[class*=error]') ? 'yes' : 'no'"),
('Window errors', "window.__LAST_ERROR__ || 'none'"),
]
for label, expr in checks:
payload = json.dumps({
"id": 100 + len(console_logs),
"method": "Runtime.evaluate",
"params": {"expression": expr, "returnByValue": True},
"sessionId": session_id
})
ws.send(payload)
results = recv_until(ws, 2)
r = find_result(results, 100 + len(console_logs))
val = r.get("result",{}).get("result",{}).get("value","?") if r else "?"
print(f" {label}: {val}")
# 8. 测试登录流程 (登录到 localhost:5199)
print("\n--- Testing login flow ---")
TOKEN = open("/tmp/cyrene_test_token.txt").read().strip()
login_js = f"""
(async function() {{
try {{
const resp = await fetch('http://localhost:8080/api/v1/auth/login', {{
method: 'POST',
headers: {{'Content-Type': 'application/json'}},
body: JSON.stringify({{username:'yeij0942',password:'Jiang1143218570'}})
}});
const data = await resp.json();
return JSON.stringify({{status: resp.status, hasToken: !!data.token, userId: data.user_id}});
}} catch(e) {{
return 'Error: ' + e.message;
}}
}})()
"""
payload = json.dumps({
"id": 200,
"method": "Runtime.evaluate",
"params": {"expression": login_js, "returnByValue": True, "awaitPromise": True},
"sessionId": session_id
})
ws.send(payload)
results = recv_until(ws, 5)
r = find_result(results, 200)
if r:
val = r.get("result",{}).get("result",{}).get("value","?")
print(f" Login test result: {val}")
else:
print(f" Login test failed: {results}")
# 9. 页面上 JavaScript 错误检测
print("\n--- Checking for JS errors via Runtime.exceptionThrown ---")
ws.settimeout(1)
error_msgs = []
for i in range(5):
try:
data = ws.recv()
d = json.loads(data)
if d.get("method") == "Runtime.exceptionThrown":
exc = d.get("params",{}).get("exceptionDetails",{})
error_msgs.append(f" ERROR: {exc.get('text','')} at {exc.get('url','')}:{exc.get('lineNumber','')}")
except:
break
if error_msgs:
for e in error_msgs:
print(e)
else:
print(" No JS exceptions detected")
ws.close()
print("\n[DONE]")
+134
View File
@@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""CDP 前端验证:截图 + 控制台检查 + 页面状态"""
import json, time, base64, urllib.request, os
# Step 1: 导航到前端页面 (PUT /json/new)
print("=== CDP 前端验证 ===")
print("Navigating to page...")
req = urllib.request.Request("http://127.0.0.1:9225/json/new?url=http://localhost:5199/", method="PUT")
try:
resp = urllib.request.urlopen(req, timeout=10)
page_data = json.loads(resp.read())
page_id = page_data.get("id","")
ws_url = page_data.get("webSocketDebuggerUrl","")
print(f" Page: id={page_id} title={page_data.get('title','')[:80]}")
print(f" WS_URL: {ws_url[:80]}")
except Exception as e:
print(f" Navigation failed: {e}")
import sys; sys.exit(1)
# Step 2: 连接到页面 WebSocket
from websocket import create_connection
ws = create_connection(ws_url, timeout=10)
print(" Connected to page WebSocket")
def cdp(method, params=None, msg_id=1):
payload = json.dumps({"id": msg_id, "method": method, "params": params or {}})
ws.send(payload)
def recv_msgs(timeout=3):
ws.settimeout(timeout)
msgs = []
try:
while True:
msgs.append(ws.recv())
except:
pass
return msgs
def find_result(msgs, msg_id):
for m in msgs:
try:
d = json.loads(m)
if d.get("id") == msg_id:
return d.get("result",{})
except:
pass
return None
# Step 3: 启用 domains
print("\nEnabling domains...")
cdp("Runtime.enable", {}, 1)
cdp("Page.enable", {}, 2)
cdp("Log.enable", {}, 3)
recv_msgs(2)
print(" Domains enabled")
# Step 4: 等待页面完全加载后截图
print("\nWaiting for page to load...")
time.sleep(3)
cdp("Page.captureScreenshot", {"format": "png"}, 10)
msgs = recv_msgs(5)
r = find_result(msgs, 10)
if r and r.get("data"):
img = base64.b64decode(r["data"])
with open("/tmp/cyrene_screenshot_round12.png", "wb") as f:
f.write(img)
print(f" Screenshot saved: {len(img)} bytes")
else:
print(f" Screenshot failed: {str(r)[:200]}")
# Step 5: 获取控制台日志
print("\nConsole messages (Log.entryAdded):")
ws.settimeout(2)
logs = []
for i in range(15):
try:
data = ws.recv()
d = json.loads(data)
if d.get("method") == "Log.entryAdded":
entry = d["params"]["entry"]
level = entry.get("level","log")
text = entry.get("text","")
url = entry.get("url","")
logs.append(f" [{level}] {text[:200]} ({url})")
elif d.get("method") == "Runtime.exceptionThrown":
exc = d["params"].get("exceptionDetails",{})
logs.append(f" [EXCEPTION] {exc.get('text','')}")
except:
break
if logs:
for l in logs:
print(l)
else:
print(" (none)")
# Step 6: 页面状态检查
print("\nPage state:")
checks = [
("title", "document.title"),
("readyState", "document.readyState"),
("root exists?", "document.getElementById('root') ? 'yes' : 'no'"),
("body children", "document.body ? document.body.children.length : -1"),
("login form?", "document.querySelector('form') ? 'yes' : 'no'"),
("all text content (first 300)", "document.body ? (document.body.innerText || '').substring(0, 300) : 'no body'"),
]
for idx, (label, expr) in enumerate(checks):
cdp("Runtime.evaluate", {"expression": expr, "returnByValue": True}, 100 + idx)
msgs = recv_msgs(2)
r = find_result(msgs, 100 + idx)
val = r.get("result",{}).get("value","?") if r else "?"
print(f" {label}: {val}")
# Step 7: 前端 JS 错误检测
print("\nJS Errors (Runtime.exceptionThrown):")
ws.settimeout(1)
errors = []
for i in range(5):
try:
d = json.loads(ws.recv())
if d.get("method") == "Runtime.exceptionThrown":
exc = d["params"]["exceptionDetails"]
errors.append(f" {exc.get('text','')} at {exc.get('url','')}:{exc.get('lineNumber','')}")
except:
break
if errors:
for e in errors:
print(e)
else:
print(" No JS exceptions detected")
ws.close()
print("\n[DONE]")
+198
View File
@@ -0,0 +1,198 @@
#!/usr/bin/env python3
"""CDP v3: 深度诊断前端白屏问题 — 检查 DOM、网络、JS 模块加载"""
import json, time, base64, urllib.request, os
# Step 1: 打开新页面
print("=== Step 1: 导航到前端页面 ===")
req = urllib.request.Request("http://127.0.0.1:9225/json/new?url=http://localhost:5199/", method="PUT")
resp = urllib.request.urlopen(req, timeout=10)
page_data = json.loads(resp.read())
page_id = page_data.get("id","")
ws_url = page_data.get("webSocketDebuggerUrl","")
print(f" Page ID: {page_id}")
print(f" WS URL: {ws_url[:80]}")
from websocket import create_connection
ws = create_connection(ws_url, timeout=10)
def cdp(method, params=None, msg_id=1):
ws.send(json.dumps({"id": msg_id, "method": method, "params": params or {}}))
def recv_all(timeout=3):
ws.settimeout(timeout)
msgs = []
try:
while True:
msgs.append(ws.recv())
except:
pass
return msgs
def find_result(msgs, msg_id):
for m in msgs:
try:
d = json.loads(m)
if d.get("id") == msg_id:
return d.get("result",{})
except:
pass
return None
# Step 2: 启用所有 domains
print("\n=== Step 2: 启用所有 domains ===")
cdp("Page.enable", {}, 1)
cdp("Network.enable", {}, 2)
cdp("Runtime.enable", {}, 3)
cdp("Log.enable", {}, 4)
cdp("DOM.enable", {}, 5)
recv_all(1)
# Step 3: 重新 navigate 以捕获网络事件
print("\n=== Step 3: 重新导航 ===")
cdp("Page.navigate", {"url": "http://localhost:5199/"}, 10)
time.sleep(3)
# 收集在此期间的所有消息
print(" 收集网络/运行时事件...")
all_msgs = recv_all(5)
# 分析网络事件
print("\n=== Step 4: 网络请求分析 ===")
requests = {}
for m in all_msgs:
try:
d = json.loads(m)
except:
continue
method = d.get("method","")
params = d.get("params",{})
if method == "Network.requestWillBeSent":
req_info = params.get("request", {})
url = req_info.get("url","")
req_id = params.get("requestId","")
rtype = params.get("type","")
requests[req_id] = {"url": url, "type": rtype, "status": "pending"}
elif method == "Network.responseReceived":
req_id = params.get("requestId","")
resp = params.get("response",{})
status = resp.get("status",0)
mime = resp.get("mimeType","")
if req_id in requests:
requests[req_id]["status"] = status
requests[req_id]["mime"] = mime
elif method == "Network.loadingFailed":
req_id = params.get("requestId","")
err = params.get("errorText","")
if req_id in requests:
requests[req_id]["status"] = "FAILED"
requests[req_id]["error"] = err
elif method == "Runtime.exceptionThrown":
exc = params.get("exceptionDetails",{})
print(f" [EXCEPTION] {exc.get('text','')} at {exc.get('url','')}:{exc.get('lineNumber','')}")
elif method == "Log.entryAdded":
entry = params.get("entry",{})
print(f" [CONSOLE:{entry.get('level','log')}] {entry.get('text','')[:200]}")
elif method == "Runtime.consoleAPICalled":
args = params.get("args",[])
msg_type = params.get("type","log")
texts = []
for a in args:
t = a.get("value","") or a.get("description","")
texts.append(str(t)[:150])
print(f" [CONSOLE:{msg_type}] {' '.join(texts)}")
print(f"\n Total network requests: {len(requests)}")
for rid, info in requests.items():
status_str = str(info.get("status","?"))
type_str = info.get("type","?")
url_short = info.get("url","")[-80:]
error_str = f" ERROR={info.get('error','')}" if info.get("error") else ""
print(f" [{type_str}] {status_str} {url_short}{error_str}")
# Step 5: 运行时评估 - 深度检查
print("\n=== Step 5: 运行时深度评估 ===")
checks = [
("window.location.href", "window.location.href"),
("document.readyState", "document.readyState"),
("document.title", "document.title"),
("root element", "document.getElementById('root') ? 'EXISTS' : 'NULL'"),
("body innerHTML length", "document.body ? document.body.innerHTML.length : -1"),
("body children count", "document.body ? document.body.children.length : -1"),
("head children count", "document.head ? document.head.children.length : -1"),
("all scripts", "Array.from(document.querySelectorAll('script')).map(s => ({src:s.src,type:s.type,async:s.async,defer:s.defer})).slice(0,5)"),
("React DOM check", "typeof React !== 'undefined' ? 'React global found' : (document.querySelector('#root') ? 'root exists but no React global' : 'root missing')"),
("SW registration check", "'serviceWorker' in navigator ? 'SW API available' : 'NO SW API'"),
("localStorage token", "localStorage.getItem('token') || 'no token'"),
]
for idx, (label, expr) in enumerate(checks):
msg_id = 100 + idx
cdp("Runtime.evaluate", {"expression": expr, "returnByValue": True}, msg_id)
recv_msgs = recv_all(2)
r = find_result(recv_msgs, msg_id)
if r:
val = r.get("result",{}).get("value","?")
# 处理 object 类型
if isinstance(val, dict) and "objectId" in r.get("result",{}):
val = r["result"].get("description","[object]")
err = r.get("result",{}).get("description","") or ""
print(f" {label}: {val}")
if r.get("result",{}).get("type") == "object":
# Try to get properties
obj_id = r["result"].get("objectId","")
if obj_id:
cdp("Runtime.getProperties", {"objectId": obj_id, "ownProperties": True}, 900 + idx)
prop_msgs = recv_all(2)
prop_r = find_result(prop_msgs, 900 + idx)
if prop_r:
for prop in prop_r.get("result",[]):
v = prop.get("value",{})
print(f" .{prop.get('name','?')} = {v.get('value', v.get('description','?'))}")
else:
print(f" {label}: NO RESULT")
# Step 6: 截图
print("\n=== Step 6: 截图 ===")
cdp("Page.captureScreenshot", {"format": "png", "captureBeyondViewport": True}, 20)
recv_msgs = recv_all(5)
r = find_result(recv_msgs, 20)
if r and r.get("data"):
img = base64.b64decode(r["data"])
fp = "/tmp/cyrene_screenshot_round12_v3.png"
with open(fp, "wb") as f:
f.write(img)
print(f" Screenshot: {len(img)} bytes -> {fp}")
else:
print(f" Screenshot failed: {str(r)[:200]}")
# Step 7: 检查是否有 JS 异常
print("\n=== Step 7: 最终 JS 异常检查 ===")
ws.settimeout(1)
has_exception = False
for i in range(5):
try:
d = json.loads(ws.recv())
if d.get("method") == "Runtime.exceptionThrown":
exc = d["params"]["exceptionDetails"]
print(f" [EXCEPTION] text={exc.get('text','')}")
print(f" url={exc.get('url','')}")
print(f" line={exc.get('lineNumber','')} col={exc.get('columnNumber','')}")
stack = exc.get("stackTrace",{})
for frame in stack.get("callFrames",[]):
print(f" at {frame.get('functionName','')} {frame.get('url','')}:{frame.get('lineNumber','')}")
has_exception = True
except:
break
if not has_exception:
print(" No exceptions caught")
ws.close()
print("\n[DONE v3]")
+164
View File
@@ -0,0 +1,164 @@
#!/usr/bin/env python3
"""Round 12: 聊天 E2E + 子服务连通性测试"""
import json, urllib.request, urllib.error, sys
BASE = "http://localhost:8080"
TOKEN_FILE = "/tmp/cyrene_test_token.txt"
SID_FILE = "/tmp/cyrene_test_sid.txt"
def req(method, path, body=None, token=None, timeout=10):
url = f"{BASE}{path}"
data = json.dumps(body).encode() if body else None
r = urllib.request.Request(url, data=data, method=method)
r.add_header("Content-Type", "application/json")
if token:
r.add_header("Authorization", f"Bearer {token}")
try:
resp = urllib.request.urlopen(r, timeout=timeout)
return resp.status, resp.read().decode()
except urllib.error.HTTPError as e:
return e.code, e.read().decode()
except Exception as e:
return 0, str(e)
# ====== Step 1: Admin Login ======
print("=== Step 1: Admin Login ===")
status, body = req("POST", "/api/v1/auth/login",
{"username":"yeij0942","password":"Jiang1143218570"})
print(f" status={status}")
d = json.loads(body)
token = d.get("token", "")
print(f" user_id={d.get('user_id')} has_token={'token' in d} token_len={len(token)}")
# ====== Step 2: 所有服务健康检查 ======
print("\n=== Step 2: 子服务健康检查 ===")
services = [
("ai-core (8081)", "http://localhost:8081/api/v1/health"),
("memory-service (8091)", "http://localhost:8091/api/v1/health"),
("tool-engine (8092)", "http://localhost:8092/api/v1/health"),
("voice-service (8093)", "http://localhost:8093/api/v1/health"),
("gateway (8080)", "http://localhost:8080/api/v1/health"),
]
for name, url in services:
r = urllib.request.Request(url)
try:
resp = urllib.request.urlopen(r, timeout=5)
body = resp.read().decode()
print(f" [{resp.status}] {name}: {body[:120]}")
except Exception as e:
print(f" [FAIL] {name}: {e}")
# ====== Step 3: 获取/创建 Session ======
print("\n=== Step 3: Session 管理 ===")
status, body = req("GET", "/api/v1/sessions", token=token)
print(f" GET /sessions status={status}")
try:
sessions = json.loads(body)
if isinstance(sessions, list) and len(sessions) > 0:
sid = sessions[0].get("id", "")
print(f" existing session: {sid}")
else:
# 创建
status, body = req("POST", "/api/v1/sessions", {"user_id": "admin"}, token=token)
print(f" POST /sessions status={status} body={body[:200]}")
sid = json.loads(body).get("id", "")
print(f" new session: {sid}")
except Exception as e:
print(f" parse error: {e}")
sid = ""
# ====== Step 4: 聊天消息发送 (测试 ai-core 转发) ======
print(f"\n=== Step 4: 聊天消息发送 (session={sid}) ===")
if sid and token:
msg_body = {
"session_id": sid,
"message": "你好,请简单介绍一下你自己",
"mode": "text"
}
status, body = req("POST", "/api/v1/chat", msg_body, token=token)
print(f" POST /api/v1/chat status={status}")
if status == 404:
print(f" WARNING: /api/v1/chat 端点返回 404 - 端点可能未注册!")
# 检查 gateway 日志
import subprocess
result = subprocess.run(["grep", "-i", "chat", "/tmp/gateway.log"], capture_output=True, text=True)
print(f" gateway log (chat): {result.stdout[:500]}")
else:
print(f" response: {body[:500]}")
else:
print(" SKIPPED: no session or token")
# ====== Step 5: 直接调用 ai-core chat ======
print("\n=== Step 5: 直接调用 ai-core /api/v1/chat ===")
ai_body = {
"user_id": "admin",
"session_id": sid or "test_session_001",
"message": "Hello",
"mode": "text"
}
r = urllib.request.Request("http://localhost:8081/api/v1/chat",
data=json.dumps(ai_body).encode(),
method="POST")
r.add_header("Content-Type", "application/json")
r.add_header("Accept", "text/event-stream")
try:
resp = urllib.request.urlopen(r, timeout=30)
print(f" ai-core status={resp.status}")
# 读取前5行 SSE
lines = []
for i in range(5):
line = resp.readline().decode().strip()
lines.append(line)
print(f" first 5 SSE lines: {lines}")
except Exception as e:
print(f" ai-core FAIL: {e}")
# ====== Step 6: memory-service 直调测试 ======
print("\n=== Step 6: memory-service 直调测试 ===")
r = urllib.request.Request("http://localhost:8091/api/v1/memory/search",
data=json.dumps({"query": "test", "user_id": "admin", "limit": 5}).encode(),
method="POST")
r.add_header("Content-Type", "application/json")
try:
resp = urllib.request.urlopen(r, timeout=5)
print(f" memory/search status={resp.status} body={resp.read().decode()[:200]}")
except Exception as e:
print(f" memory/search FAIL: {e}")
# ====== Step 7: tool-engine 直调测试 ======
print("\n=== Step 7: tool-engine 直调测试 ===")
r = urllib.request.Request("http://localhost:8092/api/v1/tools",
data=json.dumps({"action": "list"}).encode(),
method="POST")
r.add_header("Content-Type", "application/json")
try:
resp = urllib.request.urlopen(r, timeout=5)
print(f" tools/list status={resp.status} body={resp.read().decode()[:200]}")
except Exception as e:
print(f" tools/list FAIL: {e}")
# ====== Step 8: Gateway 路由检查 ======
print("\n=== Step 8: Gateway 路由检查 (哪些chat端点注册了) ===")
for path in ["/api/v1/chat", "/api/v1/chat/stream", "/api/v1/chat/send"]:
status, _ = req("POST", path, {"message": "test"}, token=token)
print(f" POST {path}: {status}")
# ====== Step 9: WebSocket 端点检查 ======
print("\n=== Step 9: WebSocket 端点基本测试 ===")
import socket
# 先检查 /ws/chat 是否可访问 (HTTP层面)
r = urllib.request.Request("http://localhost:8080/ws/chat")
try:
resp = urllib.request.urlopen(r, timeout=3)
print(f" GET /ws/chat (no upgrade): {resp.status}")
except urllib.error.HTTPError as e:
print(f" GET /ws/chat HTTP status: {e.code} body={e.read().decode()[:100]}")
except Exception as e:
print(f" GET /ws/chat: {e}")
# 保存 token 和 sid
with open(TOKEN_FILE, "w") as f:
f.write(token)
with open(SID_FILE, "w") as f:
f.write(sid or "")
print(f"\n[DONE] Token saved to {TOKEN_FILE}, sid={sid}")
+267
View File
@@ -0,0 +1,267 @@
#!/usr/bin/env python3
"""第16轮诊断:IoT&A 自动化规则引擎 + 控件CDP 测试脚本"""
import json, urllib.request, urllib.error, sys, time, subprocess
GATEWAY = "http://localhost:8080"
CDP = "http://127.0.0.1:9225"
def req(method, path, body=None, headers=None):
url = f"{GATEWAY}{path}"
if headers is None:
headers = {}
if body is not None:
data = json.dumps(body).encode()
headers["Content-Type"] = "application/json"
else:
data = None
try:
r = urllib.request.Request(url, data=data, headers=headers, method=method)
resp = urllib.request.urlopen(r, timeout=10)
return json.loads(resp.read())
except urllib.error.HTTPError as e:
body = e.read().decode()
try:
return json.loads(body)
except:
return {"error": body, "status": e.code}
except Exception as e:
return {"error": str(e)}
def get_token():
result = req("POST", "/api/v1/auth/login", {"username": "yeij0942", "password": "Jiang1143218570"})
return result.get("token", "")
print("=" * 60)
print("1. JWT Token 获取")
print("=" * 60)
token = get_token()
if token:
print(f"✅ Token 获取成功: {token[:40]}...")
else:
print("❌ Token 获取失败")
sys.exit(1)
auth_headers = {"Authorization": f"Bearer {token}"}
print("\n" + "=" * 60)
print("2. Automation 规则引擎 API CRUD 测试")
print("=" * 60)
# 2a. GET Rules (should be empty)
print("\n2a. GET /api/v1/automation/rules")
result = req("GET", "/api/v1/automation/rules", headers=auth_headers)
count = result.get("count", -1)
print(f" count={count} {'' if count is not None else '⚠️'}")
# 2b. POST Create Rule
print("\n2b. POST /api/v1/automation/rules (创建规则)")
create_body = {
"name": "测试规则-关灯",
"description": "每天晚上22点关闭客厅灯",
"trigger_type": "schedule",
"trigger_config": {"time": "22:00", "days": ["mon","tue","wed","thu","fri"]},
"actions": [{"type": "set_device", "device_id": "light-livingroom", "property": "status", "value": "off"}]
}
result = req("POST", "/api/v1/automation/rules", body=create_body, headers=auth_headers)
rule_id = result.get("rule", {}).get("id", "")
print(f" success={result.get('success')} rule_id={rule_id[:16] if rule_id else 'N/A'}...")
if not result.get("success"):
print(f" ERROR: {json.dumps(result, indent=2, ensure_ascii=False)[:500]}")
# 2c. GET Rules after create
if rule_id:
print("\n2c. GET /api/v1/automation/rules")
result = req("GET", "/api/v1/automation/rules", headers=auth_headers)
name0 = result.get("rules", [{}])[0].get("name", "N/A") if result.get("rules") else "N/A"
print(f" count={result.get('count')} first_name={name0}")
# 2d. GET single
print(f"\n2d. GET /api/v1/automation/rules/{rule_id}")
result = req("GET", f"/api/v1/automation/rules/{rule_id}", headers=auth_headers)
r = result.get("rule", {})
print(f" name={r.get('name')} trigger_type={r.get('trigger_type')} enabled={r.get('enabled')}")
# 2e. PUT Update
print(f"\n2e. PUT /api/v1/automation/rules/{rule_id}")
result = req("PUT", f"/api/v1/automation/rules/{rule_id}", body={"name": "测试规则-已更新", "enabled": False}, headers=auth_headers)
r = result.get("rule", {})
print(f" success={result.get('success')} name={r.get('name')} enabled={r.get('enabled')}")
# 2f. DELETE
print(f"\n2f. DELETE /api/v1/automation/rules/{rule_id}")
result = req("DELETE", f"/api/v1/automation/rules/{rule_id}", headers=auth_headers)
print(f" success={result.get('success')}")
# 2g. Verify deleted
print("\n2g. GET /api/v1/automation/rules (验证删除)")
result = req("GET", "/api/v1/automation/rules", headers=auth_headers)
print(f" count={result.get('count')} {'' if result.get('count') == 0 else '⚠️'}")
# 2h. Unauthenticated
print("\n2h. GET /api/v1/automation/rules (未认证)")
result = req("GET", "/api/v1/automation/rules")
print(f" error={result.get('error','N/A')[:60]}")
# 2i. Manual trigger
print("\n2i. POST /api/v1/automation/rules/:id/trigger (手动触发)")
cbody = {"name": "触发测试", "trigger_type": "manual", "actions": [{"type": "notify", "title": "测试", "body": "通知"}]}
result = req("POST", "/api/v1/automation/rules", body=cbody, headers=auth_headers)
trid = result.get("rule", {}).get("id", "")
if trid:
result = req("POST", f"/api/v1/automation/rules/{trid}/trigger", headers=auth_headers)
print(f" success={result.get('success')} msg={result.get('message','')}")
req("DELETE", f"/api/v1/automation/rules/{trid}", headers=auth_headers)
else:
print(" ❌ 创建失败")
# 2j. Scenes
print("\n2j. GET /api/v1/automation/scenes")
result = req("GET", "/api/v1/automation/scenes", headers=auth_headers)
print(f" count={result.get('count')}")
print("\n" + "=" * 60)
print("3. IoT 调试服务测试")
print("=" * 60)
res = json.loads(urllib.request.urlopen("http://localhost:8083/api/v1/devices").read())
print(f"3a. 设备总数: {res.get('total', 0)}")
for d in res.get("devices", []):
print(f" {d['name']} ({d['type']}): {d.get('status','')}")
print("\n3b. Toggle light-bedroom")
r = urllib.request.Request("http://localhost:8083/api/v1/devices/light-bedroom/toggle", method="POST")
result = json.loads(urllib.request.urlopen(r).read())
dev = result.get("device", {})
print(f" action={result.get('action')} {dev.get('name')} status={dev.get('status')}")
print("\n3c. Set temperature (ac-livingroom)")
r = urllib.request.Request("http://localhost:8083/api/v1/devices/ac-livingroom/set",
data=json.dumps({"field": "temperature", "value": 24}).encode(),
headers={"Content-Type": "application/json"}, method="POST")
result = json.loads(urllib.request.urlopen(r).read())
dev = result.get("device", {})
print(f" {dev.get('name')} temperature={dev.get('temperature')}°C")
print("\n3d. History")
result = json.loads(urllib.request.urlopen("http://localhost:8083/api/v1/devices/light-bedroom/history").read())
print(f" device_id={result.get('device_id')} history={len(result.get('history',[]))} entries")
print("\n" + "=" * 60)
print("4. CDP 前端 IoT 控件验证")
print("=" * 60)
pages = json.loads(urllib.request.urlopen(f"{CDP}/json").read())
target = None
for p in pages:
if "localhost:5199" in p.get("url", ""):
target = p
break
if target:
ws_url = target.get("webSocketDebuggerUrl", "")
print(f"4a. 目标页面: {target.get('title','')[:60]}")
print(f"4b. WebSocket: {ws_url[:80]}...")
# Use websockets to execute JS
print("\n4c. CDP Runtime.evaluate 检查 IoTStatusBar")
try:
import asyncio
try:
import websockets
except ImportError:
print(" ⚠️ websockets not installed, trying pip install...")
subprocess.run([sys.executable, "-m", "pip", "install", "websockets", "-q"], timeout=30)
import websockets
async def cdp_eval():
async with websockets.connect(ws_url, max_size=2**24) as ws:
# Runtime.enable
await ws.send(json.dumps({"id": 1, "method": "Runtime.enable"}))
await asyncio.wait_for(ws.recv(), timeout=5)
# Evaluate JS
js_code = """
(function() {
var r = {};
var allDivs = document.querySelectorAll('div');
r.totalDivs = allDivs.length;
r.iotTexts = [];
for (var i = 0; i < allDivs.length; i++) {
var t = allDivs[i].textContent || '';
if (t.indexOf('IoT') !== -1 || t.indexOf('iot') !== -1) {
r.iotTexts.push(t.substring(0, 100));
if (r.iotTexts.length >= 10) break;
}
}
r.hasRoot = !!document.getElementById('root');
r.title = document.title;
r.bodyText = (document.body ? document.body.innerText : '').substring(0, 300);
return JSON.stringify(r);
})()
"""
await ws.send(json.dumps({"id": 2, "method": "Runtime.evaluate",
"params": {"expression": js_code, "returnByValue": True}}))
resp = await asyncio.wait_for(ws.recv(), timeout=10)
data = json.loads(resp)
result_val = data.get("result", {}).get("result", {}).get("value", "N/A")
return result_val
result = asyncio.new_event_loop().run_until_complete(cdp_eval())
parsed = json.loads(result) if result and result != "N/A" else {}
print(f" title={parsed.get('title','')}")
print(f" hasRoot={parsed.get('hasRoot')} totalDivs={parsed.get('totalDivs')}")
print(f" iotTexts={parsed.get('iotTexts',[])}")
print(f" bodyText(first 300): {parsed.get('bodyText','')}")
except Exception as e:
print(f" ❌ CDP 错误: {e}")
else:
print("4a. ❌ 未找到 localhost:5199 页面")
for p in pages:
print(f" {p.get('url','?')[:100]}")
print("\n" + "=" * 60)
print("5. tool-engine IoT 工具执行测试")
print("=" * 60)
# 5a. iot_query
print("\n5a. tool-engine iot_query")
r = urllib.request.Request("http://localhost:8092/api/v1/tools/execute",
data=json.dumps({"tool": "iot_query", "arguments": {}}).encode(),
headers={"Content-Type": "application/json"}, method="POST")
try:
result = json.loads(urllib.request.urlopen(r, timeout=10).read())
print(f" output: {result.get('output','')[:200]}")
if result.get('error'):
print(f" error: {result['error'][:200]}")
except Exception as e:
print(f"{e}")
# 5b. iot_control toggle
print("\n5b. tool-engine iot_control (toggle ac-livingroom)")
r = urllib.request.Request("http://localhost:8092/api/v1/tools/execute",
data=json.dumps({"tool": "iot_control", "arguments": {"device_id": "ac-livingroom", "action": "toggle"}}).encode(),
headers={"Content-Type": "application/json"}, method="POST")
try:
result = json.loads(urllib.request.urlopen(r, timeout=10).read())
print(f" output: {result.get('output','')[:200]}")
if result.get('error'):
print(f" error: {result['error'][:200]}")
except Exception as e:
print(f"{e}")
# 5c. iot_control set_temperature
print("\n5c. tool-engine iot_control (set_temperature 28)")
r = urllib.request.Request("http://localhost:8092/api/v1/tools/execute",
data=json.dumps({"tool": "iot_control", "arguments": {"device_id": "ac-livingroom", "action": "set_temperature", "value": 28}}).encode(),
headers={"Content-Type": "application/json"}, method="POST")
try:
result = json.loads(urllib.request.urlopen(r, timeout=10).read())
print(f" output: {result.get('output','')[:200]}")
if result.get('error'):
print(f" error: {result['error'][:200]}")
except Exception as e:
print(f"{e}")
print("\n" + "=" * 60)
print("诊断测试完成")
print("=" * 60)
+437
View File
@@ -0,0 +1,437 @@
#!/usr/bin/env python3
"""
安全审计测试脚本 - 第13轮
测试 JWT 安全、认证端点安全、输入验证、授权越权、敏感信息泄露
"""
import subprocess
import json
import sys
import time
import base64
BASE = "http://localhost:8080"
HEADERS_JSON = {"Content-Type": "application/json"}
def curl(method, path, headers=None, data=None, expected_status=None, timeout=10):
"""执行 curl 请求并返回 (status_code, response_body, response_headers)"""
cmd = ["curl", "-s", "-w", "\n%{http_code}", "-X", method, f"{BASE}{path}", "--max-time", str(timeout)]
if headers:
for k, v in headers.items():
cmd += ["-H", f"{k}: {v}"]
if data:
cmd += ["-d", data]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout+5)
output = result.stdout.strip()
# 分离 body 和 status code
if "\n" in output:
lines = output.rsplit("\n", 1)
body = lines[0]
status = lines[1]
else:
body = ""
status = output
try:
status = int(status)
except ValueError:
status = 0
# 单独获取响应头
hdr_cmd = ["curl", "-s", "-I", "-X", method, f"{BASE}{path}", "--max-time", str(timeout)]
if headers:
for k, v in headers.items():
hdr_cmd += ["-H", f"{k}: {v}"]
hdr_result = subprocess.run(hdr_cmd, capture_output=True, text=True, timeout=timeout+5)
resp_headers = {}
for line in hdr_result.stdout.strip().split("\n"):
if ":" in line:
key, val = line.split(":", 1)
resp_headers[key.strip().lower()] = val.strip()
return status, body, resp_headers
def print_result(test_name, status, body, expected_status=None, detail=""):
"""打印测试结果"""
body_short = body[:200] if body else "(empty)"
status_icon = "" if (expected_status is None or status == expected_status) else ""
print(f"{status_icon} [{status}] {test_name}")
if detail:
print(f" {detail}")
print(f" Response: {body_short}")
print()
def test_group(name):
print(f"\n{'='*70}")
print(f" {name}")
print(f"{'='*70}\n")
# ============================================================
# 0. 先获取有效的 admin token
# ============================================================
print("🔑 获取管理员 token...")
s, b, _ = curl("POST", "/api/v1/auth/login", HEADERS_JSON,
json.dumps({"username": "yeij0942", "password": "Jiang1143218570"}))
if s == 200:
admin_token = json.loads(b).get("token", "")
print(f" 管理员 token 获取成功: {admin_token[:30]}...")
else:
print(f" ❌ 获取失败: {s} {b}")
sys.exit(1)
# ============================================================
# 1. JWT 令牌安全性审计
# ============================================================
test_group("1. JWT 令牌安全性审计")
# 1.1 无 token 访问受保护端点
s, b, h = curl("GET", "/api/v1/sessions", expected_status=401)
print_result("1.1 无 token 访问受保护端点", s, b, 401)
# 1.2 伪造 token
fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiYWRtaW4iLCJleHAiOjk5OTk5OTk5OTksImlhdCI6MH0.fake-signature"
s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": f"Bearer {fake_token}"}, expected_status=401)
print_result("1.2 伪造 token (错误签名)", s, b, 401)
# 1.3 过期 token (手动构造一个已过期的)
# 使用 Python 生成一个过期的 JWT
import hmac
import hashlib
import base64
def b64url(data):
return base64.urlsafe_b64encode(data).rstrip(b'=').decode()
header = b64url(json.dumps({"alg":"HS256","typ":"JWT"}).encode())
# exp = 2020-01-01 (1577836800)
payload = b64url(json.dumps({"user_id":"admin","exp":1577836800,"iat":1577836800}).encode())
signing_input = f"{header}.{payload}".encode()
# 使用默认密钥 "change-me-in-production"
sig = hmac.new(b"change-me-in-production", signing_input, hashlib.sha256).digest()
expired_token = f"{header}.{payload}.{b64url(sig)}"
s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": f"Bearer {expired_token}"}, expected_status=401)
print_result("1.3 过期 token", s, b, 401)
# 1.4 空 token
s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": "Bearer "}, expected_status=401)
print_result("1.4 空 token", s, b, 401)
# 1.5 错误的 Authorization 格式
s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": "Basic YWRtaW46cGFzcw=="}, expected_status=401)
print_result("1.5 错误格式 (Basic auth)", s, b, 401)
# 1.6 token 放到 query param 而非 header (WebSocket 用这个)
s, b, h = curl("GET", f"/api/v1/sessions?token={admin_token}", expected_status=401)
print_result("1.6 token 作为 query param (REST API)", s, b, 401, "REST API 应该只接受 Header 中的 token")
# 1.7 检查 token 过期时间
# 1.7 手动解码 JWT 检查过期时间 (纯标准库)
try:
parts = admin_token.split(".")
if len(parts) >= 2:
# 补齐 base64 padding
payload_b64 = parts[1]
padding = 4 - len(payload_b64) % 4
if padding != 4:
payload_b64 += "=" * padding
decoded_bytes = base64.urlsafe_b64decode(payload_b64)
decoded = json.loads(decoded_bytes)
exp = decoded.get("exp", 0)
iat = decoded.get("iat", 0)
hours = (exp - iat) / 3600
print(f"1.7 JWT 过期时间分析:")
print(f" 签发时间 (iat): {iat} ({time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(iat))})")
print(f" 过期时间 (exp): {exp} ({time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(exp))})")
print(f" 有效时长: {hours:.0f} 小时 ({hours/24:.1f} 天)")
if hours > 168: # 超过7天
print(f" ⚠️ 警告: JWT 过期时间过长 ({hours/24:.0f} 天),建议缩短至 24-72 小时")
else:
print(f" ✅ 过期时间合理")
except Exception as e:
print(f" ❌ 解码失败: {e}")
print()
# ============================================================
# 2. 认证端点安全测试
# ============================================================
test_group("2. 认证端点安全测试")
# 2.1 SQL 注入测试 - Login
sql_payloads = [
("' OR '1'='1", "any"),
("admin'--", "any"),
("'; DROP TABLE users; --", "any"),
("' UNION SELECT 1,2,3,4,5,6 --", "any"),
("admin' OR 1=1 --", "any"),
("1'='1", "any"),
]
for payload, pwd in sql_payloads:
data = json.dumps({"username": payload, "password": pwd})
s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data, expected_status=None)
# 对于 SQL 注入 payload,期望返回 400 (格式无效) 或 401 (认证失败)
# 但绝对不能是 200 (成功登录)
if s == 200:
print_result(f"2.1 SQL注入-Login: {payload[:40]}", s, b, None, "🔴 严重: SQL 注入可能成功!")
elif s == 400:
print_result(f"2.1 SQL注入-Login: {payload[:40]}", s, b, 400, "✅ 用户名格式校验拦截")
else:
print_result(f"2.1 SQL注入-Login: {payload[:40]}", s, b, 401)
# 2.2 SQL 注入测试 - Register
for payload in ["' OR '1'='1", "test'--", "'; DROP TABLE users; --"]:
data = json.dumps({
"username": payload,
"password": "test123456",
"email": "test@test.com",
"nickname": "Test",
"verify_code": "000000"
})
s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data)
print_result(f"2.2 SQL注入-Register: {payload[:40]}", s, b, 400 if s != 200 else None)
# 2.3 暴力破解防护测试 (连续错误登录)
print("2.3 暴力破解防护测试 (连续10次错误登录):")
rate_limited = False
for i in range(10):
data = json.dumps({"username": "yeij0942", "password": f"wrong_password_{i}"})
s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data)
if s == 429:
rate_limited = True
print(f" 请求 {i+1}: [{s}] ⚡ 速率限制触发!")
break
print(f" 请求 {i+1}: [{s}]")
if not rate_limited:
print(f" ⚠️ 10次错误登录后仍未触发速率限制")
else:
print(f" ✅ 速率限制在第 {i+1} 次请求时触发")
# 2.4 用户名枚举测试
print("\n2.4 用户名枚举测试:")
# 测试不存在的用户
data = json.dumps({"username": "nonexistent_user_xyz_12345", "password": "any_password"})
s1, b1, _ = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data)
# 测试存在的用户 (但密码错误)
data = json.dumps({"username": "yeij0942", "password": "wrong_password"})
s2, b2, _ = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data)
print(f" 不存在用户: [{s1}] {b1[:150]}")
print(f" 存在但密码错: [{s2}] {b2[:150]}")
if s1 == s2 and "用户名或密码错误" in b1 and "用户名或密码错误" in b2:
print(f" ✅ 错误消息一致,防止用户名枚举")
elif b1 != b2:
print(f" ⚠️ 警告: 错误消息不同,可能存在用户名枚举风险")
print()
# 2.5 注册端点 - 极短用户名
data = json.dumps({"username": "ab", "password": "123456", "email": "test@test.com", "nickname": "Test", "verify_code": "000000"})
s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data)
print_result("2.5 极短用户名 (ab, 2字符)", s, b, 400)
# 2.6 特殊字符用户名
for uname in ["test<script>", "test user", "test/user", "test😀user"]:
data = json.dumps({"username": uname, "password": "123456", "email": "test@test.com", "nickname": "Test", "verify_code": "000000"})
s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data)
print_result(f"2.6 特殊字符用户名: {uname[:30]}", s, b, 400)
# 2.7 XSS payload in nickname
data = json.dumps({"username": "testuser99", "password": "123456", "email": "test@test.com", "nickname": "<script>alert(1)</script>", "verify_code": "000000"})
s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data)
# nickname 应该不需要特殊的 XSS 过滤,但需要确保后端对其做了转义
print_result("2.7 XSS payload in nickname", s, b, 403 if s != 200 else None, "期望:注册被拒绝或昵称被转义/过滤")
# ============================================================
# 3. 输入验证审计
# ============================================================
test_group("3. 输入验证审计 - Session 端点")
# 创建普通用户 token (如果可以)
# 先尝试注册
data = json.dumps({"username": "testuser42", "password": "Test123456", "email": "test42@test.com", "nickname": "Tester42", "verify_code": "000000"})
s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data)
user_token = None
if s == 201:
user_token = json.loads(b).get("token", "")
print(f" 创建测试用户成功,token={user_token[:30]}...")
else:
print(f" 创建测试用户失败 [{s}]: {b[:100]}")
# 尝试登录
data = json.dumps({"username": "testuser42", "password": "Test123456"})
s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data)
if s == 200:
user_token = json.loads(b).get("token", "")
print(f" 登录测试用户成功,token={user_token[:30]}...")
else:
print(f" 登录测试用户也失败 [{s}]: {b[:100]}")
# 3.1 创建 session 时携带过长的 title
long_title = "A" * 1000
data = json.dumps({"title": long_title, "is_main": False})
s, b, h = curl("POST", "/api/v1/sessions", {**HEADERS_JSON, "Authorization": f"Bearer {admin_token}"}, data)
print_result("3.1 超长 session title (1000字符)", s, b, 201 if s == 201 else None)
# 3.2 创建 session 时指定其他用户的 user_id (越权)
data = json.dumps({"user_id": "user_hacker", "title": "hijacked session"})
s, b, h = curl("POST", "/api/v1/sessions", {**HEADERS_JSON, "Authorization": f"Bearer {admin_token}"}, data)
print_result("3.2 Session 创建时指定其他 user_id (越权)", s, b, None, "检查是否允许 user_id 覆盖 JWT 身份")
if s == 201:
resp = json.loads(b)
actual_user_id = resp.get("user_id", "")
if actual_user_id == "user_hacker":
print(f" 🔴 严重: 允许通过请求体覆盖 user_id 为 {actual_user_id}!")
else:
print(f" ✅ 忽略请求体中的 user_id,使用 JWT 身份: {actual_user_id}")
# 3.3 列出其他用户的 sessions
s, b, h = curl("GET", "/api/v1/sessions?user_id=user_hacker", {"Authorization": f"Bearer {admin_token}"})
print_result("3.3 查看其他用户的 session 列表 (越权)", s, b, None, "检查是否允许查询其他用户的 session")
if s == 200:
sessions_data = json.loads(b)
print(f" ⚠️ 可以查看其他用户的 session 列表")
# 3.4 空消息 / 特殊 Unicode 测试 (通过 WebSocket 模拟)
# 由于 WebSocket 测试需要脚本,这里仅做 HTTP 层面的检查
# 3.5 非法 session ID
s, b, h = curl("GET", "/api/v1/sessions/../../../etc/passwd", {"Authorization": f"Bearer {admin_token}"})
print_result("3.5 路径遍历 in session ID", s, b, 404)
s, b, h = curl("GET", "/api/v1/sessions/<script>alert(1)</script>", {"Authorization": f"Bearer {admin_token}"})
print_result("3.6 XSS in session ID", s, b, 404)
# 3.7 超长 session ID
long_sid = "session_" + "A" * 500
s, b, h = curl("GET", f"/api/v1/sessions/{long_sid}", {"Authorization": f"Bearer {admin_token}"})
print_result("3.7 超长 session ID (500字符)", s, b, None)
# ============================================================
# 4. 授权/越权测试
# ============================================================
test_group("4. 授权/越权测试")
# 4.1 未认证访问受保护端点
endpoints = [
("GET", "/api/v1/sessions"),
("GET", "/api/v1/files"),
("GET", "/api/v1/memory/search"),
("GET", "/api/v1/reminders"),
("POST", "/api/v1/automation/rules"),
]
for method, path in endpoints:
s, b, h = curl(method, path, expected_status=401)
print_result(f"4.1 未认证访问: {method} {path}", s, b, 401)
# 4.2 普通用户访问管理员 API
if user_token:
admin_endpoints = [
("GET", "/api/v1/admin/sessions"),
("GET", "/api/v1/admin/sessions/active"),
]
for method, path in admin_endpoints:
s, b, h = curl(method, path, {"Authorization": f"Bearer {user_token}"}, expected_status=403)
print_result(f"4.2 普通用户访问管理员API: {method} {path}", s, b, 403)
# 4.3 删除其他用户的 session
# 先创建一个 session
data = json.dumps({"title": "Test Session for Auth Test"})
s, b, h = curl("POST", "/api/v1/sessions", {**HEADERS_JSON, "Authorization": f"Bearer {admin_token}"}, data)
test_session_id = None
if s == 201:
test_session_id = json.loads(b).get("id", "")
print(f" 创建测试 session: {test_session_id}")
if test_session_id and user_token:
# 用普通用户 token 删除管理员的 session
s, b, h = curl("DELETE", f"/api/v1/sessions/{test_session_id}", {"Authorization": f"Bearer {user_token}"})
print_result(f"4.3 普通用户删除管理员的 session", s, b, None, "期望 403 或至少不允许删除")
if s == 200:
print(f" 🔴 严重: 用户可以删除其他用户的 session!")
# 4.4 查看其他用户的具体 session
if test_session_id and user_token:
s, b, h = curl("GET", f"/api/v1/sessions/{test_session_id}", {"Authorization": f"Bearer {user_token}"})
print_result(f"4.4 普通用户查看管理员的 session", s, b, None, "期望 403 或 404")
# ============================================================
# 5. 敏感信息泄露检查
# ============================================================
test_group("5. 敏感信息泄露检查")
# 5.1 检查响应头
s, b, h = curl("GET", "/api/v1/health")
print("5.1 HTTP 响应头分析:")
sensitive_headers = ["server", "x-powered-by", "x-aspnet-version", "x-generator"]
for header_name in sensitive_headers:
if header_name in h:
print(f" ⚠️ 敏感头: {header_name}: {h[header_name]}")
else:
print(f" ✅ 无敏感头: {header_name}")
# 5.2 检查安全头
security_headers = {
"x-content-type-options": "nosniff",
"x-frame-options": "DENY",
"x-xss-protection": "1; mode=block",
"referrer-policy": "strict-origin-when-cross-origin",
"strict-transport-security": None,
}
for header_name, expected in security_headers.items():
value = h.get(header_name, "")
if value:
print(f" ✅ 安全头: {header_name}: {value}")
else:
print(f" ⚠️ 缺少安全头: {header_name}")
# 5.3 CORS 配置
print("\n5.3 CORS 配置检查:")
s, b, h_origin = curl("GET", "/api/v1/health", {"Origin": "https://evil.com"})
acao = h_origin.get("access-control-allow-origin", "")
acac = h_origin.get("access-control-allow-credentials", "")
print(f" Access-Control-Allow-Origin: {acao}")
print(f" Access-Control-Allow-Credentials: {acac}")
if acac == "true" and acao == "https://evil.com":
print(f" 🔴 严重: CORS 允许任何来源携带凭据! 可被 CSRF 攻击利用")
elif acac == "true":
print(f" ⚠️ 警告: Access-Control-Allow-Credentials 为 true,需确认来源白名单")
# 5.4 检查错误响应是否泄露内部信息
s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, "invalid_json{{{")
print(f"\n5.4 错误响应信息泄露:")
print(f" 响应: {b[:300]}")
if "panic" in b.lower() or "goroutine" in b.lower() or "stack" in b.lower():
print(f" 🔴 严重: 错误响应泄露堆栈跟踪!")
elif ".go:" in b and ("/" in b or "\\\\" in b):
print(f" ⚠️ 警告: 错误响应可能泄露文件路径!")
else:
print(f" ✅ 错误响应未泄露内部信息")
# 5.5 测试 .env 文件是否可访问
s, b, h = curl("GET", "/.env")
print(f"\n5.5 .env 文件访问: [{s}]")
s, b, h = curl("GET", "/api/v1/.env")
print(f" /api/v1/.env 访问: [{s}]")
s, b, h = curl("GET", "/api/v1/../.env")
print(f" /api/v1/../.env 访问: [{s}]")
# 5.6 测试敏感路径
for path in ["/admin", "/wp-admin", "/phpmyadmin", "/api/v1/debug", "/debug/pprof/"]:
s, b, h = curl("GET", path)
sens = "⚠️" if s != 404 else ""
print(f" {sens} {path}: [{s}]")
# 5.7 CSP 配置检查
print(f"\n5.7 Content-Security-Policy: {h.get('content-security-policy', 'MISSING')}")
# 5.8 WebSocket token 通过 query param 传输
print(f"\n5.8 WebSocket 认证: Token 通过 query param 传递 (risk of leaking in logs)")
print(f" 这是常见做法,但建议使用更短的生命周期或单独的 WS token")
# ============================================================
# 6. 总结
# ============================================================
test_group("6. 测试完成 - 汇总")
print("所有测试已执行完毕。请查看上方结果判断安全问题。")
print(f"管理员 token 前30字符: {admin_token[:30]}...")
if user_token:
print(f"普通用户 token 前30字符: {user_token[:30]}...")
+280
View File
@@ -0,0 +1,280 @@
#!/usr/bin/env python3
"""
安全审计测试脚本 - 第13轮 Phase 2
测试被速率限制阻塞的项目: 用户名枚举、注册、越权访问
"""
import subprocess
import json
import time
BASE = "http://localhost:8080"
HDR = {"Content-Type": "application/json"}
def curl(method, path, headers=None, data=None):
cmd = ["curl", "-s", "-w", "\n%{http_code}", "-X", method, f"{BASE}{path}", "--max-time", "10"]
if headers:
for k, v in headers.items():
cmd += ["-H", f"{k}: {v}"]
if data:
cmd += ["-d", data]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
output = result.stdout.strip()
lines = output.rsplit("\n", 1)
body = lines[0]
try:
status = int(lines[1])
except:
status = 0
return status, body
def pr(test, status, body, expected=None):
icon = "" if (expected is None or status == expected) else ""
print(f"{icon} [{status}] {test}")
print(f" {body[:200]}")
if expected and status != expected:
print(f" 期望: {expected}, 实际: {status}")
print()
# Get admin token
print("🔑 获取管理员 token...")
s, b = curl("POST", "/api/v1/auth/login", HDR, json.dumps({"username":"yeij0942","password":"Jiang1143218570"}))
admin_token = json.loads(b).get("token","")
print(f" token: {admin_token[:30]}...\n")
time.sleep(2) # 等待一点时间让令牌桶恢复
# ==========================================
# 测试 1: 用户名枚举
# ==========================================
print("=" * 60)
print(" 用户名枚举测试")
print("=" * 60)
# 登录不存在的用户
s1, b1 = curl("POST", "/api/v1/auth/login", HDR, json.dumps({"username":"nonexistent_user_xyz_12345","password":"any_password"}))
pr("不存在用户登录", s1, b1, 401)
time.sleep(1)
# 登录存在的用户但密码错误
s2, b2 = curl("POST", "/api/v1/auth/login", HDR, json.dumps({"username":"yeij0942","password":"wrong_password"}))
pr("存在用户但密码错误", s2, b2, 401)
print(f" 不存在用户消息: {b1}")
print(f" 存在用户消息: {b2}")
if b1 == b2:
print(" ✅ 错误消息一致,防止用户名枚举")
else:
print(" ⚠️ 错误消息不同,可能存在用户名枚举风险")
time.sleep(1)
# ==========================================
# 测试 2: 注册端点详细测试
# ==========================================
print("\n" + "=" * 60)
print(" 注册端点安全测试")
print("=" * 60)
# 2.1 极短用户名 (2字符)
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"ab","password":"123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("极短用户名 (2字符 ab)", s, b, 400)
time.sleep(0.5)
# 2.2 XSS in username
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"<script>alert(1)</script>","password":"123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("XSS in username", s, b, 400)
time.sleep(0.5)
# 2.3 Special chars in username
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"test user","password":"123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("空格在用户名中", s, b, 400)
time.sleep(0.5)
# 2.4 Unicode in username
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"test😀user","password":"123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("Emoji in username", s, b, 400)
time.sleep(0.5)
# 2.5 Forward slash in username (路径遍历)
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"../../etc/passwd","password":"123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("路径遍历 in username", s, b, 400)
time.sleep(0.5)
# 2.6 XSS in nickname (应该允许注册但需要检查nickname是否被过滤)
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"testnick99","password":"Test123456","email":"test99@test.com","nickname":"<script>alert('xss')</script>","verify_code":"000000"
}))
pr("XSS in nickname", s, b, None)
if s == 201:
resp = json.loads(b)
nn = resp.get("nickname","")
print(f" 昵称返回值: {nn}")
if "<script>" in nn:
print(" ⚠️ 昵称未过滤 XSS!")
time.sleep(0.5)
# 2.7 超长用户名
long_user = "a" * 100
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":long_user,"password":"Test123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("超长用户名 (100字符)", s, b, 400)
time.sleep(0.5)
# 2.8 超长密码
long_pwd = "a" * 500
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"testpwd99","password":long_pwd,"email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("超长密码 (500字符)", s, b, None)
time.sleep(0.5)
# 2.9 无效邮箱
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"testemail99","password":"Test123456","email":"not-an-email","nickname":"Test","verify_code":"000000"
}))
pr("无效邮箱格式", s, b, 400)
time.sleep(0.5)
# ==========================================
# 测试 3: 创建普通用户并测试越权
# ==========================================
print("\n" + "=" * 60)
print(" 授权/越权测试")
print("=" * 60)
# 注册普通用户
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"testaudit42","password":"Test123456","email":"audit42@test.com","nickname":"Auditor","verify_code":"000000"
}))
user_token = None
if s == 201:
user_token = json.loads(b).get("token","")
print(f"✅ 创建测试用户成功: user_testaudit42")
else:
print(f"注册失败 [{s}]: {b[:100]}")
# 尝试登录
s, b = curl("POST", "/api/v1/auth/login", HDR, json.dumps({"username":"testaudit42","password":"Test123456"}))
if s == 200:
user_token = json.loads(b).get("token","")
print(f"✅ 登录测试用户成功")
time.sleep(1)
if user_token:
# 3.1 普通用户访问 admin API
s, b = curl("GET", "/api/v1/admin/sessions", {"Authorization": f"Bearer {user_token}"})
pr("普通用户 -> GET /api/v1/admin/sessions", s, b, 403)
time.sleep(0.3)
s, b = curl("GET", "/api/v1/admin/sessions/active", {"Authorization": f"Bearer {user_token}"})
pr("普通用户 -> GET /api/v1/admin/sessions/active", s, b, 403)
# 3.2 创建 session 时尝试指定为 admin
time.sleep(0.3)
s, b = curl("POST", "/api/v1/sessions", {**HDR, "Authorization": f"Bearer {user_token}"}, json.dumps({
"user_id": "admin", "title": "hijack admin session", "is_main": True
}))
pr("普通用户创建 session 指定 user_id=admin", s, b, None)
if s == 201:
resp = json.loads(b)
uid = resp.get("user_id","")
if uid == "admin":
print(" 🔴 严重: 普通用户可以创建管理员身份的 session!")
else:
print(f" ✅ session owner 仍为: {uid}")
time.sleep(0.3)
# 3.3 普通用户查询 admin 的 session 列表
s, b = curl("GET", "/api/v1/sessions?user_id=admin", {"Authorization": f"Bearer {user_token}"})
pr("普通用户查询 admin 的 session 列表", s, b, None)
if s == 200:
sessions = json.loads(b)
count = len(sessions.get("sessions",[]))
if count > 0:
print(f" ⚠️ 可以查看 admin 的 {count} 个 session!")
time.sleep(0.3)
# 3.4 先创建一个归属 admin 的 session
s, b = curl("POST", "/api/v1/sessions", {**HDR, "Authorization": f"Bearer {admin_token}"}, json.dumps({
"title": "Admin Private Session", "is_main": False
}))
admin_session_id = None
if s == 201:
admin_session_id = json.loads(b).get("id","")
print(f" Admin 的 private session: {admin_session_id}")
if admin_session_id:
time.sleep(0.3)
# 普通用户尝试访问 admin 的 session
s, b = curl("GET", f"/api/v1/sessions/{admin_session_id}", {"Authorization": f"Bearer {user_token}"})
pr(f"普通用户查看 admin 的 session", s, b, 403 if s != 200 else None)
if s == 200:
print(f" 🔴 严重: 可以查看其他用户的 session!")
time.sleep(0.3)
# 普通用户尝试删除 admin 的 session
s, b = curl("DELETE", f"/api/v1/sessions/{admin_session_id}", {"Authorization": f"Bearer {user_token}"})
pr(f"普通用户删除 admin 的 session", s, b, 403 if s != 200 else None)
if s == 200:
print(f" 🔴 严重: 可以删除其他用户的 session!")
else:
print("⚠️ 无法获取普通用户 token,跳过越权测试")
# ==========================================
# 测试 4: 聊天消息 WebSocket 测试 (HTTP 层面)
# ==========================================
print("\n" + "=" * 60)
print(" 聊天 HTTP 端点测试")
print("=" * 60)
# 4.1 WebSocket 通过 query param 传递 token 测试
s, b = curl("GET", "/ws/chat?token=invalid_fake_token_123")
pr("WS token 验证 - 无效 token", s, b, 401)
time.sleep(0.3)
s, b = curl("GET", "/ws/chat")
pr("WS 无 token", s, b, 401)
time.sleep(0.3)
# 4.2 普通用户能否通过 WS 连接
if user_token:
s, b = curl("GET", f"/ws/chat?token={user_token}")
pr("普通用户 WS 连接 (主对话)", s, b, 403)
# ==========================================
# 测试 5: 文件上传端点测试
# ==========================================
print("\n" + "=" * 60)
print(" 文件上传端点测试")
print("=" * 60)
# 5.1 无文件上传
s, b = curl("POST", "/api/v1/files/upload", {"Authorization": f"Bearer {admin_token}"})
pr("无文件字段上传", s, b, 400)
time.sleep(0.5)
# 5.2 尝试用 GET 访问上传端点
s, b = curl("GET", "/api/v1/files/upload", {"Authorization": f"Bearer {admin_token}"})
pr("GET 上传端点 (应405/404)", s, b, 404)
time.sleep(0.5)
# 5.3 普通用户访问文件列表 (如果普通用户token可用)
if user_token:
s, b = curl("GET", "/api/v1/files", {"Authorization": f"Bearer {user_token}"})
pr("普通用户访问文件列表", s, b, None)
print("\n✅ Phase 2 测试完成")
+160
View File
@@ -0,0 +1,160 @@
#!/usr/bin/env python3
"""Round 12 Part 2: WebSocket 测试 + 子服务直调"""
import json, urllib.request, urllib.error, socket, struct, base64, os, hashlib, time
TOKEN = open("/tmp/cyrene_test_token.txt").read().strip()
SID = open("/tmp/cyrene_test_sid.txt").read().strip()
print(f"TOKEN={TOKEN[:20]}... SID={SID}")
# ====== Step 1: memory-service 正确端点测试 ======
print("\n=== Step 1: memory-service 正确端点 ===")
# 1a: GET /api/v1/memories?user_id=admin
r = urllib.request.Request("http://localhost:8091/api/v1/memories?user_id=admin")
try:
resp = urllib.request.urlopen(r, timeout=5)
print(f" GET /memories: {resp.status} {resp.read().decode()[:200]}")
except Exception as e:
print(f" GET /memories: FAIL {e}")
# 1b: POST /api/v1/memories/query
r = urllib.request.Request("http://localhost:8091/api/v1/memories/query",
data=json.dumps({"user_id":"admin","query_text":"test","limit":5}).encode())
r.add_header("Content-Type", "application/json")
try:
resp = urllib.request.urlopen(r, timeout=5)
print(f" POST /memories/query: {resp.status} {resp.read().decode()[:200]}")
except Exception as e:
print(f" POST /memories/query: FAIL {e}")
# 1c: 通过 gateway 代理 memory
print("\n --- Gateway proxy to memory ---")
headers = {"Authorization": f"Bearer {TOKEN}"}
# GET /api/v1/memory/search?q=test
r = urllib.request.Request("http://localhost:8080/api/v1/memory/search?q=test")
r.add_header("Authorization", f"Bearer {TOKEN}")
try:
resp = urllib.request.urlopen(r, timeout=10)
print(f" GW memory/search: {resp.status} {resp.read().decode()[:200]}")
except urllib.error.HTTPError as e:
print(f" GW memory/search: {e.code} {e.read().decode()[:200]}")
except Exception as e:
print(f" GW memory/search: FAIL {e}")
# ====== Step 2: tool-engine 正确端点测试 ======
print("\n=== Step 2: tool-engine 正确端点 ===")
# 2a: GET /api/v1/tools
r = urllib.request.Request("http://localhost:8092/api/v1/tools")
try:
resp = urllib.request.urlopen(r, timeout=5)
body = resp.read().decode()
tools_data = json.loads(body)
tool_names = [t.get("name","?") for t in tools_data.get("tools",[])]
print(f" GET /tools: {resp.status} total={tools_data.get('total')} names={tool_names}")
except Exception as e:
print(f" GET /tools: FAIL {e}")
# 2b: POST /api/v1/tools/calculator/execute
r = urllib.request.Request("http://localhost:8092/api/v1/tools/calculator/execute",
data=json.dumps({"arguments":{"expression":"2+3"}}).encode())
r.add_header("Content-Type", "application/json")
try:
resp = urllib.request.urlopen(r, timeout=5)
print(f" POST calc/execute: {resp.status} {resp.read().decode()[:200]}")
except Exception as e:
print(f" POST calc/execute: FAIL {e}")
# ====== Step 3: WebSocket 测试 (手动构造) ======
print("\n=== Step 3: WebSocket 连接测试 ===")
WS_KEY = base64.b64encode(os.urandom(16)).decode()
def ws_handshake():
"""Try WebSocket upgrade to /ws/chat?token=..."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
try:
sock.connect(("127.0.0.1", 8080))
request = (
f"GET /ws/chat?token={TOKEN}&session_id={SID} HTTP/1.1\r\n"
f"Host: localhost:8080\r\n"
f"Upgrade: websocket\r\n"
f"Connection: Upgrade\r\n"
f"Sec-WebSocket-Key: {WS_KEY}\r\n"
f"Sec-WebSocket-Version: 13\r\n"
f"\r\n"
)
sock.send(request.encode())
response = b""
while b"\r\n\r\n" not in response:
response += sock.recv(4096)
headers = response.decode()
status_line = headers.split("\r\n")[0]
print(f" WS handshake: {status_line}")
if "101" in status_line:
print(f" ✅ WebSocket 升级成功!")
# Send a simple chat message
msg = json.dumps({"type":"message","content":"Hello Cyrene!","mode":"text"})
import struct as st
frame = bytearray()
frame.append(0x81) # FIN + text opcode
frame.append(0x80 | len(msg)) # MASK + length
mask_key = os.urandom(4)
frame.extend(mask_key)
masked = bytes([msg[i] ^ mask_key[i%4] for i in range(len(msg))])
frame.extend(masked)
sock.send(bytes(frame))
print(f" Sent: {msg}")
# Read response
time.sleep(3)
sock.settimeout(5)
try:
resp_data = sock.recv(4096)
print(f" Received {len(resp_data)} bytes: {resp_data[:500]}")
except socket.timeout:
print(f" ⚠️ No response within 3s - backend may be processing")
else:
print(f" ❌ WebSocket handshake failed")
print(f" Full response:\n{headers[:500]}")
except Exception as e:
print(f" WS connect FAIL: {e}")
finally:
sock.close()
ws_handshake()
# ====== Step 4: ai-core 直接调用的完整 SSE 响应 ======
print("\n=== Step 4: ai-core 完整 SSE 响应 ===")
ai_body = {"user_id":"admin","session_id":SID,"message":"用一句话介绍你自己","mode":"text"}
r = urllib.request.Request("http://localhost:8081/api/v1/chat",
data=json.dumps(ai_body).encode(), method="POST")
r.add_header("Content-Type", "application/json")
r.add_header("Accept", "text/event-stream")
try:
resp = urllib.request.urlopen(r, timeout=60)
full = []
for line in resp:
line = line.decode().strip()
if line.startswith("data:"):
full.append(line)
print(f" Lines received: {len(full)}")
print(f" Last 3 lines: {full[-3:]}")
except Exception as e:
print(f" FAIL: {e}")
# ====== Step 5: Gateway 通过 memory_handler 代理测试 ======
print("\n=== Step 5: Gateway memory proxy ===")
for path, qs in [("/api/v1/memory/search","q=hello"), ("/api/v1/memory","")]:
url = f"http://localhost:8080{path}"
if qs:
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"Bearer {TOKEN}")
try:
resp = urllib.request.urlopen(req, timeout=10)
print(f" GET {path}: {resp.status} {resp.read().decode()[:200]}")
except urllib.error.HTTPError as e:
body = e.read().decode()[:200]
print(f" GET {path}: {e.code} {body}")
except Exception as e:
print(f" GET {path}: FAIL {e}")
print("\n[DONE]")