b15e1c9541
- P0: useSpeechSynthesis.ts cancel()增加isSupported守卫 - P0: iot_provider.go 添加loader nil检查防止NPE panic - 新增CDP E2E v4测试脚本 14项全绿通过 - 生成第二轮修复报告 docs/debug/2026-05-21-round2-fixes.md
369 lines
12 KiB
Python
369 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""CDP E2E 综合测试 v4 — 通过 UI 登录并测试全流程"""
|
||
import json, time, urllib.request, base64
|
||
from websocket import create_connection
|
||
|
||
FRONTEND_URL = "http://localhost:5199"
|
||
CDP_URL = "http://127.0.0.1:9225"
|
||
CREDENTIALS = {"username": "yeij0942", "password": "Jiang1143218570"}
|
||
|
||
passed = 0
|
||
failed = 0
|
||
|
||
def check(name, condition, detail=""):
|
||
global passed, failed
|
||
if condition:
|
||
passed += 1
|
||
print(f" ✅ {name}")
|
||
else:
|
||
failed += 1
|
||
print(f" ❌ {name} {detail}")
|
||
|
||
print("=" * 60)
|
||
print("CDP E2E 测试 v4 — UI 登录 + 全流程验证")
|
||
print("=" * 60)
|
||
|
||
# Step 1: Create fresh page
|
||
print("\n--- Step 1: 创建新页面 ---")
|
||
req = urllib.request.Request(f"{CDP_URL}/json/new?url=about:blank", method="PUT")
|
||
resp = urllib.request.urlopen(req, timeout=10)
|
||
page = json.loads(resp.read())
|
||
ws_url = page["webSocketDebuggerUrl"]
|
||
print(f" Page: {page.get('id')}")
|
||
|
||
ws = create_connection(ws_url, timeout=15)
|
||
|
||
msg_id = [0]
|
||
def cdp(method, params=None):
|
||
msg_id[0] += 1
|
||
mid = msg_id[0]
|
||
ws.send(json.dumps({"id": mid, "method": method, "params": params or {}}))
|
||
return mid
|
||
|
||
def recv_all(timeout=3):
|
||
ws.settimeout(timeout)
|
||
msgs = []
|
||
try:
|
||
while True:
|
||
msgs.append(ws.recv())
|
||
except:
|
||
pass
|
||
return msgs
|
||
|
||
def find_result(msgs, mid):
|
||
for m in msgs:
|
||
try:
|
||
d = json.loads(m)
|
||
if d.get("id") == mid:
|
||
return d.get("result", {})
|
||
except:
|
||
pass
|
||
return None
|
||
|
||
# Enable domains
|
||
cdp("Page.enable")
|
||
cdp("Network.enable")
|
||
cdp("Runtime.enable")
|
||
cdp("Log.enable")
|
||
time.sleep(0.5)
|
||
recv_all(0.5)
|
||
|
||
# Step 2: Navigate to frontend
|
||
print("\n--- Step 2: 导航到前端 ---")
|
||
mid_nav = cdp("Page.navigate", {"url": FRONTEND_URL + "/"})
|
||
time.sleep(4)
|
||
msgs = recv_all(3)
|
||
|
||
# Check for JS errors
|
||
has_js_error = False
|
||
for m in msgs:
|
||
d = json.loads(m)
|
||
if d.get("method") == "Runtime.exceptionThrown":
|
||
has_js_error = True
|
||
exc = d.get("params", {}).get("exceptionDetails", {})
|
||
print(f" [JS ERROR] {exc.get('text', '')}")
|
||
check("页面无 JS 异常", not has_js_error)
|
||
|
||
# Step 3: Check localStorage (should NOT have test-token-cyrene)
|
||
print("\n--- Step 3: localStorage 检查 ---")
|
||
mid_ls = cdp("Runtime.evaluate", {
|
||
"expression": "JSON.stringify({token: localStorage.getItem('token'), user_id: localStorage.getItem('user_id'), allKeys: Object.keys(localStorage)})"
|
||
})
|
||
time.sleep(0.5)
|
||
result = find_result(recv_all(1), mid_ls)
|
||
ls_val = ""
|
||
if result:
|
||
ls_val = result.get("result", {}).get("value", "")
|
||
print(f" localStorage: {ls_val}")
|
||
check("无硬编码 test-token-cyrene", "test-token-cyrene" not in ls_val)
|
||
|
||
# Step 4: Fill login form and click login
|
||
print("\n--- Step 4: 通过 UI 登录 ---")
|
||
|
||
# Find username input and fill
|
||
mid_user = cdp("Runtime.evaluate", {
|
||
"expression": f"""
|
||
(() => {{
|
||
const inputs = document.querySelectorAll('input');
|
||
let usernameInput = null;
|
||
let passwordInput = null;
|
||
let loginBtn = null;
|
||
for (const inp of inputs) {{
|
||
const placeholder = (inp.placeholder || '').toLowerCase();
|
||
const type = (inp.type || '').toLowerCase();
|
||
if (placeholder.includes('用户') || placeholder.includes('账号') || placeholder.includes('username')) {{
|
||
usernameInput = inp;
|
||
}} else if (type === 'password' || placeholder.includes('密码') || placeholder.includes('password')) {{
|
||
passwordInput = inp;
|
||
}}
|
||
}}
|
||
if (usernameInput) {{
|
||
const nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value').set;
|
||
nativeInputValueSetter.call(usernameInput, '{CREDENTIALS["username"]}');
|
||
usernameInput.dispatchEvent(new Event('input', {{ bubbles: true }}));
|
||
}}
|
||
if (passwordInput) {{
|
||
const nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value').set;
|
||
nativeInputValueSetter.call(passwordInput, '{CREDENTIALS["password"]}');
|
||
passwordInput.dispatchEvent(new Event('input', {{ bubbles: true }}));
|
||
}}
|
||
// Find login SUBMIT button (text is "进入昔涟的世界 ♪", NOT the mode switcher "登录")
|
||
const buttons = document.querySelectorAll('button');
|
||
for (const btn of buttons) {{
|
||
const text = btn.textContent.trim();
|
||
if (text.includes('进入') || text.includes('昔涟')) {{
|
||
loginBtn = btn;
|
||
break;
|
||
}}
|
||
}}
|
||
return JSON.stringify({{
|
||
foundUser: !!usernameInput,
|
||
foundPass: !!passwordInput,
|
||
foundBtn: !!loginBtn,
|
||
btnText: loginBtn ? loginBtn.textContent.trim() : 'N/A',
|
||
inputCount: inputs.length,
|
||
bodySnippet: document.body.innerText.substring(0, 200)
|
||
}});
|
||
}})()
|
||
"""
|
||
})
|
||
time.sleep(0.5)
|
||
result = find_result(recv_all(1), mid_user)
|
||
form_info = {}
|
||
if result:
|
||
try:
|
||
form_info = json.loads(result.get("result", {}).get("value", "{}"))
|
||
print(f" 表单状态: {json.dumps(form_info, indent=2, ensure_ascii=False)}")
|
||
except:
|
||
print(f" Form result: {result.get('result', {}).get('value', '')[:200]}")
|
||
|
||
check("找到用户名输入框", form_info.get("foundUser", False))
|
||
check("找到密码输入框", form_info.get("foundPass", False))
|
||
check("找到登录按钮", form_info.get("foundBtn", False))
|
||
|
||
# Click login
|
||
if form_info.get("foundBtn"):
|
||
mid_click = cdp("Runtime.evaluate", {
|
||
"expression": """
|
||
(() => {
|
||
const buttons = document.querySelectorAll('button');
|
||
for (const btn of buttons) {
|
||
const text = btn.textContent.trim();
|
||
if (text.includes('进入') || text.includes('昔涟')) {
|
||
btn.click();
|
||
return 'clicked';
|
||
}
|
||
}
|
||
return 'not_found';
|
||
})()
|
||
"""
|
||
})
|
||
time.sleep(4)
|
||
msgs = recv_all(4)
|
||
|
||
# Check API calls after login
|
||
api_calls = []
|
||
console_errors = []
|
||
for m in msgs:
|
||
d = json.loads(m)
|
||
method = d.get("method", "")
|
||
params = d.get("params", {})
|
||
if method == "Network.requestWillBeSent":
|
||
req = params.get("request", {})
|
||
url = req.get("url", "")
|
||
headers = req.get("headers", {})
|
||
auth = headers.get("Authorization", "") or headers.get("authorization", "")
|
||
if "/api/" in url:
|
||
api_calls.append(f"{req.get('method','?')} {url[:100]}")
|
||
elif method == "Runtime.consoleAPICalled":
|
||
args = params.get("args", [])
|
||
if params.get("type") == "error":
|
||
texts = [str(a.get("value", "") or a.get("description", ""))[:150] for a in args]
|
||
console_errors.append(" ".join(texts))
|
||
elif method == "Runtime.exceptionThrown":
|
||
exc = params.get("exceptionDetails", {})
|
||
console_errors.append(f"JS: {exc.get('text', '')}")
|
||
|
||
print(f" API 调用: {len(api_calls)}")
|
||
for c in api_calls[:8]:
|
||
print(f" {c}")
|
||
|
||
auth_errors = [e for e in console_errors if "401" in e or "认证" in e or "无效" in e]
|
||
check("无认证错误", len(auth_errors) == 0, str(auth_errors[:2]) if auth_errors else "")
|
||
|
||
# Step 5: Check post-login UI
|
||
print("\n--- Step 5: 登录后 UI 检查 ---")
|
||
mid_ui = cdp("Runtime.evaluate", {
|
||
"expression": """
|
||
(() => {
|
||
const hasSidebar = !!(document.querySelector('aside, nav[class*="side"], [class*="sidebar"], [class*="Sidebar"]'));
|
||
const hasChatArea = !!(document.querySelector('[class*="chat"], [class*="Chat"], [class*="message"]'));
|
||
const token = localStorage.getItem('token');
|
||
const userId = localStorage.getItem('user_id');
|
||
const bodyText = document.body.innerText.substring(0, 300);
|
||
return JSON.stringify({hasSidebar, hasChatArea, hasToken: !!token, userId, bodyText});
|
||
})()
|
||
"""
|
||
})
|
||
time.sleep(0.5)
|
||
result = find_result(recv_all(1), mid_ui)
|
||
ui_state = {}
|
||
if result:
|
||
try:
|
||
ui_state = json.loads(result.get("result", {}).get("value", "{}"))
|
||
print(f" UI: {json.dumps(ui_state, indent=2, ensure_ascii=False)[:500]}")
|
||
except:
|
||
pass
|
||
|
||
check("localStorage 有 token", ui_state.get("hasToken", False))
|
||
check("侧边栏或聊天区域存在", ui_state.get("hasSidebar", False) or ui_state.get("hasChatArea", False))
|
||
|
||
# Step 6: Screenshot
|
||
mid_ss = cdp("Page.captureScreenshot", {"format": "png"})
|
||
time.sleep(1)
|
||
result = find_result(recv_all(2), mid_ss)
|
||
if result:
|
||
img_data = result.get("data", "")
|
||
if img_data:
|
||
with open("/tmp/cdp_screenshot_final.png", "wb") as f:
|
||
f.write(base64.b64decode(img_data))
|
||
print(" 截图: /tmp/cdp_screenshot_final.png")
|
||
|
||
ws.close()
|
||
|
||
# ============================================================
|
||
# Part B: WebSocket + Backend Tests
|
||
# ============================================================
|
||
print("\n" + "=" * 60)
|
||
print("Part B: Backend API 深度测试")
|
||
print("=" * 60)
|
||
|
||
# Login to get token
|
||
login_req = urllib.request.Request(
|
||
"http://localhost:8080/api/v1/auth/login",
|
||
data=json.dumps(CREDENTIALS).encode(),
|
||
headers={"Content-Type": "application/json"},
|
||
method="POST"
|
||
)
|
||
resp = urllib.request.urlopen(login_req, timeout=10)
|
||
login_data = json.loads(resp.read())
|
||
token = login_data.get("token", "")
|
||
user_id = login_data.get("user_id", "")
|
||
print(f" Token: {token[:30]}..., User: {user_id}")
|
||
|
||
AUTH = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
||
BASE = "http://localhost:8080/api/v1"
|
||
|
||
# B1: Sessions
|
||
print("\n--- B1: 会话管理 ---")
|
||
req = urllib.request.Request(f"{BASE}/sessions?user_id={user_id}", headers=AUTH)
|
||
resp = urllib.request.urlopen(req, timeout=10)
|
||
sessions = json.loads(resp.read())
|
||
sessions_list = sessions.get("sessions", sessions.get("data", []))
|
||
check("会话列表", isinstance(sessions_list, list), f"count={len(sessions_list) if isinstance(sessions_list, list) else 'N/A'}")
|
||
|
||
# Create session
|
||
req = urllib.request.Request(
|
||
f"{BASE}/sessions",
|
||
data=json.dumps({"user_id": user_id, "title": "E2E Test Session"}).encode(),
|
||
headers=AUTH,
|
||
method="POST"
|
||
)
|
||
resp = urllib.request.urlopen(req, timeout=10)
|
||
session_data = json.loads(resp.read())
|
||
session_id = session_data.get("session_id", session_data.get("id", ""))
|
||
check("创建新会话", bool(session_id))
|
||
|
||
# B2: Chat History
|
||
if session_id:
|
||
print("\n--- B2: 聊天历史 ---")
|
||
req = urllib.request.Request(
|
||
f"{BASE}/sessions/{session_id}/messages?limit=10",
|
||
headers=AUTH
|
||
)
|
||
resp = urllib.request.urlopen(req, timeout=10)
|
||
msgs = json.loads(resp.read())
|
||
check("获取消息历史", resp.status == 200)
|
||
|
||
# B3: Memory
|
||
print("\n--- B3: 记忆系统 ---")
|
||
req = urllib.request.Request(f"{BASE}/memory", headers=AUTH)
|
||
try:
|
||
resp = urllib.request.urlopen(req, timeout=10)
|
||
mem_list = json.loads(resp.read())
|
||
check("记忆列表", resp.status in [200, 404])
|
||
except Exception as e:
|
||
check("记忆列表", False, str(e)[:80])
|
||
|
||
# B4: IoT via WebSocket
|
||
print("\n--- B4: IoT 设备(通过 WebSocket)---")
|
||
import websocket as ws_lib
|
||
|
||
ws_url = f"ws://localhost:8080/ws/chat?token={token}&session_id={session_id}"
|
||
try:
|
||
sock = ws_lib.create_connection(ws_url, timeout=10)
|
||
print(f" WebSocket 已连接")
|
||
check("WebSocket 连接", True)
|
||
|
||
# Send IoT query
|
||
iot_query = json.dumps({
|
||
"type": "message",
|
||
"content": "列出所有IoT设备",
|
||
"session_id": session_id,
|
||
"timestamp": int(time.time() * 1000)
|
||
})
|
||
sock.send(iot_query)
|
||
print(f" 已发送 IoT 查询")
|
||
|
||
# Read responses
|
||
sock.settimeout(8)
|
||
responses = []
|
||
start = time.time()
|
||
while time.time() - start < 8:
|
||
try:
|
||
msg = sock.recv()
|
||
data = json.loads(msg)
|
||
msg_type = data.get("type", "?")
|
||
content = str(data.get("content", ""))[:80]
|
||
responses.append(f"[{msg_type}] {content}")
|
||
if msg_type in ("response", "stream_end"):
|
||
break
|
||
except:
|
||
break
|
||
|
||
print(f" 收到 {len(responses)} 条消息:")
|
||
for r in responses[:10]:
|
||
print(f" {r}")
|
||
|
||
has_response = any("response" in r or "device" in r.lower() or "iot" in r.lower() for r in responses)
|
||
check("收到 IoT 响应", len(responses) > 0)
|
||
|
||
sock.close()
|
||
except Exception as e:
|
||
check("WebSocket/IoT 测试", False, str(e)[:80])
|
||
|
||
# Summary
|
||
print("\n" + "=" * 60)
|
||
print(f"测试完成: {passed} 通过, {failed} 失败, 总计 {passed+failed}")
|
||
print("=" * 60)
|