#!/usr/bin/env python3
"""CDP front-end page check: screenshot + console errors + login test.

Connects to a browser over the Chrome DevTools Protocol (CDP), attaches to the
page served at ``PAGE_URL`` (creating it if necessary), saves a screenshot,
prints console messages, evaluates a few page-state expressions, performs a
login request from inside the page, and finally reports uncaught JS exceptions.
"""
import base64
import json
import os
import sys
import time
import urllib.request

PAGE_URL = "http://localhost:5199/"
# NOTE(review): the GUID in a browser-level CDP URL is normally regenerated on
# every browser launch; if connecting fails, refresh it from
# http://127.0.0.1:9225/json/version ("webSocketDebuggerUrl").
CDP_WS = "ws://127.0.0.1:9225/devtools/browser/b2fca0da-35d6-4180-8413-eddf53753c6a"
CDP_TARGETS_URL = "http://127.0.0.1:9225/json"
LOGIN_URL = "http://localhost:8080/api/v1/auth/login"
SCREENSHOT_PATH = "/tmp/cyrene_screenshot_round12.png"

# NOTE(review): these credentials were hard-coded in the original script and
# are kept as defaults for backward compatibility; prefer supplying them via
# the environment so they do not have to live in source control.
LOGIN_USERNAME = os.environ.get("CDP_LOGIN_USERNAME", "yeij0942")
LOGIN_PASSWORD = os.environ.get("CDP_LOGIN_PASSWORD", "Jiang1143218570")

# (label, JavaScript expression) pairs evaluated in the page in step 7.
PAGE_CHECKS = (
    ("document.title", "document.title"),
    ("document.readyState", "document.readyState"),
    ("Root element exists", "document.getElementById('root') ? 'yes' : 'no'"),
    ("Body children count", "document.body ? document.body.children.length : -1"),
    ("Has login form?", "document.querySelector('form') ? 'yes' : 'no'"),
    ("Has error boundary?", "document.querySelector('[class*=error]') ? 'yes' : 'no'"),
    ("Window errors", "window.__LAST_ERROR__ || 'none'"),
)

_LOGIN_JS_TEMPLATE = """
(async function() {
  try {
    const resp = await fetch(__LOGIN_URL__, {
      method: 'POST',
      headers: {'Content-Type': 'application/json'},
      body: JSON.stringify(__CREDENTIALS__)
    });
    const data = await resp.json();
    return JSON.stringify({status: resp.status, hasToken: !!data.token, userId: data.user_id});
  } catch(e) {
    return 'Error: ' + e.message;
  }
})()
"""


def send_cmd(ws, method, params=None, msg_id=1, session_id=None):
    """Send one CDP command over *ws*.

    Args:
        ws: Object with a ``send(str)`` method (the CDP WebSocket).
        method: CDP method name, e.g. ``"Page.enable"``.
        params: Optional dict of method parameters; ``{}`` is sent when falsy.
        msg_id: Request id echoed back in the matching response.
        session_id: When given, added as ``sessionId`` so the command is
            routed to that attached target session.
    """
    message = {"id": msg_id, "method": method, "params": params or {}}
    if session_id:
        message["sessionId"] = session_id
    ws.send(json.dumps(message))


def recv_until(ws, timeout=5):
    """Read raw messages from *ws* until a receive fails, and return them.

    The per-receive timeout is set to *timeout* seconds, so the call returns
    once no message arrives within that window (or the receive fails for any
    other reason, e.g. a closed connection).
    """
    ws.settimeout(timeout)
    results = []
    try:
        while True:
            results.append(ws.recv())
    except Exception:
        # Deliberate best-effort drain: a receive timeout is the normal way
        # out of the loop. Unlike a bare ``except`` this lets
        # KeyboardInterrupt / SystemExit propagate.
        pass
    return results


def _parse_message(raw):
    """Decode one raw message; return a dict, or None if it is not a JSON object."""
    try:
        msg = json.loads(raw)
    except (TypeError, ValueError):
        return None
    return msg if isinstance(msg, dict) else None


def find_event(msgs, method):
    """Return the first decoded message in *msgs* whose ``method`` equals *method*, else None."""
    for raw in msgs:
        msg = _parse_message(raw)
        if msg is not None and msg.get("method") == method:
            return msg
    return None


