""" AI Desktop Agent - Cognitive Desktop Automation Combines vision, reasoning, and control for autonomous task execution """ import base64 import time from typing import Dict, List, Optional, Any, Callable from pathlib import Path import logging from desktop_control import DesktopController logger = logging.getLogger(__name__) class AIDesktopAgent: """ Intelligent desktop agent that combines computer vision, LLM reasoning, and desktop control for autonomous task execution. Can understand screen content, plan actions, and execute complex workflows. """ def __init__(self, llm_client=None, failsafe: bool = True): """ Initialize AI Desktop Agent. Args: llm_client: OpenClaw LLM client for reasoning (optional, will try to auto-detect) failsafe: Enable failsafe mode """ self.dc = DesktopController(failsafe=failsafe) self.llm_client = llm_client self.screen_width, self.screen_height = self.dc.get_screen_size() # Action history for learning self.action_history = [] # Application knowledge base self.app_knowledge = self._load_app_knowledge() logger.info("AI Desktop Agent initialized") def _load_app_knowledge(self) -> Dict[str, Dict]: """ Load application-specific knowledge. This can be extended with learned patterns. """ return { "mspaint": { "name": "Microsoft Paint", "launch_command": "mspaint", "common_actions": { "select_pencil": {"menu": "Tools", "position": "toolbar_left"}, "select_brush": {"menu": "Tools", "position": "toolbar"}, "select_color": {"menu": "Colors", "action": "click_palette"}, "draw_line": {"action": "drag", "tool_required": "line"}, } }, "notepad": { "name": "Notepad", "launch_command": "notepad", "common_actions": { "type_text": {"action": "type"}, "save": {"hotkey": ["ctrl", "s"]}, "new_file": {"hotkey": ["ctrl", "n"]}, } }, "calculator": { "name": "Calculator", "launch_command": "calc", "common_actions": { "calculate": {"action": "type_numbers"}, } } } def execute_task(self, task: str, max_steps: int = 50) -> Dict[str, Any]: """ Execute a high-level task autonomously. Args: task: Natural language task description max_steps: Maximum number of steps to attempt Returns: Execution result with status and details """ logger.info(f"Executing task: {task}") # Initialize result result = { "task": task, "status": "in_progress", "steps": [], "screenshots": [], "success": False } try: # Step 1: Analyze task and plan plan = self._plan_task(task) logger.info(f"Generated plan with {len(plan)} steps") # Step 2: Execute plan step by step for step_num, step in enumerate(plan, 1): if step_num > max_steps: logger.warning(f"Reached max steps ({max_steps})") break logger.info(f"Step {step_num}/{len(plan)}: {step['description']}") # Capture screen before action screenshot_before = self.dc.screenshot() # Execute step step_result = self._execute_step(step) result["steps"].append(step_result) # Capture screen after action screenshot_after = self.dc.screenshot() result["screenshots"].append({ "step": step_num, "before": screenshot_before, "after": screenshot_after }) # Verify step success if not step_result.get("success", False): logger.error(f"Step {step_num} failed: {step_result.get('error')}") result["status"] = "failed" result["failed_at_step"] = step_num return result # Small delay between steps time.sleep(0.5) result["status"] = "completed" result["success"] = True logger.info(f"Task completed successfully in {len(result['steps'])} steps") except Exception as e: logger.error(f"Task execution error: {e}") result["status"] = "error" result["error"] = str(e) return result def _plan_task(self, task: str) -> List[Dict[str, Any]]: """ Plan task execution using LLM reasoning. Args: task: Task description Returns: List of execution steps """ # For now, use rule-based planning # TODO: Integrate with OpenClaw LLM for intelligent planning # Parse task intent task_lower = task.lower() # Pattern matching for common tasks if "draw" in task_lower and "paint" in task_lower: return self._plan_paint_drawing(task) elif "type" in task_lower or "write" in task_lower: return self._plan_text_entry(task) elif "play" in task_lower and "game" in task_lower: return self._plan_game_play(task) elif "open" in task_lower or "launch" in task_lower: return self._plan_app_launch(task) else: # Generic plan - analyze and improvise return self._plan_generic(task) def _plan_paint_drawing(self, task: str) -> List[Dict]: """Plan for drawing in MS Paint.""" # Extract what to draw drawing_subject = self._extract_subject(task) return [ { "type": "launch_app", "app": "mspaint", "description": "Launch Microsoft Paint" }, { "type": "wait", "duration": 2.0, "description": "Wait for Paint to load" }, { "type": "activate_window", "title": "Paint", "description": "Ensure Paint window is active" }, { "type": "select_tool", "tool": "pencil", "description": "Select pencil tool" }, { "type": "draw", "subject": drawing_subject, "description": f"Draw {drawing_subject}" }, { "type": "screenshot", "save_as": "drawing_result.png", "description": "Capture the drawing" } ] def _plan_text_entry(self, task: str) -> List[Dict]: """Plan for text entry task.""" # Extract text to type text_content = self._extract_text_content(task) return [ { "type": "launch_app", "app": "notepad", "description": "Launch Notepad" }, { "type": "wait", "duration": 1.0, "description": "Wait for Notepad to load" }, { "type": "type_text", "text": text_content, "wpm": 80, "description": f"Type: {text_content[:50]}..." } ] def _plan_game_play(self, task: str) -> List[Dict]: """Plan for playing a game.""" game_name = self._extract_game_name(task) return [ { "type": "analyze_screen", "description": "Analyze game screen" }, { "type": "detect_game_state", "game": game_name, "description": f"Detect {game_name} state" }, { "type": "execute_game_loop", "game": game_name, "max_iterations": 100, "description": f"Play {game_name}" } ] def _plan_app_launch(self, task: str) -> List[Dict]: """Plan for launching an application.""" app_name = self._extract_app_name(task) return [ { "type": "launch_app", "app": app_name, "description": f"Launch {app_name}" }, { "type": "wait", "duration": 2.0, "description": f"Wait for {app_name} to load" } ] def _plan_generic(self, task: str) -> List[Dict]: """Generic planning fallback.""" return [ { "type": "analyze_screen", "description": "Analyze current screen state" }, { "type": "infer_action", "task": task, "description": f"Infer action for: {task}" } ] def _execute_step(self, step: Dict[str, Any]) -> Dict[str, Any]: """ Execute a single step. Args: step: Step definition Returns: Execution result """ step_type = step.get("type") result = {"step": step, "success": False} try: if step_type == "launch_app": self._do_launch_app(step["app"]) result["success"] = True elif step_type == "wait": time.sleep(step["duration"]) result["success"] = True elif step_type == "activate_window": success = self.dc.activate_window(step["title"]) result["success"] = success elif step_type == "select_tool": self._do_select_tool(step["tool"]) result["success"] = True elif step_type == "draw": self._do_draw(step["subject"]) result["success"] = True elif step_type == "type_text": self.dc.type_text(step["text"], wpm=step.get("wpm", 80)) result["success"] = True elif step_type == "screenshot": filename = step.get("save_as", "screenshot.png") self.dc.screenshot(filename=filename) result["success"] = True result["saved_to"] = filename elif step_type == "analyze_screen": analysis = self._analyze_screen() result["analysis"] = analysis result["success"] = True elif step_type == "execute_game_loop": game_result = self._execute_game_loop(step) result["game_result"] = game_result result["success"] = True else: result["error"] = f"Unknown step type: {step_type}" except Exception as e: logger.error(f"Step execution error: {e}") result["error"] = str(e) return result def _do_launch_app(self, app: str) -> None: """Launch an application.""" # Get launch command from knowledge base app_info = self.app_knowledge.get(app, {}) launch_cmd = app_info.get("launch_command", app) # Open Run dialog self.dc.hotkey('win', 'r') time.sleep(0.5) # Type and execute command self.dc.type_text(launch_cmd, wpm=100) self.dc.press('enter') logger.info(f"Launched: {app}") def _do_select_tool(self, tool: str) -> None: """Select a tool (e.g., in Paint).""" # This is simplified - in reality would use computer vision # to find and click the tool button # For Paint, tools are typically in the ribbon # We'll use hotkeys where possible if tool == "pencil": # In Paint, press 'P' for pencil self.dc.press('p') elif tool == "brush": self.dc.press('b') elif tool == "eraser": self.dc.press('e') logger.info(f"Selected tool: {tool}") def _do_draw(self, subject: str) -> None: """ Draw something on screen. This is a simplified implementation - would be enhanced with: - Image generation (use wan2gp to generate reference) - Trace generation (convert image to draw commands) - Executed drawing (execute the commands) """ logger.info(f"Drawing: {subject}") # Get canvas center (simplified - would detect canvas) canvas_x = self.screen_width // 2 canvas_y = self.screen_height // 2 # Simple drawing pattern (example: draw a simple shape) if "circle" in subject.lower(): self._draw_circle(canvas_x, canvas_y, radius=100) elif "square" in subject.lower(): self._draw_square(canvas_x, canvas_y, size=200) elif "star" in subject.lower(): self._draw_star(canvas_x, canvas_y, size=100) else: # Generic: draw a simple pattern self._draw_simple_pattern(canvas_x, canvas_y) logger.info(f"Completed drawing: {subject}") def _draw_circle(self, cx: int, cy: int, radius: int) -> None: """Draw a circle.""" import math points = [] for angle in range(0, 360, 5): rad = math.radians(angle) x = int(cx + radius * math.cos(rad)) y = int(cy + radius * math.sin(rad)) points.append((x, y)) # Draw by connecting points for i in range(len(points) - 1): self.dc.drag(points[i][0], points[i][1], points[i+1][0], points[i+1][1], duration=0.01) # Close the circle self.dc.drag(points[-1][0], points[-1][1], points[0][0], points[0][1], duration=0.01) def _draw_square(self, cx: int, cy: int, size: int) -> None: """Draw a square.""" half = size // 2 corners = [ (cx - half, cy - half), # Top-left (cx + half, cy - half), # Top-right (cx + half, cy + half), # Bottom-right (cx - half, cy + half), # Bottom-left ] # Draw sides for i in range(4): start = corners[i] end = corners[(i + 1) % 4] self.dc.drag(start[0], start[1], end[0], end[1], duration=0.2) def _draw_star(self, cx: int, cy: int, size: int) -> None: """Draw a 5-pointed star.""" import math points = [] for i in range(10): angle = math.radians(i * 36 - 90) radius = size if i % 2 == 0 else size // 2 x = int(cx + radius * math.cos(angle)) y = int(cy + radius * math.sin(angle)) points.append((x, y)) # Draw by connecting points for i in range(len(points)): start = points[i] end = points[(i + 1) % len(points)] self.dc.drag(start[0], start[1], end[0], end[1], duration=0.1) def _draw_simple_pattern(self, cx: int, cy: int) -> None: """Draw a simple decorative pattern.""" # Draw a few curved lines for offset in [-50, 0, 50]: self.dc.drag(cx - 100, cy + offset, cx + 100, cy + offset, duration=0.3) def _analyze_screen(self) -> Dict[str, Any]: """ Analyze current screen state. Would use OCR, object detection in full implementation. """ screenshot = self.dc.screenshot() active_window = self.dc.get_active_window() mouse_pos = self.dc.get_mouse_position() analysis = { "active_window": active_window, "mouse_position": mouse_pos, "screen_size": (self.screen_width, self.screen_height), "timestamp": time.time() } # TODO: Add OCR, object detection, UI element detection return analysis def _execute_game_loop(self, step: Dict) -> Dict: """ Execute game playing loop. Would use reinforcement learning in full implementation. """ game = step.get("game", "unknown") max_iter = step.get("max_iterations", 100) logger.info(f"Starting game loop for: {game}") result = { "game": game, "iterations": 0, "actions_taken": [] } # Simple game loop - would be much more sophisticated for i in range(max_iter): # Analyze game state state = self._analyze_screen() # Decide action (simplified - would use ML model) action = self._decide_game_action(state, game) # Execute action self._execute_game_action(action) result["iterations"] += 1 result["actions_taken"].append(action) # Check win/lose condition # (would detect from screen) time.sleep(0.1) return result def _decide_game_action(self, state: Dict, game: str) -> str: """Decide next game action based on state.""" # Simplified - would use game-specific AI return "continue" def _execute_game_action(self, action: str) -> None: """Execute a game action.""" # Simplified - would translate to specific inputs pass # Helper methods for parsing def _extract_subject(self, text: str) -> str: """Extract subject from drawing request.""" # Simple extraction - would use NLP if "draw" in text.lower(): parts = text.lower().split("draw") if len(parts) > 1: return parts[1].strip() return "unknown" def _extract_text_content(self, text: str) -> str: """Extract text content from typing request.""" # Simple extraction if "type" in text.lower(): parts = text.split("type") if len(parts) > 1: return parts[1].strip().strip('"').strip("'") return text def _extract_game_name(self, text: str) -> str: """Extract game name from request.""" # Would use NER for better extraction return "unknown_game" def _extract_app_name(self, text: str) -> str: """Extract application name from request.""" # Simple extraction - would use NER for app in self.app_knowledge.keys(): if app in text.lower(): return app return "notepad" # Default fallback # Quick access function def create_agent(**kwargs) -> AIDesktopAgent: """Create an AI Desktop Agent instance.""" return AIDesktopAgent(**kwargs) if __name__ == "__main__": print("🤖 AI Desktop Agent - Cognitive Automation") print("=" * 60) # Create agent agent = AIDesktopAgent(failsafe=True) print("\n✨ Examples of what you can ask:") print(" - 'Draw a circle in Paint'") print(" - 'Type Hello World in Notepad'") print(" - 'Open Calculator'") print(" - 'Play Solitaire for me'") print("\n🎯 Try it:") task = input("\nWhat would you like me to do? ") if task.strip(): result = agent.execute_task(task) print(f"\n{'='* 60}") print(f"Task Status: {result['status']}") print(f"Steps Executed: {len(result['steps'])}") print(f"Success: {result['success']}") if result.get('screenshots'): print(f"Screenshots captured: {len(result['screenshots'])}") else: print("\nNo task entered. Exiting.")