#!/usr/bin/env python # -*- coding: utf-8 -*- """ Wolf Chat Setup Utility A graphical configuration tool for Wolf Chat that manages: - API settings (OpenAI/compatible providers) - MCP server configurations (Exa, Chroma, custom servers) - Environment variables (.env) - Config file generation (config.py) """ import os import sys import json import tkinter as tk from tkinter import ttk, filedialog, messagebox, scrolledtext import configparser from pathlib import Path import re import shutil import time import signal import logging import subprocess import threading import datetime import schedule import psutil import random # Added for exponential backoff jitter import urllib3 # Added for SSL warning suppression try: import socketio HAS_SOCKETIO = True except ImportError: HAS_SOCKETIO = False # import ssl # ssl import might not be needed if socketio handles it or if not using wss directly in client setup # =============================================================== # Constants # =============================================================== VERSION = "1.0.0" CONFIG_TEMPLATE_PATH = "config_template.py" ENV_FILE_PATH = ".env" REMOTE_CONFIG_PATH = "remote_config.json" # New config file for remote settings # Use absolute path for chroma_data DEFAULT_CHROMA_DATA_PATH = os.path.abspath("chroma_data") DEFAULT_CONFIG_SECTION = """# ==================================================================== # Wolf Chat Configuration # Generated by setup.py - Edit with care # ==================================================================== """ # Get current Windows username for default paths CURRENT_USERNAME = os.getenv("USERNAME", "user") # Global variables for game/bot management game_process_instance = None bot_process_instance = None # This will replace/co-exist with self.running_process control_client_instance = None monitor_thread_instance = None # Renamed to avoid conflict if 'monitor_thread' is used elsewhere scheduler_thread_instance = None # Renamed keep_monitoring_flag = threading.Event() # Renamed for clarity keep_monitoring_flag.set() # Basic logging setup # logger = logging.getLogger("WolfChatSetup") # Defined later in class or globally if needed # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # Setup logger instance. This can be configured further if needed. logger = logging.getLogger(__name__) if not logger.handlers: # Avoid adding multiple handlers if script is reloaded handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) # =============================================================== # Helper Functions # =============================================================== def load_remote_config(): """Load remote control and restart settings from remote_config.json""" defaults = { "REMOTE_SERVER_URL": "YOUR_URL_HERE", "REMOTE_CLIENT_KEY": "YOUR_KEY_HERE", # Placeholder "DEFAULT_GAME_RESTART_INTERVAL_MINUTES": 120, "DEFAULT_BOT_RESTART_INTERVAL_MINUTES": 120, "LINK_RESTART_TIMES": True, "GAME_PROCESS_NAME": "LastWar.exe", # Default game process name "BOT_SCRIPT_NAME": "main.py" # Default bot script name } if os.path.exists(REMOTE_CONFIG_PATH): try: with open(REMOTE_CONFIG_PATH, 'r', encoding='utf-8') as f: data = json.load(f) # Ensure all keys from defaults are present, adding them if missing for key, value in defaults.items(): data.setdefault(key, value) return data except json.JSONDecodeError: logger.error(f"Error decoding {REMOTE_CONFIG_PATH}. Using default remote settings.") return defaults.copy() # Return a copy to avoid modifying defaults except Exception as e: logger.error(f"Error loading {REMOTE_CONFIG_PATH}: {e}. Using default remote settings.") return defaults.copy() logger.info(f"{REMOTE_CONFIG_PATH} not found. Creating with default values.") save_remote_config(defaults.copy()) # Create the file if it doesn't exist return defaults.copy() def save_remote_config(remote_data): """Save remote control and restart settings to remote_config.json""" try: with open(REMOTE_CONFIG_PATH, 'w', encoding='utf-8') as f: json.dump(remote_data, f, indent=4) # Use indent for readability logger.info(f"Saved remote settings to {REMOTE_CONFIG_PATH}") except Exception as e: logger.error(f"Error saving {REMOTE_CONFIG_PATH}: {e}") def load_env_file(): """Load existing .env file if it exists""" env_data = {} env_path = Path(ENV_FILE_PATH) if env_path.exists(): with open(env_path, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) env_data[key.strip()] = value.strip().strip('"\'') return env_data def save_env_file(env_data): """Save environment variables to .env file""" with open(ENV_FILE_PATH, 'w', encoding='utf-8') as f: f.write("# Environment variables for Wolf Chat\n") f.write("# Generated by setup.py\n\n") for key, value in env_data.items(): if value: # Only write non-empty values f.write(f"{key}={value}\n") print(f"Saved environment variables to {ENV_FILE_PATH}") def load_current_config(): """Extract settings from existing config.py if it exists""" # 新增一個幫助函數來標準化路徑 def normalize_path(path): """Convert backslashes to forward slashes in paths""" if path: return path.replace("\\", "/") return path config_data = { "OPENAI_API_BASE_URL": "", "LLM_MODEL": "deepseek/deepseek-chat-v3-0324", "MCP_SERVERS": { "exa": { "enabled": True, "use_smithery": False, "server_path": normalize_path(f"C:/Users/{CURRENT_USERNAME}/AppData/Roaming/npm/exa-mcp-server") } }, "ENABLE_CHAT_LOGGING": True, "LOG_DIR": "chat_logs", "GAME_WINDOW_CONFIG": { "WINDOW_TITLE": "Last War-Survival Game", "ENABLE_SCHEDULED_RESTART": True, "RESTART_INTERVAL_MINUTES": 60, "GAME_EXECUTABLE_PATH": normalize_path(fr"C:\Users\{CURRENT_USERNAME}\AppData\Local\TheLastWar\Launch.exe"), "GAME_WINDOW_X": 50, "GAME_WINDOW_Y": 30, "GAME_WINDOW_WIDTH": 600, "GAME_WINDOW_HEIGHT": 1070, "MONITOR_INTERVAL_SECONDS": 5 } } if os.path.exists("config.py"): try: with open("config.py", 'r', encoding='utf-8') as f: config_content = f.read() # Extract OPENAI_API_BASE_URL api_url_match = re.search(r'OPENAI_API_BASE_URL\s*=\s*["\'](.+?)["\']', config_content) if api_url_match: config_data["OPENAI_API_BASE_URL"] = api_url_match.group(1) # Extract LLM_MODEL model_match = re.search(r'LLM_MODEL\s*=\s*["\'](.+?)["\']', config_content) if model_match: config_data["LLM_MODEL"] = model_match.group(1) # Extract logging settings chat_logging_match = re.search(r'ENABLE_CHAT_LOGGING\s*=\s*(True|False)', config_content) if chat_logging_match: config_data["ENABLE_CHAT_LOGGING"] = (chat_logging_match.group(1) == "True") log_dir_match = re.search(r'LOG_DIR\s*=\s*["\'](.+?)["\']', config_content) if log_dir_match: config_data["LOG_DIR"] = log_dir_match.group(1) # Extract game window settings window_title_match = re.search(r'WINDOW_TITLE\s*=\s*["\'](.+?)["\']', config_content) if window_title_match: config_data["GAME_WINDOW_CONFIG"]["WINDOW_TITLE"] = window_title_match.group(1) restart_match = re.search(r'ENABLE_SCHEDULED_RESTART\s*=\s*(True|False)', config_content) if restart_match: config_data["GAME_WINDOW_CONFIG"]["ENABLE_SCHEDULED_RESTART"] = (restart_match.group(1) == "True") interval_match = re.search(r'RESTART_INTERVAL_MINUTES\s*=\s*(\d+)', config_content) if interval_match: config_data["GAME_WINDOW_CONFIG"]["RESTART_INTERVAL_MINUTES"] = int(interval_match.group(1)) exec_path_match = re.search(r'GAME_EXECUTABLE_PATH\s*=\s*r"(.+?)"', config_content) if exec_path_match: config_data["GAME_WINDOW_CONFIG"]["GAME_EXECUTABLE_PATH"] = exec_path_match.group(1) x_match = re.search(r'GAME_WINDOW_X\s*=\s*(\d+)', config_content) if x_match: config_data["GAME_WINDOW_CONFIG"]["GAME_WINDOW_X"] = int(x_match.group(1)) y_match = re.search(r'GAME_WINDOW_Y\s*=\s*(\d+)', config_content) if y_match: config_data["GAME_WINDOW_CONFIG"]["GAME_WINDOW_Y"] = int(y_match.group(1)) width_match = re.search(r'GAME_WINDOW_WIDTH\s*=\s*(\d+)', config_content) if width_match: config_data["GAME_WINDOW_CONFIG"]["GAME_WINDOW_WIDTH"] = int(width_match.group(1)) height_match = re.search(r'GAME_WINDOW_HEIGHT\s*=\s*(\d+)', config_content) if height_match: config_data["GAME_WINDOW_CONFIG"]["GAME_WINDOW_HEIGHT"] = int(height_match.group(1)) monitor_interval_match = re.search(r'MONITOR_INTERVAL_SECONDS\s*=\s*(\d+)', config_content) if monitor_interval_match: config_data["GAME_WINDOW_CONFIG"]["MONITOR_INTERVAL_SECONDS"] = int(monitor_interval_match.group(1)) # Extract MCP_SERVERS (more complex parsing) try: servers_section = re.search(r'MCP_SERVERS\s*=\s*{(.+?)}(?=\n\n)', config_content, re.DOTALL) if servers_section: servers_text = servers_section.group(1) # Extract and parse each server definition server_blocks = re.findall(r'"([^"]+)":\s*{(.+?)}(?=,\s*"|,?\s*})', servers_text, re.DOTALL) for server_name, server_block in server_blocks: if server_name not in config_data["MCP_SERVERS"]: config_data["MCP_SERVERS"][server_name] = {"enabled": True} # Skip disabled servers (commented out) if server_block.strip().startswith("#"): config_data["MCP_SERVERS"][server_name]["enabled"] = False continue # Extract command command_match = re.search(r'"command":\s*"([^"]+)"', server_block) if command_match: config_data["MCP_SERVERS"][server_name]["command"] = command_match.group(1) # For Exa server if server_name == "exa": # Check if using Smithery if '"@smithery/cli@latest"' in server_block: config_data["MCP_SERVERS"]["exa"]["use_smithery"] = True else: config_data["MCP_SERVERS"]["exa"]["use_smithery"] = False # Extract server path for local server args_match = re.search(r'"args":\s*\[\s*"([^"]+)"', server_block) if args_match: config_data["MCP_SERVERS"]["exa"]["server_path"] = args_match.group(1) # For Chroma server if server_name == "chroma": # Extract data directory data_dir_match = re.search(r'"--data-dir",\s*"([^"]+)"', server_block) if data_dir_match: config_data["MCP_SERVERS"]["chroma"]["data_dir"] = data_dir_match.group(1) # For custom servers, store the raw configuration if server_name not in ["exa", "chroma"]: config_data["MCP_SERVERS"][server_name]["raw_config"] = server_block except Exception as e: print(f"Error parsing MCP_SERVERS section: {e}") import traceback traceback.print_exc() # Extract memory settings enable_preload_match = re.search(r'ENABLE_PRELOAD_PROFILES\s*=\s*(True|False)', config_content) if enable_preload_match: config_data["ENABLE_PRELOAD_PROFILES"] = (enable_preload_match.group(1) == "True") related_memories_match = re.search(r'PRELOAD_RELATED_MEMORIES\s*=\s*(\d+)', config_content) if related_memories_match: config_data["PRELOAD_RELATED_MEMORIES"] = int(related_memories_match.group(1)) profiles_collection_match = re.search(r'PROFILES_COLLECTION\s*=\s*["\'](.+?)["\']', config_content) if profiles_collection_match: config_data["PROFILES_COLLECTION"] = profiles_collection_match.group(1) conversations_collection_match = re.search(r'CONVERSATIONS_COLLECTION\s*=\s*["\'](.+?)["\']', config_content) if conversations_collection_match: config_data["CONVERSATIONS_COLLECTION"] = conversations_collection_match.group(1) bot_memory_collection_match = re.search(r'BOT_MEMORY_COLLECTION\s*=\s*["\'](.+?)["\']', config_content) if bot_memory_collection_match: config_data["BOT_MEMORY_COLLECTION"] = bot_memory_collection_match.group(1) # Extract memory management settings backup_hour_match = re.search(r'MEMORY_BACKUP_HOUR\s*=\s*(\d+)', config_content) if backup_hour_match: config_data["MEMORY_BACKUP_HOUR"] = int(backup_hour_match.group(1)) backup_minute_match = re.search(r'MEMORY_BACKUP_MINUTE\s*=\s*(\d+)', config_content) if backup_minute_match: config_data["MEMORY_BACKUP_MINUTE"] = int(backup_minute_match.group(1)) profile_model_match = re.search(r'MEMORY_PROFILE_MODEL\s*=\s*["\']?(.+?)["\']?\s*(?:#|$)', config_content) # Handle potential LLM_MODEL reference if profile_model_match: profile_model_val = profile_model_match.group(1).strip() if profile_model_val == "LLM_MODEL": # If it refers to LLM_MODEL, use the already parsed LLM_MODEL value config_data["MEMORY_PROFILE_MODEL"] = config_data.get("LLM_MODEL", "deepseek/deepseek-chat-v3-0324") # Fallback if LLM_MODEL wasn't parsed else: config_data["MEMORY_PROFILE_MODEL"] = profile_model_val else: # Default to LLM_MODEL if not found config_data["MEMORY_PROFILE_MODEL"] = config_data.get("LLM_MODEL", "deepseek/deepseek-chat-v3-0324") summary_model_match = re.search(r'MEMORY_SUMMARY_MODEL\s*=\s*["\'](.+?)["\']', config_content) if summary_model_match: config_data["MEMORY_SUMMARY_MODEL"] = summary_model_match.group(1) except Exception as e: print(f"Error reading config.py: {e}") import traceback traceback.print_exc() return config_data def generate_config_file(config_data, env_data): """Generate config.py file based on user settings""" # Create backup of existing config if it exists if os.path.exists("config.py"): backup_path = "config.py.bak" shutil.copy2("config.py", backup_path) print(f"Created backup of existing config at {backup_path}") def normalize_path(path): """Convert backslashes to forward slashes in paths""" if path: return path.replace("\\", "/") return path # Helper function to ensure absolute path def ensure_absolute_path(path): if not os.path.isabs(path): return os.path.abspath(path) return path with open("config.py", 'w', encoding='utf-8') as f: f.write(DEFAULT_CONFIG_SECTION) f.write("import os\n") f.write("import json\n") f.write("from dotenv import load_dotenv\n\n") f.write("# --- Load environment variables from .env file ---\n") f.write("load_dotenv()\n") f.write("print(\"Loaded environment variables from .env file.\")\n\n") # Write OpenAI API settings f.write("# =============================================================================\n") f.write("# OpenAI API Configuration / OpenAI-Compatible Provider Settings\n") f.write("# =============================================================================\n") f.write(f"OPENAI_API_BASE_URL = \"{config_data['OPENAI_API_BASE_URL']}\"\n") f.write("OPENAI_API_KEY = os.getenv(\"OPENAI_API_KEY\")\n") f.write(f"LLM_MODEL = \"{config_data['LLM_MODEL']}\"\n\n") # Write API Keys section f.write("# =============================================================================\n") f.write("# External API Keys (loaded from environment variables)\n") f.write("# =============================================================================\n") f.write("EXA_API_KEY = os.getenv(\"EXA_API_KEY\")\n\n") # Write Exa config utility f.write("# --- Exa Configuration ---\n") f.write("exa_config_dict = {\"exaApiKey\": EXA_API_KEY if EXA_API_KEY else \"YOUR_EXA_KEY_MISSING\"}\n") f.write("exa_config_arg_string = json.dumps(exa_config_dict)\n\n") # Write MCP Server Configuration f.write("# =============================================================================\n") f.write("# MCP Server Configuration\n") f.write("# =============================================================================\n") f.write("MCP_SERVERS = {\n") # Add configured servers for server_name, server_config in config_data["MCP_SERVERS"].items(): if not server_config.get("enabled", True): f.write(f" # \"{server_name}\": {{ # Disabled\n") continue f.write(f" \"{server_name}\": {{\n") # Handle Exa server special config if server_name == "exa": if server_config.get("use_smithery", False): # Smithery config f.write(" \"command\": \"cmd\",\n") f.write(" \"args\": [\n") f.write(" \"/c\",\n") f.write(" \"npx\",\n") f.write(" \"-y\",\n") f.write(" \"@smithery/cli@latest\",\n") f.write(" \"run\",\n") f.write(" \"exa\",\n") f.write(" \"--config\",\n") f.write(" exa_config_arg_string\n") f.write(" ],\n") else: # Local server config server_path = server_config.get("server_path", "exa-mcp-server") f.write(" \"command\": \"npx\",\n") f.write(" \"args\": [\n") f.write(f" \"{server_path}\",\n") f.write(" \"--tools=web_search,research_paper_search,twitter_search,company_research,crawling,competitor_finder\"\n") f.write(" ],\n") f.write(" \"env\": {\n") f.write(" \"EXA_API_KEY\": EXA_API_KEY\n") f.write(" }\n") # Handle Chroma server elif server_name == "chroma": # Ensure absolute path for chroma data directory data_dir = server_config.get("data_dir", DEFAULT_CHROMA_DATA_PATH) absolute_data_dir = ensure_absolute_path(data_dir) f.write(" \"command\": \"uvx\",\n") f.write(" \"args\": [\n") f.write(" \"chroma-mcp\",\n") f.write(" \"--client-type\",\n") f.write(" \"persistent\",\n") f.write(" \"--data-dir\",\n") # Escape backslashes in the path for the string literal in config.py escaped_data_dir = absolute_data_dir.replace('\\', '\\\\') f.write(f" \"{escaped_data_dir}\"\n") f.write(" ]\n") # Handle custom server - just write as raw JSON elif server_name == "custom" and "raw_config" in server_config: f.write(server_config["raw_config"]) f.write(" },\n") f.write("}\n\n") # Write remaining configuration sections f.write("# =============================================================================\n") f.write("# MCP Client Configuration\n") f.write("# =============================================================================\n") f.write("MCP_CONFIRM_TOOL_EXECUTION = False # True: Confirm before execution, False: Execute automatically\n\n") f.write("# =============================================================================\n") f.write("# Chat Logging Configuration\n") f.write("# =============================================================================\n") f.write(f"ENABLE_CHAT_LOGGING = {str(config_data['ENABLE_CHAT_LOGGING'])}\n") f.write(f"LOG_DIR = \"{config_data['LOG_DIR']}\"\n\n") f.write("# =============================================================================\n") f.write("# Persona Configuration\n") f.write("# =============================================================================\n") f.write("PERSONA_NAME = \"Wolfhart\"\n\n") f.write("# =============================================================================\n") f.write("# Game Window Configuration\n") f.write("# =============================================================================\n") game_config = config_data["GAME_WINDOW_CONFIG"] f.write(f"WINDOW_TITLE = \"{game_config['WINDOW_TITLE']}\"\n") f.write(f"ENABLE_SCHEDULED_RESTART = {str(game_config['ENABLE_SCHEDULED_RESTART'])}\n") f.write(f"RESTART_INTERVAL_MINUTES = {game_config['RESTART_INTERVAL_MINUTES']}\n") f.write(f"GAME_EXECUTABLE_PATH = r\"{game_config['GAME_EXECUTABLE_PATH']}\"\n") f.write(f"GAME_WINDOW_X = {game_config['GAME_WINDOW_X']}\n") f.write(f"GAME_WINDOW_Y = {game_config['GAME_WINDOW_Y']}\n") f.write(f"GAME_WINDOW_WIDTH = {game_config['GAME_WINDOW_WIDTH']}\n") f.write(f"GAME_WINDOW_HEIGHT = {game_config['GAME_WINDOW_HEIGHT']}\n") f.write(f"MONITOR_INTERVAL_SECONDS = {game_config['MONITOR_INTERVAL_SECONDS']}\n\n") # --- Add explicit print before writing Chroma section --- print("DEBUG: Writing ChromaDB Memory Configuration section...") # --- End explicit print --- # Write ChromaDB Memory Configuration f.write("# =============================================================================\n") f.write("# ChromaDB Memory Configuration\n") f.write("# =============================================================================\n") # Ensure boolean is written correctly as True/False, not string 'True'/'False' enable_preload = config_data.get('ENABLE_PRELOAD_PROFILES', True) # Default to True if key missing f.write(f"ENABLE_PRELOAD_PROFILES = {str(enable_preload)}\n") # Writes True or False literal preload_memories = config_data.get('PRELOAD_RELATED_MEMORIES', 2) # Default to 2 f.write(f"PRELOAD_RELATED_MEMORIES = {preload_memories}\n\n") f.write("# Collection Names (used for both local access and MCP tool calls)\n") profiles_col = config_data.get('PROFILES_COLLECTION', 'user_profiles') f.write(f"PROFILES_COLLECTION = \"{profiles_col}\"\n") conversations_col = config_data.get('CONVERSATIONS_COLLECTION', 'conversations') f.write(f"CONVERSATIONS_COLLECTION = \"{conversations_col}\"\n") bot_memory_col = config_data.get('BOT_MEMORY_COLLECTION', 'wolfhart_memory') f.write(f"BOT_MEMORY_COLLECTION = \"{bot_memory_col}\"\n\n") f.write("# Ensure Chroma path is consistent for both direct access and MCP\n") # Get the path set in the UI (or default) # Use .get() chain with defaults for safety chroma_data_dir_ui = config_data.get("MCP_SERVERS", {}).get("chroma", {}).get("data_dir", DEFAULT_CHROMA_DATA_PATH) # Normalize path for writing into the config file string (use forward slashes) normalized_chroma_path = normalize_path(chroma_data_dir_ui) f.write(f"# This path will be made absolute when config.py is loaded.\n") # Write the potentially relative path from UI/default, let config.py handle abspath # Use raw string r"..." to handle potential backslashes in Windows paths correctly within the string literal f.write(f"CHROMA_DATA_DIR = os.path.abspath(r\"{normalized_chroma_path}\")\n\n") # Write Memory Management Configuration f.write("# =============================================================================\n") f.write("# Memory Management Configuration\n") f.write("# =============================================================================\n") backup_hour = config_data.get('MEMORY_BACKUP_HOUR', 0) backup_minute = config_data.get('MEMORY_BACKUP_MINUTE', 0) profile_model = config_data.get('MEMORY_PROFILE_MODEL', 'LLM_MODEL') # Default to referencing LLM_MODEL summary_model = config_data.get('MEMORY_SUMMARY_MODEL', 'mistral-7b-instruct') f.write(f"MEMORY_BACKUP_HOUR = {backup_hour}\n") f.write(f"MEMORY_BACKUP_MINUTE = {backup_minute}\n") # Write profile model, potentially referencing LLM_MODEL if profile_model == config_data.get('LLM_MODEL'): f.write(f"MEMORY_PROFILE_MODEL = LLM_MODEL # Default to main LLM model\n") else: f.write(f"MEMORY_PROFILE_MODEL = \"{profile_model}\"\n") f.write(f"MEMORY_SUMMARY_MODEL = \"{summary_model}\"\n") print("Generated config.py file successfully") # =============================================================== # Main Application # =============================================================== class WolfChatSetup(tk.Tk): def __init__(self): super().__init__() self.title(f"Wolf Chat Setup v{VERSION}") self.geometry("800x600") self.minsize(750, 550) # Load existing data self.env_data = load_env_file() self.config_data = load_current_config() self.remote_data = load_remote_config() # Load new remote config # Create the notebook for tabs self.notebook = ttk.Notebook(self) self.notebook.pack(expand=True, fill=tk.BOTH, padx=10, pady=10) # Create tabs self.create_api_tab() self.create_mcp_tab() self.create_game_tab() self.create_memory_tab() self.create_memory_management_tab() # 新增記憶管理標籤頁 self.create_management_tab() # New tab for combined management # Create bottom buttons self.create_bottom_buttons() # Initialize running process tracker (will be managed by new system) self.running_process = None # This might be replaced by bot_process_instance # Initialize new process management variables self.bot_process_instance = None self.game_process_instance = None self.control_client_instance = None self.monitor_thread_instance = None self.scheduler_thread_instance = None self.keep_monitoring_flag = threading.Event() self.keep_monitoring_flag.set() # Initialize scheduler process tracker self.scheduler_process = None # Set initial states based on loaded data self.update_ui_from_data() self.update_scheduler_button_states(True) # Set initial scheduler button state def create_management_tab(self): """Create the Bot and Game Management tab""" tab = ttk.Frame(self.notebook) self.notebook.add(tab, text="Management") main_frame = ttk.Frame(tab, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) header = ttk.Label(main_frame, text="Bot & Game Management", font=("", 12, "bold")) header.pack(anchor=tk.W, pady=(0, 10)) # --- Remote Control Settings --- remote_frame = ttk.LabelFrame(main_frame, text="Remote Control Settings") remote_frame.pack(fill=tk.X, pady=10) # Remote Server URL remote_url_frame = ttk.Frame(remote_frame) remote_url_frame.pack(fill=tk.X, pady=5, padx=10) remote_url_label = ttk.Label(remote_url_frame, text="Server URL:", width=15) remote_url_label.pack(side=tk.LEFT) self.remote_url_var = tk.StringVar(value=self.remote_data.get("REMOTE_SERVER_URL", "")) remote_url_entry = ttk.Entry(remote_url_frame, textvariable=self.remote_url_var) remote_url_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) # Remote Client Key remote_key_frame = ttk.Frame(remote_frame) remote_key_frame.pack(fill=tk.X, pady=5, padx=10) remote_key_label = ttk.Label(remote_key_frame, text="Client Key:", width=15) remote_key_label.pack(side=tk.LEFT) self.remote_key_var = tk.StringVar(value=self.remote_data.get("REMOTE_CLIENT_KEY", "")) remote_key_entry = ttk.Entry(remote_key_frame, textvariable=self.remote_key_var, show="*") remote_key_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) self.show_remote_key_var = tk.BooleanVar(value=False) show_remote_key_cb = ttk.Checkbutton(remote_key_frame, text="Show", variable=self.show_remote_key_var, command=lambda: self.toggle_field_visibility(remote_key_entry, self.show_remote_key_var)) show_remote_key_cb.pack(side=tk.LEFT, padx=(5,0)) # --- Restart Settings --- restart_settings_frame = ttk.LabelFrame(main_frame, text="Restart Settings") restart_settings_frame.pack(fill=tk.X, pady=10) # Game Restart Interval game_interval_frame = ttk.Frame(restart_settings_frame) game_interval_frame.pack(fill=tk.X, pady=5, padx=10) game_interval_label = ttk.Label(game_interval_frame, text="Game Restart Interval (min):", width=25) game_interval_label.pack(side=tk.LEFT) self.game_restart_interval_var = tk.IntVar(value=self.remote_data.get("DEFAULT_GAME_RESTART_INTERVAL_MINUTES", 120)) game_interval_spinbox = ttk.Spinbox(game_interval_frame, from_=0, to=1440, width=7, textvariable=self.game_restart_interval_var) game_interval_spinbox.pack(side=tk.LEFT) game_interval_info = ttk.Label(game_interval_frame, text="(0 to disable)") game_interval_info.pack(side=tk.LEFT, padx=(5,0)) # Bot Restart Interval bot_interval_frame = ttk.Frame(restart_settings_frame) bot_interval_frame.pack(fill=tk.X, pady=5, padx=10) bot_interval_label = ttk.Label(bot_interval_frame, text="Bot Restart Interval (min):", width=25) bot_interval_label.pack(side=tk.LEFT) self.bot_restart_interval_var = tk.IntVar(value=self.remote_data.get("DEFAULT_BOT_RESTART_INTERVAL_MINUTES", 120)) bot_interval_spinbox = ttk.Spinbox(bot_interval_frame, from_=0, to=1440, width=7, textvariable=self.bot_restart_interval_var) bot_interval_spinbox.pack(side=tk.LEFT) bot_interval_info = ttk.Label(bot_interval_frame, text="(0 to disable)") bot_interval_info.pack(side=tk.LEFT, padx=(5,0)) # Link Restart Times link_restarts_frame = ttk.Frame(restart_settings_frame) link_restarts_frame.pack(fill=tk.X, pady=5, padx=10) self.link_restarts_var = tk.BooleanVar(value=self.remote_data.get("LINK_RESTART_TIMES", True)) link_restarts_cb = ttk.Checkbutton(link_restarts_frame, text="Link Game and Bot restart times (use Game interval if linked)", variable=self.link_restarts_var) link_restarts_cb.pack(anchor=tk.W) # Game Process Name game_proc_name_frame = ttk.Frame(restart_settings_frame) game_proc_name_frame.pack(fill=tk.X, pady=5, padx=10) game_proc_name_label = ttk.Label(game_proc_name_frame, text="Game Process Name:", width=25) game_proc_name_label.pack(side=tk.LEFT) self.game_process_name_var = tk.StringVar(value=self.remote_data.get("GAME_PROCESS_NAME", "LastWar.exe")) game_proc_name_entry = ttk.Entry(game_proc_name_frame, textvariable=self.game_process_name_var) game_proc_name_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) # --- Control Buttons --- control_buttons_frame = ttk.Frame(main_frame) control_buttons_frame.pack(fill=tk.X, pady=20) self.start_managed_button = ttk.Button(control_buttons_frame, text="Start Managed Bot & Game", command=self.start_managed_session) self.start_managed_button.pack(side=tk.LEFT, padx=5) self.stop_managed_button = ttk.Button(control_buttons_frame, text="Stop Managed Session", command=self.stop_managed_session, state=tk.DISABLED) self.stop_managed_button.pack(side=tk.LEFT, padx=5) # Status Area (Optional, for displaying logs or status messages) status_label = ttk.Label(main_frame, text="Status messages will appear in the console.") status_label.pack(pady=10) def start_managed_session(self): logger.info("Attempting to start managed session...") # This will be the new main function to start bot, game, and monitoring # Ensure previous session is stopped if any if self.bot_process_instance or self.game_process_instance or self.monitor_thread_instance: messagebox.showwarning("Session Active", "A managed session might already be active. Please stop it first or check console.") # self.stop_managed_session() # Optionally force stop # time.sleep(1) # Give time to stop # return # Save current settings before starting self.save_settings(show_success_message=False) # Save without showing popup, or make it optional self.keep_monitoring_flag.set() # Ensure monitoring is enabled # Start Game if not self._start_game_managed(): messagebox.showerror("Error", "Failed to start the game.") self.update_management_buttons_state(True) # Enable start, disable stop return time.sleep(5) # Give game some time to initialize # Start Bot (main.py) if not self._start_bot_managed(): messagebox.showerror("Error", "Failed to start the bot (main.py).") self._stop_game_managed() # Stop game if bot fails to start self.update_management_buttons_state(True) return # Start Control Client if HAS_SOCKETIO: self._start_control_client() else: logger.warning("socketio library not found. Remote control will be disabled.") messagebox.showwarning("Socket.IO Missing", "The 'python-socketio[client]' library is not installed. Remote control features will be disabled. Please install it via 'pip install \"python-socketio[client]\"' or use the 'Install Dependencies' button.") # Start Monitoring Thread self._start_monitoring_thread() # Start Scheduler Thread self._start_scheduler_thread() self.update_management_buttons_state(False) # Disable start, enable stop # messagebox.showinfo("Session Started", "Managed bot and game session started. Check console for logs.") # Removed popup logger.info("Managed bot and game session started. Check console for logs.") # Log instead of popup def stop_managed_session(self): logger.info("Attempting to stop managed session...") self.keep_monitoring_flag.clear() # Signal threads to stop if self.control_client_instance: self._stop_control_client() if self.scheduler_thread_instance and self.scheduler_thread_instance.is_alive(): logger.info("Waiting for scheduler thread to stop...") self.scheduler_thread_instance.join(timeout=5) if self.scheduler_thread_instance.is_alive(): logger.warning("Scheduler thread did not stop in time.") self.scheduler_thread_instance = None schedule.clear() if self.monitor_thread_instance and self.monitor_thread_instance.is_alive(): logger.info("Waiting for monitor thread to stop...") self.monitor_thread_instance.join(timeout=5) if self.monitor_thread_instance.is_alive(): logger.warning("Monitor thread did not stop in time.") self.monitor_thread_instance = None self._stop_bot_managed() self._stop_game_managed() # Reset process instances self.bot_process_instance = None self.game_process_instance = None self.update_management_buttons_state(True) # Enable start, disable stop messagebox.showinfo("Session Stopped", "Managed bot and game session stopped.") def update_management_buttons_state(self, enable_start): if hasattr(self, 'start_managed_button'): self.start_managed_button.config(state=tk.NORMAL if enable_start else tk.DISABLED) if hasattr(self, 'stop_managed_button'): self.stop_managed_button.config(state=tk.DISABLED if enable_start else tk.NORMAL) # Placeholder for game/bot start/stop/check methods to be integrated # These will be adapted from wolf_control.py and use self.config_data and self.remote_data def _find_process_by_name(self, process_name): """Find a process by name using psutil.""" for proc in psutil.process_iter(['pid', 'name']): try: if proc.info['name'].lower() == process_name.lower(): return proc except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass return None def _is_game_running_managed(self): game_process_name = self.remote_data.get("GAME_PROCESS_NAME", "LastWar.exe") if self.game_process_instance and self.game_process_instance.poll() is None: # Check if the process name matches, in case Popen object is stale but a process with same PID exists try: p = psutil.Process(self.game_process_instance.pid) if p.name().lower() == game_process_name.lower(): return True except psutil.NoSuchProcess: self.game_process_instance = None # Stale process object return False # Popen object is stale and process is gone # Fallback to checking by name if self.game_process_instance is None or points to a dead/wrong process return self._find_process_by_name(game_process_name) is not None def _start_game_managed(self): global game_process_instance game_exe_path = self.config_data.get("GAME_WINDOW_CONFIG", {}).get("GAME_EXECUTABLE_PATH") game_process_name = self.remote_data.get("GAME_PROCESS_NAME", "LastWar.exe") if not game_exe_path: logger.error("Game executable path not configured.") messagebox.showerror("Config Error", "Game executable path is not set in Game Settings.") return False if self._is_game_running_managed(): logger.info(f"Game ({game_process_name}) is already running.") # Try to get a Popen object if we don't have one if not self.game_process_instance: existing_proc = self._find_process_by_name(game_process_name) if existing_proc: # We can't directly create a Popen object for an existing process this way easily. # For now, we'll just acknowledge it's running. # For full control, it's best if this script starts it. logger.info(f"Found existing game process PID: {existing_proc.pid}. Monitoring without direct Popen control.") return True try: logger.info(f"Starting game: {game_exe_path}") # Use shell=False and pass arguments as a list if possible, but for .exe, shell=True is often more reliable on Windows # For better process control, avoid shell=True if not strictly necessary. # However, if GAME_EXE_PATH can contain spaces or needs shell interpretation, shell=True might be needed. # For now, let's assume GAME_EXE_PATH is a direct path to an executable. self.game_process_instance = subprocess.Popen(game_exe_path, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) game_process_instance = self.game_process_instance # Update global if used by other parts from wolf_control # Wait a bit for the process to appear in psutil time.sleep(2) if self._is_game_running_managed(): logger.info(f"Game ({game_process_name}) started successfully with PID {self.game_process_instance.pid}.") return True else: logger.warning(f"Game ({game_process_name}) did not appear to start correctly after Popen call.") self.game_process_instance = None # Clear if it failed game_process_instance = None return False except Exception as e: logger.exception(f"Error starting game: {e}") self.game_process_instance = None game_process_instance = None return False def _stop_game_managed(self): global game_process_instance game_process_name = self.remote_data.get("GAME_PROCESS_NAME", "LastWar.exe") stopped = False if self.game_process_instance and self.game_process_instance.poll() is None: logger.info(f"Stopping game process (PID: {self.game_process_instance.pid}) started by this manager...") try: self.game_process_instance.terminate() self.game_process_instance.wait(timeout=5) # Wait for termination logger.info("Game process terminated.") stopped = True except subprocess.TimeoutExpired: logger.warning("Game process did not terminate in time, killing...") self.game_process_instance.kill() self.game_process_instance.wait(timeout=5) logger.info("Game process killed.") stopped = True except Exception as e: logger.error(f"Error terminating/killing own game process: {e}") self.game_process_instance = None game_process_instance = None # If not stopped or no instance, try to find and kill by name if not stopped: proc_to_kill = self._find_process_by_name(game_process_name) if proc_to_kill: logger.info(f"Found game process '{game_process_name}' (PID: {proc_to_kill.pid}). Attempting to terminate...") try: proc_to_kill.terminate() proc_to_kill.wait(timeout=5) # psutil's wait logger.info(f"Game process '{game_process_name}' terminated.") stopped = True except psutil.TimeoutExpired: logger.warning(f"Game process '{game_process_name}' did not terminate, killing...") proc_to_kill.kill() proc_to_kill.wait(timeout=5) logger.info(f"Game process '{game_process_name}' killed.") stopped = True except Exception as e: logger.error(f"Error terminating/killing game process by name '{game_process_name}': {e}") else: logger.info(f"Game process '{game_process_name}' not found running.") stopped = True # Considered stopped if not found if self.game_process_instance: # Clear Popen object if it exists self.game_process_instance = None game_process_instance = None return stopped def _is_bot_running_managed(self): bot_script_name = self.remote_data.get("BOT_SCRIPT_NAME", "main.py") if self.bot_process_instance and self.bot_process_instance.poll() is None: # Verify it's the correct script, in case of PID reuse try: p = psutil.Process(self.bot_process_instance.pid) if sys.executable in p.cmdline() and any(bot_script_name in arg for arg in p.cmdline()): return True except psutil.NoSuchProcess: self.bot_process_instance = None # Stale process object return False # Fallback: Check for any python process running the bot script for proc in psutil.process_iter(['pid', 'name', 'cmdline']): try: cmdline = proc.cmdline() if cmdline and sys.executable in cmdline[0] and any(bot_script_name in arg for arg in cmdline): # If we find one, and don't have an instance, we can't control it directly with Popen # but we know it's running. if not self.bot_process_instance: logger.info(f"Found external bot process (PID: {proc.pid}). Monitoring without direct Popen control.") return True except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess, IndexError): continue # Ignore processes that died or we can't access, or have empty cmdline return False def _start_bot_managed(self): global bot_process_instance # For compatibility if other parts use global bot_script_name = self.remote_data.get("BOT_SCRIPT_NAME", "main.py") if not os.path.exists(bot_script_name): messagebox.showerror("Error", f"Could not find bot script: {bot_script_name}") return False if self._is_bot_running_managed(): logger.info(f"Bot ({bot_script_name}) is already running.") return True # Or handle acquiring Popen object if possible (complex) try: logger.info(f"Starting bot: {sys.executable} {bot_script_name}") # Ensure CWD is script's directory if main.py relies on relative paths script_dir = os.path.dirname(os.path.abspath(__file__)) self.bot_process_instance = subprocess.Popen( [sys.executable, bot_script_name], cwd=script_dir, # Run main.py from its directory stdout=subprocess.PIPE, # Capture output stderr=subprocess.STDOUT, # Redirect stderr to stdout text=True, bufsize=1 # Line buffered ) bot_process_instance = self.bot_process_instance # Update global # Start a thread to log bot's output threading.Thread(target=self._log_subprocess_output, args=(self.bot_process_instance, "Bot"), daemon=True).start() logger.info(f"Bot ({bot_script_name}) started successfully with PID {self.bot_process_instance.pid}.") return True except Exception as e: logger.exception(f"Error starting bot: {e}") self.bot_process_instance = None bot_process_instance = None return False def _log_subprocess_output(self, process, name): """Reads and logs output from a subprocess.""" if not process or not process.stdout: logger.error(f"No process or stdout to log for {name}.") return logger.info(f"Started logging output for {name} (PID: {process.pid}).") try: for line in iter(process.stdout.readline, ''): if line: logger.info(f"[{name}] {line.strip()}") if process.poll() is not None and not line: # Process ended and no more output break process.stdout.close() except Exception as e: logger.error(f"Error logging output for {name}: {e}") finally: return_code = process.wait() logger.info(f"{name} process (PID: {process.pid}) exited with code {return_code}.") def _stop_bot_managed(self): global bot_process_instance bot_script_name = self.remote_data.get("BOT_SCRIPT_NAME", "main.py") stopped = False if self.bot_process_instance and self.bot_process_instance.poll() is None: logger.info(f"Stopping bot process (PID: {self.bot_process_instance.pid}) started by this manager...") try: self.bot_process_instance.terminate() self.bot_process_instance.wait(timeout=5) logger.info("Bot process terminated.") stopped = True except subprocess.TimeoutExpired: logger.warning("Bot process did not terminate in time, killing...") self.bot_process_instance.kill() self.bot_process_instance.wait(timeout=5) logger.info("Bot process killed.") stopped = True except Exception as e: logger.error(f"Error terminating/killing own bot process: {e}") self.bot_process_instance = None bot_process_instance = None # Fallback: find and kill any python process running the bot script if not stopped: for proc in psutil.process_iter(['pid', 'name', 'cmdline']): try: cmdline = proc.cmdline() if cmdline and sys.executable in cmdline[0] and any(bot_script_name in arg for arg in cmdline): logger.info(f"Found bot process '{bot_script_name}' (PID: {proc.pid}). Attempting to terminate...") proc.terminate() proc.wait(timeout=5) logger.info(f"Bot process '{bot_script_name}' terminated.") stopped = True break # Assume only one instance for now except psutil.TimeoutExpired: logger.warning(f"Bot process '{bot_script_name}' (PID: {proc.pid}) did not terminate, killing...") proc.kill() proc.wait(timeout=5) logger.info(f"Bot process '{bot_script_name}' killed.") stopped = True break except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess, IndexError): continue if not stopped: # If no Popen instance and no external process found logger.info(f"Bot process '{bot_script_name}' not found running.") stopped = True if self.bot_process_instance: # Clear Popen object if it exists self.bot_process_instance = None bot_process_instance = None return stopped def _restart_game_managed(self): logger.info("Restarting game (managed)...") self._stop_game_managed() time.sleep(2) # Give it time to fully stop return self._start_game_managed() def _restart_bot_managed(self): logger.info("Restarting bot (managed)...") self._stop_bot_managed() time.sleep(2) # Give it time to fully stop return self._start_bot_managed() def _restart_all_managed(self): logger.info("Performing full restart (bot and game)...") self._stop_bot_managed() self._stop_game_managed() time.sleep(3) game_started = self._start_game_managed() if game_started: time.sleep(10) # Wait for game to initialize bot_started = self._start_bot_managed() if not bot_started: logger.error("Failed to restart bot after restarting game.") return False else: logger.error("Failed to restart game during full restart.") # Optionally try to start bot anyway or declare full failure # self._start_bot_managed() return False logger.info("Full restart completed.") # Update last restart time if tracking it # self.last_restart_time = datetime.datetime.now() return True def _start_monitoring_thread(self): if self.monitor_thread_instance and self.monitor_thread_instance.is_alive(): logger.info("Monitor thread already running.") return self.monitor_thread_instance = threading.Thread(target=self._monitoring_loop, daemon=True) self.monitor_thread_instance.start() logger.info("Started monitoring thread.") def _monitoring_loop(self): logger.info("Monitoring loop started.") while self.keep_monitoring_flag.is_set(): try: # Check game if not self._is_game_running_managed(): if self.game_process_instance is None : # Only restart if we are supposed to manage it or it was started by us and died logger.warning("Managed game process not found. Attempting to restart game...") self._start_game_managed() # Or _restart_game_managed() # Check bot if not self._is_bot_running_managed(): if self.bot_process_instance is None: # Only restart if we are supposed to manage it or it was started by us and died logger.warning("Managed bot process not found. Attempting to restart bot...") self._start_bot_managed() # Or _restart_bot_managed() # Check for remote commands (if control_client_instance is set up) if self.control_client_instance and hasattr(self.control_client_instance, 'check_signals'): self.control_client_instance.check_signals(self) # Pass self (WolfChatSetup instance) time.sleep(self.config_data.get("GAME_WINDOW_CONFIG", {}).get("MONITOR_INTERVAL_SECONDS", 5)) except Exception as e: logger.exception(f"Error in monitoring loop: {e}") time.sleep(10) # Wait longer after an error logger.info("Monitoring loop stopped.") def _start_scheduler_thread(self): if self.scheduler_thread_instance and self.scheduler_thread_instance.is_alive(): logger.info("Scheduler thread already running.") return self._setup_scheduled_restarts() # Setup jobs based on current config self.scheduler_thread_instance = threading.Thread(target=self._run_scheduler, daemon=True) self.scheduler_thread_instance.start() logger.info("Started scheduler thread.") def _run_scheduler(self): logger.info("Scheduler loop started.") while self.keep_monitoring_flag.is_set(): # Use same flag as monitor schedule.run_pending() time.sleep(1) logger.info("Scheduler loop stopped.") def _setup_scheduled_restarts(self): schedule.clear() # Clear previous jobs link_restarts = self.remote_data.get("LINK_RESTART_TIMES", True) game_interval = self.remote_data.get("DEFAULT_GAME_RESTART_INTERVAL_MINUTES", 0) bot_interval = self.remote_data.get("DEFAULT_BOT_RESTART_INTERVAL_MINUTES", 0) if link_restarts and game_interval > 0: logger.info(f"Scheduling linked restart (game & bot) every {game_interval} minutes.") schedule.every(game_interval).minutes.do(self._restart_all_managed) else: if game_interval > 0: logger.info(f"Scheduling game restart every {game_interval} minutes.") schedule.every(game_interval).minutes.do(self._restart_game_managed) if bot_interval > 0: logger.info(f"Scheduling bot restart every {bot_interval} minutes.") schedule.every(bot_interval).minutes.do(self._restart_bot_managed) if not schedule.jobs: logger.info("No scheduled restarts configured.") def _start_control_client(self): if not HAS_SOCKETIO: logger.warning("Cannot start ControlClient: python-socketio is not installed.") return if self.control_client_instance and self.control_client_instance.is_connected(): # is_connected or similar check logger.info("Control client already connected.") return server_url = self.remote_data.get("REMOTE_SERVER_URL") client_key = self.remote_data.get("REMOTE_CLIENT_KEY") if not server_url or not client_key: logger.warning("Remote server URL or client key not configured. Cannot start control client.") messagebox.showwarning("Remote Config Missing", "Remote Server URL or Client Key is not set in Management tab.") return self.control_client_instance = ControlClient(server_url, client_key, wolf_chat_setup_instance=self) # Pass self # The ControlClient should handle its own connection thread. # self.control_client_instance.start_thread() or similar method if self.control_client_instance.run_in_thread(): # Assuming run_in_thread starts the connection attempt logger.info("Control client thread started.") else: logger.error("Failed to start control client thread.") self.control_client_instance = None def _stop_control_client(self): if self.control_client_instance: logger.info("Stopping control client...") self.control_client_instance.stop() # This should handle thread shutdown self.control_client_instance = None logger.info("Control client stopped.") def on_closing(self): """Handle window close event.""" if messagebox.askokcancel("Quit", "Do you want to quit Wolf Chat Setup? This will stop any managed sessions and running scripts."): print("Closing Setup...") self.stop_managed_session() # Stop bot/game managed session if running self.stop_process() # Stop bot/test script if running independently self.stop_memory_scheduler() # Stop scheduler if running self.destroy() def create_api_tab(self): """Create the API Settings tab""" tab = ttk.Frame(self.notebook) self.notebook.add(tab, text="API Settings") # Main frame with padding main_frame = ttk.Frame(tab, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) # Header header = ttk.Label(main_frame, text="OpenAI API Settings", font=("", 12, "bold")) header.pack(anchor=tk.W, pady=(0, 10)) # API Base URL url_frame = ttk.Frame(main_frame) url_frame.pack(fill=tk.X, pady=5) url_label = ttk.Label(url_frame, text="API Base URL:", width=15) url_label.pack(side=tk.LEFT, padx=(0, 5)) self.api_url_var = tk.StringVar() self.api_url_entry = ttk.Entry(url_frame, textvariable=self.api_url_var) self.api_url_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) # Predefined endpoints url_presets_frame = ttk.Frame(main_frame) url_presets_frame.pack(fill=tk.X, pady=2) preset_label = ttk.Label(url_presets_frame, text="Presets:", width=15) preset_label.pack(side=tk.LEFT, padx=(0, 5)) openai_btn = ttk.Button(url_presets_frame, text="OpenAI", width=10, command=lambda: self.api_url_var.set("")) openai_btn.pack(side=tk.LEFT, padx=2) openrouter_btn = ttk.Button(url_presets_frame, text="OpenRouter", width=10, command=lambda: self.api_url_var.set("https://openrouter.ai/api/v1")) openrouter_btn.pack(side=tk.LEFT, padx=2) localhost_btn = ttk.Button(url_presets_frame, text="Localhost", width=10, command=lambda: self.api_url_var.set("http://localhost:1234/v1")) localhost_btn.pack(side=tk.LEFT, padx=2) # API Key key_frame = ttk.Frame(main_frame) key_frame.pack(fill=tk.X, pady=5) key_label = ttk.Label(key_frame, text="API Key:", width=15) key_label.pack(side=tk.LEFT, padx=(0, 5)) self.api_key_var = tk.StringVar() self.api_key_entry = ttk.Entry(key_frame, textvariable=self.api_key_var, show="*") self.api_key_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) self.show_key_var = tk.BooleanVar(value=False) show_key_cb = ttk.Checkbutton(key_frame, text="Show", variable=self.show_key_var, command=self.toggle_key_visibility) show_key_cb.pack(side=tk.LEFT, padx=(5, 0)) # Model Selection model_frame = ttk.Frame(main_frame) model_frame.pack(fill=tk.X, pady=5) model_label = ttk.Label(model_frame, text="LLM Model:", width=15) model_label.pack(side=tk.LEFT, padx=(0, 5)) self.model_var = tk.StringVar(value="deepseek/deepseek-chat-v3-0324") self.model_entry = ttk.Entry(model_frame, textvariable=self.model_var) self.model_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) # Information box info_frame = ttk.LabelFrame(main_frame, text="Information") info_frame.pack(fill=tk.BOTH, expand=True, pady=10) info_text = ( "• Leave the API Base URL empty to use the official OpenAI API\n" "• For OpenRouter, you need an OpenRouter API key\n" "• For local LLMs, configure your API to use OpenAI-compatible format\n" "• Make sure the selected model is available with your chosen provider" ) info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=700) info_label.pack(padx=10, pady=10, anchor=tk.W) def create_mcp_tab(self): """Create the MCP Settings tab""" tab = ttk.Frame(self.notebook) self.notebook.add(tab, text="MCP Servers") # Main frame with padding main_frame = ttk.Frame(tab, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) # Header header = ttk.Label(main_frame, text="Modular Capability Provider (MCP) Servers", font=("", 12, "bold")) header.pack(anchor=tk.W, pady=(0, 10)) # Create the main frame for server settings servers_frame = ttk.Frame(main_frame) servers_frame.pack(fill=tk.BOTH, expand=True) # Left side - Servers list servers_list_frame = ttk.LabelFrame(servers_frame, text="Available Servers") servers_list_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5)) # Servers list self.servers_listbox = tk.Listbox(servers_list_frame, width=20, height=10) self.servers_listbox.pack(side=tk.LEFT, fill=tk.Y, expand=True, padx=5, pady=5) self.servers_listbox.bind('<>', self.on_server_select) servers_scrollbar = ttk.Scrollbar(servers_list_frame, orient=tk.VERTICAL, command=self.servers_listbox.yview) servers_scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.servers_listbox.config(yscrollcommand=servers_scrollbar.set) # Buttons below listbox servers_btn_frame = ttk.Frame(servers_list_frame) servers_btn_frame.pack(fill=tk.X, pady=5, padx=5) add_btn = ttk.Button(servers_btn_frame, text="Add", command=self.add_custom_server) add_btn.pack(side=tk.LEFT, padx=2, fill=tk.X, expand=True) self.remove_btn = ttk.Button(servers_btn_frame, text="Remove", command=self.remove_server, state=tk.DISABLED) self.remove_btn.pack(side=tk.LEFT, padx=2, fill=tk.X, expand=True) # Right side - Server settings self.server_settings_frame = ttk.LabelFrame(servers_frame, text="Server Settings") self.server_settings_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True) # Empty label for initial state self.empty_settings_label = ttk.Label(self.server_settings_frame, text="Select a server from the list to configure it.", justify=tk.CENTER) self.empty_settings_label.pack(expand=True, pady=20) # Frames for each server type (initially hidden) self.create_exa_settings_frame() self.create_chroma_settings_frame() self.create_custom_settings_frame() # Update the servers list self.update_servers_list() def create_exa_settings_frame(self): """Create settings frame for Exa server""" self.exa_frame = ttk.Frame(self.server_settings_frame) # Enable checkbox enable_frame = ttk.Frame(self.exa_frame) enable_frame.pack(fill=tk.X, pady=5) self.exa_enable_var = tk.BooleanVar(value=True) enable_cb = ttk.Checkbutton(enable_frame, text="Enable Exa Server", variable=self.exa_enable_var) enable_cb.pack(anchor=tk.W) # API Key key_frame = ttk.Frame(self.exa_frame) key_frame.pack(fill=tk.X, pady=5) key_label = ttk.Label(key_frame, text="Exa API Key:", width=15) key_label.pack(side=tk.LEFT) self.exa_key_var = tk.StringVar() self.exa_key_entry = ttk.Entry(key_frame, textvariable=self.exa_key_var, show="*") self.exa_key_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) self.show_exa_key_var = tk.BooleanVar(value=False) show_key_cb = ttk.Checkbutton(key_frame, text="Show", variable=self.show_exa_key_var, command=lambda: self.toggle_field_visibility(self.exa_key_entry, self.show_exa_key_var)) show_key_cb.pack(side=tk.LEFT, padx=(5, 0)) # Server Type type_frame = ttk.Frame(self.exa_frame) type_frame.pack(fill=tk.X, pady=5) type_label = ttk.Label(type_frame, text="Server Type:", width=15) type_label.pack(side=tk.LEFT) self.exa_type_var = tk.StringVar(value="local") # Changed default to local local_radio = ttk.Radiobutton(type_frame, text="Local Server (recommended)", variable=self.exa_type_var, value="local", command=self.update_exa_settings_visibility) local_radio.pack(anchor=tk.W) smithery_radio = ttk.Radiobutton(type_frame, text="Smithery", variable=self.exa_type_var, value="smithery", command=self.update_exa_settings_visibility) smithery_radio.pack(anchor=tk.W) # Local Server Path self.exa_local_frame = ttk.Frame(self.exa_frame) local_path_label = ttk.Label(self.exa_local_frame, text="Server Path:", width=15) local_path_label.pack(side=tk.LEFT) self.exa_path_var = tk.StringVar(value=f"C:/Users/{CURRENT_USERNAME}/AppData/Roaming/npm/exa-mcp-server") self.exa_path_entry = ttk.Entry(self.exa_local_frame, textvariable=self.exa_path_var) self.exa_path_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) browse_btn = ttk.Button(self.exa_local_frame, text="Browse", command=self.browse_exa_server) browse_btn.pack(side=tk.LEFT, padx=(5, 0)) # Smithery Info self.exa_smithery_frame = ttk.Frame(self.exa_frame) smithery_info = ttk.Label(self.exa_smithery_frame, text="Smithery will automatically download and run the latest Exa MCP server.", wraplength=400) smithery_info.pack(pady=5) # Information text info_frame = ttk.LabelFrame(self.exa_frame, text="Information") info_frame.pack(fill=tk.X, pady=10) info_text = ( "• Exa MCP provides web search and research capabilities\n" "• An Exa API key is required (obtain from https://exa.ai)\n" "• Local server is the default and preferred option\n" "• Smithery requires an internet connection each time" ) info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=400) info_label.pack(padx=10, pady=10, anchor=tk.W) def create_chroma_settings_frame(self): """Create settings frame for Chroma MCP server""" self.chroma_frame = ttk.Frame(self.server_settings_frame) # Enable checkbox enable_frame = ttk.Frame(self.chroma_frame) enable_frame.pack(fill=tk.X, pady=5) self.chroma_enable_var = tk.BooleanVar(value=True) enable_cb = ttk.Checkbutton(enable_frame, text="Enable Chroma MCP Server", variable=self.chroma_enable_var) enable_cb.pack(anchor=tk.W) # Data directory dir_frame = ttk.Frame(self.chroma_frame) dir_frame.pack(fill=tk.X, pady=5) dir_label = ttk.Label(dir_frame, text="Data Directory:", width=15) dir_label.pack(side=tk.LEFT) self.chroma_dir_var = tk.StringVar(value=DEFAULT_CHROMA_DATA_PATH) dir_entry = ttk.Entry(dir_frame, textvariable=self.chroma_dir_var) dir_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) browse_btn = ttk.Button(dir_frame, text="Browse", command=self.browse_chroma_dir) browse_btn.pack(side=tk.LEFT, padx=(5, 0)) # Information text info_frame = ttk.LabelFrame(self.chroma_frame, text="Information") info_frame.pack(fill=tk.X, pady=10, expand=True) info_text = ( "• Chroma MCP provides vector database storage for memory features\n" "• The data directory will store vector embeddings and metadata\n" "• Use a persistent directory to maintain memory between sessions\n" "• Requires uvx to be installed in your Python environment" ) info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=400) info_label.pack(padx=10, pady=10, anchor=tk.W) def create_custom_settings_frame(self): """Create settings frame for custom server""" self.custom_frame = ttk.Frame(self.server_settings_frame) # Name field name_frame = ttk.Frame(self.custom_frame) name_frame.pack(fill=tk.X, pady=5) name_label = ttk.Label(name_frame, text="Server Name:", width=15) name_label.pack(side=tk.LEFT) self.custom_name_var = tk.StringVar() name_entry = ttk.Entry(name_frame, textvariable=self.custom_name_var) name_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) # Enable checkbox enable_frame = ttk.Frame(self.custom_frame) enable_frame.pack(fill=tk.X, pady=5) self.custom_enable_var = tk.BooleanVar(value=True) enable_cb = ttk.Checkbutton(enable_frame, text="Enable Server", variable=self.custom_enable_var) enable_cb.pack(anchor=tk.W) # Raw configuration editor config_label = ttk.Label(self.custom_frame, text="Server Configuration (Python Dictionary Format):") config_label.pack(anchor=tk.W, pady=(10, 5)) self.custom_config_text = scrolledtext.ScrolledText(self.custom_frame, height=12, width=50) self.custom_config_text.pack(fill=tk.BOTH, expand=True, pady=5) # Default configuration template default_config = ( ' "command": "npx",\n' ' "args": [\n' ' "some-mcp-server",\n' ' "--option1",\n' ' "--option2=value"\n' ' ]' ) self.custom_config_text.insert(tk.END, default_config) # Information text info_frame = ttk.LabelFrame(self.custom_frame, text="Information") info_frame.pack(fill=tk.X, pady=10) info_text = ( "• Enter the raw Python dictionary for your custom MCP server\n" "• Format must match the structure in config.py\n" "• Be careful with syntax - incorrect format may cause errors\n" "• Server name must be unique" ) info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=400) info_label.pack(padx=10, pady=10, anchor=tk.W) def create_game_tab(self): """Create the Game Settings tab""" tab = ttk.Frame(self.notebook) self.notebook.add(tab, text="Game Settings") # Main frame with padding main_frame = ttk.Frame(tab, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) # Header header = ttk.Label(main_frame, text="Game Window Configuration", font=("", 12, "bold")) header.pack(anchor=tk.W, pady=(0, 10)) # Game window title title_frame = ttk.Frame(main_frame) title_frame.pack(fill=tk.X, pady=5) title_label = ttk.Label(title_frame, text="Window Title:", width=20) title_label.pack(side=tk.LEFT, padx=(0, 5)) self.window_title_var = tk.StringVar(value="Last War-Survival Game") title_entry = ttk.Entry(title_frame, textvariable=self.window_title_var) title_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) # Game executable path path_frame = ttk.Frame(main_frame) path_frame.pack(fill=tk.X, pady=5) path_label = ttk.Label(path_frame, text="Game Executable Path:", width=20) path_label.pack(side=tk.LEFT, padx=(0, 5)) self.game_path_var = tk.StringVar(value=fr"C:\Users\{CURRENT_USERNAME}\AppData\Local\TheLastWar\Launch.exe") path_entry = ttk.Entry(path_frame, textvariable=self.game_path_var) path_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) browse_btn = ttk.Button(path_frame, text="Browse", command=self.browse_game_path) browse_btn.pack(side=tk.LEFT, padx=(5, 0)) # Window position pos_frame = ttk.Frame(main_frame) pos_frame.pack(fill=tk.X, pady=5) pos_label = ttk.Label(pos_frame, text="Window Position:", width=20) pos_label.pack(side=tk.LEFT, padx=(0, 5)) pos_x_label = ttk.Label(pos_frame, text="X:") pos_x_label.pack(side=tk.LEFT) self.pos_x_var = tk.IntVar(value=50) pos_x_entry = ttk.Spinbox(pos_frame, textvariable=self.pos_x_var, from_=0, to=3000, width=5) pos_x_entry.pack(side=tk.LEFT, padx=(0, 10)) pos_y_label = ttk.Label(pos_frame, text="Y:") pos_y_label.pack(side=tk.LEFT) self.pos_y_var = tk.IntVar(value=30) pos_y_entry = ttk.Spinbox(pos_frame, textvariable=self.pos_y_var, from_=0, to=3000, width=5) pos_y_entry.pack(side=tk.LEFT) # Window size size_frame = ttk.Frame(main_frame) size_frame.pack(fill=tk.X, pady=5) size_label = ttk.Label(size_frame, text="Window Size:", width=20) size_label.pack(side=tk.LEFT, padx=(0, 5)) width_label = ttk.Label(size_frame, text="Width:") width_label.pack(side=tk.LEFT) self.width_var = tk.IntVar(value=600) width_entry = ttk.Spinbox(size_frame, textvariable=self.width_var, from_=300, to=3000, width=5) width_entry.pack(side=tk.LEFT, padx=(0, 10)) height_label = ttk.Label(size_frame, text="Height:") height_label.pack(side=tk.LEFT) self.height_var = tk.IntVar(value=1070) height_entry = ttk.Spinbox(size_frame, textvariable=self.height_var, from_=300, to=3000, width=5) height_entry.pack(side=tk.LEFT) # Auto-restart settings (Now managed by 'Management' tab) restart_info_frame = ttk.LabelFrame(main_frame, text="Auto-Restart Settings (Legacy)") restart_info_frame.pack(fill=tk.X, pady=10) legacy_restart_label = ttk.Label(restart_info_frame, text="Scheduled game/bot restarts are now configured in the 'Management' tab.", justify=tk.LEFT, wraplength=680) legacy_restart_label.pack(padx=10, pady=10, anchor=tk.W) # Keep the variables for config.py compatibility if other parts of the app might read them, # but their UI controls are removed from here. self.restart_var = tk.BooleanVar(value=self.config_data.get("GAME_WINDOW_CONFIG", {}).get("ENABLE_SCHEDULED_RESTART", True)) self.interval_var = tk.IntVar(value=self.config_data.get("GAME_WINDOW_CONFIG", {}).get("RESTART_INTERVAL_MINUTES", 60)) # Monitor interval (Still relevant for window positioning, not restart scheduling) monitor_frame = ttk.Frame(main_frame) monitor_frame.pack(fill=tk.X, pady=5) monitor_label = ttk.Label(monitor_frame, text="Monitor Interval (sec):", width=20) monitor_label.pack(side=tk.LEFT, padx=(0, 5)) self.monitor_interval_var = tk.IntVar(value=5) monitor_entry = ttk.Spinbox(monitor_frame, textvariable=self.monitor_interval_var, from_=1, to=60, width=5) monitor_entry.pack(side=tk.LEFT) # Information text info_frame = ttk.LabelFrame(main_frame, text="Information") info_frame.pack(fill=tk.BOTH, expand=True, pady=10) info_text = ( "• These settings control how the game window is monitored and positioned\n" "• Window Title must match exactly what's shown in the title bar\n" "• Scheduled restart helps prevent game crashes and memory leaks\n" "• Monitor interval determines how often the window position is checked\n" "• Changes will take effect after restarting Wolf Chat" ) info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=700) info_label.pack(padx=10, pady=10, anchor=tk.W) def create_memory_tab(self): """Create the Memory Settings tab""" tab = ttk.Frame(self.notebook) self.notebook.add(tab, text="Memory Settings") # Main frame with padding main_frame = ttk.Frame(tab, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) # Header header = ttk.Label(main_frame, text="ChromaDB Memory Integration", font=("", 12, "bold")) header.pack(anchor=tk.W, pady=(0, 10)) # Enable Pre-loading preload_frame = ttk.Frame(main_frame) preload_frame.pack(fill=tk.X, pady=5) self.preload_profiles_var = tk.BooleanVar(value=True) preload_cb = ttk.Checkbutton(preload_frame, text="Enable user profile pre-loading", variable=self.preload_profiles_var) preload_cb.pack(anchor=tk.W, pady=2) # Collection Names Frame collections_frame = ttk.LabelFrame(main_frame, text="Collection Names") collections_frame.pack(fill=tk.X, pady=10) # User Profiles Collection profiles_col_frame = ttk.Frame(collections_frame) profiles_col_frame.pack(fill=tk.X, pady=5, padx=10) profiles_col_label = ttk.Label(profiles_col_frame, text="Profiles Collection:", width=20) profiles_col_label.pack(side=tk.LEFT, padx=(0, 5)) # 修正:將預設值改為 "wolfhart_memory" 以匹配實際用法 self.profiles_collection_var = tk.StringVar(value="wolfhart_memory") profiles_col_entry = ttk.Entry(profiles_col_frame, textvariable=self.profiles_collection_var) profiles_col_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) # Conversations Collection conv_col_frame = ttk.Frame(collections_frame) conv_col_frame.pack(fill=tk.X, pady=5, padx=10) conv_col_label = ttk.Label(conv_col_frame, text="Conversations Collection:", width=20) conv_col_label.pack(side=tk.LEFT, padx=(0, 5)) self.conversations_collection_var = tk.StringVar(value="conversations") conv_col_entry = ttk.Entry(conv_col_frame, textvariable=self.conversations_collection_var) conv_col_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) # Bot Memory Collection bot_col_frame = ttk.Frame(collections_frame) bot_col_frame.pack(fill=tk.X, pady=5, padx=10) bot_col_label = ttk.Label(bot_col_frame, text="Bot Memory Collection:", width=20) bot_col_label.pack(side=tk.LEFT, padx=(0, 5)) self.bot_memory_collection_var = tk.StringVar(value="wolfhart_memory") bot_col_entry = ttk.Entry(bot_col_frame, textvariable=self.bot_memory_collection_var) bot_col_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) # Pre-loading Settings preload_settings_frame = ttk.LabelFrame(main_frame, text="Pre-loading Settings") preload_settings_frame.pack(fill=tk.X, pady=10) # Related memories to preload related_frame = ttk.Frame(preload_settings_frame) related_frame.pack(fill=tk.X, pady=5, padx=10) related_label = ttk.Label(related_frame, text="Related Memories Count:", width=20) related_label.pack(side=tk.LEFT, padx=(0, 5)) self.related_memories_var = tk.IntVar(value=2) related_spinner = ttk.Spinbox(related_frame, from_=0, to=10, width=5, textvariable=self.related_memories_var) related_spinner.pack(side=tk.LEFT) related_info = ttk.Label(related_frame, text="(0 to disable related memories pre-loading)") related_info.pack(side=tk.LEFT, padx=(5, 0)) # Information box info_frame = ttk.LabelFrame(main_frame, text="Information") info_frame.pack(fill=tk.BOTH, expand=True, pady=10) info_text = ( "• Pre-loading user profiles will speed up responses by fetching data before LLM calls\n" "• Collection names must match your ChromaDB configuration\n" "• The bot will automatically use pre-loaded data if available\n" "• If data isn't found locally, the bot will fall back to using tool calls" ) info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=700) info_label.pack(padx=10, pady=10, anchor=tk.W) # 記憶管理標籤頁 def create_memory_management_tab(self): tab = ttk.Frame(self.notebook) self.notebook.add(tab, text="記憶管理") main_frame = ttk.Frame(tab, padding=10) main_frame.pack(fill=tk.BOTH, expand=True) # 備份時間設置 backup_frame = ttk.LabelFrame(main_frame, text="備份設定") backup_frame.pack(fill=tk.X, pady=10) time_frame = ttk.Frame(backup_frame) time_frame.pack(fill=tk.X, pady=5, padx=10) time_label = ttk.Label(time_frame, text="執行時間:", width=20) time_label.pack(side=tk.LEFT, padx=(0, 5)) self.backup_hour_var = tk.IntVar(value=0) hour_spinner = ttk.Spinbox(time_frame, from_=0, to=23, width=3, textvariable=self.backup_hour_var) hour_spinner.pack(side=tk.LEFT) ttk.Label(time_frame, text=":").pack(side=tk.LEFT) self.backup_minute_var = tk.IntVar(value=0) minute_spinner = ttk.Spinbox(time_frame, from_=0, to=59, width=3, textvariable=self.backup_minute_var) minute_spinner.pack(side=tk.LEFT) # 模型選擇 models_frame = ttk.LabelFrame(main_frame, text="模型選擇") models_frame.pack(fill=tk.X, pady=10) profile_model_frame = ttk.Frame(models_frame) profile_model_frame.pack(fill=tk.X, pady=5, padx=10) profile_model_label = ttk.Label(profile_model_frame, text="用戶檔案生成模型:", width=20) profile_model_label.pack(side=tk.LEFT, padx=(0, 5)) # Initialize with a sensible default, will be overwritten by update_ui_from_data # Use config_data which is loaded in __init__ profile_model_default = self.config_data.get("LLM_MODEL", "deepseek/deepseek-chat-v3-0324") self.profile_model_var = tk.StringVar(value=profile_model_default) profile_model_entry = ttk.Entry(profile_model_frame, textvariable=self.profile_model_var) profile_model_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) summary_model_frame = ttk.Frame(models_frame) summary_model_frame.pack(fill=tk.X, pady=5, padx=10) summary_model_label = ttk.Label(summary_model_frame, text="聊天總結生成模型:", width=20) summary_model_label.pack(side=tk.LEFT, padx=(0, 5)) self.summary_model_var = tk.StringVar(value="mistral-7b-instruct") summary_model_entry = ttk.Entry(summary_model_frame, textvariable=self.summary_model_var) summary_model_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) # Information box info_frame_mm = ttk.LabelFrame(main_frame, text="Information") # Renamed to avoid conflict info_frame_mm.pack(fill=tk.BOTH, expand=True, pady=10) info_text_mm = ( "• 設定每日自動執行記憶備份的時間。\n" "• 選擇用於生成用戶檔案和聊天總結的語言模型。\n" "• 用戶檔案生成模型預設使用主LLM模型。" ) info_label_mm = ttk.Label(info_frame_mm, text=info_text_mm, justify=tk.LEFT, wraplength=700) info_label_mm.pack(padx=10, pady=10, anchor=tk.W) def create_bottom_buttons(self): """Create bottom action buttons""" btn_frame = ttk.Frame(self) btn_frame.pack(fill=tk.X, padx=10, pady=10) # Version label on left version_label = ttk.Label(btn_frame, text=f"v{VERSION}") version_label.pack(side=tk.LEFT, padx=5) # Action buttons on right (order matters for packing) cancel_btn = ttk.Button(btn_frame, text="Cancel", command=self.quit) cancel_btn.pack(side=tk.RIGHT, padx=5) save_btn = ttk.Button(btn_frame, text="Save Settings", command=self.save_settings) save_btn.pack(side=tk.RIGHT, padx=5) install_deps_btn = ttk.Button(btn_frame, text="Install Dependencies", command=self.install_dependencies) install_deps_btn.pack(side=tk.RIGHT, padx=5) # Run buttons self.run_test_btn = ttk.Button(btn_frame, text="Run Test", command=self.run_test_script) self.run_test_btn.pack(side=tk.RIGHT, padx=5) self.run_bot_btn = ttk.Button(btn_frame, text="Run Chat Bot", command=self.run_chat_bot) self.run_bot_btn.pack(side=tk.RIGHT, padx=5) # Stop button (for bot/test) self.stop_btn = ttk.Button(btn_frame, text="Stop Bot/Test", command=self.stop_process, state=tk.DISABLED) self.stop_btn.pack(side=tk.RIGHT, padx=5) # Scheduler buttons self.stop_scheduler_btn = ttk.Button(btn_frame, text="Stop Scheduler", command=self.stop_memory_scheduler, state=tk.DISABLED) self.stop_scheduler_btn.pack(side=tk.RIGHT, padx=5) self.start_scheduler_btn = ttk.Button(btn_frame, text="Start Scheduler", command=self.run_memory_scheduler) self.start_scheduler_btn.pack(side=tk.RIGHT, padx=5) def install_dependencies(self): """Run the installation script for dependencies""" try: import subprocess import sys # Check if install.py exists if not os.path.exists("install.py"): messagebox.showerror("Error", "Could not find install.py script") return # Run the installer script in a new process subprocess.Popen([sys.executable, "install.py"]) except Exception as e: messagebox.showerror("Error", f"Failed to launch installer: {str(e)}") def run_chat_bot(self): """Run the main chat bot script""" try: import subprocess import sys if not os.path.exists("main.py"): messagebox.showerror("Error", "Could not find main.py script") return if self.running_process is not None: messagebox.showwarning("Already Running", "Another process is already running. Please stop it first.") return self.running_process = subprocess.Popen([sys.executable, "main.py"]) print("Attempting to start main.py...") self.update_run_button_states(False) # Disable run buttons, enable stop except Exception as e: messagebox.showerror("Error", f"Failed to launch main.py: {str(e)}") self.update_run_button_states(True) # Re-enable buttons on failure def run_test_script(self): """Run the LLM debug script""" try: import subprocess import sys test_script_path = os.path.join("test", "llm_debug_script.py") if not os.path.exists(test_script_path): messagebox.showerror("Error", f"Could not find {test_script_path}") return if self.running_process is not None: messagebox.showwarning("Already Running", "Another process is already running. Please stop it first.") return self.running_process = subprocess.Popen([sys.executable, test_script_path]) print(f"Attempting to start {test_script_path}...") self.update_run_button_states(False) # Disable run buttons, enable stop except Exception as e: messagebox.showerror("Error", f"Failed to launch {test_script_path}: {str(e)}") self.update_run_button_states(True) # Re-enable buttons on failure def stop_process(self): """Stop the currently running process""" if hasattr(self, 'running_process') and self.running_process is not None: try: print("Attempting to terminate running process...") self.running_process.terminate() # Or .kill() for a more forceful stop self.running_process = None messagebox.showinfo("Process Stopped", "The running process has been terminated.") except Exception as e: messagebox.showerror("Error", f"Failed to terminate process: {str(e)}") finally: # Re-enable run buttons and disable stop button self.update_run_button_states(True) else: messagebox.showinfo("No Process", "No Bot/Test process is currently running.") def run_memory_scheduler(self): """Run the memory backup scheduler script""" try: scheduler_script = "memory_backup.py" if not os.path.exists(scheduler_script): messagebox.showerror("Error", f"Could not find {scheduler_script}") return if self.scheduler_process is not None and self.scheduler_process.poll() is None: messagebox.showwarning("Already Running", "The memory scheduler process is already running.") return # Run with --schedule argument # Use CREATE_NO_WINDOW flag on Windows to hide the console window creationflags = 0 if sys.platform == "win32": creationflags = subprocess.CREATE_NO_WINDOW self.scheduler_process = subprocess.Popen( [sys.executable, scheduler_script, "--schedule"], creationflags=creationflags ) print(f"Attempting to start {scheduler_script} --schedule... PID: {self.scheduler_process.pid}") self.update_scheduler_button_states(False) # Disable start, enable stop except Exception as e: logger.exception(f"Failed to launch {scheduler_script}") # Log exception messagebox.showerror("Error", f"Failed to launch {scheduler_script}: {str(e)}") self.update_scheduler_button_states(True) # Re-enable start on failure def stop_memory_scheduler(self): """Stop the currently running memory scheduler process""" if self.scheduler_process is not None and self.scheduler_process.poll() is None: try: print(f"Attempting to terminate memory scheduler process (PID: {self.scheduler_process.pid})...") # Terminate the process group on non-Windows to ensure child processes are handled if any if sys.platform != "win32": os.killpg(os.getpgid(self.scheduler_process.pid), signal.SIGTERM) else: # On Windows, terminate the parent process directly self.scheduler_process.terminate() # Wait briefly to allow termination try: self.scheduler_process.wait(timeout=3) print("Scheduler process terminated gracefully.") except subprocess.TimeoutExpired: print("Scheduler process did not terminate gracefully, killing...") if sys.platform != "win32": os.killpg(os.getpgid(self.scheduler_process.pid), signal.SIGKILL) else: self.scheduler_process.kill() self.scheduler_process.wait(timeout=2) # Wait after kill print("Scheduler process killed.") self.scheduler_process = None messagebox.showinfo("Scheduler Stopped", "The memory scheduler process has been terminated.") except Exception as e: logger.exception("Failed to terminate scheduler process") # Log exception messagebox.showerror("Error", f"Failed to terminate scheduler process: {str(e)}") finally: self.scheduler_process = None # Ensure it's cleared self.update_scheduler_button_states(True) # Update buttons else: # If process exists but poll() is not None (already terminated) or process is None if self.scheduler_process is not None: self.scheduler_process = None # Clear stale process object # messagebox.showinfo("No Scheduler Process", "The memory scheduler process is not running.") # Reduce popups print("Scheduler process is not running or already stopped.") self.update_scheduler_button_states(True) # Ensure buttons are in correct state def update_run_button_states(self, enable): """Enable or disable the run buttons and update stop button state""" # Assuming run_bot_btn and run_test_btn exist and are class attributes if hasattr(self, 'run_bot_btn'): self.run_bot_btn.config(state=tk.NORMAL if enable else tk.DISABLED) if hasattr(self, 'run_test_btn'): self.run_test_btn.config(state=tk.NORMAL if enable else tk.DISABLED) if hasattr(self, 'stop_btn'): self.stop_btn.config(state=tk.DISABLED if enable else tk.NORMAL) def update_scheduler_button_states(self, enable_start): """Enable or disable the scheduler buttons""" # Check if process is running is_running = False if self.scheduler_process is not None and self.scheduler_process.poll() is None: is_running = True if hasattr(self, 'start_scheduler_btn'): self.start_scheduler_btn.config(state=tk.NORMAL if not is_running else tk.DISABLED) if hasattr(self, 'stop_scheduler_btn'): self.stop_scheduler_btn.config(state=tk.DISABLED if not is_running else tk.NORMAL) def update_ui_from_data(self): """Update UI controls from loaded data""" try: # API Tab self.api_url_var.set(self.config_data.get("OPENAI_API_BASE_URL", "")) self.api_key_var.set(self.env_data.get("OPENAI_API_KEY", "")) self.model_var.set(self.config_data.get("LLM_MODEL", "deepseek/deepseek-chat-v3-0324")) # MCP Servers # Exa settings if "exa" in self.config_data.get("MCP_SERVERS", {}): exa_config = self.config_data["MCP_SERVERS"]["exa"] self.exa_enable_var.set(exa_config.get("enabled", True)) self.exa_key_var.set(self.env_data.get("EXA_API_KEY", "")) if exa_config.get("use_smithery", False): self.exa_type_var.set("smithery") else: self.exa_type_var.set("local") if "server_path" in exa_config: self.exa_path_var.set(exa_config.get("server_path", "")) # Chroma settings if "chroma" in self.config_data.get("MCP_SERVERS", {}): chroma_config = self.config_data["MCP_SERVERS"]["chroma"] self.chroma_enable_var.set(chroma_config.get("enabled", True)) data_dir = chroma_config.get("data_dir", "") if data_dir: # Ensure data directory is absolute path self.chroma_dir_var.set(os.path.abspath(data_dir)) else: # Set default as absolute path self.chroma_dir_var.set(DEFAULT_CHROMA_DATA_PATH) # Update servers list to include custom servers self.update_servers_list() # Game settings game_config = self.config_data.get("GAME_WINDOW_CONFIG", {}) self.window_title_var.set(game_config.get("WINDOW_TITLE", "Last War-Survival Game")) game_path = game_config.get("GAME_EXECUTABLE_PATH", "") if game_path: self.game_path_var.set(game_path) self.pos_x_var.set(game_config.get("GAME_WINDOW_X", 50)) self.pos_y_var.set(game_config.get("GAME_WINDOW_Y", 30)) self.width_var.set(game_config.get("GAME_WINDOW_WIDTH", 600)) self.height_var.set(game_config.get("GAME_WINDOW_HEIGHT", 1070)) self.restart_var.set(game_config.get("ENABLE_SCHEDULED_RESTART", True)) self.interval_var.set(game_config.get("RESTART_INTERVAL_MINUTES", 60)) self.monitor_interval_var.set(game_config.get("MONITOR_INTERVAL_SECONDS", 5)) # Memory Settings self.preload_profiles_var.set(self.config_data.get("ENABLE_PRELOAD_PROFILES", True)) self.related_memories_var.set(self.config_data.get("PRELOAD_RELATED_MEMORIES", 2)) self.profiles_collection_var.set(self.config_data.get("PROFILES_COLLECTION", "user_profiles")) # Default was user_profiles self.conversations_collection_var.set(self.config_data.get("CONVERSATIONS_COLLECTION", "conversations")) self.bot_memory_collection_var.set(self.config_data.get("BOT_MEMORY_COLLECTION", "wolfhart_memory")) # Memory Management Tab Settings if hasattr(self, 'backup_hour_var'): # Check if UI elements for memory management tab exist self.backup_hour_var.set(self.config_data.get("MEMORY_BACKUP_HOUR", 0)) self.backup_minute_var.set(self.config_data.get("MEMORY_BACKUP_MINUTE", 0)) # Default profile model to LLM_MODEL if MEMORY_PROFILE_MODEL isn't set or matches LLM_MODEL profile_model_config = self.config_data.get("MEMORY_PROFILE_MODEL", self.config_data.get("LLM_MODEL")) self.profile_model_var.set(profile_model_config) self.summary_model_var.set(self.config_data.get("MEMORY_SUMMARY_MODEL", "mistral-7b-instruct")) # Management Tab Settings if hasattr(self, 'remote_url_var'): # Check if UI elements for management tab exist self.remote_url_var.set(self.remote_data.get("REMOTE_SERVER_URL", "")) self.remote_key_var.set(self.remote_data.get("REMOTE_CLIENT_KEY", "")) self.game_restart_interval_var.set(self.remote_data.get("DEFAULT_GAME_RESTART_INTERVAL_MINUTES", 120)) self.bot_restart_interval_var.set(self.remote_data.get("DEFAULT_BOT_RESTART_INTERVAL_MINUTES", 120)) self.link_restarts_var.set(self.remote_data.get("LINK_RESTART_TIMES", True)) self.game_process_name_var.set(self.remote_data.get("GAME_PROCESS_NAME", "LastWar.exe")) # Update visibility and states self.update_exa_settings_visibility() self.update_management_buttons_state(True) # Initially, start button is enabled except Exception as e: logger.exception("Error updating UI from data") # Log full traceback print(f"Error updating UI from data: {e}") import traceback traceback.print_exc() # =============================================================== # UI Event Handlers # =============================================================== def toggle_key_visibility(self): """Toggle visibility of API key field""" if self.show_key_var.get(): self.api_key_entry.config(show="") else: self.api_key_entry.config(show="*") def toggle_field_visibility(self, entry_widget, show_var): """Toggle visibility of a password field""" if show_var.get(): entry_widget.config(show="") else: entry_widget.config(show="*") def update_exa_settings_visibility(self): """Update visibility of Exa settings based on server type""" if self.exa_type_var.get() == "smithery": self.exa_local_frame.pack_forget() self.exa_smithery_frame.pack(fill=tk.X, pady=5) else: self.exa_smithery_frame.pack_forget() self.exa_local_frame.pack(fill=tk.X, pady=5) def browse_exa_server(self): """Browse for Exa server executable""" file_path = filedialog.askopenfilename( title="Select Exa MCP Server", filetypes=[("All Files", "*.*")], initialdir=os.path.expanduser("~") ) if file_path: # 標準化路徑格式(將反斜線改為正斜線) normalized_path = file_path.replace("\\", "/") self.exa_path_var.set(normalized_path) def browse_chroma_dir(self): """Browse for Chroma data directory""" dir_path = filedialog.askdirectory( title="Select Chroma Data Directory", initialdir=os.path.dirname(DEFAULT_CHROMA_DATA_PATH) ) if dir_path: # 標準化路徑格式(將反斜線改為正斜線) normalized_path = os.path.abspath(dir_path).replace("\\", "/") self.chroma_dir_var.set(normalized_path) def browse_game_path(self): """Browse for game executable""" file_path = filedialog.askopenfilename( title="Select Game Executable", filetypes=[("Executable Files", "*.exe"), ("All Files", "*.*")], initialdir=os.path.expanduser("~") ) if file_path: # 標準化路徑格式(將反斜線改為正斜線) normalized_path = file_path.replace("\\", "/") self.game_path_var.set(normalized_path) def update_servers_list(self): """Update the servers listbox with current servers""" self.servers_listbox.delete(0, tk.END) # Add built-in servers self.servers_listbox.insert(tk.END, "exa") self.servers_listbox.insert(tk.END, "chroma") # Add custom servers for server_name in self.config_data.get("MCP_SERVERS", {}): if server_name not in ("exa", "chroma"): self.servers_listbox.insert(tk.END, server_name) def on_server_select(self, event): """Handle server selection from the listbox""" selection = self.servers_listbox.curselection() if not selection: return selected_server = self.servers_listbox.get(selection[0]) # Hide all settings frames self.empty_settings_label.pack_forget() self.exa_frame.pack_forget() self.chroma_frame.pack_forget() self.custom_frame.pack_forget() # Show the selected server's settings frame if selected_server == "exa": self.exa_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) self.remove_btn.config(state=tk.DISABLED) # Can't remove built-in servers elif selected_server == "chroma": self.chroma_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) self.remove_btn.config(state=tk.DISABLED) # Can't remove built-in servers else: # Custom server if selected_server in self.config_data.get("MCP_SERVERS", {}): # Load existing custom server config server_config = self.config_data["MCP_SERVERS"][selected_server] self.custom_name_var.set(selected_server) self.custom_enable_var.set(server_config.get("enabled", True)) if "raw_config" in server_config: self.custom_config_text.delete("1.0", tk.END) self.custom_config_text.insert("1.0", server_config["raw_config"]) self.custom_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) self.remove_btn.config(state=tk.NORMAL) # Can remove custom servers def add_custom_server(self): """Add a new custom server""" # Generate a unique name new_name = "custom_server" counter = 1 while new_name in self.config_data.get("MCP_SERVERS", {}): new_name = f"custom_server_{counter}" counter += 1 # Add to config data if "MCP_SERVERS" not in self.config_data: self.config_data["MCP_SERVERS"] = {} self.config_data["MCP_SERVERS"][new_name] = { "enabled": True, "raw_config": ' "command": "npx",\n "args": [\n "custom-mcp-server"\n ]' } # Update UI self.update_servers_list() # Select the new server idx = self.servers_listbox.get(0, tk.END).index(new_name) self.servers_listbox.selection_clear(0, tk.END) self.servers_listbox.selection_set(idx) self.servers_listbox.see(idx) self.on_server_select(None) # Trigger the selection handler def remove_server(self): """Remove the selected custom server""" selection = self.servers_listbox.curselection() if not selection: return selected_server = self.servers_listbox.get(selection[0]) # Confirmation confirm = messagebox.askyesno( "Confirm Removal", f"Are you sure you want to remove the server '{selected_server}'?", icon=messagebox.WARNING ) if confirm: # Remove from config data if selected_server in self.config_data.get("MCP_SERVERS", {}): del self.config_data["MCP_SERVERS"][selected_server] # Update UI self.update_servers_list() # Clear selection self.servers_listbox.selection_clear(0, tk.END) # Hide server settings and show empty label self.custom_frame.pack_forget() self.empty_settings_label.pack(expand=True, pady=20) self.remove_btn.config(state=tk.DISABLED) def save_settings(self, show_success_message=True): # Added optional param """Save all settings to config.py, .env, and remote_config.json files""" try: # Update config data from UI (for config.py and .env) # API settings self.config_data["OPENAI_API_BASE_URL"] = self.api_url_var.get() self.config_data["LLM_MODEL"] = self.model_var.get() # Environment variables self.env_data["OPENAI_API_KEY"] = self.api_key_var.get() self.env_data["EXA_API_KEY"] = self.exa_key_var.get() # MCP Servers if "MCP_SERVERS" not in self.config_data: self.config_data["MCP_SERVERS"] = {} # Exa server if "exa" not in self.config_data["MCP_SERVERS"]: self.config_data["MCP_SERVERS"]["exa"] = {} self.config_data["MCP_SERVERS"]["exa"]["enabled"] = self.exa_enable_var.get() self.config_data["MCP_SERVERS"]["exa"]["use_smithery"] = (self.exa_type_var.get() == "smithery") if self.exa_type_var.get() == "local": self.config_data["MCP_SERVERS"]["exa"]["server_path"] = self.exa_path_var.get() # Chroma server if "chroma" not in self.config_data["MCP_SERVERS"]: self.config_data["MCP_SERVERS"]["chroma"] = {} self.config_data["MCP_SERVERS"]["chroma"]["enabled"] = self.chroma_enable_var.get() self.config_data["MCP_SERVERS"]["chroma"]["data_dir"] = self.chroma_dir_var.get() # Custom server - check if one is currently selected selection = self.servers_listbox.curselection() if selection: selected_server = self.servers_listbox.get(selection[0]) if selected_server not in ("exa", "chroma"): # Update custom server settings new_name = self.custom_name_var.get().strip() if not new_name: messagebox.showerror("Error", "Custom server name cannot be empty") return # Handle name change if new_name != selected_server: if new_name in self.config_data["MCP_SERVERS"]: messagebox.showerror("Error", f"Server name '{new_name}' already exists") return # Copy config and delete old entry self.config_data["MCP_SERVERS"][new_name] = self.config_data["MCP_SERVERS"][selected_server].copy() del self.config_data["MCP_SERVERS"][selected_server] # Update other settings self.config_data["MCP_SERVERS"][new_name]["enabled"] = self.custom_enable_var.get() self.config_data["MCP_SERVERS"][new_name]["raw_config"] = self.custom_config_text.get("1.0", tk.END).strip() # Game window settings self.config_data["GAME_WINDOW_CONFIG"] = { "WINDOW_TITLE": self.window_title_var.get(), "ENABLE_SCHEDULED_RESTART": self.restart_var.get(), "RESTART_INTERVAL_MINUTES": self.interval_var.get(), "GAME_EXECUTABLE_PATH": self.game_path_var.get(), "GAME_WINDOW_X": self.pos_x_var.get(), "GAME_WINDOW_Y": self.pos_y_var.get(), "GAME_WINDOW_WIDTH": self.width_var.get(), "GAME_WINDOW_HEIGHT": self.height_var.get(), "MONITOR_INTERVAL_SECONDS": self.monitor_interval_var.get() } # 保存記憶設定 self.config_data["ENABLE_PRELOAD_PROFILES"] = self.preload_profiles_var.get() self.config_data["PRELOAD_RELATED_MEMORIES"] = self.related_memories_var.get() self.config_data["PROFILES_COLLECTION"] = self.profiles_collection_var.get() self.config_data["CONVERSATIONS_COLLECTION"] = self.conversations_collection_var.get() self.config_data["BOT_MEMORY_COLLECTION"] = self.bot_memory_collection_var.get() # Get Memory Management settings from UI if hasattr(self, 'backup_hour_var'): # Check if UI elements exist self.config_data["MEMORY_BACKUP_HOUR"] = self.backup_hour_var.get() self.config_data["MEMORY_BACKUP_MINUTE"] = self.backup_minute_var.get() self.config_data["MEMORY_PROFILE_MODEL"] = self.profile_model_var.get() self.config_data["MEMORY_SUMMARY_MODEL"] = self.summary_model_var.get() # Update remote_data from UI (for remote_config.json) if hasattr(self, 'remote_url_var'): # Check if management tab UI elements exist self.remote_data["REMOTE_SERVER_URL"] = self.remote_url_var.get() self.remote_data["REMOTE_CLIENT_KEY"] = self.remote_key_var.get() self.remote_data["DEFAULT_GAME_RESTART_INTERVAL_MINUTES"] = self.game_restart_interval_var.get() self.remote_data["DEFAULT_BOT_RESTART_INTERVAL_MINUTES"] = self.bot_restart_interval_var.get() self.remote_data["LINK_RESTART_TIMES"] = self.link_restarts_var.get() self.remote_data["GAME_PROCESS_NAME"] = self.game_process_name_var.get() # Validate critical settings if "exa" in self.config_data["MCP_SERVERS"] and self.config_data["MCP_SERVERS"]["exa"]["enabled"]: if not self.exa_key_var.get(): messagebox.showerror("Validation Error", "Exa API Key is required when Exa server is enabled") return if self.exa_type_var.get() == "local" and not self.exa_path_var.get(): messagebox.showerror("Validation Error", "Exa Server Path is required for local server type") return # Generate config.py and .env files save_env_file(self.env_data) generate_config_file(self.config_data, self.env_data) save_remote_config(self.remote_data) # Save remote config if show_success_message: messagebox.showinfo("Success", "Settings saved successfully.\nRestart managed session for changes to take effect.") except Exception as e: logger.exception("Error saving settings") # Log the full traceback if show_success_message: # Only show error if it's a direct save action messagebox.showerror("Error", f"An error occurred while saving settings:\n{str(e)}") import traceback traceback.print_exc() # =============================================================== # ControlClient Class (adapted from wolf_control.py) # =============================================================== if HAS_SOCKETIO: class ControlClient: def __init__(self, server_url, client_key, wolf_chat_setup_instance): self.server_url = server_url self.client_key = client_key self.wolf_chat_setup = wolf_chat_setup_instance # Reference to the main app # Suppress InsecureRequestWarning when using ssl_verify=False, as is the current default urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) self.sio = socketio.Client(ssl_verify=False, logger=logger, engineio_logger=logger) # Use app's logger self.connected = False self.authenticated = False self.should_exit_flag = threading.Event() # Use an event for thread control self.client_thread = None self.registered_commands = [ "restart bot", "restart game", "restart all", "set game interval", "set bot interval", "set linked interval" ] # Event handlers self.sio.on('connect', self._on_connect) self.sio.on('disconnect', self._on_disconnect) self.sio.on('authenticated', self._on_authenticated) self.sio.on('command', self._on_command) def is_connected(self): return self.connected and self.authenticated def run_in_thread(self): if self.client_thread and self.client_thread.is_alive(): logger.info("Control client thread already running.") return True self.should_exit_flag.clear() self.client_thread = threading.Thread(target=self._run_forever, daemon=True) self.client_thread.start() return True def _run_forever(self): logger.info(f"ControlClient: Starting connection attempts to {self.server_url}") last_heartbeat = time.time() # For heartbeat retry_delay = 1.0 # Start with 1 second delay for exponential backoff max_delay = 300.0 # Maximum delay of 5 minutes for exponential backoff while not self.should_exit_flag.is_set(): if not self.sio.connected: try: logger.info(f"ControlClient: Attempting to connect to {self.server_url}...") self.sio.connect(self.server_url) logger.info("ControlClient: Successfully connected.") retry_delay = 1.0 # Reset delay on successful connection last_heartbeat = time.time() # Reset heartbeat timer on new connection except socketio.exceptions.ConnectionError as e: logger.error(f"ControlClient: Connection failed: {e}. Retrying in {retry_delay:.2f}s.") self.should_exit_flag.wait(retry_delay) # Implement exponential backoff with jitter retry_delay = min(retry_delay * 2, max_delay) * (0.8 + 0.4 * random.random()) retry_delay = max(1.0, retry_delay) # Ensure it's at least 1s continue except Exception as e: # Catch other potential errors during connection logger.error(f"ControlClient: Unexpected error during connection attempt: {e}. Retrying in {retry_delay:.2f}s.") self.should_exit_flag.wait(retry_delay) retry_delay = min(retry_delay * 2, max_delay) * (0.8 + 0.4 * random.random()) retry_delay = max(1.0, retry_delay) # Ensure it's at least 1s continue # If connected, manage heartbeat and check for exit signal if self.sio.connected: current_time = time.time() if current_time - last_heartbeat > 60: # Send heartbeat every 60 seconds try: self.sio.emit('heartbeat', {'timestamp': current_time}) last_heartbeat = current_time logger.debug("ControlClient: Sent heartbeat to keep connection alive.") except Exception as e: logger.error(f"ControlClient: Error sending heartbeat: {e}. Connection might be lost.") self.should_exit_flag.wait(1) # Check for exit signal every second else: # Fallback if not connected after attempt block (should be rare with current logic) logger.debug(f"ControlClient: Not connected (unexpected state in loop), waiting {retry_delay:.2f}s before next cycle.") self.should_exit_flag.wait(retry_delay) # Optionally re-calculate retry_delay here if this path is hit, to maintain backoff progression retry_delay = min(retry_delay * 2, max_delay) * (0.8 + 0.4 * random.random()) retry_delay = max(1.0, retry_delay) logger.info("ControlClient: Exited _run_forever loop.") if self.sio.connected: self.sio.disconnect() def _on_connect(self): self.connected = True logger.info("ControlClient: Connected to server. Authenticating...") self.sio.emit('authenticate', { 'type': 'client', 'clientKey': self.client_key, 'commands': self.registered_commands }) def _on_disconnect(self): self.connected = False self.authenticated = False logger.info("ControlClient: Disconnected from server.") # Force reconnection if not intentionally stopping if not self.should_exit_flag.is_set(): logger.info("ControlClient: Attempting immediate reconnection from _on_disconnect...") try: # This is an immediate attempt; _run_forever handles sustained retries. if not self.sio.connected: # Check before trying to connect self.sio.connect(self.server_url) except Exception as e: logger.error(f"ControlClient: Immediate reconnection from _on_disconnect failed: {e}") def _on_authenticated(self, data): if data.get('success'): self.authenticated = True logger.info("ControlClient: Authentication successful.") else: self.authenticated = False logger.error(f"ControlClient: Authentication failed: {data.get('error', 'Unknown error')}") self.sio.disconnect() # Disconnect if auth fails def _on_command(self, data): command = data.get('command', '').lower() args_str = data.get('args', '') # Assuming server might send args as a string from_user = data.get('from', 'unknown') logger.info(f"ControlClient: Received command '{command}' with args '{args_str}' from {from_user}") try: if command == "restart bot": self.wolf_chat_setup._restart_bot_managed() self._send_command_result(command, True, "Bot restart initiated.") elif command == "restart game": self.wolf_chat_setup._restart_game_managed() self._send_command_result(command, True, "Game restart initiated.") elif command == "restart all": self.wolf_chat_setup._restart_all_managed() self._send_command_result(command, True, "Full restart initiated.") elif command == "set game interval" or command == "set bot interval" or command == "set linked interval": try: interval = int(args_str) if interval < 0: # 0 means disable self._send_command_result(command, False, "Interval must be non-negative.") return if command == "set game interval": self.wolf_chat_setup.remote_data["DEFAULT_GAME_RESTART_INTERVAL_MINUTES"] = interval if self.wolf_chat_setup.remote_data["LINK_RESTART_TIMES"]: self.wolf_chat_setup.remote_data["DEFAULT_BOT_RESTART_INTERVAL_MINUTES"] = interval elif command == "set bot interval": self.wolf_chat_setup.remote_data["DEFAULT_BOT_RESTART_INTERVAL_MINUTES"] = interval if self.wolf_chat_setup.remote_data["LINK_RESTART_TIMES"]: self.wolf_chat_setup.remote_data["DEFAULT_GAME_RESTART_INTERVAL_MINUTES"] = interval elif command == "set linked interval": self.wolf_chat_setup.remote_data["DEFAULT_GAME_RESTART_INTERVAL_MINUTES"] = interval self.wolf_chat_setup.remote_data["DEFAULT_BOT_RESTART_INTERVAL_MINUTES"] = interval self.wolf_chat_setup.remote_data["LINK_RESTART_TIMES"] = True save_remote_config(self.wolf_chat_setup.remote_data) self.wolf_chat_setup._setup_scheduled_restarts() # Re-apply schedule # Update UI if possible (tricky from non-main thread) # self.wolf_chat_setup.game_restart_interval_var.set(self.wolf_chat_setup.remote_data["DEFAULT_GAME_RESTART_INTERVAL_MINUTES"]) # self.wolf_chat_setup.bot_restart_interval_var.set(self.wolf_chat_setup.remote_data["DEFAULT_BOT_RESTART_INTERVAL_MINUTES"]) logger.info(f"Updated restart interval via remote: {command} to {interval} min. Saved and re-scheduled.") self._send_command_result(command, True, f"Interval updated to {interval} min and re-scheduled.") except ValueError: self._send_command_result(command, False, "Invalid interval value. Must be an integer.") else: self._send_command_result(command, False, "Unsupported command.") except Exception as e: logger.exception(f"ControlClient: Error executing command '{command}'") self._send_command_result(command, False, f"Error: {str(e)}") def _send_command_result(self, command, success, message): if self.sio.connected: try: self.sio.emit('commandResult', { 'command': command, 'success': success, 'message': message, 'timestamp': time.time() }) except Exception as e: logger.error(f"ControlClient: Failed to send command result: {e}") def check_signals(self, app_instance): # app_instance is self.wolf_chat_setup from the caller """Periodically check connection status and commands, called by monitoring thread.""" # Note: _run_forever is the primary mechanism for establishing and maintaining connection. # This function's connection check is a secondary check. if not self.sio.connected or not self.authenticated: logger.warning("ControlClient: Connection check in check_signals found client not connected/authenticated.") # Avoid aggressive reconnection here if _run_forever is already handling it. # If an explicit reconnect attempt is desired here: # logger.info("ControlClient: Attempting reconnection from check_signals...") # try: # if self.sio.connected: # e.g. connected but not authenticated # self.sio.disconnect() # if not self.sio.connected: # Check again before connecting # self.sio.connect(self.server_url) # except Exception as e: # logger.error(f"ControlClient: Reconnection attempt from check_signals failed: {e}") # Placeholder for any other signal processing logic # logger.debug("ControlClient: check_signals executed.") def stop(self): logger.info("ControlClient: Stopping...") self.should_exit_flag.set() # Signal the run_forever loop to exit if self.sio.connected: self.sio.disconnect() # Attempt to disconnect gracefully if self.client_thread and self.client_thread.is_alive(): logger.info("ControlClient: Waiting for client thread to join...") self.client_thread.join(timeout=5) # Wait for the thread to finish if self.client_thread.is_alive(): logger.warning("ControlClient: Client thread did not join in time.") self.client_thread = None logger.info("ControlClient: Stopped.") else: # HAS_SOCKETIO is False class ControlClient: # Dummy class if socketio is not available def __init__(self, *args, **kwargs): logger.warning("Socket.IO not installed, ControlClient is a dummy.") def run_in_thread(self): return False def stop(self): pass def is_connected(self): return False # =============================================================== # Main Entry Point # =============================================================== if __name__ == "__main__": # Setup main logger for the application if not already done if not logging.getLogger().handlers: # Check root logger logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') app = WolfChatSetup() app.protocol("WM_DELETE_WINDOW", app.on_closing) # Handle window close button app.mainloop()