Wolf-Chat-for-Lastwar/ui_interaction.py
z060142 da5f7f4358 Add island-based bubble detection method with color masks; refactor system prompt and streamline detection config
- Introduced a new bubble detection method based on color-based connected components ("island detection").
  - Operates in a predefined region (150, 330, 600, 880) using HSV color masks.
  - Mask settings and area thresholds per bubble type (user, bot, etc.) are configurable via `bubble_colors.json`.
  - Supports optional image downscaling for performance; thresholds scale accordingly.
  - Default setting remains off (`self.use_color_detection = False`), can be enabled manually in `DetectionModule`.

- Added keyword image matching for "wolf"/"Wolf" within detected bubbles.
- Improved precision in reply content extraction and sender identification by relocating based on snapshot offsets.

- Removed redundant portions of the system prompt to reduce token usage and improve response clarity.
- Modularized and simplified configuration for bubble type addition and detection tuning.
2025-05-01 02:58:41 +08:00

1827 lines
99 KiB
Python

# ui_interaction.py
# Refactored to separate Detection and Interaction logic.
import pyautogui
import cv2 # opencv-python
import numpy as np
import pyperclip
import time
import os
import collections
import asyncio
import pygetwindow as gw # Used to check/activate windows
import config # Used to read window title
import json # Added for color config loading
import queue
from typing import List, Tuple, Optional, Dict, Any
import threading # Import threading for Lock if needed, or just use a simple flag
# --- Global Pause Flag ---
# Module-level pause switch, stored as a one-element list holding a boolean.
# Because the list object itself is mutable, code that holds a reference to it
# can flip the flag in place (monitoring_paused_flag[0] = True/False) without
# rebinding the module-level name.
# NOTE(review): no lock protects this flag; presumably it is shared between the
# monitoring loop and a controller thread - confirm. threading.Event() would be
# the explicit alternative.
monitoring_paused_flag = [False] # List containing a boolean
# --- Color Config Loading ---
def load_bubble_colors(config_path='bubble_colors.json'):
    """Load the bubble color configuration from a JSON file.

    Args:
        config_path: Path to the JSON file. A relative path is resolved
            against SCRIPT_DIR (the directory containing this module).

    Returns:
        The list stored under the ``bubble_types`` key of the JSON document
        (an empty list when that key is absent). If the file is missing, is
        not valid JSON, or any other error occurs while loading, a built-in
        default list describing the ``normal_user`` and ``bot`` bubbles is
        returned instead. Failures are reported with print(), never raised.
    """
    try:
        # Ensure the path is absolute or relative to the script directory
        if not os.path.isabs(config_path):
            config_path = os.path.join(SCRIPT_DIR, config_path)
        with open(config_path, 'r', encoding='utf-8') as f:
            # Named color_config so the imported ``config`` module is not shadowed.
            color_config = json.load(f)
        print(f"Successfully loaded color config from {config_path}")
        return color_config.get('bubble_types', [])
    except FileNotFoundError:
        print(f"Warning: Color config file not found at {config_path}. Using default colors.")
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {config_path}. Using default colors.")
    except Exception as e:
        print(f"Error loading color config: {e}. Using default colors.")
    # Default configuration if loading fails.
    # The booleans must be Python's True/False: the JSON spellings true/false
    # are undefined names here and would raise NameError on this fallback path.
    return [
        {
            "name": "normal_user",
            "is_bot": False,
            "hsv_lower": [6, 0, 240],
            "hsv_upper": [18, 23, 255],
            "min_area": 2500,
            "max_area": 300000,
        },
        {
            "name": "bot",
            "is_bot": True,
            "hsv_lower": [105, 9, 208],
            "hsv_upper": [116, 43, 243],
            "min_area": 2500,
            "max_area": 300000,
        },
    ]
# --- Configuration Section ---
# Every path below is anchored to the directory that contains this module.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
TEMPLATE_DIR = os.path.join(SCRIPT_DIR, "templates")
os.makedirs(TEMPLATE_DIR, exist_ok=True)  # created at import time if missing

# --- Debugging ---
DEBUG_SCREENSHOT_DIR = os.path.join(SCRIPT_DIR, "debug_screenshots")
MAX_DEBUG_SCREENSHOTS = 8
os.makedirs(DEBUG_SCREENSHOT_DIR, exist_ok=True)
# --- End Debugging ---


def _template_path(*parts):
    """Join *parts* onto TEMPLATE_DIR and return the resulting path."""
    return os.path.join(TEMPLATE_DIR, *parts)


# --- Template Paths (Consider moving to config.py or loading dynamically) ---
# Bubble corners: only top-left / bottom-right templates are defined
# (top-right / bottom-left corners are unused).
CORNER_TL_IMG = _template_path("corner_tl.png")
CORNER_BR_IMG = _template_path("corner_br.png")
# Additional regular bubble types (skins)
CORNER_TL_TYPE2_IMG = _template_path("corner_tl_type2.png")
CORNER_BR_TYPE2_IMG = _template_path("corner_br_type2.png")
CORNER_TL_TYPE3_IMG = _template_path("corner_tl_type3.png")
CORNER_BR_TYPE3_IMG = _template_path("corner_br_type3.png")
CORNER_TL_TYPE4_IMG = _template_path("corner_tl_type4.png")
CORNER_BR_TYPE4_IMG = _template_path("corner_br_type4.png")
# Bot bubble corners
BOT_CORNER_TL_IMG = _template_path("bot_corner_tl.png")
BOT_CORNER_BR_IMG = _template_path("bot_corner_br.png")
# Additional bot bubble types (skins)
BOT_CORNER_TL_TYPE2_IMG = _template_path("bot_corner_tl_type2.png")
BOT_CORNER_BR_TYPE2_IMG = _template_path("bot_corner_br_type2.png")
BOT_CORNER_TL_TYPE3_IMG = _template_path("bot_corner_tl_type3.png")
BOT_CORNER_BR_TYPE3_IMG = _template_path("bot_corner_br_type3.png")

# Keywords ("wolf" / "Wolf"), one lower/upper pair per bubble type
KEYWORD_wolf_LOWER_IMG = _template_path("keyword_wolf_lower.png")
KEYWORD_Wolf_UPPER_IMG = _template_path("keyword_wolf_upper.png")
KEYWORD_wolf_LOWER_TYPE2_IMG = _template_path("keyword_wolf_lower_type2.png")  # type2 bubbles
KEYWORD_Wolf_UPPER_TYPE2_IMG = _template_path("keyword_wolf_upper_type2.png")  # type2 bubbles
KEYWORD_wolf_LOWER_TYPE3_IMG = _template_path("keyword_wolf_lower_type3.png")  # type3 bubbles
KEYWORD_Wolf_UPPER_TYPE3_IMG = _template_path("keyword_wolf_upper_type3.png")  # type3 bubbles
KEYWORD_wolf_LOWER_TYPE4_IMG = _template_path("keyword_wolf_lower_type4.png")  # type4 bubbles
KEYWORD_Wolf_UPPER_TYPE4_IMG = _template_path("keyword_wolf_upper_type4.png")  # type4 bubbles
# Reply keywords (keyword as it appears in a reply), one per bubble type
KEYWORD_WOLF_REPLY_IMG = _template_path("keyword_wolf_reply.png")
KEYWORD_WOLF_REPLY_TYPE2_IMG = _template_path("keyword_wolf_reply_type2.png")
KEYWORD_WOLF_REPLY_TYPE3_IMG = _template_path("keyword_wolf_reply_type3.png")
KEYWORD_WOLF_REPLY_TYPE4_IMG = _template_path("keyword_wolf_reply_type4.png")

# UI elements
COPY_MENU_ITEM_IMG = _template_path("copy_menu_item.png")
PROFILE_OPTION_IMG = _template_path("profile_option.png")
COPY_NAME_BUTTON_IMG = _template_path("copy_name_button.png")
SEND_BUTTON_IMG = _template_path("send_button.png")
CHAT_INPUT_IMG = _template_path("chat_input.png")

# State detection
PROFILE_NAME_PAGE_IMG = _template_path("Profile_Name_page.png")
PROFILE_PAGE_IMG = _template_path("Profile_page.png")
CHAT_ROOM_IMG = _template_path("chat_room.png")
BASE_SCREEN_IMG = _template_path("base.png")  # navigation
WORLD_MAP_IMG = _template_path("World_map.png")  # navigation
# World / private chat identifiers (examples)
WORLD_CHAT_IMG = _template_path("World_Label_normal.png")
PRIVATE_CHAT_IMG = _template_path("Private_Label_normal.png")

# Position icons (near bubble)
POS_DEV_IMG = _template_path("positions", "development.png")
POS_INT_IMG = _template_path("positions", "interior.png")
POS_SCI_IMG = _template_path("positions", "science.png")
POS_SEC_IMG = _template_path("positions", "security.png")
POS_STR_IMG = _template_path("positions", "strategy.png")

