Files
Cyrene/debug/cache/test_cdp_e2e_v4.py
T
AskaEth b15e1c9541 fix: 第二轮深度调试修复 - useSpeechSynthesis守卫+NPE防护+E2E测试完善
- P0: useSpeechSynthesis.ts cancel()增加isSupported守卫
- P0: iot_provider.go 添加loader nil检查防止NPE panic
- 新增CDP E2E v4测试脚本 14项全绿通过
- 生成第二轮修复报告 docs/debug/2026-05-21-round2-fixes.md
2026-05-22 00:10:37 +08:00

369 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""CDP E2E 综合测试 v4 — 通过 UI 登录并测试全流程"""
import json, time, urllib.request, base64
from websocket import create_connection
FRONTEND_URL = "http://localhost:5199"
CDP_URL = "http://127.0.0.1:9225"
CREDENTIALS = {"username": "yeij0942", "password": "Jiang1143218570"}
passed = 0
failed = 0
def check(name, condition, detail=""):
global passed, failed
if condition:
passed += 1
print(f"{name}")
else:
failed += 1
print(f"{name} {detail}")
print("=" * 60)
print("CDP E2E 测试 v4 — UI 登录 + 全流程验证")
print("=" * 60)
# Step 1: Create fresh page
print("\n--- Step 1: 创建新页面 ---")
req = urllib.request.Request(f"{CDP_URL}/json/new?url=about:blank", method="PUT")
resp = urllib.request.urlopen(req, timeout=10)
page = json.loads(resp.read())
ws_url = page["webSocketDebuggerUrl"]
print(f" Page: {page.get('id')}")
ws = create_connection(ws_url, timeout=15)
msg_id = [0]
def cdp(method, params=None):
msg_id[0] += 1
mid = msg_id[0]
ws.send(json.dumps({"id": mid, "method": method, "params": params or {}}))
return mid
def recv_all(timeout=3):
ws.settimeout(timeout)
msgs = []
try:
while True:
msgs.append(ws.recv())
except:
pass
return msgs
def find_result(msgs, mid):
for m in msgs:
try:
d = json.loads(m)
if d.get("id") == mid:
return d.get("result", {})
except:
pass
return None
# Enable domains
cdp("Page.enable")
cdp("Network.enable")
cdp("Runtime.enable")
cdp("Log.enable")
time.sleep(0.5)
recv_all(0.5)
# Step 2: Navigate to frontend
print("\n--- Step 2: 导航到前端 ---")
mid_nav = cdp("Page.navigate", {"url": FRONTEND_URL + "/"})
time.sleep(4)
msgs = recv_all(3)
# Check for JS errors
has_js_error = False
for m in msgs:
d = json.loads(m)
if d.get("method") == "Runtime.exceptionThrown":
has_js_error = True
exc = d.get("params", {}).get("exceptionDetails", {})
print(f" [JS ERROR] {exc.get('text', '')}")
check("页面无 JS 异常", not has_js_error)
# Step 3: Check localStorage (should NOT have test-token-cyrene)
print("\n--- Step 3: localStorage 检查 ---")
mid_ls = cdp("Runtime.evaluate", {
"expression": "JSON.stringify({token: localStorage.getItem('token'), user_id: localStorage.getItem('user_id'), allKeys: Object.keys(localStorage)})"
})
time.sleep(0.5)
result = find_result(recv_all(1), mid_ls)
ls_val = ""
if result:
ls_val = result.get("result", {}).get("value", "")
print(f" localStorage: {ls_val}")
check("无硬编码 test-token-cyrene", "test-token-cyrene" not in ls_val)
# Step 4: Fill login form and click login
print("\n--- Step 4: 通过 UI 登录 ---")
# Find username input and fill
mid_user = cdp("Runtime.evaluate", {
"expression": f"""
(() => {{
const inputs = document.querySelectorAll('input');
let usernameInput = null;
let passwordInput = null;
let loginBtn = null;
for (const inp of inputs) {{
const placeholder = (inp.placeholder || '').toLowerCase();
const type = (inp.type || '').toLowerCase();
if (placeholder.includes('用户') || placeholder.includes('账号') || placeholder.includes('username')) {{
usernameInput = inp;
}} else if (type === 'password' || placeholder.includes('密码') || placeholder.includes('password')) {{
passwordInput = inp;
}}
}}
if (usernameInput) {{
const nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value').set;
nativeInputValueSetter.call(usernameInput, '{CREDENTIALS["username"]}');
usernameInput.dispatchEvent(new Event('input', {{ bubbles: true }}));
}}
if (passwordInput) {{
const nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value').set;
nativeInputValueSetter.call(passwordInput, '{CREDENTIALS["password"]}');
passwordInput.dispatchEvent(new Event('input', {{ bubbles: true }}));
}}
// Find login SUBMIT button (text is "进入昔涟的世界 ♪", NOT the mode switcher "登录")
const buttons = document.querySelectorAll('button');
for (const btn of buttons) {{
const text = btn.textContent.trim();
if (text.includes('进入') || text.includes('昔涟')) {{
loginBtn = btn;
break;
}}
}}
return JSON.stringify({{
foundUser: !!usernameInput,
foundPass: !!passwordInput,
foundBtn: !!loginBtn,
btnText: loginBtn ? loginBtn.textContent.trim() : 'N/A',
inputCount: inputs.length,
bodySnippet: document.body.innerText.substring(0, 200)
}});
}})()
"""
})
time.sleep(0.5)
result = find_result(recv_all(1), mid_user)
form_info = {}
if result:
try:
form_info = json.loads(result.get("result", {}).get("value", "{}"))
print(f" 表单状态: {json.dumps(form_info, indent=2, ensure_ascii=False)}")
except:
print(f" Form result: {result.get('result', {}).get('value', '')[:200]}")
check("找到用户名输入框", form_info.get("foundUser", False))
check("找到密码输入框", form_info.get("foundPass", False))
check("找到登录按钮", form_info.get("foundBtn", False))
# Click login
if form_info.get("foundBtn"):
mid_click = cdp("Runtime.evaluate", {
"expression": """
(() => {
const buttons = document.querySelectorAll('button');
for (const btn of buttons) {
const text = btn.textContent.trim();
if (text.includes('进入') || text.includes('昔涟')) {
btn.click();
return 'clicked';
}
}
return 'not_found';
})()
"""
})
time.sleep(4)
msgs = recv_all(4)
# Check API calls after login
api_calls = []
console_errors = []
for m in msgs:
d = json.loads(m)
method = d.get("method", "")
params = d.get("params", {})
if method == "Network.requestWillBeSent":
req = params.get("request", {})
url = req.get("url", "")
headers = req.get("headers", {})
auth = headers.get("Authorization", "") or headers.get("authorization", "")
if "/api/" in url:
api_calls.append(f"{req.get('method','?')} {url[:100]}")
elif method == "Runtime.consoleAPICalled":
args = params.get("args", [])
if params.get("type") == "error":
texts = [str(a.get("value", "") or a.get("description", ""))[:150] for a in args]
console_errors.append(" ".join(texts))
elif method == "Runtime.exceptionThrown":
exc = params.get("exceptionDetails", {})
console_errors.append(f"JS: {exc.get('text', '')}")
print(f" API 调用: {len(api_calls)}")
for c in api_calls[:8]:
print(f" {c}")
auth_errors = [e for e in console_errors if "401" in e or "认证" in e or "无效" in e]
check("无认证错误", len(auth_errors) == 0, str(auth_errors[:2]) if auth_errors else "")
# Step 5: Check post-login UI
print("\n--- Step 5: 登录后 UI 检查 ---")
mid_ui = cdp("Runtime.evaluate", {
"expression": """
(() => {
const hasSidebar = !!(document.querySelector('aside, nav[class*="side"], [class*="sidebar"], [class*="Sidebar"]'));
const hasChatArea = !!(document.querySelector('[class*="chat"], [class*="Chat"], [class*="message"]'));
const token = localStorage.getItem('token');
const userId = localStorage.getItem('user_id');
const bodyText = document.body.innerText.substring(0, 300);
return JSON.stringify({hasSidebar, hasChatArea, hasToken: !!token, userId, bodyText});
})()
"""
})
time.sleep(0.5)
result = find_result(recv_all(1), mid_ui)
ui_state = {}
if result:
try:
ui_state = json.loads(result.get("result", {}).get("value", "{}"))
print(f" UI: {json.dumps(ui_state, indent=2, ensure_ascii=False)[:500]}")
except:
pass
check("localStorage 有 token", ui_state.get("hasToken", False))
check("侧边栏或聊天区域存在", ui_state.get("hasSidebar", False) or ui_state.get("hasChatArea", False))
# Step 6: Screenshot
mid_ss = cdp("Page.captureScreenshot", {"format": "png"})
time.sleep(1)
result = find_result(recv_all(2), mid_ss)
if result:
img_data = result.get("data", "")
if img_data:
with open("/tmp/cdp_screenshot_final.png", "wb") as f:
f.write(base64.b64decode(img_data))
print(" 截图: /tmp/cdp_screenshot_final.png")
ws.close()
# ============================================================
# Part B: WebSocket + Backend Tests
# ============================================================
print("\n" + "=" * 60)
print("Part B: Backend API 深度测试")
print("=" * 60)
# Login to get token
login_req = urllib.request.Request(
"http://localhost:8080/api/v1/auth/login",
data=json.dumps(CREDENTIALS).encode(),
headers={"Content-Type": "application/json"},
method="POST"
)
resp = urllib.request.urlopen(login_req, timeout=10)
login_data = json.loads(resp.read())
token = login_data.get("token", "")
user_id = login_data.get("user_id", "")
print(f" Token: {token[:30]}..., User: {user_id}")
AUTH = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
BASE = "http://localhost:8080/api/v1"
# B1: Sessions
print("\n--- B1: 会话管理 ---")
req = urllib.request.Request(f"{BASE}/sessions?user_id={user_id}", headers=AUTH)
resp = urllib.request.urlopen(req, timeout=10)
sessions = json.loads(resp.read())
sessions_list = sessions.get("sessions", sessions.get("data", []))
check("会话列表", isinstance(sessions_list, list), f"count={len(sessions_list) if isinstance(sessions_list, list) else 'N/A'}")
# Create session
req = urllib.request.Request(
f"{BASE}/sessions",
data=json.dumps({"user_id": user_id, "title": "E2E Test Session"}).encode(),
headers=AUTH,
method="POST"
)
resp = urllib.request.urlopen(req, timeout=10)
session_data = json.loads(resp.read())
session_id = session_data.get("session_id", session_data.get("id", ""))
check("创建新会话", bool(session_id))
# B2: Chat History
if session_id:
print("\n--- B2: 聊天历史 ---")
req = urllib.request.Request(
f"{BASE}/sessions/{session_id}/messages?limit=10",
headers=AUTH
)
resp = urllib.request.urlopen(req, timeout=10)
msgs = json.loads(resp.read())
check("获取消息历史", resp.status == 200)
# B3: Memory
print("\n--- B3: 记忆系统 ---")
req = urllib.request.Request(f"{BASE}/memory", headers=AUTH)
try:
resp = urllib.request.urlopen(req, timeout=10)
mem_list = json.loads(resp.read())
check("记忆列表", resp.status in [200, 404])
except Exception as e:
check("记忆列表", False, str(e)[:80])
# B4: IoT via WebSocket
print("\n--- B4: IoT 设备(通过 WebSocket---")
import websocket as ws_lib
ws_url = f"ws://localhost:8080/ws/chat?token={token}&session_id={session_id}"
try:
sock = ws_lib.create_connection(ws_url, timeout=10)
print(f" WebSocket 已连接")
check("WebSocket 连接", True)
# Send IoT query
iot_query = json.dumps({
"type": "message",
"content": "列出所有IoT设备",
"session_id": session_id,
"timestamp": int(time.time() * 1000)
})
sock.send(iot_query)
print(f" 已发送 IoT 查询")
# Read responses
sock.settimeout(8)
responses = []
start = time.time()
while time.time() - start < 8:
try:
msg = sock.recv()
data = json.loads(msg)
msg_type = data.get("type", "?")
content = str(data.get("content", ""))[:80]
responses.append(f"[{msg_type}] {content}")
if msg_type in ("response", "stream_end"):
break
except:
break
print(f" 收到 {len(responses)} 条消息:")
for r in responses[:10]:
print(f" {r}")
has_response = any("response" in r or "device" in r.lower() or "iot" in r.lower() for r in responses)
check("收到 IoT 响应", len(responses) > 0)
sock.close()
except Exception as e:
check("WebSocket/IoT 测试", False, str(e)[:80])
# Summary
print("\n" + "=" * 60)
print(f"测试完成: {passed} 通过, {failed} 失败, 总计 {passed+failed}")
print("=" * 60)