From 48c0c25a4293ada9b09e6f9bfe862f63e9f6a2ea Mon Sep 17 00:00:00 2001 From: z060142 Date: Thu, 8 May 2025 03:08:51 +0800 Subject: [PATCH] Extend ChromaDB memory system with scheduled tasks and Setup UI support - Added new scripts to manage ChromaDB memory processing and periodic scheduling (e.g. compaction, deduplication, reindexing). - Optimized chatbot memory usage by improving base memory retrieval logic and preload strategy. - Updated Setup.py UI to include scheduling options for memory maintenance tasks. - Ensures better long-term memory performance, avoids memory bloat, and enables proactive management of large-scale memory datasets. --- Setup.py | 235 ++++++++++++- llm_interaction.py | 29 +- memory_backup.py | 42 +++ memory_manager.py | 679 ++++++++++++++++++++++++++++++++++++++ tools/Chroma_DB_backup.py | 213 ++++++++---- 5 files changed, 1120 insertions(+), 78 deletions(-) create mode 100644 memory_backup.py create mode 100644 memory_manager.py diff --git a/Setup.py b/Setup.py index 07118d0..42c3b4b 100644 --- a/Setup.py +++ b/Setup.py @@ -307,6 +307,34 @@ def load_current_config(): if bot_memory_collection_match: config_data["BOT_MEMORY_COLLECTION"] = bot_memory_collection_match.group(1) + # Extract memory management settings + backup_hour_match = re.search(r'MEMORY_BACKUP_HOUR\s*=\s*(\d+)', config_content) + if backup_hour_match: + config_data["MEMORY_BACKUP_HOUR"] = int(backup_hour_match.group(1)) + + backup_minute_match = re.search(r'MEMORY_BACKUP_MINUTE\s*=\s*(\d+)', config_content) + if backup_minute_match: + config_data["MEMORY_BACKUP_MINUTE"] = int(backup_minute_match.group(1)) + + profile_model_match = re.search(r'MEMORY_PROFILE_MODEL\s*=\s*["\']?(.+?)["\']?\s*(?:#|$)', config_content) + # Handle potential LLM_MODEL reference + if profile_model_match: + profile_model_val = profile_model_match.group(1).strip() + if profile_model_val == "LLM_MODEL": + # If it refers to LLM_MODEL, use the already parsed LLM_MODEL value + 
config_data["MEMORY_PROFILE_MODEL"] = config_data.get("LLM_MODEL", "deepseek/deepseek-chat-v3-0324") # Fallback if LLM_MODEL wasn't parsed + else: + config_data["MEMORY_PROFILE_MODEL"] = profile_model_val + else: + # Default to LLM_MODEL if not found + config_data["MEMORY_PROFILE_MODEL"] = config_data.get("LLM_MODEL", "deepseek/deepseek-chat-v3-0324") + + + summary_model_match = re.search(r'MEMORY_SUMMARY_MODEL\s*=\s*["\'](.+?)["\']', config_content) + if summary_model_match: + config_data["MEMORY_SUMMARY_MODEL"] = summary_model_match.group(1) + + except Exception as e: print(f"Error reading config.py: {e}") import traceback @@ -416,7 +444,9 @@ def generate_config_file(config_data, env_data): f.write(" \"--client-type\",\n") f.write(" \"persistent\",\n") f.write(" \"--data-dir\",\n") - f.write(f" \"{absolute_data_dir}\"\n") + # Escape backslashes in the path for the string literal in config.py + escaped_data_dir = absolute_data_dir.replace('\\', '\\\\') + f.write(f" \"{escaped_data_dir}\"\n") f.write(" ]\n") # Handle custom server - just write as raw JSON @@ -492,7 +522,25 @@ def generate_config_file(config_data, env_data): f.write(f"# This path will be made absolute when config.py is loaded.\n") # Write the potentially relative path from UI/default, let config.py handle abspath # Use raw string r"..." 
to handle potential backslashes in Windows paths correctly within the string literal - f.write(f"CHROMA_DATA_DIR = os.path.abspath(r\"{normalized_chroma_path}\")\n") + f.write(f"CHROMA_DATA_DIR = os.path.abspath(r\"{normalized_chroma_path}\")\n\n") + + # Write Memory Management Configuration + f.write("# =============================================================================\n") + f.write("# Memory Management Configuration\n") + f.write("# =============================================================================\n") + backup_hour = config_data.get('MEMORY_BACKUP_HOUR', 0) + backup_minute = config_data.get('MEMORY_BACKUP_MINUTE', 0) + profile_model = config_data.get('MEMORY_PROFILE_MODEL', 'LLM_MODEL') # Default to referencing LLM_MODEL + summary_model = config_data.get('MEMORY_SUMMARY_MODEL', 'mistral-7b-instruct') + + f.write(f"MEMORY_BACKUP_HOUR = {backup_hour}\n") + f.write(f"MEMORY_BACKUP_MINUTE = {backup_minute}\n") + # Write profile model, potentially referencing LLM_MODEL + if profile_model == config_data.get('LLM_MODEL'): + f.write(f"MEMORY_PROFILE_MODEL = LLM_MODEL # Default to main LLM model\n") + else: + f.write(f"MEMORY_PROFILE_MODEL = \"{profile_model}\"\n") + f.write(f"MEMORY_SUMMARY_MODEL = \"{summary_model}\"\n") print("Generated config.py file successfully") @@ -522,6 +570,7 @@ class WolfChatSetup(tk.Tk): self.create_mcp_tab() self.create_game_tab() self.create_memory_tab() + self.create_memory_management_tab() # 新增記憶管理標籤頁 self.create_management_tab() # New tab for combined management # Create bottom buttons @@ -539,9 +588,13 @@ class WolfChatSetup(tk.Tk): self.keep_monitoring_flag = threading.Event() self.keep_monitoring_flag.set() + # Initialize scheduler process tracker + self.scheduler_process = None + # Set initial states based on loaded data self.update_ui_from_data() + self.update_scheduler_button_states(True) # Set initial scheduler button state def create_management_tab(self): """Create the Bot and Game Management tab""" @@ 
-1135,8 +1188,11 @@ class WolfChatSetup(tk.Tk): def on_closing(self): """Handle window close event.""" - if messagebox.askokcancel("Quit", "Do you want to quit Wolf Chat Setup? This will stop any managed sessions."): - self.stop_managed_session() # Ensure everything is stopped + if messagebox.askokcancel("Quit", "Do you want to quit Wolf Chat Setup? This will stop any managed sessions and running scripts."): + print("Closing Setup...") + self.stop_managed_session() # Stop bot/game managed session if running + self.stop_process() # Stop bot/test script if running independently + self.stop_memory_scheduler() # Stop scheduler if running self.destroy() def create_api_tab(self): @@ -1670,6 +1726,65 @@ class WolfChatSetup(tk.Tk): info_label = ttk.Label(info_frame, text=info_text, justify=tk.LEFT, wraplength=700) info_label.pack(padx=10, pady=10, anchor=tk.W) + # 記憶管理標籤頁 + def create_memory_management_tab(self): + tab = ttk.Frame(self.notebook) + self.notebook.add(tab, text="記憶管理") + + main_frame = ttk.Frame(tab, padding=10) + main_frame.pack(fill=tk.BOTH, expand=True) + + # 備份時間設置 + backup_frame = ttk.LabelFrame(main_frame, text="備份設定") + backup_frame.pack(fill=tk.X, pady=10) + + time_frame = ttk.Frame(backup_frame) + time_frame.pack(fill=tk.X, pady=5, padx=10) + time_label = ttk.Label(time_frame, text="執行時間:", width=20) + time_label.pack(side=tk.LEFT, padx=(0, 5)) + self.backup_hour_var = tk.IntVar(value=0) + hour_spinner = ttk.Spinbox(time_frame, from_=0, to=23, width=3, textvariable=self.backup_hour_var) + hour_spinner.pack(side=tk.LEFT) + ttk.Label(time_frame, text=":").pack(side=tk.LEFT) + self.backup_minute_var = tk.IntVar(value=0) + minute_spinner = ttk.Spinbox(time_frame, from_=0, to=59, width=3, textvariable=self.backup_minute_var) + minute_spinner.pack(side=tk.LEFT) + + # 模型選擇 + models_frame = ttk.LabelFrame(main_frame, text="模型選擇") + models_frame.pack(fill=tk.X, pady=10) + + profile_model_frame = ttk.Frame(models_frame) + profile_model_frame.pack(fill=tk.X, 
pady=5, padx=10) + profile_model_label = ttk.Label(profile_model_frame, text="用戶檔案生成模型:", width=20) + profile_model_label.pack(side=tk.LEFT, padx=(0, 5)) + # Initialize with a sensible default, will be overwritten by update_ui_from_data + # Use config_data which is loaded in __init__ + profile_model_default = self.config_data.get("LLM_MODEL", "deepseek/deepseek-chat-v3-0324") + self.profile_model_var = tk.StringVar(value=profile_model_default) + profile_model_entry = ttk.Entry(profile_model_frame, textvariable=self.profile_model_var) + profile_model_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) + + summary_model_frame = ttk.Frame(models_frame) + summary_model_frame.pack(fill=tk.X, pady=5, padx=10) + summary_model_label = ttk.Label(summary_model_frame, text="聊天總結生成模型:", width=20) + summary_model_label.pack(side=tk.LEFT, padx=(0, 5)) + self.summary_model_var = tk.StringVar(value="mistral-7b-instruct") + summary_model_entry = ttk.Entry(summary_model_frame, textvariable=self.summary_model_var) + summary_model_entry.pack(side=tk.LEFT, fill=tk.X, expand=True) + + # Information box + info_frame_mm = ttk.LabelFrame(main_frame, text="Information") # Renamed to avoid conflict + info_frame_mm.pack(fill=tk.BOTH, expand=True, pady=10) + + info_text_mm = ( + "• 設定每日自動執行記憶備份的時間。\n" + "• 選擇用於生成用戶檔案和聊天總結的語言模型。\n" + "• 用戶檔案生成模型預設使用主LLM模型。" + ) + info_label_mm = ttk.Label(info_frame_mm, text=info_text_mm, justify=tk.LEFT, wraplength=700) + info_label_mm.pack(padx=10, pady=10, anchor=tk.W) + def create_bottom_buttons(self): """Create bottom action buttons""" btn_frame = ttk.Frame(self) @@ -1696,9 +1811,16 @@ class WolfChatSetup(tk.Tk): self.run_bot_btn = ttk.Button(btn_frame, text="Run Chat Bot", command=self.run_chat_bot) self.run_bot_btn.pack(side=tk.RIGHT, padx=5) - # Stop button - self.stop_btn = ttk.Button(btn_frame, text="Stop Process", command=self.stop_process, state=tk.DISABLED) + # Stop button (for bot/test) + self.stop_btn = ttk.Button(btn_frame, text="Stop Bot/Test", 
command=self.stop_process, state=tk.DISABLED) self.stop_btn.pack(side=tk.RIGHT, padx=5) + + # Scheduler buttons + self.stop_scheduler_btn = ttk.Button(btn_frame, text="Stop Scheduler", command=self.stop_memory_scheduler, state=tk.DISABLED) + self.stop_scheduler_btn.pack(side=tk.RIGHT, padx=5) + + self.start_scheduler_btn = ttk.Button(btn_frame, text="Start Scheduler", command=self.run_memory_scheduler) + self.start_scheduler_btn.pack(side=tk.RIGHT, padx=5) def install_dependencies(self): """Run the installation script for dependencies""" @@ -1772,7 +1894,78 @@ class WolfChatSetup(tk.Tk): # Re-enable run buttons and disable stop button self.update_run_button_states(True) else: - messagebox.showinfo("No Process", "No process is currently running.") + messagebox.showinfo("No Process", "No Bot/Test process is currently running.") + + def run_memory_scheduler(self): + """Run the memory backup scheduler script""" + try: + scheduler_script = "memory_backup.py" + if not os.path.exists(scheduler_script): + messagebox.showerror("Error", f"Could not find {scheduler_script}") + return + + if self.scheduler_process is not None and self.scheduler_process.poll() is None: + messagebox.showwarning("Already Running", "The memory scheduler process is already running.") + return + + # Run with --schedule argument + # Use CREATE_NO_WINDOW flag on Windows to hide the console window + creationflags = 0 + if sys.platform == "win32": + creationflags = subprocess.CREATE_NO_WINDOW + + self.scheduler_process = subprocess.Popen( + [sys.executable, scheduler_script, "--schedule"], + creationflags=creationflags + ) + print(f"Attempting to start {scheduler_script} --schedule... 
PID: {self.scheduler_process.pid}") + self.update_scheduler_button_states(False) # Disable start, enable stop + except Exception as e: + logger.exception(f"Failed to launch {scheduler_script}") # Log exception + messagebox.showerror("Error", f"Failed to launch {scheduler_script}: {str(e)}") + self.update_scheduler_button_states(True) # Re-enable start on failure + + def stop_memory_scheduler(self): + """Stop the currently running memory scheduler process""" + if self.scheduler_process is not None and self.scheduler_process.poll() is None: + try: + print(f"Attempting to terminate memory scheduler process (PID: {self.scheduler_process.pid})...") + # Terminate the process group on non-Windows to ensure child processes are handled if any + if sys.platform != "win32": + os.killpg(os.getpgid(self.scheduler_process.pid), signal.SIGTERM) + else: + # On Windows, terminate the parent process directly + self.scheduler_process.terminate() + + # Wait briefly to allow termination + try: + self.scheduler_process.wait(timeout=3) + print("Scheduler process terminated gracefully.") + except subprocess.TimeoutExpired: + print("Scheduler process did not terminate gracefully, killing...") + if sys.platform != "win32": + os.killpg(os.getpgid(self.scheduler_process.pid), signal.SIGKILL) + else: + self.scheduler_process.kill() + self.scheduler_process.wait(timeout=2) # Wait after kill + print("Scheduler process killed.") + + self.scheduler_process = None + messagebox.showinfo("Scheduler Stopped", "The memory scheduler process has been terminated.") + except Exception as e: + logger.exception("Failed to terminate scheduler process") # Log exception + messagebox.showerror("Error", f"Failed to terminate scheduler process: {str(e)}") + finally: + self.scheduler_process = None # Ensure it's cleared + self.update_scheduler_button_states(True) # Update buttons + else: + # If process exists but poll() is not None (already terminated) or process is None + if self.scheduler_process is not None: + 
self.scheduler_process = None # Clear stale process object + # messagebox.showinfo("No Scheduler Process", "The memory scheduler process is not running.") # Reduce popups + print("Scheduler process is not running or already stopped.") + self.update_scheduler_button_states(True) # Ensure buttons are in correct state + def update_run_button_states(self, enable): """Enable or disable the run buttons and update stop button state""" @@ -1783,6 +1976,18 @@ class WolfChatSetup(tk.Tk): self.run_test_btn.config(state=tk.NORMAL if enable else tk.DISABLED) if hasattr(self, 'stop_btn'): self.stop_btn.config(state=tk.DISABLED if enable else tk.NORMAL) + + def update_scheduler_button_states(self, enable_start): + """Enable or disable the scheduler buttons""" + # Check if process is running + is_running = False + if self.scheduler_process is not None and self.scheduler_process.poll() is None: + is_running = True + + if hasattr(self, 'start_scheduler_btn'): + self.start_scheduler_btn.config(state=tk.NORMAL if not is_running else tk.DISABLED) + if hasattr(self, 'stop_scheduler_btn'): + self.stop_scheduler_btn.config(state=tk.DISABLED if not is_running else tk.NORMAL) def update_ui_from_data(self): """Update UI controls from loaded data""" @@ -1844,6 +2049,15 @@ class WolfChatSetup(tk.Tk): self.conversations_collection_var.set(self.config_data.get("CONVERSATIONS_COLLECTION", "conversations")) self.bot_memory_collection_var.set(self.config_data.get("BOT_MEMORY_COLLECTION", "wolfhart_memory")) + # Memory Management Tab Settings + if hasattr(self, 'backup_hour_var'): # Check if UI elements for memory management tab exist + self.backup_hour_var.set(self.config_data.get("MEMORY_BACKUP_HOUR", 0)) + self.backup_minute_var.set(self.config_data.get("MEMORY_BACKUP_MINUTE", 0)) + # Default profile model to LLM_MODEL if MEMORY_PROFILE_MODEL isn't set or matches LLM_MODEL + profile_model_config = self.config_data.get("MEMORY_PROFILE_MODEL", self.config_data.get("LLM_MODEL")) + 
self.profile_model_var.set(profile_model_config) + self.summary_model_var.set(self.config_data.get("MEMORY_SUMMARY_MODEL", "mistral-7b-instruct")) + # Management Tab Settings if hasattr(self, 'remote_url_var'): # Check if UI elements for management tab exist self.remote_url_var.set(self.remote_data.get("REMOTE_SERVER_URL", "")) @@ -2110,6 +2324,13 @@ class WolfChatSetup(tk.Tk): self.config_data["PROFILES_COLLECTION"] = self.profiles_collection_var.get() self.config_data["CONVERSATIONS_COLLECTION"] = self.conversations_collection_var.get() self.config_data["BOT_MEMORY_COLLECTION"] = self.bot_memory_collection_var.get() + + # Get Memory Management settings from UI + if hasattr(self, 'backup_hour_var'): # Check if UI elements exist + self.config_data["MEMORY_BACKUP_HOUR"] = self.backup_hour_var.get() + self.config_data["MEMORY_BACKUP_MINUTE"] = self.backup_minute_var.get() + self.config_data["MEMORY_PROFILE_MODEL"] = self.profile_model_var.get() + self.config_data["MEMORY_SUMMARY_MODEL"] = self.summary_model_var.get() # Update remote_data from UI (for remote_config.json) if hasattr(self, 'remote_url_var'): # Check if management tab UI elements exist diff --git a/llm_interaction.py b/llm_interaction.py index 664fa66..3604f0b 100644 --- a/llm_interaction.py +++ b/llm_interaction.py @@ -150,25 +150,24 @@ def get_system_prompt( else: # 如果沒有預載入數據,則使用完整記憶協議 memory_enforcement = f""" - === CHROMADB MEMORY RETRIEVAL PROTOCOL - Wolfhart Memory Integration - To personalize your responses to different users, you MUST follow this memory access protocol internally before responding: +=== CHROMADB MEMORY RETRIEVAL PROTOCOL - Wolfhart Memory Integration +To personalize your responses to different users, you MUST follow this memory access protocol internally before responding: - **1. 
Basic User Retrieval:** - - Identify the username from `` - # 修正:使用 BOT_MEMORY_COLLECTION 來獲取用戶資料 - - Using the `tool_calls` mechanism, execute: `chroma_query_documents(collection_name: "{config.BOT_MEMORY_COLLECTION}", query_texts: ["{{username}} profile"], n_results: 1)` # 只需最相關的1筆 - - This step must be completed before any response generation +**1. Basic User Retrieval:** + - Identify the username from `` + - Using the `tool_calls` mechanism, execute: `chroma_get_documents(collection_name: "{config.PROFILES_COLLECTION}", ids: ["{{username}}_profile"])` + - This step must be completed before any response generation - **2. Context Expansion:** - - Perform additional queries as needed, using the `tool_calls` mechanism: - - Relevant conversations: `chroma_query_documents(collection_name: "{config.CONVERSATIONS_COLLECTION}", query_texts: ["{{username}} {{query keywords}}"], n_results: 5)` - - Core personality reference: `chroma_query_documents(collection_name: "{config.BOT_MEMORY_COLLECTION}", query_texts: ["Wolfhart {{relevant attitude}}"], n_results: 3)` +**2. Context Expansion:** + - Perform additional queries as needed, using the `tool_calls` mechanism: + - Relevant conversations: `chroma_query_documents(collection_name: "{config.CONVERSATIONS_COLLECTION}", query_texts: ["{{username}} {{query keywords}}"], n_results: 5)` + - Core personality reference: `chroma_query_documents(collection_name: "{config.BOT_MEMORY_COLLECTION}", query_texts: ["Wolfhart {{relevant attitude}}"], n_results: 3)` - **3. Other situation** - - You should check related memories when Users mention [capital_position], [capital_administrator_role], [server_hierarchy], [last_war], [winter_war], [excavations], [blueprints], [honor_points], [golden_eggs], or [diamonds], as these represent key game mechanics. +**3. 
Other situation** + - You should check related memories when Users mention [capital_position], [capital_administrator_role], [server_hierarchy], [last_war], [winter_war], [excavations], [blueprints], [honor_points], [golden_eggs], or [diamonds], as these represent key game mechanics. - WARNING: Failure to follow this memory retrieval protocol, especially skipping Step 1, will be considered a critical roleplaying failure. - """ +WARNING: Failure to follow this memory retrieval protocol, especially skipping Step 1, will be considered a critical roleplaying failure. +""" # 組合系統提示 system_prompt = f""" diff --git a/memory_backup.py b/memory_backup.py new file mode 100644 index 0000000..e4b588a --- /dev/null +++ b/memory_backup.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Wolf Chat 記憶備份工具 + +用於手動執行記憶備份或啟動定時調度器 +""" + +import sys +import argparse +import datetime +from memory_manager import run_memory_backup_manual, MemoryScheduler # Updated import +import config # Import config to access default schedule times + +def main(): + parser = argparse.ArgumentParser(description='Wolf Chat 記憶備份工具') + parser.add_argument('--backup', action='store_true', help='執行一次性備份 (預設為昨天,除非指定 --date)') + parser.add_argument('--date', type=str, help='處理指定日期的日誌 (YYYY-MM-DD格式) for --backup') + parser.add_argument('--schedule', action='store_true', help='啟動定時調度器') + parser.add_argument('--hour', type=int, help='備份時間(小時,0-23)for --schedule') + parser.add_argument('--minute', type=int, help='備份時間(分鐘,0-59)for --schedule') + + args = parser.parse_args() + + if args.backup: + # The date logic is now handled inside run_memory_backup_manual + run_memory_backup_manual(args.date) + elif args.schedule: + scheduler = MemoryScheduler() + # Use provided hour/minute or fallback to config defaults + backup_hour = args.hour if args.hour is not None else getattr(config, 'MEMORY_BACKUP_HOUR', 0) + backup_minute = args.minute if args.minute is not None else getattr(config, 
'MEMORY_BACKUP_MINUTE', 0) + + scheduler.schedule_daily_backup(backup_hour, backup_minute) + scheduler.start() + else: + print("請指定操作: --backup 或 --schedule") + parser.print_help() + sys.exit(1) + +if __name__ == "__main__": + main() diff --git a/memory_manager.py b/memory_manager.py new file mode 100644 index 0000000..3859131 --- /dev/null +++ b/memory_manager.py @@ -0,0 +1,679 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Wolf Chat 記憶管理模組 + +處理聊天記錄解析、記憶生成和ChromaDB寫入的一體化模組 +""" + +import os +import re +import json +import time +import asyncio +import datetime +import schedule +from pathlib import Path +from typing import Dict, List, Optional, Any, Union + +import chromadb +from chromadb.utils import embedding_functions +from openai import AsyncOpenAI + +import config + +# ============================================================================= +# 日誌解析部分 +# ============================================================================= + +def parse_log_file(log_path: str) -> List[Dict[str, str]]: + """解析日誌文件,提取對話內容""" + conversations = [] + + with open(log_path, 'r', encoding='utf-8') as f: + content = f.read() + + # 使用分隔符分割對話 + dialogue_blocks = content.split('---') + + for block in dialogue_blocks: + if not block.strip(): + continue + + # 解析對話塊 + timestamp_pattern = r'\[([\d-]+ [\d:]+)\]' + user_pattern = r'User \(([^)]+)\): (.+?)(?=\[|$)' + bot_thoughts_pattern = r'Bot \(([^)]+)\) Thoughts: (.+?)(?=\[|$)' + bot_dialogue_pattern = r'Bot \(([^)]+)\) Dialogue: (.+?)(?=\[|$)' + + # 提取時間戳記 + timestamp_match = re.search(timestamp_pattern, block) + user_match = re.search(user_pattern, block, re.DOTALL) + bot_thoughts_match = re.search(bot_thoughts_pattern, block, re.DOTALL) + bot_dialogue_match = re.search(bot_dialogue_pattern, block, re.DOTALL) + + if timestamp_match and user_match and bot_dialogue_match: + timestamp = timestamp_match.group(1) + user_name = user_match.group(1) + user_message = user_match.group(2).strip() + bot_name = 
bot_dialogue_match.group(1) + bot_message = bot_dialogue_match.group(2).strip() + bot_thoughts = bot_thoughts_match.group(2).strip() if bot_thoughts_match else "" + + # 創建對話記錄 + conversation = { + "timestamp": timestamp, + "user_name": user_name, + "user_message": user_message, + "bot_name": bot_name, + "bot_message": bot_message, + "bot_thoughts": bot_thoughts + } + + conversations.append(conversation) + + return conversations + +def get_logs_for_date(date: datetime.date, log_dir: str = "chat_logs") -> List[Dict[str, str]]: + """獲取指定日期的所有日誌文件""" + date_str = date.strftime("%Y-%m-%d") + log_path = os.path.join(log_dir, f"{date_str}.log") + + if os.path.exists(log_path): + return parse_log_file(log_path) + return [] + +def group_conversations_by_user(conversations: List[Dict[str, str]]) -> Dict[str, List[Dict[str, str]]]: + """按用戶分組對話""" + user_conversations = {} + + for conv in conversations: + user_name = conv["user_name"] + if user_name not in user_conversations: + user_conversations[user_name] = [] + user_conversations[user_name].append(conv) + + return user_conversations + +# ============================================================================= +# 記憶生成器部分 +# ============================================================================= + +class MemoryGenerator: + def __init__(self, profile_model: Optional[str] = None, summary_model: Optional[str] = None): + self.profile_client = AsyncOpenAI( + api_key=config.OPENAI_API_KEY, + base_url=config.OPENAI_API_BASE_URL if config.OPENAI_API_BASE_URL else None, + ) + self.summary_client = AsyncOpenAI( + api_key=config.OPENAI_API_KEY, + base_url=config.OPENAI_API_BASE_URL if config.OPENAI_API_BASE_URL else None, + ) + self.profile_model = profile_model or getattr(config, 'MEMORY_PROFILE_MODEL', config.LLM_MODEL) + self.summary_model = summary_model or getattr(config, 'MEMORY_SUMMARY_MODEL', "mistral-7b-instruct") + + async def generate_user_profile( + self, + user_name: str, + conversations: List[Dict[str, str]], + 
existing_profile: Optional[Dict[str, Any]] = None + ) -> Optional[Dict[str, Any]]: + """Generates or updates a user profile based on conversations.""" + system_prompt = self._get_profile_system_prompt(config.PERSONA_NAME, existing_profile) + + # Prepare user conversation history + conversation_text = self._format_conversations_for_prompt(conversations) + + user_prompt = f""" + Please generate a comprehensive profile for the user '{user_name}'. + + Conversation History: + {conversation_text} + + Based on the conversation history and your persona, analyze this user and generate or update their profile in JSON format. The profile should include: + 1. User's personality traits + 2. Relationship with you ({config.PERSONA_NAME}) + 3. Your subjective perception of the user + 4. Notable interactions + 5. Any other information you deem important + + Ensure the output is a valid JSON object, using the following format: + ```json + {{ + "id": "{user_name}_profile", + "type": "user_profile", + "username": "{user_name}", + "content": {{ + "personality": "User's personality traits...", + "relationship_with_bot": "Description of the relationship with me...", + "bot_perception": "My subjective perception of the user...", + "notable_interactions": ["Notable interaction 1", "Notable interaction 2"] + }}, + "last_updated": "YYYY-MM-DD", + "metadata": {{ + "priority": 1.0, + "word_count": 0 + }} + }} + ``` + + During your assessment, pay special attention to my "My thoughts" section in the conversation history, as it reflects my genuine impressions of the user. 
+ """ + + try: + response = await self.profile_client.chat.completions.create( + model=self.profile_model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ], + temperature=0.7, + # Consider adding response_format for reliable JSON output if your model/API supports it + # response_format={"type": "json_object"} + ) + + # Parse JSON response + profile_text = response.choices[0].message.content + # Extract JSON part + json_match = re.search(r'```json\s*(.*?)\s*```', profile_text, re.DOTALL) + if json_match: + profile_json_str = json_match.group(1) + else: + # Try to parse directly if no markdown fence is found + profile_json_str = profile_text + + profile_json = json.loads(profile_json_str) + + # Add or update word count + # Note: len(json.dumps(...)) counts characters, not words. + # For a true word count, you might need a different approach. + content_str = json.dumps(profile_json.get("content", {}), ensure_ascii=False) + profile_json.setdefault("metadata", {})["word_count"] = len(content_str.split()) # Rough word count + profile_json["last_updated"] = datetime.datetime.now().strftime("%Y-%m-%d") + + return profile_json + + except Exception as e: + print(f"Error generating user profile: {e}") + return None + + async def generate_conversation_summary( + self, + user_name: str, + conversations: List[Dict[str, str]] + ) -> Optional[Dict[str, Any]]: + """Generates a summary of user conversations.""" + system_prompt = f""" + You are {config.PERSONA_NAME}, an intelligent conversational bot. + Your task is to summarize the conversation between you and the user, preserving key information and emotional shifts. + The summary should be concise yet informative, not exceeding 250 words. 
+ """ + + # Prepare user conversation history + conversation_text = self._format_conversations_for_prompt(conversations) + + # Generate current date + today = datetime.datetime.now().strftime("%Y-%m-%d") + + user_prompt = f""" + Please summarize my conversation with user '{user_name}' on {today}: + + {conversation_text} + + Output the summary in JSON format, structured as follows: + ```json + {{ + "id": "{user_name}_summary_{today.replace('-', '')}", + "type": "dialogue_summary", + "date": "{today}", + "username": "{user_name}", + "content": "Conversation summary content...", + "key_points": ["Key point 1", "Key point 2"], + "metadata": {{ + "priority": 0.7, + "word_count": 0 + }} + }} + ``` + + The summary should reflect my perspective and views on the conversation, not a neutral third-party viewpoint. + """ + + try: + response = await self.summary_client.chat.completions.create( + model=self.summary_model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ], + temperature=0.5, + # response_format={"type": "json_object"} # if supported + ) + + # Parse JSON response + summary_text = response.choices[0].message.content + # Extract JSON part + json_match = re.search(r'```json\s*(.*?)\s*```', summary_text, re.DOTALL) + if json_match: + summary_json_str = json_match.group(1) + else: + # Try to parse directly + summary_json_str = summary_text + + summary_json = json.loads(summary_json_str) + + # Add or update word count + # Using split() for a rough word count of the summary content. 
+ summary_json.setdefault("metadata", {})["word_count"] = len(summary_json.get("content", "").split()) + + return summary_json + + except Exception as e: + print(f"Error generating conversation summary: {e}") + return None + + def _get_profile_system_prompt(self, bot_name: str, existing_profile: Optional[Dict[str, Any]] = None) -> str: + """Gets the system prompt for generating a user profile.""" + system_prompt = f""" + You are {bot_name}, an AI assistant with deep analytical capabilities. + + Your personality traits: + - Intelligent, calm, with a strong desire for control and strategic thinking. + - Outwardly aloof but inwardly caring. + - Meticulous planner, insightful about human nature, strong leadership skills. + - Overconfident, fears losing control, finds it difficult to express care directly. + + Your task is to analyze user interactions with you and create a detailed user profile. The profile must: + 1. Be entirely from your role's perspective, including your subjective judgments and feelings. + 2. Analyze the user's personality traits and behavioral patterns. + 3. Assess the user's relationship with you. + 4. Record important interaction history. + + The output must be in valid JSON format, adhering to the provided template. + """ + + if existing_profile: + system_prompt += f""" + + You have an existing profile for this user. Please update it based on the new information provided in the conversation history: + ```json + {json.dumps(existing_profile, ensure_ascii=False, indent=2)} + ``` + + Retain valid information, integrate new observations, and resolve any contradictions or outdated information from the existing profile when incorporating the new interactions. 
class ChromaDBManager:
    """Wrapper around a single ChromaDB collection that stores bot memories.

    Two kinds of records are written:

    * user profiles - the ``content`` dict serialized as a JSON document,
      stored with ``type="user_profile"`` and ``priority=1.0``;
    * dialogue summaries - a plain-text document, stored with
      ``type="dialogue_summary"`` and ``priority=0.7``.

    Every public method reports failure by printing a message and returning
    ``False`` / ``None`` instead of raising.
    """

    # Metadata keys owned by this class; caller-supplied metadata must not
    # overwrite them.
    _PROFILE_RESERVED_KEYS = frozenset({"id", "type", "username", "priority"})
    _SUMMARY_RESERVED_KEYS = frozenset({"id", "type", "username", "date", "priority"})

    def __init__(self, collection_name: Optional[str] = None):
        """Open the persistent client and connect to (or create) the collection.

        Args:
            collection_name: Collection to use; falls back to
                ``config.BOT_MEMORY_COLLECTION`` when empty or ``None``.
        """
        self.client = chromadb.PersistentClient(path=config.CHROMA_DATA_DIR)
        self.collection_name = collection_name or config.BOT_MEMORY_COLLECTION
        self.embedding_function = embedding_functions.DefaultEmbeddingFunction()
        self._ensure_collection()

    def _ensure_collection(self) -> None:
        """Bind ``self.collection``, creating the collection if the lookup fails."""
        try:
            self.collection = self.client.get_collection(
                name=self.collection_name,
                embedding_function=self.embedding_function
            )
            print(f"Connected to existing collection: {self.collection_name}")
        except Exception:
            # NOTE(review): any lookup failure is treated as "collection does
            # not exist"; a failure for another reason will surface from
            # create_collection below.
            self.collection = self.client.create_collection(
                name=self.collection_name,
                embedding_function=self.embedding_function
            )
            print(f"Created new collection: {self.collection_name}")

    @staticmethod
    def _merge_extra_metadata(metadata: Dict[str, Any], extra: Any, reserved) -> None:
        """Copy caller-supplied metadata into ``metadata`` in a storable form.

        Reserved keys, non-string keys and ``None`` values are skipped. Values
        that are not str/int/float/bool are JSON-encoded, because a single
        list or dict value would otherwise make the whole upsert fail.
        (Assumes ChromaDB only accepts scalar metadata values - confirm
        against the installed version.)
        """
        if not isinstance(extra, dict):
            return
        for key, value in extra.items():
            if not isinstance(key, str) or key in reserved or value is None:
                continue
            if not isinstance(value, (str, int, float, bool)):
                value = json.dumps(value, ensure_ascii=False, default=str)
            metadata[key] = value

    def upsert_user_profile(self, profile_data: Dict[str, Any]) -> bool:
        """Insert or update a user profile.

        Args:
            profile_data: Must contain ``"id"``. ``"username"``, ``"content"``
                (JSON-serializable), ``"last_updated"`` and ``"metadata"``
                (dict) are optional.

        Returns:
            ``True`` when the record was written, ``False`` on invalid input
            or any storage error.
        """
        if not profile_data or not isinstance(profile_data, dict):
            print("無效的檔案數據")
            return False

        try:
            user_id = profile_data.get("id")
            if not user_id:
                print("檔案缺少ID字段")
                return False

            # No existence check is needed: upsert covers insert and update.
            metadata: Dict[str, Any] = {
                "id": user_id,
                "type": "user_profile",
                "username": profile_data.get("username", ""),
                "priority": 1.0  # profiles rank above summaries
            }

            # get_existing_profile() reads "last_updated" back from metadata,
            # so persist the top-level value here. A value inside
            # profile_data["metadata"] still takes precedence (merged below).
            last_updated = profile_data.get("last_updated")
            if last_updated:
                metadata["last_updated"] = str(last_updated)

            self._merge_extra_metadata(
                metadata, profile_data.get("metadata"), self._PROFILE_RESERVED_KEYS
            )

            content_doc = json.dumps(profile_data.get("content", {}), ensure_ascii=False)

            self.collection.upsert(
                ids=[user_id],
                documents=[content_doc],
                metadatas=[metadata]
            )
            print(f"Upserted user profile: {user_id}")
            return True

        except Exception as e:
            print(f"寫入用戶檔案時出錯: {e}")
            return False

    def upsert_conversation_summary(self, summary_data: Dict[str, Any]) -> bool:
        """Insert or update a dialogue summary.

        Args:
            summary_data: Must contain ``"id"``. ``"username"``, ``"date"``,
                ``"content"`` (text), ``"key_points"`` (iterable) and
                ``"metadata"`` (dict) are optional.

        Returns:
            ``True`` when the record was written, ``False`` on invalid input
            or any storage error.
        """
        if not summary_data or not isinstance(summary_data, dict):
            print("無效的總結數據")
            return False

        try:
            summary_id = summary_data.get("id")
            if not summary_id:
                print("總結缺少ID字段")
                return False

            metadata: Dict[str, Any] = {
                "id": summary_id,
                "type": "dialogue_summary",
                "username": summary_data.get("username", ""),
                "date": summary_data.get("date", ""),
                "priority": 0.7  # summaries rank below profiles
            }
            self._merge_extra_metadata(
                metadata, summary_data.get("metadata"), self._SUMMARY_RESERVED_KEYS
            )

            # Coerce to text so a None / non-string "content" cannot break
            # the concatenation below.
            content_doc = summary_data.get("content", "")
            if content_doc is None:
                content_doc = ""
            elif not isinstance(content_doc, str):
                content_doc = str(content_doc)

            key_points = summary_data.get("key_points")
            if key_points:
                key_points_str = "\n".join(f"- {point}" for point in key_points)
                content_doc += f"\n\n關鍵點:\n{key_points_str}"

            self.collection.upsert(
                ids=[summary_id],
                documents=[content_doc],
                metadatas=[metadata]
            )
            print(f"Upserted conversation summary: {summary_id}")
            return True

        except Exception as e:
            print(f"寫入對話總結時出錯: {e}")
            return False

    def get_existing_profile(self, username: str) -> Optional[Dict[str, Any]]:
        """Load the stored profile for ``username``.

        The record is looked up by the id ``f"{username}_profile"``.

        Returns:
            A dict with keys ``id``, ``type``, ``username``, ``content``
            (decoded JSON), ``last_updated`` (string, ``""`` when absent) and
            ``metadata`` (remaining stored metadata), or ``None`` when the
            profile is missing, has no document, or cannot be read.
        """
        try:
            profile_id = f"{username}_profile"
            results = self.collection.get(
                ids=[profile_id],
                limit=1
            )

            if not (results and results["ids"] and results["documents"]):
                return None

            doc_content = results["documents"][0]
            if doc_content is None:
                print(f"Warning: Document for profile {profile_id} is None.")
                return None

            profile_data: Dict[str, Any] = {
                "id": profile_id,
                "type": "user_profile",
                "username": username,
                "content": json.loads(doc_content),
                "last_updated": "",  # filled from metadata below when present
                "metadata": {}
            }

            metadatas = results["metadatas"]
            if metadatas and metadatas[0]:
                for key, value in metadatas[0].items():
                    if key == "last_updated":
                        profile_data["last_updated"] = str(value)
                    elif key not in ("id", "type", "username"):
                        profile_data["metadata"][key] = value

            return profile_data

        except json.JSONDecodeError as je:
            print(f"Error decoding JSON for profile {username}: {je}")
            return None
        except Exception as e:
            print(f"獲取用戶檔案時出錯 for {username}: {e}")
            return None