# Capitol page elements
CAPITOL_BUTTON_IMG = _template_path("capitol", "capitol_#11.png")
PRESIDENT_TITLE_IMG = _template_path("capitol", "president_title.png")
POS_BTN_DEV_IMG = _template_path("capitol", "position_development.png")
POS_BTN_INT_IMG = _template_path("capitol", "position_interior.png")
POS_BTN_SCI_IMG = _template_path("capitol", "position_science.png")
POS_BTN_SEC_IMG = _template_path("capitol", "position_security.png")
POS_BTN_STR_IMG = _template_path("capitol", "position_strategy.png")
PAGE_DEV_IMG = _template_path("capitol", "page_DEVELOPMENT.png")
PAGE_INT_IMG = _template_path("capitol", "page_INTERIOR.png")
PAGE_SCI_IMG = _template_path("capitol", "page_SCIENCE.png")
PAGE_SEC_IMG = _template_path("capitol", "page_SECURITY.png")
PAGE_STR_IMG = _template_path("capitol", "page_STRATEGY.png")
DISMISS_BUTTON_IMG = _template_path("capitol", "dismiss.png")
CONFIRM_BUTTON_IMG = _template_path("capitol", "confirm.png")
CLOSE_BUTTON_IMG = _template_path("capitol", "close_button.png")
BACK_ARROW_IMG = _template_path("capitol", "black_arrow_down.png")
REPLY_BUTTON_IMG = _template_path("reply_button.png")  # reply functionality
# --- Operation Parameters (Consider moving to config.py) ---
# Tunable constants for detection and interaction. Several of them are the
# default argument values of functions/classes defined later in this module.
# Optional region for the chat input; None means no dedicated region is set.
CHAT_INPUT_REGION = None # Example: (100, 800, 500, 50)
# Default chat-input coordinates; used as InteractionModule's default input_coords.
CHAT_INPUT_CENTER_X = 400
CHAT_INPUT_CENTER_Y = 1280
# Default search region for DetectionModule template searches.
# NOTE(review): presumably (left, top, width, height) as pyautogui expects - confirm.
SCREENSHOT_REGION = (70, 50, 800, 1365) # Updated region
# Default template-match confidence for DetectionModule.
CONFIDENCE_THRESHOLD = 0.9 # Increased threshold for corner matching
# Default confidence DetectionModule uses for UI-state templates.
STATE_CONFIDENCE_THRESHOLD = 0.9
# Default horizontal offset added to a bubble's top-left X by calculate_avatar_coords().
AVATAR_OFFSET_X = -45 # Original offset, used for non-reply interactions like position removal
# AVATAR_OFFSET_X_RELOCATED = -50 # Replaced by specific reply offsets
AVATAR_OFFSET_X_REPLY = -45 # Horizontal offset for avatar click after re-location (for reply context)
AVATAR_OFFSET_Y_REPLY = 10 # Vertical offset for avatar click after re-location (for reply context)
BUBBLE_RELOCATE_CONFIDENCE = 0.8 # Reduced confidence for finding the bubble snapshot (was 0.9)
BUBBLE_RELOCATE_FALLBACK_CONFIDENCE = 0.6 # Lower confidence for fallback attempts
# Default tolerance (same units as the bbox coordinates) for are_bboxes_similar().
BBOX_SIMILARITY_TOLERANCE = 10
RECENT_TEXT_HISTORY_MAXLEN = 5 # This state likely belongs in the coordinator
# --- Helper Function (Module Level) ---
def are_bboxes_similar(bbox1: Optional[Tuple[int, int, int, int]],
                       bbox2: Optional[Tuple[int, int, int, int]],
                       tolerance: int = BBOX_SIMILARITY_TOLERANCE) -> bool:
    """Return True when the top-left corners of two bounding boxes nearly coincide.

    Only elements 0 and 1 of each box are compared; each may differ by at most
    ``tolerance``. If either box is None the result is False.
    """
    if bbox1 is None or bbox2 is None:
        return False
    dx = abs(bbox1[0] - bbox2[0])
    dy = abs(bbox1[1] - bbox2[1])
    if dx > tolerance:
        return False
    return dy <= tolerance
# ==============================================================================
# Detection Module
# ==============================================================================
class DetectionModule:
"""Handles finding elements and states on the screen using image recognition or color analysis."""
def __init__(self, templates: Dict[str, str], confidence: float = CONFIDENCE_THRESHOLD,
             state_confidence: float = STATE_CONFIDENCE_THRESHOLD,
             region: Optional[Tuple[int, int, int, int]] = SCREENSHOT_REGION):
    """Store the template table and thresholds, then load the color settings.

    Args:
        templates: Mapping of template key -> image file path.
        confidence: Default confidence for template searches.
        state_confidence: Confidence used when detecting the UI state.
        region: Default screen region for template searches.
    """
    # Caller-supplied matching configuration.
    self.templates = templates
    self.confidence = confidence
    self.state_confidence = state_confidence
    self.region = region
    # Template paths already reported as missing (each is reported only once).
    self._warned_paths = set()

    # --- Hardcoded Settings (as per user instruction) ---
    self.use_color_detection: bool = True  # color detection is on by default
    self.color_config_path: str = "bubble_colors.json"
    # --- End Hardcoded Settings ---

    # Color definitions are only loaded when color detection is switched on.
    self.bubble_colors = []
    if self.use_color_detection:
        self.bubble_colors = load_bubble_colors(self.color_config_path)
        if not self.bubble_colors:
            print("Warning: Color detection enabled, but failed to load any color configurations. Color detection might not work.")

    mode = 'Enabled' if self.use_color_detection else 'Disabled'
    print(f"DetectionModule initialized. Color Detection: {mode}")
def _find_template(self, template_key: str, confidence: Optional[float] = None, region: Optional[Tuple[int, int, int, int]] = None, grayscale: bool = False) -> List[Tuple[int, int]]:
    """Locate every on-screen match of a template and return the match centers.

    Args:
        template_key: Key into self.templates.
        confidence: Match confidence; self.confidence when None.
        region: Search region; self.region when None.
        grayscale: Forwarded to pyautogui.locateAllOnScreen.

    Returns:
        A list of (center_x, center_y) tuples. The list is empty when the key
        is unknown, the image file is missing, or the search raises.
    """
    template_path = self.templates.get(template_key)
    if not template_path:
        print(f"Error: Template key '{template_key}' not found in provided templates.")
        return []
    if not os.path.exists(template_path):
        # Report a missing file once per path to keep the log readable.
        if template_path not in self._warned_paths:
            print(f"Error: Template image doesn't exist: {template_path}")
            self._warned_paths.add(template_path)
        return []

    search_region = self.region if region is None else region
    min_confidence = self.confidence if confidence is None else confidence
    try:
        # locateAllOnScreen yields Box objects (left, top, width, height).
        boxes = pyautogui.locateAllOnScreen(template_path, region=search_region, confidence=min_confidence, grayscale=grayscale)
        if not boxes:
            return []
        return [(b.left + b.width // 2, b.top + b.height // 2) for b in boxes]
    except Exception as e:
        print(f"Error finding template '{template_key}' ({template_path}): {e}")
        return []
def _find_template_raw(self, template_key: str, confidence: Optional[float] = None, region: Optional[Tuple[int, int, int, int]] = None, grayscale: bool = False) -> List[Tuple[int, int, int, int]]:
    """Locate every on-screen match of a template and return the raw boxes.

    Same lookup rules as _find_template, but each match is returned as a
    (left, top, width, height) tuple instead of its center point. Debug lines
    are printed before and after the search.

    Returns:
        A list of box tuples. The list is empty when the key is unknown, the
        image file is missing, or the search raises.
    """
    template_path = self.templates.get(template_key)
    if not template_path:
        print(f"Error: Template key '{template_key}' not found in provided templates.")
        return []
    if not os.path.exists(template_path):
        # Report a missing file once per path to keep the log readable.
        if template_path not in self._warned_paths:
            print(f"Error: Template image doesn't exist: {template_path}")
            self._warned_paths.add(template_path)
        return []

    current_region = self.region if region is None else region
    current_confidence = self.confidence if confidence is None else confidence
    try:
        # --- Temporary Debug Print ---
        print(f"DEBUG: Searching for template '{template_key}' with confidence {current_confidence}...")
        found = pyautogui.locateAllOnScreen(template_path, region=current_region, confidence=current_confidence, grayscale=grayscale)
        locations = [(b.left, b.top, b.width, b.height) for b in found] if found else []
        match_count = len(locations)
        # --- Temporary Debug Print ---
        print(f"DEBUG: Found {match_count} instance(s) of template '{template_key}'.")
        return locations
    except Exception as e:
        print(f"Error finding template raw '{template_key}' ({template_path}): {e}")
        return []
def find_elements(self, template_keys: List[str], confidence: Optional[float] = None, region: Optional[Tuple[int, int, int, int]] = None) -> Dict[str, List[Tuple[int, int]]]:
    """Search for several templates and map each key to its match centers.

    Every key is looked up through _find_template with the given confidence
    and region; keys without matches map to an empty list.
    """
    return {
        key: self._find_template(key, confidence=confidence, region=region)
        for key in template_keys
    }
def find_dialogue_bubbles(self) -> List[Dict[str, Any]]:
    """
    Detect dialogue bubbles, preferring color analysis when it is enabled.

    When self.use_color_detection is set, find_dialogue_bubbles_by_color() is
    tried first; an empty result or an exception falls through to corner
    template matching. Template matching pairs each top-left corner with the
    bottom-right corner of the same family (regular or bot) that lies below
    and to the right of it with the smallest vertical distance.

    Returns a list of dictionaries, each containing:
    {'bbox': (tl_x, tl_y, br_x, br_y), 'is_bot': bool, 'tl_coords': (tl_x, tl_y)}
    """
    # --- Color detection first, if enabled ---
    if not self.use_color_detection:
        print("Color detection disabled. Using template matching.")
    else:
        print("Attempting bubble detection using color analysis...")
        try:
            # Half-resolution analysis for performance.
            bubbles = self.find_dialogue_bubbles_by_color(scale_factor=0.5)
            if bubbles:
                print("Color detection successful.")
                return bubbles
            print("Color detection returned no bubbles. Falling back to template matching.")
        except Exception as e:
            print(f"Color detection failed with error: {e}. Falling back to template matching.")
            import traceback
            traceback.print_exc()

    # --- Fallback: corner template matching ---
    print("Executing template matching for bubble detection...")
    bubble_detection_region = (150, 330, 600, 880)  # fixed region searched for corners
    print(f"DEBUG: Using specific region for bubble corner detection: {bubble_detection_region}")

    def collect(keys):
        """Gather the raw boxes of every listed template key, in key order."""
        boxes = []
        for key in keys:
            boxes.extend(self._find_template_raw(key, region=bubble_detection_region))
        return boxes

    regular_tls = collect(['corner_tl', 'corner_tl_type2', 'corner_tl_type3', 'corner_tl_type4'])
    regular_brs = collect(['corner_br', 'corner_br_type2', 'corner_br_type3', 'corner_br_type4'])
    # Bot bubbles are searched with a single template pair.
    bot_tls = collect(['bot_corner_tl'])
    bot_brs = collect(['bot_corner_br'])

    all_bubbles_info = []
    used_tls = set()  # top-left corners already turned into a bubble

    def pair(tl_boxes, br_boxes, is_bot):
        """Pair each unused TL box with its vertically closest valid BR box."""
        for tl in tl_boxes:
            origin = (tl[0], tl[1])
            if origin in used_tls:
                continue
            best_br = None
            best_gap = float('inf')
            for br in br_boxes:
                # BR must sit right of (by > 20) and below (by > 10) the TL.
                if br[0] > origin[0] + 20 and br[1] > origin[1] + 10:
                    gap = abs(br[1] - origin[1])
                    if gap < best_gap:
                        best_br, best_gap = br, gap
            if best_br:
                # bbox spans TL's top-left to BR's bottom-right.
                all_bubbles_info.append({
                    'bbox': (origin[0], origin[1],
                             best_br[0] + best_br[2], best_br[1] + best_br[3]),
                    'is_bot': is_bot,
                    'tl_coords': origin,
                })
                used_tls.add(origin)

    # Regular bubbles are matched before bot bubbles.
    pair(regular_tls, regular_brs, False)
    pair(bot_tls, bot_brs, True)

    print(f"Template matching found {len(all_bubbles_info)} bubbles.")
    return all_bubbles_info
def find_dialogue_bubbles_by_color(self, scale_factor=0.5) -> List[Dict[str, Any]]:
    """
    Locate dialogue bubbles by HSV color masking inside a fixed screen region.

    The region is captured, shrunk by scale_factor when that is below 1.0,
    converted to HSV and then, for every entry of self.bubble_colors,
    thresholded, morphologically closed and split into connected components.
    Components whose pixel area lies inside the entry's min/max area (scaled
    by the square of the applied factor) are reported as bubbles.

    Returns a list of dictionaries in full-screen coordinates, each containing:
    {'bbox': (tl_x, tl_y, br_x, br_y), 'is_bot': bool, 'tl_coords': (tl_x, tl_y)}
    An empty list is returned on any error or when no colors are configured.
    """
    all_bubbles_info = []
    # Same fixed region that the template-matching path searches.
    bubble_detection_region = (150, 330, 600, 880)
    print(f"Using bubble color detection region: {bubble_detection_region}")
    try:
        # 1. Capture the region and convert RGB (pyautogui) -> BGR (OpenCV).
        screenshot = pyautogui.screenshot(region=bubble_detection_region)
        if screenshot is None:
            print("Error: Failed to capture screenshot for color detection.")
            return []
        frame = cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR)

        # 2. Optional downscale; stay at full size unless a valid shrink applies.
        work_img = frame
        current_scale_factor = 1.0
        if scale_factor < 1.0:
            h, w = frame.shape[:2]
            new_h, new_w = int(h * scale_factor), int(w * scale_factor)
            if new_h <= 0 or new_w <= 0:
                print(f"Error: Invalid dimensions after scaling: {new_w}x{new_h}. Using original image.")
            else:
                work_img = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)
                print(f"Original resolution: {w}x{h}, Scaled down to: {new_w}x{new_h}")
                current_scale_factor = scale_factor

        # 3. HSV conversion.
        hsv = cv2.cvtColor(work_img, cv2.COLOR_BGR2HSV)

        # 4. One pass per configured bubble type.
        if not self.bubble_colors:
            print("Error: No bubble color configurations loaded for detection.")
            return []
        offset_x, offset_y = bubble_detection_region[0], bubble_detection_region[1]
        kernel = np.ones((3, 3), np.uint8)  # structuring element for the closing step
        for color_config in self.bubble_colors:
            name = color_config.get('name', 'unknown')
            is_bot = color_config.get('is_bot', False)
            hsv_lower = np.array(color_config.get('hsv_lower', [0, 0, 0]))
            hsv_upper = np.array(color_config.get('hsv_upper', [179, 255, 255]))
            # Area limits are configured at full resolution; scale them to match.
            min_area = color_config.get('min_area', 3000) * (current_scale_factor ** 2)
            max_area = color_config.get('max_area', 100000) * (current_scale_factor ** 2)
            print(f"Processing color type: {name} (Bot: {is_bot}), HSV Lower: {hsv_lower}, HSV Upper: {hsv_upper}, Area: {min_area:.0f}-{max_area:.0f}")

            # 5-6. Mask by HSV range, then close to remove noise and fill holes.
            mask = cv2.inRange(hsv, hsv_lower, hsv_upper)
            mask_closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=2)

            # 7. Connected components (label 0 is the background).
            num_labels, _labels, stats, _centroids = cv2.connectedComponentsWithStats(mask_closed)

            # 8. Keep the components whose area is in range.
            for i in range(1, num_labels):
                area = stats[i, cv2.CC_STAT_AREA]
                if not (min_area <= area <= max_area):
                    continue
                x = stats[i, cv2.CC_STAT_LEFT]
                y = stats[i, cv2.CC_STAT_TOP]
                width = stats[i, cv2.CC_STAT_WIDTH]
                height = stats[i, cv2.CC_STAT_HEIGHT]
                # Map scaled coordinates back to the captured resolution.
                if current_scale_factor < 1.0:
                    x, y, width, height = (int(v / current_scale_factor) for v in (x, y, width, height))
                # Shift from region-relative to full-screen coordinates.
                x_adjusted = x + offset_x
                y_adjusted = y + offset_y
                bubble_bbox = (x_adjusted, y_adjusted, x_adjusted + width, y_adjusted + height)
                all_bubbles_info.append({
                    'bbox': bubble_bbox,
                    'is_bot': is_bot,
                    'tl_coords': (x_adjusted, y_adjusted),
                })
                print(f" -> Found '{name}' bubble component. Area: {area:.0f} (Scaled). Original Coords: {bubble_bbox}")
    except pyautogui.FailSafeException:
        print("FailSafe triggered during color detection.")
        return []
    except Exception as e:
        print(f"Error during color-based bubble detection: {e}")
        import traceback
        traceback.print_exc()
        return []  # any failure yields no bubbles
    print(f"Color detection found {len(all_bubbles_info)} bubbles.")
    return all_bubbles_info
