#!/usr/bin/env python3
"""Manual smoke test for the WebSocket gateway at ``url``.

Flow implemented below:

1. Open a WebSocket to ``url`` and wait for a ``connect.challenge`` event.
2. Answer it with a ``connect`` request carrying ``token``.
3. When the connect response reports ``ok``, send ``status`` and
   ``sessions.list`` requests.
4. Close the socket once every request has been answered (or after
   ``_RESPONSE_TIMEOUT_S`` seconds, whichever comes first).

Every received message is pretty-printed to stdout.
"""
import itertools
import json
import threading
import time

import websocket

# NOTE(review): credential is hard-coded in source; consider loading it from
# the environment or a config file before sharing this script.
token = "6a938dff0b0f06dcebabb84639d5b901d4f1aa523a9c4250"
url = "ws://127.0.0.1:18789"

# Request id -> method name for every request still awaiting a response.
pending = {}

# Requests issued after a successful connect, in send order.
_FOLLOW_UP_METHODS = ("status", "sessions.list")

# Upper bound on how long to wait for the follow-up responses before closing.
_RESPONSE_TIMEOUT_S = 5.0

# Numeric suffix appended to request ids so ids created within the same
# millisecond stay distinct (connect -> _1, status -> _2, sessions.list -> _3).
_request_seq = itertools.count(1)


def _connect_params() -> dict:
    """Return the ``params`` payload for the ``connect`` request."""
    return {
        "minProtocol": 3,
        "maxProtocol": 3,
        "client": {
            "id": "openclaw-control-ui",
            "version": "1.0.0",
            "platform": "web",
            "mode": "ui",
        },
        "role": "operator",
        "scopes": ["operator.read", "operator.write", "operator.admin"],
        "caps": ["tool-events"],
        "userAgent": "test-script",
        "locale": "en-US",
        "auth": {"token": token},
    }


def _new_request(method: str, params: dict) -> dict:
    """Build a ``req`` envelope for *method* and record it in ``pending``."""
    req_id = f"{int(time.time() * 1000)}_{next(_request_seq)}"
    pending[req_id] = method
    return {"type": "req", "id": req_id, "method": method, "params": params}


def _close_after_timeout(ws) -> None:
    """Watchdog target: close *ws*, reporting any request left unanswered."""
    if pending:
        print(f"\nTimed out waiting for: {sorted(pending.values())}")
    ws.close()


def on_message(ws, message):
    """Handle one incoming frame.

    Prints the decoded message, answers ``connect.challenge`` events with a
    ``connect`` request, and drives the follow-up requests once the connect
    response arrives. The socket is closed when no request is outstanding.

    Args:
        ws: The ``WebSocketApp`` that received the frame.
        message: Raw text payload; must be valid JSON.
    """
    msg = json.loads(message)
    print(f"\nReceived: {json.dumps(msg, indent=2)}")

    msg_type = msg.get("type")

    if msg_type == "event" and msg.get("event") == "connect.challenge":
        connect_req = _new_request("connect", _connect_params())
        print(f"\nSending connect with id: {connect_req['id']}")
        ws.send(json.dumps(connect_req))
        return

    if msg_type != "res":
        return

    req_id = msg.get("id")
    # pop (not get) so ``pending`` reflects only requests still in flight.
    method = pending.pop(req_id, None)

    if not msg.get("ok"):
        # Previously a failed response was ignored, which left the script
        # connected forever when the connect request itself was rejected.
        print(f"\nRequest failed for id={req_id}, method={method}")
        if method is not None and not pending:
            ws.close()
        return

    print(f"\nResponse for id={req_id}, method={method}")

    if method == "connect":
        print("=== CONNECT SUCCESS ===")
        for follow_up in _FOLLOW_UP_METHODS:
            request = _new_request(follow_up, {})
            print(f"Sending {follow_up} with id: {request['id']}")
            ws.send(json.dumps(request))
        # Return instead of sleeping and closing here: this callback runs on
        # the thread that reads frames, so blocking in it (and closing before
        # returning) meant the follow-up responses were never dispatched.
        # The daemon timer only guards against a response that never comes.
        watchdog = threading.Timer(
            _RESPONSE_TIMEOUT_S, _close_after_timeout, args=(ws,)
        )
        watchdog.daemon = True
        watchdog.start()
        return

    if method is not None and not pending:
        # Last outstanding response has arrived; nothing left to wait for.
        ws.close()


def on_error(ws, error):
    """Print an error reported by the WebSocket client."""
    print(f"Error: {error}")


def on_close(ws, close_status_code, close_msg):
    """Print the close status code and message once the connection ends."""
    print(f"\nConnection closed: {close_status_code} - {close_msg}")


def on_open(ws):
    """Announce that the WebSocket connection has been established."""
    print("Connected to Gateway")


ws = websocket.WebSocketApp(
    url,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
)
ws.run_forever()