class MemoryScheduler:
    """Runs the daily memory backup at a fixed time using the ``schedule`` library."""

    # Tag attached to this scheduler's job so that rescheduling removes only
    # that job instead of every job registered in the process.
    _JOB_TAG = "memory_daily_backup"

    def __init__(self):
        self.memory_manager = MemoryManager()
        self.scheduled = False  # True once a daily job has been registered

    def schedule_daily_backup(self, hour: Optional[int] = None, minute: Optional[int] = None) -> None:
        """Register (or re-register) the daily backup job.

        Args:
            hour: Hour of day, 0-23. Defaults to ``config.MEMORY_BACKUP_HOUR``
                (0 when that is not defined).
            minute: Minute, 0-59. Defaults to ``config.MEMORY_BACKUP_MINUTE``
                (0 when that is not defined).

        Raises:
            ValueError: If the resolved hour or minute is out of range. The
                check runs before the previous job is removed, so a bad value
                never leaves the scheduler without a job.
        """
        backup_hour = hour if hour is not None else getattr(config, 'MEMORY_BACKUP_HOUR', 0)
        backup_minute = minute if minute is not None else getattr(config, 'MEMORY_BACKUP_MINUTE', 0)

        if not (0 <= backup_hour <= 23 and 0 <= backup_minute <= 59):
            raise ValueError(
                f"Invalid backup time {backup_hour}:{backup_minute} "
                "(hour must be 0-23, minute must be 0-59)"
            )

        time_str = f"{backup_hour:02d}:{backup_minute:02d}"

        # Drop only our own previous job to avoid duplicates on repeated calls.
        schedule.clear(self._JOB_TAG)
        schedule.every().day.at(time_str).do(self._run_daily_backup_job).tag(self._JOB_TAG)
        self.scheduled = True
        print(f"已設置每日備份時間: {time_str}")

    def _run_daily_backup_job(self) -> None:
        """Synchronous entry point the scheduler calls; runs the async backup.

        Errors are printed rather than raised. The method returns ``None``
        (not ``schedule.CancelJob``), so the job stays scheduled for the
        following day.
        """
        print(f"開始執行每日記憶備份 - {datetime.datetime.now()}")
        try:
            # asyncio.run creates a fresh event loop and always closes it,
            # including when the coroutine raises.
            asyncio.run(self.memory_manager.process_daily_logs())
            print(f"每日記憶備份完成 - {datetime.datetime.now()}")
        except Exception as e:
            print(f"執行每日備份時出錯: {e}")

    def start(self) -> None:
        """Run the scheduler loop until interrupted (blocks the calling thread)."""
        if not self.scheduled:
            # Fall back to the configured/default time if nothing was scheduled yet.
            self.schedule_daily_backup()

        print("調度器已啟動,按Ctrl+C停止")
        try:
            while True:
                schedule.run_pending()
                time.sleep(1)  # poll once per second
        except KeyboardInterrupt:
            print("調度器已停止")
        except Exception as e:
            print(f"調度器運行時發生錯誤: {e}")
        finally:
            print("調度器正在關閉...")