def find_result(msgs, msg_id):
    """Return the first decoded message in *msgs* whose ``id`` equals *msg_id*, else None."""
    for raw in msgs:
        msg = _parse_message(raw)
        if msg is not None and msg.get("id") == msg_id:
            return msg
    return None


def _remote_object_text(arg):
    """Render one console argument (a dict taken from the event's ``args``) as text."""
    if "value" in arg:
        value = arg["value"]
        # Non-string values (numbers, booleans, null, by-value objects) are
        # rendered as JSON instead of breaking the later str.join().
        return value if isinstance(value, str) else json.dumps(value, ensure_ascii=False)
    return str(arg.get("description") or arg.get("unserializableValue") or "")


def format_console_event(event):
    """Format a console-related CDP event as ``"[level] text url"``.

    Handles ``Log.entryAdded`` and ``Runtime.consoleAPICalled``; returns None
    for any other event. The text part is truncated to 200 characters.
    """
    method = event.get("method")
    params = event.get("params") or {}
    if method == "Log.entryAdded":
        entry = params.get("entry") or {}
        level = entry.get("level", "log")
        text = str(entry.get("text", ""))
        url = entry.get("url", "")
    elif method == "Runtime.consoleAPICalled":
        level = params.get("type", "log")
        text = " ".join(_remote_object_text(arg) for arg in params.get("args", []))
        url = ""
    else:
        return None
    return f"[{level}] {text[:200]} {url}"


def format_exception_event(event):
    """Format a ``Runtime.exceptionThrown`` event as one report line, else return None."""
    if event.get("method") != "Runtime.exceptionThrown":
        return None
    exc = (event.get("params") or {}).get("exceptionDetails") or {}
    return f" ERROR: {exc.get('text','')} at {exc.get('url','')}:{exc.get('lineNumber','')}"


def build_login_js(username, password, login_url=LOGIN_URL):
    """Return the JS snippet that POSTs the credentials to *login_url* from the page.

    Values are embedded with ``json.dumps`` so quotes or backslashes in them
    cannot break out of the generated JavaScript literals.
    """
    credentials = json.dumps({"username": username, "password": password})
    return _LOGIN_JS_TEMPLATE.replace("__LOGIN_URL__", json.dumps(login_url)).replace(
        "__CREDENTIALS__", credentials
    )


def list_targets(url=CDP_TARGETS_URL, timeout=5):
    """Fetch the debuggable-target list from the browser's HTTP endpoint.

    Returns a list (empty when the endpoint is unreachable or the payload is
    not a JSON array) instead of raising.
    """
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            targets = json.load(resp)
    except (OSError, ValueError) as err:
        print(f" Could not list targets from {url}: {err}")
        return []
    return targets if isinstance(targets, list) else []


def find_page_target(targets, url_prefix=PAGE_URL.rstrip("/")):
    """Return the first target dict whose ``url`` starts with *url_prefix*, else None."""
    for target in targets:
        if isinstance(target, dict) and target.get("url", "").startswith(url_prefix):
            return target
    return None


def _drain(ws, events, timeout):
    """Drain *ws*, remember every received CDP event in *events*, return the raw batch.

    Events are kept because they can arrive while a different step is waiting
    for its response; dropping them would hide console output and exceptions.
    """
    batch = recv_until(ws, timeout)
    for raw in batch:
        msg = _parse_message(raw)
        if msg is not None and "method" in msg:
            events.append(msg)
    return batch


def _request(ws, events, method, params, msg_id, session_id=None, timeout=3):
    """Send a command, drain the socket, and return ``(response_or_None, raw_batch)``."""
    send_cmd(ws, method, params, msg_id, session_id)
    batch = _drain(ws, events, timeout)
    return find_result(batch, msg_id), batch


def _eval_value(response):
    """Extract the by-value result of a ``Runtime.evaluate`` response, or ``"?"``."""
    if not response:
        return "?"
    return response.get("result", {}).get("result", {}).get("value", "?")