def find_keyword_in_region(self, region: Tuple[int, int, int, int]) -> Optional[Tuple[int, int]]:
    """Search a region for any keyword template and return the first hit's center.

    Templates are tried in a fixed priority order and the search stops at the
    first template that matches. A region whose width (index 2) or height
    (index 3) is not positive yields None without searching.

    Returns:
        The first location reported for the first matching template, or None
        when no template matches.
    """
    if region[2] <= 0 or region[3] <= 0:
        return None  # Invalid region width/height

    # (template key, grayscale flag, label used in the log line), in priority
    # order. Only the two original keyword templates are matched in grayscale.
    attempts = (
        ('keyword_wolf_lower', True, 'lowercase, color'),
        ('keyword_wolf_upper', True, 'uppercase, color'),
        ('keyword_wolf_lower_type2', False, 'lowercase, type2'),
        ('keyword_wolf_upper_type2', False, 'uppercase, type2'),
        ('keyword_wolf_lower_type3', False, 'lowercase, type3'),
        ('keyword_wolf_upper_type3', False, 'uppercase, type3'),
        ('keyword_wolf_lower_type4', False, 'lowercase, type4'),
        ('keyword_wolf_upper_type4', False, 'uppercase, type4'),
        ('keyword_wolf_reply', False, 'reply'),
        ('keyword_wolf_reply_type2', False, 'reply, type2'),
        ('keyword_wolf_reply_type3', False, 'reply, type3'),
        ('keyword_wolf_reply_type4', False, 'reply, type4'),
    )
    for template_key, use_grayscale, label in attempts:
        hits = self._find_template(template_key, region=region, grayscale=use_grayscale)
        if hits:
            print(f"Found keyword ({label}) in region {region}, position: {hits[0]}")
            return hits[0]
    return None
def calculate_avatar_coords(self, bubble_tl_coords: Tuple[int, int], offset_x: int = AVATAR_OFFSET_X) -> Tuple[int, int]:
    """
    Derive the avatar position from a bubble's exact top-left corner.

    The X coordinate is shifted by offset_x; the Y coordinate of the corner is
    used unchanged. Both values are returned as ints.
    """
    corner_x = bubble_tl_coords[0]
    corner_y = bubble_tl_coords[1]
    return (int(corner_x + offset_x), int(corner_y))
def get_current_ui_state(self) -> str:
    """Return the name of the current UI state, judged by visible templates.

    Templates are probed in order with self.state_confidence; the first one
    found decides the state. 'unknown' is returned when none is found.
    """
    # (template key, state name), most specific first; 'chat_room' is the
    # general fallback when the more specific chat labels are not found.
    probes = (
        ('profile_name_page', 'user_details'),
        ('profile_page', 'profile_card'),
        ('world_chat', 'world_chat'),
        ('private_chat', 'private_chat'),
        ('chat_room', 'chat_room'),
    )
    for template_key, state in probes:
        if self._find_template(template_key, confidence=self.state_confidence):
            return state
    return 'unknown'
# ==============================================================================
# Interaction Module
# ==============================================================================
class InteractionModule:
"""Handles performing actions on the UI like clicking, typing, clipboard."""
def __init__(self, detector: DetectionModule, input_coords: Tuple[int, int] = (CHAT_INPUT_CENTER_X, CHAT_INPUT_CENTER_Y), input_template_key: Optional[str] = 'chat_input', send_button_key: str = 'send_button'):
    """Store the detector and the defaults used by the interaction helpers.

    Args:
        detector: DetectionModule used to locate templates on screen.
        input_coords: Stored as ``default_input_coords``; defaults to
            (CHAT_INPUT_CENTER_X, CHAT_INPUT_CENTER_Y).
        input_template_key: Template key stored for the chat input box.
            NOTE(review): presumably used to locate the input before falling
            back to default_input_coords - confirm in the methods that read it.
        send_button_key: Template key stored for the send button.
    """
    self.detector = detector
    self.default_input_coords = input_coords
    self.input_template_key = input_template_key
    self.send_button_key = send_button_key
    print("InteractionModule initialized.")
def click_at(self, x: int, y: int, button: str = 'left', clicks: int = 1, interval: float = 0.1, duration: float = 0.1):
    """Move the pointer to (x, y) and click there.

    Args:
        x, y: Target coordinates passed to pyautogui.moveTo.
        button: Mouse button passed to pyautogui.click.
        clicks: Number of clicks.
        interval: Interval between clicks, passed to pyautogui.click.
        duration: Duration of the pointer move, passed to pyautogui.moveTo.

    Any exception is caught and printed; nothing is raised or returned.
    """
    try:
        print(f"Moving to and clicking at: ({x}, {y}), button: {button}, clicks: {clicks}")
        pyautogui.moveTo(x, y, duration=duration)
        pyautogui.click(button=button, clicks=clicks, interval=interval)
        # Short pause after the click.
        time.sleep(0.1)
    except Exception as e:
        # NOTE(review): this broad handler presumably also swallows
        # pyautogui.FailSafeException - confirm that is intended.
        print(f"Error clicking at coordinates ({x}, {y}): {e}")
def press_key(self, key: str, presses: int = 1, interval: float = 0.1):
    """Press a specific key one or more times.

    Args:
        key: Key name passed to pyautogui.press.
        presses: How many times the key is pressed.
        interval: Seconds slept after every press (including the last one).

    Any exception is caught and printed; nothing is raised or returned.
    """
    try:
        print(f"Pressing key: {key} ({presses} times)")
        for _ in range(presses):
            pyautogui.press(key)
            time.sleep(interval)
    except Exception as e:
        print(f"Error pressing key '{key}': {e}")
def hotkey(self, *args):
    """Press a key combination (e.g., 'ctrl', 'c').

    All positional arguments are forwarded to pyautogui.hotkey. Any exception
    is caught and printed, so this method itself does not raise Exception
    subclasses to its caller.
    """
    try:
        print(f"Pressing hotkey: {args}")
        pyautogui.hotkey(*args)
        time.sleep(0.1) # Short pause after hotkey
    except Exception as e:
        print(f"Error pressing hotkey {args}: {e}")
