#!/usr/bin/env python3
"""
Gateway Monitor - External watchdog that detects Gateway outages and triggers recovery.
Run via cron every 1-2 minutes.
"""
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlsplit

import requests

# Configuration
GATEWAY_URL = os.getenv("OPENCLAW_GATEWAY_URL", "http://127.0.0.1:18789")
STATE_DIR = Path("/home/ubuntu/.openclaw/workspace/state")
STATUS_FILE = Path("/tmp/gateway_monitor_status.json")
HALT_SENT_FLAG = Path("/tmp/gateway_halt_notification_sent")
LOCK_FILE = Path("/tmp/gateway_recovery.lock")
DEBOUNCE_SECONDS = 30  # Don't spam notifications  # NOTE(review): currently unused — wire up or remove


def log(msg):
    """Print a UTC-timestamped message, flushed immediately (cron/log-friendly)."""
    print(f"[{datetime.now(timezone.utc).isoformat()}] {msg}", flush=True)


def check_gateway() -> bool:
    """Return True if the Gateway is responding.

    Tries the HTTP /health endpoint first; on any failure falls back to a
    plain TCP connect to the Gateway's host/port, so a Gateway that is up
    but lacks a /health route still counts as alive.
    """
    try:
        resp = requests.get(f"{GATEWAY_URL}/health", timeout=5)
        return resp.status_code == 200
    except Exception:
        # Fallback: simple TCP connect to the WebSocket port.
        # Parse host/port with urlsplit — the previous string-splitting
        # approach raised on URLs without an explicit port.
        import socket
        try:
            parts = urlsplit(GATEWAY_URL)
            host = parts.hostname or "127.0.0.1"
            port = parts.port or 18789
            sock = socket.create_connection((host, port), timeout=3)
            sock.close()
            return True
        except Exception:
            return False


def get_last_status() -> dict:
    """Get previous Gateway status from file (best effort; unreadable file -> unknown)."""
    if STATUS_FILE.exists():
        try:
            with open(STATUS_FILE) as f:
                return json.load(f)
        except Exception:
            pass
    return {"status": "unknown", "timestamp": None, "halt_notified": False}


def save_status(status: str, halt_notified: bool = False, timestamp: str | None = None):
    """Save current Gateway status.

    Args:
        status: "up" / "down" / "unknown".
        halt_notified: whether the halt notification has already been sent.
        timestamp: ISO-8601 string to record instead of "now". Used to
            preserve the original "went down" time across repeated down
            checks so calculate_downtime() reports total downtime rather
            than the time since the last cron run.
    """
    data = {
        "status": status,
        "timestamp": timestamp or datetime.now(timezone.utc).isoformat(),
        "halt_notified": halt_notified,
    }
    with open(STATUS_FILE, 'w') as f:
        json.dump(data, f)


def get_interrupted_operations() -> list:
    """Get list of operations that were running when Gateway died.

    An operation counts as interrupted when its state file says "running"
    and the user did not cancel it. Unreadable state files are logged and
    skipped rather than aborting the scan.
    """
    interrupted = []
    if not STATE_DIR.exists():
        return interrupted
    for filepath in STATE_DIR.glob("operation-*.json"):
        try:
            with open(filepath) as f:
                data = json.load(f)
            if data.get("status") == "running" and not data.get("user_cancelled", False):
                interrupted.append(data)
        except Exception as e:
            log(f"Error reading {filepath}: {e}")
    return interrupted


def get_default_channels() -> list:
    """Get default notification channels from config (hard-coded fallback on any error)."""
    config_path = Path(__file__).parent.parent / "config" / "notifier.json"
    if config_path.exists():
        try:
            with open(config_path) as f:
                config = json.load(f)
            defaults = config.get("defaults", {})
            # Support both old format (notification_channel) and new (notification_channels)
            if "notification_channels" in defaults:
                return defaults["notification_channels"]
            elif "notification_channel" in defaults:
                return [defaults["notification_channel"]]
        except Exception:
            pass
    return ["discord:1486939151008923751"]


def get_default_notify_user() -> str:
    """Get default notify user from config (falls back to "taro83" on any error)."""
    config_path = Path(__file__).parent.parent / "config" / "notifier.json"
    if config_path.exists():
        try:
            with open(config_path) as f:
                config = json.load(f)
            return config.get("defaults", {}).get("notify_user", "taro83")
        except Exception:
            pass
    return "taro83"


def send_halt_notification():
    """Notify user that Gateway is down.

    Sends at most once per outage: HALT_SENT_FLAG is touched after a
    successful attempt and checked on entry, so repeated cron runs while
    the Gateway stays down do not spam the channels.
    """
    if HALT_SENT_FLAG.exists():
        # Already sent, don't spam
        return

    log("Gateway DOWN - sending halt notification")

    # Import notification module (lives next to this script)
    sys.path.insert(0, str(Path(__file__).parent))
    try:
        from notify import send_notification

        interrupted = get_interrupted_operations()
        op_count = len(interrupted)

        message = "⚠️ **Gateway DOWN** — Operations Halted\n\n"
        if op_count > 0:
            message += f"{op_count} operation(s) interrupted:\n"
            for op in interrupted[:5]:  # Show first 5
                message += f"• `{op['agent_name']}`: {op['description']}\n"
            if op_count > 5:
                message += f"• ... and {op_count - 5} more\n"
        else:
            message += "No active operations."
        # BUGFIX: the original f-string ended after "Halted at: " with
        # nothing interpolated — append the actual halt time.
        message += f"\n⏱️ Halted at: {datetime.now(timezone.utc).isoformat()}"

        # Send to all channels with interrupted ops, or default channels
        channels = set()
        for op in interrupted:
            channels.add(op['channel'])
        if not channels:
            channels = set(get_default_channels())

        notify_user = interrupted[0]['notify_user'] if interrupted else get_default_notify_user()

        for channel in channels:
            try:
                send_notification(channel, message, mention=notify_user)
            except Exception as e:
                log(f"Failed to send to {channel}: {e}")

        HALT_SENT_FLAG.touch()
    except Exception as e:
        log(f"Failed to send halt notification: {e}")