def run_checks(ws):
    """Run steps 2-9 against an open browser-level CDP connection.

    Returns 0 on completion, or 1 when no page target could be attached.
    """
    events = []

    # 2. Find the targetId of an already-open page, or create one.
    target = find_page_target(list_targets())
    target_id = target.get("id") if target else None
    if target_id:
        print(f" Found page: {target.get('title', '')[:80]} id={target_id}")
    else:
        print(" No existing page found, creating new one...")
        created, batch = _request(ws, events, "Target.createTarget", {"url": PAGE_URL}, 1)
        print(f" Target.createTarget results: {batch}")
        target_id = (created or {}).get("result", {}).get("targetId")
    if not target_id:
        print(" Could not find or create a page target")
        return 1

    # 3. Attach to the page target (flattened session on the same socket).
    print(f"\n Attaching to target {target_id}...")
    attached, batch = _request(
        ws, events, "Target.attachToTarget", {"targetId": target_id, "flatten": True}, 2
    )
    session_id = (attached or {}).get("result", {}).get("sessionId", "")
    if not session_id:
        print(f" Attach failed: {batch}")
        return 1
    print(f" Attached, sessionId={session_id[:20]}...")

    # 4. Enable the Runtime, Page and Log domains.
    print("\n--- Enabling domains ---")
    for msg_id, method in enumerate(("Runtime.enable", "Page.enable", "Log.enable"), start=3):
        send_cmd(ws, method, msg_id=msg_id, session_id=session_id)
    batch = _drain(ws, events, 3)
    print(f" Enable results: {len(batch)} messages")

    # 5. Screenshot.
    print("\n--- Taking screenshot ---")
    shot, batch = _request(
        ws, events, "Page.captureScreenshot", {"format": "png"}, 10, session_id, timeout=5
    )
    if not shot:
        print(f" Screenshot failed: {batch}")
    else:
        img_data = shot.get("result", {}).get("data", "")
        if img_data:
            img_bytes = base64.b64decode(img_data)
            with open(SCREENSHOT_PATH, "wb") as f:
                f.write(img_bytes)
            print(f" Screenshot saved: {SCREENSHOT_PATH} ({len(img_bytes)} bytes)")
        else:
            print(f" No image data: {str(shot)[:200]}")

    # 6. Console logs: everything seen so far plus whatever arrives in 2 s.
    print("\n--- Console logs ---")
    _drain(ws, events, 2)
    console_logs = [line for line in map(format_console_event, events) if line is not None]
    if console_logs:
        for line in console_logs:
            print(f" {line}")
    else:
        print(" No console messages captured. Let me poll...")

    # 7. Inspect page state with Runtime.evaluate (one unique id per check).
    print("\n--- Evaluating page state ---")
    for msg_id, (label, expr) in enumerate(PAGE_CHECKS, start=100):
        response, _ = _request(
            ws,
            events,
            "Runtime.evaluate",
            {"expression": expr, "returnByValue": True},
            msg_id,
            session_id,
            timeout=2,
        )
        print(f" {label}: {_eval_value(response)}")

    # 8. Exercise the login endpoint from inside the page.
    print("\n--- Testing login flow ---")
    response, batch = _request(
        ws,
        events,
        "Runtime.evaluate",
        {
            "expression": build_login_js(LOGIN_USERNAME, LOGIN_PASSWORD),
            "returnByValue": True,
            "awaitPromise": True,
        },
        200,
        session_id,
        timeout=5,
    )
    if response:
        print(f" Login test result: {_eval_value(response)}")
    else:
        print(f" Login test failed: {batch}")

    # 9. Uncaught JavaScript exceptions reported at any point during the run.
    print("\n--- Checking for JS errors via Runtime.exceptionThrown ---")
    _drain(ws, events, 1)
    error_msgs = [line for line in map(format_exception_event, events) if line is not None]
    if error_msgs:
        for line in error_msgs:
            print(line)
    else:
        print(" No JS exceptions detected")
    return 0


def main():
    """Connect to the browser, run all checks, and return the process exit code."""
    # Imported here rather than at module level so the pure helpers above stay
    # importable (and testable) without the third-party websocket-client package.
    from websocket import create_connection

    print("=== CDP 前端页面验证 ===")

    # 1. Connect to the browser.
    ws = create_connection(CDP_WS, timeout=10)
    print(" Connected to CDP")
    try:
        status = run_checks(ws)
    finally:
        # Always release the socket, including when a step raises.
        ws.close()
    if status == 0:
        print("\n[DONE]")
    return status


if __name__ == "__main__":
    sys.exit(main())