def get_clipboard(self) -> Optional[str]:
    """Get text from clipboard.

    Returns:
        Whatever pyperclip.paste() returns, or None when reading fails (the
        error is printed).
    """
    try:
        return pyperclip.paste()
    except Exception as e:
        print(f"Error reading clipboard: {e}")
        return None
def set_clipboard(self, text: str):
    """Set clipboard text.

    The text is written with pyperclip.copy(); a failure is printed and
    otherwise ignored, so callers get no signal that the write failed.
    """
    try:
        pyperclip.copy(text)
    except Exception as e:
        print(f"Error writing to clipboard: {e}")
def copy_text_at(self, coords: Tuple[int, int]) -> Optional[str]:
"""Attempt to copy text after clicking at given coordinates."""
print(f"Attempting to copy text at {coords}...")
original_clipboard = self.get_clipboard() or ""
self.set_clipboard("___MCP_CLEAR___")
time.sleep(0.1)
self.click_at(coords[0], coords[1])
time.sleep(0.1) # Wait for menu/reaction
copied = False
# Try finding "Copy" menu item first
copy_item_locations = self.detector._find_template('copy_menu_item', confidence=0.7) # Use detector
if copy_item_locations:
copy_coords = copy_item_locations[0]
self.click_at(copy_coords[0], copy_coords[1])
print("Clicked 'Copy' menu item.")
time.sleep(0.15)
copied = True
else:
print("'Copy' menu item not found. Attempting Ctrl+C.")
try:
self.hotkey('ctrl', 'c')
time.sleep(0.1)
print("Simulated Ctrl+C.")
copied = True
except Exception as e_ctrlc:
print(f"Failed to simulate Ctrl+C: {e_ctrlc}")
copied = False
copied_text = self.get_clipboard()
self.set_clipboard(original_clipboard) # Restore clipboard
if copied and copied_text and copied_text != "___MCP_CLEAR___":
print(f"Successfully copied text, length: {len(copied_text)}")
return copied_text.strip()
else:
print("Error: Copy operation unsuccessful or clipboard content invalid.")
return None
def retrieve_sender_name_interaction(self,
initial_avatar_coords: Tuple[int, int],
bubble_snapshot: Any, # PIL Image object
search_area: Optional[Tuple[int, int, int, int]]) -> Optional[str]:
"""
Perform the sequence of actions to copy sender name, *without* cleanup.
Includes retries with bubble re-location if the initial avatar click fails.
Returns the name or None if failed.
"""
print(f"Attempting interaction to get username, initial avatar guess: {initial_avatar_coords}...")
original_clipboard = self.get_clipboard() or ""
self.set_clipboard("___MCP_CLEAR___")
time.sleep(0.1)
sender_name = None
profile_page_found = False
current_avatar_coords = initial_avatar_coords
for attempt in range(3): # Retry up to 3 times
print(f"Attempt #{attempt + 1} to click avatar and find profile page...")
# --- Re-locate bubble on retries ---
if attempt > 0:
print("Re-locating bubble before retry...")
if bubble_snapshot is None:
print("Error: Cannot retry re-location, bubble snapshot is missing.")
break # Cannot retry without snapshot
new_bubble_box_retry = pyautogui.locateOnScreen(bubble_snapshot, region=search_area, confidence=BUBBLE_RELOCATE_CONFIDENCE)
if new_bubble_box_retry:
new_tl_x_retry, new_tl_y_retry = new_bubble_box_retry.left, new_bubble_box_retry.top
print(f"Successfully re-located bubble snapshot for retry at: ({new_tl_x_retry}, {new_tl_y_retry})")
# Recalculate avatar coords for the retry
current_avatar_coords = (new_tl_x_retry + AVATAR_OFFSET_X_REPLY, new_tl_y_retry + AVATAR_OFFSET_Y_REPLY)
print(f"Recalculated avatar coordinates for retry: {current_avatar_coords}")
else:
print("Warning: Failed to re-locate bubble snapshot on retry. Aborting name retrieval.")
break # Stop retrying if bubble can't be found
# --- Click Avatar ---
try:
self.click_at(current_avatar_coords[0], current_avatar_coords[1])
time.sleep(0.15) # Slightly longer wait after click to allow UI to update
except Exception as click_err:
print(f"Error clicking avatar at {current_avatar_coords} on attempt {attempt + 1}: {click_err}")
time.sleep(0.3) # Wait a bit longer after a click error before retrying
continue # Go to next attempt
# --- Check for Profile Page ---
if self.detector._find_template('profile_page', confidence=self.detector.state_confidence):
print("Profile page verified.")
profile_page_found = True
break # Success, exit retry loop
else:
print(f"Profile page not found after click attempt {attempt + 1}.")
# Optional: Press ESC once to close potential wrong menus before retrying?
# self.press_key('esc')
# time.sleep(0.1)
time.sleep(0.3) # Wait before next attempt
# --- If Profile Page was found, proceed ---
if profile_page_found:
try:
# 2. Find and click profile option
profile_option_locations = self.detector._find_template('profile_option', confidence=0.7)
if not profile_option_locations:
print("Error: User details option not found on profile card.")
return None # Fail early if critical step missing
self.click_at(profile_option_locations[0][0], profile_option_locations[0][1])
print("Clicked user details option.")
time.sleep(0.1) # Wait for user details window
# 3. Find and click "Copy Name" button
copy_name_locations = self.detector._find_template('copy_name_button', confidence=0.7)
if not copy_name_locations:
print("Error: 'Copy Name' button not found in user details.")
return None # Fail early
self.click_at(copy_name_locations[0][0], copy_name_locations[0][1])
print("Clicked 'Copy Name' button.")
time.sleep(0.1)
# 4. Get name from clipboard
copied_name = self.get_clipboard()
if copied_name and copied_name != "___MCP_CLEAR___":
print(f"Successfully copied username: {copied_name}")
sender_name = copied_name.strip()
else:
print("Error: Clipboard content invalid after clicking copy name.")
sender_name = None
except Exception as e:
print(f"Error during username retrieval interaction (after profile page found): {e}")
import traceback
traceback.print_exc()
sender_name = None # Ensure None is returned on error
else:
print("Failed to verify profile page after multiple attempts.")
sender_name = None
# --- Final Cleanup & Return ---
self.set_clipboard(original_clipboard) # Restore clipboard
# NO cleanup logic (like ESC) here - should be handled by coordinator after this function returns
return sender_name
def send_chat_message(self, reply_text: str) -> bool:
"""Paste text into chat input and send it."""
print("Preparing to send response...")
if not reply_text:
print("Error: Response content is empty, cannot send.")
return False
# Find input box coordinates
input_coords = self.default_input_coords # Fallback
if self.input_template_key and self.detector.templates.get(self.input_template_key):
input_locations = self.detector._find_template(self.input_template_key, confidence=0.7)
if input_locations:
input_coords = input_locations[0]
print(f"Found input box position via image: {input_coords}")
else:
print(f"Warning: Input box template '{self.input_template_key}' not found, using default coordinates.")
else:
print("Warning: Input box template key not set or image missing, using default coordinates.")
# Click input, paste, send
self.click_at(input_coords[0], input_coords[1])
time.sleep(0.1)
print("Pasting response...")
self.set_clipboard(reply_text)
time.sleep(0.1)
try:
self.hotkey('ctrl', 'v')
time.sleep(0.1)
print("Pasted.")
except Exception as e:
print(f"Error pasting response: {e}")
return False
# Try clicking send button first
send_button_locations = self.detector._find_template(self.send_button_key, confidence=0.7)
if send_button_locations:
send_coords = send_button_locations[0]
self.click_at(send_coords[0], send_coords[1])
print("Clicked send button.")
time.sleep(0.1)
return True
else:
# Fallback to pressing Enter
print("Send button not found. Attempting to press Enter.")
try:
self.press_key('enter')
print("Pressed Enter.")
time.sleep(0.1)
return True
except Exception as e_enter:
print(f"Error pressing Enter: {e_enter}")
return False
# ==============================================================================
# Position Removal Logic
# ==============================================================================
# Box with the same field names as the result of pyautogui.locateOnScreen,
# used when the bubble position comes from a fallback instead of a match.
_BubbleBox = collections.namedtuple('Box', ['left', 'top', 'width', 'height'])

# Detector template keys per position name: icon shown above the bubble,
# button on the Capitol page, and marker of the position's own page.
_PositionTemplates = collections.namedtuple('_PositionTemplates', ['icon', 'button', 'page'])
_POSITION_TEMPLATES = {
    'DEVELOPMENT': _PositionTemplates('development_pos', 'pos_btn_dev', 'page_dev'),
    'INTERIOR': _PositionTemplates('interior_pos', 'pos_btn_int', 'page_int'),
    'SCIENCE': _PositionTemplates('science_pos', 'pos_btn_sci', 'page_sci'),
    'SECURITY': _PositionTemplates('security_pos', 'pos_btn_sec', 'page_sec'),
    'STRATEGY': _PositionTemplates('strategy_pos', 'pos_btn_str', 'page_str'),
}

_LAST_RESORT_RELOCATE_CONFIDENCE = 0.4  # Lowest confidence tried for snapshot matching
_REDETECT_MAX_DISTANCE = 150  # Max pixel distance accepted for a re-detected bubble
_POSITION_ICON_CONFIDENCE = 0.8
_POSITION_ICON_SEARCH_HEIGHT = 50  # Pixels searched above the bubble
_POSITION_ICON_SEARCH_MARGIN = 100  # Extra pixels searched left and right of the bubble


def _is_region(region) -> bool:
    """Return True if ``region`` is a non-empty sequence of exactly four values."""
    return bool(region) and len(region) == 4


def _snapshot_trigger_region(trigger_bubble_region):
    """Take a fresh screenshot of the trigger region; return None on any failure."""
    try:
        if not _is_region(trigger_bubble_region):
            print("Invalid trigger_bubble_region format, cannot create snapshot.")
            return None
        bubble_region_tuple = (int(trigger_bubble_region[0]), int(trigger_bubble_region[1]),
                               int(trigger_bubble_region[2]), int(trigger_bubble_region[3]))
        if bubble_region_tuple[2] <= 0 or bubble_region_tuple[3] <= 0:
            print(f"Warning: Invalid bubble region {bubble_region_tuple} for taking new snapshot.")
            return None
        print(f"Taking new screenshot of region: {bubble_region_tuple}")
        snapshot = pyautogui.screenshot(region=bubble_region_tuple)
        if not snapshot:
            print("Failed to create new bubble snapshot.")
            return None
        print("Successfully created new bubble snapshot.")
        return snapshot
    except Exception as e:
        print(f"Error creating new bubble snapshot: {e}")
        return None


