Wolf-Chat-for-Lastwar/ui_interaction.py
z060142 2ac63718a9 Implement perceptual image hash deduplication for bubble processing
- Added `simple_bubble_dedup.py` module using perceptual hashing (pHash) to detect duplicate chat bubbles based on visual similarity.
- System keeps recent N (default 5) hashes and skips bubbles with Hamming distance below threshold (default 5).
- Integrated into `run_ui_monitoring_loop()`:
  - Hash is computed upon bubble snapshot capture.
  - Duplicate check occurs before message enqueue.
  - Sender info is optionally attached to matching hash entries after successful processing.
- Deduplication state is persisted in `simple_bubble_dedup.json`.
- F7 (`clear_history`) and F8 (`reset_state`) now also clear image-based hash history.
- Removed or commented out legacy `recent_texts` text-based deduplication logic.

This visual deduplication system reduces false negatives caused by slight text variations and ensures higher confidence in skipping repeated bubble interactions.
2025-05-16 02:02:31 +08:00

2373 lines
130 KiB
Python

# ui_interaction.py
# Refactored to separate Detection and Interaction logic.
import pyautogui
import cv2 # opencv-python
import numpy as np
import sys # Added for special character handling
import io # Added for special character handling
import pyperclip
import time
import os
import collections
import asyncio
import pygetwindow as gw # Used to check/activate windows
import config # Used to read window title
import json # Added for color config loading
import queue
from typing import List, Tuple, Optional, Dict, Any
import threading # Import threading for Lock if needed, or just use a simple flag
import math # Added for distance calculation in dual method
import time # Ensure time is imported for MessageDeduplication
from simple_bubble_dedup import SimpleBubbleDeduplication
class MessageDeduplication:
def __init__(self, expiry_seconds=3600): # 1 hour expiry time
self.processed_messages = {} # {composite_key: timestamp}
self.expiry_seconds = expiry_seconds
def create_key(self, sender, content):
"""Create a standardized composite key."""
# Thoroughly standardize text - remove all whitespace and punctuation, lowercase
clean_content = ''.join(c.lower() for c in content if c.isalnum())
clean_sender = ''.join(c.lower() for c in sender if c.isalnum())
# Truncate content to first 100 chars to prevent overly long keys
if len(clean_content) > 100:
clean_content = clean_content[:100]
return f"{clean_sender}:{clean_content}"
def is_duplicate(self, sender, content):
"""Check if the message is a duplicate within the expiry period."""
if not sender or not content:
return False # Missing necessary info, treat as new message
key = self.create_key(sender, content)
current_time = time.time()
# Check if duplicate and not expired
if key in self.processed_messages:
last_time = self.processed_messages[key]
if current_time - last_time < self.expiry_seconds:
print(f"Deduplicator: Detected duplicate message: {sender} - {content[:20]}...")
return True
# Update processing time
self.processed_messages[key] = current_time
return False
def purge_expired(self):
"""Remove expired message records."""
current_time = time.time()
expired_keys = [k for k, t in self.processed_messages.items()
if current_time - t >= self.expiry_seconds]
for key in expired_keys:
del self.processed_messages[key]
if expired_keys: # Log only if something was purged
print(f"Deduplicator: Purged {len(expired_keys)} expired message records.")
return len(expired_keys)
def clear_all(self):
"""Clear all recorded messages (for F7/F8 functionality)."""
count = len(self.processed_messages)
self.processed_messages.clear()
if count > 0: # Log only if something was cleared
print(f"Deduplicator: Cleared all {count} message records.")
return count
# --- Global Pause Flag ---
# Using a simple mutable object (list) for thread-safe-like access without explicit lock
# Or could use threading.Event()
monitoring_paused_flag = [False] # List containing a boolean
# --- Global Error Handling Setup for Text Encoding ---
def handle_text_encoding(text, default_text="[無法處理的文字]"):
"""安全處理任何文字,確保不會因編碼問題而崩潰程序"""
if text is None:
return default_text
try:
# 嘗試使用 utf-8 編碼
return text
except UnicodeEncodeError:
try:
# 嘗試將特殊字符替換為可顯示字符
return text.encode('utf-8', errors='replace').decode('utf-8')
except:
# 最後手段:忽略任何無法處理的字符
try:
return text.encode('utf-8', errors='ignore').decode('utf-8')
except:
return default_text
# --- Color Config Loading ---
def load_bubble_colors(config_path='bubble_colors.json'):
"""Loads bubble color configuration from a JSON file."""
try:
# Ensure the path is absolute or relative to the script directory
if not os.path.isabs(config_path):
config_path = os.path.join(SCRIPT_DIR, config_path)
with open(config_path, 'r', encoding='utf-8') as f:
config = json.load(f)
print(f"Successfully loaded color config from {config_path}")
return config.get('bubble_types', [])
except FileNotFoundError:
print(f"Warning: Color config file not found at {config_path}. Using default colors.")
except json.JSONDecodeError:
print(f"Error: Could not decode JSON from {config_path}. Using default colors.")
except Exception as e:
print(f"Error loading color config: {e}. Using default colors.")
# Default configuration if loading fails
return [
{
"name": "normal_user",
"is_bot": False, # Corrected boolean value
"hsv_lower": [6, 0, 240],
"hsv_upper": [18, 23, 255],
"min_area": 2500,
"max_area": 300000
},
{
"name": "bot",
"is_bot": True, # Corrected boolean value
"hsv_lower": [105, 9, 208],
"hsv_upper": [116, 43, 243],
"min_area": 2500,
"max_area": 300000
}
]
# --- Configuration Section ---
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
TEMPLATE_DIR = os.path.join(SCRIPT_DIR, "templates")
os.makedirs(TEMPLATE_DIR, exist_ok=True)
# --- Debugging ---
DEBUG_SCREENSHOT_DIR = os.path.join(SCRIPT_DIR, "debug_screenshots")
MAX_DEBUG_SCREENSHOTS = 8
os.makedirs(DEBUG_SCREENSHOT_DIR, exist_ok=True)
DEBUG_LEVEL = 1 # 0=Off, 1=Basic Info, 2=Detailed, 3=Visual Debug
# --- End Debugging ---
# --- Template Paths (Consider moving to config.py or loading dynamically) ---
# Bubble Corners
CORNER_TL_IMG = os.path.join(TEMPLATE_DIR, "corner_tl.png")
# CORNER_TR_IMG = os.path.join(TEMPLATE_DIR, "corner_tr.png") # Unused
# CORNER_BL_IMG = os.path.join(TEMPLATE_DIR, "corner_bl.png") # Unused
CORNER_BR_IMG = os.path.join(TEMPLATE_DIR, "corner_br.png")
# --- Additional Regular Bubble Types (Skins) ---
CORNER_TL_TYPE2_IMG = os.path.join(TEMPLATE_DIR, "corner_tl_type2.png") # Added
CORNER_BR_TYPE2_IMG = os.path.join(TEMPLATE_DIR, "corner_br_type2.png") # Added
CORNER_TL_TYPE3_IMG = os.path.join(TEMPLATE_DIR, "corner_tl_type3.png") # Added
CORNER_BR_TYPE3_IMG = os.path.join(TEMPLATE_DIR, "corner_br_type3.png") # Added
CORNER_TL_TYPE4_IMG = os.path.join(TEMPLATE_DIR, "corner_tl_type4.png") # Added type4
CORNER_BR_TYPE4_IMG = os.path.join(TEMPLATE_DIR, "corner_br_type4.png") # Added type4
# --- End Additional Regular Types ---
BOT_CORNER_TL_IMG = os.path.join(TEMPLATE_DIR, "bot_corner_tl.png")
# BOT_CORNER_TR_IMG = os.path.join(TEMPLATE_DIR, "bot_corner_tr.png") # Unused
# BOT_CORNER_BL_IMG = os.path.join(TEMPLATE_DIR, "bot_corner_bl.png") # Unused
BOT_CORNER_BR_IMG = os.path.join(TEMPLATE_DIR, "bot_corner_br.png")
# --- Additional Bot Bubble Types (Skins) ---
# Type 2
BOT_CORNER_TL_TYPE2_IMG = os.path.join(TEMPLATE_DIR, "bot_corner_tl_type2.png")
BOT_CORNER_BR_TYPE2_IMG = os.path.join(TEMPLATE_DIR, "bot_corner_br_type2.png")
# Type 3
BOT_CORNER_TL_TYPE3_IMG = os.path.join(TEMPLATE_DIR, "bot_corner_tl_type3.png")
BOT_CORNER_BR_TYPE3_IMG = os.path.join(TEMPLATE_DIR, "bot_corner_br_type3.png")
# --- End Additional Types ---
# Keywords (Refactored based on guide)
KEYWORD_wolf_LOWER_IMG = os.path.join(TEMPLATE_DIR, "keyword_wolf_lower.png") # Active Core
KEYWORD_Wolf_UPPER_IMG = os.path.join(TEMPLATE_DIR, "keyword_Wolf_upper.png") # Active Core
KEYWORD_WOLF_REPLY_IMG = os.path.join(TEMPLATE_DIR, "keyword_wolf_reply.png") # Active Core
# Deprecated but kept for potential legacy fallback or reference
KEYWORD_wolf_LOWER_TYPE2_IMG = os.path.join(TEMPLATE_DIR, "keyword_wolf_lower_type2.png") # Deprecated
KEYWORD_Wolf_UPPER_TYPE2_IMG = os.path.join(TEMPLATE_DIR, "keyword_wolf_upper_type2.png") # Deprecated
KEYWORD_wolf_LOWER_TYPE3_IMG = os.path.join(TEMPLATE_DIR, "keyword_wolf_lower_type3.png") # Deprecated
KEYWORD_Wolf_UPPER_TYPE3_IMG = os.path.join(TEMPLATE_DIR, "keyword_wolf_upper_type3.png") # Deprecated
KEYWORD_wolf_LOWER_TYPE4_IMG = os.path.join(TEMPLATE_DIR, "keyword_wolf_lower_type4.png") # Deprecated
KEYWORD_Wolf_UPPER_TYPE4_IMG = os.path.join(TEMPLATE_DIR, "keyword_wolf_upper_type4.png") # Deprecated
KEYWORD_WOLF_REPLY_TYPE2_IMG = os.path.join(TEMPLATE_DIR, "keyword_wolf_reply_type2.png") # Deprecated
KEYWORD_WOLF_REPLY_TYPE3_IMG = os.path.join(TEMPLATE_DIR, "keyword_wolf_reply_type3.png") # Deprecated
KEYWORD_WOLF_REPLY_TYPE4_IMG = os.path.join(TEMPLATE_DIR, "keyword_wolf_reply_type4.png") # Deprecated
# UI Elements
COPY_MENU_ITEM_IMG = os.path.join(TEMPLATE_DIR, "copy_menu_item.png")
PROFILE_OPTION_IMG = os.path.join(TEMPLATE_DIR, "profile_option.png")
COPY_NAME_BUTTON_IMG = os.path.join(TEMPLATE_DIR, "copy_name_button.png")
SEND_BUTTON_IMG = os.path.join(TEMPLATE_DIR, "send_button.png")
CHAT_INPUT_IMG = os.path.join(TEMPLATE_DIR, "chat_input.png")
# 新增的模板路徑
CHAT_OPTION_IMG = os.path.join(TEMPLATE_DIR, "chat_option.png")
UPDATE_CONFIRM_IMG = os.path.join(TEMPLATE_DIR, "update_confirm.png")
# State Detection
PROFILE_NAME_PAGE_IMG = os.path.join(TEMPLATE_DIR, "Profile_Name_page.png")
PROFILE_PAGE_IMG = os.path.join(TEMPLATE_DIR, "Profile_page.png")
CHAT_ROOM_IMG = os.path.join(TEMPLATE_DIR, "chat_room.png")
BASE_SCREEN_IMG = os.path.join(TEMPLATE_DIR, "base.png") # Added for navigation
WORLD_MAP_IMG = os.path.join(TEMPLATE_DIR, "World_map.png") # Added for navigation
# Add World/Private chat identifiers later
WORLD_CHAT_IMG = os.path.join(TEMPLATE_DIR, "World_Label_normal.png") # Example
PRIVATE_CHAT_IMG = os.path.join(TEMPLATE_DIR, "Private_Label_normal.png") # Example
# Position Icons (Near Bubble)
POS_DEV_IMG = os.path.join(TEMPLATE_DIR, "positions", "development.png")
POS_INT_IMG = os.path.join(TEMPLATE_DIR, "positions", "interior.png")
POS_SCI_IMG = os.path.join(TEMPLATE_DIR, "positions", "science.png")
POS_SEC_IMG = os.path.join(TEMPLATE_DIR, "positions", "security.png")
POS_STR_IMG = os.path.join(TEMPLATE_DIR, "positions", "strategy.png")
# Capitol Page Elements
CAPITOL_BUTTON_IMG = os.path.join(TEMPLATE_DIR, "capitol", "capitol_#11.png")
PRESIDENT_TITLE_IMG = os.path.join(TEMPLATE_DIR, "capitol", "president_title.png")
POS_BTN_DEV_IMG = os.path.join(TEMPLATE_DIR, "capitol", "position_development.png")
POS_BTN_INT_IMG = os.path.join(TEMPLATE_DIR, "capitol", "position_interior.png")
POS_BTN_SCI_IMG = os.path.join(TEMPLATE_DIR, "capitol", "position_science.png")
POS_BTN_SEC_IMG = os.path.join(TEMPLATE_DIR, "capitol", "position_security.png")
POS_BTN_STR_IMG = os.path.join(TEMPLATE_DIR, "capitol", "position_strategy.png")
PAGE_DEV_IMG = os.path.join(TEMPLATE_DIR, "capitol", "page_DEVELOPMENT.png")
PAGE_INT_IMG = os.path.join(TEMPLATE_DIR, "capitol", "page_INTERIOR.png")
PAGE_SCI_IMG = os.path.join(TEMPLATE_DIR, "capitol", "page_SCIENCE.png")
PAGE_SEC_IMG = os.path.join(TEMPLATE_DIR, "capitol", "page_SECURITY.png")
PAGE_STR_IMG = os.path.join(TEMPLATE_DIR, "capitol", "page_STRATEGY.png")
DISMISS_BUTTON_IMG = os.path.join(TEMPLATE_DIR, "capitol", "dismiss.png")
CONFIRM_BUTTON_IMG = os.path.join(TEMPLATE_DIR, "capitol", "confirm.png")
CLOSE_BUTTON_IMG = os.path.join(TEMPLATE_DIR, "capitol", "close_button.png")
BACK_ARROW_IMG = os.path.join(TEMPLATE_DIR, "capitol", "black_arrow_down.png")
REPLY_BUTTON_IMG = os.path.join(TEMPLATE_DIR, "reply_button.png") # Added for reply functionality
# --- Operation Parameters (Consider moving to config.py) ---
CHAT_INPUT_REGION = None # Example: (100, 800, 500, 50)
CHAT_INPUT_CENTER_X = 400
CHAT_INPUT_CENTER_Y = 1280
SCREENSHOT_REGION = (70, 50, 800, 1365) # Updated region
CONFIDENCE_THRESHOLD = 0.9 # Increased threshold for corner matching
STATE_CONFIDENCE_THRESHOLD = 0.9
AVATAR_OFFSET_X = -45 # Original offset, used for non-reply interactions like position removal
# AVATAR_OFFSET_X_RELOCATED = -50 # Replaced by specific reply offsets
AVATAR_OFFSET_X_REPLY = -45 # Horizontal offset for avatar click after re-location (for reply context)
AVATAR_OFFSET_Y_REPLY = 10 # Vertical offset for avatar click after re-location (for reply context)
BUBBLE_RELOCATE_CONFIDENCE = 0.8 # Reduced confidence for finding the bubble snapshot (was 0.9)
BUBBLE_RELOCATE_FALLBACK_CONFIDENCE = 0.6 # Lower confidence for fallback attempts
BBOX_SIMILARITY_TOLERANCE = 10
RECENT_TEXT_HISTORY_MAXLEN = 5 # This state likely belongs in the coordinator
# --- New Constants for Dual Method ---
CLAHE_CLIP_LIMIT = 2.0 # CLAHE enhancement parameter
CLAHE_TILE_SIZE = (8, 8) # CLAHE grid size
MATCH_DISTANCE_THRESHOLD = 10 # Threshold for considering detections as overlapping (pixels)
DUAL_METHOD_CONFIDENCE_THRESHOLD = 0.85 # Confidence threshold for individual methods in dual mode
DUAL_METHOD_HIGH_CONFIDENCE_THRESHOLD = 0.85 # Threshold for accepting single method result directly
DUAL_METHOD_FALLBACK_CONFIDENCE_THRESHOLD = 0.8 # Threshold for accepting single method result in fallback
# --- Helper Function (Module Level) ---
def are_bboxes_similar(bbox1: Optional[Tuple[int, int, int, int]],
bbox2: Optional[Tuple[int, int, int, int]],
tolerance: int = BBOX_SIMILARITY_TOLERANCE) -> bool:
"""Check if two bounding boxes' top-left corners are close."""
if bbox1 is None or bbox2 is None:
return False
# Compare based on bbox top-left (index 0 and 1)
return abs(bbox1[0] - bbox2[0]) <= tolerance and abs(bbox1[1] - bbox2[1]) <= tolerance
# ==============================================================================
# Detection Module
# ==============================================================================
class DetectionModule:
"""Handles finding elements and states on the screen using image recognition or color analysis."""
def __init__(self, templates: Dict[str, str], confidence: float = CONFIDENCE_THRESHOLD,
state_confidence: float = STATE_CONFIDENCE_THRESHOLD,
region: Optional[Tuple[int, int, int, int]] = SCREENSHOT_REGION,
use_dual_method: bool = True): # Added use_dual_method flag
# --- Hardcoded Settings (as per user instruction) ---
self.use_color_detection: bool = True # Set to True to enable color detection by default
self.color_config_path: str = "bubble_colors.json"
# --- End Hardcoded Settings ---
self.templates = templates
self.confidence = confidence # Default confidence for legacy methods
self.state_confidence = state_confidence
self.region = region
self._warned_paths = set()
# --- Dual Method Specific Initialization ---
self.use_dual_method = use_dual_method
self.clahe = cv2.createCLAHE(clipLimit=CLAHE_CLIP_LIMIT, tileGridSize=CLAHE_TILE_SIZE)
self.core_keyword_templates = {k: v for k, v in templates.items()
if k in ['keyword_wolf_lower', 'keyword_Wolf_upper', 'keyword_wolf_reply']}
self.last_detection_method = None
self.last_detection_confidence = 0.0
self.DEBUG_LEVEL = DEBUG_LEVEL # Use global debug level
# Performance Stats
self.performance_stats = {
'total_detections': 0,
'successful_detections': 0,
'gray_only_detections': 0,
'clahe_only_detections': 0,
'dual_method_detections': 0,
'fallback_detections': 0, # Added for fallback tracking
'total_detection_time': 0.0,
'inverted_matches': 0
}
# --- End Dual Method Specific Initialization ---
# Load color configuration if color detection is enabled
self.bubble_colors = []
if self.use_color_detection:
self.bubble_colors = load_bubble_colors(self.color_config_path) # Use internal path
if not self.bubble_colors:
print("Warning: Color detection enabled, but failed to load any color configurations. Color detection might not work.")
print(f"DetectionModule initialized. Color Detection: {'Enabled' if self.use_color_detection else 'Disabled'}. Dual Keyword Method: {'Enabled' if self.use_dual_method else 'Disabled'}")
def _apply_clahe(self, image):
"""Apply CLAHE to enhance image contrast."""
if image is None:
print("Warning: _apply_clahe received None image.")
return None
try:
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image.copy() # Assume already grayscale
enhanced = self.clahe.apply(gray)
return enhanced
except Exception as e:
print(f"Error applying CLAHE: {e}")
# Return original grayscale image on error
return gray if 'gray' in locals() else image
def _find_template(self, template_key: str, confidence: Optional[float] = None, region: Optional[Tuple[int, int, int, int]] = None, grayscale: bool = False) -> List[Tuple[int, int]]:
"""Internal helper to find a template by its key using PyAutoGUI. Returns list of CENTER coordinates (absolute)."""
template_path = self.templates.get(template_key)
if not template_path:
print(f"Error: Template key '{template_key}' not found in provided templates.")
return []
# Check if template file exists, warn only once
if not os.path.exists(template_path):
if template_path not in self._warned_paths:
print(f"Error: Template image doesn't exist: {template_path}")
self._warned_paths.add(template_path)
return []
locations = []
current_region = region if region is not None else self.region
current_confidence = confidence if confidence is not None else self.confidence
try:
# locateAllOnScreen returns Box objects (left, top, width, height)
matches = pyautogui.locateAllOnScreen(template_path, region=current_region, confidence=current_confidence, grayscale=grayscale)
if matches:
for box in matches:
# Calculate center coordinates from the Box object
center_x = box.left + box.width // 2
center_y = box.top + box.height // 2
locations.append((center_x, center_y))
# print(f"Found template '{template_key}' at {len(locations)} locations.") # Debug
return locations
except Exception as e:
print(f"Error finding template '{template_key}' ({template_path}): {e}")
return []
def _find_template_raw(self, template_key: str, confidence: Optional[float] = None, region: Optional[Tuple[int, int, int, int]] = None, grayscale: bool = False) -> List[Tuple[int, int, int, int]]:
"""Internal helper to find a template by its key. Returns list of raw Box tuples (left, top, width, height)."""
template_path = self.templates.get(template_key)
if not template_path:
print(f"Error: Template key '{template_key}' not found in provided templates.")
return []
if not os.path.exists(template_path):
if template_path not in self._warned_paths:
print(f"Error: Template image doesn't exist: {template_path}")
self._warned_paths.add(template_path)
return []
locations = []
current_region = region if region is not None else self.region
current_confidence = confidence if confidence is not None else self.confidence
try:
# --- Temporary Debug Print ---
print(f"DEBUG: Searching for template '{template_key}' with confidence {current_confidence}...")
# --- End Temporary Debug Print ---
matches = pyautogui.locateAllOnScreen(template_path, region=current_region, confidence=current_confidence, grayscale=grayscale)
match_count = 0 # Initialize count
if matches:
for box in matches:
locations.append((box.left, box.top, box.width, box.height))
match_count += 1 # Increment count
# --- Temporary Debug Print ---
print(f"DEBUG: Found {match_count} instance(s) of template '{template_key}'.")
# --- End Temporary Debug Print ---
return locations
except Exception as e:
print(f"Error finding template raw '{template_key}' ({template_path}): {e}")
return []
def find_elements(self, template_keys: List[str], confidence: Optional[float] = None, region: Optional[Tuple[int, int, int, int]] = None) -> Dict[str, List[Tuple[int, int]]]:
"""Find multiple templates by their keys. Returns center coordinates."""
results = {}
for key in template_keys:
results[key] = self._find_template(key, confidence=confidence, region=region)
return results
def find_dialogue_bubbles(self) -> List[Dict[str, Any]]:
"""
Detects dialogue bubbles using either color analysis or template matching,
based on the 'use_color_detection' flag. Includes fallback to template matching.
Returns a list of dictionaries, each containing:
{'bbox': (tl_x, tl_y, br_x, br_y), 'is_bot': bool, 'tl_coords': (tl_x, tl_y)}
"""
# --- Try Color Detection First if Enabled ---
if self.use_color_detection:
print("Attempting bubble detection using color analysis...")
try:
# Use a scale factor of 0.5 for performance
bubbles = self.find_dialogue_bubbles_by_color(scale_factor=0.5)
# If color detection returns results, use them
if bubbles:
print("Color detection successful.")
return bubbles
else:
print("Color detection returned no bubbles. Falling back to template matching.")
except Exception as e:
print(f"Color detection failed with error: {e}. Falling back to template matching.")
import traceback
traceback.print_exc()
else:
print("Color detection disabled. Using template matching.")
# --- Fallback to Template Matching ---
print("Executing template matching for bubble detection...")
all_bubbles_info = []
processed_tls = set() # Keep track of TL corners already used in a bubble
# --- Find ALL Regular Bubble Corners (Raw Coordinates) ---
regular_tl_keys = ['corner_tl', 'corner_tl_type2', 'corner_tl_type3', 'corner_tl_type4'] # Added type4
regular_br_keys = ['corner_br', 'corner_br_type2', 'corner_br_type3', 'corner_br_type4'] # Added type4
bubble_detection_region = (150, 330, 600, 880) # Define the specific region for bubbles
print(f"DEBUG: Using specific region for bubble corner detection: {bubble_detection_region}")
all_regular_tl_boxes = []
for key in regular_tl_keys:
all_regular_tl_boxes.extend(self._find_template_raw(key, region=bubble_detection_region)) # Pass region
all_regular_br_boxes = []
for key in regular_br_keys:
all_regular_br_boxes.extend(self._find_template_raw(key, region=bubble_detection_region)) # Pass region
# --- Find Bot Bubble Corners (Raw Coordinates - Single Type) ---
bot_tl_boxes = self._find_template_raw('bot_corner_tl', region=bubble_detection_region) # Pass region
bot_br_boxes = self._find_template_raw('bot_corner_br', region=bubble_detection_region) # Pass region
# --- Match Regular Bubbles (Any Type TL with Any Type BR) ---
if all_regular_tl_boxes and all_regular_br_boxes:
for tl_box in all_regular_tl_boxes:
tl_coords = (tl_box[0], tl_box[1]) # Extract original TL (left, top)
# Skip if this TL is already part of a matched bubble
if tl_coords in processed_tls: continue
potential_br_box = None
min_y_diff = float('inf') # Prioritize minimum Y difference
# Find the valid BR corner (from any regular type) with the closest Y-coordinate
for br_box in all_regular_br_boxes:
br_coords = (br_box[0], br_box[1]) # BR top-left
# Basic geometric check: BR must be below and to the right of TL
if br_coords[0] > tl_coords[0] + 20 and br_coords[1] > tl_coords[1] + 10:
y_diff = abs(br_coords[1] - tl_coords[1]) # Calculate Y difference
if y_diff < min_y_diff:
potential_br_box = br_box
min_y_diff = y_diff
# Optional: Add a secondary check for X distance if Y diff is the same?
# elif y_diff == min_y_diff:
# if potential_br_box is None or abs(br_coords[0] - tl_coords[0]) < abs(potential_br_box[0] - tl_coords[0]):
# potential_br_box = br_box
if potential_br_box:
# Calculate bbox using TL's top-left and BR's bottom-right
bubble_bbox = (tl_coords[0], tl_coords[1],
potential_br_box[0] + potential_br_box[2], potential_br_box[1] + potential_br_box[3])
all_bubbles_info.append({
'bbox': bubble_bbox,
'is_bot': False,
'tl_coords': tl_coords # Store the original TL coords
})
processed_tls.add(tl_coords) # Mark this TL as used
# --- Match Bot Bubbles (Single Type) ---
if bot_tl_boxes and bot_br_boxes:
for tl_box in bot_tl_boxes:
tl_coords = (tl_box[0], tl_box[1]) # Extract original TL (left, top)
# Skip if this TL is already part of a matched bubble
if tl_coords in processed_tls: continue
potential_br_box = None
min_y_diff = float('inf') # Prioritize minimum Y difference
# Find the valid BR corner with the closest Y-coordinate
for br_box in bot_br_boxes:
br_coords = (br_box[0], br_box[1]) # BR top-left
# Basic geometric check: BR must be below and to the right of TL
if br_coords[0] > tl_coords[0] + 20 and br_coords[1] > tl_coords[1] + 10:
y_diff = abs(br_coords[1] - tl_coords[1]) # Calculate Y difference
if y_diff < min_y_diff:
potential_br_box = br_box
min_y_diff = y_diff
# Optional: Add a secondary check for X distance if Y diff is the same?
# elif y_diff == min_y_diff:
# if potential_br_box is None or abs(br_coords[0] - tl_coords[0]) < abs(potential_br_box[0] - tl_coords[0]):
# potential_br_box = br_box
if potential_br_box:
# Calculate bbox using TL's top-left and BR's bottom-right
bubble_bbox = (tl_coords[0], tl_coords[1],
potential_br_box[0] + potential_br_box[2], potential_br_box[1] + potential_br_box[3])
all_bubbles_info.append({
'bbox': bubble_bbox,
'is_bot': True,
'tl_coords': tl_coords # Store the original TL coords
})
processed_tls.add(tl_coords) # Mark this TL as used
# Note: This logic prioritizes matching regular bubbles first, then bot bubbles.
# Confidence thresholds might need tuning.
print(f"Template matching found {len(all_bubbles_info)} bubbles.") # Added log
return all_bubbles_info
def find_dialogue_bubbles_by_color(self, scale_factor=0.5) -> List[Dict[str, Any]]:
"""
Find dialogue bubbles using color analysis within a specific region.
Applies scaling to improve performance.
Returns a list of dictionaries, each containing:
{'bbox': (tl_x, tl_y, br_x, br_y), 'is_bot': bool, 'tl_coords': (tl_x, tl_y)}
"""
all_bubbles_info = []
# Define the specific region for bubble detection (same as template matching)
bubble_detection_region = (150, 330, 600, 880)
print(f"Using bubble color detection region: {bubble_detection_region}")
try:
# 1. Capture the specified region
screenshot = pyautogui.screenshot(region=bubble_detection_region)
if screenshot is None:
print("Error: Failed to capture screenshot for color detection.")
return []
img = np.array(screenshot)
img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) # Convert RGB (from pyautogui) to BGR (for OpenCV)
# 2. Resize for performance
if scale_factor < 1.0:
h, w = img.shape[:2]
new_h, new_w = int(h * scale_factor), int(w * scale_factor)
if new_h <= 0 or new_w <= 0:
print(f"Error: Invalid dimensions after scaling: {new_w}x{new_h}. Using original image.")
img_small = img
current_scale_factor = 1.0
else:
img_small = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_AREA)
print(f"Original resolution: {w}x{h}, Scaled down to: {new_w}x{new_h}")
current_scale_factor = scale_factor
else:
img_small = img
current_scale_factor = 1.0
# 3. Convert to HSV color space
hsv = cv2.cvtColor(img_small, cv2.COLOR_BGR2HSV)
# 4. Process each configured bubble type
if not self.bubble_colors:
print("Error: No bubble color configurations loaded for detection.")
return []
for color_config in self.bubble_colors:
name = color_config.get('name', 'unknown')
is_bot = color_config.get('is_bot', False)
hsv_lower = np.array(color_config.get('hsv_lower', [0,0,0]))
hsv_upper = np.array(color_config.get('hsv_upper', [179,255,255]))
min_area_config = color_config.get('min_area', 3000)
max_area_config = color_config.get('max_area', 100000)
# Adjust area thresholds based on scaling factor
min_area = min_area_config * (current_scale_factor ** 2)
max_area = max_area_config * (current_scale_factor ** 2)
print(f"Processing color type: {name} (Bot: {is_bot}), HSV Lower: {hsv_lower}, HSV Upper: {hsv_upper}, Area: {min_area:.0f}-{max_area:.0f}")
# 5. Create mask based on HSV range
mask = cv2.inRange(hsv, hsv_lower, hsv_upper)
# 6. Morphological operations (Closing) to remove noise and fill holes
kernel = np.ones((3, 3), np.uint8)
mask_closed = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel, iterations=2) # Increased iterations
# Optional: Dilation to merge nearby parts?
# mask_closed = cv2.dilate(mask_closed, kernel, iterations=1)
# 7. Find connected components
num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(mask_closed)
# 8. Filter components by area and add to results
for i in range(1, num_labels): # Skip background label 0
area = stats[i, cv2.CC_STAT_AREA]
if min_area <= area <= max_area:
x_s = stats[i, cv2.CC_STAT_LEFT]
y_s = stats[i, cv2.CC_STAT_TOP]
w_s = stats[i, cv2.CC_STAT_WIDTH]
h_s = stats[i, cv2.CC_STAT_HEIGHT]
# Convert coordinates back to original resolution
if current_scale_factor < 1.0:
x = int(x_s / current_scale_factor)
y = int(y_s / current_scale_factor)
width = int(w_s / current_scale_factor)
height = int(h_s / current_scale_factor)
else:
x, y, width, height = x_s, y_s, w_s, h_s
# Adjust coordinates relative to the full screen (add region offset)
x_adjusted = x + bubble_detection_region[0]
y_adjusted = y + bubble_detection_region[1]
bubble_bbox = (x_adjusted, y_adjusted, x_adjusted + width, y_adjusted + height)
tl_coords = (x_adjusted, y_adjusted) # Top-left coords in full screen space
all_bubbles_info.append({
'bbox': bubble_bbox,
'is_bot': is_bot,
'tl_coords': tl_coords
})
print(f" -> Found '{name}' bubble component. Area: {area:.0f} (Scaled). Original Coords: {bubble_bbox}")
except pyautogui.FailSafeException:
print("FailSafe triggered during color detection.")
return []
except Exception as e:
print(f"Error during color-based bubble detection: {e}")
import traceback
traceback.print_exc()
return [] # Return empty list on error
print(f"Color detection found {len(all_bubbles_info)} bubbles.")
return all_bubbles_info
def _find_keyword_legacy(self, region: Tuple[int, int, int, int]) -> Optional[Tuple[int, int]]:
"""
Original find_keyword_in_region implementation using multiple templates and PyAutoGUI.
Kept for backward compatibility or fallback. Returns absolute center coordinates or None.
"""
if region[2] <= 0 or region[3] <= 0: return None # Invalid region width/height
# Define the order of templates to check (legacy approach)
legacy_keyword_templates = [
# Original keywords first
'keyword_wolf_lower', 'keyword_wolf_upper',
# Deprecated keywords next (order might matter based on visual similarity)
'keyword_wolf_lower_type2', 'keyword_wolf_upper_type2',
'keyword_wolf_lower_type3', 'keyword_wolf_upper_type3',
'keyword_wolf_lower_type4', 'keyword_wolf_upper_type4',
# Reply keywords last
'keyword_wolf_reply', 'keyword_wolf_reply_type2',
'keyword_wolf_reply_type3', 'keyword_wolf_reply_type4'
]
for key in legacy_keyword_templates:
# Determine grayscale based on key (example logic, adjust as needed)
# Original logic seemed to use grayscale=True for lower/upper, False otherwise. Let's replicate that.
use_grayscale = ('lower' in key or 'upper' in key) and 'type' not in key and 'reply' not in key
# Use the default confidence defined in __init__ for legacy checks
locations = self._find_template(key, region=region, grayscale=use_grayscale, confidence=self.confidence)
if locations:
print(f"Legacy method found keyword ('{key}') in region {region}, position: {locations[0]}")
return locations[0] # Return the first match found
return None # No keyword found using legacy method
def find_keyword_dual_method(self, region: Tuple[int, int, int, int]) -> Optional[Tuple[int, int]]:
"""
Find keywords using grayscale and CLAHE preprocessed images with OpenCV template matching.
Applies coordinate correction to return absolute screen coordinates.
Returns absolute center coordinates tuple (x, y) or None.
"""
if region is None or len(region) != 4 or region[2] <= 0 or region[3] <= 0:
print(f"Error: Invalid region provided to find_keyword_dual_method: {region}")
return None
start_time = time.time()
region_x, region_y, region_w, region_h = region
try:
screenshot = pyautogui.screenshot(region=region)
if screenshot is None:
print("Error: Failed to capture screenshot for dual method detection.")
return None
img = np.array(screenshot)
img_bgr = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
except Exception as e:
print(f"Error capturing or converting screenshot in region {region}: {e}")
return None
img_gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
img_clahe = self._apply_clahe(img_gray) # Use helper method
if img_clahe is None:
print("Error: CLAHE preprocessing failed. Cannot proceed with CLAHE matching.")
# Optionally, could proceed with only grayscale matching here, but for simplicity, we return None.
return None
gray_results = []
clahe_results = []
template_types = { # Map core template keys to types
'keyword_wolf_lower': 'standard',
'keyword_Wolf_upper': 'standard',
'keyword_wolf_reply': 'reply'
}
for key, template_path in self.core_keyword_templates.items():
if not os.path.exists(template_path):
if template_path not in self._warned_paths:
print(f"Warning: Core keyword template not found: {template_path}")
self._warned_paths.add(template_path)
continue
template_bgr = cv2.imread(template_path)
if template_bgr is None:
if template_path not in self._warned_paths:
print(f"Warning: Failed to load core keyword template: {template_path}")
self._warned_paths.add(template_path)
continue
template_gray = cv2.cvtColor(template_bgr, cv2.COLOR_BGR2GRAY)
template_clahe = self._apply_clahe(template_gray) # Use helper method
if template_clahe is None:
print(f"Warning: CLAHE preprocessing failed for template {key}. Skipping CLAHE match for this template.")
continue # Skip CLAHE part for this template
h_gray, w_gray = template_gray.shape[:2]
h_clahe, w_clahe = template_clahe.shape[:2]
# --- Grayscale Matching ---
try:
gray_res = cv2.matchTemplate(img_gray, template_gray, cv2.TM_CCOEFF_NORMED)
gray_inv_res = cv2.matchTemplate(img_gray, cv2.bitwise_not(template_gray), cv2.TM_CCOEFF_NORMED)
gray_combined = np.maximum(gray_res, gray_inv_res)
_, gray_max_val, _, gray_max_loc = cv2.minMaxLoc(gray_combined)
if gray_max_val >= DUAL_METHOD_CONFIDENCE_THRESHOLD:
# Calculate relative center
relative_center_x = gray_max_loc[0] + w_gray // 2
relative_center_y = gray_max_loc[1] + h_gray // 2
# *** COORDINATE CORRECTION ***
absolute_center_x = region_x + relative_center_x
absolute_center_y = region_y + relative_center_y
# Check inversion
gray_orig_val = gray_res[gray_max_loc[1], gray_max_loc[0]] # Get value at max_loc from original match
is_inverted = (gray_orig_val < gray_max_val - 0.05)
gray_results.append({
'template': key,
'center': (absolute_center_x, absolute_center_y), # Store absolute coords
'confidence': gray_max_val,
'is_inverted': is_inverted,
'type': template_types.get(key, 'standard')
})
except cv2.error as e:
print(f"OpenCV Error during Grayscale matching for {key}: {e}")
except Exception as e:
print(f"Unexpected Error during Grayscale matching for {key}: {e}")
# --- CLAHE Matching ---
try:
clahe_res = cv2.matchTemplate(img_clahe, template_clahe, cv2.TM_CCOEFF_NORMED)
clahe_inv_res = cv2.matchTemplate(img_clahe, cv2.bitwise_not(template_clahe), cv2.TM_CCOEFF_NORMED)
clahe_combined = np.maximum(clahe_res, clahe_inv_res)
_, clahe_max_val, _, clahe_max_loc = cv2.minMaxLoc(clahe_combined)
if clahe_max_val >= DUAL_METHOD_CONFIDENCE_THRESHOLD:
# Calculate relative center
relative_center_x = clahe_max_loc[0] + w_clahe // 2
relative_center_y = clahe_max_loc[1] + h_clahe // 2
# *** COORDINATE CORRECTION ***
absolute_center_x = region_x + relative_center_x
absolute_center_y = region_y + relative_center_y
# Check inversion
clahe_orig_val = clahe_res[clahe_max_loc[1], clahe_max_loc[0]] # Get value at max_loc from original match
is_inverted = (clahe_orig_val < clahe_max_val - 0.05)
clahe_results.append({
'template': key,
'center': (absolute_center_x, absolute_center_y), # Store absolute coords
'confidence': clahe_max_val,
'is_inverted': is_inverted,
'type': template_types.get(key, 'standard')
})
except cv2.error as e:
print(f"OpenCV Error during CLAHE matching for {key}: {e}")
except Exception as e:
print(f"Unexpected Error during CLAHE matching for {key}: {e}")
# --- Result Merging and Selection ---
elapsed_time = time.time() - start_time
self.performance_stats['total_detections'] += 1
self.performance_stats['total_detection_time'] += elapsed_time
best_match = None
final_result_coords = None
final_template_key = None # 新增:用於儲存最終匹配的範本 key
detection_type = "None" # For stats
if not gray_results and not clahe_results:
if self.DEBUG_LEVEL > 1:
print(f"[Dual Method] No keywords found by either method. Time: {elapsed_time:.3f}s")
self.last_detection_method = None
self.last_detection_confidence = 0.0
return None
# Strategy 1: High-confidence single method result
best_gray = max(gray_results, key=lambda x: x['confidence']) if gray_results else None
best_clahe = max(clahe_results, key=lambda x: x['confidence']) if clahe_results else None
if best_gray and not best_clahe and best_gray['confidence'] >= DUAL_METHOD_HIGH_CONFIDENCE_THRESHOLD:
final_result_coords = best_gray['center']
final_template_key = best_gray['template'] # 新增
self.last_detection_method = "Gray" + (" (Inv)" if best_gray['is_inverted'] else "")
self.last_detection_confidence = best_gray['confidence']
detection_type = "Gray Only (High Conf)"
self.performance_stats['gray_only_detections'] += 1
if best_gray['is_inverted']: self.performance_stats['inverted_matches'] += 1
print(f"[Dual Method] Using high-confidence Gray result: {best_gray['template']} at {final_result_coords} (Conf: {best_gray['confidence']:.2f})")
elif best_clahe and not best_gray and best_clahe['confidence'] >= DUAL_METHOD_HIGH_CONFIDENCE_THRESHOLD:
final_result_coords = best_clahe['center']
final_template_key = best_clahe['template'] # 新增
self.last_detection_method = "CLAHE" + (" (Inv)" if best_clahe['is_inverted'] else "")
self.last_detection_confidence = best_clahe['confidence']
detection_type = "CLAHE Only (High Conf)"
self.performance_stats['clahe_only_detections'] += 1
if best_clahe['is_inverted']: self.performance_stats['inverted_matches'] += 1
print(f"[Dual Method] Using high-confidence CLAHE result: {best_clahe['template']} at {final_result_coords} (Conf: {best_clahe['confidence']:.2f})")
# Strategy 2: Find overlapping results if no high-confidence single result yet
if final_result_coords is None:
best_overlap_match = None
highest_overlap_confidence = 0
for gray_match in gray_results:
for clahe_match in clahe_results:
# Check if templates match (or maybe just type?) - let's stick to same template for now
if gray_match['template'] == clahe_match['template']:
dist = math.sqrt((gray_match['center'][0] - clahe_match['center'][0])**2 +
(gray_match['center'][1] - clahe_match['center'][1])**2)
if dist < MATCH_DISTANCE_THRESHOLD:
# Use average confidence or max? Let's use average.
combined_confidence = (gray_match['confidence'] + clahe_match['confidence']) / 2
if combined_confidence > highest_overlap_confidence:
highest_overlap_confidence = combined_confidence
avg_center = (
(gray_match['center'][0] + clahe_match['center'][0]) // 2,
(gray_match['center'][1] + clahe_match['center'][1]) // 2
)
best_overlap_match = {
'template': gray_match['template'],
'center': avg_center,
'confidence': combined_confidence,
'dist': dist,
'is_inverted': gray_match['is_inverted'] or clahe_match['is_inverted'],
'type': gray_match['type'] # Type should be same
}
if best_overlap_match:
final_result_coords = best_overlap_match['center']
final_template_key = best_overlap_match['template'] # 新增
self.last_detection_method = "Dual Overlap" + (" (Inv)" if best_overlap_match['is_inverted'] else "")
self.last_detection_confidence = best_overlap_match['confidence']
detection_type = "Dual Overlap"
self.performance_stats['dual_method_detections'] += 1
if best_overlap_match['is_inverted']: self.performance_stats['inverted_matches'] += 1
print(f"[Dual Method] Using overlapping result: {best_overlap_match['template']} at {final_result_coords} (Conf: {best_overlap_match['confidence']:.2f}, Dist: {best_overlap_match['dist']:.1f}px)")
# Strategy 3: Fallback to best single result if no overlap found
if final_result_coords is None:
all_results = gray_results + clahe_results
if all_results:
best_overall = max(all_results, key=lambda x: x['confidence'])
# Use a slightly lower threshold for fallback
if best_overall['confidence'] >= DUAL_METHOD_FALLBACK_CONFIDENCE_THRESHOLD:
final_result_coords = best_overall['center']
final_template_key = best_overall['template'] # 新增
method_name = "Gray Fallback" if best_overall in gray_results else "CLAHE Fallback"
method_name += " (Inv)" if best_overall['is_inverted'] else ""
self.last_detection_method = method_name
self.last_detection_confidence = best_overall['confidence']
detection_type = "Fallback"
self.performance_stats['fallback_detections'] += 1 # Track fallbacks
if best_overall in gray_results: self.performance_stats['gray_only_detections'] += 1
else: self.performance_stats['clahe_only_detections'] += 1
if best_overall['is_inverted']: self.performance_stats['inverted_matches'] += 1
print(f"[Dual Method] Using fallback result ({method_name}): {best_overall['template']} at {final_result_coords} (Conf: {best_overall['confidence']:.2f})")
# --- Final Result Handling & Debug ---
if final_result_coords:
self.performance_stats['successful_detections'] += 1
if self.DEBUG_LEVEL >= 3:
# --- Visual Debugging ---
try:
# Create side-by-side comparison of gray and clahe
debug_processed_path = os.path.join(DEBUG_SCREENSHOT_DIR, f"dual_processed_{int(time.time())}.png")
# Ensure images have same height for hstack
h_gray_img, w_gray_img = img_gray.shape[:2]
h_clahe_img, w_clahe_img = img_clahe.shape[:2]
max_h = max(h_gray_img, h_clahe_img)
# Resize if needed (convert to BGR for stacking color images if necessary)
img_gray_bgr = cv2.cvtColor(cv2.resize(img_gray, (int(w_gray_img * max_h / h_gray_img), max_h)), cv2.COLOR_GRAY2BGR)
img_clahe_bgr = cv2.cvtColor(cv2.resize(img_clahe, (int(w_clahe_img * max_h / h_clahe_img), max_h)), cv2.COLOR_GRAY2BGR)
debug_img_processed = np.hstack([img_gray_bgr, img_clahe_bgr])
cv2.imwrite(debug_processed_path, debug_img_processed)
# Draw results on original BGR image
result_img = img_bgr.copy()
# Draw relative centers for visualization within the region
for result in gray_results:
rel_x = result['center'][0] - region_x
rel_y = result['center'][1] - region_y
cv2.circle(result_img, (rel_x, rel_y), 5, (0, 0, 255), -1) # Red = Gray
for result in clahe_results:
rel_x = result['center'][0] - region_x
rel_y = result['center'][1] - region_y
cv2.circle(result_img, (rel_x, rel_y), 5, (0, 255, 0), -1) # Green = CLAHE
# Mark final chosen point (relative)
final_rel_x = final_result_coords[0] - region_x
final_rel_y = final_result_coords[1] - region_y
cv2.circle(result_img, (final_rel_x, final_rel_y), 8, (255, 0, 0), 2) # Blue circle = Final
debug_result_path = os.path.join(DEBUG_SCREENSHOT_DIR, f"dual_result_{int(time.time())}.png")
cv2.imwrite(debug_result_path, result_img)
print(f"[Dual Method Debug] Saved processed image to {debug_processed_path}")
print(f"[Dual Method Debug] Saved result image to {debug_result_path}")
except Exception as debug_e:
print(f"Error during visual debugging image generation: {debug_e}")
# --- End Visual Debugging ---
# Return absolute coordinates and the matched key
return (final_result_coords, final_template_key)
else:
if self.DEBUG_LEVEL > 0: # Log failure only if debug level > 0
print(f"[Dual Method] No sufficiently confident match found. Time: {elapsed_time:.3f}s")
self.last_detection_method = None
self.last_detection_confidence = 0.0
return None # Return None for both coords and key on failure
def find_keyword_in_region(self, region: Tuple[int, int, int, int]) -> Optional[Tuple[Tuple[int, int], str]]:
"""
Wrapper method to find keywords in a region.
Uses either the new dual method or the legacy method based on the 'use_dual_method' flag.
Returns a tuple (absolute_center_coordinates, matched_template_key) or None.
"""
if region is None or len(region) != 4 or region[2] <= 0 or region[3] <= 0:
print(f"Error: Invalid region provided to find_keyword_in_region: {region}")
return None
if self.use_dual_method:
# Directly return the result from the dual method (now returns tuple or None)
return self.find_keyword_dual_method(region)
else:
# Legacy method needs adaptation if we want it to return a key.
# For now, it returns only coords or None. We'll return None for the key part.
legacy_coords = self._find_keyword_legacy(region)
if legacy_coords:
# We don't know the key from the legacy method easily. Return a placeholder or None.
return (legacy_coords, None) # Or maybe a specific string like 'legacy_match'
else:
return None
def print_detection_stats(self):
"""Prints the collected keyword detection performance statistics."""
stats = self.performance_stats
total = stats['total_detections']
successful = stats['successful_detections']
if total == 0:
print("\n=== Keyword Detection Performance Stats ===")
print("No detections recorded yet.")
return
print("\n=== Keyword Detection Performance Stats ===")
print(f"Total Detection Attempts: {total}")
success_rate = (successful / total * 100) if total > 0 else 0
print(f"Successful Detections: {successful} ({success_rate:.1f}%)")
avg_time = (stats['total_detection_time'] / total * 1000) if total > 0 else 0
print(f"Average Detection Time: {avg_time:.2f} ms")
if successful > 0:
dual_pct = stats['dual_method_detections'] / successful * 100
gray_pct = stats['gray_only_detections'] / successful * 100
clahe_pct = stats['clahe_only_detections'] / successful * 100
fallback_pct = stats['fallback_detections'] / successful * 100 # Percentage of successful that were fallbacks
print("\nDetection Method Distribution (Successful Detections):")
print(f" - Dual Overlap: {stats['dual_method_detections']} ({dual_pct:.1f}%)")
print(f" - Gray Only: {stats['gray_only_detections']} ({gray_pct:.1f}%)")
print(f" - CLAHE Only: {stats['clahe_only_detections']} ({clahe_pct:.1f}%)")
# Note: Gray Only + CLAHE Only might include high-confidence singles and fallbacks.
# Fallback count is a subset of Gray/CLAHE Only.
print(f" - Fallback Used:{stats['fallback_detections']} ({fallback_pct:.1f}%)")
if stats['inverted_matches'] > 0:
inv_pct = stats['inverted_matches'] / successful * 100
print(f"\nInverted Matches Detected: {stats['inverted_matches']} ({inv_pct:.1f}%)")
print("==========================================")
def calculate_avatar_coords(self, bubble_tl_coords: Tuple[int, int], offset_x: int = AVATAR_OFFSET_X) -> Tuple[int, int]:
"""
Calculate avatar coordinates based on the EXACT top-left corner coordinates of the bubble.
Uses the Y-coordinate of the TL corner directly.
"""
tl_x, tl_y = bubble_tl_coords[0], bubble_tl_coords[1]
avatar_x = tl_x + offset_x
avatar_y = tl_y # Use the exact Y from the detected TL corner
# print(f"Calculated avatar coordinates using TL {bubble_tl_coords}: ({int(avatar_x)}, {int(avatar_y)})") # Reduce noise
return (int(avatar_x), int(avatar_y))
def get_current_ui_state(self) -> str:
"""Determine the current UI state based on visible elements."""
# Check in order of specificity or likelihood
if self._find_template('profile_name_page', confidence=self.state_confidence):
return 'user_details'
if self._find_template('profile_page', confidence=self.state_confidence):
return 'profile_card'
# Add checks for world/private chat later
if self._find_template('world_chat', confidence=self.state_confidence): # Example
return 'world_chat'
if self._find_template('private_chat', confidence=self.state_confidence): # Example
return 'private_chat'
if self._find_template('chat_room', confidence=self.state_confidence):
return 'chat_room' # General chat room if others aren't found
return 'unknown'
# ==============================================================================
# Interaction Module
# ==============================================================================
class InteractionModule:
"""Handles performing actions on the UI like clicking, typing, clipboard."""
def __init__(self, detector: DetectionModule, input_coords: Tuple[int, int] = (CHAT_INPUT_CENTER_X, CHAT_INPUT_CENTER_Y), input_template_key: Optional[str] = 'chat_input', send_button_key: str = 'send_button'):
self.detector = detector
self.default_input_coords = input_coords
self.input_template_key = input_template_key
self.send_button_key = send_button_key
print("InteractionModule initialized.")
def click_at(self, x: int, y: int, button: str = 'left', clicks: int = 1, interval: float = 0.1, duration: float = 0.1):
"""Safely click at specific coordinates."""
try:
print(f"Moving to and clicking at: ({x}, {y}), button: {button}, clicks: {clicks}")
pyautogui.moveTo(x, y, duration=duration)
pyautogui.click(button=button, clicks=clicks, interval=interval)
time.sleep(0.1)
except Exception as e:
print(f"Error clicking at coordinates ({x}, {y}): {e}")
def press_key(self, key: str, presses: int = 1, interval: float = 0.1):
"""Press a specific key."""
try:
print(f"Pressing key: {key} ({presses} times)")
for _ in range(presses):
pyautogui.press(key)
time.sleep(interval)
except Exception as e:
print(f"Error pressing key '{key}': {e}")
def hotkey(self, *args):
"""Press a key combination (e.g., 'ctrl', 'c')."""
try:
print(f"Pressing hotkey: {args}")
pyautogui.hotkey(*args)
time.sleep(0.1) # Short pause after hotkey
except Exception as e:
print(f"Error pressing hotkey {args}: {e}")
def get_clipboard(self) -> Optional[str]:
"""Get text from clipboard."""
try:
return pyperclip.paste()
except Exception as e:
print(f"Error reading clipboard: {e}")
return None
def set_clipboard(self, text: str):
"""Set clipboard text."""
try:
pyperclip.copy(text)
except Exception as e:
print(f"Error writing to clipboard: {e}")
def copy_text_at(self, coords: Tuple[int, int]) -> Optional[str]:
"""Attempt to copy text after clicking at given coordinates."""
print(f"Attempting to copy text at {coords}...")
original_clipboard = self.get_clipboard() or ""
self.set_clipboard("___MCP_CLEAR___")
time.sleep(0.1)
self.click_at(coords[0], coords[1])
time.sleep(0.1) # Wait for menu/reaction
copied = False
# Try finding "Copy" menu item first
copy_item_locations = self.detector._find_template('copy_menu_item', confidence=0.7) # Use detector
if copy_item_locations:
copy_coords = copy_item_locations[0]
self.click_at(copy_coords[0], copy_coords[1])
print("Clicked 'Copy' menu item.")
time.sleep(0.15)
copied = True
else:
print("'Copy' menu item not found. Attempting Ctrl+C.")
try:
self.hotkey('ctrl', 'c')
time.sleep(0.1)
print("Simulated Ctrl+C.")
copied = True
except Exception as e_ctrlc:
print(f"Failed to simulate Ctrl+C: {e_ctrlc}")
copied = False
copied_text = self.get_clipboard()
self.set_clipboard(original_clipboard) # Restore clipboard
if copied and copied_text and copied_text != "___MCP_CLEAR___":
print(f"Successfully copied text, length: {len(copied_text)}")
# 添加編碼安全處理
try:
safe_text = handle_text_encoding(copied_text.strip())
return safe_text
except Exception as e:
print(f"Error handling copied text encoding: {str(e)}")
return copied_text.strip() # 即使有問題也嘗試返回原始文字
else:
print("Error: Copy operation unsuccessful or clipboard content invalid.")
return None
def retrieve_sender_name_interaction(self,
initial_avatar_coords: Tuple[int, int],
bubble_snapshot: Any, # PIL Image object
search_area: Optional[Tuple[int, int, int, int]]) -> Optional[str]:
"""
Perform the sequence of actions to copy sender name, *without* cleanup.
Includes retries with bubble re-location if the initial avatar click fails.
Returns the name or None if failed.
"""
print(f"Attempting interaction to get username, initial avatar guess: {initial_avatar_coords}...")
original_clipboard = self.get_clipboard() or ""
self.set_clipboard("___MCP_CLEAR___")
time.sleep(0.1)
sender_name = None
profile_page_found = False
current_avatar_coords = initial_avatar_coords
for attempt in range(3): # Retry up to 3 times
print(f"Attempt #{attempt + 1} to click avatar and find profile page...")
# --- Re-locate bubble on retries ---
if attempt > 0:
print("Re-locating bubble before retry...")
if bubble_snapshot is None:
print("Error: Cannot retry re-location, bubble snapshot is missing.")
break # Cannot retry without snapshot
new_bubble_box_retry = pyautogui.locateOnScreen(bubble_snapshot, region=search_area, confidence=BUBBLE_RELOCATE_CONFIDENCE)
if new_bubble_box_retry:
new_tl_x_retry, new_tl_y_retry = new_bubble_box_retry.left, new_bubble_box_retry.top
print(f"Successfully re-located bubble snapshot for retry at: ({new_tl_x_retry}, {new_tl_y_retry})")
# Recalculate avatar coords for the retry
current_avatar_coords = (new_tl_x_retry + AVATAR_OFFSET_X_REPLY, new_tl_y_retry + AVATAR_OFFSET_Y_REPLY)
print(f"Recalculated avatar coordinates for retry: {current_avatar_coords}")
else:
print("Warning: Failed to re-locate bubble snapshot on retry. Aborting name retrieval.")
break # Stop retrying if bubble can't be found
# --- Click Avatar ---
try:
self.click_at(current_avatar_coords[0], current_avatar_coords[1])
time.sleep(0.15) # Slightly longer wait after click to allow UI to update
except Exception as click_err:
print(f"Error clicking avatar at {current_avatar_coords} on attempt {attempt + 1}: {click_err}")
time.sleep(0.3) # Wait a bit longer after a click error before retrying
continue # Go to next attempt
# --- Check for Profile Page ---
if self.detector._find_template('profile_page', confidence=self.detector.state_confidence):
print("Profile page verified.")
profile_page_found = True
break # Success, exit retry loop
else:
print(f"Profile page not found after click attempt {attempt + 1}.")
# Optional: Press ESC once to close potential wrong menus before retrying?
# self.press_key('esc')
# time.sleep(0.1)
time.sleep(0.3) # Wait before next attempt
# --- If Profile Page was found, proceed ---
if profile_page_found:
try:
# 2. Find and click profile option
profile_option_locations = self.detector._find_template('profile_option', confidence=0.7)
if not profile_option_locations:
print("Error: User details option not found on profile card.")
return None # Fail early if critical step missing
self.click_at(profile_option_locations[0][0], profile_option_locations[0][1])
print("Clicked user details option.")
time.sleep(0.1) # Wait for user details window
# 3. Find and click "Copy Name" button
copy_name_locations = self.detector._find_template('copy_name_button', confidence=0.7)
if not copy_name_locations:
print("Error: 'Copy Name' button not found in user details.")
return None # Fail early
self.click_at(copy_name_locations[0][0], copy_name_locations[0][1])
print("Clicked 'Copy Name' button.")
time.sleep(0.1)
# 4. Get name from clipboard
copied_name = self.get_clipboard()
if copied_name and copied_name != "___MCP_CLEAR___":
print(f"Successfully copied username: {copied_name}")
sender_name = copied_name.strip()
else:
print("Error: Clipboard content invalid after clicking copy name.")
sender_name = None
except Exception as e:
print(f"Error during username retrieval interaction (after profile page found): {e}")
import traceback
traceback.print_exc()
sender_name = None # Ensure None is returned on error
else:
print("Failed to verify profile page after multiple attempts.")
sender_name = None
# --- Final Cleanup & Return ---
self.set_clipboard(original_clipboard) # Restore clipboard
# NO cleanup logic (like ESC) here - should be handled by coordinator after this function returns
return sender_name
def send_chat_message(self, reply_text: str) -> bool:
"""Paste text into chat input and send it."""
print("Preparing to send response...")
if not reply_text:
print("Error: Response content is empty, cannot send.")
return False
# Find input box coordinates
input_coords = self.default_input_coords # Fallback
if self.input_template_key and self.detector.templates.get(self.input_template_key):
input_locations = self.detector._find_template(self.input_template_key, confidence=0.7)
if input_locations:
input_coords = input_locations[0]
print(f"Found input box position via image: {input_coords}")
else:
print(f"Warning: Input box template '{self.input_template_key}' not found, using default coordinates.")
else:
print("Warning: Input box template key not set or image missing, using default coordinates.")
# Click input, paste, send
self.click_at(input_coords[0], input_coords[1])
time.sleep(0.1)
print("Pasting response...")
self.set_clipboard(reply_text)
time.sleep(0.1)
try:
self.hotkey('ctrl', 'v')
time.sleep(0.1)
print("Pasted.")
except Exception as e:
print(f"Error pasting response: {e}")
return False
# Try clicking send button first
send_button_locations = self.detector._find_template(self.send_button_key, confidence=0.7)
if send_button_locations:
send_coords = send_button_locations[0]
self.click_at(send_coords[0], send_coords[1])
print("Clicked send button.")
time.sleep(0.1)
return True
else:
# Fallback to pressing Enter
print("Send button not found. Attempting to press Enter.")
try:
self.press_key('enter')
print("Pressed Enter.")
time.sleep(0.1)
return True
except Exception as e_enter:
print(f"Error pressing Enter: {e_enter}")
return False
# ==============================================================================
# Position Removal Logic
# ==============================================================================
def remove_user_position(detector: DetectionModule,
interactor: InteractionModule,
trigger_bubble_region: Tuple[int, int, int, int], # Original region, might be outdated
bubble_snapshot: Any, # PIL Image object for re-location
search_area: Optional[Tuple[int, int, int, int]]) -> bool: # Area to search snapshot in
"""
Performs the sequence of UI actions to remove a user's position based on the triggering chat bubble.
Includes re-location using the provided snapshot before proceeding.
Returns True if successful, False otherwise.
"""
print(f"\n--- Starting Position Removal Process (Initial Trigger Region: {trigger_bubble_region}) ---")
# --- Re-locate Bubble First ---
print("Attempting to re-locate bubble using snapshot before removing position...")
# If bubble_snapshot is None, try to create one from the trigger_bubble_region
if bubble_snapshot is None:
print("Bubble snapshot is missing. Attempting to create a new snapshot from the trigger region...")
try:
if trigger_bubble_region and len(trigger_bubble_region) == 4:
bubble_region_tuple = (int(trigger_bubble_region[0]), int(trigger_bubble_region[1]),
int(trigger_bubble_region[2]), int(trigger_bubble_region[3]))
if bubble_region_tuple[2] <= 0 or bubble_region_tuple[3] <= 0:
print(f"Warning: Invalid bubble region {bubble_region_tuple} for taking new snapshot.")
return False
print(f"Taking new screenshot of region: {bubble_region_tuple}")
bubble_snapshot = pyautogui.screenshot(region=bubble_region_tuple)
if bubble_snapshot:
print("Successfully created new bubble snapshot.")
else:
print("Failed to create new bubble snapshot.")
return False
else:
print("Invalid trigger_bubble_region format, cannot create snapshot.")
return False
except Exception as e:
print(f"Error creating new bubble snapshot: {e}")
return False
if search_area is None:
print("Warning: Search area for snapshot is missing. Creating a default search area.")
# Create a default search area centered around the original trigger region
# This creates a search area that's twice the size of the original bubble
if trigger_bubble_region and len(trigger_bubble_region) == 4:
x, y, width, height = trigger_bubble_region
# Expand by 100% in each direction
search_x = max(0, x - width//2)
search_y = max(0, y - height//2)
search_width = width * 2
search_height = height * 2
search_area = (search_x, search_y, search_width, search_height)
print(f"Created default search area based on bubble region: {search_area}")
else:
# If no valid trigger_bubble_region, default to full screen search
search_area = None # Set search_area to None for full screen search
print(f"Using full screen search as fallback.")
# Try to locate the bubble with decreasing confidence levels if needed
new_bubble_box = None
# Determine the region to search: use provided search_area or None for full screen
region_to_search = search_area
print(f"Attempting bubble location. Search Region: {'Full Screen' if region_to_search is None else region_to_search}")
# First attempt with standard confidence
print(f"First attempt with confidence {BUBBLE_RELOCATE_CONFIDENCE}...")
try:
new_bubble_box = pyautogui.locateOnScreen(bubble_snapshot,
region=region_to_search,
confidence=BUBBLE_RELOCATE_CONFIDENCE)
except Exception as e:
print(f"Exception during initial bubble location attempt: {e}")
# Second attempt with fallback confidence if first failed
if not new_bubble_box:
print(f"First attempt failed. Trying with lower confidence {BUBBLE_RELOCATE_FALLBACK_CONFIDENCE}...")
try:
# Try with a lower confidence threshold
new_bubble_box = pyautogui.locateOnScreen(bubble_snapshot,
region=region_to_search,
confidence=BUBBLE_RELOCATE_FALLBACK_CONFIDENCE)
except Exception as e:
print(f"Exception during fallback bubble location attempt: {e}")
# Third attempt with even lower confidence as last resort
if not new_bubble_box:
print("Second attempt failed. Trying with even lower confidence 0.4...")
try:
# Last resort with very low confidence
new_bubble_box = pyautogui.locateOnScreen(bubble_snapshot,
region=region_to_search,
confidence=0.4)
except Exception as e:
print(f"Exception during last resort bubble location attempt: {e}")
# If we still can't find the bubble using snapshot, try re-detecting bubbles
if not new_bubble_box:
print("Snapshot location failed. Attempting secondary fallback: Re-detecting bubbles...")
try:
# Helper function to calculate distance - define it here or move globally if used elsewhere
def calculate_distance(p1, p2):
return ((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)**0.5
current_bubbles_info = detector.find_dialogue_bubbles()
non_bot_bubbles = [b for b in current_bubbles_info if not b.get('is_bot')]
if non_bot_bubbles and trigger_bubble_region and len(trigger_bubble_region) == 4:
original_tl = (trigger_bubble_region[0], trigger_bubble_region[1])
closest_bubble = None
min_distance = float('inf')
MAX_ALLOWED_DISTANCE = 150 # Example threshold: Don't match bubbles too far away
for bubble_info in non_bot_bubbles:
bubble_bbox = bubble_info.get('bbox')
if bubble_bbox:
current_tl = (bubble_bbox[0], bubble_bbox[1])
distance = calculate_distance(original_tl, current_tl)
if distance < min_distance:
min_distance = distance
closest_bubble = bubble_info
if closest_bubble and min_distance <= MAX_ALLOWED_DISTANCE:
print(f"Found a close bubble via re-detection (Distance: {min_distance:.2f}). Using its bbox.")
bbox = closest_bubble['bbox']
# Create a dummy box using PyAutoGUI's Box class or a similar structure
from collections import namedtuple
Box = namedtuple('Box', ['left', 'top', 'width', 'height'])
new_bubble_box = Box(left=bbox[0], top=bbox[1], width=bbox[2]-bbox[0], height=bbox[3]-bbox[1])
print(f"Created fallback bubble box from re-detected bubble: {new_bubble_box}")
else:
print(f"Re-detection fallback failed: No close bubble found (Min distance: {min_distance:.2f} > Threshold: {MAX_ALLOWED_DISTANCE}).")
else:
print("Re-detection fallback failed: No non-bot bubbles found or invalid trigger region.")
except Exception as redetect_err:
print(f"Error during bubble re-detection fallback: {redetect_err}")
# Final fallback: If STILL no bubble box, use original trigger region
if not new_bubble_box:
print("All location attempts failed (snapshot & re-detection). Using original trigger region as last resort.")
if trigger_bubble_region and len(trigger_bubble_region) == 4:
# Create a mock bubble_box from the original region
x, y, width, height = trigger_bubble_region
print(f"Using original trigger region as fallback: {trigger_bubble_region}")
# Create a dummy box using PyAutoGUI's Box class or a similar structure
from collections import namedtuple
Box = namedtuple('Box', ['left', 'top', 'width', 'height'])
new_bubble_box = Box(left=x, top=y, width=width, height=height)
print("Created fallback bubble box from original coordinates.")
else:
print("Error: No original trigger region available for fallback. Aborting position removal.")
return False
# Use the NEW coordinates for all subsequent calculations
bubble_x, bubble_y = new_bubble_box.left, new_bubble_box.top
bubble_w, bubble_h = new_bubble_box.width, new_bubble_box.height
print(f"Successfully re-located bubble at: ({bubble_x}, {bubble_y}, {bubble_w}, {bubble_h})")
# --- End Re-location ---
# 1. Find the closest position icon above the *re-located* bubble
search_height_pixels = 50 # Search exactly 50 pixels above as requested
search_region_y_end = bubble_y # Use re-located Y
search_region_y_start = max(0, bubble_y - search_height_pixels) # Search 50 pixels above
search_region_x_start = max(0, bubble_x - 100) # Keep horizontal search wide
search_region_x_end = bubble_x + bubble_w + 100
search_region_width = search_region_x_end - search_region_x_start
search_region_height = search_region_y_end - search_region_y_start
# Ensure region has positive width and height
if search_region_width <= 0 or search_region_height <= 0:
print(f"Error: Invalid search region calculated for position icons: width={search_region_width}, height={search_region_height}")
return False
search_region = (search_region_x_start, search_region_y_start, search_region_width, search_region_height)
print(f"Searching for position icons in region: {search_region}")
position_templates = {
'DEVELOPMENT': POS_DEV_IMG, 'INTERIOR': POS_INT_IMG, 'SCIENCE': POS_SCI_IMG,
'SECURITY': POS_SEC_IMG, 'STRATEGY': POS_STR_IMG
}
found_positions = []
position_icon_confidence = 0.8 # Slightly increased confidence (was 0.75)
for name, path in position_templates.items():
# Use unique keys for detector templates
locations = detector._find_template(name.lower() + '_pos', confidence=position_icon_confidence, region=search_region)
for loc in locations:
found_positions.append({'name': name, 'coords': loc, 'path': path})
if not found_positions:
print("Error: No position icons found near the trigger bubble.")
return False
# Find the closest one to the bubble's top-center
bubble_top_center_x = bubble_x + bubble_w // 2
bubble_top_center_y = bubble_y
closest_position = min(found_positions, key=lambda p:
(p['coords'][0] - bubble_top_center_x)**2 + (p['coords'][1] - bubble_top_center_y)**2)
target_position_name = closest_position['name']
print(f"Found pending position: |{target_position_name}| at {closest_position['coords']}")
# 2. Click user avatar (offset from *re-located* bubble top-left)
# --- MODIFIED: Use specific offsets for remove_position command as requested ---
avatar_click_x = bubble_x + AVATAR_OFFSET_X_REPLY # Use -45 offset
avatar_click_y = bubble_y + AVATAR_OFFSET_Y_REPLY # Use +10 offset
print(f"Clicking avatar for position removal at calculated position: ({avatar_click_x}, {avatar_click_y}) using offsets ({AVATAR_OFFSET_X_REPLY}, {AVATAR_OFFSET_Y_REPLY}) from re-located bubble top-left ({bubble_x}, {bubble_y})")
# --- END MODIFICATION ---
interactor.click_at(avatar_click_x, avatar_click_y)
time.sleep(0.15) # Wait for profile page
# 3. Verify Profile Page and Click Capitol Button
if not detector._find_template('profile_page', confidence=detector.state_confidence):
print("Error: Failed to verify Profile Page after clicking avatar.")
perform_state_cleanup(detector, interactor) # Attempt cleanup
return False
print("Profile page verified.")
capitol_button_locs = detector._find_template('capitol_button', confidence=0.8)
if not capitol_button_locs:
print("Error: Capitol button (#11) not found on profile page.")
perform_state_cleanup(detector, interactor)
return False
interactor.click_at(capitol_button_locs[0][0], capitol_button_locs[0][1])
print("Clicked Capitol button.")
time.sleep(0.15) # Wait for capitol page
# 4. Verify Capitol Page
if not detector._find_template('president_title', confidence=detector.state_confidence):
print("Error: Failed to verify Capitol Page (President Title not found).")
perform_state_cleanup(detector, interactor)
return False
print("Capitol page verified.")
# 5. Find and Click Corresponding Position Button
position_button_templates = {
'DEVELOPMENT': 'pos_btn_dev', 'INTERIOR': 'pos_btn_int', 'SCIENCE': 'pos_btn_sci',
'SECURITY': 'pos_btn_sec', 'STRATEGY': 'pos_btn_str'
}
target_button_key = position_button_templates.get(target_position_name)
if not target_button_key:
print(f"Error: Internal error - unknown position name '{target_position_name}'")
perform_state_cleanup(detector, interactor)
return False
pos_button_locs = detector._find_template(target_button_key, confidence=0.8)
if not pos_button_locs:
print(f"Error: Position button for '{target_position_name}' not found on Capitol page.")
perform_state_cleanup(detector, interactor)
return False
interactor.click_at(pos_button_locs[0][0], pos_button_locs[0][1])
print(f"Clicked '{target_position_name}' position button.")
time.sleep(0.15) # Wait for position page
# 6. Verify Position Page
position_page_templates = {
'DEVELOPMENT': 'page_dev', 'INTERIOR': 'page_int', 'SCIENCE': 'page_sci',
'SECURITY': 'page_sec', 'STRATEGY': 'page_str'
}
target_page_key = position_page_templates.get(target_position_name)
if not target_page_key:
print(f"Error: Internal error - unknown position name '{target_position_name}' for page verification")
perform_state_cleanup(detector, interactor)
return False
if not detector._find_template(target_page_key, confidence=detector.state_confidence):
print(f"Error: Failed to verify correct position page for '{target_position_name}'.")
perform_state_cleanup(detector, interactor)
return False
print(f"Verified '{target_position_name}' position page.")
# 7. Find and Click Dismiss Button
dismiss_locs = detector._find_template('dismiss_button', confidence=0.8)
if not dismiss_locs:
print("Error: Dismiss button not found on position page.")
perform_state_cleanup(detector, interactor)
return False
interactor.click_at(dismiss_locs[0][0], dismiss_locs[0][1])
print("Clicked Dismiss button.")
time.sleep(0.1) # Wait for confirmation
# 8. Find and Click Confirm Button
confirm_locs = detector._find_template('confirm_button', confidence=0.8)
if not confirm_locs:
print("Error: Confirm button not found after clicking dismiss.")
# Don't cleanup here, might be stuck in confirmation state
return False # Indicate failure, but let main loop decide next step
interactor.click_at(confirm_locs[0][0], confirm_locs[0][1])
print("Clicked Confirm button. Position should be dismissed.")
time.sleep(0.05) # Wait for action to complete (Reduced from 0.1)
# 9. Cleanup: Return to Chat Room
# Click Close on position page (should now be back on capitol page implicitly)
close_locs = detector._find_template('close_button', confidence=0.8)
if close_locs:
interactor.click_at(close_locs[0][0], close_locs[0][1])
print("Clicked Close button (returning to Capitol).")
time.sleep(0.05) # Reduced from 0.1
else:
print("Warning: Close button not found after confirm, attempting back arrow anyway.")
# Click Back Arrow on Capitol page (should return to profile)
back_arrow_locs = detector._find_template('back_arrow', confidence=0.8)
if back_arrow_locs:
interactor.click_at(back_arrow_locs[0][0], back_arrow_locs[0][1])
print("Clicked Back Arrow (returning to Profile).")
time.sleep(0.05) # Reduced from 0.1
else:
print("Warning: Back arrow not found on Capitol page, attempting ESC cleanup.")
# Use standard ESC cleanup
print("Initiating final ESC cleanup to return to chat...")
cleanup_success = perform_state_cleanup(detector, interactor)
if cleanup_success:
print("--- Position Removal Process Completed Successfully ---")
return True
else:
print("--- Position Removal Process Completed, but failed to confirm return to chat room ---")
return False # Technically removed, but UI state uncertain
# ==============================================================================
# Coordinator Logic (Placeholder - To be implemented in main.py)
# ==============================================================================
# --- State-based Cleanup Function (To be called by Coordinator) ---
def perform_state_cleanup(detector: DetectionModule, interactor: InteractionModule, max_attempts: int = 4) -> bool:
"""
Attempt to return to the main chat room interface by pressing ESC based on detected state.
Returns True if confirmed back in chat room, False otherwise.
"""
print("Performing cleanup: Attempting to press ESC to return to chat interface...")
returned_to_chat = False
for attempt in range(max_attempts):
print(f"Cleanup attempt #{attempt + 1}/{max_attempts}")
time.sleep(0.1)
current_state = detector.get_current_ui_state()
print(f"Detected state: {current_state}")
if current_state == 'chat_room' or current_state == 'world_chat' or current_state == 'private_chat': # Adjust as needed
print("Chat room interface detected, cleanup complete.")
returned_to_chat = True
break
elif current_state == 'user_details' or current_state == 'profile_card':
print(f"{current_state.replace('_', ' ').title()} detected, pressing ESC...")
interactor.press_key('esc')
time.sleep(0.1) # Wait longer for UI response after ESC
continue
else: # Unknown state
print("Unknown page state detected.")
if attempt < max_attempts - 1:
print("Trying one ESC press as fallback...")
interactor.press_key('esc')
time.sleep(0.1)
else:
print("Maximum attempts reached, stopping cleanup.")
break
if not returned_to_chat:
print("Warning: Could not confirm return to chat room interface via state detection.")
return returned_to_chat
# --- UI Monitoring Loop Function (To be run in a separate thread) ---
def run_ui_monitoring_loop(trigger_queue: queue.Queue, command_queue: queue.Queue, deduplicator: 'MessageDeduplication'):
"""
Continuously monitors the UI, detects triggers, performs interactions,
puts trigger data into trigger_queue, and processes commands from command_queue.
"""
print("\n--- Starting UI Monitoring Loop (Thread) ---")
# --- 初始化氣泡圖像去重系統(新增) ---
bubble_deduplicator = SimpleBubbleDeduplication(
storage_file="simple_bubble_dedup.json",
max_bubbles=4, # 保留最近5個氣泡
threshold=7, # 哈希差異閾值(值越小越嚴格)
hash_size=16 # 哈希大小
)
# --- 初始化氣泡圖像去重系統結束 ---
# --- Initialization (Instantiate modules within the thread) ---
# --- Template Dictionary Setup (Refactored) ---
essential_templates = {
# Bubble Corners (All types needed for legacy/color fallback)
'corner_tl': CORNER_TL_IMG, 'corner_br': CORNER_BR_IMG,
'corner_tl_type2': CORNER_TL_TYPE2_IMG, 'corner_br_type2': CORNER_BR_TYPE2_IMG,
'corner_tl_type3': CORNER_TL_TYPE3_IMG, 'corner_br_type3': CORNER_BR_TYPE3_IMG,
'corner_tl_type4': CORNER_TL_TYPE4_IMG, 'corner_br_type4': CORNER_BR_TYPE4_IMG, # Added type4
'bot_corner_tl': BOT_CORNER_TL_IMG, 'bot_corner_br': BOT_CORNER_BR_IMG,
# Core Keywords (for dual method)
'keyword_wolf_lower': KEYWORD_wolf_LOWER_IMG,
'keyword_Wolf_upper': KEYWORD_Wolf_UPPER_IMG,
'keyword_wolf_reply': KEYWORD_WOLF_REPLY_IMG,
# Essential UI Elements
'copy_menu_item': COPY_MENU_ITEM_IMG, 'profile_option': PROFILE_OPTION_IMG,
'copy_name_button': COPY_NAME_BUTTON_IMG, 'send_button': SEND_BUTTON_IMG,
'chat_input': CHAT_INPUT_IMG, 'profile_name_page': PROFILE_NAME_PAGE_IMG,
'profile_page': PROFILE_PAGE_IMG, 'chat_room': CHAT_ROOM_IMG,
'base_screen': BASE_SCREEN_IMG, 'world_map_screen': WORLD_MAP_IMG,
'world_chat': WORLD_CHAT_IMG, 'private_chat': PRIVATE_CHAT_IMG,
# Position templates
'development_pos': POS_DEV_IMG, 'interior_pos': POS_INT_IMG, 'science_pos': POS_SCI_IMG,
'security_pos': POS_SEC_IMG, 'strategy_pos': POS_STR_IMG,
# Capitol templates
'capitol_button': CAPITOL_BUTTON_IMG, 'president_title': PRESIDENT_TITLE_IMG,
'pos_btn_dev': POS_BTN_DEV_IMG, 'pos_btn_int': POS_BTN_INT_IMG, 'pos_btn_sci': POS_BTN_SCI_IMG,
'pos_btn_sec': POS_BTN_SEC_IMG, 'pos_btn_str': POS_BTN_STR_IMG,
'page_dev': PAGE_DEV_IMG, 'page_int': PAGE_INT_IMG, 'page_sci': PAGE_SCI_IMG,
'page_sec': PAGE_SEC_IMG, 'page_str': PAGE_STR_IMG,
'dismiss_button': DISMISS_BUTTON_IMG, 'confirm_button': CONFIRM_BUTTON_IMG,
'close_button': CLOSE_BUTTON_IMG, 'back_arrow': BACK_ARROW_IMG,
'reply_button': REPLY_BUTTON_IMG,
# 添加新模板
'chat_option': CHAT_OPTION_IMG, 'update_confirm': UPDATE_CONFIRM_IMG,
}
legacy_templates = {
# Deprecated Keywords (for legacy method fallback)
'keyword_wolf_lower_type2': KEYWORD_wolf_LOWER_TYPE2_IMG,
'keyword_wolf_upper_type2': KEYWORD_Wolf_UPPER_TYPE2_IMG,
'keyword_wolf_lower_type3': KEYWORD_wolf_LOWER_TYPE3_IMG,
'keyword_wolf_upper_type3': KEYWORD_Wolf_UPPER_TYPE3_IMG,
'keyword_wolf_lower_type4': KEYWORD_wolf_LOWER_TYPE4_IMG,
'keyword_wolf_upper_type4': KEYWORD_Wolf_UPPER_TYPE4_IMG,
'keyword_wolf_reply_type2': KEYWORD_WOLF_REPLY_TYPE2_IMG,
'keyword_wolf_reply_type3': KEYWORD_WOLF_REPLY_TYPE3_IMG,
'keyword_wolf_reply_type4': KEYWORD_WOLF_REPLY_TYPE4_IMG,
}
# Combine dictionaries
all_templates = {**essential_templates, **legacy_templates}
# --- End Template Dictionary Setup ---
# --- Instantiate Modules ---
detector = DetectionModule(all_templates,
confidence=CONFIDENCE_THRESHOLD, # Default for legacy pyautogui calls
state_confidence=STATE_CONFIDENCE_THRESHOLD,
region=SCREENSHOT_REGION,
use_dual_method=True) # Enable new method by default
interactor = InteractionModule(detector,
input_coords=(CHAT_INPUT_CENTER_X, CHAT_INPUT_CENTER_Y),
input_template_key='chat_input',
send_button_key='send_button')
# --- State Management (Local to this monitoring thread) ---
last_processed_bubble_info = None # Store the whole dict now
recent_texts = collections.deque(maxlen=RECENT_TEXT_HISTORY_MAXLEN) # Context-specific history needed
screenshot_counter = 0 # Initialize counter for debug screenshots
main_screen_click_counter = 0 # Counter for consecutive main screen clicks
loop_counter = 0 # Add loop counter for debugging
while True:
loop_counter += 1
# print(f"\n--- UI Loop Iteration #{loop_counter} ---") # DEBUG REMOVED
# --- Process ALL Pending Commands First ---
# print("[DEBUG] UI Loop: Processing command queue...") # DEBUG REMOVED
commands_processed_this_cycle = False
try:
while True: # Loop to drain the queue
command_data = command_queue.get_nowait() # Check for commands without blocking
commands_processed_this_cycle = True
action = command_data.get('action')
if action == 'send_reply':
text_to_send = command_data.get('text')
if not text_to_send:
print("UI Thread: Received send_reply command with no text.")
continue # Process next command in queue
print(f"UI Thread: Processing command to send reply: '{text_to_send[:50]}...'")
interactor.send_chat_message(text_to_send)
elif action == 'remove_position':
# region = command_data.get('trigger_bubble_region') # This is the old region, keep for reference?
snapshot = command_data.get('bubble_snapshot')
area = command_data.get('search_area')
# Pass all necessary data to the function, including the original region if needed for context
# but the function should primarily use the snapshot for re-location.
original_region = command_data.get('trigger_bubble_region')
if snapshot: # Check for snapshot presence
print(f"UI Thread: Processing command to remove position (Snapshot provided: {'Yes' if snapshot else 'No'})")
success = remove_user_position(detector, interactor, original_region, snapshot, area)
print(f"UI Thread: Position removal attempt finished. Success: {success}")
else:
print("UI Thread: Received remove_position command without necessary snapshot data.")
elif action == 'pause':
if not monitoring_paused_flag[0]: # Avoid redundant prints if already paused
print("UI Thread: Processing pause command. Pausing monitoring.")
monitoring_paused_flag[0] = True
# No continue needed here, let it finish draining queue
elif action == 'resume':
if monitoring_paused_flag[0]: # Avoid redundant prints if already running
print("UI Thread: Processing resume command. Resuming monitoring.")
monitoring_paused_flag[0] = False
# No state reset here, reset_state command handles that
elif action == 'handle_restart_complete': # Added for game monitor restart signal
print("UI Thread: Received 'handle_restart_complete' command. Initiating internal pause/wait/resume sequence.")
# --- Internal Pause/Wait/Resume Sequence ---
if not monitoring_paused_flag[0]: # Only pause if not already paused
print("UI Thread: Pausing monitoring internally for restart.")
monitoring_paused_flag[0] = True
# No need to send command back to main loop, just update flag
print("UI Thread: Waiting 30 seconds for game to stabilize after restart.")
time.sleep(30) # Wait for game to launch and stabilize
print("UI Thread: Resuming monitoring internally after restart wait.")
monitoring_paused_flag[0] = False
# Clear state to ensure fresh detection after restart
recent_texts.clear()
last_processed_bubble_info = None
print("UI Thread: Monitoring resumed and state reset after restart.")
# --- End Internal Sequence ---
elif action == 'clear_history': # Added for F7
print("UI Thread: Processing clear_history command.")
recent_texts.clear()
deduplicator.clear_all() # Simultaneously clear deduplication records
# --- 新增:清理氣泡去重記錄 ---
if 'bubble_deduplicator' in locals():
bubble_deduplicator.clear_all()
# --- 清理氣泡去重記錄結束 ---
print("UI Thread: recent_texts and deduplicator records cleared.")
elif action == 'reset_state': # Added for F8 resume
print("UI Thread: Processing reset_state command.")
recent_texts.clear()
last_processed_bubble_info = None
deduplicator.clear_all() # Simultaneously clear deduplication records
# --- 新增:清理氣泡去重記錄 ---
if 'bubble_deduplicator' in locals():
bubble_deduplicator.clear_all()
# --- 清理氣泡去重記錄結束 ---
print("UI Thread: recent_texts, last_processed_bubble_info, and deduplicator records reset.")
else:
print(f"UI Thread: Received unknown command: {action}")
except queue.Empty:
# No more commands in the queue for this cycle
# if commands_processed_this_cycle: # DEBUG REMOVED
# print("UI Thread: Finished processing commands for this cycle.") # DEBUG REMOVED
pass
except Exception as cmd_err:
print(f"UI Thread: Error processing command queue: {cmd_err}")
# Consider if pausing is needed on error, maybe not
# --- Now, Check Pause State ---
# print("[DEBUG] UI Loop: Checking pause state...") # DEBUG REMOVED
if monitoring_paused_flag[0]:
# print("[DEBUG] UI Loop: Monitoring is paused. Sleeping...") # DEBUG REMOVED
# If paused, sleep and skip UI monitoring part
time.sleep(0.1) # Sleep briefly while paused
continue # Go back to check commands again
# --- If not paused, proceed with UI Monitoring ---
# print("[DEBUG] UI Loop: Monitoring is active. Proceeding...") # DEBUG REMOVED
# --- 添加檢查 chat_option 狀態 ---
try:
chat_option_locs = detector._find_template('chat_option', confidence=0.8)
if chat_option_locs:
print("UI Thread: Detected chat_option overlay. Pressing ESC to dismiss...")
interactor.press_key('esc')
time.sleep(0.2) # 給一點時間讓界面響應
print("UI Thread: Pressed ESC to dismiss chat_option. Continuing...")
continue # 重新開始循環以確保界面已清除
except Exception as chat_opt_err:
print(f"UI Thread: Error checking for chat_option: {chat_opt_err}")
# 繼續執行,不要中斷主流程
# --- Check for Main Screen Navigation ---
# print("[DEBUG] UI Loop: Checking for main screen navigation...") # DEBUG REMOVED
try:
base_locs = detector._find_template('base_screen', confidence=0.8)
map_locs = detector._find_template('world_map_screen', confidence=0.8)
if base_locs or map_locs:
print(f"UI Thread: Detected main screen (Base or World Map). Counter: {main_screen_click_counter}")
if main_screen_click_counter < 5:
main_screen_click_counter += 1
print(f"UI Thread: Attempting click #{main_screen_click_counter}/5 to return to chat...")
# Coordinates provided by user (adjust if needed based on actual screen resolution/layout)
target_x, target_y = 600, 1300
interactor.click_at(target_x, target_y)
time.sleep(0.1) # Short delay after click
print("UI Thread: Clicked. Re-checking screen state...")
else:
print("UI Thread: Clicked 5 times, still on main screen. Pressing ESC...")
interactor.press_key('esc')
main_screen_click_counter = 0 # Reset counter after ESC
time.sleep(0.05) # Wait a bit longer after ESC
print("UI Thread: ESC pressed. Re-checking screen state...")
continue # Skip the rest of the loop and re-evaluate state
else:
# Reset counter if not on the main screen
if main_screen_click_counter > 0:
print("UI Thread: Not on main screen, resetting click counter.")
main_screen_click_counter = 0
except Exception as nav_err:
print(f"UI Thread: Error during main screen navigation check: {nav_err}")
# Decide if you want to continue or pause after error
main_screen_click_counter = 0 # Reset counter on error too
# --- Verify Chat Room State Before Bubble Detection (Only if NOT paused) ---
# print("[DEBUG] UI Loop: Verifying chat room state...") # DEBUG REMOVED
try:
# Use a slightly lower confidence maybe, or state_confidence
chat_room_locs = detector._find_template('chat_room', confidence=detector.state_confidence)
if not chat_room_locs:
print("UI Thread: Not in chat room state before bubble detection. Checking for update confirm...")
# 檢查是否存在更新確認按鈕
update_confirm_locs = detector._find_template('update_confirm', confidence=0.8)
if update_confirm_locs:
print("UI Thread: Detected update_confirm button. Clicking to proceed...")
interactor.click_at(update_confirm_locs[0][0], update_confirm_locs[0][1])
time.sleep(0.5) # 給更新過程一些時間
print("UI Thread: Clicked update_confirm button. Continuing...")
continue # 重新開始循環以重新檢查狀態
# 沒有找到更新確認按鈕,繼續原有的清理邏輯
print("UI Thread: No update_confirm button found. Attempting cleanup...")
perform_state_cleanup(detector, interactor)
# Regardless of cleanup success, restart the loop to re-evaluate state from the top
print("UI Thread: Continuing loop after attempting chat room cleanup.")
time.sleep(0.5) # Small pause after cleanup attempt
continue
# else: # Optional: Log if chat room is confirmed # DEBUG REMOVED
# print("[DEBUG] UI Thread: Chat room state confirmed.") # DEBUG REMOVED
except Exception as state_check_err:
print(f"UI Thread: Error checking for chat room state: {state_check_err}")
# Decide how to handle error - maybe pause and retry? For now, continue cautiously.
time.sleep(1)
# --- Then Perform UI Monitoring (Bubble Detection) ---
# print("[DEBUG] UI Loop: Starting bubble detection...") # DEBUG REMOVED
try:
# 1. Detect Bubbles
all_bubbles_data = detector.find_dialogue_bubbles() # Returns list of dicts
if not all_bubbles_data:
# print("[DEBUG] UI Loop: No bubbles detected.") # DEBUG REMOVED
time.sleep(2); continue
# Filter out bot bubbles
other_bubbles_data = [b_info for b_info in all_bubbles_data if not b_info['is_bot']]
if not other_bubbles_data:
# print("[DEBUG] UI Loop: No non-bot bubbles detected.") # DEBUG REMOVED
time.sleep(0.2); continue
# print(f"[DEBUG] UI Loop: Found {len(other_bubbles_data)} non-bot bubbles. Sorting...") # DEBUG REMOVED
# Sort bubbles from bottom to top (based on bottom Y coordinate)
sorted_bubbles = sorted(other_bubbles_data, key=lambda b_info: b_info['bbox'][3], reverse=True)
# Iterate through sorted bubbles (bottom to top)
# print("[DEBUG] UI Loop: Iterating through sorted bubbles...") # DEBUG REMOVED
for i, target_bubble_info in enumerate(sorted_bubbles):
# print(f"[DEBUG] UI Loop: Processing bubble #{i+1}") # DEBUG REMOVED
target_bbox = target_bubble_info['bbox']
# Ensure bubble_region uses standard ints
bubble_region = (int(target_bbox[0]), int(target_bbox[1]), int(target_bbox[2]-target_bbox[0]), int(target_bbox[3]-target_bbox[1]))
# 3. Detect Keyword in Bubble
# print(f"[DEBUG] UI Loop: Detecting keyword in region {bubble_region}...") # DEBUG REMOVED
result = detector.find_keyword_in_region(bubble_region) # Now returns (coords, key) or None
if result: # 檢查是否真的找到了關鍵字
keyword_coords, detected_template_key = result # 解包得到座標和 key
# 在這裡可以更新或加入日誌,包含 detected_template_key
print(f"\n!!! Keyword '{detected_template_key}' detected in bubble {target_bbox} at {keyword_coords} !!!")
# --- 接下來是移除冗餘邏輯並使用新 key ---
# ------------ START: 刪除或註解掉以下區塊 ------------
# is_reply_keyword = False
# reply_keyword_keys = ['keyword_wolf_reply', 'keyword_wolf_reply_type2', 'keyword_wolf_reply_type3', 'keyword_wolf_reply_type4']
# for key in reply_keyword_keys:
# reply_locs = detector._find_template(key, region=bubble_region, grayscale=False, confidence=detector.confidence)
# if reply_locs:
# for loc in reply_locs:
# if abs(keyword_coords[0] - loc[0]) <= 2 and abs(keyword_coords[1] - loc[1]) <= 2:
# print(f"Confirmed detected keyword at {keyword_coords} matches reply keyword template '{key}' at {loc}.")
# is_reply_keyword = True
# break
# if is_reply_keyword:
# break
# ------------- END: 刪除或註解掉以上區塊 -------------
# 直接根據返回的 key 判斷是否為 reply
# Note: Dual method currently only returns 'keyword_wolf_reply' as a reply type key
is_reply_keyword = (detected_template_key == 'keyword_wolf_reply')
# Calculate click coordinates with potential offset
click_coords = keyword_coords
if is_reply_keyword:
click_coords = (keyword_coords[0], keyword_coords[1] + 25) # 假設 reply 需要 +25 Y 偏移
# 更新日誌,包含 key
print(f"Applying +25 Y-offset for reply keyword '{detected_template_key}'. Click target: {click_coords}")
else:
# 更新日誌,包含 key
print(f"Detected keyword '{detected_template_key}' is not a reply type. Click target: {click_coords}")
# --- 將剩餘的邏輯放在 if result: 區塊內 ---
# --- Variables needed later ---
bubble_snapshot = None
search_area = SCREENSHOT_REGION
if search_area is None:
print("Warning: SCREENSHOT_REGION not defined, searching full screen for bubble snapshot.")
# --- Take Snapshot for Re-location ---
# print("[DEBUG] UI Loop: Taking bubble snapshot...") # DEBUG REMOVED
try:
bubble_region_tuple = (int(bubble_region[0]), int(bubble_region[1]), int(bubble_region[2]), int(bubble_region[3]))
if bubble_region_tuple[2] <= 0 or bubble_region_tuple[3] <= 0:
print(f"Warning: Invalid bubble region {bubble_region_tuple} for snapshot. Skipping this bubble.")
continue # Skip to next bubble in the loop
bubble_snapshot = pyautogui.screenshot(region=bubble_region_tuple)
if bubble_snapshot is None:
print("Warning: Failed to capture bubble snapshot. Skipping this bubble.")
continue # Skip to next bubble
# --- New: Image deduplication check ---
if bubble_deduplicator.is_duplicate(bubble_snapshot, bubble_region_tuple):
print("Detected duplicate bubble, skipping processing")
perform_state_cleanup(detector, interactor)
continue # Skip processing this bubble
# --- End of image deduplication check ---
# --- Save Snapshot for Debugging ---
try:
screenshot_index = (screenshot_counter % MAX_DEBUG_SCREENSHOTS) + 1
screenshot_filename = f"debug_relocation_snapshot_{screenshot_index}.png"
screenshot_path = os.path.join(DEBUG_SCREENSHOT_DIR, screenshot_filename)
print(f"Attempting to save bubble snapshot used for re-location to: {screenshot_path}")
bubble_snapshot.save(screenshot_path)
print(f"Successfully saved bubble snapshot: {screenshot_path}")
screenshot_counter += 1
except Exception as save_err:
print(f"Error saving bubble snapshot to {screenshot_path}: {repr(save_err)}")
except Exception as snapshot_err:
print(f"Error taking initial bubble snapshot: {repr(snapshot_err)}")
continue # Skip to next bubble
# 4. Re-locate bubble *before* copying text
# print("[DEBUG] UI Loop: Re-locating bubble before copying text...") # DEBUG REMOVED
new_bubble_box_for_copy = None
if bubble_snapshot:
try:
# Use standard confidence for this initial critical step
new_bubble_box_for_copy = pyautogui.locateOnScreen(bubble_snapshot,
region=search_area,
confidence=BUBBLE_RELOCATE_CONFIDENCE)
except Exception as e:
print(f"Exception during bubble location before copy: {e}")
if not new_bubble_box_for_copy:
print("Warning: Failed to re-locate bubble before copying text. Skipping this bubble.")
continue # Skip to the next bubble in the outer loop
print(f"Successfully re-located bubble for copy at: {new_bubble_box_for_copy}")
# Define the region based on the re-located bubble, casting to int
copy_bubble_region = (int(new_bubble_box_for_copy.left), int(new_bubble_box_for_copy.top),
int(new_bubble_box_for_copy.width), int(new_bubble_box_for_copy.height))
# Find the keyword *again* within the *new* bubble region to get current coords
# print("[DEBUG] UI Loop: Finding keyword again in re-located region...") # DEBUG REMOVED
current_result = detector.find_keyword_in_region(copy_bubble_region) # Returns (coords, key) or None
if not current_result:
print("Warning: Keyword not found in the re-located bubble region. Skipping this bubble.")
continue # Skip to the next bubble
current_keyword_coords, current_detected_key = current_result
print(f"Keyword '{current_detected_key}' re-located at {current_keyword_coords}")
# Determine if it's a reply keyword based on the *new* location/key
# Use the key found in the *re-located* region for the most accurate offset decision
is_reply_keyword_current = (current_detected_key == 'keyword_wolf_reply')
click_coords_current = current_keyword_coords
if is_reply_keyword_current:
click_coords_current = (current_keyword_coords[0], current_keyword_coords[1] + 25)
print(f"Applying +25 Y-offset for reply keyword '{current_detected_key}' (current location). Click target: {click_coords_current}")
else:
print(f"Detected keyword '{current_detected_key}' is not a reply type (current location). Click target: {click_coords_current}")
# Interact: Get Bubble Text using current coordinates
# print("[DEBUG] UI Loop: Copying text...") # DEBUG REMOVED
bubble_text = interactor.copy_text_at(click_coords_current)
if not bubble_text:
print("Error: Could not get dialogue content for this bubble (after re-location).")
perform_state_cleanup(detector, interactor) # Attempt cleanup
continue # Skip to next bubble
# 5. Interact: Get Sender Name (uses re-location internally via retrieve_sender_name_interaction)
# print("[DEBUG] UI Loop: Retrieving sender name...") # DEBUG REMOVED
sender_name = None
try:
# --- Bubble Re-location Logic ---
print("Attempting to re-locate bubble before getting sender name...")
if bubble_snapshot is None:
print("Error: Bubble snapshot missing for re-location. Skipping this bubble.")
continue
# Try locating with decreasing confidence
new_bubble_box = None
confidences_to_try = [BUBBLE_RELOCATE_CONFIDENCE, BUBBLE_RELOCATE_FALLBACK_CONFIDENCE, 0.4]
for conf in confidences_to_try:
print(f"Attempting location with confidence {conf}...")
try:
new_bubble_box = pyautogui.locateOnScreen(bubble_snapshot,
region=search_area,
confidence=conf)
if new_bubble_box:
print(f"Successfully located with confidence {conf}.")
break # Found it
except Exception as e:
print(f"Exception during location attempt with confidence {conf}: {e}")
# --- End Confidence Loop ---
if new_bubble_box:
new_tl_x, new_tl_y = new_bubble_box.left, new_bubble_box.top
print(f"Successfully re-located bubble snapshot at: ({new_tl_x}, {new_tl_y})")
new_avatar_coords = (new_tl_x + AVATAR_OFFSET_X_REPLY, new_tl_y + AVATAR_OFFSET_Y_REPLY)
print(f"Calculated new avatar coordinates for reply context: {new_avatar_coords}")
sender_name = interactor.retrieve_sender_name_interaction(
initial_avatar_coords=new_avatar_coords,
bubble_snapshot=bubble_snapshot,
search_area=search_area
)
else:
print("Warning: Failed to re-locate bubble snapshot after multiple attempts.")
print("Trying direct approach with original bubble coordinates...")
original_tl_coords = target_bubble_info.get('tl_coords')
if original_tl_coords:
fallback_avatar_coords = (original_tl_coords[0] + AVATAR_OFFSET_X_REPLY,
original_tl_coords[1] + AVATAR_OFFSET_Y_REPLY)
print(f"Using fallback avatar coordinates from original detection: {fallback_avatar_coords}")
sender_name = interactor.retrieve_sender_name_interaction(
initial_avatar_coords=fallback_avatar_coords,
bubble_snapshot=bubble_snapshot,
search_area=search_area
)
if not sender_name:
print("Direct approach failed. Skipping this trigger.")
perform_state_cleanup(detector, interactor)
continue # Skip to next bubble
else:
print("No original coordinates available. Skipping sender name retrieval.")
perform_state_cleanup(detector, interactor)
continue # Skip to next bubble
# --- End Bubble Re-location Logic ---
except Exception as reloc_err:
print(f"Error during bubble re-location or subsequent interaction: {reloc_err}")
import traceback
traceback.print_exc()
perform_state_cleanup(detector, interactor)
continue # Skip to next bubble
# 6. Perform Cleanup
# print("[DEBUG] UI Loop: Performing cleanup after getting name...") # DEBUG REMOVED
cleanup_successful = perform_state_cleanup(detector, interactor)
if not cleanup_successful:
print("Error: Failed to return to chat screen after getting name. Skipping this bubble.")
continue # Skip to next bubble
if not sender_name:
print("Error: Could not get sender name for this bubble, skipping.")
continue # Skip to next bubble
# --- Deduplication Check ---
# This is the new central point for deduplication and recent_texts logic
if sender_name and bubble_text: # Ensure both are valid before deduplication
if deduplicator.is_duplicate(sender_name, bubble_text):
print(f"UI Thread: Skipping duplicate message via Deduplicator: {sender_name} - {bubble_text[:30]}...")
# Cleanup UI state as interaction might have occurred during sender_name retrieval
perform_state_cleanup(detector, interactor)
continue # Skip this bubble
# If not a duplicate by deduplicator, then check recent_texts (original safeguard)
# if bubble_text in recent_texts:
# print(f"UI Thread: Content '{bubble_text[:30]}...' in recent_texts history, skipping.")
# perform_state_cleanup(detector, interactor) # Cleanup as we are skipping
# continue
# If not a duplicate by any means, add to recent_texts and proceed
print(">>> New trigger event (passed deduplication) <<<")
# recent_texts.append(bubble_text) # No longer needed with image deduplication
else:
# This case implies sender_name or bubble_text was None/empty,
# which should have been caught by earlier checks.
# If somehow reached, log and skip.
print(f"Warning: sender_name ('{sender_name}') or bubble_text ('{bubble_text[:30]}...') is invalid before deduplication check. Skipping.")
perform_state_cleanup(detector, interactor)
continue
# --- Attempt to activate reply context ---
# print("[DEBUG] UI Loop: Attempting to activate reply context...") # DEBUG REMOVED
reply_context_activated = False
try:
print("Attempting to activate reply context...")
if bubble_snapshot is None:
print("Warning: Bubble snapshot missing for reply context activation. Skipping.")
final_bubble_box_for_reply = None
else:
print(f"Attempting final re-location for reply context using search_area: {search_area}")
final_bubble_box_for_reply = pyautogui.locateOnScreen(bubble_snapshot, region=search_area, confidence=BUBBLE_RELOCATE_CONFIDENCE)
if final_bubble_box_for_reply:
print(f"Final re-location successful at: {final_bubble_box_for_reply}")
bubble_x_reply, bubble_y_reply = final_bubble_box_for_reply.left, final_bubble_box_for_reply.top
bubble_w_reply, bubble_h_reply = final_bubble_box_for_reply.width, final_bubble_box_for_reply.height
center_x_reply = bubble_x_reply + bubble_w_reply // 2
center_y_reply = bubble_y_reply + bubble_h_reply // 2
if is_reply_keyword:
center_y_reply += 15
print(f"Applying +15 Y-offset to bubble center click for reply keyword. Target Y: {center_y_reply}")
print(f"Clicking bubble center for reply at ({center_x_reply}, {center_y_reply})")
interactor.click_at(center_x_reply, center_y_reply)
time.sleep(0.15)
print("Searching for reply button...")
reply_button_locs = detector._find_template('reply_button', confidence=0.8)
if reply_button_locs:
reply_coords = reply_button_locs[0]
print(f"Found reply button at {reply_coords}. Clicking...")
interactor.click_at(reply_coords[0], reply_coords[1])
time.sleep(0.07)
reply_context_activated = True
print("Reply context activated.")
else:
print(">>> Reply button template ('reply_button') not found after clicking bubble center. <<<")
else:
print("Warning: Failed to re-locate bubble for activating reply context.")
except Exception as reply_context_err:
print(f"!!! Error during reply context activation: {reply_context_err} !!!")
# 7. Send Trigger Info to Main Thread
print("\n>>> Putting trigger info in Queue <<<")
try:
# 安全地處理和顯示發送者名稱
safe_sender_display = handle_text_encoding(sender_name, "[未知發送者]")
print(f" Sender: {safe_sender_display}")
# 安全地處理和顯示消息內容
if bubble_text:
display_text = bubble_text[:100] + "..." if len(bubble_text) > 100 else bubble_text
safe_content_display = handle_text_encoding(display_text, "[無法處理的文字內容]")
print(f" Content: {safe_content_display}")
else:
print(" Content: [空]")
except Exception as e_display:
print(f"Error displaying message info: {str(e_display)}")
print(f" Bubble Region: {bubble_region}") # Original region for context
print(f" Reply Context Activated: {reply_context_activated}")
try:
# 確保所有文字數據都經過安全處理
data_to_send = {
'sender': handle_text_encoding(sender_name, "[未知發送者]"),
'text': handle_text_encoding(bubble_text, "[無法處理的文字內容]"),
'bubble_region': bubble_region,
'reply_context_activated': reply_context_activated,
'bubble_snapshot': bubble_snapshot,
'search_area': search_area
}
trigger_queue.put(data_to_send)
print("Trigger info (with region, reply flag, snapshot, search_area) placed in Queue.")
# --- 新增:更新氣泡去重記錄中的發送者信息 ---
# 注意:我們在前面已經添加了氣泡到去重系統,但當時還沒獲取發送者名稱
# 這裡我們嘗試再次更新發送者信息(如果實現允許的話)
if 'bubble_deduplicator' in locals() and bubble_snapshot and sender_name:
bubble_id = bubble_deduplicator.generate_bubble_id(bubble_region_tuple)
if bubble_id in bubble_deduplicator.recent_bubbles:
bubble_deduplicator.recent_bubbles[bubble_id]['sender'] = sender_name
bubble_deduplicator._save_storage()
# --- 更新發送者信息結束 ---
# --- CRITICAL: Break loop after successfully processing one trigger ---
print("--- Single bubble processing complete. Breaking scan cycle. ---")
break # Exit the 'for target_bubble_info in sorted_bubbles' loop
except Exception as q_err:
print(f"Error preparing or enqueueing data: {q_err}")
# 嘗試使用最小數據集合保證功能性
try:
minimal_data = {
'sender': "[數據處理錯誤]",
'text': handle_text_encoding(bubble_text[:100] if bubble_text else "[內容獲取失敗]"), # Apply encoding here too
'bubble_region': bubble_region,
'reply_context_activated': False, # Sensible default
'bubble_snapshot': bubble_snapshot, # Keep snapshot if available
'search_area': search_area
}
trigger_queue.put(minimal_data)
print("Minimal fallback data placed in Queue after error.")
except Exception as min_q_err:
print(f"Critical failure: Could not place any data in queue: {min_q_err}")
# Let's break here too, as something is wrong.
print("Breaking scan cycle due to queue error.")
break
# End of keyword found block (if result:)
# End of loop through sorted bubbles (for target_bubble_info...)
# If the loop finished without breaking (i.e., no trigger processed), wait the full interval.
# If it broke, the sleep still happens here before the next cycle.
# print("[DEBUG] UI Loop: Finished bubble iteration or broke early. Sleeping...") # DEBUG REMOVED
time.sleep(1.5) # Polling interval after checking all bubbles or processing one
except KeyboardInterrupt:
print("\nMonitoring interrupted.")
break
except Exception as e:
print(f"Unknown error in monitoring loop: {e}")
import traceback
traceback.print_exc()
# Attempt cleanup in case of unexpected error during interaction
print("Attempting cleanup after unexpected error...")
perform_state_cleanup(detector, interactor)
print("Waiting 3 seconds before retry...")
time.sleep(3)
# Note: The old monitor_chat_for_trigger function is replaced by the example_coordinator_loop.
# The actual UI monitoring thread started in main.py should call a function like this example loop.
# The main async loop in main.py will handle getting items from the queue and interacting with the LLM.
# if __name__ == '__main__':
# # This module is not meant to be run directly after refactoring.
# # Initialization and coordination happen in main.py.
# pass