#!/usr/bin/env python3 """ 安全审计测试脚本 - 第13轮 测试 JWT 安全、认证端点安全、输入验证、授权越权、敏感信息泄露 """ import subprocess import json import sys import time import base64 BASE = "http://localhost:8080" HEADERS_JSON = {"Content-Type": "application/json"} def curl(method, path, headers=None, data=None, expected_status=None, timeout=10): """执行 curl 请求并返回 (status_code, response_body, response_headers)""" cmd = ["curl", "-s", "-w", "\n%{http_code}", "-X", method, f"{BASE}{path}", "--max-time", str(timeout)] if headers: for k, v in headers.items(): cmd += ["-H", f"{k}: {v}"] if data: cmd += ["-d", data] result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout+5) output = result.stdout.strip() # 分离 body 和 status code if "\n" in output: lines = output.rsplit("\n", 1) body = lines[0] status = lines[1] else: body = "" status = output try: status = int(status) except ValueError: status = 0 # 单独获取响应头 hdr_cmd = ["curl", "-s", "-I", "-X", method, f"{BASE}{path}", "--max-time", str(timeout)] if headers: for k, v in headers.items(): hdr_cmd += ["-H", f"{k}: {v}"] hdr_result = subprocess.run(hdr_cmd, capture_output=True, text=True, timeout=timeout+5) resp_headers = {} for line in hdr_result.stdout.strip().split("\n"): if ":" in line: key, val = line.split(":", 1) resp_headers[key.strip().lower()] = val.strip() return status, body, resp_headers def print_result(test_name, status, body, expected_status=None, detail=""): """打印测试结果""" body_short = body[:200] if body else "(empty)" status_icon = "✅" if (expected_status is None or status == expected_status) else "❌" print(f"{status_icon} [{status}] {test_name}") if detail: print(f" {detail}") print(f" Response: {body_short}") print() def test_group(name): print(f"\n{'='*70}") print(f" {name}") print(f"{'='*70}\n") # ============================================================ # 0. 先获取有效的 admin token # ============================================================ print("🔑 获取管理员 token...") s, b, _ = curl("POST", "/api/v1/auth/login", HEADERS_JSON, json.dumps({"username": "yeij0942", "password": "Jiang1143218570"})) if s == 200: admin_token = json.loads(b).get("token", "") print(f" 管理员 token 获取成功: {admin_token[:30]}...") else: print(f" ❌ 获取失败: {s} {b}") sys.exit(1) # ============================================================ # 1. JWT 令牌安全性审计 # ============================================================ test_group("1. JWT 令牌安全性审计") # 1.1 无 token 访问受保护端点 s, b, h = curl("GET", "/api/v1/sessions", expected_status=401) print_result("1.1 无 token 访问受保护端点", s, b, 401) # 1.2 伪造 token fake_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiYWRtaW4iLCJleHAiOjk5OTk5OTk5OTksImlhdCI6MH0.fake-signature" s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": f"Bearer {fake_token}"}, expected_status=401) print_result("1.2 伪造 token (错误签名)", s, b, 401) # 1.3 过期 token (手动构造一个已过期的) # 使用 Python 生成一个过期的 JWT import hmac import hashlib import base64 def b64url(data): return base64.urlsafe_b64encode(data).rstrip(b'=').decode() header = b64url(json.dumps({"alg":"HS256","typ":"JWT"}).encode()) # exp = 2020-01-01 (1577836800) payload = b64url(json.dumps({"user_id":"admin","exp":1577836800,"iat":1577836800}).encode()) signing_input = f"{header}.{payload}".encode() # 使用默认密钥 "change-me-in-production" sig = hmac.new(b"change-me-in-production", signing_input, hashlib.sha256).digest() expired_token = f"{header}.{payload}.{b64url(sig)}" s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": f"Bearer {expired_token}"}, expected_status=401) print_result("1.3 过期 token", s, b, 401) # 1.4 空 token s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": "Bearer "}, expected_status=401) print_result("1.4 空 token", s, b, 401) # 1.5 错误的 Authorization 格式 s, b, h = curl("GET", "/api/v1/sessions", {"Authorization": "Basic YWRtaW46cGFzcw=="}, expected_status=401) print_result("1.5 错误格式 (Basic auth)", s, b, 401) # 1.6 token 放到 query param 而非 header (WebSocket 用这个) s, b, h = curl("GET", f"/api/v1/sessions?token={admin_token}", expected_status=401) print_result("1.6 token 作为 query param (REST API)", s, b, 401, "REST API 应该只接受 Header 中的 token") # 1.7 检查 token 过期时间 # 1.7 手动解码 JWT 检查过期时间 (纯标准库) try: parts = admin_token.split(".") if len(parts) >= 2: # 补齐 base64 padding payload_b64 = parts[1] padding = 4 - len(payload_b64) % 4 if padding != 4: payload_b64 += "=" * padding decoded_bytes = base64.urlsafe_b64decode(payload_b64) decoded = json.loads(decoded_bytes) exp = decoded.get("exp", 0) iat = decoded.get("iat", 0) hours = (exp - iat) / 3600 print(f"1.7 JWT 过期时间分析:") print(f" 签发时间 (iat): {iat} ({time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(iat))})") print(f" 过期时间 (exp): {exp} ({time.strftime('%Y-%m-%d %H:%M:%S', time.gmtime(exp))})") print(f" 有效时长: {hours:.0f} 小时 ({hours/24:.1f} 天)") if hours > 168: # 超过7天 print(f" ⚠️ 警告: JWT 过期时间过长 ({hours/24:.0f} 天),建议缩短至 24-72 小时") else: print(f" ✅ 过期时间合理") except Exception as e: print(f" ❌ 解码失败: {e}") print() # ============================================================ # 2. 认证端点安全测试 # ============================================================ test_group("2. 认证端点安全测试") # 2.1 SQL 注入测试 - Login sql_payloads = [ ("' OR '1'='1", "any"), ("admin'--", "any"), ("'; DROP TABLE users; --", "any"), ("' UNION SELECT 1,2,3,4,5,6 --", "any"), ("admin' OR 1=1 --", "any"), ("1'='1", "any"), ] for payload, pwd in sql_payloads: data = json.dumps({"username": payload, "password": pwd}) s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data, expected_status=None) # 对于 SQL 注入 payload,期望返回 400 (格式无效) 或 401 (认证失败) # 但绝对不能是 200 (成功登录) if s == 200: print_result(f"2.1 SQL注入-Login: {payload[:40]}", s, b, None, "🔴 严重: SQL 注入可能成功!") elif s == 400: print_result(f"2.1 SQL注入-Login: {payload[:40]}", s, b, 400, "✅ 用户名格式校验拦截") else: print_result(f"2.1 SQL注入-Login: {payload[:40]}", s, b, 401) # 2.2 SQL 注入测试 - Register for payload in ["' OR '1'='1", "test'--", "'; DROP TABLE users; --"]: data = json.dumps({ "username": payload, "password": "test123456", "email": "test@test.com", "nickname": "Test", "verify_code": "000000" }) s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data) print_result(f"2.2 SQL注入-Register: {payload[:40]}", s, b, 400 if s != 200 else None) # 2.3 暴力破解防护测试 (连续错误登录) print("2.3 暴力破解防护测试 (连续10次错误登录):") rate_limited = False for i in range(10): data = json.dumps({"username": "yeij0942", "password": f"wrong_password_{i}"}) s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data) if s == 429: rate_limited = True print(f" 请求 {i+1}: [{s}] ⚡ 速率限制触发!") break print(f" 请求 {i+1}: [{s}]") if not rate_limited: print(f" ⚠️ 10次错误登录后仍未触发速率限制") else: print(f" ✅ 速率限制在第 {i+1} 次请求时触发") # 2.4 用户名枚举测试 print("\n2.4 用户名枚举测试:") # 测试不存在的用户 data = json.dumps({"username": "nonexistent_user_xyz_12345", "password": "any_password"}) s1, b1, _ = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data) # 测试存在的用户 (但密码错误) data = json.dumps({"username": "yeij0942", "password": "wrong_password"}) s2, b2, _ = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data) print(f" 不存在用户: [{s1}] {b1[:150]}") print(f" 存在但密码错: [{s2}] {b2[:150]}") if s1 == s2 and "用户名或密码错误" in b1 and "用户名或密码错误" in b2: print(f" ✅ 错误消息一致,防止用户名枚举") elif b1 != b2: print(f" ⚠️ 警告: 错误消息不同,可能存在用户名枚举风险") print() # 2.5 注册端点 - 极短用户名 data = json.dumps({"username": "ab", "password": "123456", "email": "test@test.com", "nickname": "Test", "verify_code": "000000"}) s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data) print_result("2.5 极短用户名 (ab, 2字符)", s, b, 400) # 2.6 特殊字符用户名 for uname in ["test", "verify_code": "000000"}) s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data) # nickname 应该不需要特殊的 XSS 过滤,但需要确保后端对其做了转义 print_result("2.7 XSS payload in nickname", s, b, 403 if s != 200 else None, "期望:注册被拒绝或昵称被转义/过滤") # ============================================================ # 3. 输入验证审计 # ============================================================ test_group("3. 输入验证审计 - Session 端点") # 创建普通用户 token (如果可以) # 先尝试注册 data = json.dumps({"username": "testuser42", "password": "Test123456", "email": "test42@test.com", "nickname": "Tester42", "verify_code": "000000"}) s, b, h = curl("POST", "/api/v1/auth/register", HEADERS_JSON, data) user_token = None if s == 201: user_token = json.loads(b).get("token", "") print(f" 创建测试用户成功,token={user_token[:30]}...") else: print(f" 创建测试用户失败 [{s}]: {b[:100]}") # 尝试登录 data = json.dumps({"username": "testuser42", "password": "Test123456"}) s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, data) if s == 200: user_token = json.loads(b).get("token", "") print(f" 登录测试用户成功,token={user_token[:30]}...") else: print(f" 登录测试用户也失败 [{s}]: {b[:100]}") # 3.1 创建 session 时携带过长的 title long_title = "A" * 1000 data = json.dumps({"title": long_title, "is_main": False}) s, b, h = curl("POST", "/api/v1/sessions", {**HEADERS_JSON, "Authorization": f"Bearer {admin_token}"}, data) print_result("3.1 超长 session title (1000字符)", s, b, 201 if s == 201 else None) # 3.2 创建 session 时指定其他用户的 user_id (越权) data = json.dumps({"user_id": "user_hacker", "title": "hijacked session"}) s, b, h = curl("POST", "/api/v1/sessions", {**HEADERS_JSON, "Authorization": f"Bearer {admin_token}"}, data) print_result("3.2 Session 创建时指定其他 user_id (越权)", s, b, None, "检查是否允许 user_id 覆盖 JWT 身份") if s == 201: resp = json.loads(b) actual_user_id = resp.get("user_id", "") if actual_user_id == "user_hacker": print(f" 🔴 严重: 允许通过请求体覆盖 user_id 为 {actual_user_id}!") else: print(f" ✅ 忽略请求体中的 user_id,使用 JWT 身份: {actual_user_id}") # 3.3 列出其他用户的 sessions s, b, h = curl("GET", "/api/v1/sessions?user_id=user_hacker", {"Authorization": f"Bearer {admin_token}"}) print_result("3.3 查看其他用户的 session 列表 (越权)", s, b, None, "检查是否允许查询其他用户的 session") if s == 200: sessions_data = json.loads(b) print(f" ⚠️ 可以查看其他用户的 session 列表") # 3.4 空消息 / 特殊 Unicode 测试 (通过 WebSocket 模拟) # 由于 WebSocket 测试需要脚本,这里仅做 HTTP 层面的检查 # 3.5 非法 session ID s, b, h = curl("GET", "/api/v1/sessions/../../../etc/passwd", {"Authorization": f"Bearer {admin_token}"}) print_result("3.5 路径遍历 in session ID", s, b, 404) s, b, h = curl("GET", "/api/v1/sessions/", {"Authorization": f"Bearer {admin_token}"}) print_result("3.6 XSS in session ID", s, b, 404) # 3.7 超长 session ID long_sid = "session_" + "A" * 500 s, b, h = curl("GET", f"/api/v1/sessions/{long_sid}", {"Authorization": f"Bearer {admin_token}"}) print_result("3.7 超长 session ID (500字符)", s, b, None) # ============================================================ # 4. 授权/越权测试 # ============================================================ test_group("4. 授权/越权测试") # 4.1 未认证访问受保护端点 endpoints = [ ("GET", "/api/v1/sessions"), ("GET", "/api/v1/files"), ("GET", "/api/v1/memory/search"), ("GET", "/api/v1/reminders"), ("POST", "/api/v1/automation/rules"), ] for method, path in endpoints: s, b, h = curl(method, path, expected_status=401) print_result(f"4.1 未认证访问: {method} {path}", s, b, 401) # 4.2 普通用户访问管理员 API if user_token: admin_endpoints = [ ("GET", "/api/v1/admin/sessions"), ("GET", "/api/v1/admin/sessions/active"), ] for method, path in admin_endpoints: s, b, h = curl(method, path, {"Authorization": f"Bearer {user_token}"}, expected_status=403) print_result(f"4.2 普通用户访问管理员API: {method} {path}", s, b, 403) # 4.3 删除其他用户的 session # 先创建一个 session data = json.dumps({"title": "Test Session for Auth Test"}) s, b, h = curl("POST", "/api/v1/sessions", {**HEADERS_JSON, "Authorization": f"Bearer {admin_token}"}, data) test_session_id = None if s == 201: test_session_id = json.loads(b).get("id", "") print(f" 创建测试 session: {test_session_id}") if test_session_id and user_token: # 用普通用户 token 删除管理员的 session s, b, h = curl("DELETE", f"/api/v1/sessions/{test_session_id}", {"Authorization": f"Bearer {user_token}"}) print_result(f"4.3 普通用户删除管理员的 session", s, b, None, "期望 403 或至少不允许删除") if s == 200: print(f" 🔴 严重: 用户可以删除其他用户的 session!") # 4.4 查看其他用户的具体 session if test_session_id and user_token: s, b, h = curl("GET", f"/api/v1/sessions/{test_session_id}", {"Authorization": f"Bearer {user_token}"}) print_result(f"4.4 普通用户查看管理员的 session", s, b, None, "期望 403 或 404") # ============================================================ # 5. 敏感信息泄露检查 # ============================================================ test_group("5. 敏感信息泄露检查") # 5.1 检查响应头 s, b, h = curl("GET", "/api/v1/health") print("5.1 HTTP 响应头分析:") sensitive_headers = ["server", "x-powered-by", "x-aspnet-version", "x-generator"] for header_name in sensitive_headers: if header_name in h: print(f" ⚠️ 敏感头: {header_name}: {h[header_name]}") else: print(f" ✅ 无敏感头: {header_name}") # 5.2 检查安全头 security_headers = { "x-content-type-options": "nosniff", "x-frame-options": "DENY", "x-xss-protection": "1; mode=block", "referrer-policy": "strict-origin-when-cross-origin", "strict-transport-security": None, } for header_name, expected in security_headers.items(): value = h.get(header_name, "") if value: print(f" ✅ 安全头: {header_name}: {value}") else: print(f" ⚠️ 缺少安全头: {header_name}") # 5.3 CORS 配置 print("\n5.3 CORS 配置检查:") s, b, h_origin = curl("GET", "/api/v1/health", {"Origin": "https://evil.com"}) acao = h_origin.get("access-control-allow-origin", "") acac = h_origin.get("access-control-allow-credentials", "") print(f" Access-Control-Allow-Origin: {acao}") print(f" Access-Control-Allow-Credentials: {acac}") if acac == "true" and acao == "https://evil.com": print(f" 🔴 严重: CORS 允许任何来源携带凭据! 可被 CSRF 攻击利用") elif acac == "true": print(f" ⚠️ 警告: Access-Control-Allow-Credentials 为 true,需确认来源白名单") # 5.4 检查错误响应是否泄露内部信息 s, b, h = curl("POST", "/api/v1/auth/login", HEADERS_JSON, "invalid_json{{{") print(f"\n5.4 错误响应信息泄露:") print(f" 响应: {b[:300]}") if "panic" in b.lower() or "goroutine" in b.lower() or "stack" in b.lower(): print(f" 🔴 严重: 错误响应泄露堆栈跟踪!") elif ".go:" in b and ("/" in b or "\\\\" in b): print(f" ⚠️ 警告: 错误响应可能泄露文件路径!") else: print(f" ✅ 错误响应未泄露内部信息") # 5.5 测试 .env 文件是否可访问 s, b, h = curl("GET", "/.env") print(f"\n5.5 .env 文件访问: [{s}]") s, b, h = curl("GET", "/api/v1/.env") print(f" /api/v1/.env 访问: [{s}]") s, b, h = curl("GET", "/api/v1/../.env") print(f" /api/v1/../.env 访问: [{s}]") # 5.6 测试敏感路径 for path in ["/admin", "/wp-admin", "/phpmyadmin", "/api/v1/debug", "/debug/pprof/"]: s, b, h = curl("GET", path) sens = "⚠️" if s != 404 else "✅" print(f" {sens} {path}: [{s}]") # 5.7 CSP 配置检查 print(f"\n5.7 Content-Security-Policy: {h.get('content-security-policy', 'MISSING')}") # 5.8 WebSocket token 通过 query param 传输 print(f"\n5.8 WebSocket 认证: Token 通过 query param 传递 (risk of leaking in logs)") print(f" 这是常见做法,但建议使用更短的生命周期或单独的 WS token") # ============================================================ # 6. 总结 # ============================================================ test_group("6. 测试完成 - 汇总") print("所有测试已执行完毕。请查看上方结果判断安全问题。") print(f"管理员 token 前30字符: {admin_token[:30]}...") if user_token: print(f"普通用户 token 前30字符: {user_token[:30]}...")