#!/usr/bin/env python3 import websocket import json import time token = "6a938dff0b0f06dcebabb84639d5b901d4f1aa523a9c4250" url = "ws://127.0.0.1:18789" pending = {} connected = False def on_message(ws, message): global connected msg = json.loads(message) msg_type = msg.get("type") req_id = msg.get("id") if msg_type == "event" and msg.get("event") == "connect.challenge": req_id = str(int(time.time() * 1000)) + "_1" connect_req = { "type": "req", "id": req_id, "method": "connect", "params": { "minProtocol": 3, "maxProtocol": 3, "client": { "id": "openclaw-control-ui", "version": "1.0.0", "platform": "web", "mode": "ui" }, "role": "operator", "scopes": ["operator.read", "operator.write", "operator.admin"], "caps": ["tool-events"], "userAgent": "test-script", "locale": "en-US", "auth": {"token": token} } } pending[req_id] = "connect" print(f"[SEND] connect id={req_id}") ws.send(json.dumps(connect_req)) elif msg_type == "res": method = pending.get(req_id, "unknown") print(f"[RECV] {method} id={req_id} ok={msg.get('ok')}") if method == "connect" and msg.get("ok"): connected = True # Wait a bit then request status time.sleep(0.5) status_id = str(int(time.time() * 1000)) + "_2" pending[status_id] = "status" status_req = {"type": "req", "id": status_id, "method": "status", "params": {}} print(f"[SEND] status id={status_id}") ws.send(json.dumps(status_req)) time.sleep(0.5) sessions_id = str(int(time.time() * 1000)) + "_3" pending[sessions_id] = "sessions.list" sessions_req = {"type": "req", "id": sessions_id, "method": "sessions.list", "params": {}} print(f"[SEND] sessions.list id={sessions_id}") ws.send(json.dumps(sessions_req)) # Keep connection open to receive responses time.sleep(2) ws.close() def on_close(ws, code, msg): print(f"[CLOSE] {code} {msg}") ws = websocket.WebSocketApp(url, on_message=on_message, on_close=on_close) ws.run_forever()