2025-05-07 23:07:54 +08:00

2392 lines
116 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Wolf Chat Setup Utility
A graphical configuration tool for Wolf Chat that manages:
- API settings (OpenAI/compatible providers)
- MCP server configurations (Exa, Chroma, custom servers)
- Environment variables (.env)
- Config file generation (config.py)
"""
import os
import sys
import json
import tkinter as tk
from tkinter import ttk, filedialog, messagebox, scrolledtext
import configparser
from pathlib import Path
import re
import shutil
import time
import signal
import logging
import subprocess
import threading
import datetime
import schedule
import psutil
import random # Added for exponential backoff jitter
import urllib3 # Added for SSL warning suppression
try:
import socketio
HAS_SOCKETIO = True
except ImportError:
HAS_SOCKETIO = False
# import ssl # ssl import might not be needed if socketio handles it or if not using wss directly in client setup
# ===============================================================
# Constants
# ===============================================================
VERSION = "1.0.0"
CONFIG_TEMPLATE_PATH = "config_template.py"
ENV_FILE_PATH = ".env"
REMOTE_CONFIG_PATH = "remote_config.json" # New config file for remote settings
# Use absolute path for chroma_data
DEFAULT_CHROMA_DATA_PATH = os.path.abspath("chroma_data")
DEFAULT_CONFIG_SECTION = """# ====================================================================
# Wolf Chat Configuration
# Generated by setup.py - Edit with care
# ====================================================================
"""
# Get current Windows username for default paths
CURRENT_USERNAME = os.getenv("USERNAME", "user")
# Global variables for game/bot management
game_process_instance = None
bot_process_instance = None # This will replace/co-exist with self.running_process
control_client_instance = None
monitor_thread_instance = None # Renamed to avoid conflict if 'monitor_thread' is used elsewhere
scheduler_thread_instance = None # Renamed
keep_monitoring_flag = threading.Event() # Renamed for clarity
keep_monitoring_flag.set()
# Basic logging setup
# logger = logging.getLogger("WolfChatSetup") # Defined later in class or globally if needed
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Setup logger instance. This can be configured further if needed.
logger = logging.getLogger(__name__)
if not logger.handlers: # Avoid adding multiple handlers if script is reloaded
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
# ===============================================================
# Helper Functions
# ===============================================================
def load_remote_config():
"""Load remote control and restart settings from remote_config.json"""
defaults = {
"REMOTE_SERVER_URL": "YOUR_URL_HERE",
"REMOTE_CLIENT_KEY": "YOUR_KEY_HERE", # Placeholder
"DEFAULT_GAME_RESTART_INTERVAL_MINUTES": 120,
"DEFAULT_BOT_RESTART_INTERVAL_MINUTES": 120,
"LINK_RESTART_TIMES": True,
"GAME_PROCESS_NAME": "LastWar.exe", # Default game process name
"BOT_SCRIPT_NAME": "main.py" # Default bot script name
}
if os.path.exists(REMOTE_CONFIG_PATH):
try:
with open(REMOTE_CONFIG_PATH, 'r', encoding='utf-8') as f:
data = json.load(f)
# Ensure all keys from defaults are present, adding them if missing
for key, value in defaults.items():
data.setdefault(key, value)
return data
except json.JSONDecodeError:
logger.error(f"Error decoding {REMOTE_CONFIG_PATH}. Using default remote settings.")
return defaults.copy() # Return a copy to avoid modifying defaults
except Exception as e:
logger.error(f"Error loading {REMOTE_CONFIG_PATH}: {e}. Using default remote settings.")
return defaults.copy()
logger.info(f"{REMOTE_CONFIG_PATH} not found. Creating with default values.")
save_remote_config(defaults.copy()) # Create the file if it doesn't exist
return defaults.copy()
def save_remote_config(remote_data):
"""Save remote control and restart settings to remote_config.json"""
try:
with open(REMOTE_CONFIG_PATH, 'w', encoding='utf-8') as f:
json.dump(remote_data, f, indent=4) # Use indent for readability
logger.info(f"Saved remote settings to {REMOTE_CONFIG_PATH}")
except Exception as e:
logger.error(f"Error saving {REMOTE_CONFIG_PATH}: {e}")
def load_env_file():
"""Load existing .env file if it exists"""
env_data = {}
env_path = Path(ENV_FILE_PATH)
if env_path.exists():
with open(env_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
env_data[key.strip()] = value.strip().strip('"\'')
return env_data
def save_env_file(env_data):
"""Save environment variables to .env file"""
with open(ENV_FILE_PATH, 'w', encoding='utf-8') as f:
f.write("# Environment variables for Wolf Chat\n")
f.write("# Generated by setup.py\n\n")
for key, value in env_data.items():
if value: # Only write non-empty values
f.write(f"{key}={value}\n")
print(f"Saved environment variables to {ENV_FILE_PATH}")
def load_current_config():
"""Extract settings from existing config.py if it exists"""
# 新增一個幫助函數來標準化路徑
def normalize_path(path):
"""Convert backslashes to forward slashes in paths"""
if path:
return path.replace("\\", "/")
return path
config_data = {
"OPENAI_API_BASE_URL": "",
"LLM_MODEL": "deepseek/deepseek-chat-v3-0324",
"MCP_SERVERS": {
"exa": {
"enabled": True,
"use_smithery": False,
"server_path": normalize_path(f"C:/Users/{CURRENT_USERNAME}/AppData/Roaming/npm/exa-mcp-server")
}
},
"ENABLE_CHAT_LOGGING": True,
"LOG_DIR": "chat_logs",
"GAME_WINDOW_CONFIG": {
"WINDOW_TITLE": "Last War-Survival Game",
"ENABLE_SCHEDULED_RESTART": True,
"RESTART_INTERVAL_MINUTES": 60,
"GAME_EXECUTABLE_PATH": normalize_path(fr"C:\Users\{CURRENT_USERNAME}\AppData\Local\TheLastWar\Launch.exe"),
"GAME_WINDOW_X": 50,
"GAME_WINDOW_Y": 30,
"GAME_WINDOW_WIDTH": 600,
"GAME_WINDOW_HEIGHT": 1070,
"MONITOR_INTERVAL_SECONDS": 5
}
}
if os.path.exists("config.py"):
try:
with open("config.py", 'r', encoding='utf-8') as f:
config_content = f.read()
# Extract OPENAI_API_BASE_URL
api_url_match = re.search(r'OPENAI_API_BASE_URL\s*=\s*["\'](.+?)["\']', config_content)
if api_url_match:
config_data["OPENAI_API_BASE_URL"] = api_url_match.group(1)
# Extract LLM_MODEL
model_match = re.search(r'LLM_MODEL\s*=\s*["\'](.+?)["\']', config_content)
if model_match:
config_data["LLM_MODEL"] = model_match.group(1)
# Extract logging settings
chat_logging_match = re.search(r'ENABLE_CHAT_LOGGING\s*=\s*(True|False)', config_content)
if chat_logging_match:
config_data["ENABLE_CHAT_LOGGING"] = (chat_logging_match.group(1) == "True")
log_dir_match = re.search(r'LOG_DIR\s*=\s*["\'](.+?)["\']', config_content)
if log_dir_match:
config_data["LOG_DIR"] = log_dir_match.group(1)
# Extract game window settings
window_title_match = re.search(r'WINDOW_TITLE\s*=\s*["\'](.+?)["\']', config_content)
if window_title_match:
config_data["GAME_WINDOW_CONFIG"]["WINDOW_TITLE"] = window_title_match.group(1)
restart_match = re.search(r'ENABLE_SCHEDULED_RESTART\s*=\s*(True|False)', config_content)
if restart_match:
config_data["GAME_WINDOW_CONFIG"]["ENABLE_SCHEDULED_RESTART"] = (restart_match.group(1) == "True")
interval_match = re.search(r'RESTART_INTERVAL_MINUTES\s*=\s*(\d+)', config_content)
if interval_match:
config_data["GAME_WINDOW_CONFIG"]["RESTART_INTERVAL_MINUTES"] = int(interval_match.group(1))
exec_path_match = re.search(r'GAME_EXECUTABLE_PATH\s*=\s*r"(.+?)"', config_content)
if exec_path_match:
config_data["GAME_WINDOW_CONFIG"]["GAME_EXECUTABLE_PATH"] = exec_path_match.group(1)
x_match = re.search(r'GAME_WINDOW_X\s*=\s*(\d+)', config_content)
if x_match:
config_data["GAME_WINDOW_CONFIG"]["GAME_WINDOW_X"] = int(x_match.group(1))
y_match = re.search(r'GAME_WINDOW_Y\s*=\s*(\d+)', config_content)
if y_match:
config_data["GAME_WINDOW_CONFIG"]["GAME_WINDOW_Y"] = int(y_match.group(1))
width_match = re.search(r'GAME_WINDOW_WIDTH\s*=\s*(\d+)', config_content)
if width_match:
config_data["GAME_WINDOW_CONFIG"]["GAME_WINDOW_WIDTH"] = int(width_match.group(1))
height_match = re.search(r'GAME_WINDOW_HEIGHT\s*=\s*(\d+)', config_content)
if height_match:
config_data["GAME_WINDOW_CONFIG"]["GAME_WINDOW_HEIGHT"] = int(height_match.group(1))
monitor_interval_match = re.search(r'MONITOR_INTERVAL_SECONDS\s*=\s*(\d+)', config_content)
if monitor_interval_match:
config_data["GAME_WINDOW_CONFIG"]["MONITOR_INTERVAL_SECONDS"] = int(monitor_interval_match.group(1))
# Extract MCP_SERVERS (more complex parsing)
try:
servers_section = re.search(r'MCP_SERVERS\s*=\s*{(.+?)}(?=\n\n)', config_content, re.DOTALL)
if servers_section:
servers_text = servers_section.group(1)
# Extract and parse each server definition
server_blocks = re.findall(r'"([^"]+)":\s*{(.+?)}(?=,\s*"|,?\s*})', servers_text, re.DOTALL)
for server_name, server_block in server_blocks:
if server_name not in config_data["MCP_SERVERS"]:
config_data["MCP_SERVERS"][server_name] = {"enabled": True}
# Skip disabled servers (commented out)
if server_block.strip().startswith("#"):
config_data["MCP_SERVERS"][server_name]["enabled"] = False
continue
# Extract command
command_match = re.search(r'"command":\s*"([^"]+)"', server_block)
if command_match:
config_data["MCP_SERVERS"][server_name]["command"] = command_match.group(1)
# For Exa server
if server_name == "exa":
# Check if using Smithery
if '"@smithery/cli@latest"' in server_block:
config_data["MCP_SERVERS"]["exa"]["use_smithery"] = True
else:
config_data["MCP_SERVERS"]["exa"]["use_smithery"] = False
# Extract server path for local server
args_match = re.search(r'"args":\s*\[\s*"([^"]+)"', server_block)
if args_match:
config_data["MCP_SERVERS"]["exa"]["server_path"] = args_match.group(1)
# For Chroma server
if server_name == "chroma":
# Extract data directory
data_dir_match = re.search(r'"--data-dir",\s*"([^"]+)"', server_block)
if data_dir_match:
config_data["MCP_SERVERS"]["chroma"]["data_dir"] = data_dir_match.group(1)
# For custom servers, store the raw configuration
if server_name not in ["exa", "chroma"]:
config_data["MCP_SERVERS"][server_name]["raw_config"] = server_block
except Exception as e:
print(f"Error parsing MCP_SERVERS section: {e}")
import traceback
traceback.print_exc()
# Extract memory settings
enable_preload_match = re.search(r'ENABLE_PRELOAD_PROFILES\s*=\s*(True|False)', config_content)
if enable_preload_match:
config_data["ENABLE_PRELOAD_PROFILES"] = (enable_preload_match.group(1) == "True")
related_memories_match = re.search(r'PRELOAD_RELATED_MEMORIES\s*=\s*(\d+)', config_content)
if related_memories_match:
config_data["PRELOAD_RELATED_MEMORIES"] = int(related_memories_match.group(1))
profiles_collection_match = re.search(r'PROFILES_COLLECTION\s*=\s*["\'](.+?)["\']', config_content)
if profiles_collection_match:
config_data["PROFILES_COLLECTION"] = profiles_collection_match.group(1)
conversations_collection_match = re.search(r'CONVERSATIONS_COLLECTION\s*=\s*["\'](.+?)["\']', config_content)
if conversations_collection_match:
config_data["CONVERSATIONS_COLLECTION"] = conversations_collection_match.group(1)
bot_memory_collection_match = re.search(r'BOT_MEMORY_COLLECTION\s*=\s*["\'](.+?)["\']', config_content)
if bot_memory_collection_match:
config_data["BOT_MEMORY_COLLECTION"] = bot_memory_collection_match.group(1)
except Exception as e:
print(f"Error reading config.py: {e}")
import traceback
traceback.print_exc()
return config_data
def generate_config_file(config_data, env_data):
"""Generate config.py file based on user settings"""
# Create backup of existing config if it exists
if os.path.exists("config.py"):
backup_path = "config.py.bak"
shutil.copy2("config.py", backup_path)
print(f"Created backup of existing config at {backup_path}")
def normalize_path(path):
"""Convert backslashes to forward slashes in paths"""
if path:
return path.replace("\\", "/")
return path
# Helper function to ensure absolute path
def ensure_absolute_path(path):
if not os.path.isabs(path):
return os.path.abspath(path)
return path
with open("config.py", 'w', encoding='utf-8') as f:
f.write(DEFAULT_CONFIG_SECTION)
f.write("import os\n")
f.write("import json\n")
f.write("from dotenv import load_dotenv\n\n")
f.write("# --- Load environment variables from .env file ---\n")
f.write("load_dotenv()\n")
f.write("print(\"Loaded environment variables from .env file.\")\n\n")
# Write OpenAI API settings
f.write("# =============================================================================\n")
f.write("# OpenAI API Configuration / OpenAI-Compatible Provider Settings\n")
f.write("# =============================================================================\n")
f.write(f"OPENAI_API_BASE_URL = \"{config_data['OPENAI_API_BASE_URL']}\"\n")
f.write("OPENAI_API_KEY = os.getenv(\"OPENAI_API_KEY\")\n")
f.write(f"LLM_MODEL = \"{config_data['LLM_MODEL']}\"\n\n")
# Write API Keys section
f.write("# =============================================================================\n")
f.write("# External API Keys (loaded from environment variables)\n")
f.write("# =============================================================================\n")
f.write("EXA_API_KEY = os.getenv(\"EXA_API_KEY\")\n\n")
# Write Exa config utility
f.write("# --- Exa Configuration ---\n")
f.write("exa_config_dict = {\"exaApiKey\": EXA_API_KEY if EXA_API_KEY else \"YOUR_EXA_KEY_MISSING\"}\n")
f.write("exa_config_arg_string = json.dumps(exa_config_dict)\n\n")
# Write MCP Server Configuration
f.write("# =============================================================================\n")
f.write("# MCP Server Configuration\n")
f.write("# =============================================================================\n")
f.write("MCP_SERVERS = {\n")
# Add configured servers
for server_name, server_config in config_data["MCP_SERVERS"].items():
if not server_config.get("enabled", True):
f.write(f" # \"{server_name}\": {{ # Disabled\n")
continue
f.write(f" \"{server_name}\": {{\n")
# Handle Exa server special config
if server_name == "exa":
if server_config.get("use_smithery", False):
# Smithery config
f.write(" \"command\": \"cmd\",\n")
f.write(" \"args\": [\n")
f.write(" \"/c\",\n")
f.write(" \"npx\",\n")
f.write(" \"-y\",\n")
f.write(" \"@smithery/cli@latest\",\n")
f.write(" \"run\",\n")
f.write(" \"exa\",\n")
f.write(" \"--config\",\n")
f.write(" exa_config_arg_string\n")
f.write(" ],\n")
else:
# Local server config
server_path = server_config.get("server_path", "exa-mcp-server")
f.write(" \"command\": \"npx\",\n")
f.write(" \"args\": [\n")
f.write(f" \"{server_path}\",\n")
f.write(" \"--tools=web_search,research_paper_search,twitter_search,company_research,crawling,competitor_finder\"\n")
f.write(" ],\n")
f.write(" \"env\": {\n")
f.write(" \"EXA_API_KEY\": EXA_API_KEY\n")
f.write(" }\n")
# Handle Chroma server
elif server_name == "chroma":
# Ensure absolute path for chroma data directory
data_dir = server_config.get("data_dir", DEFAULT_CHROMA_DATA_PATH)
absolute_data_dir = ensure_absolute_path(data_dir)
f.write(" \"command\": \"uvx\",\n")
f.write(" \"args\": [\n")
f.write(" \"chroma-mcp\",\n")
f.write(" \"--client-type\",\n")
f.write(" \"persistent\",\n")
f.write(" \"--data-dir\",\n")
f.write(f" \"{absolute_data_dir}\"\n")
f.write(" ]\n")
# Handle custom server - just write as raw JSON
elif server_name == "custom" and "raw_config" in server_config:
f.write(server_config["raw_config"])
f.write(" },\n")
f.write("}\n\n")
# Write remaining configuration sections
f.write("# =============================================================================\n")
f.write("# MCP Client Configuration\n")
f.write("# =============================================================================\n")
f.write("MCP_CONFIRM_TOOL_EXECUTION = False # True: Confirm before execution, False: Execute automatically\n\n")
f.write("# =============================================================================\n")
f.write("# Chat Logging Configuration\n")
f.write("# =============================================================================\n")
f.write(f"ENABLE_CHAT_LOGGING = {str(config_data['ENABLE_CHAT_LOGGING'])}\n")
f.write(f"LOG_DIR = \"{config_data['LOG_DIR']}\"\n\n")
f.write("# =============================================================================\n")
f.write("# Persona Configuration\n")
f.write("# =============================================================================\n")
f.write("PERSONA_NAME = \"Wolfhart\"\n\n")
f.write("# =============================================================================\n")
f.write("# Game Window Configuration\n")
f.write("# =============================================================================\n")
game_config = config_data["GAME_WINDOW_CONFIG"]
f.write(f"WINDOW_TITLE = \"{game_config['WINDOW_TITLE']}\"\n")
f.write(f"ENABLE_SCHEDULED_RESTART = {str(game_config['ENABLE_SCHEDULED_RESTART'])}\n")
f.write(f"RESTART_INTERVAL_MINUTES = {game_config['RESTART_INTERVAL_MINUTES']}\n")
f.write(f"GAME_EXECUTABLE_PATH = r\"{game_config['GAME_EXECUTABLE_PATH']}\"\n")
f.write(f"GAME_WINDOW_X = {game_config['GAME_WINDOW_X']}\n")
f.write(f"GAME_WINDOW_Y = {game_config['GAME_WINDOW_Y']}\n")
f.write(f"GAME_WINDOW_WIDTH = {game_config['GAME_WINDOW_WIDTH']}\n")
f.write(f"GAME_WINDOW_HEIGHT = {game_config['GAME_WINDOW_HEIGHT']}\n")
f.write(f"MONITOR_INTERVAL_SECONDS = {game_config['MONITOR_INTERVAL_SECONDS']}\n\n")
# --- Add explicit print before writing Chroma section ---
print("DEBUG: Writing ChromaDB Memory Configuration section...")
# --- End explicit print ---
# Write ChromaDB Memory Configuration
f.write("# =============================================================================\n")
f.write("# ChromaDB Memory Configuration\n")
f.write("# =============================================================================\n")
# Ensure boolean is written correctly as True/False, not string 'True'/'False'
enable_preload = config_data.get('ENABLE_PRELOAD_PROFILES', True) # Default to True if key missing
f.write(f"ENABLE_PRELOAD_PROFILES = {str(enable_preload)}\n") # Writes True or False literal
preload_memories = config_data.get('PRELOAD_RELATED_MEMORIES', 2) # Default to 2
f.write(f"PRELOAD_RELATED_MEMORIES = {preload_memories}\n\n")
f.write("# Collection Names (used for both local access and MCP tool calls)\n")
profiles_col = config_data.get('PROFILES_COLLECTION', 'user_profiles')
f.write(f"PROFILES_COLLECTION = \"{profiles_col}\"\n")
conversations_col = config_data.get('CONVERSATIONS_COLLECTION', 'conversations')
f.write(f"CONVERSATIONS_COLLECTION = \"{conversations_col}\"\n")
bot_memory_col = config_data.get('BOT_MEMORY_COLLECTION', 'wolfhart_memory')
f.write(f"BOT_MEMORY_COLLECTION = \"{bot_memory_col}\"\n\n")
f.write("# Ensure Chroma path is consistent for both direct access and MCP\n")
# Get the path set in the UI (or default)
# Use .get() chain with defaults for safety
chroma_data_dir_ui = config_data.get("MCP_SERVERS", {}).get("chroma", {}).get("data_dir", DEFAULT_CHROMA_DATA_PATH)
# Normalize path for writing into the config file string (use forward slashes)
normalized_chroma_path = normalize_path(chroma_data_dir_ui)
f.write(f"# This path will be made absolute when config.py is loaded.\n")
# Write the potentially relative path from UI/default, let config.py handle abspath
# Use raw string r"..." to handle potential backslashes in Windows paths correctly within the string literal
f.write(f"CHROMA_DATA_DIR = os.path.abspath(r\"{normalized_chroma_path}\")\n")
print("Generated config.py file successfully")
# ===============================================================
# Main Application
# ===============================================================
class WolfChatSetup(tk.Tk):
def __init__(self):
super().__init__()
self.title(f"Wolf Chat Setup v{VERSION}")
self.geometry("800x600")
self.minsize(750, 550)
# Load existing data
self.env_data = load_env_file()
self.config_data = load_current_config()
self.remote_data = load_remote_config() # Load new remote config
# Create the notebook for tabs
self.notebook = ttk.Notebook(self)
self.notebook.pack(expand=True, fill=tk.BOTH, padx=10, pady=10)
# Create tabs
self.create_api_tab()
self.create_mcp_tab()
self.create_game_tab()
self.create_memory_tab()
self.create_management_tab() # New tab for combined management
# Create bottom buttons
self.create_bottom_buttons()
# Initialize running process tracker (will be managed by new system)
self.running_process = None # This might be replaced by bot_process_instance
# Initialize new process management variables
self.bot_process_instance = None
self.game_process_instance = None
self.control_client_instance = None
self.monitor_thread_instance = None
self.scheduler_thread_instance = None
self.keep_monitoring_flag = threading.Event()
self.keep_monitoring_flag.set()
# Set initial states based on loaded data
self.update_ui_from_data()
def create_management_tab(self):
"""Create the Bot and Game Management tab"""
tab = ttk.Frame(self.notebook)
self.notebook.add(tab, text="Management")
main_frame = ttk.Frame(tab, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
header = ttk.Label(main_frame, text="Bot & Game Management", font=("", 12, "bold"))
header.pack(anchor=tk.W, pady=(0, 10))
# --- Remote Control Settings ---
remote_frame = ttk.LabelFrame(main_frame, text="Remote Control Settings")
remote_frame.pack(fill=tk.X, pady=10)
# Remote Server URL
remote_url_frame = ttk.Frame(remote_frame)
remote_url_frame.pack(fill=tk.X, pady=5, padx=10)
remote_url_label = ttk.Label(remote_url_frame, text="Server URL:", width=15)
remote_url_label.pack(side=tk.LEFT)
self.remote_url_var = tk.StringVar(value=self.remote_data.get("REMOTE_SERVER_URL", ""))
remote_url_entry = ttk.Entry(remote_url_frame, textvariable=self.remote_url_var)
remote_url_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
# Remote Client Key
remote_key_frame = ttk.Frame(remote_frame)
remote_key_frame.pack(fill=tk.X, pady=5, padx=10)
remote_key_label = ttk.Label(remote_key_frame, text="Client Key:", width=15)
remote_key_label.pack(side=tk.LEFT)
self.remote_key_var = tk.StringVar(value=self.remote_data.get("REMOTE_CLIENT_KEY", ""))
remote_key_entry = ttk.Entry(remote_key_frame, textvariable=self.remote_key_var, show="*")
remote_key_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
self.show_remote_key_var = tk.BooleanVar(value=False)
show_remote_key_cb = ttk.Checkbutton(remote_key_frame, text="Show", variable=self.show_remote_key_var,
command=lambda: self.toggle_field_visibility(remote_key_entry, self.show_remote_key_var))
show_remote_key_cb.pack(side=tk.LEFT, padx=(5,0))
# --- Restart Settings ---
restart_settings_frame = ttk.LabelFrame(main_frame, text="Restart Settings")
restart_settings_frame.pack(fill=tk.X, pady=10)
# Game Restart Interval
game_interval_frame = ttk.Frame(restart_settings_frame)
game_interval_frame.pack(fill=tk.X, pady=5, padx=10)
game_interval_label = ttk.Label(game_interval_frame, text="Game Restart Interval (min):", width=25)
game_interval_label.pack(side=tk.LEFT)
self.game_restart_interval_var = tk.IntVar(value=self.remote_data.get("DEFAULT_GAME_RESTART_INTERVAL_MINUTES", 120))
game_interval_spinbox = ttk.Spinbox(game_interval_frame, from_=0, to=1440, width=7, textvariable=self.game_restart_interval_var)
game_interval_spinbox.pack(side=tk.LEFT)
game_interval_info = ttk.Label(game_interval_frame, text="(0 to disable)")
game_interval_info.pack(side=tk.LEFT, padx=(5,0))
# Bot Restart Interval
bot_interval_frame = ttk.Frame(restart_settings_frame)
bot_interval_frame.pack(fill=tk.X, pady=5, padx=10)
bot_interval_label = ttk.Label(bot_interval_frame, text="Bot Restart Interval (min):", width=25)
bot_interval_label.pack(side=tk.LEFT)
self.bot_restart_interval_var = tk.IntVar(value=self.remote_data.get("DEFAULT_BOT_RESTART_INTERVAL_MINUTES", 120))
bot_interval_spinbox = ttk.Spinbox(bot_interval_frame, from_=0, to=1440, width=7, textvariable=self.bot_restart_interval_var)
bot_interval_spinbox.pack(side=tk.LEFT)
bot_interval_info = ttk.Label(bot_interval_frame, text="(0 to disable)")
bot_interval_info.pack(side=tk.LEFT, padx=(5,0))
# Link Restart Times
link_restarts_frame = ttk.Frame(restart_settings_frame)
link_restarts_frame.pack(fill=tk.X, pady=5, padx=10)
self.link_restarts_var = tk.BooleanVar(value=self.remote_data.get("LINK_RESTART_TIMES", True))
link_restarts_cb = ttk.Checkbutton(link_restarts_frame, text="Link Game and Bot restart times (use Game interval if linked)", variable=self.link_restarts_var)
link_restarts_cb.pack(anchor=tk.W)
# Game Process Name
game_proc_name_frame = ttk.Frame(restart_settings_frame)
game_proc_name_frame.pack(fill=tk.X, pady=5, padx=10)
game_proc_name_label = ttk.Label(game_proc_name_frame, text="Game Process Name:", width=25)
game_proc_name_label.pack(side=tk.LEFT)
self.game_process_name_var = tk.StringVar(value=self.remote_data.get("GAME_PROCESS_NAME", "LastWar.exe"))
game_proc_name_entry = ttk.Entry(game_proc_name_frame, textvariable=self.game_process_name_var)
game_proc_name_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
# --- Control Buttons ---
control_buttons_frame = ttk.Frame(main_frame)
control_buttons_frame.pack(fill=tk.X, pady=20)
self.start_managed_button = ttk.Button(control_buttons_frame, text="Start Managed Bot & Game", command=self.start_managed_session)
self.start_managed_button.pack(side=tk.LEFT, padx=5)
self.stop_managed_button = ttk.Button(control_buttons_frame, text="Stop Managed Session", command=self.stop_managed_session, state=tk.DISABLED)
self.stop_managed_button.pack(side=tk.LEFT, padx=5)
# Status Area (Optional, for displaying logs or status messages)
status_label = ttk.Label(main_frame, text="Status messages will appear in the console.")
status_label.pack(pady=10)
def start_managed_session(self):
logger.info("Attempting to start managed session...")
# This will be the new main function to start bot, game, and monitoring
# Ensure previous session is stopped if any
if self.bot_process_instance or self.game_process_instance or self.monitor_thread_instance:
messagebox.showwarning("Session Active", "A managed session might already be active. Please stop it first or check console.")
# self.stop_managed_session() # Optionally force stop
# time.sleep(1) # Give time to stop
# return
# Save current settings before starting
self.save_settings(show_success_message=False) # Save without showing popup, or make it optional
self.keep_monitoring_flag.set() # Ensure monitoring is enabled
# Start Game
if not self._start_game_managed():
messagebox.showerror("Error", "Failed to start the game.")
self.update_management_buttons_state(True) # Enable start, disable stop
return
time.sleep(5) # Give game some time to initialize
# Start Bot (main.py)
if not self._start_bot_managed():
messagebox.showerror("Error", "Failed to start the bot (main.py).")
self._stop_game_managed() # Stop game if bot fails to start
self.update_management_buttons_state(True)
return
# Start Control Client
if HAS_SOCKETIO:
self._start_control_client()
else:
logger.warning("socketio library not found. Remote control will be disabled.")
messagebox.showwarning("Socket.IO Missing", "The 'python-socketio[client]' library is not installed. Remote control features will be disabled. Please install it via 'pip install \"python-socketio[client]\"' or use the 'Install Dependencies' button.")
# Start Monitoring Thread
self._start_monitoring_thread()
# Start Scheduler Thread
self._start_scheduler_thread()
self.update_management_buttons_state(False) # Disable start, enable stop
# messagebox.showinfo("Session Started", "Managed bot and game session started. Check console for logs.") # Removed popup
logger.info("Managed bot and game session started. Check console for logs.") # Log instead of popup
def stop_managed_session(self):
logger.info("Attempting to stop managed session...")
self.keep_monitoring_flag.clear() # Signal threads to stop
if self.control_client_instance:
self._stop_control_client()
if self.scheduler_thread_instance and self.scheduler_thread_instance.is_alive():
logger.info("Waiting for scheduler thread to stop...")
self.scheduler_thread_instance.join(timeout=5)
if self.scheduler_thread_instance.is_alive():
logger.warning("Scheduler thread did not stop in time.")
self.scheduler_thread_instance = None
schedule.clear()
if self.monitor_thread_instance and self.monitor_thread_instance.is_alive():
logger.info("Waiting for monitor thread to stop...")
self.monitor_thread_instance.join(timeout=5)
if self.monitor_thread_instance.is_alive():
logger.warning("Monitor thread did not stop in time.")
self.monitor_thread_instance = None
self._stop_bot_managed()
self._stop_game_managed()
# Reset process instances
self.bot_process_instance = None
self.game_process_instance = None
self.update_management_buttons_state(True) # Enable start, disable stop
messagebox.showinfo("Session Stopped", "Managed bot and game session stopped.")
def update_management_buttons_state(self, enable_start):
if hasattr(self, 'start_managed_button'):
self.start_managed_button.config(state=tk.NORMAL if enable_start else tk.DISABLED)
if hasattr(self, 'stop_managed_button'):
self.stop_managed_button.config(state=tk.DISABLED if enable_start else tk.NORMAL)
# Placeholder for game/bot start/stop/check methods to be integrated
# These will be adapted from wolf_control.py and use self.config_data and self.remote_data
def _find_process_by_name(self, process_name):
"""Find a process by name using psutil."""
for proc in psutil.process_iter(['pid', 'name']):
try:
if proc.info['name'].lower() == process_name.lower():
return proc
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return None
def _is_game_running_managed(self):
game_process_name = self.remote_data.get("GAME_PROCESS_NAME", "LastWar.exe")
if self.game_process_instance and self.game_process_instance.poll() is None:
# Check if the process name matches, in case Popen object is stale but a process with same PID exists
try:
p = psutil.Process(self.game_process_instance.pid)
if p.name().lower() == game_process_name.lower():
return True
except psutil.NoSuchProcess:
self.game_process_instance = None # Stale process object
return False # Popen object is stale and process is gone
# Fallback to checking by name if self.game_process_instance is None or points to a dead/wrong process
return self._find_process_by_name(game_process_name) is not None
def _start_game_managed(self):
global game_process_instance
game_exe_path = self.config_data.get("GAME_WINDOW_CONFIG", {}).get("GAME_EXECUTABLE_PATH")
game_process_name = self.remote_data.get("GAME_PROCESS_NAME", "LastWar.exe")
if not game_exe_path:
logger.error("Game executable path not configured.")
messagebox.showerror("Config Error", "Game executable path is not set in Game Settings.")
return False
if self._is_game_running_managed():
logger.info(f"Game ({game_process_name}) is already running.")
# Try to get a Popen object if we don't have one
if not self.game_process_instance:
existing_proc = self._find_process_by_name(game_process_name)
if existing_proc:
# We can't directly create a Popen object for an existing process this way easily.
# For now, we'll just acknowledge it's running.
# For full control, it's best if this script starts it.
logger.info(f"Found existing game process PID: {existing_proc.pid}. Monitoring without direct Popen control.")
return True
try:
logger.info(f"Starting game: {game_exe_path}")
# Use shell=False and pass arguments as a list if possible, but for .exe, shell=True is often more reliable on Windows
# For better process control, avoid shell=True if not strictly necessary.
# However, if GAME_EXE_PATH can contain spaces or needs shell interpretation, shell=True might be needed.
# For now, let's assume GAME_EXE_PATH is a direct path to an executable.
self.game_process_instance = subprocess.Popen(game_exe_path, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
game_process_instance = self.game_process_instance # Update global if used by other parts from wolf_control
# Wait a bit for the process to appear in psutil
time.sleep(2)
if self._is_game_running_managed():
logger.info(f"Game ({game_process_name}) started successfully with PID {self.game_process_instance.pid}.")
return True
else:
logger.warning(f"Game ({game_process_name}) did not appear to start correctly after Popen call.")
self.game_process_instance = None # Clear if it failed
game_process_instance = None
return False
except Exception as e:
logger.exception(f"Error starting game: {e}")
self.game_process_instance = None
game_process_instance = None
return False
def _stop_game_managed(self):
global game_process_instance
game_process_name = self.remote_data.get("GAME_PROCESS_NAME", "LastWar.exe")
stopped = False
if self.game_process_instance and self.game_process_instance.poll() is None:
logger.info(f"Stopping game process (PID: {self.game_process_instance.pid}) started by this manager...")
try:
self.game_process_instance.terminate()
self.game_process_instance.wait(timeout=5) # Wait for termination
logger.info("Game process terminated.")
stopped = True
except subprocess.TimeoutExpired:
logger.warning("Game process did not terminate in time, killing...")
self.game_process_instance.kill()
self.game_process_instance.wait(timeout=5)
logger.info("Game process killed.")
stopped = True
except Exception as e:
logger.error(f"Error terminating/killing own game process: {e}")
self.game_process_instance = None
game_process_instance = None
# If not stopped or no instance, try to find and kill by name
if not stopped:
proc_to_kill = self._find_process_by_name(game_process_name)
if proc_to_kill:
logger.info(f"Found game process '{game_process_name}' (PID: {proc_to_kill.pid}). Attempting to terminate...")
try:
proc_to_kill.terminate()
proc_to_kill.wait(timeout=5) # psutil's wait
logger.info(f"Game process '{game_process_name}' terminated.")
stopped = True
except psutil.TimeoutExpired:
logger.warning(f"Game process '{game_process_name}' did not terminate, killing...")
proc_to_kill.kill()
proc_to_kill.wait(timeout=5)
logger.info(f"Game process '{game_process_name}' killed.")
stopped = True
except Exception as e:
logger.error(f"Error terminating/killing game process by name '{game_process_name}': {e}")
else:
logger.info(f"Game process '{game_process_name}' not found running.")
stopped = True # Considered stopped if not found
if self.game_process_instance: # Clear Popen object if it exists
self.game_process_instance = None
game_process_instance = None
return stopped
def _is_bot_running_managed(self):
bot_script_name = self.remote_data.get("BOT_SCRIPT_NAME", "main.py")
if self.bot_process_instance and self.bot_process_instance.poll() is None:
# Verify it's the correct script, in case of PID reuse
try:
p = psutil.Process(self.bot_process_instance.pid)
if sys.executable in p.cmdline() and any(bot_script_name in arg for arg in p.cmdline()):
return True
except psutil.NoSuchProcess:
self.bot_process_instance = None # Stale process object
return False
# Fallback: Check for any python process running the bot script
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
cmdline = proc.cmdline()
if cmdline and sys.executable in cmdline[0] and any(bot_script_name in arg for arg in cmdline):
# If we find one, and don't have an instance, we can't control it directly with Popen
# but we know it's running.
if not self.bot_process_instance:
logger.info(f"Found external bot process (PID: {proc.pid}). Monitoring without direct Popen control.")
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess, IndexError):
continue # Ignore processes that died or we can't access, or have empty cmdline
return False
def _start_bot_managed(self):
global bot_process_instance # For compatibility if other parts use global
bot_script_name = self.remote_data.get("BOT_SCRIPT_NAME", "main.py")
if not os.path.exists(bot_script_name):
messagebox.showerror("Error", f"Could not find bot script: {bot_script_name}")
return False
if self._is_bot_running_managed():
logger.info(f"Bot ({bot_script_name}) is already running.")
return True # Or handle acquiring Popen object if possible (complex)
try:
logger.info(f"Starting bot: {sys.executable} {bot_script_name}")
# Ensure CWD is script's directory if main.py relies on relative paths
script_dir = os.path.dirname(os.path.abspath(__file__))
self.bot_process_instance = subprocess.Popen(
[sys.executable, bot_script_name],
cwd=script_dir, # Run main.py from its directory
stdout=subprocess.PIPE, # Capture output
stderr=subprocess.STDOUT, # Redirect stderr to stdout
text=True,
bufsize=1 # Line buffered
)
bot_process_instance = self.bot_process_instance # Update global
# Start a thread to log bot's output
threading.Thread(target=self._log_subprocess_output, args=(self.bot_process_instance, "Bot"), daemon=True).start()
logger.info(f"Bot ({bot_script_name}) started successfully with PID {self.bot_process_instance.pid}.")
return True
except Exception as e:
logger.exception(f"Error starting bot: {e}")
self.bot_process_instance = None
bot_process_instance = None
return False
def _log_subprocess_output(self, process, name):
"""Reads and logs output from a subprocess."""
if not process or not process.stdout:
logger.error(f"No process or stdout to log for {name}.")
return
logger.info(f"Started logging output for {name} (PID: {process.pid}).")
try:
for line in iter(process.stdout.readline, ''):
if line:
logger.info(f"[{name}] {line.strip()}")
if process.poll() is not None and not line: # Process ended and no more output
break
process.stdout.close()
except Exception as e:
logger.error(f"Error logging output for {name}: {e}")
finally:
return_code = process.wait()
logger.info(f"{name} process (PID: {process.pid}) exited with code {return_code}.")
def _stop_bot_managed(self):
global bot_process_instance
bot_script_name = self.remote_data.get("BOT_SCRIPT_NAME", "main.py")
stopped = False
if self.bot_process_instance and self.bot_process_instance.poll() is None:
logger.info(f"Stopping bot process (PID: {self.bot_process_instance.pid}) started by this manager...")
try:
self.bot_process_instance.terminate()
self.bot_process_instance.wait(timeout=5)
logger.info("Bot process terminated.")
stopped = True
except subprocess.TimeoutExpired:
logger.warning("Bot process did not terminate in time, killing...")
self.bot_process_instance.kill()
self.bot_process_instance.wait(timeout=5)
logger.info("Bot process killed.")
stopped = True
except Exception as e:
logger.error(f"Error terminating/killing own bot process: {e}")
self.bot_process_instance = None
bot_process_instance = None
# Fallback: find and kill any python process running the bot script
if not stopped:
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
cmdline = proc.cmdline()
if cmdline and sys.executable in cmdline[0] and any(bot_script_name in arg for arg in cmdline):
logger.info(f"Found bot process '{bot_script_name}' (PID: {proc.pid}). Attempting to terminate...")
proc.terminate()
proc.wait(timeout=5)
logger.info(f"Bot process '{bot_script_name}' terminated.")
stopped = True
break # Assume only one instance for now
except psutil.TimeoutExpired:
logger.warning(f"Bot process '{bot_script_name}' (PID: {proc.pid}) did not terminate, killing...")
proc.kill()
proc.wait(timeout=5)
logger.info(f"Bot process '{bot_script_name}' killed.")
stopped = True
break
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess, IndexError):
continue
if not stopped: # If no Popen instance and no external process found
logger.info(f"Bot process '{bot_script_name}' not found running.")
stopped = True
if self.bot_process_instance: # Clear Popen object if it exists
self.bot_process_instance = None
bot_process_instance = None
return stopped
def _restart_game_managed(self):
logger.info("Restarting game (managed)...")
self._stop_game_managed()
time.sleep(2) # Give it time to fully stop
return self._start_game_managed()
def _restart_bot_managed(self):
logger.info("Restarting bot (managed)...")
self._stop_bot_managed()
time.sleep(2) # Give it time to fully stop
return self._start_bot_managed()
def _restart_all_managed(self):
logger.info("Performing full restart (bot and game)...")
self._stop_bot_managed()
self._stop_game_managed()
time.sleep(3)
game_started = self._start_game_managed()
if game_started:
time.sleep(10) # Wait for game to initialize
bot_started = self._start_bot_managed()
if not bot_started:
logger.error("Failed to restart bot after restarting game.")
return False
else:
logger.error("Failed to restart game during full restart.")
# Optionally try to start bot anyway or declare full failure
# self._start_bot_managed()
return False
logger.info("Full restart completed.")
# Update last restart time if tracking it
# self.last_restart_time = datetime.datetime.now()
return True
def _start_monitoring_thread(self):
if self.monitor_thread_instance and self.monitor_thread_instance.is_alive():
logger.info("Monitor thread already running.")
return
self.monitor_thread_instance = threading.Thread(target=self._monitoring_loop, daemon=True)
self.monitor_thread_instance.start()
logger.info("Started monitoring thread.")
def _monitoring_loop(self):
logger.info("Monitoring loop started.")
while self.keep_monitoring_flag.is_set():
try:
# Check game
if not self._is_game_running_managed():
if self.game_process_instance is None : # Only restart if we are supposed to manage it or it was started by us and died
logger.warning("Managed game process not found. Attempting to restart game...")
self._start_game_managed() # Or _restart_game_managed()
# Check bot
if not self._is_bot_running_managed():
if self.bot_process_instance is None: # Only restart if we are supposed to manage it or it was started by us and died
logger.warning("Managed bot process not found. Attempting to restart bot...")
self._start_bot_managed() # Or _restart_bot_managed()
# Check for remote commands (if control_client_instance is set up)
if self.control_client_instance and hasattr(self.control_client_instance, 'check_signals'):
self.control_client_instance.check_signals(self) # Pass self (WolfChatSetup instance)
time.sleep(self.config_data.get("GAME_WINDOW_CONFIG", {}).get("MONITOR_INTERVAL_SECONDS", 5))
except Exception as e:
logger.exception(f"Error in monitoring loop: {e}")
time.sleep(10) # Wait longer after an error
logger.info("Monitoring loop stopped.")
def _start_scheduler_thread(self):
if self.scheduler_thread_instance and self.scheduler_thread_instance.is_alive():
logger.info("Scheduler thread already running.")
return
self._setup_scheduled_restarts() # Setup jobs based on current config
self.scheduler_thread_instance = threading.Thread(target=self._run_scheduler, daemon=True)
self.scheduler_thread_instance.start()
logger.info("Started scheduler thread.")
def _run_scheduler(self):
logger.info("Scheduler loop started.")
while self.keep_monitoring_flag.is_set(): # Use same flag as monitor
schedule.run_pending()
time.sleep(1)
logger.info("Scheduler loop stopped.")
def _setup_scheduled_restarts(self):
schedule.clear() # Clear previous jobs
link_restarts = self.remote_data.get("LINK_RESTART_TIMES", True)
game_interval = self.remote_data.get("DEFAULT_GAME_RESTART_INTERVAL_MINUTES", 0)
bot_interval = self.remote_data.get("DEFAULT_BOT_RESTART_INTERVAL_MINUTES", 0)
if link_restarts and game_interval > 0:
logger.info(f"Scheduling linked restart (game & bot) every {game_interval} minutes.")
schedule.every(game_interval).minutes.do(self._restart_all_managed)
else:
if game_interval > 0:
logger.info(f"Scheduling game restart every {game_interval} minutes.")
schedule.every(game_interval).minutes.do(self._restart_game_managed)
if bot_interval > 0:
logger.info(f"Scheduling bot restart every {bot_interval} minutes.")
schedule.every(bot_interval).minutes.do(self._restart_bot_managed)
if not schedule.jobs:
logger.info("No scheduled restarts configured.")
def _start_control_client(self):
if not HAS_SOCKETIO:
logger.warning("Cannot start ControlClient: python-socketio is not installed.")
return
if self.control_client_instance and self.control_client_instance.is_connected(): # is_connected or similar check
logger.info("Control client already connected.")
return
server_url = self.remote_data.get("REMOTE_SERVER_URL")
client_key = self.remote_data.get("REMOTE_CLIENT_KEY")
if not server_url or not client_key:
logger.warning("Remote server URL or client key not configured. Cannot start control client.")
messagebox.showwarning("Remote Config Missing", "Remote Server URL or Client Key is not set in Management tab.")
return
self.control_client_instance = ControlClient(server_url, client_key, wolf_chat_setup_instance=self) # Pass self
# The ControlClient should handle its own connection thread.
# self.control_client_instance.start_thread() or similar method
if self.control_client_instance.run_in_thread(): # Assuming run_in_thread starts the connection attempt
logger.info("Control client thread started.")
else:
logger.error("Failed to start control client thread.")
self.control_client_instance = None
def _stop_control_client(self):
if self.control_client_instance:
logger.info("Stopping control client...")
self.control_client_instance.stop() # This should handle thread shutdown
self.control_client_instance = None
logger.info("Control client stopped.")
def on_closing(self):
"""Handle window close event."""
if messagebox.askokcancel("Quit", "Do you want to quit Wolf Chat Setup? This will stop any managed sessions."):
self.stop_managed_session() # Ensure everything is stopped
self.destroy()
def create_api_tab(self):
"""Create the API Settings tab"""
tab = ttk.Frame(self.notebook)
self.notebook.add(tab, text="API Settings")
# Main frame with padding
main_frame = ttk.Frame(tab, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
# Header
header = ttk.Label(main_frame, text="OpenAI API Settings", font=("", 12, "bold"))
header.pack(anchor=tk.W, pady=(0, 10))
# API Base URL
url_frame = ttk.Frame(main_frame)
url_frame.pack(fill=tk.X, pady=5)
url_label = ttk.Label(url_frame, text="API Base URL:", width=15)
url_label.pack(side=tk.LEFT, padx=(0, 5))
self.api_url_var = tk.StringVar()
self.api_url_entry = ttk.Entry(url_frame, textvariable=self.api_url_var)
self.api_url_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
# Predefined endpoints
url_presets_frame = ttk.Frame(main_frame)
url_presets_frame.pack(fill=tk.X, pady=2)
preset_label = ttk.Label(url_presets_frame, text="Presets:", width=15)
preset_label.pack(side=tk.LEFT, padx=(0, 5))
openai_btn = ttk.Button(url_presets_frame, text="OpenAI", width=10,
command=lambda: self.api_url_var.set(""))
openai_btn.pack(side=tk.LEFT, padx=2)
openrouter_btn = ttk.Button(url_presets_frame, text="OpenRouter", width=10,
command=lambda: self.api_url_var.set("https://openrouter.ai/api/v1"))
openrouter_btn.pack(side=tk.LEFT, padx=2)
localhost_btn = ttk.Button(url_presets_frame, text="Localhost", width=10,
command=lambda: self.api_url_var.set("http://localhost:1234/v1"))
localhost_btn.pack(side=tk.LEFT, padx=2)
# API Key
key_frame = ttk.Frame(main_frame)
key_frame.pack(fill=tk.X, pady=5)
key_label = ttk.Label(key_frame, text="API Key:", width=15)
key_label.pack(side=tk.LEFT, padx=(0, 5))
self.api_key_var = tk.StringVar()
self.api_key_entry = ttk.Entry(key_frame, textvariable=self.api_key_var, show="*")
self.api_key_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
self.show_key_var = tk.BooleanVar(value=False)
show_key_cb = ttk.Checkbutton(key_frame, text="Show", variable=self.show_key_var,
command=self.toggle_key_visibility)
show_key_cb.pack(side=tk.LEFT, padx=(5, 0))
# Model Selection
model_frame = ttk.Frame(main_frame)
model_frame.pack(fill=tk.X, pady=5)
model_label = ttk.Label(model_frame, text="LLM Model:", width=15)
model_label.pack(side=tk.LEFT, padx=(0, 5))
self.model_var = tk.StringVar(value="deepseek/deepseek-chat-v3-0324")
self.model_entry = ttk.Entry(model_frame, textvariable=self.model_var)
self.model_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
# Information box
info_frame = ttk.LabelFrame(main_frame, text="Information")
info_frame.pack(fill=tk.BOTH, expand=True, pady=10)
info_text = (
"• Leave the API Base URL empty to use the official OpenAI API\n"
"• For OpenRouter, you need an OpenRouter API key\n"
"• For local LLMs, configure your API to use OpenAI-compatible format\n"
"• Make sure the selected model is available with your chosen provider"
)
info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=700)
info_label.pack(padx=10, pady=10, anchor=tk.W)
def create_mcp_tab(self):
"""Create the MCP Settings tab"""
tab = ttk.Frame(self.notebook)
self.notebook.add(tab, text="MCP Servers")
# Main frame with padding
main_frame = ttk.Frame(tab, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
# Header
header = ttk.Label(main_frame, text="Modular Capability Provider (MCP) Servers", font=("", 12, "bold"))
header.pack(anchor=tk.W, pady=(0, 10))
# Create the main frame for server settings
servers_frame = ttk.Frame(main_frame)
servers_frame.pack(fill=tk.BOTH, expand=True)
# Left side - Servers list
servers_list_frame = ttk.LabelFrame(servers_frame, text="Available Servers")
servers_list_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 5))
# Servers list
self.servers_listbox = tk.Listbox(servers_list_frame, width=20, height=10)
self.servers_listbox.pack(side=tk.LEFT, fill=tk.Y, expand=True, padx=5, pady=5)
self.servers_listbox.bind('<<ListboxSelect>>', self.on_server_select)
servers_scrollbar = ttk.Scrollbar(servers_list_frame, orient=tk.VERTICAL, command=self.servers_listbox.yview)
servers_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.servers_listbox.config(yscrollcommand=servers_scrollbar.set)
# Buttons below listbox
servers_btn_frame = ttk.Frame(servers_list_frame)
servers_btn_frame.pack(fill=tk.X, pady=5, padx=5)
add_btn = ttk.Button(servers_btn_frame, text="Add", command=self.add_custom_server)
add_btn.pack(side=tk.LEFT, padx=2, fill=tk.X, expand=True)
self.remove_btn = ttk.Button(servers_btn_frame, text="Remove", command=self.remove_server, state=tk.DISABLED)
self.remove_btn.pack(side=tk.LEFT, padx=2, fill=tk.X, expand=True)
# Right side - Server settings
self.server_settings_frame = ttk.LabelFrame(servers_frame, text="Server Settings")
self.server_settings_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)
# Empty label for initial state
self.empty_settings_label = ttk.Label(self.server_settings_frame,
text="Select a server from the list to configure it.",
justify=tk.CENTER)
self.empty_settings_label.pack(expand=True, pady=20)
# Frames for each server type (initially hidden)
self.create_exa_settings_frame()
self.create_chroma_settings_frame()
self.create_custom_settings_frame()
# Update the servers list
self.update_servers_list()
def create_exa_settings_frame(self):
"""Create settings frame for Exa server"""
self.exa_frame = ttk.Frame(self.server_settings_frame)
# Enable checkbox
enable_frame = ttk.Frame(self.exa_frame)
enable_frame.pack(fill=tk.X, pady=5)
self.exa_enable_var = tk.BooleanVar(value=True)
enable_cb = ttk.Checkbutton(enable_frame, text="Enable Exa Server", variable=self.exa_enable_var)
enable_cb.pack(anchor=tk.W)
# API Key
key_frame = ttk.Frame(self.exa_frame)
key_frame.pack(fill=tk.X, pady=5)
key_label = ttk.Label(key_frame, text="Exa API Key:", width=15)
key_label.pack(side=tk.LEFT)
self.exa_key_var = tk.StringVar()
self.exa_key_entry = ttk.Entry(key_frame, textvariable=self.exa_key_var, show="*")
self.exa_key_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
self.show_exa_key_var = tk.BooleanVar(value=False)
show_key_cb = ttk.Checkbutton(key_frame, text="Show", variable=self.show_exa_key_var,
command=lambda: self.toggle_field_visibility(self.exa_key_entry, self.show_exa_key_var))
show_key_cb.pack(side=tk.LEFT, padx=(5, 0))
# Server Type
type_frame = ttk.Frame(self.exa_frame)
type_frame.pack(fill=tk.X, pady=5)
type_label = ttk.Label(type_frame, text="Server Type:", width=15)
type_label.pack(side=tk.LEFT)
self.exa_type_var = tk.StringVar(value="local") # Changed default to local
local_radio = ttk.Radiobutton(type_frame, text="Local Server (recommended)",
variable=self.exa_type_var, value="local",
command=self.update_exa_settings_visibility)
local_radio.pack(anchor=tk.W)
smithery_radio = ttk.Radiobutton(type_frame, text="Smithery",
variable=self.exa_type_var, value="smithery",
command=self.update_exa_settings_visibility)
smithery_radio.pack(anchor=tk.W)
# Local Server Path
self.exa_local_frame = ttk.Frame(self.exa_frame)
local_path_label = ttk.Label(self.exa_local_frame, text="Server Path:", width=15)
local_path_label.pack(side=tk.LEFT)
self.exa_path_var = tk.StringVar(value=f"C:/Users/{CURRENT_USERNAME}/AppData/Roaming/npm/exa-mcp-server")
self.exa_path_entry = ttk.Entry(self.exa_local_frame, textvariable=self.exa_path_var)
self.exa_path_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
browse_btn = ttk.Button(self.exa_local_frame, text="Browse", command=self.browse_exa_server)
browse_btn.pack(side=tk.LEFT, padx=(5, 0))
# Smithery Info
self.exa_smithery_frame = ttk.Frame(self.exa_frame)
smithery_info = ttk.Label(self.exa_smithery_frame,
text="Smithery will automatically download and run the latest Exa MCP server.",
wraplength=400)
smithery_info.pack(pady=5)
# Information text
info_frame = ttk.LabelFrame(self.exa_frame, text="Information")
info_frame.pack(fill=tk.X, pady=10)
info_text = (
"• Exa MCP provides web search and research capabilities\n"
"• An Exa API key is required (obtain from https://exa.ai)\n"
"• Local server is the default and preferred option\n"
"• Smithery requires an internet connection each time"
)
info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=400)
info_label.pack(padx=10, pady=10, anchor=tk.W)
def create_chroma_settings_frame(self):
"""Create settings frame for Chroma MCP server"""
self.chroma_frame = ttk.Frame(self.server_settings_frame)
# Enable checkbox
enable_frame = ttk.Frame(self.chroma_frame)
enable_frame.pack(fill=tk.X, pady=5)
self.chroma_enable_var = tk.BooleanVar(value=True)
enable_cb = ttk.Checkbutton(enable_frame, text="Enable Chroma MCP Server", variable=self.chroma_enable_var)
enable_cb.pack(anchor=tk.W)
# Data directory
dir_frame = ttk.Frame(self.chroma_frame)
dir_frame.pack(fill=tk.X, pady=5)
dir_label = ttk.Label(dir_frame, text="Data Directory:", width=15)
dir_label.pack(side=tk.LEFT)
self.chroma_dir_var = tk.StringVar(value=DEFAULT_CHROMA_DATA_PATH)
dir_entry = ttk.Entry(dir_frame, textvariable=self.chroma_dir_var)
dir_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
browse_btn = ttk.Button(dir_frame, text="Browse", command=self.browse_chroma_dir)
browse_btn.pack(side=tk.LEFT, padx=(5, 0))
# Information text
info_frame = ttk.LabelFrame(self.chroma_frame, text="Information")
info_frame.pack(fill=tk.X, pady=10, expand=True)
info_text = (
"• Chroma MCP provides vector database storage for memory features\n"
"• The data directory will store vector embeddings and metadata\n"
"• Use a persistent directory to maintain memory between sessions\n"
"• Requires uvx to be installed in your Python environment"
)
info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=400)
info_label.pack(padx=10, pady=10, anchor=tk.W)
def create_custom_settings_frame(self):
"""Create settings frame for custom server"""
self.custom_frame = ttk.Frame(self.server_settings_frame)
# Name field
name_frame = ttk.Frame(self.custom_frame)
name_frame.pack(fill=tk.X, pady=5)
name_label = ttk.Label(name_frame, text="Server Name:", width=15)
name_label.pack(side=tk.LEFT)
self.custom_name_var = tk.StringVar()
name_entry = ttk.Entry(name_frame, textvariable=self.custom_name_var)
name_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
# Enable checkbox
enable_frame = ttk.Frame(self.custom_frame)
enable_frame.pack(fill=tk.X, pady=5)
self.custom_enable_var = tk.BooleanVar(value=True)
enable_cb = ttk.Checkbutton(enable_frame, text="Enable Server", variable=self.custom_enable_var)
enable_cb.pack(anchor=tk.W)
# Raw configuration editor
config_label = ttk.Label(self.custom_frame, text="Server Configuration (Python Dictionary Format):")
config_label.pack(anchor=tk.W, pady=(10, 5))
self.custom_config_text = scrolledtext.ScrolledText(self.custom_frame, height=12, width=50)
self.custom_config_text.pack(fill=tk.BOTH, expand=True, pady=5)
# Default configuration template
default_config = (
' "command": "npx",\n'
' "args": [\n'
' "some-mcp-server",\n'
' "--option1",\n'
' "--option2=value"\n'
' ]'
)
self.custom_config_text.insert(tk.END, default_config)
# Information text
info_frame = ttk.LabelFrame(self.custom_frame, text="Information")
info_frame.pack(fill=tk.X, pady=10)
info_text = (
"• Enter the raw Python dictionary for your custom MCP server\n"
"• Format must match the structure in config.py\n"
"• Be careful with syntax - incorrect format may cause errors\n"
"• Server name must be unique"
)
info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=400)
info_label.pack(padx=10, pady=10, anchor=tk.W)
def create_game_tab(self):
"""Create the Game Settings tab"""
tab = ttk.Frame(self.notebook)
self.notebook.add(tab, text="Game Settings")
# Main frame with padding
main_frame = ttk.Frame(tab, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
# Header
header = ttk.Label(main_frame, text="Game Window Configuration", font=("", 12, "bold"))
header.pack(anchor=tk.W, pady=(0, 10))
# Game window title
title_frame = ttk.Frame(main_frame)
title_frame.pack(fill=tk.X, pady=5)
title_label = ttk.Label(title_frame, text="Window Title:", width=20)
title_label.pack(side=tk.LEFT, padx=(0, 5))
self.window_title_var = tk.StringVar(value="Last War-Survival Game")
title_entry = ttk.Entry(title_frame, textvariable=self.window_title_var)
title_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
# Game executable path
path_frame = ttk.Frame(main_frame)
path_frame.pack(fill=tk.X, pady=5)
path_label = ttk.Label(path_frame, text="Game Executable Path:", width=20)
path_label.pack(side=tk.LEFT, padx=(0, 5))
self.game_path_var = tk.StringVar(value=fr"C:\Users\{CURRENT_USERNAME}\AppData\Local\TheLastWar\Launch.exe")
path_entry = ttk.Entry(path_frame, textvariable=self.game_path_var)
path_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
browse_btn = ttk.Button(path_frame, text="Browse", command=self.browse_game_path)
browse_btn.pack(side=tk.LEFT, padx=(5, 0))
# Window position
pos_frame = ttk.Frame(main_frame)
pos_frame.pack(fill=tk.X, pady=5)
pos_label = ttk.Label(pos_frame, text="Window Position:", width=20)
pos_label.pack(side=tk.LEFT, padx=(0, 5))
pos_x_label = ttk.Label(pos_frame, text="X:")
pos_x_label.pack(side=tk.LEFT)
self.pos_x_var = tk.IntVar(value=50)
pos_x_entry = ttk.Spinbox(pos_frame, textvariable=self.pos_x_var, from_=0, to=3000, width=5)
pos_x_entry.pack(side=tk.LEFT, padx=(0, 10))
pos_y_label = ttk.Label(pos_frame, text="Y:")
pos_y_label.pack(side=tk.LEFT)
self.pos_y_var = tk.IntVar(value=30)
pos_y_entry = ttk.Spinbox(pos_frame, textvariable=self.pos_y_var, from_=0, to=3000, width=5)
pos_y_entry.pack(side=tk.LEFT)
# Window size
size_frame = ttk.Frame(main_frame)
size_frame.pack(fill=tk.X, pady=5)
size_label = ttk.Label(size_frame, text="Window Size:", width=20)
size_label.pack(side=tk.LEFT, padx=(0, 5))
width_label = ttk.Label(size_frame, text="Width:")
width_label.pack(side=tk.LEFT)
self.width_var = tk.IntVar(value=600)
width_entry = ttk.Spinbox(size_frame, textvariable=self.width_var, from_=300, to=3000, width=5)
width_entry.pack(side=tk.LEFT, padx=(0, 10))
height_label = ttk.Label(size_frame, text="Height:")
height_label.pack(side=tk.LEFT)
self.height_var = tk.IntVar(value=1070)
height_entry = ttk.Spinbox(size_frame, textvariable=self.height_var, from_=300, to=3000, width=5)
height_entry.pack(side=tk.LEFT)
# Auto-restart settings (Now managed by 'Management' tab)
restart_info_frame = ttk.LabelFrame(main_frame, text="Auto-Restart Settings (Legacy)")
restart_info_frame.pack(fill=tk.X, pady=10)
legacy_restart_label = ttk.Label(restart_info_frame,
text="Scheduled game/bot restarts are now configured in the 'Management' tab.",
justify=tk.LEFT, wraplength=680)
legacy_restart_label.pack(padx=10, pady=10, anchor=tk.W)
# Keep the variables for config.py compatibility if other parts of the app might read them,
# but their UI controls are removed from here.
self.restart_var = tk.BooleanVar(value=self.config_data.get("GAME_WINDOW_CONFIG", {}).get("ENABLE_SCHEDULED_RESTART", True))
self.interval_var = tk.IntVar(value=self.config_data.get("GAME_WINDOW_CONFIG", {}).get("RESTART_INTERVAL_MINUTES", 60))
# Monitor interval (Still relevant for window positioning, not restart scheduling)
monitor_frame = ttk.Frame(main_frame)
monitor_frame.pack(fill=tk.X, pady=5)
monitor_label = ttk.Label(monitor_frame, text="Monitor Interval (sec):", width=20)
monitor_label.pack(side=tk.LEFT, padx=(0, 5))
self.monitor_interval_var = tk.IntVar(value=5)
monitor_entry = ttk.Spinbox(monitor_frame, textvariable=self.monitor_interval_var, from_=1, to=60, width=5)
monitor_entry.pack(side=tk.LEFT)
# Information text
info_frame = ttk.LabelFrame(main_frame, text="Information")
info_frame.pack(fill=tk.BOTH, expand=True, pady=10)
info_text = (
"• These settings control how the game window is monitored and positioned\n"
"• Window Title must match exactly what's shown in the title bar\n"
"• Scheduled restart helps prevent game crashes and memory leaks\n"
"• Monitor interval determines how often the window position is checked\n"
"• Changes will take effect after restarting Wolf Chat"
)
info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=700)
info_label.pack(padx=10, pady=10, anchor=tk.W)
def create_memory_tab(self):
"""Create the Memory Settings tab"""
tab = ttk.Frame(self.notebook)
self.notebook.add(tab, text="Memory Settings")
# Main frame with padding
main_frame = ttk.Frame(tab, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
# Header
header = ttk.Label(main_frame, text="ChromaDB Memory Integration", font=("", 12, "bold"))
header.pack(anchor=tk.W, pady=(0, 10))
# Enable Pre-loading
preload_frame = ttk.Frame(main_frame)
preload_frame.pack(fill=tk.X, pady=5)
self.preload_profiles_var = tk.BooleanVar(value=True)
preload_cb = ttk.Checkbutton(preload_frame, text="Enable user profile pre-loading",
variable=self.preload_profiles_var)
preload_cb.pack(anchor=tk.W, pady=2)
# Collection Names Frame
collections_frame = ttk.LabelFrame(main_frame, text="Collection Names")
collections_frame.pack(fill=tk.X, pady=10)
# User Profiles Collection
profiles_col_frame = ttk.Frame(collections_frame)
profiles_col_frame.pack(fill=tk.X, pady=5, padx=10)
profiles_col_label = ttk.Label(profiles_col_frame, text="Profiles Collection:", width=20)
profiles_col_label.pack(side=tk.LEFT, padx=(0, 5))
# 修正:將預設值改為 "wolfhart_memory" 以匹配實際用法
self.profiles_collection_var = tk.StringVar(value="wolfhart_memory")
profiles_col_entry = ttk.Entry(profiles_col_frame, textvariable=self.profiles_collection_var)
profiles_col_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
# Conversations Collection
conv_col_frame = ttk.Frame(collections_frame)
conv_col_frame.pack(fill=tk.X, pady=5, padx=10)
conv_col_label = ttk.Label(conv_col_frame, text="Conversations Collection:", width=20)
conv_col_label.pack(side=tk.LEFT, padx=(0, 5))
self.conversations_collection_var = tk.StringVar(value="conversations")
conv_col_entry = ttk.Entry(conv_col_frame, textvariable=self.conversations_collection_var)
conv_col_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
# Bot Memory Collection
bot_col_frame = ttk.Frame(collections_frame)
bot_col_frame.pack(fill=tk.X, pady=5, padx=10)
bot_col_label = ttk.Label(bot_col_frame, text="Bot Memory Collection:", width=20)
bot_col_label.pack(side=tk.LEFT, padx=(0, 5))
self.bot_memory_collection_var = tk.StringVar(value="wolfhart_memory")
bot_col_entry = ttk.Entry(bot_col_frame, textvariable=self.bot_memory_collection_var)
bot_col_entry.pack(side=tk.LEFT, fill=tk.X, expand=True)
# Pre-loading Settings
preload_settings_frame = ttk.LabelFrame(main_frame, text="Pre-loading Settings")
preload_settings_frame.pack(fill=tk.X, pady=10)
# Related memories to preload
related_frame = ttk.Frame(preload_settings_frame)
related_frame.pack(fill=tk.X, pady=5, padx=10)
related_label = ttk.Label(related_frame, text="Related Memories Count:", width=20)
related_label.pack(side=tk.LEFT, padx=(0, 5))
self.related_memories_var = tk.IntVar(value=2)
related_spinner = ttk.Spinbox(related_frame, from_=0, to=10, width=5, textvariable=self.related_memories_var)
related_spinner.pack(side=tk.LEFT)
related_info = ttk.Label(related_frame, text="(0 to disable related memories pre-loading)")
related_info.pack(side=tk.LEFT, padx=(5, 0))
# Information box
info_frame = ttk.LabelFrame(main_frame, text="Information")
info_frame.pack(fill=tk.BOTH, expand=True, pady=10)
info_text = (
"• Pre-loading user profiles will speed up responses by fetching data before LLM calls\n"
"• Collection names must match your ChromaDB configuration\n"
"• The bot will automatically use pre-loaded data if available\n"
"• If data isn't found locally, the bot will fall back to using tool calls"
)
info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=700)
info_label.pack(padx=10, pady=10, anchor=tk.W)
def create_bottom_buttons(self):
"""Create bottom action buttons"""
btn_frame = ttk.Frame(self)
btn_frame.pack(fill=tk.X, padx=10, pady=10)
# Version label on left
version_label = ttk.Label(btn_frame, text=f"v{VERSION}")
version_label.pack(side=tk.LEFT, padx=5)
# Action buttons on right (order matters for packing)
cancel_btn = ttk.Button(btn_frame, text="Cancel", command=self.quit)
cancel_btn.pack(side=tk.RIGHT, padx=5)
save_btn = ttk.Button(btn_frame, text="Save Settings", command=self.save_settings)
save_btn.pack(side=tk.RIGHT, padx=5)
install_deps_btn = ttk.Button(btn_frame, text="Install Dependencies", command=self.install_dependencies)
install_deps_btn.pack(side=tk.RIGHT, padx=5)
# Run buttons
self.run_test_btn = ttk.Button(btn_frame, text="Run Test", command=self.run_test_script)
self.run_test_btn.pack(side=tk.RIGHT, padx=5)
self.run_bot_btn = ttk.Button(btn_frame, text="Run Chat Bot", command=self.run_chat_bot)
self.run_bot_btn.pack(side=tk.RIGHT, padx=5)
# Stop button
self.stop_btn = ttk.Button(btn_frame, text="Stop Process", command=self.stop_process, state=tk.DISABLED)
self.stop_btn.pack(side=tk.RIGHT, padx=5)
def install_dependencies(self):
"""Run the installation script for dependencies"""
try:
import subprocess
import sys
# Check if install.py exists
if not os.path.exists("install.py"):
messagebox.showerror("Error", "Could not find install.py script")
return
# Run the installer script in a new process
subprocess.Popen([sys.executable, "install.py"])
except Exception as e:
messagebox.showerror("Error", f"Failed to launch installer: {str(e)}")
def run_chat_bot(self):
"""Run the main chat bot script"""
try:
import subprocess
import sys
if not os.path.exists("main.py"):
messagebox.showerror("Error", "Could not find main.py script")
return
if self.running_process is not None:
messagebox.showwarning("Already Running", "Another process is already running. Please stop it first.")
return
self.running_process = subprocess.Popen([sys.executable, "main.py"])
print("Attempting to start main.py...")
self.update_run_button_states(False) # Disable run buttons, enable stop
except Exception as e:
messagebox.showerror("Error", f"Failed to launch main.py: {str(e)}")
self.update_run_button_states(True) # Re-enable buttons on failure
def run_test_script(self):
"""Run the LLM debug script"""
try:
import subprocess
import sys
test_script_path = os.path.join("test", "llm_debug_script.py")
if not os.path.exists(test_script_path):
messagebox.showerror("Error", f"Could not find {test_script_path}")
return
if self.running_process is not None:
messagebox.showwarning("Already Running", "Another process is already running. Please stop it first.")
return
self.running_process = subprocess.Popen([sys.executable, test_script_path])
print(f"Attempting to start {test_script_path}...")
self.update_run_button_states(False) # Disable run buttons, enable stop
except Exception as e:
messagebox.showerror("Error", f"Failed to launch {test_script_path}: {str(e)}")
self.update_run_button_states(True) # Re-enable buttons on failure
def stop_process(self):
"""Stop the currently running process"""
if hasattr(self, 'running_process') and self.running_process is not None:
try:
print("Attempting to terminate running process...")
self.running_process.terminate() # Or .kill() for a more forceful stop
self.running_process = None
messagebox.showinfo("Process Stopped", "The running process has been terminated.")
except Exception as e:
messagebox.showerror("Error", f"Failed to terminate process: {str(e)}")
finally:
# Re-enable run buttons and disable stop button
self.update_run_button_states(True)
else:
messagebox.showinfo("No Process", "No process is currently running.")
def update_run_button_states(self, enable):
"""Enable or disable the run buttons and update stop button state"""
# Assuming run_bot_btn and run_test_btn exist and are class attributes
if hasattr(self, 'run_bot_btn'):
self.run_bot_btn.config(state=tk.NORMAL if enable else tk.DISABLED)
if hasattr(self, 'run_test_btn'):
self.run_test_btn.config(state=tk.NORMAL if enable else tk.DISABLED)
if hasattr(self, 'stop_btn'):
self.stop_btn.config(state=tk.DISABLED if enable else tk.NORMAL)
def update_ui_from_data(self):
"""Update UI controls from loaded data"""
try:
# API Tab
self.api_url_var.set(self.config_data.get("OPENAI_API_BASE_URL", ""))
self.api_key_var.set(self.env_data.get("OPENAI_API_KEY", ""))
self.model_var.set(self.config_data.get("LLM_MODEL", "deepseek/deepseek-chat-v3-0324"))
# MCP Servers
# Exa settings
if "exa" in self.config_data.get("MCP_SERVERS", {}):
exa_config = self.config_data["MCP_SERVERS"]["exa"]
self.exa_enable_var.set(exa_config.get("enabled", True))
self.exa_key_var.set(self.env_data.get("EXA_API_KEY", ""))
if exa_config.get("use_smithery", False):
self.exa_type_var.set("smithery")
else:
self.exa_type_var.set("local")
if "server_path" in exa_config:
self.exa_path_var.set(exa_config.get("server_path", ""))
# Chroma settings
if "chroma" in self.config_data.get("MCP_SERVERS", {}):
chroma_config = self.config_data["MCP_SERVERS"]["chroma"]
self.chroma_enable_var.set(chroma_config.get("enabled", True))
data_dir = chroma_config.get("data_dir", "")
if data_dir:
# Ensure data directory is absolute path
self.chroma_dir_var.set(os.path.abspath(data_dir))
else:
# Set default as absolute path
self.chroma_dir_var.set(DEFAULT_CHROMA_DATA_PATH)
# Update servers list to include custom servers
self.update_servers_list()
# Game settings
game_config = self.config_data.get("GAME_WINDOW_CONFIG", {})
self.window_title_var.set(game_config.get("WINDOW_TITLE", "Last War-Survival Game"))
game_path = game_config.get("GAME_EXECUTABLE_PATH", "")
if game_path:
self.game_path_var.set(game_path)
self.pos_x_var.set(game_config.get("GAME_WINDOW_X", 50))
self.pos_y_var.set(game_config.get("GAME_WINDOW_Y", 30))
self.width_var.set(game_config.get("GAME_WINDOW_WIDTH", 600))
self.height_var.set(game_config.get("GAME_WINDOW_HEIGHT", 1070))
self.restart_var.set(game_config.get("ENABLE_SCHEDULED_RESTART", True))
self.interval_var.set(game_config.get("RESTART_INTERVAL_MINUTES", 60))
self.monitor_interval_var.set(game_config.get("MONITOR_INTERVAL_SECONDS", 5))
# Memory Settings
self.preload_profiles_var.set(self.config_data.get("ENABLE_PRELOAD_PROFILES", True))
self.related_memories_var.set(self.config_data.get("PRELOAD_RELATED_MEMORIES", 2))
self.profiles_collection_var.set(self.config_data.get("PROFILES_COLLECTION", "user_profiles")) # Default was user_profiles
self.conversations_collection_var.set(self.config_data.get("CONVERSATIONS_COLLECTION", "conversations"))
self.bot_memory_collection_var.set(self.config_data.get("BOT_MEMORY_COLLECTION", "wolfhart_memory"))
# Management Tab Settings
if hasattr(self, 'remote_url_var'): # Check if UI elements for management tab exist
self.remote_url_var.set(self.remote_data.get("REMOTE_SERVER_URL", ""))
self.remote_key_var.set(self.remote_data.get("REMOTE_CLIENT_KEY", ""))
self.game_restart_interval_var.set(self.remote_data.get("DEFAULT_GAME_RESTART_INTERVAL_MINUTES", 120))
self.bot_restart_interval_var.set(self.remote_data.get("DEFAULT_BOT_RESTART_INTERVAL_MINUTES", 120))
self.link_restarts_var.set(self.remote_data.get("LINK_RESTART_TIMES", True))
self.game_process_name_var.set(self.remote_data.get("GAME_PROCESS_NAME", "LastWar.exe"))
# Update visibility and states
self.update_exa_settings_visibility()
self.update_management_buttons_state(True) # Initially, start button is enabled
except Exception as e:
logger.exception("Error updating UI from data") # Log full traceback
print(f"Error updating UI from data: {e}")
import traceback
traceback.print_exc()
# ===============================================================
# UI Event Handlers
# ===============================================================
def toggle_key_visibility(self):
"""Toggle visibility of API key field"""
if self.show_key_var.get():
self.api_key_entry.config(show="")
else:
self.api_key_entry.config(show="*")
def toggle_field_visibility(self, entry_widget, show_var):
"""Toggle visibility of a password field"""
if show_var.get():
entry_widget.config(show="")
else:
entry_widget.config(show="*")
def update_exa_settings_visibility(self):
"""Update visibility of Exa settings based on server type"""
if self.exa_type_var.get() == "smithery":
self.exa_local_frame.pack_forget()
self.exa_smithery_frame.pack(fill=tk.X, pady=5)
else:
self.exa_smithery_frame.pack_forget()
self.exa_local_frame.pack(fill=tk.X, pady=5)
def browse_exa_server(self):
"""Browse for Exa server executable"""
file_path = filedialog.askopenfilename(
title="Select Exa MCP Server",
filetypes=[("All Files", "*.*")],
initialdir=os.path.expanduser("~")
)
if file_path:
# 標準化路徑格式(將反斜線改為正斜線)
normalized_path = file_path.replace("\\", "/")
self.exa_path_var.set(normalized_path)
def browse_chroma_dir(self):
"""Browse for Chroma data directory"""
dir_path = filedialog.askdirectory(
title="Select Chroma Data Directory",
initialdir=os.path.dirname(DEFAULT_CHROMA_DATA_PATH)
)
if dir_path:
# 標準化路徑格式(將反斜線改為正斜線)
normalized_path = os.path.abspath(dir_path).replace("\\", "/")
self.chroma_dir_var.set(normalized_path)
def browse_game_path(self):
"""Browse for game executable"""
file_path = filedialog.askopenfilename(
title="Select Game Executable",
filetypes=[("Executable Files", "*.exe"), ("All Files", "*.*")],
initialdir=os.path.expanduser("~")
)
if file_path:
# 標準化路徑格式(將反斜線改為正斜線)
normalized_path = file_path.replace("\\", "/")
self.game_path_var.set(normalized_path)
def update_servers_list(self):
"""Update the servers listbox with current servers"""
self.servers_listbox.delete(0, tk.END)
# Add built-in servers
self.servers_listbox.insert(tk.END, "exa")
self.servers_listbox.insert(tk.END, "chroma")
# Add custom servers
for server_name in self.config_data.get("MCP_SERVERS", {}):
if server_name not in ("exa", "chroma"):
self.servers_listbox.insert(tk.END, server_name)
def on_server_select(self, event):
"""Handle server selection from the listbox"""
selection = self.servers_listbox.curselection()
if not selection:
return
selected_server = self.servers_listbox.get(selection[0])
# Hide all settings frames
self.empty_settings_label.pack_forget()
self.exa_frame.pack_forget()
self.chroma_frame.pack_forget()
self.custom_frame.pack_forget()
# Show the selected server's settings frame
if selected_server == "exa":
self.exa_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
self.remove_btn.config(state=tk.DISABLED) # Can't remove built-in servers
elif selected_server == "chroma":
self.chroma_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
self.remove_btn.config(state=tk.DISABLED) # Can't remove built-in servers
else:
# Custom server
if selected_server in self.config_data.get("MCP_SERVERS", {}):
# Load existing custom server config
server_config = self.config_data["MCP_SERVERS"][selected_server]
self.custom_name_var.set(selected_server)
self.custom_enable_var.set(server_config.get("enabled", True))
if "raw_config" in server_config:
self.custom_config_text.delete("1.0", tk.END)
self.custom_config_text.insert("1.0", server_config["raw_config"])
self.custom_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
self.remove_btn.config(state=tk.NORMAL) # Can remove custom servers
def add_custom_server(self):
"""Add a new custom server"""
# Generate a unique name
new_name = "custom_server"
counter = 1
while new_name in self.config_data.get("MCP_SERVERS", {}):
new_name = f"custom_server_{counter}"
counter += 1
# Add to config data
if "MCP_SERVERS" not in self.config_data:
self.config_data["MCP_SERVERS"] = {}
self.config_data["MCP_SERVERS"][new_name] = {
"enabled": True,
"raw_config": ' "command": "npx",\n "args": [\n "custom-mcp-server"\n ]'
}
# Update UI
self.update_servers_list()
# Select the new server
idx = self.servers_listbox.get(0, tk.END).index(new_name)
self.servers_listbox.selection_clear(0, tk.END)
self.servers_listbox.selection_set(idx)
self.servers_listbox.see(idx)
self.on_server_select(None) # Trigger the selection handler
def remove_server(self):
"""Remove the selected custom server"""
selection = self.servers_listbox.curselection()
if not selection:
return
selected_server = self.servers_listbox.get(selection[0])
# Confirmation
confirm = messagebox.askyesno(
"Confirm Removal",
f"Are you sure you want to remove the server '{selected_server}'?",
icon=messagebox.WARNING
)
if confirm:
# Remove from config data
if selected_server in self.config_data.get("MCP_SERVERS", {}):
del self.config_data["MCP_SERVERS"][selected_server]
# Update UI
self.update_servers_list()
# Clear selection
self.servers_listbox.selection_clear(0, tk.END)
# Hide server settings and show empty label
self.custom_frame.pack_forget()
self.empty_settings_label.pack(expand=True, pady=20)
self.remove_btn.config(state=tk.DISABLED)
def save_settings(self, show_success_message=True): # Added optional param
"""Save all settings to config.py, .env, and remote_config.json files"""
try:
# Update config data from UI (for config.py and .env)
# API settings
self.config_data["OPENAI_API_BASE_URL"] = self.api_url_var.get()
self.config_data["LLM_MODEL"] = self.model_var.get()
# Environment variables
self.env_data["OPENAI_API_KEY"] = self.api_key_var.get()
self.env_data["EXA_API_KEY"] = self.exa_key_var.get()
# MCP Servers
if "MCP_SERVERS" not in self.config_data:
self.config_data["MCP_SERVERS"] = {}
# Exa server
if "exa" not in self.config_data["MCP_SERVERS"]:
self.config_data["MCP_SERVERS"]["exa"] = {}
self.config_data["MCP_SERVERS"]["exa"]["enabled"] = self.exa_enable_var.get()
self.config_data["MCP_SERVERS"]["exa"]["use_smithery"] = (self.exa_type_var.get() == "smithery")
if self.exa_type_var.get() == "local":
self.config_data["MCP_SERVERS"]["exa"]["server_path"] = self.exa_path_var.get()
# Chroma server
if "chroma" not in self.config_data["MCP_SERVERS"]:
self.config_data["MCP_SERVERS"]["chroma"] = {}
self.config_data["MCP_SERVERS"]["chroma"]["enabled"] = self.chroma_enable_var.get()
self.config_data["MCP_SERVERS"]["chroma"]["data_dir"] = self.chroma_dir_var.get()
# Custom server - check if one is currently selected
selection = self.servers_listbox.curselection()
if selection:
selected_server = self.servers_listbox.get(selection[0])
if selected_server not in ("exa", "chroma"):
# Update custom server settings
new_name = self.custom_name_var.get().strip()
if not new_name:
messagebox.showerror("Error", "Custom server name cannot be empty")
return
# Handle name change
if new_name != selected_server:
if new_name in self.config_data["MCP_SERVERS"]:
messagebox.showerror("Error", f"Server name '{new_name}' already exists")
return
# Copy config and delete old entry
self.config_data["MCP_SERVERS"][new_name] = self.config_data["MCP_SERVERS"][selected_server].copy()
del self.config_data["MCP_SERVERS"][selected_server]
# Update other settings
self.config_data["MCP_SERVERS"][new_name]["enabled"] = self.custom_enable_var.get()
self.config_data["MCP_SERVERS"][new_name]["raw_config"] = self.custom_config_text.get("1.0", tk.END).strip()
# Game window settings
self.config_data["GAME_WINDOW_CONFIG"] = {
"WINDOW_TITLE": self.window_title_var.get(),
"ENABLE_SCHEDULED_RESTART": self.restart_var.get(),
"RESTART_INTERVAL_MINUTES": self.interval_var.get(),
"GAME_EXECUTABLE_PATH": self.game_path_var.get(),
"GAME_WINDOW_X": self.pos_x_var.get(),
"GAME_WINDOW_Y": self.pos_y_var.get(),
"GAME_WINDOW_WIDTH": self.width_var.get(),
"GAME_WINDOW_HEIGHT": self.height_var.get(),
"MONITOR_INTERVAL_SECONDS": self.monitor_interval_var.get()
}
# 保存記憶設定
self.config_data["ENABLE_PRELOAD_PROFILES"] = self.preload_profiles_var.get()
self.config_data["PRELOAD_RELATED_MEMORIES"] = self.related_memories_var.get()
self.config_data["PROFILES_COLLECTION"] = self.profiles_collection_var.get()
self.config_data["CONVERSATIONS_COLLECTION"] = self.conversations_collection_var.get()
self.config_data["BOT_MEMORY_COLLECTION"] = self.bot_memory_collection_var.get()
# Update remote_data from UI (for remote_config.json)
if hasattr(self, 'remote_url_var'): # Check if management tab UI elements exist
self.remote_data["REMOTE_SERVER_URL"] = self.remote_url_var.get()
self.remote_data["REMOTE_CLIENT_KEY"] = self.remote_key_var.get()
self.remote_data["DEFAULT_GAME_RESTART_INTERVAL_MINUTES"] = self.game_restart_interval_var.get()
self.remote_data["DEFAULT_BOT_RESTART_INTERVAL_MINUTES"] = self.bot_restart_interval_var.get()
self.remote_data["LINK_RESTART_TIMES"] = self.link_restarts_var.get()
self.remote_data["GAME_PROCESS_NAME"] = self.game_process_name_var.get()
# Validate critical settings
if "exa" in self.config_data["MCP_SERVERS"] and self.config_data["MCP_SERVERS"]["exa"]["enabled"]:
if not self.exa_key_var.get():
messagebox.showerror("Validation Error", "Exa API Key is required when Exa server is enabled")
return
if self.exa_type_var.get() == "local" and not self.exa_path_var.get():
messagebox.showerror("Validation Error", "Exa Server Path is required for local server type")
return
# Generate config.py and .env files
save_env_file(self.env_data)
generate_config_file(self.config_data, self.env_data)
save_remote_config(self.remote_data) # Save remote config
if show_success_message:
messagebox.showinfo("Success", "Settings saved successfully.\nRestart managed session for changes to take effect.")
except Exception as e:
logger.exception("Error saving settings") # Log the full traceback
if show_success_message: # Only show error if it's a direct save action
messagebox.showerror("Error", f"An error occurred while saving settings:\n{str(e)}")
import traceback
traceback.print_exc()
# ===============================================================
# ControlClient Class (adapted from wolf_control.py)
# ===============================================================
if HAS_SOCKETIO:
class ControlClient:
def __init__(self, server_url, client_key, wolf_chat_setup_instance):
self.server_url = server_url
self.client_key = client_key
self.wolf_chat_setup = wolf_chat_setup_instance # Reference to the main app
# Suppress InsecureRequestWarning when using ssl_verify=False, as is the current default
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.sio = socketio.Client(ssl_verify=False, logger=logger, engineio_logger=logger) # Use app's logger
self.connected = False
self.authenticated = False
self.should_exit_flag = threading.Event() # Use an event for thread control
self.client_thread = None
self.registered_commands = [
"restart bot", "restart game", "restart all",
"set game interval", "set bot interval", "set linked interval"
]
# Event handlers
self.sio.on('connect', self._on_connect)
self.sio.on('disconnect', self._on_disconnect)
self.sio.on('authenticated', self._on_authenticated)
self.sio.on('command', self._on_command)
def is_connected(self):
return self.connected and self.authenticated
def run_in_thread(self):
if self.client_thread and self.client_thread.is_alive():
logger.info("Control client thread already running.")
return True
self.should_exit_flag.clear()
self.client_thread = threading.Thread(target=self._run_forever, daemon=True)
self.client_thread.start()
return True
def _run_forever(self):
logger.info(f"ControlClient: Starting connection attempts to {self.server_url}")
last_heartbeat = time.time() # For heartbeat
retry_delay = 1.0 # Start with 1 second delay for exponential backoff
max_delay = 300.0 # Maximum delay of 5 minutes for exponential backoff
while not self.should_exit_flag.is_set():
if not self.sio.connected:
try:
logger.info(f"ControlClient: Attempting to connect to {self.server_url}...")
self.sio.connect(self.server_url)
logger.info("ControlClient: Successfully connected.")
retry_delay = 1.0 # Reset delay on successful connection
last_heartbeat = time.time() # Reset heartbeat timer on new connection
except socketio.exceptions.ConnectionError as e:
logger.error(f"ControlClient: Connection failed: {e}. Retrying in {retry_delay:.2f}s.")
self.should_exit_flag.wait(retry_delay)
# Implement exponential backoff with jitter
retry_delay = min(retry_delay * 2, max_delay) * (0.8 + 0.4 * random.random())
retry_delay = max(1.0, retry_delay) # Ensure it's at least 1s
continue
except Exception as e: # Catch other potential errors during connection
logger.error(f"ControlClient: Unexpected error during connection attempt: {e}. Retrying in {retry_delay:.2f}s.")
self.should_exit_flag.wait(retry_delay)
retry_delay = min(retry_delay * 2, max_delay) * (0.8 + 0.4 * random.random())
retry_delay = max(1.0, retry_delay) # Ensure it's at least 1s
continue
# If connected, manage heartbeat and check for exit signal
if self.sio.connected:
current_time = time.time()
if current_time - last_heartbeat > 60: # Send heartbeat every 60 seconds
try:
self.sio.emit('heartbeat', {'timestamp': current_time})
last_heartbeat = current_time
logger.debug("ControlClient: Sent heartbeat to keep connection alive.")
except Exception as e:
logger.error(f"ControlClient: Error sending heartbeat: {e}. Connection might be lost.")
self.should_exit_flag.wait(1) # Check for exit signal every second
else:
# Fallback if not connected after attempt block (should be rare with current logic)
logger.debug(f"ControlClient: Not connected (unexpected state in loop), waiting {retry_delay:.2f}s before next cycle.")
self.should_exit_flag.wait(retry_delay)
# Optionally re-calculate retry_delay here if this path is hit, to maintain backoff progression
retry_delay = min(retry_delay * 2, max_delay) * (0.8 + 0.4 * random.random())
retry_delay = max(1.0, retry_delay)
logger.info("ControlClient: Exited _run_forever loop.")
if self.sio.connected:
self.sio.disconnect()
def _on_connect(self):
self.connected = True
logger.info("ControlClient: Connected to server. Authenticating...")
self.sio.emit('authenticate', {
'type': 'client',
'clientKey': self.client_key,
'commands': self.registered_commands
})
def _on_disconnect(self):
self.connected = False
self.authenticated = False
logger.info("ControlClient: Disconnected from server.")
# Force reconnection if not intentionally stopping
if not self.should_exit_flag.is_set():
logger.info("ControlClient: Attempting immediate reconnection from _on_disconnect...")
try:
# This is an immediate attempt; _run_forever handles sustained retries.
if not self.sio.connected: # Check before trying to connect
self.sio.connect(self.server_url)
except Exception as e:
logger.error(f"ControlClient: Immediate reconnection from _on_disconnect failed: {e}")
def _on_authenticated(self, data):
if data.get('success'):
self.authenticated = True
logger.info("ControlClient: Authentication successful.")
else:
self.authenticated = False
logger.error(f"ControlClient: Authentication failed: {data.get('error', 'Unknown error')}")
self.sio.disconnect() # Disconnect if auth fails
def _on_command(self, data):
command = data.get('command', '').lower()
args_str = data.get('args', '') # Assuming server might send args as a string
from_user = data.get('from', 'unknown')
logger.info(f"ControlClient: Received command '{command}' with args '{args_str}' from {from_user}")
try:
if command == "restart bot":
self.wolf_chat_setup._restart_bot_managed()
self._send_command_result(command, True, "Bot restart initiated.")
elif command == "restart game":
self.wolf_chat_setup._restart_game_managed()
self._send_command_result(command, True, "Game restart initiated.")
elif command == "restart all":
self.wolf_chat_setup._restart_all_managed()
self._send_command_result(command, True, "Full restart initiated.")
elif command == "set game interval" or command == "set bot interval" or command == "set linked interval":
try:
interval = int(args_str)
if interval < 0: # 0 means disable
self._send_command_result(command, False, "Interval must be non-negative.")
return
if command == "set game interval":
self.wolf_chat_setup.remote_data["DEFAULT_GAME_RESTART_INTERVAL_MINUTES"] = interval
if self.wolf_chat_setup.remote_data["LINK_RESTART_TIMES"]:
self.wolf_chat_setup.remote_data["DEFAULT_BOT_RESTART_INTERVAL_MINUTES"] = interval
elif command == "set bot interval":
self.wolf_chat_setup.remote_data["DEFAULT_BOT_RESTART_INTERVAL_MINUTES"] = interval
if self.wolf_chat_setup.remote_data["LINK_RESTART_TIMES"]:
self.wolf_chat_setup.remote_data["DEFAULT_GAME_RESTART_INTERVAL_MINUTES"] = interval
elif command == "set linked interval":
self.wolf_chat_setup.remote_data["DEFAULT_GAME_RESTART_INTERVAL_MINUTES"] = interval
self.wolf_chat_setup.remote_data["DEFAULT_BOT_RESTART_INTERVAL_MINUTES"] = interval
self.wolf_chat_setup.remote_data["LINK_RESTART_TIMES"] = True
save_remote_config(self.wolf_chat_setup.remote_data)
self.wolf_chat_setup._setup_scheduled_restarts() # Re-apply schedule
# Update UI if possible (tricky from non-main thread)
# self.wolf_chat_setup.game_restart_interval_var.set(self.wolf_chat_setup.remote_data["DEFAULT_GAME_RESTART_INTERVAL_MINUTES"])
# self.wolf_chat_setup.bot_restart_interval_var.set(self.wolf_chat_setup.remote_data["DEFAULT_BOT_RESTART_INTERVAL_MINUTES"])
logger.info(f"Updated restart interval via remote: {command} to {interval} min. Saved and re-scheduled.")
self._send_command_result(command, True, f"Interval updated to {interval} min and re-scheduled.")
except ValueError:
self._send_command_result(command, False, "Invalid interval value. Must be an integer.")
else:
self._send_command_result(command, False, "Unsupported command.")
except Exception as e:
logger.exception(f"ControlClient: Error executing command '{command}'")
self._send_command_result(command, False, f"Error: {str(e)}")
def _send_command_result(self, command, success, message):
if self.sio.connected:
try:
self.sio.emit('commandResult', {
'command': command,
'success': success,
'message': message,
'timestamp': time.time()
})
except Exception as e:
logger.error(f"ControlClient: Failed to send command result: {e}")
def check_signals(self, app_instance): # app_instance is self.wolf_chat_setup from the caller
"""Periodically check connection status and commands, called by monitoring thread."""
# Note: _run_forever is the primary mechanism for establishing and maintaining connection.
# This function's connection check is a secondary check.
if not self.sio.connected or not self.authenticated:
logger.warning("ControlClient: Connection check in check_signals found client not connected/authenticated.")
# Avoid aggressive reconnection here if _run_forever is already handling it.
# If an explicit reconnect attempt is desired here:
# logger.info("ControlClient: Attempting reconnection from check_signals...")
# try:
# if self.sio.connected: # e.g. connected but not authenticated
# self.sio.disconnect()
# if not self.sio.connected: # Check again before connecting
# self.sio.connect(self.server_url)
# except Exception as e:
# logger.error(f"ControlClient: Reconnection attempt from check_signals failed: {e}")
# Placeholder for any other signal processing logic
# logger.debug("ControlClient: check_signals executed.")
def stop(self):
logger.info("ControlClient: Stopping...")
self.should_exit_flag.set() # Signal the run_forever loop to exit
if self.sio.connected:
self.sio.disconnect() # Attempt to disconnect gracefully
if self.client_thread and self.client_thread.is_alive():
logger.info("ControlClient: Waiting for client thread to join...")
self.client_thread.join(timeout=5) # Wait for the thread to finish
if self.client_thread.is_alive():
logger.warning("ControlClient: Client thread did not join in time.")
self.client_thread = None
logger.info("ControlClient: Stopped.")
else: # HAS_SOCKETIO is False
class ControlClient: # Dummy class if socketio is not available
def __init__(self, *args, **kwargs): logger.warning("Socket.IO not installed, ControlClient is a dummy.")
def run_in_thread(self): return False
def stop(self): pass
def is_connected(self): return False
# ===============================================================
# Main Entry Point
# ===============================================================
if __name__ == "__main__":
# Setup main logger for the application if not already done
if not logging.getLogger().handlers: # Check root logger
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
app = WolfChatSetup()
app.protocol("WM_DELETE_WINDOW", app.on_closing) # Handle window close button
app.mainloop()