#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Game Manager Module

Provides game window monitoring, automatic restart, and process management features.
Designed to be imported and controlled by setup.py or other management scripts.
"""

import os
import sys
import time
import json
import threading
import subprocess
import logging

# Attempt to import platform-specific modules that might be needed.
# Every optional dependency is guarded the same way so that importing this
# module never fails just because a helper package is missing.
try:
    import pygetwindow as gw
    HAS_PYGETWINDOW = True
except (ImportError, NotImplementedError):
    # NotImplementedError: pygetwindow refuses to import on platforms it
    # does not support.
    gw = None
    HAS_PYGETWINDOW = False
    print("Warning: pygetwindow module not available, window monitoring features will be unavailable")

try:
    import win32gui
    import win32con
    HAS_WIN32 = True
except ImportError:
    HAS_WIN32 = False
    print("Warning: win32gui/win32con modules not installed, some window management features may be unavailable")

try:
    import psutil
    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False
    print("Warning: psutil module not installed, process management features may be unavailable")


class GameMonitor:
    """
    Game window monitoring class.

    Responsible for monitoring game window position, scheduled restarts,
    and providing window management functions.
    """

    # Section of ``config_data`` that holds every setting read by this class.
    _CONFIG_SECTION = "GAME_WINDOW_CONFIG"

    # (attribute name, configuration key, default used at construction time)
    _CONFIG_FIELDS = (
        ("window_title", "WINDOW_TITLE", "Last War-Survival Game"),
        ("enable_restart", "ENABLE_SCHEDULED_RESTART", True),
        ("restart_interval", "RESTART_INTERVAL_MINUTES", 60),
        ("game_path", "GAME_EXECUTABLE_PATH", ""),
        ("window_x", "GAME_WINDOW_X", 50),
        ("window_y", "GAME_WINDOW_Y", 30),
        ("window_width", "GAME_WINDOW_WIDTH", 600),
        ("window_height", "GAME_WINDOW_HEIGHT", 1070),
        ("monitor_interval", "MONITOR_INTERVAL_SECONDS", 5),
    )

    # Consecutive focus failures tolerated before the game is restarted.
    _MAX_FOCUS_FAILURES = 15

    def __init__(self, config_data, remote_data=None, logger=None, callback=None):
        """
        Create a monitor from configuration dictionaries.

        Args:
            config_data: dict whose "GAME_WINDOW_CONFIG" entry holds the window,
                restart and executable settings (missing keys use defaults).
            remote_data: optional dict; "GAME_PROCESS_NAME" overrides the
                default process name "LastWar.exe".
            logger: optional logging.Logger; a "GameMonitor" logger is used
                when omitted.
            callback: optional callable invoked with "restart_begin",
                "restart_complete" or "restart_error" during restarts.
        """
        # Use the provided logger or create a new one
        self.logger = logger or logging.getLogger("GameMonitor")
        # hasHandlers() also looks at ancestor loggers, so a logger that already
        # propagates to a configured root logger does not emit every record twice.
        if not self.logger.hasHandlers():
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

        self.config_data = config_data if config_data is not None else {}
        self.remote_data = remote_data or {}
        self.callback = callback  # Callback function to notify the caller

        # Read settings from configuration
        game_config = self.config_data.get(self._CONFIG_SECTION, {})
        for attr_name, config_key, default in self._CONFIG_FIELDS:
            setattr(self, attr_name, game_config.get(config_key, default))

        # Read game process name from remote_data, use default if not found
        self.game_process_name = self.remote_data.get("GAME_PROCESS_NAME", "LastWar.exe")

        # Internal state
        self.running = False
        self.next_restart_time = None
        self.monitor_thread = None
        self.stop_event = threading.Event()

        # Focus tracking
        self.last_focus_failure_count = 0
        self.last_successful_foreground = time.time()

        self.logger.info(f"GameMonitor initialized. Game window: '{self.window_title}', Process: '{self.game_process_name}'")
        self.logger.info(f"Position: ({self.window_x}, {self.window_y}), Size: {self.window_width}x{self.window_height}")
        self.logger.info(f"Scheduled Restart: {'Enabled' if self.enable_restart else 'Disabled'}, Interval: {self.restart_interval} minutes")

    def _reset_restart_timer(self):
        """
        Recompute ``next_restart_time`` from the current settings.

        Returns:
            True if a restart is now scheduled, False if scheduling is
            disabled (in which case ``next_restart_time`` is cleared).
        """
        if self.enable_restart and self.restart_interval > 0:
            self.next_restart_time = time.time() + (self.restart_interval * 60)
            return True
        self.next_restart_time = None
        return False

    def start(self):
        """Start game window monitoring. Returns True (also when already running)."""
        if self.running:
            self.logger.info("Game window monitoring is already running")
            return True  # Return True if already running

        self.logger.info("Starting game window monitoring...")
        # A fresh event per run: a previous thread that outlived stop()'s join
        # timeout keeps its own (already set) event and cannot be revived.
        self.stop_event = threading.Event()

        # Set next restart time
        if self._reset_restart_timer():
            self.logger.info(f"Scheduled restart enabled. First restart in {self.restart_interval} minutes")
        else:
            self.logger.info("Scheduled restart is disabled")

        # Start monitoring thread
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()
        self.running = True
        self.logger.info("Game window monitoring started")
        return True

    def stop(self):
        """Stop game window monitoring. Returns True (also when already stopped)."""
        if not self.running:
            self.logger.info("Game window monitoring is not running")
            return True  # Return True if already stopped

        self.logger.info("Stopping game window monitoring...")
        self.stop_event.set()

        # Wait for monitoring thread to finish
        if self.monitor_thread and self.monitor_thread.is_alive():
            self.logger.info("Waiting for monitoring thread to finish...")
            self.monitor_thread.join(timeout=5)
            if self.monitor_thread.is_alive():
                self.logger.warning("Game window monitoring thread did not stop within the timeout period")

        self.running = False
        self.monitor_thread = None
        self.logger.info("Game window monitoring stopped")
        return True

    def _monitor_loop(self):
        """Main monitoring loop; runs in the monitor thread until the stop event is set."""
        self.logger.info("Game window monitoring loop started")
        stop_event = self.stop_event  # the event belonging to this run
        last_adjustment_message = ""  # Avoid logging repetitive adjustment messages

        while not stop_event.is_set():
            try:
                if not self._is_game_running():
                    self.logger.warning("Game process disappeared - restarting")
                    # Let resources release, but give up at once if asked to stop
                    if stop_event.wait(2):
                        break
                    if self._start_game_process():
                        self.logger.info("Game restarted successfully")
                    else:
                        self.logger.error("Game restart failed")
                    # Wait before next check after a restart attempt
                    stop_event.wait(self.monitor_interval)
                    continue

                # Check for scheduled restart (read once: another thread may clear it)
                restart_due_at = self.next_restart_time
                if restart_due_at and time.time() >= restart_due_at:
                    self.logger.info("Scheduled restart time reached. Performing restart...")
                    self._perform_restart()
                    # Re-arm only if scheduling is still enabled
                    if self._reset_restart_timer():
                        self.logger.info(f"Restart timer reset. Next restart in {self.restart_interval} minutes")
                    # Continue to next loop iteration
                    stop_event.wait(self.monitor_interval)
                    continue

                # Find game window
                window = self._find_game_window()
                messages = []

                if window:
                    try:
                        if HAS_WIN32:
                            self._adjust_window_win32(window, messages)
                        else:
                            self._adjust_window_basic(window, messages)
                    except Exception as e:
                        self.logger.error(f"Unexpected error while monitoring game window: {e}")

                # Log only if adjustments were made and the message changed
                current_message = "".join(messages)
                if current_message and current_message != last_adjustment_message:
                    self.logger.info(f"[GameMonitor] {current_message.strip()}")
                    last_adjustment_message = current_message
                elif not window:
                    # Reset last message if window disappears
                    last_adjustment_message = ""

            except Exception as e:
                self.logger.error(f"Error in monitoring loop: {e}")

            # Wait for the next check; returns early when stop() is called
            stop_event.wait(self.monitor_interval)

        self.logger.info("Game window monitoring loop finished")

    def _adjust_window_win32(self, window, messages):
        """Windows path: enforce position/size and foreground; append a note to ``messages`` per adjustment."""
        hwnd = window._hWnd
        target_pos = (self.window_x, self.window_y)
        target_size = (self.window_width, self.window_height)

        # 1. Check and adjust position/size
        if (window.left, window.top) != target_pos or (window.width, window.height) != target_size:
            window.moveTo(target_pos[0], target_pos[1])
            window.resizeTo(target_size[0], target_size[1])
            time.sleep(0.1)
            window.activate()
            time.sleep(0.1)
            # Report the adjustment only if it actually took effect
            if (window.left, window.top) == target_pos and (window.width, window.height) == target_size:
                messages.append("Adjusted window position/size. ")

        # 2. Check and bring to foreground using enhanced method
        if win32gui.GetForegroundWindow() == hwnd:
            return

        success, method_used = self._force_window_foreground(hwnd, window)
        if success:
            messages.append(f"Focused window using {method_used}. ")
            self.last_focus_failure_count = 0
            return

        # Log warning with consecutive failure count
        self.last_focus_failure_count += 1
        self.logger.warning(f"Window focus failed (attempt {self.last_focus_failure_count}): {method_used}")

        # Restart game after too many failures
        if self.last_focus_failure_count >= self._MAX_FOCUS_FAILURES:
            self.logger.warning("Excessive focus failures, restarting game...")
            self._perform_restart()
            self.last_focus_failure_count = 0

    def _adjust_window_basic(self, window, messages):
        """Non-Windows path: enforce position/size and attempt activation; append a note to ``messages`` per adjustment."""
        target_pos = (self.window_x, self.window_y)
        target_size = (self.window_width, self.window_height)

        if (window.left, window.top) != target_pos or (window.width, window.height) != target_size:
            window.moveTo(target_pos[0], target_pos[1])
            window.resizeTo(target_size[0], target_size[1])
            messages.append(f"Adjusted game window to position {target_pos} size {target_size[0]}x{target_size[1]}. ")

        # Try activating the window (may have limited effect on non-Windows)
        try:
            window.activate()
            messages.append("Attempted to activate game window. ")
        except Exception as activate_err:
            self.logger.warning(f"Error activating window: {activate_err}")

    def _is_game_running(self):
        """Return True if a process named ``game_process_name`` exists (case-insensitive)."""
        if not HAS_PSUTIL:
            self.logger.warning("_is_game_running: psutil not available, cannot check process status.")
            return True  # Assume running if psutil is not available to avoid unintended restarts

        target_name = self.game_process_name.lower()
        try:
            # Use the prefetched ``info`` dict rather than calling p.name(): a
            # single vanished or inaccessible process must not abort the scan
            # and make the game look like it is gone.
            for proc in psutil.process_iter(['name']):
                proc_name = proc.info.get('name')
                if proc_name and proc_name.lower() == target_name:
                    return True
            return False
        except Exception as e:
            self.logger.error(f"Error checking game process: {e}")
            return False  # Assume not running on error

    def _find_game_window(self):
        """Return the first window matching ``window_title``, or None."""
        if not HAS_PYGETWINDOW:
            return None
        try:
            windows = gw.getWindowsWithTitle(self.window_title)
            if windows:
                return windows[0]
        except Exception as e:
            self.logger.debug(f"Error finding game window: {e}")
        return None

    def _force_window_foreground(self, hwnd, window):
        """
        Aggressive window focus implementation.

        Tries five strategies in order and stops at the first one after which
        ``hwnd`` is the foreground window.

        Args:
            hwnd: native window handle.
            window: window object (unused here; kept for interface stability).

        Returns:
            (True, strategy_name) on success, or (False, description) when
            every strategy failed or the win32 modules are unavailable.
        """
        if not HAS_WIN32:
            return False, "win32 modules unavailable"

        methods_tried = []
        keep_geometry = win32con.SWP_NOMOVE | win32con.SWP_NOSIZE

        # Method 1: HWND_TOPMOST strategy
        methods_tried.append("HWND_TOPMOST")
        try:
            win32gui.SetWindowPos(hwnd, win32con.HWND_TOPMOST, 0, 0, 0, 0, keep_geometry)
            time.sleep(0.1)
            win32gui.SetWindowPos(hwnd, win32con.HWND_TOP, 0, 0, 0, 0, keep_geometry)
            win32gui.SetForegroundWindow(hwnd)
            time.sleep(0.2)
            if win32gui.GetForegroundWindow() == hwnd:
                return True, "HWND_TOPMOST"
        except Exception as e:
            self.logger.debug(f"Method 1 failed: {e}")

        # Method 2: Minimize/restore cycle
        methods_tried.append("MinimizeRestore")
        try:
            win32gui.ShowWindow(hwnd, win32con.SW_MINIMIZE)
            time.sleep(0.3)
            win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
            time.sleep(0.2)
            win32gui.SetForegroundWindow(hwnd)
            if win32gui.GetForegroundWindow() == hwnd:
                return True, "MinimizeRestore"
        except Exception as e:
            self.logger.debug(f"Method 2 failed: {e}")

        # Method 3: Thread input attach
        methods_tried.append("ThreadAttach")
        try:
            import win32process
            import win32api
            current_thread_id = win32api.GetCurrentThreadId()
            window_thread_id = win32process.GetWindowThreadProcessId(hwnd)[0]
            if current_thread_id != window_thread_id:
                win32process.AttachThreadInput(current_thread_id, window_thread_id, True)
                try:
                    win32gui.BringWindowToTop(hwnd)
                    win32gui.SetForegroundWindow(hwnd)
                    time.sleep(0.2)
                    if win32gui.GetForegroundWindow() == hwnd:
                        return True, "ThreadAttach"
                finally:
                    # Always detach, including on the success return above
                    win32process.AttachThreadInput(current_thread_id, window_thread_id, False)
        except Exception as e:
            self.logger.debug(f"Method 3 failed: {e}")

        # Method 4: Flash + Window messages
        methods_tried.append("Flash+Messages")
        try:
            # First flash to get attention
            win32gui.FlashWindow(hwnd, True)
            time.sleep(0.2)
            # Then send specific window messages
            win32gui.SendMessage(hwnd, win32con.WM_SETREDRAW, 0, 0)
            win32gui.SendMessage(hwnd, win32con.WM_SETREDRAW, 1, 0)
            win32gui.RedrawWindow(
                hwnd, None, None,
                win32con.RDW_FRAME | win32con.RDW_INVALIDATE | win32con.RDW_UPDATENOW | win32con.RDW_ALLCHILDREN)
            win32gui.PostMessage(hwnd, win32con.WM_SYSCOMMAND, win32con.SC_RESTORE, 0)
            win32gui.PostMessage(hwnd, win32con.WM_ACTIVATE, win32con.WA_ACTIVE, 0)
            time.sleep(0.2)
            if win32gui.GetForegroundWindow() == hwnd:
                return True, "Flash+Messages"
        except Exception as e:
            self.logger.debug(f"Method 4 failed: {e}")

        # Method 5: Hide/Show cycle
        methods_tried.append("HideShow")
        try:
            win32gui.ShowWindow(hwnd, win32con.SW_HIDE)
            time.sleep(0.2)
            win32gui.ShowWindow(hwnd, win32con.SW_SHOW)
            time.sleep(0.2)
            win32gui.SetForegroundWindow(hwnd)
            if win32gui.GetForegroundWindow() == hwnd:
                return True, "HideShow"
        except Exception as e:
            self.logger.debug(f"Method 5 failed: {e}")

        return False, f"All methods failed: {', '.join(methods_tried)}"

    def _find_game_process_by_window(self):
        """
        Find the process that owns the game window.

        Returns:
            The psutil.Process owning the first window matching
            ``window_title``, or None when psutil/win32 are unavailable, no
            window is found, or the owner cannot be resolved. The name-based
            fallback is performed by ``_find_game_process``.
        """
        if not HAS_PSUTIL or not HAS_WIN32:
            return None

        try:
            window = self._find_game_window()
            if not window:
                return None

            try:
                import win32process
                _, window_pid = win32process.GetWindowThreadProcessId(window._hWnd)
            except Exception:
                return None

            if not window_pid:
                return None

            try:
                proc = psutil.Process(window_pid)
                proc_name = proc.name()
            except psutil.Error as e:
                self.logger.debug(f"Could not inspect window process (PID: {window_pid}): {e}")
                return None

            if proc_name.lower() == self.game_process_name.lower():
                self.logger.info(f"Found game process '{proc_name}' (PID: {proc.pid}) with window title '{self.window_title}'")
            else:
                # NOTE(review): the window owner is returned even when its name
                # differs from game_process_name (existing behaviour); confirm
                # this is intended, since the caller may terminate it.
                self.logger.debug(f"Window process name mismatch: expected '{self.game_process_name}', got '{proc_name}'")
            return proc

        except Exception as e:
            self.logger.error(f"Process-by-window lookup error: {e}")
            return None

    def _find_game_process(self):
        """Find the game process: window owner first, then first process matching by name; None if neither."""
        # Try window-based process lookup first
        proc = self._find_game_process_by_window()
        if proc:
            return proc

        # Fall back to name-only lookup
        if not HAS_PSUTIL:
            self.logger.debug("psutil not available for name-only process lookup fallback.")
            return None

        target_name = self.game_process_name.lower()
        try:
            for candidate in psutil.process_iter(['pid', 'name', 'exe']):
                try:
                    proc_name = candidate.info.get('name')
                    if proc_name and proc_name.lower() == target_name:
                        self.logger.info(f"Found game process by name '{proc_name}' (PID: {candidate.pid}) as fallback")
                        return candidate
                except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                    continue
        except Exception as e:
            self.logger.error(f"Error in name-only game process lookup: {e}")

        self.logger.info(f"Game process '{self.game_process_name}' not found by name either.")
        return None

    def _perform_restart(self):
        """
        Execute the game restart process.

        Notifies the callback with "restart_begin", terminates the game, starts
        it again, waits for it to launch and notifies "restart_complete".

        Returns:
            True when the sequence ran to the end, False if an exception was
            raised (the callback then receives "restart_error").
        """
        self.logger.info("Starting game restart process")

        try:
            # 1. Notify that restart has begun (optional)
            if self.callback:
                self.callback("restart_begin")

            # 2. Terminate existing game process
            self._terminate_game_process()
            time.sleep(2)  # Short wait to ensure process termination

            # 3. Start new game process
            if self._start_game_process():
                self.logger.info("Game restarted successfully")
            else:
                self.logger.error("Failed to start game")

            # 4. Wait for game to launch
            restart_wait_time = 45  # seconds, increased from 30
            self.logger.info(f"Waiting for game to start ({restart_wait_time} seconds)...")
            time.sleep(restart_wait_time)

            # 5. Notify restart completion
            self.logger.info("Game restart process completed, sending notification")
            if self.callback:
                self.callback("restart_complete")

            return True

        except Exception as e:
            self.logger.error(f"Error during game restart process: {e}")
            # Attempt to notify error
            if self.callback:
                self.callback("restart_error")
            return False

    def _terminate_game_process(self):
        """Terminate the game process (terminate, then kill after 5 s). Returns True if it exited."""
        self.logger.info(f"Attempting to terminate game process '{self.game_process_name}'")

        if not HAS_PSUTIL:
            self.logger.warning("psutil is not available, cannot terminate process")
            return False

        process = self._find_game_process()
        if not process:
            self.logger.warning(f"No running process found with name '{self.game_process_name}'")
            return False

        terminated = False
        try:
            self.logger.info(f"Found game process PID: {process.pid}, terminating...")
            process.terminate()

            try:
                process.wait(timeout=5)
                self.logger.info(f"Process {process.pid} terminated successfully (terminate)")
                terminated = True
            except psutil.TimeoutExpired:
                self.logger.warning(f"Process {process.pid} did not terminate within 5s (terminate), attempting force kill")
                process.kill()
                process.wait(timeout=5)
                self.logger.info(f"Process {process.pid} force killed (kill)")
                terminated = True

        except Exception as e:
            self.logger.error(f"Error terminating process: {e}")

        return terminated

    def _start_game_process(self):
        """Launch ``game_path``. Returns True if the launch call succeeded, False otherwise."""
        if not self.game_path:
            self.logger.error("Game executable path not set, cannot start")
            return False

        self.logger.info(f"Starting game: {self.game_path}")
        try:
            if sys.platform == "win32":
                os.startfile(self.game_path)
                self.logger.info("Called os.startfile to launch game")
            else:
                # Use subprocess.Popen for non-Windows platforms, in a new session
                subprocess.Popen([self.game_path], start_new_session=True)
                self.logger.info("Called subprocess.Popen to launch game")
            return True
        except FileNotFoundError:
            self.logger.error(f"Startup error: Game launcher '{self.game_path}' not found")
        except OSError as ose:
            self.logger.error(f"Startup error (OSError): {ose} - Check path and permissions", exc_info=True)
        except Exception as e:
            self.logger.error(f"Unexpected error starting game: {e}", exc_info=True)

        return False

    def restart_now(self):
        """Perform an immediate restart and re-arm the scheduled-restart timer. Returns the restart result."""
        self.logger.info("Manually triggering game restart")
        result = self._perform_restart()

        # Reset the timer if scheduled restart is enabled
        if self._reset_restart_timer():
            self.logger.info(f"Restart timer reset. Next restart in {self.restart_interval} minutes")

        return result

    def update_config(self, config_data=None, remote_data=None):
        """
        Update configuration settings.

        Args:
            config_data: optional new configuration dict; keys missing from its
                "GAME_WINDOW_CONFIG" section keep their current values.
            remote_data: optional new remote dict; "GAME_PROCESS_NAME" replaces
                the process name when present.
        """
        if config_data:
            previous_interval = self.restart_interval
            self.config_data = config_data

            # Update key settings, keeping the current value for absent keys
            game_config = self.config_data.get(self._CONFIG_SECTION, {})
            for attr_name, config_key, _default in self._CONFIG_FIELDS:
                setattr(self, attr_name, game_config.get(config_key, getattr(self, attr_name)))

            # Keep the running timer consistent with the new settings
            if self.running:
                scheduling_enabled = self.enable_restart and self.restart_interval > 0
                if not scheduling_enabled:
                    if self.next_restart_time is not None:
                        self.next_restart_time = None
                        self.logger.info("Scheduled restart disabled, pending restart cancelled")
                elif self.next_restart_time is None:
                    # Scheduling was switched on while the monitor is running
                    self._reset_restart_timer()
                    self.logger.info(f"Scheduled restart enabled. First restart in {self.restart_interval} minutes")
                elif self.restart_interval != previous_interval:
                    self._reset_restart_timer()
                    self.logger.info(f"Restart interval updated to {self.restart_interval} minutes, next restart reset")

        if remote_data:
            self.remote_data = remote_data
            old_process_name = self.game_process_name
            self.game_process_name = self.remote_data.get("GAME_PROCESS_NAME", old_process_name)
            if self.game_process_name != old_process_name:
                self.logger.info(f"Game process name updated to '{self.game_process_name}'")

        self.logger.info("GameMonitor configuration updated")


# Provide simple external API functions
def create_game_monitor(config_data, remote_data=None, logger=None, callback=None):
    """Create a game monitor instance"""
    return GameMonitor(config_data, remote_data, logger, callback)


def stop_all_monitors():
    """Attempt to stop all created monitors (global cleanup).

    Currently a no-op: instance references are not stored, so each monitor
    needs to be stopped individually via its own ``stop()``.
    """
    pass


def _run_standalone():
    """Run the monitor as a script, configured from a local ``config`` module."""
    # Set up basic logging
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("GameManagerStandalone")

    # Load settings from config.py
    try:
        import config
        logger.info("Loaded config.py")

        # Build basic configuration dictionary
        config_data = {
            "GAME_WINDOW_CONFIG": {
                "WINDOW_TITLE": config.WINDOW_TITLE,
                "ENABLE_SCHEDULED_RESTART": config.ENABLE_SCHEDULED_RESTART,
                "RESTART_INTERVAL_MINUTES": config.RESTART_INTERVAL_MINUTES,
                "GAME_EXECUTABLE_PATH": config.GAME_EXECUTABLE_PATH,
                "GAME_WINDOW_X": config.GAME_WINDOW_X,
                "GAME_WINDOW_Y": config.GAME_WINDOW_Y,
                "GAME_WINDOW_WIDTH": config.GAME_WINDOW_WIDTH,
                "GAME_WINDOW_HEIGHT": config.GAME_WINDOW_HEIGHT,
                "MONITOR_INTERVAL_SECONDS": config.MONITOR_INTERVAL_SECONDS,
            }
        }

        # Define a callback for standalone execution
        def standalone_callback(action):
            """Send JSON signal via standard output"""
            logger.info(f"Sending signal: {action}")
            signal_data = {'action': action}
            try:
                json_signal = json.dumps(signal_data)
                print(json_signal, flush=True)
                logger.info(f"Signal sent: {action}")
            except Exception as e:
                logger.error(f"Failed to send signal '{action}': {e}")

        # Create and start the monitor
        monitor = GameMonitor(config_data, logger=logger, callback=standalone_callback)
        monitor.start()

        # Keep the program running
        try:
            logger.info("Game monitoring started. Press Ctrl+C to stop.")
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Ctrl+C received, stopping...")
        finally:
            monitor.stop()
            logger.info("Game monitoring stopped")

    except ImportError:
        logger.error("Could not load config.py. Ensure it exists and contains necessary settings.")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Error starting game monitoring: {e}", exc_info=True)
        sys.exit(1)


# Functionality when run standalone (similar to original game_monitor.py)
if __name__ == "__main__":
    _run_standalone()