#!/usr/bin/env python3
"""Probe a gateway's WebSocket control protocol.

The script connects to ``url``, waits for a ``connect.challenge`` event,
answers it with a ``connect`` request carrying the operator token and, once
that request is acknowledged, sends a single ``status`` request.  Every
decoded frame is echoed to stdout as a ``[MSG]`` line.
"""
import json
import os
import time

# NOTE(security): the literal below is a credential committed to source. It is
# kept only as a fallback so existing invocations keep working; prefer
# supplying the token through the environment and rotating this value.
token = os.environ.get(
    "OPENCLAW_GATEWAY_TOKEN",
    "6a938dff0b0f06dcebabb84639d5b901d4f1aa523a9c4250",
)
url = os.environ.get("OPENCLAW_GATEWAY_URL", "ws://127.0.0.1:18789")

# Maps the id of each in-flight request to the method it was sent for.
pending = {}


def _register_request(method, seq):
    """Create a request id for *method*, record it in ``pending``, return it.

    The id is the current time in milliseconds followed by ``_<seq>``.
    """
    request_id = f"{int(time.time() * 1000)}_{seq}"
    pending[request_id] = method
    return request_id


def _connect_params():
    """Return the ``params`` payload of the ``connect`` request."""
    return {
        "minProtocol": 3,
        "maxProtocol": 3,
        "client": {
            "id": "openclaw-control-ui",
            "version": "1.0.0",
            "platform": "web",
            "mode": "ui",
        },
        "role": "operator",
        "scopes": ["operator.read", "operator.write", "operator.admin"],
        "caps": ["tool-events"],
        "userAgent": "test-script",
        "locale": "en-US",
        "auth": {"token": token},
    }


def on_message(ws, message):
    """Handle one incoming frame.

    Args:
        ws: Connection object; only its ``send(str)`` method is used.
        message: Raw frame payload, expected to be a JSON object.

    Frames that are not JSON objects are reported and ignored instead of
    raising inside the callback.
    """
    try:
        msg = json.loads(message)
    except (TypeError, ValueError) as exc:  # JSONDecodeError is a ValueError
        print(f"[WARN] ignoring undecodable frame: {exc}")
        return
    if not isinstance(msg, dict):
        print(f"[WARN] ignoring non-object frame: {msg!r}")
        return

    msg_type = msg.get("type")
    event = msg.get("event")
    method = msg.get("method")
    response_id = msg.get("id")
    print(f"[MSG] type={msg_type} event={event} method={method} id={response_id}")

    if msg_type == "event" and event == "connect.challenge":
        connect_id = _register_request("connect", 1)
        ws.send(json.dumps({
            "type": "req",
            "id": connect_id,
            "method": "connect",
            "params": _connect_params(),
        }))
        return

    # Ids issued by this script are always strings; anything else cannot match
    # and (if unhashable) would make the dict lookup raise.
    if msg_type != "res" or not isinstance(response_id, str):
        return

    # Pop rather than get: a response settles its request, so a duplicated
    # frame cannot trigger the follow-up twice and ``pending`` does not grow.
    answered = pending.pop(response_id, None)
    if answered != "connect":
        return
    if not msg.get("ok"):
        print(f"[FAIL] connect rejected: {msg}")
        return

    print("\n=== CONNECTED ===\n")
    # Short pause before the follow-up request (kept from the original
    # script). It blocks the callback thread for its duration.
    time.sleep(0.3)
    # Send status request
    status_id = _register_request("status", 2)
    print(f"[SEND] status id={status_id}")
    ws.send(json.dumps({
        "type": "req",
        "id": status_id,
        "method": "status",
        "params": {},
    }))


def on_open(ws):
    """Report that the connection has been opened."""
    print("[OPEN]")


def on_error(ws, error):
    """Report a connection or callback error instead of failing silently."""
    print(f"[ERROR] {error}")


def on_close(ws, c, m):
    """Report that the connection closed.

    Args:
        ws: Connection object (unused).
        c: Close status code passed by the library; printed as-is.
        m: Close message passed by the library (unused).
    """
    print(f"[CLOSE] {c}")


def main():
    """Open the WebSocket connection and dispatch callbacks until it closes."""
    # Imported here so the handlers above stay importable (and testable)
    # without the third-party websocket-client package installed.
    import websocket

    app = websocket.WebSocketApp(
        url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    app.run_forever()


if __name__ == "__main__":
    main()