#!/usr/bin/env python3
"""
Example agent showing how to use the resilience system.

This demonstrates checkpointing patterns for any agent: create an
``OperationState`` when the work starts, save it before and after every
step, and mark it complete (or failed) when the work ends.
"""

import argparse
import sys
import time
from pathlib import Path

# Add workspace to path so that the `tools` package can be imported when this
# file is run directly as a script.
sys.path.insert(0, str(Path(__file__).parent.parent))

from tools.state_manager import OperationState  # noqa: E402

# The workflow steps of the example task, in execution order. The progress
# counter ("of") is derived from this sequence instead of being hard-coded.
WORKFLOW_STEPS = (
    "Gathering source materials",
    "Analyzing key data points",
    "Cross-referencing findings",
    "Synthesizing conclusions",
    "Generating final report",
)


def example_long_running_task():
    """
    Example: A research agent doing multi-step analysis.

    Any agent can follow this pattern:

    1. Create and save an ``OperationState`` before doing any work.
    2. Update ``state.progress`` and save before each step; update
       ``state.metadata`` and save after each step.
    3. Call ``state.complete()`` once every step has finished.

    Raises:
        Exception: any error raised while running the steps is re-raised
            after the state has been saved with status ``"failed"`` and the
            error text stored under ``metadata["error"]``.
            ``KeyboardInterrupt`` is caught and not re-raised.
    """
    # STEP 1: Create operation state at the start
    state = OperationState(
        operation_type="research",
        agent_name="example_agent",  # Your agent's name
        description="Example multi-step analysis task",
        channel="discord:1486939151008923751",
        notify_user="taro83",
        resume_prompt="Continue the analysis. Load state file to see current progress.",
        progress={"step": 0, "of": len(WORKFLOW_STEPS), "current": "Initializing..."},
        metadata={"urls": [], "findings": []},  # Any extra data you need
    )
    state.save()
    print(f"✓ Operation started: {state.operation_id}")

    try:
        # STEP 2: Walk through your workflow steps
        steps = WORKFLOW_STEPS
        for i, step_name in enumerate(steps, 1):
            # Update progress before starting step, so the saved state always
            # names the step that was in flight.
            state.progress["step"] = i
            state.progress["of"] = len(steps)
            state.progress["current"] = step_name
            state.save()
            print(f" → Step {i}/{len(steps)}: {step_name}")

            # Do the actual work here
            # ... your step logic ...
            time.sleep(1)  # Simulating work

            # Update metadata if needed
            state.metadata["findings"].append(f"Finding from step {i}")
            state.save()

            # If Gateway crashes here, the monitor will:
            # 1. Detect Gateway is down
            # 2. Wait for it to come back
            # 3. Send resume command with state.operation_id
            # 4. Agent restarts from step i

        # STEP 3: Mark complete when done
        state.complete()
        print(f"✓ Operation completed: {state.operation_id}")

    except KeyboardInterrupt:
        print("\n⚠ Interrupted by user - state saved, will resume on recovery")
        # State remains in 'running' status, will auto-resume
    except Exception as e:
        # Record the failure in the state file before propagating the error.
        print(f"\n✗ Error: {e}")
        state.status = "failed"
        state.metadata["error"] = str(e)
        state.save()
        raise


def example_resume_from_state(operation_id: str):
    """
    Example: Resuming an interrupted operation.

    Called when the agent receives a resume command. Loads the saved state
    and prints where the operation stopped; the actual continuation logic is
    left as a placeholder for the concrete agent.

    Args:
        operation_id: identifier of the operation whose state should be
            loaded via ``OperationState.load``.
    """
    state = OperationState.load(operation_id)
    if not state:
        print(f"✗ State not found: {operation_id}")
        return

    print(f"✓ Resuming operation: {state.description}")
    print(f" Progress: Step {state.progress.get('step', '?')}")
    print(f" Last checkpoint: {state.last_checkpoint}")

    # Continue from where we left off: state.progress.get("step", 0) is the
    # last step that was started.
    # Skip completed steps, resume from current
    # ... your resume logic here ...
    # Remember to call state.complete() when done!


def main():
    """Parse the command line and either start a new task or resume one."""
    parser = argparse.ArgumentParser(description="Example resilient agent")
    parser.add_argument("--resume", help="Resume from state ID")
    args = parser.parse_args()

    if args.resume:
        example_resume_from_state(args.resume)
    else:
        example_long_running_task()


if __name__ == "__main__":
    main()