#!/usr/bin/env python3
"""
State Manager - Generic checkpoint system for any OpenClaw agent/operation.

Any agent can import this to save/resume work across Gateway crashes.
"""

import contextlib
import json
import logging
import os
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any

logger = logging.getLogger(__name__)

STATE_DIR = Path("/home/ubuntu/.openclaw/workspace/state")
try:
    STATE_DIR.mkdir(parents=True, exist_ok=True)
except OSError:
    # Don't make a plain import fail (e.g. unwritable home directory). The
    # directory is created again on the first write, and any real permission
    # problem surfaces there instead.
    pass

# Timeout for "resuming" state - after this, operation reverts to "running" for retry
RESUME_TIMEOUT_SECONDS = 600  # 10 minutes


def _state_path(operation_id: str) -> Path:
    """Return the live state-file path for *operation_id*."""
    return STATE_DIR / f"operation-{operation_id}.json"


def _write_json_atomic(path: Path, data: Dict[str, Any]) -> None:
    """Write *data* as JSON to *path* so readers never observe a partial file.

    The JSON goes to a temp file in the same directory. That file is flushed
    and fsync'd, then moved over *path* with ``os.replace``, which is atomic
    when source and target are on the same filesystem. A crash mid-write
    therefore leaves either the previous checkpoint or the new one, never a
    truncated file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # The ".tmp-" prefix keeps in-flight files out of the "operation-*.json" glob.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=".tmp-", suffix=".json")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_name, path)
    except BaseException:
        # Don't leave orphaned temp files behind, e.g. when progress/metadata
        # holds a value json cannot serialize.
        with contextlib.suppress(OSError):
            os.unlink(tmp_name)
        raise


def _parse_utc(timestamp: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware datetime.

    A trailing "Z" is accepted (``datetime.fromisoformat`` only understands it
    on Python 3.11+). Naive values are assumed to be UTC, matching the rest of
    this module. Without that, subtracting them from an aware "now" raises
    ``TypeError``.
    NOTE(review): assumes whoever writes metadata["resumed_at"] uses UTC.
    """
    if timestamp.endswith("Z"):
        timestamp = timestamp[:-1] + "+00:00"
    parsed = datetime.fromisoformat(timestamp)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