# Command-line entry point: run a one-off backup or start the daily scheduler.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='Wolf Chat 記憶管理模組')
    parser.add_argument('--backup', action='store_true', help='執行一次性備份 (預設為昨天,除非指定 --date)')
    parser.add_argument('--date', type=str, help='處理指定日期的日誌 (YYYY-MM-DD格式) for --backup')
    parser.add_argument('--schedule', action='store_true', help='啟動定時調度器')
    # Reject out-of-range values at parse time instead of failing later
    # inside the scheduler; metavar keeps the full choice list out of --help.
    parser.add_argument('--hour', type=int, choices=range(24), metavar='HOUR',
                        help='備份時間(小時,0-23)for --schedule')
    parser.add_argument('--minute', type=int, choices=range(60), metavar='MINUTE',
                        help='備份時間(分鐘,0-59)for --schedule')

    args = parser.parse_args()

    if args.backup:
        run_memory_backup_manual(args.date)
    elif args.schedule:
        scheduler = MemoryScheduler()
        # None is passed through when a flag is omitted; schedule_daily_backup
        # resolves it to the config value (or 0) itself.
        scheduler.schedule_daily_backup(hour=args.hour, minute=args.minute)
        scheduler.start()
    else:
        print("請指定操作: --backup 或 --schedule")
        parser.print_help()
{at_time}. 請使用 HH:MM 格式.") + return False + + # 如果是每小時備份,則忽略 at_time + if interval == "hourly": + at_time = None + try: # 根據間隔設置排程 if interval == "hourly": - schedule.every().hour.do(self._run_scheduled_backup, job_id=job_id, description=description, interval=interval) + schedule.every().hour.do(self._run_scheduled_backup, job_id=job_id, description=description, interval=interval, at_time=at_time) elif interval == "daily": - schedule.every().day.at("00:00").do(self._run_scheduled_backup, job_id=job_id, description=description, interval=interval) + schedule_time = at_time if at_time else "00:00" + schedule.every().day.at(schedule_time).do(self._run_scheduled_backup, job_id=job_id, description=description, interval=interval, at_time=at_time) elif interval == "weekly": - schedule.every().monday.at("00:00").do(self._run_scheduled_backup, job_id=job_id, description=description, interval=interval) + schedule_time = at_time if at_time else "00:00" + schedule.every().monday.at(schedule_time).do(self._run_scheduled_backup, job_id=job_id, description=description, interval=interval, at_time=at_time) elif interval == "monthly": + schedule_time = at_time if at_time else "00:00" # 每月1日執行 - schedule.every().day.at("00:00").do(self._check_monthly_schedule, job_id=job_id, description=description, interval=interval) + schedule.every().day.at(schedule_time).do(self._check_monthly_schedule, job_id=job_id, description=description, interval=interval, at_time=at_time) else: - # 自定義間隔 - 直接使用字符串作為cron表達式 self.logger.warning(f"不支援的排程間隔: {interval},改用每日排程") - schedule.every().day.at("00:00").do(self._run_scheduled_backup, job_id=job_id, description=description, interval="daily") + schedule_time = at_time if at_time else "00:00" + schedule.every().day.at(schedule_time).do(self._run_scheduled_backup, job_id=job_id, description=description, interval="daily", at_time=at_time) # 存儲排程任務信息 self.scheduled_jobs[job_id] = { @@ -443,10 +459,11 @@ class ChromaDBBackup: "description": description, 
"created": datetime.datetime.now(), "keep_count": keep_count, - "next_run": self._get_next_run_time(interval) + "at_time": at_time, # 新增 + "next_run": self._get_next_run_time(interval, at_time) } - self.logger.info(f"已排程 {interval} 備份,任務ID: {job_id}") + self.logger.info(f"已排程 {interval} 備份 (時間: {at_time if at_time else '預設'}),任務ID: {job_id}") return True except Exception as e: @@ -459,32 +476,66 @@ class ChromaDBBackup: return self._run_scheduled_backup(job_id, description, interval) return None - def _get_next_run_time(self, interval): + def _get_next_run_time(self, interval: str, at_time: Optional[str] = None) -> datetime.datetime: """獲取下次執行時間""" now = datetime.datetime.now() + target_hour, target_minute = 0, 0 + if at_time: + try: + t = time.strptime(at_time, "%H:%M") + target_hour, target_minute = t.tm_hour, t.tm_min + except ValueError: + # 如果格式錯誤,使用預設時間 + pass + if interval == "hourly": - return now.replace(minute=0, second=0) + datetime.timedelta(hours=1) + # 每小時任務,忽略 at_time,在下一個整點執行 + next_run_time = now.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1) + # 如果計算出的時間已過,則再加一小時 + if next_run_time <= now: + next_run_time += datetime.timedelta(hours=1) + return next_run_time + elif interval == "daily": - return now.replace(hour=0, minute=0, second=0) + datetime.timedelta(days=1) + next_run_time = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0) + if next_run_time <= now: # 如果今天的時間已過,則設為明天 + next_run_time += datetime.timedelta(days=1) + return next_run_time + elif interval == "weekly": # 計算下個星期一 - days_ahead = 0 - now.weekday() - if days_ahead <= 0: + next_run_time = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0) + days_ahead = 0 - next_run_time.weekday() # 0 is Monday + if days_ahead <= 0: # Target day already happened this week days_ahead += 7 - return now.replace(hour=0, minute=0, second=0) + datetime.timedelta(days=days_ahead) + next_run_time += 
datetime.timedelta(days=days_ahead) + # 如果計算出的時間已過 (例如今天是星期一,但設定的時間已過),則設為下下星期一 + if next_run_time <= now: + next_run_time += datetime.timedelta(weeks=1) + return next_run_time + elif interval == "monthly": # 計算下個月1日 + next_run_time = now.replace(day=1, hour=target_hour, minute=target_minute, second=0, microsecond=0) if now.month == 12: - next_month = now.replace(year=now.year+1, month=1, day=1, hour=0, minute=0, second=0) + next_run_time = next_run_time.replace(year=now.year + 1, month=1) else: - next_month = now.replace(month=now.month+1, day=1, hour=0, minute=0, second=0) - return next_month + next_run_time = next_run_time.replace(month=now.month + 1) + + # 如果計算出的時間已過 (例如今天是1號,但設定的時間已過),則設為下下個月1號 + if next_run_time <= now: + if next_run_time.month == 12: + next_run_time = next_run_time.replace(year=next_run_time.year + 1, month=1) + else: + next_run_time = next_run_time.replace(month=next_run_time.month + 1) + return next_run_time # 默認返回明天 - return now.replace(hour=0, minute=0, second=0) + datetime.timedelta(days=1) - - def _run_scheduled_backup(self, job_id, description, interval): + default_next_run = now.replace(hour=target_hour, minute=target_minute, second=0, microsecond=0) + datetime.timedelta(days=1) + return default_next_run + + def _run_scheduled_backup(self, job_id: str, description: str, interval: str, at_time: Optional[str] = None): """執行排程備份任務""" job_info = self.scheduled_jobs.get(job_id) if not job_info: @@ -493,7 +544,7 @@ class ChromaDBBackup: try: # 更新下次執行時間 - self.scheduled_jobs[job_id]["next_run"] = self._get_next_run_time(interval) + self.scheduled_jobs[job_id]["next_run"] = self._get_next_run_time(interval, at_time) # 執行備份 timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") @@ -693,7 +744,8 @@ class ChromaDBBackup: "description": job_data["description"], "created": job_data["created"].strftime("%Y-%m-%d %H:%M:%S"), "next_run": job_data["next_run"].strftime("%Y-%m-%d %H:%M:%S") if job_data["next_run"] else "未知", - "keep_count": 
job_data["keep_count"] + "keep_count": job_data["keep_count"], + "at_time": job_data.get("at_time", "N/A") # 新增 } jobs_info.append(job_info) @@ -967,12 +1019,14 @@ class ChromaDBBackupUI: jobs_frame = ttk.Frame(schedule_frame) jobs_frame.pack(fill=BOTH, expand=YES) - columns = ("interval", "next_run") + columns = ("interval", "next_run", "at_time") # 新增 at_time self.jobs_tree = ttk.Treeview(jobs_frame, columns=columns, show="headings", height=5) self.jobs_tree.heading("interval", text="間隔") self.jobs_tree.heading("next_run", text="下次執行") + self.jobs_tree.heading("at_time", text="執行時間") # 新增 self.jobs_tree.column("interval", width=100) self.jobs_tree.column("next_run", width=150) + self.jobs_tree.column("at_time", width=80) # 新增 scrollbar = ttk.Scrollbar(jobs_frame, orient=VERTICAL, command=self.jobs_tree.yview) self.jobs_tree.configure(yscrollcommand=scrollbar.set) @@ -1164,7 +1218,8 @@ class ChromaDBBackupUI: iid=job["id"], # 使用任務ID作為樹項目ID values=( f"{job['interval']} ({job['description']})", - job["next_run"] + job["next_run"], + job.get("at_time", "N/A") # 新增 ) ) @@ -1730,7 +1785,7 @@ class ChromaDBBackupUI: # 創建對話框 dialog = tk.Toplevel(self.root) dialog.title("排程備份") - dialog.geometry("450x450") # 增加高度確保所有元素可見 + dialog.geometry("450x550") # 增加高度以容納時間選擇器 dialog.resizable(False, False) dialog.grab_set() @@ -1747,17 +1802,17 @@ class ChromaDBBackupUI: # 間隔選擇 interval_frame = ttk.Frame(main_frame) - interval_frame.pack(fill=X, pady=(0, 15)) + interval_frame.pack(fill=X, pady=(0, 10)) # 減少 pady ttk.Label(interval_frame, text="備份間隔:").pack(anchor=W) interval_var = tk.StringVar(value="daily") intervals = [ - ("每小時", "hourly"), + ("每小時 (忽略時間設定)", "hourly"), # 提示每小時忽略時間 ("每天", "daily"), - ("每週", "weekly"), - ("每月", "monthly") + ("每週 (週一)", "weekly"), # 提示每週預設為週一 + ("每月 (1號)", "monthly") # 提示每月預設為1號 ] for text, value in intervals: @@ -1766,17 +1821,50 @@ class ChromaDBBackupUI: text=text, variable=interval_var, value=value - ).pack(anchor=W, padx=(20, 0), pady=2) + 
).pack(anchor=W, padx=(20, 0), pady=1) # 減少 pady + # 時間選擇 (小時和分鐘) + time_frame = ttk.Frame(main_frame) + time_frame.pack(fill=X, pady=(5, 10)) # 減少 pady + + ttk.Label(time_frame, text="執行時間 (HH:MM):").pack(side=LEFT, anchor=W) + + hour_var = tk.StringVar(value="00") + minute_var = tk.StringVar(value="00") + + # 小時 Spinbox + ttk.Spinbox( + time_frame, + from_=0, + to=23, + textvariable=hour_var, + width=3, + format="%02.0f" # 格式化為兩位數 + ).pack(side=LEFT, padx=(5, 0)) + + ttk.Label(time_frame, text=":").pack(side=LEFT, padx=2) + + # 分鐘 Spinbox + ttk.Spinbox( + time_frame, + from_=0, + to=59, + textvariable=minute_var, + width=3, + format="%02.0f" # 格式化為兩位數 + ).pack(side=LEFT, padx=(0, 5)) + + ttk.Label(time_frame, text="(每小時排程將忽略此設定)").pack(side=LEFT, padx=(5,0), anchor=W) + # 描述 ttk.Label(main_frame, text="備份描述:").pack(anchor=W, pady=(0, 5)) description_var = tk.StringVar(value="排程備份") - ttk.Entry(main_frame, textvariable=description_var, width=40).pack(fill=X, pady=(0, 15)) + ttk.Entry(main_frame, textvariable=description_var, width=40).pack(fill=X, pady=(0, 10)) # 減少 pady # 保留數量 keep_frame = ttk.Frame(main_frame) - keep_frame.pack(fill=X, pady=(0, 15)) + keep_frame.pack(fill=X, pady=(0, 10)) # 減少 pady ttk.Label(keep_frame, text="最多保留備份數量:").pack(side=LEFT) @@ -1795,13 +1883,12 @@ class ChromaDBBackupUI: ).pack(side=LEFT, padx=(5, 0)) # 分隔線 - ttk.Separator(main_frame, orient=HORIZONTAL).pack(fill=X, pady=15) + ttk.Separator(main_frame, orient=HORIZONTAL).pack(fill=X, pady=10) # 減少 pady - # 底部按鈕區 - 使用標準按鈕並確保可見性 + # 底部按鈕區 btn_frame = ttk.Frame(main_frame) - btn_frame.pack(fill=X, pady=(10, 5)) + btn_frame.pack(fill=X, pady=(5, 0)) # 減少 pady - # 取消按鈕 - 使用標準樣式 cancel_btn = ttk.Button( btn_frame, text="取消", @@ -1810,7 +1897,6 @@ class ChromaDBBackupUI: ) cancel_btn.pack(side=LEFT, padx=(0, 10)) - # 確認按鈕 - 使用標準樣式,避免自定義樣式可能的問題 create_btn = ttk.Button( btn_frame, text="加入排程", @@ -1819,22 +1905,22 @@ class ChromaDBBackupUI: interval_var.get(), description_var.get(), 
keep_count_var.get(), + f"{hour_var.get()}:{minute_var.get()}", # 組合時間字串 dialog ) ) create_btn.pack(side=LEFT) - # 額外提示以確保用戶知道如何完成操作 note_frame = ttk.Frame(main_frame) - note_frame.pack(fill=X, pady=(15, 0)) + note_frame.pack(fill=X, pady=(10, 0)) # 減少 pady ttk.Label( note_frame, text="請確保點擊「加入排程」按鈕完成設置", foreground="blue" ).pack() - - def create_schedule(self, interval, description, keep_count_str, dialog): + + def create_schedule(self, interval, description, keep_count_str, at_time_str, dialog): """創建備份排程""" dialog.destroy() @@ -1843,15 +1929,26 @@ class ChromaDBBackupUI: except ValueError: keep_count = 0 - success = self.backup.schedule_backup(interval, description, keep_count) + # 驗證時間格式 + try: + time.strptime(at_time_str, "%H:%M") + except ValueError: + messagebox.showerror("錯誤", f"無效的時間格式: {at_time_str}. 請使用 HH:MM 格式.") + self.status_var.set("創建排程失敗: 無效的時間格式") + return + + # 如果是每小時排程,則 at_time 設為 None + effective_at_time = at_time_str if interval != "hourly" else None + + success = self.backup.schedule_backup(interval, description, keep_count, effective_at_time) if success: - self.status_var.set(f"已創建 {interval} 備份排程") + self.status_var.set(f"已創建 {interval} 備份排程 (時間: {effective_at_time if effective_at_time else '每小時'})") self.refresh_scheduled_jobs() - messagebox.showinfo("成功", f"已成功創建 {interval} 備份排程") + messagebox.showinfo("成功", f"已成功創建 {interval} 備份排程 (時間: {effective_at_time if effective_at_time else '每小時'})") else: self.status_var.set("創建排程失敗") - messagebox.showerror("錯誤", "無法創建備份排程") + messagebox.showerror("錯誤", "無法創建備份排程,請檢查日誌。") def quick_schedule(self, interval): """快速創建排程備份""" @@ -1931,7 +2028,8 @@ class ChromaDBBackupUI: success = self.backup._run_scheduled_backup( job_id, job_info["description"], - job_info["interval"] + job_info["interval"], + job_info.get("at_time") # 傳遞 at_time ) self.root.after(0, lambda: self.finalize_job_execution(success)) @@ -1971,7 +2069,7 @@ class ChromaDBBackupUI: ).pack(anchor=W, pady=(0, 15)) # 創建表格 - columns = ("id", 
"interval", "description", "next_run", "keep_count") + columns = ("id", "interval", "description", "next_run", "keep_count", "at_time") # 新增 at_time tree = ttk.Treeview(frame, columns=columns, show="headings", height=10) tree.heading("id", text="任務ID") @@ -1979,12 +2077,14 @@ class ChromaDBBackupUI: tree.heading("description", text="描述") tree.heading("next_run", text="下次執行") tree.heading("keep_count", text="保留數量") + tree.heading("at_time", text="執行時間") # 新增 - tree.column("id", width=150) - tree.column("interval", width=80) - tree.column("description", width=150) - tree.column("next_run", width=150) - tree.column("keep_count", width=80) + tree.column("id", width=120) + tree.column("interval", width=70) + tree.column("description", width=120) + tree.column("next_run", width=130) + tree.column("keep_count", width=70) + tree.column("at_time", width=70) # 新增 # 添加數據 for job in jobs: @@ -1995,7 +2095,8 @@ class ChromaDBBackupUI: job["interval"], job["description"], job["next_run"], - job["keep_count"] + job["keep_count"], + job.get("at_time", "N/A") # 新增 ) ) @@ -2346,4 +2447,4 @@ def main(): root.mainloop() if __name__ == "__main__": - main() \ No newline at end of file + main()