#!/usr/bin/env python3 """Round 12 Part 2: WebSocket 测试 + 子服务直调""" import json, urllib.request, urllib.error, socket, struct, base64, os, hashlib, time TOKEN = open("/tmp/cyrene_test_token.txt").read().strip() SID = open("/tmp/cyrene_test_sid.txt").read().strip() print(f"TOKEN={TOKEN[:20]}... SID={SID}") # ====== Step 1: memory-service 正确端点测试 ====== print("\n=== Step 1: memory-service 正确端点 ===") # 1a: GET /api/v1/memories?user_id=admin r = urllib.request.Request("http://localhost:8091/api/v1/memories?user_id=admin") try: resp = urllib.request.urlopen(r, timeout=5) print(f" GET /memories: {resp.status} {resp.read().decode()[:200]}") except Exception as e: print(f" GET /memories: FAIL {e}") # 1b: POST /api/v1/memories/query r = urllib.request.Request("http://localhost:8091/api/v1/memories/query", data=json.dumps({"user_id":"admin","query_text":"test","limit":5}).encode()) r.add_header("Content-Type", "application/json") try: resp = urllib.request.urlopen(r, timeout=5) print(f" POST /memories/query: {resp.status} {resp.read().decode()[:200]}") except Exception as e: print(f" POST /memories/query: FAIL {e}") # 1c: 通过 gateway 代理 memory print("\n --- Gateway proxy to memory ---") headers = {"Authorization": f"Bearer {TOKEN}"} # GET /api/v1/memory/search?q=test r = urllib.request.Request("http://localhost:8080/api/v1/memory/search?q=test") r.add_header("Authorization", f"Bearer {TOKEN}") try: resp = urllib.request.urlopen(r, timeout=10) print(f" GW memory/search: {resp.status} {resp.read().decode()[:200]}") except urllib.error.HTTPError as e: print(f" GW memory/search: {e.code} {e.read().decode()[:200]}") except Exception as e: print(f" GW memory/search: FAIL {e}") # ====== Step 2: tool-engine 正确端点测试 ====== print("\n=== Step 2: tool-engine 正确端点 ===") # 2a: GET /api/v1/tools r = urllib.request.Request("http://localhost:8092/api/v1/tools") try: resp = urllib.request.urlopen(r, timeout=5) body = resp.read().decode() tools_data = json.loads(body) tool_names = [t.get("name","?") for t in tools_data.get("tools",[])] print(f" GET /tools: {resp.status} total={tools_data.get('total')} names={tool_names}") except Exception as e: print(f" GET /tools: FAIL {e}") # 2b: POST /api/v1/tools/calculator/execute r = urllib.request.Request("http://localhost:8092/api/v1/tools/calculator/execute", data=json.dumps({"arguments":{"expression":"2+3"}}).encode()) r.add_header("Content-Type", "application/json") try: resp = urllib.request.urlopen(r, timeout=5) print(f" POST calc/execute: {resp.status} {resp.read().decode()[:200]}") except Exception as e: print(f" POST calc/execute: FAIL {e}") # ====== Step 3: WebSocket 测试 (手动构造) ====== print("\n=== Step 3: WebSocket 连接测试 ===") WS_KEY = base64.b64encode(os.urandom(16)).decode() def ws_handshake(): """Try WebSocket upgrade to /ws/chat?token=...""" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5) try: sock.connect(("127.0.0.1", 8080)) request = ( f"GET /ws/chat?token={TOKEN}&session_id={SID} HTTP/1.1\r\n" f"Host: localhost:8080\r\n" f"Upgrade: websocket\r\n" f"Connection: Upgrade\r\n" f"Sec-WebSocket-Key: {WS_KEY}\r\n" f"Sec-WebSocket-Version: 13\r\n" f"\r\n" ) sock.send(request.encode()) response = b"" while b"\r\n\r\n" not in response: response += sock.recv(4096) headers = response.decode() status_line = headers.split("\r\n")[0] print(f" WS handshake: {status_line}") if "101" in status_line: print(f" ✅ WebSocket 升级成功!") # Send a simple chat message msg = json.dumps({"type":"message","content":"Hello Cyrene!","mode":"text"}) import struct as st frame = bytearray() frame.append(0x81) # FIN + text opcode frame.append(0x80 | len(msg)) # MASK + length mask_key = os.urandom(4) frame.extend(mask_key) masked = bytes([msg[i] ^ mask_key[i%4] for i in range(len(msg))]) frame.extend(masked) sock.send(bytes(frame)) print(f" Sent: {msg}") # Read response time.sleep(3) sock.settimeout(5) try: resp_data = sock.recv(4096) print(f" Received {len(resp_data)} bytes: {resp_data[:500]}") except socket.timeout: print(f" ⚠️ No response within 3s - backend may be processing") else: print(f" ❌ WebSocket handshake failed") print(f" Full response:\n{headers[:500]}") except Exception as e: print(f" WS connect FAIL: {e}") finally: sock.close() ws_handshake() # ====== Step 4: ai-core 直接调用的完整 SSE 响应 ====== print("\n=== Step 4: ai-core 完整 SSE 响应 ===") ai_body = {"user_id":"admin","session_id":SID,"message":"用一句话介绍你自己","mode":"text"} r = urllib.request.Request("http://localhost:8081/api/v1/chat", data=json.dumps(ai_body).encode(), method="POST") r.add_header("Content-Type", "application/json") r.add_header("Accept", "text/event-stream") try: resp = urllib.request.urlopen(r, timeout=60) full = [] for line in resp: line = line.decode().strip() if line.startswith("data:"): full.append(line) print(f" Lines received: {len(full)}") print(f" Last 3 lines: {full[-3:]}") except Exception as e: print(f" FAIL: {e}") # ====== Step 5: Gateway 通过 memory_handler 代理测试 ====== print("\n=== Step 5: Gateway memory proxy ===") for path, qs in [("/api/v1/memory/search","q=hello"), ("/api/v1/memory","")]: url = f"http://localhost:8080{path}" if qs: url += f"?{qs}" req = urllib.request.Request(url) req.add_header("Authorization", f"Bearer {TOKEN}") try: resp = urllib.request.urlopen(req, timeout=10) print(f" GET {path}: {resp.status} {resp.read().decode()[:200]}") except urllib.error.HTTPError as e: body = e.read().decode()[:200] print(f" GET {path}: {e.code} {body}") except Exception as e: print(f" GET {path}: FAIL {e}") print("\n[DONE]")