#!/usr/bin/env python3
"""
安全审计测试脚本 - 第13轮
测试 JWT 安全、认证端点安全、输入验证、授权越权、敏感信息泄露
"""
import subprocess
import json
import sys
import time
import base64
BASE = "http://localhost:8080"
HEADERS_JSON = {"Content-Type": "application/json"}
def curl(method, path, headers=None, data=None, expected_status=None, timeout=10):
"""执行 curl 请求并返回 (status_code, response_body, response_headers)"""
cmd = ["curl", "-s", "-w", "\n%{http_code}", "-X", method, f"{BASE}{path}", "--max-time", str(timeout)]
if headers:
for k, v in headers.items():
cmd += ["-H", f"{k}: {v}"]
if data:
cmd += ["-d", data]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout+5)
output = result.stdout.strip()
# 分离 body 和 status code
if "\n" in output:
lines = output.rsplit("\n", 1)
body = lines[0]
status = lines[1]
else:
body = ""
status = output
try:
status = int(status)
except ValueError:
status = 0
# 单独获取响应头
hdr_cmd = ["curl", "-s", "-I", "-X", method, f"{BASE}{path}", "--max-time", str(timeout)]
if headers:
for k, v in headers.items():
hdr_cmd += ["-H", f"{k}: {v}"]
hdr_result = subprocess.run(hdr_cmd, capture_output=True, text=True, timeout=timeout+5)
resp_headers = {}
for line in hdr_result.stdout.strip().split("\n"):
if ":" in line:
key, val = line.split(":", 1)
resp_headers[key.strip().lower()] = val.strip()
return status, body, resp_headers
def print_result(test_name, status, body, expected_status=None, detail=""):
"""打印测试结果"""
body_short = body[:200] if body else "(empty)"
status_icon = "✅" if (expected_status is None or status == expected_status) else "❌"
print(f"{status_icon} [{status}] {test_name}")
if detail:
print(f" {detail}")
print(f" Response: {body_short}")
print()
def test_group(name):
print(f"\n{'='*70}")
print(f" {name}")
print(f"{'='*70}\n")
# ============================================================
# 0. 先获取有效的 admin token
# ============================================================
print("🔑 获取管理员 token...")
s, b, _ = curl("POST", "/api/v1/auth/login", HEADERS_JSON,
json.dumps({"username": "yeij0942", "password": "Jiang1143218570"}))
if s == 200:
admin_token = json.loads(b).get("token", "")
print(f" 管理员 token 获取成功: {admin_token[:30]}...")
else:
print(f" ❌ 获取失败: {s} {b}")
sys.exit(1)
# ============================================================
# 1. JWT 令牌安全性审计
# ============================================================
test_group("1. JWT 令牌安全性审计")
# 1.1 无 token 访问受保护端点
s, b, h = curl("GET", "/api/v1/sessions", expected_status=401)
print_result("1.1 无 token 访问受保护端点", s, b, 401)
# 1.2 伪造 token
fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiYWRtaW4iLCJleHAiOjk5OTk5OTk5OTksImlhdCI6MH0.fake-signature"
s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": f"Bearer {fake_token}"}, expected_status=401)
print_result("1.2 伪造 token (错误签名)", s, b, 401)
# 1.3 过期 token (手动构造一个已过期的)
# 使用 Python 生成一个过期的 JWT
import hmac
import hashlib
import base64
def b64url(data):
return base64.urlsafe_b64encode(data).rstrip(b'=').decode()
header = b64url(json.dumps({"alg":"HS256","typ":"JWT"}).encode())
# exp = 2020-01-01 (1577836800)
payload = b64url(json.dumps({"user_id":"admin","exp":1577836800,"iat":1577836800}).encode())
signing_input = f"{header}.{payload}".encode()
# 使用默认密钥 "change-me-in-production"
sig = hmac.new(b"change-me-in-production", signing_input, hashlib.sha256).digest()
expired_token = f"{header}.{payload}.{b64url(sig)}"
s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": f"Bearer {expired_token}"}, expected_status=401)
print_result("1.3 过期 token", s, b, 401)
# 1.4 空 token
s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": "Bearer "}, expected_status=401)
print_result("1.4 空 token", s, b, 401)
# 1.5 错误的 Authorization 格式
s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": "Basic YWRtaW46cGFzcw=="}, expected_status=401)
print_result("1.5 错误格式 (Basic auth)", s, b, 401)
# 1.6 token 放到 query param 而非 header (WebSocket 用这个)
s, b, h = curl("GET", f"/api/v1/sessions?token={admin_token}", expected_status=401)
print_result("1.6 token 作为 query param (REST API)", s, b, 401, "REST API 应该只接受 Header 中的 token")
# 1.7 检查 token 过期时间
# 1.7 手动解码 JWT 检查过期时间 (纯标准库)
try:
parts = admin_token.split(".")
if len(parts) >= 2:
# 补齐 base64 padding
payload_b64 = parts[1]
padding = 4 - len(payload_b64) % 4
if padding != 4:
payload_b64 += "=" * padding
decoded_bytes = base64.urlsafe_b64decode(payload_b64)
decoded = json.loads(decoded_bytes)
exp = decoded.get("exp", 0)
iat = decoded.get("iat", 0)
hours = (exp - iat) / 3600
print(f"1.7 JWT 过期时间分析:")
print(f" 签发时间 (iat): {iat} ({time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(iat))})")
print(f" 过期时间 (exp): {exp} ({time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(exp))})")
print(f" 有效时长: {hours:.0f} 小时 ({hours/24:.1f} 天)")
if hours > 168: # 超过7天
print(f" ⚠️ 警告: JWT 过期时间过长 ({hours/24:.0f} 天),建议缩短至 24-72 小时")
else:
print(f" ✅ 过期时间合理")
except Exception as e:
print(f" ❌ 解码失败: {e}")
print()
# ============================================================
# 2. 认证端点安全测试
# ============================================================
test_group("2. 认证端点安全测试")
# 2.1 SQL 注入测试 - Login
sql_payloads = [
("' OR '1'='1", "any"),
("admin'--", "any"),
("'; DROP TABLE users; --", "any"),
("' UNION SELECT 1,2,3,4,5,6 --", "any"),
("admin' OR 1=1 --", "any"),
("1'='1", "any"),
]
for payload, pwd in sql_payloads:
data = json.dumps({"username": payload, "password": pwd})
s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data, expected_status=None)
# 对于 SQL 注入 payload,期望返回 400 (格式无效) 或 401 (认证失败)
# 但绝对不能是 200 (成功登录)
if s == 200:
print_result(f"2.1 SQL注入-Login: {payload[:40]}", s, b, None, "🔴 严重: SQL 注入可能成功!")
elif s == 400:
print_result(f"2.1 SQL注入-Login: {payload[:40]}", s, b, 400, "✅ 用户名格式校验拦截")
else:
print_result(f"2.1 SQL注入-Login: {payload[:40]}", s, b, 401)
# 2.2 SQL 注入测试 - Register
for payload in ["' OR '1'='1", "test'--", "'; DROP TABLE users; --"]:
data = json.dumps({
"username": payload,
"password": "test123456",
"email": "test@test.com",
"nickname": "Test",
"verify_code": "000000"
})
s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data)
print_result(f"2.2 SQL注入-Register: {payload[:40]}", s, b, 400 if s != 200 else None)
# 2.3 暴力破解防护测试 (连续错误登录)
print("2.3 暴力破解防护测试 (连续10次错误登录):")
rate_limited = False
for i in range(10):
data = json.dumps({"username": "yeij0942", "password": f"wrong_password_{i}"})
s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data)
if s == 429:
rate_limited = True
print(f" 请求 {i+1}: [{s}] ⚡ 速率限制触发!")
break
print(f" 请求 {i+1}: [{s}]")
if not rate_limited:
print(f" ⚠️ 10次错误登录后仍未触发速率限制")
else:
print(f" ✅ 速率限制在第 {i+1} 次请求时触发")
# 2.4 用户名枚举测试
print("\n2.4 用户名枚举测试:")
# 测试不存在的用户
data = json.dumps({"username": "nonexistent_user_xyz_12345", "password": "any_password"})
s1, b1, _ = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data)
# 测试存在的用户 (但密码错误)
data = json.dumps({"username": "yeij0942", "password": "wrong_password"})
s2, b2, _ = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data)
print(f" 不存在用户: [{s1}] {b1[:150]}")
print(f" 存在但密码错: [{s2}] {b2[:150]}")
if s1 == s2 and "用户名或密码错误" in b1 and "用户名或密码错误" in b2:
print(f" ✅ 错误消息一致,防止用户名枚举")
elif b1 != b2:
print(f" ⚠️ 警告: 错误消息不同,可能存在用户名枚举风险")
print()
# 2.5 注册端点 - 极短用户名
data = json.dumps({"username": "ab", "password": "123456", "email": "test@test.com", "nickname": "Test", "verify_code": "000000"})
s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data)
print_result("2.5 极短用户名 (ab, 2字符)", s, b, 400)
# 2.6 特殊字符用户名
for uname in ["test", "verify_code": "000000"})
s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data)
# nickname 应该不需要特殊的 XSS 过滤,但需要确保后端对其做了转义
print_result("2.7 XSS payload in nickname", s, b, 403 if s != 200 else None, "期望:注册被拒绝或昵称被转义/过滤")
# ============================================================
# 3. 输入验证审计
# ============================================================
test_group("3. 输入验证审计 - Session 端点")
# 创建普通用户 token (如果可以)
# 先尝试注册
data = json.dumps({"username": "testuser42", "password": "Test123456", "email": "test42@test.com", "nickname": "Tester42", "verify_code": "000000"})
s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data)
user_token = None
if s == 201:
user_token = json.loads(b).get("token", "")
print(f" 创建测试用户成功,token={user_token[:30]}...")
else:
print(f" 创建测试用户失败 [{s}]: {b[:100]}")
# 尝试登录
data = json.dumps({"username": "testuser42", "password": "Test123456"})
s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data)
if s == 200:
user_token = json.loads(b).get("token", "")
print(f" 登录测试用户成功,token={user_token[:30]}...")
else:
print(f" 登录测试用户也失败 [{s}]: {b[:100]}")
# 3.1 创建 session 时携带过长的 title
long_title = "A" * 1000
data = json.dumps({"title": long_title, "is_main": False})
s, b, h = curl("POST", "/api/v1/sessions", {**HEADERS_JSON, "Authorization": f"Bearer {admin_token}"}, data)
print_result("3.1 超长 session title (1000字符)", s, b, 201 if s == 201 else None)
# 3.2 创建 session 时指定其他用户的 user_id (越权)
data = json.dumps({"user_id": "user_hacker", "title": "hijacked session"})
s, b, h = curl("POST", "/api/v1/sessions", {**HEADERS_JSON, "Authorization": f"Bearer {admin_token}"}, data)
print_result("3.2 Session 创建时指定其他 user_id (越权)", s, b, None, "检查是否允许 user_id 覆盖 JWT 身份")
if s == 201:
resp = json.loads(b)
actual_user_id = resp.get("user_id", "")
if actual_user_id == "user_hacker":
print(f" 🔴 严重: 允许通过请求体覆盖 user_id 为 {actual_user_id}!")
else:
print(f" ✅ 忽略请求体中的 user_id,使用 JWT 身份: {actual_user_id}")
# 3.3 列出其他用户的 sessions
s, b, h = curl("GET", "/api/v1/sessions?user_id=user_hacker", {"Authorization": f"Bearer {admin_token}"})
print_result("3.3 查看其他用户的 session 列表 (越权)", s, b, None, "检查是否允许查询其他用户的 session")
if s == 200:
sessions_data = json.loads(b)
print(f" ⚠️ 可以查看其他用户的 session 列表")
# 3.4 空消息 / 特殊 Unicode 测试 (通过 WebSocket 模拟)
# 由于 WebSocket 测试需要脚本,这里仅做 HTTP 层面的检查
# 3.5 非法 session ID
s, b, h = curl("GET", "/api/v1/sessions/../../../etc/passwd", {"Authorization": f"Bearer {admin_token}"})
print_result("3.5 路径遍历 in session ID", s, b, 404)
s, b, h = curl("GET", "/api/v1/sessions/", {"Authorization": f"Bearer {admin_token}"})
print_result("3.6 XSS in session ID", s, b, 404)
# 3.7 超长 session ID
long_sid = "session_" + "A" * 500
s, b, h = curl("GET", f"/api/v1/sessions/{long_sid}", {"Authorization": f"Bearer {admin_token}"})
print_result("3.7 超长 session ID (500字符)", s, b, None)
# ============================================================
# 4. 授权/越权测试
# ============================================================
test_group("4. 授权/越权测试")
# 4.1 未认证访问受保护端点
endpoints = [
("GET", "/api/v1/sessions"),
("GET", "/api/v1/files"),
("GET", "/api/v1/memory/search"),
("GET", "/api/v1/reminders"),
("POST", "/api/v1/automation/rules"),
]
for method, path in endpoints:
s, b, h = curl(method, path, expected_status=401)
print_result(f"4.1 未认证访问: {method} {path}", s, b, 401)
# 4.2 普通用户访问管理员 API
if user_token:
admin_endpoints = [
("GET", "/api/v1/admin/sessions"),
("GET", "/api/v1/admin/sessions/active"),
]
for method, path in admin_endpoints:
s, b, h = curl(method, path, {"Authorization": f"Bearer {user_token}"}, expected_status=403)
print_result(f"4.2 普通用户访问管理员API: {method} {path}", s, b, 403)
# 4.3 删除其他用户的 session
# 先创建一个 session
data = json.dumps({"title": "Test Session for Auth Test"})
s, b, h = curl("POST", "/api/v1/sessions", {**HEADERS_JSON, "Authorization": f"Bearer {admin_token}"}, data)
test_session_id = None
if s == 201:
test_session_id = json.loads(b).get("id", "")
print(f" 创建测试 session: {test_session_id}")
if test_session_id and user_token:
# 用普通用户 token 删除管理员的 session
s, b, h = curl("DELETE", f"/api/v1/sessions/{test_session_id}", {"Authorization": f"Bearer {user_token}"})
print_result(f"4.3 普通用户删除管理员的 session", s, b, None, "期望 403 或至少不允许删除")
if s == 200:
print(f" 🔴 严重: 用户可以删除其他用户的 session!")
# 4.4 查看其他用户的具体 session
if test_session_id and user_token:
s, b, h = curl("GET", f"/api/v1/sessions/{test_session_id}", {"Authorization": f"Bearer {user_token}"})
print_result(f"4.4 普通用户查看管理员的 session", s, b, None, "期望 403 或 404")
# ============================================================
# 5. 敏感信息泄露检查
# ============================================================
test_group("5. 敏感信息泄露检查")
# 5.1 检查响应头
s, b, h = curl("GET", "/api/v1/health")
print("5.1 HTTP 响应头分析:")
sensitive_headers = ["server", "x-powered-by", "x-aspnet-version", "x-generator"]
for header_name in sensitive_headers:
if header_name in h:
print(f" ⚠️ 敏感头: {header_name}: {h[header_name]}")
else:
print(f" ✅ 无敏感头: {header_name}")
# 5.2 检查安全头
security_headers = {
"x-content-type-options": "nosniff",
"x-frame-options": "DENY",
"x-xss-protection": "1; mode=block",
"referrer-policy": "strict-origin-when-cross-origin",
"strict-transport-security": None,
}
for header_name, expected in security_headers.items():
value = h.get(header_name, "")
if value:
print(f" ✅ 安全头: {header_name}: {value}")
else:
print(f" ⚠️ 缺少安全头: {header_name}")
# 5.3 CORS 配置
print("\n5.3 CORS 配置检查:")
s, b, h_origin = curl("GET", "/api/v1/health", {"Origin": "https://evil.com"})
acao = h_origin.get("access-control-allow-origin", "")
acac = h_origin.get("access-control-allow-credentials", "")
print(f" Access-Control-Allow-Origin: {acao}")
print(f" Access-Control-Allow-Credentials: {acac}")
if acac == "true" and acao == "https://evil.com":
print(f" 🔴 严重: CORS 允许任何来源携带凭据! 可被 CSRF 攻击利用")
elif acac == "true":
print(f" ⚠️ 警告: Access-Control-Allow-Credentials 为 true,需确认来源白名单")
# 5.4 检查错误响应是否泄露内部信息
s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, "invalid_json{{{")
print(f"\n5.4 错误响应信息泄露:")
print(f" 响应: {b[:300]}")
if "panic" in b.lower() or "goroutine" in b.lower() or "stack" in b.lower():
print(f" 🔴 严重: 错误响应泄露堆栈跟踪!")
elif ".go:" in b and ("/" in b or "\\\\" in b):
print(f" ⚠️ 警告: 错误响应可能泄露文件路径!")
else:
print(f" ✅ 错误响应未泄露内部信息")
# 5.5 测试 .env 文件是否可访问
s, b, h = curl("GET", "/.env")
print(f"\n5.5 .env 文件访问: [{s}]")
s, b, h = curl("GET", "/api/v1/.env")
print(f" /api/v1/.env 访问: [{s}]")
s, b, h = curl("GET", "/api/v1/../.env")
print(f" /api/v1/../.env 访问: [{s}]")
# 5.6 测试敏感路径
for path in ["/admin", "/wp-admin", "/phpmyadmin", "/api/v1/debug", "/debug/pprof/"]:
s, b, h = curl("GET", path)
sens = "⚠️" if s != 404 else "✅"
print(f" {sens} {path}: [{s}]")
# 5.7 CSP 配置检查
print(f"\n5.7 Content-Security-Policy: {h.get('content-security-policy', 'MISSING')}")
# 5.8 WebSocket token 通过 query param 传输
print(f"\n5.8 WebSocket 认证: Token 通过 query param 传递 (risk of leaking in logs)")
print(f" 这是常见做法,但建议使用更短的生命周期或单独的 WS token")
# ============================================================
# 6. 总结
# ============================================================
test_group("6. 测试完成 - 汇总")
print("所有测试已执行完毕。请查看上方结果判断安全问题。")
print(f"管理员 token 前30字符: {admin_token[:30]}...")
if user_token:
print(f"普通用户 token 前30字符: {user_token[:30]}...")