def _default_search_area(trigger_bubble_region):
    """Build a search area twice the bubble's size around it, or None for full screen."""
    if not _is_region(trigger_bubble_region):
        print("Using full screen search as fallback.")
        return None
    x, y, width, height = trigger_bubble_region
    # Half a bubble of margin on each side, clamped to the screen origin.
    search_area = (max(0, x - width // 2), max(0, y - height // 2), width * 2, height * 2)
    print(f"Created default search area based on bubble region: {search_area}")
    return search_area


def _locate_bubble_by_snapshot(bubble_snapshot, region):
    """Match the snapshot on screen with decreasing confidence; return the box or None."""
    attempts = (
        (BUBBLE_RELOCATE_CONFIDENCE,
         f"First attempt with confidence {BUBBLE_RELOCATE_CONFIDENCE}...",
         "initial"),
        (BUBBLE_RELOCATE_FALLBACK_CONFIDENCE,
         f"First attempt failed. Trying with lower confidence {BUBBLE_RELOCATE_FALLBACK_CONFIDENCE}...",
         "fallback"),
        (_LAST_RESORT_RELOCATE_CONFIDENCE,
         f"Second attempt failed. Trying with even lower confidence {_LAST_RESORT_RELOCATE_CONFIDENCE}...",
         "last resort"),
    )
    for confidence, announcement, label in attempts:
        print(announcement)
        try:
            box = pyautogui.locateOnScreen(bubble_snapshot, region=region, confidence=confidence)
        except Exception as e:
            print(f"Exception during {label} bubble location attempt: {e}")
            box = None
        if box:
            return box
    return None


def _relocate_bubble_by_redetection(detector, trigger_bubble_region):
    """Re-run bubble detection and pick the non-bot bubble nearest the trigger's top-left.

    Returns a box for the nearest bubble if it lies within
    ``_REDETECT_MAX_DISTANCE`` pixels, otherwise None.
    """
    try:
        non_bot_bubbles = [b for b in detector.find_dialogue_bubbles() if not b.get('is_bot')]
        if not (non_bot_bubbles and _is_region(trigger_bubble_region)):
            print("Re-detection fallback failed: No non-bot bubbles found or invalid trigger region.")
            return None

        origin_x, origin_y = trigger_bubble_region[0], trigger_bubble_region[1]

        def distance_to_origin(bubble_info):
            bbox = bubble_info['bbox']
            return ((origin_x - bbox[0]) ** 2 + (origin_y - bbox[1]) ** 2) ** 0.5

        candidates = [b for b in non_bot_bubbles if b.get('bbox')]
        closest_bubble = min(candidates, key=distance_to_origin, default=None)
        min_distance = distance_to_origin(closest_bubble) if closest_bubble else float('inf')

        if closest_bubble and min_distance <= _REDETECT_MAX_DISTANCE:
            print(f"Found a close bubble via re-detection (Distance: {min_distance:.2f}). Using its bbox.")
            bbox = closest_bubble['bbox']
            # NOTE(review): treats bbox as (x1, y1, x2, y2) - confirm against find_dialogue_bubbles.
            box = _BubbleBox(left=bbox[0], top=bbox[1], width=bbox[2] - bbox[0], height=bbox[3] - bbox[1])
            print(f"Created fallback bubble box from re-detected bubble: {box}")
            return box
        print(f"Re-detection fallback failed: No close bubble found (Min distance: {min_distance:.2f} > Threshold: {_REDETECT_MAX_DISTANCE}).")
    except Exception as redetect_err:
        print(f"Error during bubble re-detection fallback: {redetect_err}")
    return None


def _find_pending_position(detector, bubble_x, bubble_y, bubble_w):
    """Find the position icon closest to the bubble's top-center.

    Searches a strip directly above the bubble.  Returns the position name
    (a key of ``_POSITION_TEMPLATES``) or None if no icon is found.
    """
    region_x = max(0, bubble_x - _POSITION_ICON_SEARCH_MARGIN)
    region_y = max(0, bubble_y - _POSITION_ICON_SEARCH_HEIGHT)
    search_region_width = (bubble_x + bubble_w + _POSITION_ICON_SEARCH_MARGIN) - region_x
    search_region_height = bubble_y - region_y
    # Ensure region has positive width and height
    if search_region_width <= 0 or search_region_height <= 0:
        print(f"Error: Invalid search region calculated for position icons: width={search_region_width}, height={search_region_height}")
        return None
    search_region = (region_x, region_y, search_region_width, search_region_height)
    print(f"Searching for position icons in region: {search_region}")

    found_positions = []
    for name, keys in _POSITION_TEMPLATES.items():
        locations = detector._find_template(keys.icon, confidence=_POSITION_ICON_CONFIDENCE, region=search_region)
        found_positions.extend({'name': name, 'coords': loc} for loc in locations or ())
    if not found_positions:
        print("Error: No position icons found near the trigger bubble.")
        return None

    top_center_x = bubble_x + bubble_w // 2
    closest_position = min(
        found_positions,
        key=lambda p: (p['coords'][0] - top_center_x) ** 2 + (p['coords'][1] - bubble_y) ** 2)
    print(f"Found pending position: |{closest_position['name']}| at {closest_position['coords']}")
    return closest_position['name']


def _click_first_match(detector, interactor, template_key, confidence=0.8) -> bool:
    """Click the first on-screen match of a template; return False if there is none."""
    locations = detector._find_template(template_key, confidence=confidence)
    if not locations:
        return False
    interactor.click_at(locations[0][0], locations[0][1])
    return True


def _abort_with_cleanup(detector, interactor, message) -> bool:
    """Log ``message``, try to return to the chat room, and report failure."""
    print(message)
    perform_state_cleanup(detector, interactor)
    return False


def remove_user_position(detector: DetectionModule,
                         interactor: InteractionModule,
                         trigger_bubble_region: Tuple[int, int, int, int],  # Original region, might be outdated
                         bubble_snapshot: Any,  # PIL Image object for re-location
                         search_area: Optional[Tuple[int, int, int, int]]) -> bool:  # Area to search snapshot in
    """
    Performs the sequence of UI actions to remove a user's position based on the triggering chat bubble.
    Includes re-location using the provided snapshot before proceeding.
    Returns True if successful, False otherwise.

    The bubble is re-located in this order: snapshot match (three confidence
    levels), bubble re-detection, and finally the original trigger region.
    Then the position icon above the bubble selects which position to dismiss,
    and the flow goes avatar -> profile page -> Capitol page -> position page
    -> Dismiss -> Confirm, followed by navigation back to the chat room.

    Args:
        detector: Detection module used for template and bubble lookups.
        interactor: Interaction module used for clicks.
        trigger_bubble_region: Original (x, y, width, height) of the bubble.
        bubble_snapshot: Image of the bubble; if None a new screenshot of
            ``trigger_bubble_region`` is taken.
        search_area: Region to search the snapshot in; if None one is derived
            from ``trigger_bubble_region`` (or the full screen is searched).

    Returns:
        True only if the dismissal was confirmed and the chat room state was
        detected afterwards.  False on any failed step; note that False is also
        returned when the dismissal went through but the return to the chat
        room could not be confirmed.
    """
    print(f"\n--- Starting Position Removal Process (Initial Trigger Region: {trigger_bubble_region}) ---")

    # --- Re-locate Bubble First ---
    print("Attempting to re-locate bubble using snapshot before removing position...")
    if bubble_snapshot is None:
        print("Bubble snapshot is missing. Attempting to create a new snapshot from the trigger region...")
        bubble_snapshot = _snapshot_trigger_region(trigger_bubble_region)
        if bubble_snapshot is None:
            return False

    if search_area is None:
        print("Warning: Search area for snapshot is missing. Creating a default search area.")
        search_area = _default_search_area(trigger_bubble_region)

    print(f"Attempting bubble location. Search Region: {'Full Screen' if search_area is None else search_area}")
    new_bubble_box = _locate_bubble_by_snapshot(bubble_snapshot, search_area)

    if not new_bubble_box:
        print("Snapshot location failed. Attempting secondary fallback: Re-detecting bubbles...")
        new_bubble_box = _relocate_bubble_by_redetection(detector, trigger_bubble_region)

    if not new_bubble_box:
        print("All location attempts failed (snapshot & re-detection). Using original trigger region as last resort.")
        if not _is_region(trigger_bubble_region):
            print("Error: No original trigger region available for fallback. Aborting position removal.")
            return False
        x, y, width, height = trigger_bubble_region
        print(f"Using original trigger region as fallback: {trigger_bubble_region}")
        new_bubble_box = _BubbleBox(left=x, top=y, width=width, height=height)
        print("Created fallback bubble box from original coordinates.")

    # Use the NEW coordinates for all subsequent calculations
    bubble_x, bubble_y = new_bubble_box.left, new_bubble_box.top
    bubble_w, bubble_h = new_bubble_box.width, new_bubble_box.height
    print(f"Successfully re-located bubble at: ({bubble_x}, {bubble_y}, {bubble_w}, {bubble_h})")
    # --- End Re-location ---

    # 1. Find the closest position icon above the *re-located* bubble
    target_position_name = _find_pending_position(detector, bubble_x, bubble_y, bubble_w)
    if target_position_name is None:
        return False
    target_keys = _POSITION_TEMPLATES[target_position_name]

    # 2. Click user avatar (offset from *re-located* bubble top-left)
    avatar_click_x = bubble_x + AVATAR_OFFSET_X_REPLY
    avatar_click_y = bubble_y + AVATAR_OFFSET_Y_REPLY
    print(f"Clicking avatar for position removal at calculated position: ({avatar_click_x}, {avatar_click_y}) using offsets ({AVATAR_OFFSET_X_REPLY}, {AVATAR_OFFSET_Y_REPLY}) from re-located bubble top-left ({bubble_x}, {bubble_y})")
    interactor.click_at(avatar_click_x, avatar_click_y)
    time.sleep(0.15)  # Wait for profile page

    # 3. Verify Profile Page and Click Capitol Button
    if not detector._find_template('profile_page', confidence=detector.state_confidence):
        return _abort_with_cleanup(detector, interactor,
                                   "Error: Failed to verify Profile Page after clicking avatar.")
    print("Profile page verified.")
    if not _click_first_match(detector, interactor, 'capitol_button'):
        return _abort_with_cleanup(detector, interactor,
                                   "Error: Capitol button (#11) not found on profile page.")
    print("Clicked Capitol button.")
    time.sleep(0.15)  # Wait for capitol page

    # 4. Verify Capitol Page
    if not detector._find_template('president_title', confidence=detector.state_confidence):
        return _abort_with_cleanup(detector, interactor,
                                   "Error: Failed to verify Capitol Page (President Title not found).")
    print("Capitol page verified.")

    # 5. Find and Click Corresponding Position Button
    if not _click_first_match(detector, interactor, target_keys.button):
        return _abort_with_cleanup(detector, interactor,
                                   f"Error: Position button for '{target_position_name}' not found on Capitol page.")
    print(f"Clicked '{target_position_name}' position button.")
    time.sleep(0.15)  # Wait for position page

    # 6. Verify Position Page
    if not detector._find_template(target_keys.page, confidence=detector.state_confidence):
        return _abort_with_cleanup(detector, interactor,
                                   f"Error: Failed to verify correct position page for '{target_position_name}'.")
    print(f"Verified '{target_position_name}' position page.")

    # 7. Find and Click Dismiss Button
    if not _click_first_match(detector, interactor, 'dismiss_button'):
        return _abort_with_cleanup(detector, interactor,
                                   "Error: Dismiss button not found on position page.")
    print("Clicked Dismiss button.")
    time.sleep(0.1)  # Wait for confirmation

    # 8. Find and Click Confirm Button
    if not _click_first_match(detector, interactor, 'confirm_button'):
        print("Error: Confirm button not found after clicking dismiss.")
        # Deliberately no ESC cleanup here: the UI might be stuck in the
        # confirmation state, so the caller decides what to do next.
        return False
    print("Clicked Confirm button. Position should be dismissed.")
    time.sleep(0.05)  # Wait for action to complete

    # 9. Cleanup: Return to Chat Room
    # Close the position page, then go back from the Capitol page.
    if _click_first_match(detector, interactor, 'close_button'):
        print("Clicked Close button (returning to Capitol).")
        time.sleep(0.05)
    else:
        print("Warning: Close button not found after confirm, attempting back arrow anyway.")

    if _click_first_match(detector, interactor, 'back_arrow'):
        print("Clicked Back Arrow (returning to Profile).")
        time.sleep(0.05)
    else:
        print("Warning: Back arrow not found on Capitol page, attempting ESC cleanup.")

    # Use standard ESC cleanup
    print("Initiating final ESC cleanup to return to chat...")
    if perform_state_cleanup(detector, interactor):
        print("--- Position Removal Process Completed Successfully ---")
        return True
    print("--- Position Removal Process Completed, but failed to confirm return to chat room ---")
    return False  # Technically removed, but UI state uncertain
# ==============================================================================
# Coordinator Logic (Placeholder - To be implemented in main.py)
# ==============================================================================
# --- State-based Cleanup Function (To be called by Coordinator) ---
def perform_state_cleanup(detector: "DetectionModule", interactor: "InteractionModule", max_attempts: int = 4) -> bool:
    """
    Attempt to return to the main chat room interface by pressing ESC based on detected state.
    Returns True if confirmed back in chat room, False otherwise.

    Each attempt reads the UI state from ``detector.get_current_ui_state()``:
    a chat state ends the cleanup successfully; 'user_details' / 'profile_card'
    triggers an ESC press; any other state triggers an ESC press except on the
    last attempt.  If the last attempt ended with an ESC press, the state is
    read once more so that the effect of that press is not ignored.

    Args:
        detector: Object providing ``get_current_ui_state()``.
        interactor: Object providing ``press_key(key)``.
        max_attempts: Maximum number of detect-and-react iterations.
    """
    print("Performing cleanup: Attempting to press ESC to return to chat interface...")
    chat_states = ('chat_room', 'world_chat', 'private_chat')  # Adjust as needed
    overlay_states = ('user_details', 'profile_card')

    returned_to_chat = False
    esc_unverified = False  # True while the most recent ESC has not been followed by a state check
    for attempt in range(max_attempts):
        print(f"Cleanup attempt #{attempt + 1}/{max_attempts}")
        time.sleep(0.1)
        current_state = detector.get_current_ui_state()
        esc_unverified = False
        print(f"Detected state: {current_state}")

        if current_state in chat_states:
            print("Chat room interface detected, cleanup complete.")
            returned_to_chat = True
            break

        if current_state in overlay_states:
            print(f"{current_state.replace('_', ' ').title()} detected, pressing ESC...")
            interactor.press_key('esc')
            esc_unverified = True
            time.sleep(0.1)  # Wait for UI response after ESC
            continue

        # Unknown state
        print("Unknown page state detected.")
        if attempt < max_attempts - 1:
            print("Trying one ESC press as fallback...")
            interactor.press_key('esc')
            esc_unverified = True
            time.sleep(0.1)
        else:
            print("Maximum attempts reached, stopping cleanup.")
            break

    if not returned_to_chat and esc_unverified:
        # The loop ran out right after an ESC press; check whether it worked.
        time.sleep(0.1)
        final_state = detector.get_current_ui_state()
        print(f"Detected state after final ESC: {final_state}")
        if final_state in chat_states:
            print("Chat room interface detected, cleanup complete.")
            returned_to_chat = True

    if not returned_to_chat:
        print("Warning: Could not confirm return to chat room interface via state detection.")
    return returned_to_chat
# --- UI Monitoring Loop Function (To be run in a separate thread) ---
def run_ui_monitoring_loop(trigger_queue: queue.Queue, command_queue: queue.Queue):
"""
Continuously monitors the UI, detects triggers, performs interactions,
puts trigger data into trigger_queue, and processes commands from command_queue.
"""
print("\n--- Starting UI Monitoring Loop (Thread) ---")
# --- Initialization (Instantiate modules within the thread) ---
# Load templates directly using constants defined in this file for now
# Consider passing config or a template loader object in the future
templates = {
# Regular Bubble (Original + Skins) - Keys match those used in find_dialogue_bubbles
'corner_tl': CORNER_TL_IMG, 'corner_br': CORNER_BR_IMG,
'corner_tl_type2': CORNER_TL_TYPE2_IMG, 'corner_br_type2': CORNER_BR_TYPE2_IMG,
'corner_tl_type3': CORNER_TL_TYPE3_IMG, 'corner_br_type3': CORNER_BR_TYPE3_IMG,
'corner_tl_type4': CORNER_TL_TYPE4_IMG, 'corner_br_type4': CORNER_BR_TYPE4_IMG, # Added type4
# Bot Bubble (Single Type)
'bot_corner_tl': BOT_CORNER_TL_IMG, 'bot_corner_br': BOT_CORNER_BR_IMG,
# Keywords & UI Elements
'keyword_wolf_lower': KEYWORD_wolf_LOWER_IMG,
'keyword_wolf_upper': KEYWORD_Wolf_UPPER_IMG,
'keyword_wolf_lower_type2': KEYWORD_wolf_LOWER_TYPE2_IMG,
'keyword_wolf_upper_type2': KEYWORD_Wolf_UPPER_TYPE2_IMG,
'keyword_wolf_lower_type3': KEYWORD_wolf_LOWER_TYPE3_IMG,
'keyword_wolf_upper_type3': KEYWORD_Wolf_UPPER_TYPE3_IMG,
'keyword_wolf_lower_type4': KEYWORD_wolf_LOWER_TYPE4_IMG, # Added type4
'keyword_wolf_upper_type4': KEYWORD_Wolf_UPPER_TYPE4_IMG, # Added type4
# --- Add Reply Keywords ---
'keyword_wolf_reply': KEYWORD_WOLF_REPLY_IMG,
'keyword_wolf_reply_type2': KEYWORD_WOLF_REPLY_TYPE2_IMG,
'keyword_wolf_reply_type3': KEYWORD_WOLF_REPLY_TYPE3_IMG,
'keyword_wolf_reply_type4': KEYWORD_WOLF_REPLY_TYPE4_IMG,
# --- End Reply Keywords ---
'copy_menu_item': COPY_MENU_ITEM_IMG, 'profile_option': PROFILE_OPTION_IMG,
'copy_name_button': COPY_NAME_BUTTON_IMG, 'send_button': SEND_BUTTON_IMG,
'chat_input': CHAT_INPUT_IMG, 'profile_name_page': PROFILE_NAME_PAGE_IMG,
'profile_page': PROFILE_PAGE_IMG, 'chat_room': CHAT_ROOM_IMG,
'base_screen': BASE_SCREEN_IMG, 'world_map_screen': WORLD_MAP_IMG, # Added for navigation
'world_chat': WORLD_CHAT_IMG, 'private_chat': PRIVATE_CHAT_IMG,
# Add position templates
'development_pos': POS_DEV_IMG, 'interior_pos': POS_INT_IMG, 'science_pos': POS_SCI_IMG,
'security_pos': POS_SEC_IMG, 'strategy_pos': POS_STR_IMG,
# Add capitol templates
'capitol_button': CAPITOL_BUTTON_IMG, 'president_title': PRESIDENT_TITLE_IMG,
'pos_btn_dev': POS_BTN_DEV_IMG, 'pos_btn_int': POS_BTN_INT_IMG, 'pos_btn_sci': POS_BTN_SCI_IMG,
'pos_btn_sec': POS_BTN_SEC_IMG, 'pos_btn_str': POS_BTN_STR_IMG,
'page_dev': PAGE_DEV_IMG, 'page_int': PAGE_INT_IMG, 'page_sci': PAGE_SCI_IMG,
'page_sec': PAGE_SEC_IMG, 'page_str': PAGE_STR_IMG,
'dismiss_button': DISMISS_BUTTON_IMG, 'confirm_button': CONFIRM_BUTTON_IMG,
'close_button': CLOSE_BUTTON_IMG, 'back_arrow': BACK_ARROW_IMG,
'reply_button': REPLY_BUTTON_IMG # Added reply button template key
}
# Use default confidence/region settings from constants
# Detector now loads its own color settings internally based on hardcoded values
detector = DetectionModule(templates,
confidence=CONFIDENCE_THRESHOLD,
state_confidence=STATE_CONFIDENCE_THRESHOLD,
region=SCREENSHOT_REGION)
# Use default input coords/keys from constants
interactor = InteractionModule(detector, input_coords=(CHAT_INPUT_CENTER_X, CHAT_INPUT_CENTER_Y), input_template_key='chat_input', send_button_key='send_button')
# --- State Management (Local to this monitoring thread) ---
last_processed_bubble_info = None # Store the whole dict now
recent_texts = collections.deque(maxlen=RECENT_TEXT_HISTORY_MAXLEN) # Context-specific history needed
screenshot_counter = 0 # Initialize counter for debug screenshots
main_screen_click_counter = 0 # Counter for consecutive main screen clicks
while True:
# --- Process ALL Pending Commands First ---
commands_processed_this_cycle = False
try:
while True: # Loop to drain the queue
command_data = command_queue.get_nowait() # Check for commands without blocking
commands_processed_this_cycle = True
action = command_data.get('action')
if action == 'send_reply':
text_to_send = command_data.get('text')
if not text_to_send:
print("UI Thread: Received send_reply command with no text.")
continue # Process next command in queue
print(f"UI Thread: Processing command to send reply: '{text_to_send[:50]}...'")
interactor.send_chat_message(text_to_send)
elif action == 'remove_position':
# region = command_data.get('trigger_bubble_region') # This is the old region, keep for reference?
snapshot = command_data.get('bubble_snapshot')
area = command_data.get('search_area')
# Pass all necessary data to the function, including the original region if needed for context
# but the function should primarily use the snapshot for re-location.
original_region = command_data.get('trigger_bubble_region')
if snapshot: # Check for snapshot presence
print(f"UI Thread: Processing command to remove position (Snapshot provided: {'Yes' if snapshot else 'No'})")
success = remove_user_position(detector, interactor, original_region, snapshot, area)
print(f"UI Thread: Position removal attempt finished. Success: {success}")
else:
print("UI Thread: Received remove_position command without necessary snapshot data.")
elif action == 'pause':
if not monitoring_paused_flag[0]: # Avoid redundant prints if already paused
print("UI Thread: Processing pause command. Pausing monitoring.")
monitoring_paused_flag[0] = True
# No continue needed here, let it finish draining queue
elif action == 'resume':
if monitoring_paused_flag[0]: # Avoid redundant prints if already running
print("UI Thread: Processing resume command. Resuming monitoring.")
monitoring_paused_flag[0] = False
# No state reset here, reset_state command handles that
elif action == 'handle_restart_complete': # Added for game monitor restart signal
print("UI Thread: Received 'handle_restart_complete' command. Initiating internal pause/wait/resume sequence.")
# --- Internal Pause/Wait/Resume Sequence ---
if not monitoring_paused_flag[0]: # Only pause if not already paused
print("UI Thread: Pausing monitoring internally for restart.")
monitoring_paused_flag[0] = True
# No need to send command back to main loop, just update flag
print("UI Thread: Waiting 30 seconds for game to stabilize after restart.")
time.sleep(30) # Wait for game to launch and stabilize
print("UI Thread: Resuming monitoring internally after restart wait.")
monitoring_paused_flag[0] = False
# Clear state to ensure fresh detection after restart
recent_texts.clear()
last_processed_bubble_info = None
print("UI Thread: Monitoring resumed and state reset after restart.")
# --- End Internal Sequence ---
elif action == 'clear_history': # Added for F7
print("UI Thread: Processing clear_history command.")
recent_texts.clear()
print("UI Thread: recent_texts cleared.")
elif action == 'reset_state': # Added for F8 resume
print("UI Thread: Processing reset_state command.")
recent_texts.clear()
last_processed_bubble_info = None
print("UI Thread: recent_texts cleared and last_processed_bubble_info reset.")
else:
print(f"UI Thread: Received unknown command: {action}")
except queue.Empty:
# No more commands in the queue for this cycle
if commands_processed_this_cycle:
print("UI Thread: Finished processing commands for this cycle.")
pass
except Exception as cmd_err:
print(f"UI Thread: Error processing command queue: {cmd_err}")
# Consider if pausing is needed on error, maybe not
# --- Now, Check Pause State ---
if monitoring_paused_flag[0]:
# If paused, sleep and skip UI monitoring part
time.sleep(0.1) # Sleep briefly while paused
continue # Go back to check commands again
# --- If not paused, proceed with UI Monitoring ---
# --- Check for Main Screen Navigation ---
try:
base_locs = detector._find_template('base_screen', confidence=0.8)
map_locs = detector._find_template('world_map_screen', confidence=0.8)
if base_locs or map_locs:
print(f"UI Thread: Detected main screen (Base or World Map). Counter: {main_screen_click_counter}")
if main_screen_click_counter < 5:
main_screen_click_counter += 1
print(f"UI Thread: Attempting click #{main_screen_click_counter}/5 to return to chat...")
# Coordinates provided by user (adjust if needed based on actual screen resolution/layout)
target_x, target_y = 600, 1300
interactor.click_at(target_x, target_y)
time.sleep(0.1) # Short delay after click
print("UI Thread: Clicked. Re-checking screen state...")
else:
print("UI Thread: Clicked 5 times, still on main screen. Pressing ESC...")
interactor.press_key('esc')
main_screen_click_counter = 0 # Reset counter after ESC
time.sleep(0.05) # Wait a bit longer after ESC
print("UI Thread: ESC pressed. Re-checking screen state...")
continue # Skip the rest of the loop and re-evaluate state
else:
# Reset counter if not on the main screen
if main_screen_click_counter > 0:
print("UI Thread: Not on main screen, resetting click counter.")
main_screen_click_counter = 0
except Exception as nav_err:
print(f"UI Thread: Error during main screen navigation check: {nav_err}")
# Decide if you want to continue or pause after error
main_screen_click_counter = 0 # Reset counter on error too
# --- Process Commands Second (Non-blocking) ---
# This block appears to be redundant now that commands are processed at the start of the loop.
# It is kept commented out for now and can be removed once it is confirmed to be unnecessary.
# try:
# command_data = command_queue.get_nowait() # Check for commands without blocking
# action = command_data.get('action')
# if action == 'send_reply':
# text_to_send = command_data.get('text')
# # reply_context_activated = command_data.get('reply_context_activated', False) # Check if reply context was set
#
# if not text_to_send:
# print("UI Thread: Received send_reply command with no text.")
# continue # Skip if no text
#
# print(f"UI Thread: Received command to send reply: '{text_to_send[:50]}...'")
# # The reply context (clicking bubble + reply button) is now handled *before* putting into queue.
# # So, we just need to send the message directly here.
# # The input field should already be focused and potentially have @Username prefix if reply context was activated.
# interactor.send_chat_message(text_to_send)
#
# elif action == 'remove_position': # <--- Handle new command
# region = command_data.get('trigger_bubble_region')
# if region:
# print(f"UI Thread: Received command to remove position triggered by bubble region: {region}")
# # Call the new UI function
# success = remove_user_position(detector, interactor, region) # Call synchronous function
# print(f"UI Thread: Position removal attempt finished. Success: {success}")
# # Note: No need to send result back unless main thread needs confirmation
# else:
# print("UI Thread: Received remove_position command without trigger_bubble_region.")
# elif action == 'pause': # <--- Handle pause command
# print("UI Thread: Received pause command. Pausing monitoring.")
# monitoring_paused_flag[0] = True
# continue # Immediately pause after receiving command
# elif action == 'resume': # <--- Handle resume command (might be redundant if checked above, but safe)
# print("UI Thread: Received resume command. Resuming monitoring.")
# monitoring_paused_flag[0] = False
# else:
# print(f"UI Thread: Received unknown command: {action}")
# except queue.Empty:
# pass # No command waiting, continue with monitoring
# except Exception as cmd_err:
# print(f"UI Thread: Error processing command queue: {cmd_err}")
# # This block is now part of the command processing loop above
# pass
# --- Verify Chat Room State Before Bubble Detection (Only if NOT paused) ---
try:
# Use a slightly lower confidence maybe, or state_confidence
chat_room_locs = detector._find_template('chat_room', confidence=detector.state_confidence)
if not chat_room_locs:
print("UI Thread: Not in chat room state before bubble detection. Attempting cleanup...")
# Call the existing cleanup function to try and return
perform_state_cleanup(detector, interactor)
# Regardless of cleanup success, restart the loop to re-evaluate state from the top
print("UI Thread: Continuing loop after attempting chat room cleanup.")
time.sleep(0.5) # Small pause after cleanup attempt
continue
# else: # Optional: Log if chat room is confirmed
# print("UI Thread: Chat room state confirmed.")
except Exception as state_check_err:
print(f"UI Thread: Error checking for chat room state: {state_check_err}")
# Decide how to handle error - maybe pause and retry? For now, continue cautiously.
time.sleep(1)
# --- Then Perform UI Monitoring (Bubble Detection) ---
try:
# 1. Detect Bubbles
all_bubbles_data = detector.find_dialogue_bubbles() # Returns list of dicts
if not all_bubbles_data: time.sleep(2); continue
# Filter out bot bubbles
other_bubbles_data = [b_info for b_info in all_bubbles_data if not b_info['is_bot']]
if not other_bubbles_data: time.sleep(0.2); continue
# Sort bubbles from bottom to top (based on bottom Y coordinate)
sorted_bubbles = sorted(other_bubbles_data, key=lambda b_info: b_info['bbox'][3], reverse=True)
# Iterate through sorted bubbles (bottom to top)
for target_bubble_info in sorted_bubbles:
target_bbox = target_bubble_info['bbox']
bubble_region = (target_bbox[0], target_bbox[1], target_bbox[2]-target_bbox[0], target_bbox[3]-target_bbox[1])
# 3. Detect Keyword in Bubble
keyword_coords = detector.find_keyword_in_region(bubble_region)
if keyword_coords:
print(f"\n!!! Keyword detected in bubble {target_bbox} !!!")
# --- Determine if it's a reply keyword for offset ---
is_reply_keyword = False
reply_keyword_keys = ['keyword_wolf_reply', 'keyword_wolf_reply_type2', 'keyword_wolf_reply_type3', 'keyword_wolf_reply_type4']
for key in reply_keyword_keys:
reply_locs = detector._find_template(key, region=bubble_region, grayscale=False, confidence=detector.confidence)
if reply_locs:
for loc in reply_locs:
if abs(keyword_coords[0] - loc[0]) <= 2 and abs(keyword_coords[1] - loc[1]) <= 2:
print(f"Confirmed detected keyword at {keyword_coords} matches reply keyword template '{key}' at {loc}.")
is_reply_keyword = True
break
if is_reply_keyword:
break
# Calculate click coordinates with potential offset
click_coords = keyword_coords
if is_reply_keyword:
click_coords = (keyword_coords[0], keyword_coords[1] + 25)
print(f"Applying +25 Y-offset for reply keyword. Click target: {click_coords}")
else:
print(f"Detected keyword is not a reply type. Click target: {click_coords}")
# --- Variables needed later ---
bubble_snapshot = None
search_area = SCREENSHOT_REGION
if search_area is None:
print("Warning: SCREENSHOT_REGION not defined, searching full screen for bubble snapshot.")
# --- Take Snapshot for Re-location ---
try:
bubble_region_tuple = (int(bubble_region[0]), int(bubble_region[1]), int(bubble_region[2]), int(bubble_region[3]))
if bubble_region_tuple[2] <= 0 or bubble_region_tuple[3] <= 0:
print(f"Warning: Invalid bubble region {bubble_region_tuple} for snapshot. Skipping this bubble.")
continue # Skip to next bubble in the loop
bubble_snapshot = pyautogui.screenshot(region=bubble_region_tuple)
if bubble_snapshot is None:
print("Warning: Failed to capture bubble snapshot. Skipping this bubble.")
continue # Skip to next bubble
# --- Save Snapshot for Debugging ---
try:
screenshot_index = (screenshot_counter % MAX_DEBUG_SCREENSHOTS) + 1
screenshot_filename = f"debug_relocation_snapshot_{screenshot_index}.png"
screenshot_path = os.path.join(DEBUG_SCREENSHOT_DIR, screenshot_filename)
print(f"Attempting to save bubble snapshot used for re-location to: {screenshot_path}")
bubble_snapshot.save(screenshot_path)
print(f"Successfully saved bubble snapshot: {screenshot_path}")
screenshot_counter += 1
except Exception as save_err:
print(f"Error saving bubble snapshot to {screenshot_path}: {repr(save_err)}")
except Exception as snapshot_err:
print(f"Error taking initial bubble snapshot: {repr(snapshot_err)}")
continue # Skip to next bubble
# 4. Re-locate bubble *before* copying text
print("Attempting to re-locate bubble before copying text...")
new_bubble_box_for_copy = None
if bubble_snapshot:
try:
# Use standard confidence for this initial critical step
new_bubble_box_for_copy = pyautogui.locateOnScreen(bubble_snapshot,
region=search_area,
confidence=BUBBLE_RELOCATE_CONFIDENCE)
except Exception as e:
print(f"Exception during bubble location before copy: {e}")
if not new_bubble_box_for_copy:
print("Warning: Failed to re-locate bubble before copying text. Skipping this bubble.")
continue # Skip to the next bubble in the outer loop
print(f"Successfully re-located bubble for copy at: {new_bubble_box_for_copy}")
# Define the region based on the re-located bubble to find the keyword again
copy_bubble_region = (new_bubble_box_for_copy.left, new_bubble_box_for_copy.top,
new_bubble_box_for_copy.width, new_bubble_box_for_copy.height)
# Find the keyword *again* within the *new* bubble region to get current coords
current_keyword_coords = detector.find_keyword_in_region(copy_bubble_region)
if not current_keyword_coords:
print("Warning: Keyword not found in the re-located bubble region. Skipping this bubble.")
continue # Skip to the next bubble
# Determine if it's a reply keyword based on the *new* location/region
is_reply_keyword_current = False
# (The is_reply_keyword check is NOT repeated here against current_keyword_coords and copy_bubble_region.)
# Re-running that check could be complex, so for simplicity the 'is_reply_keyword' flag
# determined during the initial detection is reused, assuming the keyword type doesn't change.
# That earlier flag drives the offset calculation below; 'is_reply_keyword_current' is only a placeholder.
click_coords_current = current_keyword_coords
if is_reply_keyword: # Use the flag determined from initial detection
click_coords_current = (current_keyword_coords[0], current_keyword_coords[1] + 25)
print(f"Applying +25 Y-offset for reply keyword (current location). Click target: {click_coords_current}")
else:
print(f"Detected keyword is not a reply type (current location). Click target: {click_coords_current}")
# Interact: Get Bubble Text using current coordinates
bubble_text = interactor.copy_text_at(click_coords_current)
if not bubble_text:
print("Error: Could not get dialogue content for this bubble (after re-location).")
perform_state_cleanup(detector, interactor) # Attempt cleanup
continue # Skip to next bubble
# Check recent text history
if bubble_text in recent_texts:
print(f"Content '{bubble_text[:30]}...' in recent history, skipping this bubble.")
continue # Skip to next bubble
print(">>> New trigger event <<<")
# Add to recent texts *before* potentially long interaction
recent_texts.append(bubble_text)
# 5. Interact: Get Sender Name (uses re-location internally via retrieve_sender_name_interaction)
sender_name = None
try:
# --- Bubble Re-location Logic ---
print("Attempting to re-locate bubble before getting sender name...")
if bubble_snapshot is None:
print("Error: Bubble snapshot missing for re-location. Skipping this bubble.")
continue
# Try locating with decreasing confidence
new_bubble_box = None
confidences_to_try = [BUBBLE_RELOCATE_CONFIDENCE, BUBBLE_RELOCATE_FALLBACK_CONFIDENCE, 0.4]
for conf in confidences_to_try:
print(f"Attempting location with confidence {conf}...")
try:
new_bubble_box = pyautogui.locateOnScreen(bubble_snapshot,
region=search_area,
confidence=conf)
if new_bubble_box:
print(f"Successfully located with confidence {conf}.")
break # Found it
except Exception as e:
print(f"Exception during location attempt with confidence {conf}: {e}")
# --- End Confidence Loop ---
if new_bubble_box:
new_tl_x, new_tl_y = new_bubble_box.left, new_bubble_box.top
print(f"Successfully re-located bubble snapshot at: ({new_tl_x}, {new_tl_y})")
new_avatar_coords = (new_tl_x + AVATAR_OFFSET_X_REPLY, new_tl_y + AVATAR_OFFSET_Y_REPLY)
print(f"Calculated new avatar coordinates for reply context: {new_avatar_coords}")
sender_name = interactor.retrieve_sender_name_interaction(
initial_avatar_coords=new_avatar_coords,
bubble_snapshot=bubble_snapshot,
search_area=search_area
)
else:
print("Warning: Failed to re-locate bubble snapshot after multiple attempts.")
print("Trying direct approach with original bubble coordinates...")
original_tl_coords = target_bubble_info.get('tl_coords')
if original_tl_coords:
fallback_avatar_coords = (original_tl_coords[0] + AVATAR_OFFSET_X_REPLY,
original_tl_coords[1] + AVATAR_OFFSET_Y_REPLY)
print(f"Using fallback avatar coordinates from original detection: {fallback_avatar_coords}")
sender_name = interactor.retrieve_sender_name_interaction(
initial_avatar_coords=fallback_avatar_coords,
bubble_snapshot=bubble_snapshot,
search_area=search_area
)
if not sender_name:
print("Direct approach failed. Skipping this trigger.")
perform_state_cleanup(detector, interactor)
continue # Skip to next bubble
else:
print("No original coordinates available. Skipping sender name retrieval.")
perform_state_cleanup(detector, interactor)
continue # Skip to next bubble
# --- End Bubble Re-location Logic ---
except Exception as reloc_err:
print(f"Error during bubble re-location or subsequent interaction: {reloc_err}")
import traceback
traceback.print_exc()
perform_state_cleanup(detector, interactor)
continue # Skip to next bubble
# 6. Perform Cleanup
cleanup_successful = perform_state_cleanup(detector, interactor)
if not cleanup_successful:
print("Error: Failed to return to chat screen after getting name. Skipping this bubble.")
continue # Skip to next bubble
if not sender_name:
print("Error: Could not get sender name for this bubble, skipping.")
continue # Skip to next bubble
# --- Attempt to activate reply context ---
reply_context_activated = False
try:
print("Attempting to activate reply context...")
if bubble_snapshot is None:
print("Warning: Bubble snapshot missing for reply context activation. Skipping.")
final_bubble_box_for_reply = None
else:
print(f"Attempting final re-location for reply context using search_area: {search_area}")
final_bubble_box_for_reply = pyautogui.locateOnScreen(bubble_snapshot, region=search_area, confidence=BUBBLE_RELOCATE_CONFIDENCE)
if final_bubble_box_for_reply:
print(f"Final re-location successful at: {final_bubble_box_for_reply}")
bubble_x_reply, bubble_y_reply = final_bubble_box_for_reply.left, final_bubble_box_for_reply.top
bubble_w_reply, bubble_h_reply = final_bubble_box_for_reply.width, final_bubble_box_for_reply.height
center_x_reply = bubble_x_reply + bubble_w_reply // 2
center_y_reply = bubble_y_reply + bubble_h_reply // 2
if is_reply_keyword:
center_y_reply += 15
print(f"Applying +15 Y-offset to bubble center click for reply keyword. Target Y: {center_y_reply}")
print(f"Clicking bubble center for reply at ({center_x_reply}, {center_y_reply})")
interactor.click_at(center_x_reply, center_y_reply)
time.sleep(0.15)
print("Searching for reply button...")
reply_button_locs = detector._find_template('reply_button', confidence=0.8)
if reply_button_locs:
reply_coords = reply_button_locs[0]
print(f"Found reply button at {reply_coords}. Clicking...")
interactor.click_at(reply_coords[0], reply_coords[1])
time.sleep(0.07)
reply_context_activated = True
print("Reply context activated.")
else:
print(">>> Reply button template ('reply_button') not found after clicking bubble center. <<<")
else:
print("Warning: Failed to re-locate bubble for activating reply context.")
except Exception as reply_context_err:
print(f"!!! Error during reply context activation: {reply_context_err} !!!")
# 7. Send Trigger Info to Main Thread
print("\n>>> Putting trigger info in Queue <<<")
print(f" Sender: {sender_name}")
print(f" Content: {bubble_text[:100]}...")
print(f" Bubble Region: {bubble_region}") # Original region for context
print(f" Reply Context Activated: {reply_context_activated}")
try:
data_to_send = {
'sender': sender_name,
'text': bubble_text,
'bubble_region': bubble_region, # Send original region for context if needed
'reply_context_activated': reply_context_activated,
'bubble_snapshot': bubble_snapshot, # Send the snapshot used
'search_area': search_area
}
trigger_queue.put(data_to_send)
print("Trigger info (with region, reply flag, snapshot, search_area) placed in Queue.")
# --- CRITICAL: Break loop after successfully processing one trigger ---
print("--- Single bubble processing complete. Breaking scan cycle. ---")
break # Exit the 'for target_bubble_info in sorted_bubbles' loop
except Exception as q_err:
print(f"Error putting data in Queue: {q_err}")
# Don't break if queue put fails, maybe try next bubble? Or log and break?
# Let's break here too, as something is wrong.
print("Breaking scan cycle due to queue error.")
break
# End of keyword found block (if keyword_coords:)
# End of loop through sorted bubbles (for target_bubble_info...)
# If the loop finished without breaking (i.e., no trigger processed), wait the full interval.
# If it broke, the sleep still happens here before the next cycle.
time.sleep(1.5) # Polling interval after checking all bubbles or processing one
except KeyboardInterrupt:
print("\nMonitoring interrupted.")
break
except Exception as e:
print(f"Unknown error in monitoring loop: {e}")
import traceback
traceback.print_exc()
# Attempt cleanup in case of unexpected error during interaction
print("Attempting cleanup after unexpected error...")
perform_state_cleanup(detector, interactor)
print("Waiting 3 seconds before retry...")
time.sleep(3)
# Note: The old monitor_chat_for_trigger function is replaced by the example_coordinator_loop.
# The actual UI monitoring thread started in main.py should call a function like this example loop.
# The main async loop in main.py will handle getting items from the queue and interacting with the LLM.
# if __name__ == '__main__':
# # This module is not meant to be run directly after refactoring.
# # Initialization and coordination happen in main.py.
# pass