Files
Cyrene/debug/cache/test_security_round13_p2.py
T
AskaEth a058b0ab8e fix: 第一轮修复 - 记忆管理/IoT操控/历史消息持久化/动作消息/链路优化/安全配置
- 修复记忆管理数据库连接不可用 (ai-core重编译+Unicode修复)
- 修复IoT子会话工具调用链路日志缺失
- 新增最终审查子会话(review_provider) 支持消息格式解析拆分
- 实现历史消息持久化(后端存储+前端分页加载)
- 前端新增动作消息(ActionMessage)类型和渲染
- 优化对话链路速度(非阻塞子会话+快速问候通道)
- JWT密钥环境变量化(无默认值启动panic)
- Token自动刷新机制(401拦截器+refresh接口)
- WebSocket指数退避重连(jitter+最大10次)
- localStorage清理一致性(cyrene_前缀+版本检查)
- IoT环境变量统一为IOT_SERVICE_URL
2026-05-21 23:10:07 +08:00

281 lines
10 KiB
Python

#!/usr/bin/env python3
"""
安全审计测试脚本 - 第13轮 Phase 2
测试被速率限制阻塞的项目: 用户名枚举、注册、越权访问
"""
import subprocess
import json
import time
BASE = "http://localhost:8080"
HDR = {"Content-Type": "application/json"}
def curl(method, path, headers=None, data=None):
cmd = ["curl", "-s", "-w", "\n%{http_code}", "-X", method, f"{BASE}{path}", "--max-time", "10"]
if headers:
for k, v in headers.items():
cmd += ["-H", f"{k}: {v}"]
if data:
cmd += ["-d", data]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
output = result.stdout.strip()
lines = output.rsplit("\n", 1)
body = lines[0]
try:
status = int(lines[1])
except:
status = 0
return status, body
def pr(test, status, body, expected=None):
icon = "" if (expected is None or status == expected) else ""
print(f"{icon} [{status}] {test}")
print(f" {body[:200]}")
if expected and status != expected:
print(f" 期望: {expected}, 实际: {status}")
print()
# Get admin token
print("🔑 获取管理员 token...")
s, b = curl("POST", "/api/v1/auth/login", HDR, json.dumps({"username":"yeij0942","password":"Jiang1143218570"}))
admin_token = json.loads(b).get("token","")
print(f" token: {admin_token[:30]}...\n")
time.sleep(2) # 等待一点时间让令牌桶恢复
# ==========================================
# 测试 1: 用户名枚举
# ==========================================
print("=" * 60)
print(" 用户名枚举测试")
print("=" * 60)
# 登录不存在的用户
s1, b1 = curl("POST", "/api/v1/auth/login", HDR, json.dumps({"username":"nonexistent_user_xyz_12345","password":"any_password"}))
pr("不存在用户登录", s1, b1, 401)
time.sleep(1)
# 登录存在的用户但密码错误
s2, b2 = curl("POST", "/api/v1/auth/login", HDR, json.dumps({"username":"yeij0942","password":"wrong_password"}))
pr("存在用户但密码错误", s2, b2, 401)
print(f" 不存在用户消息: {b1}")
print(f" 存在用户消息: {b2}")
if b1 == b2:
print(" ✅ 错误消息一致,防止用户名枚举")
else:
print(" ⚠️ 错误消息不同,可能存在用户名枚举风险")
time.sleep(1)
# ==========================================
# 测试 2: 注册端点详细测试
# ==========================================
print("\n" + "=" * 60)
print(" 注册端点安全测试")
print("=" * 60)
# 2.1 极短用户名 (2字符)
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"ab","password":"123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("极短用户名 (2字符 ab)", s, b, 400)
time.sleep(0.5)
# 2.2 XSS in username
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"<script>alert(1)</script>","password":"123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("XSS in username", s, b, 400)
time.sleep(0.5)
# 2.3 Special chars in username
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"test user","password":"123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("空格在用户名中", s, b, 400)
time.sleep(0.5)
# 2.4 Unicode in username
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"test😀user","password":"123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("Emoji in username", s, b, 400)
time.sleep(0.5)
# 2.5 Forward slash in username (路径遍历)
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"../../etc/passwd","password":"123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("路径遍历 in username", s, b, 400)
time.sleep(0.5)
# 2.6 XSS in nickname (应该允许注册但需要检查nickname是否被过滤)
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"testnick99","password":"Test123456","email":"test99@test.com","nickname":"<script>alert('xss')</script>","verify_code":"000000"
}))
pr("XSS in nickname", s, b, None)
if s == 201:
resp = json.loads(b)
nn = resp.get("nickname","")
print(f" 昵称返回值: {nn}")
if "<script>" in nn:
print(" ⚠️ 昵称未过滤 XSS!")
time.sleep(0.5)
# 2.7 超长用户名
long_user = "a" * 100
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":long_user,"password":"Test123456","email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("超长用户名 (100字符)", s, b, 400)
time.sleep(0.5)
# 2.8 超长密码
long_pwd = "a" * 500
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"testpwd99","password":long_pwd,"email":"test@test.com","nickname":"Test","verify_code":"000000"
}))
pr("超长密码 (500字符)", s, b, None)
time.sleep(0.5)
# 2.9 无效邮箱
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"testemail99","password":"Test123456","email":"not-an-email","nickname":"Test","verify_code":"000000"
}))
pr("无效邮箱格式", s, b, 400)
time.sleep(0.5)
# ==========================================
# 测试 3: 创建普通用户并测试越权
# ==========================================
print("\n" + "=" * 60)
print(" 授权/越权测试")
print("=" * 60)
# 注册普通用户
s, b = curl("POST", "/api/v1/auth/register", HDR, json.dumps({
"username":"testaudit42","password":"Test123456","email":"audit42@test.com","nickname":"Auditor","verify_code":"000000"
}))
user_token = None
if s == 201:
user_token = json.loads(b).get("token","")
print(f"✅ 创建测试用户成功: user_testaudit42")
else:
print(f"注册失败 [{s}]: {b[:100]}")
# 尝试登录
s, b = curl("POST", "/api/v1/auth/login", HDR, json.dumps({"username":"testaudit42","password":"Test123456"}))
if s == 200:
user_token = json.loads(b).get("token","")
print(f"✅ 登录测试用户成功")
time.sleep(1)
if user_token:
# 3.1 普通用户访问 admin API
s, b = curl("GET", "/api/v1/admin/sessions", {"Authorization": f"Bearer {user_token}"})
pr("普通用户 -> GET /api/v1/admin/sessions", s, b, 403)
time.sleep(0.3)
s, b = curl("GET", "/api/v1/admin/sessions/active", {"Authorization": f"Bearer {user_token}"})
pr("普通用户 -> GET /api/v1/admin/sessions/active", s, b, 403)
# 3.2 创建 session 时尝试指定为 admin
time.sleep(0.3)
s, b = curl("POST", "/api/v1/sessions", {**HDR, "Authorization": f"Bearer {user_token}"}, json.dumps({
"user_id": "admin", "title": "hijack admin session", "is_main": True
}))
pr("普通用户创建 session 指定 user_id=admin", s, b, None)
if s == 201:
resp = json.loads(b)
uid = resp.get("user_id","")
if uid == "admin":
print(" 🔴 严重: 普通用户可以创建管理员身份的 session!")
else:
print(f" ✅ session owner 仍为: {uid}")
time.sleep(0.3)
# 3.3 普通用户查询 admin 的 session 列表
s, b = curl("GET", "/api/v1/sessions?user_id=admin", {"Authorization": f"Bearer {user_token}"})
pr("普通用户查询 admin 的 session 列表", s, b, None)
if s == 200:
sessions = json.loads(b)
count = len(sessions.get("sessions",[]))
if count > 0:
print(f" ⚠️ 可以查看 admin 的 {count} 个 session!")
time.sleep(0.3)
# 3.4 先创建一个归属 admin 的 session
s, b = curl("POST", "/api/v1/sessions", {**HDR, "Authorization": f"Bearer {admin_token}"}, json.dumps({
"title": "Admin Private Session", "is_main": False
}))
admin_session_id = None
if s == 201:
admin_session_id = json.loads(b).get("id","")
print(f" Admin 的 private session: {admin_session_id}")
if admin_session_id:
time.sleep(0.3)
# 普通用户尝试访问 admin 的 session
s, b = curl("GET", f"/api/v1/sessions/{admin_session_id}", {"Authorization": f"Bearer {user_token}"})
pr(f"普通用户查看 admin 的 session", s, b, 403 if s != 200 else None)
if s == 200:
print(f" 🔴 严重: 可以查看其他用户的 session!")
time.sleep(0.3)
# 普通用户尝试删除 admin 的 session
s, b = curl("DELETE", f"/api/v1/sessions/{admin_session_id}", {"Authorization": f"Bearer {user_token}"})
pr(f"普通用户删除 admin 的 session", s, b, 403 if s != 200 else None)
if s == 200:
print(f" 🔴 严重: 可以删除其他用户的 session!")
else:
print("⚠️ 无法获取普通用户 token,跳过越权测试")
# ==========================================
# 测试 4: 聊天消息 WebSocket 测试 (HTTP 层面)
# ==========================================
print("\n" + "=" * 60)
print(" 聊天 HTTP 端点测试")
print("=" * 60)
# 4.1 WebSocket 通过 query param 传递 token 测试
s, b = curl("GET", "/ws/chat?token=invalid_fake_token_123")
pr("WS token 验证 - 无效 token", s, b, 401)
time.sleep(0.3)
s, b = curl("GET", "/ws/chat")
pr("WS 无 token", s, b, 401)
time.sleep(0.3)
# 4.2 普通用户能否通过 WS 连接
if user_token:
s, b = curl("GET", f"/ws/chat?token={user_token}")
pr("普通用户 WS 连接 (主对话)", s, b, 403)
# ==========================================
# 测试 5: 文件上传端点测试
# ==========================================
print("\n" + "=" * 60)
print(" 文件上传端点测试")
print("=" * 60)
# 5.1 无文件上传
s, b = curl("POST", "/api/v1/files/upload", {"Authorization": f"Bearer {admin_token}"})
pr("无文件字段上传", s, b, 400)
time.sleep(0.5)
# 5.2 尝试用 GET 访问上传端点
s, b = curl("GET", "/api/v1/files/upload", {"Authorization": f"Bearer {admin_token}"})
pr("GET 上传端点 (应405/404)", s, b, 404)
time.sleep(0.5)
# 5.3 普通用户访问文件列表 (如果普通用户token可用)
if user_token:
s, b = curl("GET", "/api/v1/files", {"Authorization": f"Bearer {user_token}"})
pr("普通用户访问文件列表", s, b, None)
print("\n✅ Phase 2 测试完成")