class OperationState:
    """Represents the state of a long-running operation.

    The state is persisted as ``STATE_DIR/operation-<operation_id>.json`` and
    can be reloaded with :meth:`load` after a crash.
    """

    def __init__(
        self,
        operation_type: str,
        agent_name: str,
        description: str,
        channel: str,  # Primary/origin channel (discord:xxx or telegram:xxx)
        notify_user: str,
        resume_prompt: str,
        progress: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ):
        self.operation_id = f"{operation_type}-{uuid.uuid4().hex[:8]}"
        self.operation_type = operation_type
        self.agent_name = agent_name
        self.description = description
        self.channel = channel  # Origin channel - notifications go here only
        self.notify_user = notify_user
        self.resume_prompt = resume_prompt
        self.progress = progress or {}
        self.metadata = metadata or {}
        self.started_at = datetime.now(timezone.utc).isoformat()
        self.last_checkpoint = self.started_at
        self.status = "running"  # running | resuming | completed | failed | cancelled
        self.user_cancelled = False

    def to_dict(self) -> Dict[str, Any]:
        """Return the JSON-serializable snapshot that is written to disk."""
        return {
            "operation_id": self.operation_id,
            "operation_type": self.operation_type,
            "agent_name": self.agent_name,
            "description": self.description,
            "channel": self.channel,
            "notify_user": self.notify_user,
            "resume_prompt": self.resume_prompt,
            "progress": self.progress,
            "metadata": self.metadata,
            "started_at": self.started_at,
            "last_checkpoint": self.last_checkpoint,
            "status": self.status,
            "user_cancelled": self.user_cancelled
        }

    def save(self) -> Path:
        """Persist state to disk atomically and return the state-file path.

        ``last_checkpoint`` is set to the current UTC time before writing.
        """
        self.last_checkpoint = datetime.now(timezone.utc).isoformat()
        filepath = _state_path(self.operation_id)
        _write_json_atomic(filepath, self.to_dict())
        return filepath

    def complete(self):
        """Mark the operation as completed and archive its state file.

        If a live state file exists, the final "completed" state is written
        to it. The file is then moved to
        ``STATE_DIR/completed/<operation_id>-<YYYYMMDD>.json``, using the UTC
        date. If the operation was never saved, only the in-memory status
        changes.
        """
        self.status = "completed"
        filepath = _state_path(self.operation_id)
        if filepath.exists():
            # Persist the final status first so the archive records "completed"
            # rather than the last checkpointed status. A crash before the move
            # leaves a "completed" file, which list_interrupted() ignores.
            self.save()
            completed_dir = STATE_DIR / "completed"
            completed_dir.mkdir(exist_ok=True)
            stamp = datetime.now(timezone.utc).strftime("%Y%m%d")
            filepath.replace(completed_dir / f"{self.operation_id}-{stamp}.json")

    def cancel(self):
        """Mark as user-cancelled (won't auto-resume) and persist."""
        self.user_cancelled = True
        self.status = "cancelled"
        self.save()

    @classmethod
    def load(cls, operation_id: str) -> Optional["OperationState"]:
        """Load existing operation state, or return None if no file exists.

        Raises whatever ``open``/``json.load`` raise for unreadable files, and
        ``KeyError`` if a required field is missing.
        """
        filepath = _state_path(operation_id)
        if not filepath.exists():
            return None

        with open(filepath, encoding="utf-8") as f:
            data = json.load(f)

        state = cls(
            operation_type=data["operation_type"],
            agent_name=data["agent_name"],
            description=data["description"],
            channel=data["channel"],
            notify_user=data["notify_user"],
            resume_prompt=data["resume_prompt"],
            progress=data.get("progress", {}),
            metadata=data.get("metadata", {})
        )
        # The constructor generated a fresh id/timestamps; restore the saved ones.
        state.operation_id = data["operation_id"]
        state.started_at = data["started_at"]
        state.last_checkpoint = data["last_checkpoint"]
        state.status = data.get("status", "running")
        state.user_cancelled = data.get("user_cancelled", False)
        return state

    @classmethod
    def list_interrupted(cls) -> list:
        """List all operations that should be resumed, as raw state dicts.

        Includes:
        - 'running' status: active operations that were interrupted
        - 'resuming' status: only if resume timeout has passed (retry needed).
          Such entries are reverted to 'running' on disk before being returned.
          A 'resuming' entry without a ``resumed_at`` timestamp is returned
          as-is.

        User-cancelled operations are excluded. Unreadable or malformed state
        files are skipped with a logged warning.
        """
        interrupted = []
        now = datetime.now(timezone.utc)

        for filepath in STATE_DIR.glob("operation-*.json"):
            try:
                with open(filepath, encoding="utf-8") as f:
                    data = json.load(f)

                if data.get("user_cancelled", False):
                    continue

                status = data.get("status")
                if status == "running":
                    interrupted.append(data)
                    continue
                if status != "resuming":
                    continue

                # `or {}` also covers an explicit "metadata": null.
                resumed_at_str = (data.get("metadata") or {}).get("resumed_at")
                if not resumed_at_str:
                    # No resume timestamp, treat as running
                    interrupted.append(data)
                    continue

                elapsed = (now - _parse_utc(resumed_at_str)).total_seconds()
                if elapsed > RESUME_TIMEOUT_SECONDS:
                    # Timeout passed, revert to running for retry
                    data["status"] = "running"
                    data["metadata"]["resumed_at"] = None
                    data["metadata"]["resume_timeout_reached"] = now.isoformat()
                    _write_json_atomic(filepath, data)
                    interrupted.append(data)
                # else: still within timeout, don't include (avoid spam)
            except (OSError, ValueError, TypeError, AttributeError, KeyError) as exc:
                # Best-effort scan: one bad file must not hide the others, but
                # make the skip visible instead of silently dropping the operation.
                logger.warning("Skipping state file %s: %s", filepath, exc)
                continue

        return interrupted


# Simple usage example for any agent:
if __name__ == "__main__":
    # Example: Starting a new operation
    state = OperationState(
        operation_type="research",
        agent_name="henry",
        description="AI industry trend analysis for Q2 2026",
        channel="discord:1486939151008923751",
        notify_user="taro83",
        resume_prompt="Continue AI industry trend analysis. Review workspace/state for checkpoint data.",
        progress={"step": 1, "of": 5, "current": "Gathering source materials..."}
    )
    state.save()
    print(f"State saved: {state.operation_id}")

    # Example: Updating progress
    state.progress["step"] = 2
    state.progress["current"] = "Analyzing OpenAI announcements..."
    state.save()

    # Example: Completion
    # state.complete()