feat: Add persistent message deduplication
- Fix duplicate message processing during unattended operation - Replace MessageDeduplication with persistent RobustMessageDeduplication - Add automatic cleanup of expired records (every 5 minutes) - Store deduplication state in JSON file to survive system restarts - Improve similarity detection (95% threshold) for near-duplicate messages
This commit is contained in:
parent
de7e2373fd
commit
f089634bd7
1
.gitignore
vendored
1
.gitignore
vendored
@ -11,3 +11,4 @@ backup/
|
||||
chroma_data/
|
||||
wolf_control.py
|
||||
remote_config.json
|
||||
wolf_chat_dedup.json
|
||||
42
main.py
42
main.py
@ -16,8 +16,8 @@ from mcp import ClientSession, StdioServerParameters, types
|
||||
# --- Keyboard Imports ---
|
||||
import threading
|
||||
import time
|
||||
# Import MessageDeduplication from ui_interaction
|
||||
from ui_interaction import MessageDeduplication
|
||||
# Import RobustMessageDeduplication and StateResetDetector from ui_interaction
|
||||
from ui_interaction import RobustMessageDeduplication, StateResetDetector
|
||||
try:
|
||||
import keyboard # Needs pip install keyboard
|
||||
except ImportError:
|
||||
@ -483,33 +483,45 @@ async def run_main_with_exit_stack():
|
||||
|
||||
# 5. Start UI Monitoring in a separate thread
|
||||
print("\n--- Starting UI monitoring thread ---")
|
||||
# 5c. Create MessageDeduplication instance
|
||||
deduplicator = MessageDeduplication(expiry_seconds=3600) # Default 1 hour
|
||||
|
||||
# Use the new monitoring loop function, passing both queues and the deduplicator
|
||||
# 5c. Initialize Robust Deduplication System
|
||||
def initialize_robust_deduplication():
|
||||
"""初始化強化版去重系統"""
|
||||
deduplicator_instance = RobustMessageDeduplication(
|
||||
storage_file="wolf_chat_dedup.json",
|
||||
expiry_seconds=3600 # 1小時過期
|
||||
)
|
||||
state_monitor_instance = StateResetDetector("wolf_chat_state_resets.log")
|
||||
return deduplicator_instance, state_monitor_instance
|
||||
|
||||
deduplicator, state_monitor = initialize_robust_deduplication()
|
||||
|
||||
# Use the new monitoring loop function, passing trigger_queue, command_queue, deduplicator, and state_monitor
|
||||
monitor_task = loop.create_task(
|
||||
asyncio.to_thread(ui_interaction.run_ui_monitoring_loop, trigger_queue, command_queue, deduplicator), # Pass command_queue and deduplicator
|
||||
name="ui_monitor"
|
||||
asyncio.to_thread(ui_interaction.run_ui_monitoring_loop_enhanced, trigger_queue, command_queue, deduplicator, state_monitor),
|
||||
name="ui_monitor_enhanced"
|
||||
)
|
||||
ui_monitor_task = monitor_task # Store task reference for shutdown
|
||||
# Note: UI task cancellation is handled in shutdown()
|
||||
|
||||
# 5b. Game Window Monitoring is now handled by Setup.py
|
||||
|
||||
# 5d. Start Periodic Cleanup Timer for Deduplicator
|
||||
def periodic_cleanup():
|
||||
# 5d. Start Periodic Cleanup and Stats Logging Timer for Deduplicator
|
||||
def periodic_robust_cleanup_and_stats():
|
||||
if not shutdown_requested: # Only run if not shutting down
|
||||
print("Main Thread: Running periodic deduplicator cleanup...")
|
||||
deduplicator.purge_expired()
|
||||
print("Main Thread: Running periodic robust deduplicator cleanup and stats logging...")
|
||||
deduplicator._cleanup_expired() # Call internal cleanup
|
||||
stats = deduplicator.get_stats()
|
||||
print(f"Main Thread - Dedup Stats: {stats['active_records']} active records (total: {stats['total_records']})")
|
||||
# Reschedule the timer
|
||||
cleanup_timer = threading.Timer(600, periodic_cleanup) # 10 minutes
|
||||
cleanup_timer = threading.Timer(600, periodic_robust_cleanup_and_stats) # 10 minutes
|
||||
cleanup_timer.daemon = True
|
||||
cleanup_timer.start()
|
||||
else:
|
||||
print("Main Thread: Shutdown requested, not rescheduling deduplicator cleanup.")
|
||||
print("Main Thread: Shutdown requested, not rescheduling robust deduplicator cleanup.")
|
||||
|
||||
print("\n--- Starting periodic deduplicator cleanup timer (10 min interval) ---")
|
||||
initial_cleanup_timer = threading.Timer(600, periodic_cleanup)
|
||||
print("\n--- Starting periodic robust deduplicator cleanup and stats timer (10 min interval) ---")
|
||||
initial_cleanup_timer = threading.Timer(600, periodic_robust_cleanup_and_stats)
|
||||
initial_cleanup_timer.daemon = True
|
||||
initial_cleanup_timer.start()
|
||||
# Note: This timer will run in a separate thread.
|
||||
|
||||
@ -21,78 +21,193 @@ import math # Added for distance calculation in dual method
|
||||
import time # Ensure time is imported for MessageDeduplication
|
||||
from simple_bubble_dedup import SimpleBubbleDeduplication
|
||||
import difflib # Added for text similarity
|
||||
import os # Already imported, but good to note for RobustMessageDeduplication
|
||||
import json # Already imported, but good to note for RobustMessageDeduplication
|
||||
|
||||
class MessageDeduplication:
|
||||
def __init__(self, expiry_seconds=3600): # 1 hour expiry time
|
||||
self.processed_messages = {} # {message_key: timestamp}
|
||||
# 替換現有的 MessageDeduplication 類
|
||||
class RobustMessageDeduplication:
|
||||
def __init__(self, storage_file="persistent_dedup.json", expiry_seconds=3600):
|
||||
self.storage_file = storage_file
|
||||
self.expiry_seconds = expiry_seconds
|
||||
self.processed_messages = {}
|
||||
self.last_save_time = 0
|
||||
self.save_interval = 10 # 每10秒保存一次
|
||||
|
||||
# 啟動時加載持久化數據
|
||||
self._load_from_storage()
|
||||
|
||||
# 清理過期記錄
|
||||
self._cleanup_expired()
|
||||
|
||||
print(f"RobustDeduplication initialized with {len(self.processed_messages)} existing records")
|
||||
|
||||
def _load_from_storage(self):
|
||||
"""從持久化文件加載去重記錄"""
|
||||
try:
|
||||
if os.path.exists(self.storage_file):
|
||||
with open(self.storage_file, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
self.processed_messages = data.get('messages', {})
|
||||
print(f"Loaded {len(self.processed_messages)} dedup records from storage")
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not load dedup storage: {e}")
|
||||
self.processed_messages = {}
|
||||
|
||||
def _save_to_storage(self, force=False):
|
||||
"""保存去重記錄到持久化文件"""
|
||||
current_time = time.time()
|
||||
if not force and (current_time - self.last_save_time) < self.save_interval:
|
||||
return
|
||||
|
||||
try:
|
||||
# 只保存未過期的記錄
|
||||
valid_records = {}
|
||||
for key, timestamp in self.processed_messages.items():
|
||||
if current_time - timestamp < self.expiry_seconds:
|
||||
valid_records[key] = timestamp
|
||||
|
||||
data = {
|
||||
'messages': valid_records,
|
||||
'last_updated': current_time
|
||||
}
|
||||
|
||||
with open(self.storage_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(data, f, indent=2)
|
||||
|
||||
self.processed_messages = valid_records
|
||||
self.last_save_time = current_time
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error saving dedup storage: {e}")
|
||||
|
||||
def _cleanup_expired(self):
|
||||
"""清理過期記錄"""
|
||||
current_time = time.time()
|
||||
before_count = len(self.processed_messages)
|
||||
|
||||
self.processed_messages = {
|
||||
key: timestamp for key, timestamp in self.processed_messages.items()
|
||||
if current_time - timestamp < self.expiry_seconds
|
||||
}
|
||||
|
||||
after_count = len(self.processed_messages)
|
||||
if before_count > after_count:
|
||||
print(f"Cleaned up {before_count - after_count} expired dedup records")
|
||||
|
||||
def _create_message_key(self, sender, content):
|
||||
"""創建標準化的消息鍵"""
|
||||
# 標準化處理
|
||||
clean_sender = sender.lower().strip() if sender else ""
|
||||
clean_content = ' '.join(content.strip().split()) if content else ""
|
||||
return f"{clean_sender}:{clean_content}"
|
||||
|
||||
def is_duplicate(self, sender, content):
|
||||
"""Check if the message is a duplicate within the expiry period using text similarity."""
|
||||
"""強化的重複檢查"""
|
||||
if not sender or not content:
|
||||
return False # Missing necessary info, treat as new message
|
||||
|
||||
current_time = time.time()
|
||||
|
||||
# 遍歷所有已處理的消息
|
||||
for key, timestamp in list(self.processed_messages.items()):
|
||||
# 檢查是否過期
|
||||
if current_time - timestamp >= self.expiry_seconds:
|
||||
# 從 processed_messages 中移除過期的項目,避免集合在迭代時改變大小
|
||||
# 但由於我們使用了 list(self.processed_messages.items()),所以這裡可以安全地 continue
|
||||
# 或者,如果希望立即刪除,則需要不同的迭代策略或在 purge_expired 中處理
|
||||
continue # 繼續檢查下一個,過期項目由 purge_expired 處理
|
||||
|
||||
# 解析之前儲存的發送者和內容
|
||||
stored_sender, stored_content = key.split(":", 1)
|
||||
|
||||
# 檢查發送者是否相同
|
||||
if sender.lower() == stored_sender.lower():
|
||||
# Calculate text similarity
|
||||
similarity = difflib.SequenceMatcher(None, content, stored_content).ratio()
|
||||
if similarity >= 0.95: # Use 0.95 as threshold
|
||||
print(f"Deduplicator: Detected similar message (similarity: {similarity:.2f}): {sender} - {content[:20]}...")
|
||||
return True
|
||||
|
||||
# 不是重複消息,儲存它
|
||||
# 注意:這裡儲存的 content 是原始 content,不是 clean_content
|
||||
message_key = f"{sender.lower()}:{content}"
|
||||
self.processed_messages[message_key] = current_time
|
||||
print("Deduplication: Missing sender or content, treating as new")
|
||||
return False
|
||||
|
||||
# create_key 方法已不再需要,可以移除
|
||||
# def create_key(self, sender, content):
|
||||
# """Create a standardized composite key."""
|
||||
# # Thoroughly standardize text - remove all whitespace and punctuation, lowercase
|
||||
# clean_content = ''.join(c.lower() for c in content if c.isalnum())
|
||||
# clean_sender = ''.join(c.lower() for c in sender if c.isalnum())
|
||||
|
||||
# # Truncate content to first 100 chars to prevent overly long keys
|
||||
# if len(clean_content) > 100:
|
||||
# clean_content = clean_content[:100]
|
||||
|
||||
# return f"{clean_sender}:{clean_content}"
|
||||
|
||||
def purge_expired(self):
|
||||
"""Remove expired message records."""
|
||||
current_time = time.time()
|
||||
expired_keys = [k for k, t in self.processed_messages.items()
|
||||
if current_time - t >= self.expiry_seconds]
|
||||
|
||||
for key in expired_keys:
|
||||
del self.processed_messages[key]
|
||||
# 定期清理(每5分鐘)
|
||||
if hasattr(self, '_last_cleanup'):
|
||||
if current_time - self._last_cleanup > 300:
|
||||
self._cleanup_expired()
|
||||
self._last_cleanup = current_time
|
||||
else:
|
||||
self._last_cleanup = current_time
|
||||
|
||||
if expired_keys: # Log only if something was purged
|
||||
print(f"Deduplicator: Purged {len(expired_keys)} expired message records.")
|
||||
return len(expired_keys)
|
||||
# 創建消息鍵
|
||||
message_key = self._create_message_key(sender, content)
|
||||
|
||||
# 精確匹配檢查
|
||||
if message_key in self.processed_messages:
|
||||
age = current_time - self.processed_messages[message_key]
|
||||
if age < self.expiry_seconds:
|
||||
print(f"DUPLICATE EXACT: {sender} - {content[:40]}... (age: {age:.1f}s)")
|
||||
return True
|
||||
else:
|
||||
# 過期了,移除
|
||||
del self.processed_messages[message_key]
|
||||
|
||||
# 相似性檢查(更嚴格)
|
||||
clean_content = ' '.join(content.strip().split())
|
||||
for existing_key, timestamp in list(self.processed_messages.items()):
|
||||
age = current_time - timestamp
|
||||
if age >= self.expiry_seconds:
|
||||
continue
|
||||
|
||||
try:
|
||||
stored_sender, stored_content = existing_key.split(":", 1)
|
||||
if sender.lower().strip() == stored_sender:
|
||||
# 計算相似度
|
||||
similarity = difflib.SequenceMatcher(None, clean_content, stored_content).ratio()
|
||||
if similarity >= 0.95: # 95%相似度
|
||||
print(f"DUPLICATE SIMILAR: {sender} - {content[:40]}... (similarity: {similarity:.3f}, age: {age:.1f}s)")
|
||||
return True
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
# 記錄新消息
|
||||
self.processed_messages[message_key] = current_time
|
||||
print(f"NEW MESSAGE RECORDED: {sender} - {content[:40]}...")
|
||||
|
||||
# 異步保存(不阻塞主流程)
|
||||
self._save_to_storage()
|
||||
|
||||
return False
|
||||
|
||||
def clear_all(self):
|
||||
"""Clear all recorded messages (for F7/F8 functionality)."""
|
||||
count = len(self.processed_messages)
|
||||
"""清空所有記錄"""
|
||||
self.processed_messages.clear()
|
||||
if count > 0: # Log only if something was cleared
|
||||
print(f"Deduplicator: Cleared all {count} message records.")
|
||||
return count
|
||||
self._save_to_storage(force=True)
|
||||
print("All dedup records cleared and persisted")
|
||||
|
||||
def get_stats(self):
|
||||
"""獲取統計信息"""
|
||||
current_time = time.time()
|
||||
active_count = sum(1 for timestamp in self.processed_messages.values()
|
||||
if current_time - timestamp < self.expiry_seconds)
|
||||
|
||||
return {
|
||||
'total_records': len(self.processed_messages),
|
||||
'active_records': active_count,
|
||||
'oldest_record_age': min([current_time - t for t in self.processed_messages.values()]) if self.processed_messages else 0
|
||||
}
|
||||
|
||||
# 診斷工具:狀態重置檢測器
|
||||
class StateResetDetector:
|
||||
def __init__(self, log_file="state_resets.log"):
|
||||
self.log_file = log_file
|
||||
self.start_time = time.time()
|
||||
self.reset_count = 0
|
||||
|
||||
def log_reset(self, reset_type, context=""):
|
||||
"""記錄狀態重置事件"""
|
||||
self.reset_count += 1
|
||||
timestamp = time.time()
|
||||
|
||||
log_entry = f"{timestamp:.3f}: RESET #{self.reset_count} - {reset_type} - {context}\n"
|
||||
|
||||
try:
|
||||
with open(self.log_file, 'a', encoding='utf-8') as f:
|
||||
f.write(log_entry)
|
||||
except:
|
||||
pass
|
||||
|
||||
print(f"STATE RESET DETECTED: {reset_type} - {context}")
|
||||
|
||||
def check_object_identity(self, obj, obj_name):
|
||||
"""檢查對象是否被重新創建"""
|
||||
obj_id = id(obj)
|
||||
attr_name = f"_{obj_name}_last_id"
|
||||
|
||||
if hasattr(self, attr_name):
|
||||
last_id = getattr(self, attr_name)
|
||||
if last_id != obj_id:
|
||||
self.log_reset("OBJECT_RECREATED", f"{obj_name} object was recreated")
|
||||
|
||||
setattr(self, attr_name, obj_id)
|
||||
|
||||
# --- Global Pause Flag ---
|
||||
# Using a simple mutable object (list) for thread-safe-like access without explicit lock
|
||||
@ -1707,12 +1822,13 @@ def perform_state_cleanup(detector: DetectionModule, interactor: InteractionModu
|
||||
|
||||
|
||||
# --- UI Monitoring Loop Function (To be run in a separate thread) ---
|
||||
def run_ui_monitoring_loop(trigger_queue: queue.Queue, command_queue: queue.Queue, deduplicator: 'MessageDeduplication'):
|
||||
def run_ui_monitoring_loop_enhanced(trigger_queue: queue.Queue, command_queue: queue.Queue, deduplicator: 'RobustMessageDeduplication', state_monitor: 'StateResetDetector'):
|
||||
"""
|
||||
Continuously monitors the UI, detects triggers, performs interactions,
|
||||
puts trigger data into trigger_queue, and processes commands from command_queue.
|
||||
Includes state monitoring and robust deduplication.
|
||||
"""
|
||||
print("\n--- Starting UI Monitoring Loop (Thread) ---")
|
||||
print("\n--- Starting Enhanced UI Monitoring Loop (Thread) ---")
|
||||
|
||||
# --- 初始化氣泡圖像去重系統(新增) ---
|
||||
bubble_deduplicator = SimpleBubbleDeduplication(
|
||||
@ -1792,8 +1908,18 @@ def run_ui_monitoring_loop(trigger_queue: queue.Queue, command_queue: queue.Queu
|
||||
main_screen_click_counter = 0 # Counter for consecutive main screen clicks
|
||||
|
||||
loop_counter = 0 # Add loop counter for debugging
|
||||
|
||||
while True:
|
||||
loop_counter += 1
|
||||
|
||||
# 每100次循環檢查一次對象狀態
|
||||
if loop_counter % 100 == 0:
|
||||
state_monitor.check_object_identity(deduplicator, "deduplicator")
|
||||
|
||||
# 輸出統計信息
|
||||
stats = deduplicator.get_stats()
|
||||
print(f"Dedup Stats: {stats['active_records']} active records (total: {stats['total_records']})")
|
||||
|
||||
# print(f"\n--- UI Loop Iteration #{loop_counter} ---") # DEBUG REMOVED
|
||||
|
||||
# --- Process ALL Pending Commands First ---
|
||||
@ -2223,7 +2349,7 @@ def run_ui_monitoring_loop(trigger_queue: queue.Queue, command_queue: queue.Queu
|
||||
# This is the new central point for deduplication and recent_texts logic
|
||||
if sender_name and bubble_text: # Ensure both are valid before deduplication
|
||||
if deduplicator.is_duplicate(sender_name, bubble_text):
|
||||
print(f"UI Thread: Skipping duplicate message via Deduplicator: {sender_name} - {bubble_text[:30]}...")
|
||||
print(f"UI Thread: Message blocked by robust deduplication: {sender_name} - {bubble_text[:30]}...")
|
||||
# Cleanup UI state as interaction might have occurred during sender_name retrieval
|
||||
perform_state_cleanup(detector, interactor)
|
||||
continue # Skip this bubble
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user