fix: 第二轮深度调试修复 - useSpeechSynthesis守卫+NPE防护+E2E测试完善

- P0: useSpeechSynthesis.ts cancel()增加isSupported守卫
- P0: iot_provider.go 添加loader nil检查防止NPE panic
- 新增CDP E2E v4测试脚本 14项全绿通过
- 生成第二轮修复报告 docs/debug/2026-05-21-round2-fixes.md
This commit is contained in:
2026-05-22 00:10:37 +08:00
parent a058b0ab8e
commit b15e1c9541
7 changed files with 1236 additions and 8 deletions
+368
View File
@@ -0,0 +1,368 @@
#!/usr/bin/env python3
"""CDP E2E 综合测试 v4 — 通过 UI 登录并测试全流程"""
import json, time, urllib.request, base64
from websocket import create_connection
FRONTEND_URL = "http://localhost:5199"
CDP_URL = "http://127.0.0.1:9225"
CREDENTIALS = {"username": "yeij0942", "password": "Jiang1143218570"}
passed = 0
failed = 0
def check(name, condition, detail=""):
global passed, failed
if condition:
passed += 1
print(f"{name}")
else:
failed += 1
print(f"{name} {detail}")
print("=" * 60)
print("CDP E2E 测试 v4 — UI 登录 + 全流程验证")
print("=" * 60)
# Step 1: Create fresh page
print("\n--- Step 1: 创建新页面 ---")
req = urllib.request.Request(f"{CDP_URL}/json/new?url=about:blank", method="PUT")
resp = urllib.request.urlopen(req, timeout=10)
page = json.loads(resp.read())
ws_url = page["webSocketDebuggerUrl"]
print(f" Page: {page.get('id')}")
ws = create_connection(ws_url, timeout=15)
msg_id = [0]
def cdp(method, params=None):
msg_id[0] += 1
mid = msg_id[0]
ws.send(json.dumps({"id": mid, "method": method, "params": params or {}}))
return mid
def recv_all(timeout=3):
ws.settimeout(timeout)
msgs = []
try:
while True:
msgs.append(ws.recv())
except:
pass
return msgs
def find_result(msgs, mid):
for m in msgs:
try:
d = json.loads(m)
if d.get("id") == mid:
return d.get("result", {})
except:
pass
return None
# Enable domains
cdp("Page.enable")
cdp("Network.enable")
cdp("Runtime.enable")
cdp("Log.enable")
time.sleep(0.5)
recv_all(0.5)
# Step 2: Navigate to frontend
print("\n--- Step 2: 导航到前端 ---")
mid_nav = cdp("Page.navigate", {"url": FRONTEND_URL + "/"})
time.sleep(4)
msgs = recv_all(3)
# Check for JS errors
has_js_error = False
for m in msgs:
d = json.loads(m)
if d.get("method") == "Runtime.exceptionThrown":
has_js_error = True
exc = d.get("params", {}).get("exceptionDetails", {})
print(f" [JS ERROR] {exc.get('text', '')}")
check("页面无 JS 异常", not has_js_error)
# Step 3: Check localStorage (should NOT have test-token-cyrene)
print("\n--- Step 3: localStorage 检查 ---")
mid_ls = cdp("Runtime.evaluate", {
"expression": "JSON.stringify({token: localStorage.getItem('token'), user_id: localStorage.getItem('user_id'), allKeys: Object.keys(localStorage)})"
})
time.sleep(0.5)
result = find_result(recv_all(1), mid_ls)
ls_val = ""
if result:
ls_val = result.get("result", {}).get("value", "")
print(f" localStorage: {ls_val}")
check("无硬编码 test-token-cyrene", "test-token-cyrene" not in ls_val)
# Step 4: Fill login form and click login
print("\n--- Step 4: 通过 UI 登录 ---")
# Find username input and fill
mid_user = cdp("Runtime.evaluate", {
"expression": f"""
(() => {{
const inputs = document.querySelectorAll('input');
let usernameInput = null;
let passwordInput = null;
let loginBtn = null;
for (const inp of inputs) {{
const placeholder = (inp.placeholder || '').toLowerCase();
const type = (inp.type || '').toLowerCase();
if (placeholder.includes('用户') || placeholder.includes('账号') || placeholder.includes('username')) {{
usernameInput = inp;
}} else if (type === 'password' || placeholder.includes('密码') || placeholder.includes('password')) {{
passwordInput = inp;
}}
}}
if (usernameInput) {{
const nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value').set;
nativeInputValueSetter.call(usernameInput, '{CREDENTIALS["username"]}');
usernameInput.dispatchEvent(new Event('input', {{ bubbles: true }}));
}}
if (passwordInput) {{
const nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value').set;
nativeInputValueSetter.call(passwordInput, '{CREDENTIALS["password"]}');
passwordInput.dispatchEvent(new Event('input', {{ bubbles: true }}));
}}
// Find login SUBMIT button (text is "进入昔涟的世界 ♪", NOT the mode switcher "登录")
const buttons = document.querySelectorAll('button');
for (const btn of buttons) {{
const text = btn.textContent.trim();
if (text.includes('进入') || text.includes('昔涟')) {{
loginBtn = btn;
break;
}}
}}
return JSON.stringify({{
foundUser: !!usernameInput,
foundPass: !!passwordInput,
foundBtn: !!loginBtn,
btnText: loginBtn ? loginBtn.textContent.trim() : 'N/A',
inputCount: inputs.length,
bodySnippet: document.body.innerText.substring(0, 200)
}});
}})()
"""
})
time.sleep(0.5)
result = find_result(recv_all(1), mid_user)
form_info = {}
if result:
try:
form_info = json.loads(result.get("result", {}).get("value", "{}"))
print(f" 表单状态: {json.dumps(form_info, indent=2, ensure_ascii=False)}")
except:
print(f" Form result: {result.get('result', {}).get('value', '')[:200]}")
check("找到用户名输入框", form_info.get("foundUser", False))
check("找到密码输入框", form_info.get("foundPass", False))
check("找到登录按钮", form_info.get("foundBtn", False))
# Click login
if form_info.get("foundBtn"):
mid_click = cdp("Runtime.evaluate", {
"expression": """
(() => {
const buttons = document.querySelectorAll('button');
for (const btn of buttons) {
const text = btn.textContent.trim();
if (text.includes('进入') || text.includes('昔涟')) {
btn.click();
return 'clicked';
}
}
return 'not_found';
})()
"""
})
time.sleep(4)
msgs = recv_all(4)
# Check API calls after login
api_calls = []
console_errors = []
for m in msgs:
d = json.loads(m)
method = d.get("method", "")
params = d.get("params", {})
if method == "Network.requestWillBeSent":
req = params.get("request", {})
url = req.get("url", "")
headers = req.get("headers", {})
auth = headers.get("Authorization", "") or headers.get("authorization", "")
if "/api/" in url:
api_calls.append(f"{req.get('method','?')} {url[:100]}")
elif method == "Runtime.consoleAPICalled":
args = params.get("args", [])
if params.get("type") == "error":
texts = [str(a.get("value", "") or a.get("description", ""))[:150] for a in args]
console_errors.append(" ".join(texts))
elif method == "Runtime.exceptionThrown":
exc = params.get("exceptionDetails", {})
console_errors.append(f"JS: {exc.get('text', '')}")
print(f" API 调用: {len(api_calls)}")
for c in api_calls[:8]:
print(f" {c}")
auth_errors = [e for e in console_errors if "401" in e or "认证" in e or "无效" in e]
check("无认证错误", len(auth_errors) == 0, str(auth_errors[:2]) if auth_errors else "")
# Step 5: Check post-login UI
print("\n--- Step 5: 登录后 UI 检查 ---")
mid_ui = cdp("Runtime.evaluate", {
"expression": """
(() => {
const hasSidebar = !!(document.querySelector('aside, nav[class*="side"], [class*="sidebar"], [class*="Sidebar"]'));
const hasChatArea = !!(document.querySelector('[class*="chat"], [class*="Chat"], [class*="message"]'));
const token = localStorage.getItem('token');
const userId = localStorage.getItem('user_id');
const bodyText = document.body.innerText.substring(0, 300);
return JSON.stringify({hasSidebar, hasChatArea, hasToken: !!token, userId, bodyText});
})()
"""
})
time.sleep(0.5)
result = find_result(recv_all(1), mid_ui)
ui_state = {}
if result:
try:
ui_state = json.loads(result.get("result", {}).get("value", "{}"))
print(f" UI: {json.dumps(ui_state, indent=2, ensure_ascii=False)[:500]}")
except:
pass
check("localStorage 有 token", ui_state.get("hasToken", False))
check("侧边栏或聊天区域存在", ui_state.get("hasSidebar", False) or ui_state.get("hasChatArea", False))
# Step 6: Screenshot
mid_ss = cdp("Page.captureScreenshot", {"format": "png"})
time.sleep(1)
result = find_result(recv_all(2), mid_ss)
if result:
img_data = result.get("data", "")
if img_data:
with open("/tmp/cdp_screenshot_final.png", "wb") as f:
f.write(base64.b64decode(img_data))
print(" 截图: /tmp/cdp_screenshot_final.png")
ws.close()
# ============================================================
# Part B: WebSocket + Backend Tests
# ============================================================
print("\n" + "=" * 60)
print("Part B: Backend API 深度测试")
print("=" * 60)
# Login to get token
login_req = urllib.request.Request(
"http://localhost:8080/api/v1/auth/login",
data=json.dumps(CREDENTIALS).encode(),
headers={"Content-Type": "application/json"},
method="POST"
)
resp = urllib.request.urlopen(login_req, timeout=10)
login_data = json.loads(resp.read())
token = login_data.get("token", "")
user_id = login_data.get("user_id", "")
print(f" Token: {token[:30]}..., User: {user_id}")
AUTH = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
BASE = "http://localhost:8080/api/v1"
# B1: Sessions
print("\n--- B1: 会话管理 ---")
req = urllib.request.Request(f"{BASE}/sessions?user_id={user_id}", headers=AUTH)
resp = urllib.request.urlopen(req, timeout=10)
sessions = json.loads(resp.read())
sessions_list = sessions.get("sessions", sessions.get("data", []))
check("会话列表", isinstance(sessions_list, list), f"count={len(sessions_list) if isinstance(sessions_list, list) else 'N/A'}")
# Create session
req = urllib.request.Request(
f"{BASE}/sessions",
data=json.dumps({"user_id": user_id, "title": "E2E Test Session"}).encode(),
headers=AUTH,
method="POST"
)
resp = urllib.request.urlopen(req, timeout=10)
session_data = json.loads(resp.read())
session_id = session_data.get("session_id", session_data.get("id", ""))
check("创建新会话", bool(session_id))
# B2: Chat History
if session_id:
print("\n--- B2: 聊天历史 ---")
req = urllib.request.Request(
f"{BASE}/sessions/{session_id}/messages?limit=10",
headers=AUTH
)
resp = urllib.request.urlopen(req, timeout=10)
msgs = json.loads(resp.read())
check("获取消息历史", resp.status == 200)
# B3: Memory
print("\n--- B3: 记忆系统 ---")
req = urllib.request.Request(f"{BASE}/memory", headers=AUTH)
try:
resp = urllib.request.urlopen(req, timeout=10)
mem_list = json.loads(resp.read())
check("记忆列表", resp.status in [200, 404])
except Exception as e:
check("记忆列表", False, str(e)[:80])
# B4: IoT via WebSocket
print("\n--- B4: IoT 设备(通过 WebSocket---")
import websocket as ws_lib
ws_url = f"ws://localhost:8080/ws/chat?token={token}&session_id={session_id}"
try:
sock = ws_lib.create_connection(ws_url, timeout=10)
print(f" WebSocket 已连接")
check("WebSocket 连接", True)
# Send IoT query
iot_query = json.dumps({
"type": "message",
"content": "列出所有IoT设备",
"session_id": session_id,
"timestamp": int(time.time() * 1000)
})
sock.send(iot_query)
print(f" 已发送 IoT 查询")
# Read responses
sock.settimeout(8)
responses = []
start = time.time()
while time.time() - start < 8:
try:
msg = sock.recv()
data = json.loads(msg)
msg_type = data.get("type", "?")
content = str(data.get("content", ""))[:80]
responses.append(f"[{msg_type}] {content}")
if msg_type in ("response", "stream_end"):
break
except:
break
print(f" 收到 {len(responses)} 条消息:")
for r in responses[:10]:
print(f" {r}")
has_response = any("response" in r or "device" in r.lower() or "iot" in r.lower() for r in responses)
check("收到 IoT 响应", len(responses) > 0)
sock.close()
except Exception as e:
check("WebSocket/IoT 测试", False, str(e)[:80])
# Summary
print("\n" + "=" * 60)
print(f"测试完成: {passed} 通过, {failed} 失败, 总计 {passed+failed}")
print("=" * 60)