def send_recovery_notification(interrupted_ops: list):
    """Notify user that Gateway is back and trigger recovery.

    For each channel that had interrupted operations: post a summary of the
    outage and progress, then send each operation's resume_prompt so the
    agent wakes up and continues. Operations that are successfully re-sent
    are marked "resuming" in their state file to prevent double-resumes.
    """
    log(f"Gateway UP - sending recovery notification for {len(interrupted_ops)} operation(s)")

    sys.path.insert(0, str(Path(__file__).parent))
    try:
        from notify import send_notification, send_resume_command

        # Group by channel
        by_channel = {}
        for op in interrupted_ops:
            ch = op['channel']
            if ch not in by_channel:
                by_channel[ch] = []
            by_channel[ch].append(op)

        for channel, ops in by_channel.items():
            # Build recovery message
            downtime = calculate_downtime()
            message = "🟢 **Gateway Recovered** — Resuming Operations\n\n"
            message += f"⏱️ Downtime: {downtime}\n"
            message += f"📋 Interrupted operations: {len(ops)}\n\n"

            for op in ops:
                progress = op.get('progress', {})
                step = progress.get('step', '?')
                total = progress.get('of', '?')
                current = progress.get('current', 'Unknown')
                message += f"**{op['agent_name']}**: {op['description']}\n"
                message += f"└ Progress: Step {step}/{total} — {current}\n\n"

            # Send notification
            notify_user = ops[0]['notify_user']
            try:
                send_notification(channel, message, mention=notify_user)
            except Exception as e:
                log(f"Failed to send recovery to {channel}: {e}")

            # Trigger actual resume by sending the resume_prompt to the channel
            # This will wake up the agent and continue the work
            time.sleep(1)  # Small delay between messages
            for op in ops:
                try:
                    resume_msg = f"🔄 **Resuming**: {op['description']}\n\n{op['resume_prompt']}\n\n[State ID: `{op['operation_id']}`]"
                    send_resume_command(channel, resume_msg, notify_user)
                    # Mark as resuming (don't auto-resume again unless it fails)
                    op['status'] = 'resuming'
                    op['resumed_at'] = datetime.now(timezone.utc).isoformat()
                    state_file = STATE_DIR / f"operation-{op['operation_id']}.json"
                    with open(state_file, 'w') as f:
                        json.dump(op, f, indent=2)
                except Exception as e:
                    log(f"Failed to trigger resume for {op['operation_id']}: {e}")
    except Exception as e:
        log(f"Failed to send recovery notification: {e}")


def calculate_downtime() -> str:
    """Calculate how long Gateway was down.

    Reads the "went down" timestamp from the status file (preserved across
    repeated down checks by main()) and formats the delta as h/m/s.
    Returns "unknown" when no usable timestamp exists.
    """
    last_status = get_last_status()
    if last_status.get('timestamp'):
        try:
            # save_status() writes timezone-aware ISO-8601, so this parses
            # to an aware datetime and subtracts cleanly from aware "now".
            last_down = datetime.fromisoformat(last_status['timestamp'])
            now = datetime.now(timezone.utc)
            delta = now - last_down
            hours, remainder = divmod(int(delta.total_seconds()), 3600)
            minutes, seconds = divmod(remainder, 60)
            if hours > 0:
                return f"{hours}h {minutes}m"
            elif minutes > 0:
                return f"{minutes}m {seconds}s"
            else:
                return f"{seconds}s"
        except Exception:
            pass
    return "unknown"


def main():
    """Single watchdog pass: check the Gateway, detect up/down transitions, act."""
    # Prevent concurrent runs via a PID lock file
    if LOCK_FILE.exists():
        pid = None
        try:
            with open(LOCK_FILE) as f:
                pid = int(f.read().strip())
            # Check if process still running (signal 0 = existence probe)
            os.kill(pid, 0)
            log("Another instance is running, exiting")
            return
        except (ProcessLookupError, ValueError, OSError):
            # Stale lock file
            pass

    with open(LOCK_FILE, 'w') as f:
        f.write(str(os.getpid()))

    try:
        current_status = "up" if check_gateway() else "down"
        last_status = get_last_status()

        log(f"Gateway status: {current_status} (was: {last_status.get('status', 'unknown')})")

        # Detect transitions
        if last_status.get('status') == 'down' and current_status == 'up':
            # Gateway just came back!
            interrupted = get_interrupted_operations()
            if interrupted:
                send_recovery_notification(interrupted)
            else:
                log("No interrupted operations to resume")
            # Clear halt flag
            if HALT_SENT_FLAG.exists():
                HALT_SENT_FLAG.unlink()
            save_status("up", halt_notified=False)

        elif last_status.get('status') != 'down' and current_status == 'down':
            # Gateway just went down
            send_halt_notification()
            save_status("down", halt_notified=True)

        else:
            # No change. BUGFIX: while still down, keep the original
            # "went down" timestamp instead of stamping "now" each run —
            # otherwise calculate_downtime() always reports roughly one
            # cron interval instead of the total outage duration.
            keep_ts = last_status.get('timestamp') if current_status == "down" else None
            save_status(current_status,
                        halt_notified=HALT_SENT_FLAG.exists(),
                        timestamp=keep_ts)
    finally:
        LOCK_FILE.unlink(missing_ok=True)


if __name__ == "__main__":
    main()