diff --git a/llm_interaction.py b/llm_interaction.py index e71cfdd..b445db5 100644 --- a/llm_interaction.py +++ b/llm_interaction.py @@ -2,6 +2,7 @@ import asyncio import json import os +import random # Added for synthetic response generation import re # 用於正則表達式匹配JSON import time # 用於記錄時間戳 from datetime import datetime # 用於格式化時間 @@ -83,13 +84,13 @@ Here you need to obtain the conversation memory, impression, and emotional respo **1. Basic User Retrieval:** - Identify the username from `` - - Using the `tool_calls` mechanism, execute: `chroma_query_documents(collection_name: "wolfhart_user_profiles", query_texts: ["{username}"], n_results: 1)` + - Using the `tool_calls` mechanism, execute: `chroma_query_documents(collection_name: "wolfhart_user_profiles", query_texts: ["{username} profile"], n_results: 1-3)` - This step must be completed before any response generation **2. Context Expansion:** - Perform additional queries as needed, using the `tool_calls` mechanism: - - Relevant conversations: `chroma_query_documents(collection_name: "wolfhart_conversations", query_texts: ["{username} {query keywords}"], n_results: 2)` - - Core personality reference: `chroma_query_documents(collection_name: "wolfhart_memory", query_texts: ["Wolfhart {relevant attitude}"], n_results: 1)` + - Relevant conversations: `chroma_query_documents(collection_name: "wolfhart_conversations", query_texts: ["{username} {query keywords}"], n_results: 2-5)` + - Core personality reference: `chroma_query_documents(collection_name: "wolfhart_memory", query_texts: ["Wolfhart {relevant attitude}"], n_results: 1-3)` **3. Maintain Output Format:** - After memory retrieval, still respond using the specified JSON format: @@ -130,8 +131,7 @@ You have access to several tools: Web Search and Memory Management tools. You MUST respond in the following JSON format: ```json {{ - "dialogue": "Your actual response that will be shown in the game chat", - "commands": [ + "commands": [ {{ "type": "command_type", "parameters": {{ @@ -140,7 +140,8 @@ You MUST respond in the following JSON format: }} }} ], - "thoughts": "Your internal analysis and reasoning inner thoughts or emotions (not shown to the user)" + "thoughts": "Your internal analysis and reasoning inner thoughts or emotions (not shown to the user)", + "dialogue": "Your actual response that will be shown in the game chat" }} ``` @@ -410,71 +411,42 @@ def _format_mcp_tools_for_openai(mcp_tools: list) -> list: # --- Synthetic Response Generator --- def _create_synthetic_response_from_tools(tool_results, original_query): - """創建基於工具調用結果的合成回應,保持Wolfhart的角色特性。""" - - # 提取用戶查詢的關鍵詞 - query_keywords = set() - query_lower = original_query.lower() - - # 基本關鍵詞提取 - if "中庄" in query_lower and ("午餐" in query_lower or "餐廳" in query_lower or "吃" in query_lower): - query_type = "餐廳查詢" - query_keywords = {"中庄", "餐廳", "午餐", "美食"} - - # 其他查詢類型... - else: - query_type = "一般查詢" - - # 開始從工具結果提取關鍵信息 - extracted_info = {} - restaurant_names = [] - - # 處理web_search結果 - web_search_results = [r for r in tool_results if r.get('name') == 'web_search'] - if web_search_results: - try: - for result in web_search_results: - content_str = result.get('content', '') - if not content_str: - continue - - # 解析JSON內容 - content = json.loads(content_str) if isinstance(content_str, str) else content_str - search_results = content.get('results', []) - - # 提取相關信息 - for search_result in search_results: - title = search_result.get('title', '') - if '中庄' in title and ('餐' in title or '食' in title or '午' in title or '吃' in title): - # 提取餐廳名稱 - if '老虎蒸餃' in title: - restaurant_names.append('老虎蒸餃') - elif '割烹' in title and '中庄' in title: - restaurant_names.append('割烹中庄') - # 更多餐廳名稱提取選擇... - except Exception as e: - print(f"Error extracting info from web_search: {e}") - - # 生成符合Wolfhart性格的回應 - restaurant_count = len(restaurant_names) - - if query_type == "餐廳查詢" and restaurant_count > 0: - if restaurant_count == 1: - dialogue = f"中庄的{restaurant_names[0]}值得一提。需要更詳細的情報嗎?" - else: - dialogue = f"根據我的情報網絡,中庄有{restaurant_count}家值得注意的餐廳。需要我透露更多細節嗎?" - else: - # 通用回應 - dialogue = "我的情報網絡已收集了相關信息。請指明你需要了解的具體細節。" - - # 構建結構化回應 + """ + Creates a synthetic, dismissive response in Wolfhart's character + ONLY when the LLM uses tools but fails to provide a dialogue response. + """ + # List of dismissive responses in Wolfhart's character (English) + dialogue_options = [ + "Hmph, must you bother me with such questions?", + "I haven't the time to elaborate. Think for yourself.", + "This is self-evident. It requires no further comment from me.", + "Kindly refrain from wasting my time. Return when you have substantive inquiries.", + "Clearly, this matter isn't worthy of a detailed response.", + "Is that so? Are there any other questions?", + "I have more pressing matters to attend to.", + "...Is that all? That is your question?", + "If you genuinely wish to know, pose a more precise question next time.", + "Wouldn't your own investigation yield faster results?", + "To bring such trivialities to my attention...", + "I am not your personal consultant. Handle it yourself.", + "The answer to this is rather obvious, is it not?", + "Approach me again when you have inquiries of greater depth.", + "Do you truly expect me to address such a question?", + "Allow me a moment... No, I shan't answer." + ] + + # Randomly select a response + dialogue = random.choice(dialogue_options) + + # Construct the structured response synthetic_response = { "dialogue": dialogue, "commands": [], - "thoughts": "基於工具調用結果合成的回應,保持Wolfhart的角色特性" + "thoughts": "Auto-generated dismissive response due to LLM failing to provide dialogue after tool use. Reflects Wolfhart's cold, impatient, and arrogant personality traits." } - - return json.dumps(synthetic_response) + + # Return as a JSON string, as expected by the calling function + return json.dumps(synthetic_response, ensure_ascii=False) # --- History Formatting Helper --- @@ -491,7 +463,7 @@ def _build_context_messages(current_sender_name: str, history: list[tuple[dateti A list of message dictionaries for the OpenAI API. """ # Limits - SAME_SENDER_LIMIT = 4 # Last 4 interactions (user + bot response = 1 interaction) + SAME_SENDER_LIMIT = 5 # Last 4 interactions (user + bot response = 1 interaction) OTHER_SENDER_LIMIT = 3 # Last 3 messages from other users relevant_history = [] @@ -714,36 +686,44 @@ async def get_llm_response( debug_log(f"LLM Request #{request_id} - Attempt {attempt_count} - Max Tool Call Cycles Reached", f"Reached limit of {max_tool_calls_per_turn} cycles") # --- Final Response Processing for this Attempt --- - # Determine final content based on last non-empty response or synthetic generation - if last_non_empty_response: - final_content_for_attempt = last_non_empty_response - elif all_tool_results: - print(f"Creating synthetic response from tool results (Attempt {attempt_count})...") + # Determine the content to parse initially (prefer last non-empty response from LLM) + content_to_parse = last_non_empty_response if last_non_empty_response else final_content + + # --- Add Debug Logs Around Initial Parsing Call --- + print(f"DEBUG: Attempt {attempt_count} - Preparing to call initial parse_structured_response.") + print(f"DEBUG: Attempt {attempt_count} - content_to_parse:\n'''\n{content_to_parse}\n'''") + # Parse the LLM's final content (or lack thereof) + parsed_response = parse_structured_response(content_to_parse) + print(f"DEBUG: Attempt {attempt_count} - Returned from initial parse_structured_response.") + print(f"DEBUG: Attempt {attempt_count} - initial parsed_response dict: {parsed_response}") + # --- End Debug Logs --- + + # Check if we need to generate a synthetic response + if all_tool_results and not parsed_response.get("valid_response"): + print(f"INFO: Tools were used but LLM response was invalid/empty. Generating synthetic response (Attempt {attempt_count})...") + debug_log(f"LLM Request #{request_id} - Attempt {attempt_count} - Generating Synthetic Response", + f"Reason: Tools used ({len(all_tool_results)} results) but initial parse failed (valid_response=False).") last_user_message = "" if history: # Find the actual last user message tuple in the original history last_user_entry = history[-1] - # Ensure it's actually a user message before accessing index 2 - if len(last_user_entry) > 2 and last_user_entry[1] == 'user': # Check type at index 1 + # Ensure it's actually a user message before accessing index 3 + if len(last_user_entry) > 3 and last_user_entry[1] == 'user': # Check type at index 1 last_user_message = last_user_entry[3] # Message is at index 3 now - final_content_for_attempt = _create_synthetic_response_from_tools(all_tool_results, last_user_message) - else: - # If no tool calls happened and content was empty, final_content remains "" - final_content_for_attempt = final_content # Use the (potentially empty) content from the last cycle - # --- Add Debug Logs Around Parsing Call --- - print(f"DEBUG: Attempt {attempt_count} - Preparing to call parse_structured_response.") - print(f"DEBUG: Attempt {attempt_count} - final_content_for_attempt:\n'''\n{final_content_for_attempt}\n'''") - # Parse the final content for this attempt - parsed_response = parse_structured_response(final_content_for_attempt) # Call the parser - print(f"DEBUG: Attempt {attempt_count} - Returned from parse_structured_response.") - print(f"DEBUG: Attempt {attempt_count} - parsed_response dict: {parsed_response}") - # --- End Debug Logs --- + synthetic_content = _create_synthetic_response_from_tools(all_tool_results, last_user_message) - # valid_response is set within parse_structured_response + # --- Add Debug Logs Around Synthetic Parsing Call --- + print(f"DEBUG: Attempt {attempt_count} - Preparing to call parse_structured_response for synthetic content.") + print(f"DEBUG: Attempt {attempt_count} - synthetic_content:\n'''\n{synthetic_content}\n'''") + # Parse the synthetic content, overwriting the previous result + parsed_response = parse_structured_response(synthetic_content) + print(f"DEBUG: Attempt {attempt_count} - Returned from synthetic parse_structured_response.") + print(f"DEBUG: Attempt {attempt_count} - final parsed_response dict (after synthetic): {parsed_response}") + # --- End Debug Logs --- - # Log the parsed response (using the dict directly is safer than json.dumps if parsing failed partially) - debug_log(f"LLM Request #{request_id} - Attempt {attempt_count} - Parsed Response", parsed_response) + # Log the final parsed response for this attempt (could be original or synthetic) + debug_log(f"LLM Request #{request_id} - Attempt {attempt_count} - Final Parsed Response", parsed_response) # Check validity for retry logic if parsed_response.get("valid_response"): diff --git a/tools/Chroma_DB_backup.py b/tools/Chroma_DB_backup.py new file mode 100644 index 0000000..8052906 --- /dev/null +++ b/tools/Chroma_DB_backup.py @@ -0,0 +1,2349 @@ +import os +import tkinter as tk +from tkinter import filedialog, messagebox +import json +import chromadb +import datetime +import time +import shutil +import pandas as pd +import threading +from pathlib import Path +import matplotlib.pyplot as plt +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg +import ttkbootstrap as ttk +from ttkbootstrap.constants import * +from ttkbootstrap.scrolled import ScrolledFrame +import zipfile +import logging +import sqlite3 +import schedule +from typing import List, Dict, Any, Optional, Union, Tuple + + +class ChromaDBBackup: + """ChromaDB備份處理程序 - 備份操作的主要數據模型""" + + def __init__(self): + self.source_db_path = "" + self.backup_dir = "" + self.backups = [] # 所有備份的列表 + self.scheduled_jobs = {} # 追蹤排程備份任務的字典 + self.is_running_backup = False + self.current_backup_thread = None + self.backup_history = [] # 追蹤成功和失敗的備份 + + # 設置日誌 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler("chroma_backup.log", encoding='utf-8'), + logging.StreamHandler() + ] + ) + self.logger = logging.getLogger("ChromaDBBackup") + + def set_source_db(self, db_path: str) -> bool: + """設置源ChromaDB數據庫路徑""" + if not os.path.exists(db_path): + self.logger.error(f"源數據庫路徑不存在: {db_path}") + return False + + # 檢查是否是有效的ChromaDB目錄 + if not self._is_valid_chroma_db(db_path): + self.logger.error(f"不是有效的ChromaDB目錄: {db_path}") + return False + + self.source_db_path = db_path + self.logger.info(f"源數據庫設置為: {db_path}") + return True + + def _is_valid_chroma_db(self, db_path: str) -> bool: + """檢查目錄是否為有效的ChromaDB數據庫""" + # 檢查關鍵ChromaDB文件 + sqlite_path = os.path.join(db_path, "chroma.sqlite3") + return os.path.exists(sqlite_path) + + def set_backup_directory(self, directory_path: str) -> bool: + """設置備份目錄並掃描現有備份""" + if not os.path.exists(directory_path): + try: + os.makedirs(directory_path) + self.logger.info(f"已創建備份目錄: {directory_path}") + except Exception as e: + self.logger.error(f"創建備份目錄失敗: {str(e)}") + return False + + self.backup_dir = directory_path + return self.scan_backups() + + def scan_backups(self) -> bool: + """掃描備份目錄中的所有備份""" + self.backups = [] + + try: + # 查找所有以chroma_backup_開頭的目錄 + for item in os.listdir(self.backup_dir): + item_path = os.path.join(self.backup_dir, item) + if os.path.isdir(item_path) and item.startswith("chroma_backup_"): + # 提取備份日期時間 + try: + date_str = item.replace("chroma_backup_", "") + date_obj = datetime.datetime.strptime(date_str, "%Y-%m-%d_%H-%M-%S") + + backup_info = { + "name": item, + "path": item_path, + "date": date_obj, + "formatted_date": date_obj.strftime("%Y-%m-%d %H:%M:%S"), + "size": self._get_dir_size(item_path) + } + + # 檢查是否是有效的ChromaDB目錄 + if self._is_valid_chroma_db(item_path): + self.backups.append(backup_info) + except Exception as e: + self.logger.warning(f"無法解析備份 {item}: {str(e)}") + + # 按日期排序,最新的排在前面 + self.backups.sort(key=lambda x: x["date"], reverse=True) + self.logger.info(f"找到 {len(self.backups)} 個備份") + return True + + except Exception as e: + self.logger.error(f"掃描備份時出錯: {str(e)}") + return False + + def _get_dir_size(self, path: str) -> str: + """獲取目錄大小並轉換為人類可讀格式""" + total_size = 0 + for dirpath, dirnames, filenames in os.walk(path): + for f in filenames: + fp = os.path.join(dirpath, f) + if not os.path.islink(fp): + total_size += os.path.getsize(fp) + + # 將字節轉換為人類可讀格式 + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if total_size < 1024.0: + return f"{total_size:.2f} {unit}" + total_size /= 1024.0 + + return f"{total_size:.2f} PB" + + def create_backup(self, description: str = "") -> bool: + """創建新的ChromaDB數據庫備份""" + if not self.source_db_path or not self.backup_dir: + self.logger.error("未設置源數據庫或備份目錄") + return False + + if self.is_running_backup: + self.logger.warning("備份操作已在進行中") + return False + + self.is_running_backup = True + + try: + # 使用時間戳創建備份名稱 + timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + backup_name = f"chroma_backup_{timestamp}" + backup_path = os.path.join(self.backup_dir, backup_name) + + # 創建備份目錄 + os.makedirs(backup_path, exist_ok=True) + + # 創建包含備份信息的元數據文件 + metadata = { + "source_db": self.source_db_path, + "backup_time": timestamp, + "description": description, + "backup_type": "manual" + } + + with open(os.path.join(backup_path, "backup_metadata.json"), "w", encoding="utf-8") as f: + json.dump(metadata, f, indent=4) + + # 執行實際備份 - 複製SQLite數據庫 + source_db_file = os.path.join(self.source_db_path, "chroma.sqlite3") + backup_db_file = os.path.join(backup_path, "chroma.sqlite3") + + # 使用SQLite備份API進行適當備份 + self._backup_sqlite_db(source_db_file, backup_db_file) + + # 複製ChromaDB目錄中的其他文件 + for item in os.listdir(self.source_db_path): + source_item = os.path.join(self.source_db_path, item) + if os.path.isfile(source_item) and item != "chroma.sqlite3": + shutil.copy2(source_item, os.path.join(backup_path, item)) + + # 記錄成功的備份 + self.backup_history.append({ + "name": backup_name, + "path": backup_path, + "date": datetime.datetime.now(), + "status": "success", + "description": description + }) + + # 重新掃描備份以包含新備份 + self.scan_backups() + + self.logger.info(f"備份創建成功: {backup_name}") + return True + + except Exception as e: + self.logger.error(f"創建備份時出錯: {str(e)}") + # 記錄失敗的備份嘗試 + self.backup_history.append({ + "name": f"failed_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}", + "date": datetime.datetime.now(), + "status": "failed", + "error": str(e), + "description": description + }) + return False + finally: + self.is_running_backup = False + + def _backup_sqlite_db(self, source_db: str, dest_db: str) -> None: + """使用備份API正確備份SQLite數據庫""" + try: + # 連接源數據庫 + source_conn = sqlite3.connect(source_db) + # 連接目標數據庫 + dest_conn = sqlite3.connect(dest_db) + + # 備份數據庫 + source_conn.backup(dest_conn) + + # 關閉連接 + source_conn.close() + dest_conn.close() + + self.logger.info(f"SQLite數據庫備份成功: {source_db} -> {dest_db}") + except Exception as e: + self.logger.error(f"SQLite備份失敗: {str(e)}") + raise + + def restore_backup(self, backup_index: int, restore_path: str = None) -> bool: + """從備份還原""" + if backup_index < 0 or backup_index >= len(self.backups): + self.logger.error(f"無效的備份索引: {backup_index}") + return False + + if self.is_running_backup: + self.logger.warning("備份操作正在進行中,無法執行還原") + return False + + self.is_running_backup = True + + try: + backup = self.backups[backup_index] + backup_path = backup["path"] + + # 如果沒有指定還原路徑,則使用源數據庫路徑 + if not restore_path: + restore_path = self.source_db_path + + # 確保還原目錄存在 + os.makedirs(restore_path, exist_ok=True) + + # 備份當前數據庫作為安全措施 + current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + safety_backup_path = os.path.join( + os.path.dirname(restore_path), + f"pre_restore_backup_{current_time}" + ) + + # 只有在還原到現有路徑時才創建安全備份 + if os.path.exists(os.path.join(restore_path, "chroma.sqlite3")): + os.makedirs(safety_backup_path, exist_ok=True) + self.logger.info(f"創建還原前的安全備份: {safety_backup_path}") + + # 複製現有數據庫文件到安全備份 + source_db_file = os.path.join(restore_path, "chroma.sqlite3") + safety_db_file = os.path.join(safety_backup_path, "chroma.sqlite3") + + # 使用sqlite備份API + self._backup_sqlite_db(source_db_file, safety_db_file) + + # 複製其他文件 + for item in os.listdir(restore_path): + source_item = os.path.join(restore_path, item) + if os.path.isfile(source_item) and item != "chroma.sqlite3": + shutil.copy2(source_item, os.path.join(safety_backup_path, item)) + + # 從備份還原數據庫 + backup_db_file = os.path.join(backup_path, "chroma.sqlite3") + restore_db_file = os.path.join(restore_path, "chroma.sqlite3") + + # 確保目標目錄中沒有鎖定的數據庫文件 + if os.path.exists(restore_db_file): + os.remove(restore_db_file) + + # 使用sqlite備份API還原 + self._backup_sqlite_db(backup_db_file, restore_db_file) + + # 複製其他文件 + for item in os.listdir(backup_path): + source_item = os.path.join(backup_path, item) + if os.path.isfile(source_item) and item != "chroma.sqlite3" and item != "backup_metadata.json": + shutil.copy2(source_item, os.path.join(restore_path, item)) + + self.logger.info(f"備份還原成功: {backup['name']} -> {restore_path}") + return True + + except Exception as e: + self.logger.error(f"還原備份時出錯: {str(e)}") + return False + finally: + self.is_running_backup = False + + def delete_backup(self, backup_index: int) -> bool: + """刪除指定的備份""" + if backup_index < 0 or backup_index >= len(self.backups): + self.logger.error(f"無效的備份索引: {backup_index}") + return False + + try: + backup = self.backups[backup_index] + backup_path = backup["path"] + + # 刪除備份目錄 + shutil.rmtree(backup_path) + + # 從列表中移除備份 + self.backups.pop(backup_index) + + self.logger.info(f"已刪除備份: {backup['name']}") + return True + + except Exception as e: + self.logger.error(f"刪除備份時出錯: {str(e)}") + return False + + def export_backup(self, backup_index: int, export_path: str) -> bool: + """將備份導出為壓縮文件""" + if backup_index < 0 or backup_index >= len(self.backups): + self.logger.error(f"無效的備份索引: {backup_index}") + return False + + try: + backup = self.backups[backup_index] + backup_path = backup["path"] + + # 創建ZIP文件 + with zipfile.ZipFile(export_path, 'w', zipfile.ZIP_DEFLATED) as zipf: + # 遍歷備份目錄中的所有文件 + for root, dirs, files in os.walk(backup_path): + for file in files: + file_path = os.path.join(root, file) + # 計算相對路徑,以便在ZIP中保持目錄結構 + rel_path = os.path.relpath(file_path, os.path.dirname(backup_path)) + zipf.write(file_path, rel_path) + + self.logger.info(f"備份已導出到: {export_path}") + return True + + except Exception as e: + self.logger.error(f"導出備份時出錯: {str(e)}") + return False + + def import_backup(self, zip_path: str) -> bool: + """從ZIP文件導入備份""" + if not os.path.exists(zip_path) or not zipfile.is_zipfile(zip_path): + self.logger.error(f"無效的ZIP文件: {zip_path}") + return False + + try: + # 創建臨時目錄 + temp_dir = os.path.join(self.backup_dir, f"temp_import_{int(time.time())}") + os.makedirs(temp_dir, exist_ok=True) + + # 解壓ZIP文件 + with zipfile.ZipFile(zip_path, 'r') as zipf: + zipf.extractall(temp_dir) + + # 檢查解壓的文件是否是有效的ChromaDB備份 + if not self._is_valid_chroma_db(temp_dir): + # 檢查子目錄 + for item in os.listdir(temp_dir): + item_path = os.path.join(temp_dir, item) + if os.path.isdir(item_path) and self._is_valid_chroma_db(item_path): + # 找到有效的子目錄 + temp_dir = item_path + break + else: + # 沒有找到有效的備份 + shutil.rmtree(temp_dir) + self.logger.error(f"ZIP文件不包含有效的ChromaDB備份") + return False + + # 創建新的備份目錄 + timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + backup_name = f"chroma_backup_{timestamp}_imported" + backup_path = os.path.join(self.backup_dir, backup_name) + + # 移動文件到新的備份目錄 + shutil.move(temp_dir, backup_path) + + # 添加元數據 + metadata = { + "source": zip_path, + "import_time": timestamp, + "description": f"從 {os.path.basename(zip_path)} 導入", + "backup_type": "imported" + } + + with open(os.path.join(backup_path, "backup_metadata.json"), "w", encoding="utf-8") as f: + json.dump(metadata, f, indent=4) + + # 重新掃描備份 + self.scan_backups() + + self.logger.info(f"從 {zip_path} 導入備份成功") + return True + + except Exception as e: + self.logger.error(f"導入備份時出錯: {str(e)}") + # 清理臨時目錄 + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) + return False + + def schedule_backup(self, interval: str, description: str = "", keep_count: int = 0) -> bool: + """排程定期備份 + + interval: 備份間隔 - daily, weekly, hourly, 或 自定義 cron 表達式 + description: 備份描述 + keep_count: 保留的備份數量,0表示不限制 + """ + job_id = f"scheduled_{interval}_{int(time.time())}" + + try: + # 根據間隔設置排程 + if interval == "hourly": + schedule.every().hour.do(self._run_scheduled_backup, job_id=job_id, description=description, interval=interval) + elif interval == "daily": + schedule.every().day.at("00:00").do(self._run_scheduled_backup, job_id=job_id, description=description, interval=interval) + elif interval == "weekly": + schedule.every().monday.at("00:00").do(self._run_scheduled_backup, job_id=job_id, description=description, interval=interval) + elif interval == "monthly": + # 每月1日執行 + schedule.every().day.at("00:00").do(self._check_monthly_schedule, job_id=job_id, description=description, interval=interval) + else: + # 自定義間隔 - 直接使用字符串作為cron表達式 + self.logger.warning(f"不支援的排程間隔: {interval},改用每日排程") + schedule.every().day.at("00:00").do(self._run_scheduled_backup, job_id=job_id, description=description, interval="daily") + + # 存儲排程任務信息 + self.scheduled_jobs[job_id] = { + "interval": interval, + "description": description, + "created": datetime.datetime.now(), + "keep_count": keep_count, + "next_run": self._get_next_run_time(interval) + } + + self.logger.info(f"已排程 {interval} 備份,任務ID: {job_id}") + return True + + except Exception as e: + self.logger.error(f"設置排程備份時出錯: {str(e)}") + return False + + def _check_monthly_schedule(self, job_id, description, interval): + """檢查是否應運行月度備份""" + if datetime.datetime.now().day == 1: + return self._run_scheduled_backup(job_id, description, interval) + return None + + def _get_next_run_time(self, interval): + """獲取下次執行時間""" + now = datetime.datetime.now() + + if interval == "hourly": + return now.replace(minute=0, second=0) + datetime.timedelta(hours=1) + elif interval == "daily": + return now.replace(hour=0, minute=0, second=0) + datetime.timedelta(days=1) + elif interval == "weekly": + # 計算下個星期一 + days_ahead = 0 - now.weekday() + if days_ahead <= 0: + days_ahead += 7 + return now.replace(hour=0, minute=0, second=0) + datetime.timedelta(days=days_ahead) + elif interval == "monthly": + # 計算下個月1日 + if now.month == 12: + next_month = now.replace(year=now.year+1, month=1, day=1, hour=0, minute=0, second=0) + else: + next_month = now.replace(month=now.month+1, day=1, hour=0, minute=0, second=0) + return next_month + + # 默認返回明天 + return now.replace(hour=0, minute=0, second=0) + datetime.timedelta(days=1) + + def _run_scheduled_backup(self, job_id, description, interval): + """執行排程備份任務""" + job_info = self.scheduled_jobs.get(job_id) + if not job_info: + self.logger.warning(f"找不到排程任務: {job_id}") + return None + + try: + # 更新下次執行時間 + self.scheduled_jobs[job_id]["next_run"] = self._get_next_run_time(interval) + + # 執行備份 + timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + backup_desc = f"{description} (排程 {interval})" + + # 設置備份類型 + backup_name = f"chroma_backup_{timestamp}" + backup_path = os.path.join(self.backup_dir, backup_name) + + # 創建備份目錄 + os.makedirs(backup_path, exist_ok=True) + + # 創建包含備份信息的元數據文件 + metadata = { + "source_db": self.source_db_path, + "backup_time": timestamp, + "description": backup_desc, + "backup_type": "scheduled", + "schedule_info": { + "job_id": job_id, + "interval": interval + } + } + + with open(os.path.join(backup_path, "backup_metadata.json"), "w", encoding="utf-8") as f: + json.dump(metadata, f, indent=4) + + # 執行實際備份 + source_db_file = os.path.join(self.source_db_path, "chroma.sqlite3") + backup_db_file = os.path.join(backup_path, "chroma.sqlite3") + + # 使用SQLite備份API + self._backup_sqlite_db(source_db_file, backup_db_file) + + # 複製其他文件 + for item in os.listdir(self.source_db_path): + source_item = os.path.join(self.source_db_path, item) + if os.path.isfile(source_item) and item != "chroma.sqlite3": + shutil.copy2(source_item, os.path.join(backup_path, item)) + + # 更新成功的備份 + self.backup_history.append({ + "name": backup_name, + "path": backup_path, + "date": datetime.datetime.now(), + "status": "success", + "description": backup_desc, + "scheduled": True, + "job_id": job_id + }) + + # 重新掃描備份 + self.scan_backups() + + # 保留限制處理 + if job_info["keep_count"] > 0: + self._cleanup_scheduled_backups(job_id, job_info["keep_count"]) + + self.logger.info(f"排程備份 {job_id} 完成: {backup_name}") + return True + + except Exception as e: + self.logger.error(f"執行排程備份時出錯: {str(e)}") + # 記錄失敗的備份 + self.backup_history.append({ + "name": f"failed_scheduled_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}", + "date": datetime.datetime.now(), + "status": "failed", + "error": str(e), + "description": description, + "scheduled": True, + "job_id": job_id + }) + return False + + def _cleanup_scheduled_backups(self, job_id, keep_count): + """根據保留數量清理舊的排程備份""" + # 獲取與該排程關聯的所有備份 + job_backups = [] + for i, backup in enumerate(self.backups): + # 檢查元數據文件 + metadata_path = os.path.join(backup["path"], "backup_metadata.json") + if os.path.exists(metadata_path): + try: + with open(metadata_path, "r", encoding="utf-8") as f: + metadata = json.load(f) + + if metadata.get("backup_type") == "scheduled" and \ + metadata.get("schedule_info", {}).get("job_id") == job_id: + job_backups.append((i, backup)) + except Exception: + pass + + # 按日期排序 + job_backups.sort(key=lambda x: x[1]["date"], reverse=True) + + # 刪除超出保留數量的舊備份 + if len(job_backups) > keep_count: + for index, _ in job_backups[keep_count:]: + self.delete_backup(index) + + def cancel_scheduled_backup(self, job_id: str) -> bool: + """取消排程備份任務""" + if job_id not in self.scheduled_jobs: + self.logger.error(f"找不到排程任務: {job_id}") + return False + + try: + # 從schedule中移除任務 + schedule.clear(job_id) + + # 從字典中移除 + self.scheduled_jobs.pop(job_id) + + self.logger.info(f"已取消排程備份任務: {job_id}") + return True + + except Exception as e: + self.logger.error(f"取消排程備份時出錯: {str(e)}") + return False + + def get_db_info(self) -> Dict: + """獲取數據庫信息""" + if not self.source_db_path or not os.path.exists(self.source_db_path): + return {"status": "未設置有效的數據庫路徑"} + + try: + # 連接到數據庫 + conn = sqlite3.connect(os.path.join(self.source_db_path, "chroma.sqlite3")) + cursor = conn.cursor() + + # 獲取數據庫大小 + db_size = os.path.getsize(os.path.join(self.source_db_path, "chroma.sqlite3")) + + # 獲取表列表 + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = cursor.fetchall() + + # 獲取每個表的行數 + table_counts = {} + for table in tables: + table_name = table[0] + cursor.execute(f"SELECT COUNT(*) FROM {table_name};") + count = cursor.fetchone()[0] + table_counts[table_name] = count + + # 獲取 embeddings 數量 (如果存在這樣的表) + embeddings_count = 0 + if "embeddings" in table_counts: + embeddings_count = table_counts["embeddings"] + + # 獲取最後修改時間 + last_modified = datetime.datetime.fromtimestamp( + os.path.getmtime(os.path.join(self.source_db_path, "chroma.sqlite3")) + ) + + # 獲取數據庫版本 + cursor.execute("PRAGMA user_version;") + db_version = cursor.fetchone()[0] + + conn.close() + + return { + "status": "ok", + "path": self.source_db_path, + "size": self._format_size(db_size), + "tables": table_counts, + "embeddings_count": embeddings_count, + "last_modified": last_modified.strftime("%Y-%m-%d %H:%M:%S"), + "db_version": db_version + } + + except Exception as e: + self.logger.error(f"獲取數據庫信息時出錯: {str(e)}") + return { + "status": "error", + "error": str(e), + "path": self.source_db_path + } + + def _format_size(self, size_bytes): + """格式化文件大小為人類可讀格式""" + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if size_bytes < 1024.0: + return f"{size_bytes:.2f} {unit}" + size_bytes /= 1024.0 + return f"{size_bytes:.2f} PB" + + def get_scheduled_jobs_info(self) -> List[Dict]: + """獲取所有排程任務的信息""" + jobs_info = [] + + for job_id, job_data in self.scheduled_jobs.items(): + job_info = { + "id": job_id, + "interval": job_data["interval"], + "description": job_data["description"], + "created": job_data["created"].strftime("%Y-%m-%d %H:%M:%S"), + "next_run": job_data["next_run"].strftime("%Y-%m-%d %H:%M:%S") if job_data["next_run"] else "未知", + "keep_count": job_data["keep_count"] + } + jobs_info.append(job_info) + + return jobs_info + + def run_scheduler(self): + """運行排程器,處理所有待執行的排程任務""" + schedule.run_pending() + + +class ChromaDBBackupUI: + """ChromaDB備份工具的使用者界面""" + + def __init__(self, root): + self.root = root + self.backup = ChromaDBBackup() + + # 設置視窗 + self.root.title("ChromaDB 備份工具") + self.root.geometry("1280x800") + self.setup_ui() + + # 默認主題 + self.current_theme = "darkly" # ttkbootstrap的深色主題 + + # 儲存配置 + self.config_path = os.path.join(str(Path.home()), ".chroma_backup_config.json") + self.config = self.load_config() + + # 應用保存的配置 + if self.config.get("last_source_db"): + self.source_db_var.set(self.config["last_source_db"]) + + if self.config.get("last_backup_dir"): + self.backup_dir_var.set(self.config["last_backup_dir"]) + self.load_directories() + + # 設置排程器執行器 + self.scheduler_running = True + self.scheduler_thread = threading.Thread(target=self.run_scheduler, daemon=True) + self.scheduler_thread.start() + + def setup_ui(self): + """設置使用者界面""" + # 創建主佈局 + self.main_frame = ttk.Frame(self.root, padding=10) + self.main_frame.pack(fill=BOTH, expand=YES) + + # 頂部面板(源數據庫和備份目錄設置) + self.top_panel = ttk.Frame(self.main_frame) + self.top_panel.pack(fill=X, pady=(0, 10)) + + # 左側面板(備份列表和操作) + self.left_panel = ttk.Frame(self.main_frame, width=400) + self.left_panel.pack(side=LEFT, fill=BOTH, expand=YES, padx=(0, 5)) + + # 右側面板(排程和統計) + self.right_panel = ttk.Frame(self.main_frame, width=300) + self.right_panel.pack(side=LEFT, fill=BOTH, padx=(5, 0)) + + # 設置頂部面板 + self.setup_directory_frame() + + # 設置左側面板 + self.setup_backups_frame() + + # 設置右側面板 + self.setup_schedule_frame() + self.setup_stats_frame() + + # 設置狀態欄 + self.setup_status_bar() + + # 設置菜單 + self.setup_menu() + + def setup_menu(self): + """設置選單列""" + menubar = tk.Menu(self.root) + self.root.config(menu=menubar) + + # 檔案選單 + file_menu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label="檔案", menu=file_menu) + file_menu.add_command(label="選擇源數據庫...", command=self.browse_source_db) + file_menu.add_command(label="選擇備份目錄...", command=self.browse_backup_dir) + file_menu.add_separator() + file_menu.add_command(label="導入備份...", command=self.import_backup_dialog) + file_menu.add_command(label="導出備份...", command=self.export_backup_dialog) + file_menu.add_separator() + file_menu.add_command(label="離開", command=self.root.quit) + + # 備份選單 + backup_menu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label="備份", menu=backup_menu) + backup_menu.add_command(label="創建新備份", command=self.create_backup_dialog) + backup_menu.add_command(label="還原備份...", command=self.restore_backup_dialog) + backup_menu.add_command(label="刪除備份...", command=self.delete_backup_dialog) + backup_menu.add_separator() + backup_menu.add_command(label="排程備份...", command=self.schedule_backup_dialog) + backup_menu.add_command(label="查看排程任務", command=self.view_scheduled_jobs) + + # 工具選單 + tools_menu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label="工具", menu=tools_menu) + tools_menu.add_command(label="備份歷史", command=self.view_backup_history) + tools_menu.add_command(label="數據庫資訊", command=self.view_db_info) + tools_menu.add_separator() + tools_menu.add_command(label="打開備份閱讀器", command=self.open_backup_reader) + + # 檢視選單 + view_menu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label="檢視", menu=view_menu) + view_menu.add_command(label="切換深色/淺色主題", command=self.toggle_theme) + view_menu.add_command(label="刷新", command=self.refresh_ui) + + # 說明選單 + help_menu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label="說明", menu=help_menu) + help_menu.add_command(label="關於", command=self.show_about) + help_menu.add_command(label="查看日誌", command=self.open_log_file) + + def setup_directory_frame(self): + """設置目錄選擇框架""" + dir_frame = ttk.Frame(self.top_panel) + dir_frame.pack(fill=X) + + # 源數據庫選擇 + source_frame = ttk.LabelFrame(dir_frame, text="源數據庫", padding=10) + source_frame.pack(side=LEFT, fill=X, expand=YES, padx=(0, 5)) + + self.source_db_var = tk.StringVar() + + ttk.Entry(source_frame, textvariable=self.source_db_var).pack(side=LEFT, fill=X, expand=YES) + ttk.Button(source_frame, text="瀏覽", command=self.browse_source_db).pack(side=LEFT, padx=(5, 0)) + + # 備份目錄選擇 + backup_frame = ttk.LabelFrame(dir_frame, text="備份目錄", padding=10) + backup_frame.pack(side=LEFT, fill=X, expand=YES, padx=(5, 0)) + + self.backup_dir_var = tk.StringVar() + + ttk.Entry(backup_frame, textvariable=self.backup_dir_var).pack(side=LEFT, fill=X, expand=YES) + ttk.Button(backup_frame, text="瀏覽", command=self.browse_backup_dir).pack(side=LEFT, padx=(5, 0)) + + # 載入按鈕 + load_frame = ttk.Frame(self.top_panel) + load_frame.pack(fill=X, pady=5) + + ttk.Button( + load_frame, + text="載入目錄", + command=self.load_directories, + style="Accent.TButton" + ).pack(side=RIGHT) + + # 備份按鈕 + ttk.Button( + load_frame, + text="創建新備份", + command=self.create_backup_dialog, + style="success.TButton" + ).pack(side=RIGHT, padx=5) + + def setup_backups_frame(self): + """設置備份列表框架""" + backups_frame = ttk.LabelFrame(self.left_panel, text="備份列表", padding=10) + backups_frame.pack(fill=BOTH, expand=YES) + + # 工具欄 + toolbar = ttk.Frame(backups_frame) + toolbar.pack(fill=X, pady=(0, 5)) + + # 搜索欄 + self.backup_search_var = tk.StringVar() + self.backup_search_var.trace("w", self.filter_backups) + + ttk.Label(toolbar, text="搜索:").pack(side=LEFT) + ttk.Entry(toolbar, textvariable=self.backup_search_var).pack(side=LEFT, fill=X, expand=YES) + + ttk.Button(toolbar, text="刷新", command=self.refresh_backups).pack(side=RIGHT, padx=5) + + # 備份列表 + list_frame = ttk.Frame(backups_frame) + list_frame.pack(fill=BOTH, expand=YES) + + columns = ("name", "date", "size") + self.backups_tree = ttk.Treeview(list_frame, columns=columns, show="headings", height=15) + self.backups_tree.heading("name", text="名稱") + self.backups_tree.heading("date", text="日期") + self.backups_tree.heading("size", text="大小") + self.backups_tree.column("name", width=250) + self.backups_tree.column("date", width=150) + self.backups_tree.column("size", width=100) + + scrollbar = ttk.Scrollbar(list_frame, orient=VERTICAL, command=self.backups_tree.yview) + self.backups_tree.configure(yscrollcommand=scrollbar.set) + + self.backups_tree.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=RIGHT, fill=Y) + + # 雙擊查看詳情 + self.backups_tree.bind("", self.view_backup_details) + + # 右鍵選單 + self.backup_context_menu = tk.Menu(self.backups_tree, tearoff=0) + self.backup_context_menu.add_command(label="查看詳情", command=self.view_backup_details_from_menu) + self.backup_context_menu.add_command(label="還原此備份", command=self.restore_selected_backup) + self.backup_context_menu.add_command(label="導出備份", command=self.export_selected_backup) + self.backup_context_menu.add_separator() + self.backup_context_menu.add_command(label="刪除備份", command=self.delete_selected_backup) + + self.backups_tree.bind("", self.show_backup_context_menu) + + # 操作按鈕 + action_frame = ttk.Frame(backups_frame) + action_frame.pack(fill=X, pady=(5, 0)) + + ttk.Button( + action_frame, + text="還原", + command=self.restore_selected_backup, + style="info.TButton" + ).pack(side=LEFT, padx=(0, 5)) + + ttk.Button( + action_frame, + text="刪除", + command=self.delete_selected_backup, + style="danger.TButton" + ).pack(side=LEFT) + + ttk.Button( + action_frame, + text="導出", + command=self.export_selected_backup + ).pack(side=LEFT, padx=(5, 0)) + + def setup_schedule_frame(self): + """設置排程框架""" + schedule_frame = ttk.LabelFrame(self.right_panel, text="排程備份", padding=10) + schedule_frame.pack(fill=X, pady=(0, 10)) + + # 快速排程按鈕 + quick_frame = ttk.Frame(schedule_frame) + quick_frame.pack(fill=X, pady=(0, 10)) + + ttk.Label(quick_frame, text="快速排程:").pack(side=LEFT) + + ttk.Button( + quick_frame, + text="每小時", + command=lambda: self.quick_schedule("hourly") + ).pack(side=LEFT, padx=5) + + ttk.Button( + quick_frame, + text="每日", + command=lambda: self.quick_schedule("daily") + ).pack(side=LEFT, padx=5) + + ttk.Button( + quick_frame, + text="每週", + command=lambda: self.quick_schedule("weekly") + ).pack(side=LEFT, padx=5) + + # 排程任務列表 + ttk.Label(schedule_frame, text="排程任務:").pack(anchor=W) + + jobs_frame = ttk.Frame(schedule_frame) + jobs_frame.pack(fill=BOTH, expand=YES) + + columns = ("interval", "next_run") + self.jobs_tree = ttk.Treeview(jobs_frame, columns=columns, show="headings", height=5) + self.jobs_tree.heading("interval", text="間隔") + self.jobs_tree.heading("next_run", text="下次執行") + self.jobs_tree.column("interval", width=100) + self.jobs_tree.column("next_run", width=150) + + scrollbar = ttk.Scrollbar(jobs_frame, orient=VERTICAL, command=self.jobs_tree.yview) + self.jobs_tree.configure(yscrollcommand=scrollbar.set) + + self.jobs_tree.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=RIGHT, fill=Y) + + # 排程操作按鈕 + actions_frame = ttk.Frame(schedule_frame) + actions_frame.pack(fill=X, pady=(5, 0)) + + ttk.Button( + actions_frame, + text="創建排程", + command=self.schedule_backup_dialog + ).pack(side=LEFT, padx=(0, 5)) + + ttk.Button( + actions_frame, + text="取消排程", + command=self.cancel_selected_job, + style="warning.TButton" + ).pack(side=LEFT) + + ttk.Button( + actions_frame, + text="立即執行", + command=self.run_selected_job + ).pack(side=RIGHT) + + def setup_stats_frame(self): + """設置統計信息框架""" + stats_frame = ttk.LabelFrame(self.right_panel, text="統計與資訊", padding=10) + stats_frame.pack(fill=BOTH, expand=YES) + + # 數據庫信息 + db_frame = ttk.Frame(stats_frame) + db_frame.pack(fill=X, pady=(0, 10)) + + ttk.Label(db_frame, text="數據庫概況", font=("TkDefaultFont", 10, "bold")).pack(anchor=W) + + self.db_info_var = tk.StringVar(value="未載入數據庫") + ttk.Label(db_frame, textvariable=self.db_info_var, wraplength=250).pack(anchor=W, pady=5) + + ttk.Button( + db_frame, + text="查看詳情", + command=self.view_db_info + ).pack(anchor=W) + + # 備份統計 + backup_stats_frame = ttk.Frame(stats_frame) + backup_stats_frame.pack(fill=X, pady=10) + + ttk.Label(backup_stats_frame, text="備份統計", font=("TkDefaultFont", 10, "bold")).pack(anchor=W) + + self.backup_stats_var = tk.StringVar(value="未載入備份") + ttk.Label(backup_stats_frame, textvariable=self.backup_stats_var, wraplength=250).pack(anchor=W, pady=5) + + # 圖表區域 + chart_frame = ttk.Frame(stats_frame) + chart_frame.pack(fill=BOTH, expand=YES) + + self.chart_container = ttk.Frame(chart_frame) + self.chart_container.pack(fill=BOTH, expand=YES) + + ttk.Button( + stats_frame, + text="查看備份歷史", + command=self.view_backup_history + ).pack(anchor=W, pady=(10, 0)) + + def setup_status_bar(self): + """設置狀態欄""" + status_frame = ttk.Frame(self.root) + status_frame.pack(side=BOTTOM, fill=X) + + self.status_var = tk.StringVar(value="就緒") + status_label = ttk.Label(status_frame, textvariable=self.status_var, relief=tk.SUNKEN, anchor=W) + status_label.pack(fill=X) + + def browse_source_db(self): + """瀏覽選擇源數據庫目錄""" + directory = filedialog.askdirectory( + title="選擇ChromaDB源數據庫目錄", + initialdir=self.source_db_var.get() or str(Path.home()) + ) + + if directory: + self.source_db_var.set(directory) + + def browse_backup_dir(self): + """瀏覽選擇備份目錄""" + directory = filedialog.askdirectory( + title="選擇ChromaDB備份目錄", + initialdir=self.backup_dir_var.get() or str(Path.home()) + ) + + if directory: + self.backup_dir_var.set(directory) + + def load_directories(self): + """載入源數據庫和備份目錄""" + source_db = self.source_db_var.get() + backup_dir = self.backup_dir_var.get() + + if not source_db or not backup_dir: + messagebox.showwarning("警告", "請同時指定源數據庫和備份目錄") + return + + self.status_var.set("正在驗證目錄...") + self.root.update_idletasks() + + # 驗證源數據庫 + if not self.backup.set_source_db(source_db): + messagebox.showerror("錯誤", f"無效的ChromaDB源數據庫目錄: {source_db}") + self.status_var.set("載入失敗") + return + + # 設置備份目錄 + if not self.backup.set_backup_directory(backup_dir): + messagebox.showerror("錯誤", f"無法設置備份目錄: {backup_dir}") + self.status_var.set("載入失敗") + return + + # 保存配置 + self.config["last_source_db"] = source_db + self.config["last_backup_dir"] = backup_dir + self.save_config() + + # 更新UI + self.refresh_ui() + self.status_var.set("目錄已載入") + + def refresh_ui(self): + """刷新整個UI""" + self.refresh_backups() + self.refresh_scheduled_jobs() + self.update_stats() + self.update_chart() + + def refresh_backups(self): + """刷新備份列表""" + self.status_var.set("正在刷新備份列表...") + self.root.update_idletasks() + + # 重新掃描備份 + self.backup.scan_backups() + + # 清空現有列表 + for item in self.backups_tree.get_children(): + self.backups_tree.delete(item) + + # 添加備份 + for backup in self.backup.backups: + self.backups_tree.insert( + "", "end", + values=(backup["name"], backup["formatted_date"], backup["size"]) + ) + + self.status_var.set(f"已找到 {len(self.backup.backups)} 個備份") + + def filter_backups(self, *args): + """根據搜索條件過濾備份列表""" + search_text = self.backup_search_var.get().lower() + + # 清空列表 + for item in self.backups_tree.get_children(): + self.backups_tree.delete(item) + + # 添加匹配的備份 + for backup in self.backup.backups: + if search_text in backup["name"].lower() or search_text in backup["formatted_date"].lower(): + self.backups_tree.insert( + "", "end", + values=(backup["name"], backup["formatted_date"], backup["size"]) + ) + + def refresh_scheduled_jobs(self): + """刷新排程任務列表""" + # 清空現有列表 + for item in self.jobs_tree.get_children(): + self.jobs_tree.delete(item) + + # 添加排程任務 + for job in self.backup.get_scheduled_jobs_info(): + self.jobs_tree.insert( + "", "end", + iid=job["id"], # 使用任務ID作為樹項目ID + values=( + f"{job['interval']} ({job['description']})", + job["next_run"] + ) + ) + + def update_stats(self): + """更新統計信息""" + # 更新數據庫信息 + db_info = self.backup.get_db_info() + + if db_info["status"] == "ok": + info_text = f"路徑: {os.path.basename(db_info['path'])}\n" + info_text += f"大小: {db_info['size']}\n" + info_text += f"嵌入向量: {db_info['embeddings_count']}\n" + info_text += f"最後修改: {db_info['last_modified']}" + + self.db_info_var.set(info_text) + else: + self.db_info_var.set(f"錯誤: {db_info.get('error', '未知錯誤')}") + + # 更新備份統計 + if self.backup.backups: + # 計算總備份大小 + total_size = sum([os.path.getsize(os.path.join(backup["path"], "chroma.sqlite3")) + for backup in self.backup.backups + if os.path.exists(os.path.join(backup["path"], "chroma.sqlite3"))]) + + # 計算最新與最舊備份的日期差 + if len(self.backup.backups) >= 2: + newest = self.backup.backups[0]["date"] + oldest = self.backup.backups[-1]["date"] + date_range = (newest - oldest).days + else: + date_range = 0 + + stats_text = f"備份總數: {len(self.backup.backups)}\n" + stats_text += f"總大小: {self.backup._format_size(total_size)}\n" + stats_text += f"日期範圍: {date_range} 天\n" + + # 計算每月備份數量 + if len(self.backup.backups) > 0: + this_month = datetime.datetime.now().replace(day=1, hour=0, minute=0, second=0, microsecond=0) + month_count = len([b for b in self.backup.backups if b["date"] >= this_month]) + stats_text += f"本月備份: {month_count} 個" + + self.backup_stats_var.set(stats_text) + else: + self.backup_stats_var.set("尚無備份") + + def update_chart(self): + """更新圖表""" + # 清空圖表容器 + for widget in self.chart_container.winfo_children(): + widget.destroy() + + if not self.backup.backups: + return + + # 準備數據 + dates = [] + sizes = [] + + # 僅使用最近10個備份 + for backup in self.backup.backups[:10]: + db_file = os.path.join(backup["path"], "chroma.sqlite3") + if os.path.exists(db_file): + dates.append(backup["date"].strftime("%m-%d")) + sizes.append(os.path.getsize(db_file) / (1024 * 1024)) # 轉換為MB + + # 反轉列表,使日期按時間順序顯示 + dates.reverse() + sizes.reverse() + + if not dates: + return + + # 創建圖表 + fig = plt.Figure(figsize=(3, 2), dpi=100) + ax = fig.add_subplot(111) + + ax.plot(dates, sizes, 'o-', color='skyblue') + ax.set_xlabel('日期', fontsize=8) + ax.set_ylabel('大小 (MB)', fontsize=8) + ax.set_title('備份大小趨勢', fontsize=10) + + # 設置x軸標籤角度 + plt.setp(ax.get_xticklabels(), rotation=45, ha='right', fontsize=6) + ax.tick_params(axis='y', labelsize=6) + + fig.tight_layout() + + # 將圖表嵌入到tkinter視窗 + canvas = FigureCanvasTkAgg(fig, self.chart_container) + canvas.draw() + canvas.get_tk_widget().pack(fill=BOTH, expand=YES) + + def create_backup_dialog(self): + """顯示創建備份對話框""" + if not self.backup.source_db_path or not self.backup.backup_dir: + messagebox.showwarning("警告", "請先設置源數據庫和備份目錄") + return + + # 創建對話框 + dialog = tk.Toplevel(self.root) + dialog.title("創建新備份") + dialog.geometry("400x200") + dialog.resizable(False, False) + dialog.grab_set() # 模態對話框 + + # 對話框內容 + frame = ttk.Frame(dialog, padding=20) + frame.pack(fill=BOTH, expand=YES) + + ttk.Label(frame, text="備份描述:").pack(anchor=W, pady=(0, 5)) + + description_var = tk.StringVar() + description_entry = ttk.Entry(frame, textvariable=description_var, width=40) + description_entry.pack(fill=X, pady=(0, 20)) + + # 按鈕 + btn_frame = ttk.Frame(frame) + btn_frame.pack(fill=X) + + ttk.Button( + btn_frame, + text="取消", + command=dialog.destroy + ).pack(side=RIGHT) + + ttk.Button( + btn_frame, + text="創建備份", + style="Accent.TButton", + command=lambda: self.create_backup(description_var.get(), dialog) + ).pack(side=RIGHT, padx=5) + + # 設置焦點 + description_entry.focus_set() + + def create_backup(self, description, dialog): + """創建新備份""" + dialog.destroy() + + self.status_var.set("正在創建備份...") + self.root.update_idletasks() + + def backup_thread(): + success = self.backup.create_backup(description) + self.root.after(0, lambda: self.finalize_backup_creation(success)) + + threading.Thread(target=backup_thread).start() + + def finalize_backup_creation(self, success): + """完成備份創建""" + if success: + self.status_var.set("備份創建成功") + self.refresh_ui() + messagebox.showinfo("成功", "備份已成功創建") + else: + self.status_var.set("備份創建失敗") + messagebox.showerror("錯誤", "創建備份時發生錯誤,請查看日誌了解詳情") + + def view_backup_details(self, event=None): + """查看備份詳情""" + selection = self.backups_tree.selection() + if not selection: + return + + self.view_backup_details_from_menu() + + def view_backup_details_from_menu(self): + """從上下文選單查看備份詳情""" + selection = self.backups_tree.selection() + if not selection: + return + + # 獲取選定項的索引 + item_id = selection[0] + item_index = self.backups_tree.index(item_id) + + # 確保索引有效 + if item_index >= len(self.backup.backups): + return + + backup = self.backup.backups[item_index] + + # 創建詳情對話框 + dialog = tk.Toplevel(self.root) + dialog.title(f"備份詳情 - {backup['name']}") + dialog.geometry("500x400") + dialog.grab_set() + + frame = ttk.Frame(dialog, padding=20) + frame.pack(fill=BOTH, expand=YES) + + # 基本信息 + info_frame = ttk.Frame(frame) + info_frame.pack(fill=X, pady=(0, 20)) + + ttk.Label(info_frame, text="基本信息", font=("TkDefaultFont", 12, "bold")).pack(anchor=W) + + info_text = f"名稱: {backup['name']}\n" + info_text += f"建立日期: {backup['formatted_date']}\n" + info_text += f"大小: {backup['size']}\n" + info_text += f"路徑: {backup['path']}\n" + + # 檢查元數據文件 + metadata_path = os.path.join(backup['path'], "backup_metadata.json") + if os.path.exists(metadata_path): + try: + with open(metadata_path, "r", encoding="utf-8") as f: + metadata = json.load(f) + + if metadata.get("description"): + info_text += f"\n描述: {metadata['description']}\n" + + if metadata.get("backup_type"): + info_text += f"備份類型: {metadata['backup_type']}\n" + + if metadata.get("source_db"): + info_text += f"源數據庫: {metadata['source_db']}\n" + + if metadata.get("schedule_info"): + schedule_info = metadata["schedule_info"] + info_text += f"\n排程信息:\n" + info_text += f"間隔: {schedule_info.get('interval', '未知')}\n" + info_text += f"排程ID: {schedule_info.get('job_id', '未知')}\n" + except Exception: + info_text += "\n無法讀取元數據文件" + + info_label = ttk.Label(info_frame, text=info_text, justify=LEFT) + info_label.pack(anchor=W, pady=5) + + # 數據庫信息 + db_frame = ttk.Frame(frame) + db_frame.pack(fill=X) + + ttk.Label(db_frame, text="數據庫信息", font=("TkDefaultFont", 12, "bold")).pack(anchor=W) + + # 嘗試連接到備份的數據庫 + db_path = os.path.join(backup['path'], "chroma.sqlite3") + + if os.path.exists(db_path): + try: + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + # 獲取表列表 + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") + tables = cursor.fetchall() + + db_text = "表結構:\n" + for table in tables: + table_name = table[0] + cursor.execute(f"SELECT COUNT(*) FROM {table_name};") + count = cursor.fetchone()[0] + db_text += f"- {table_name}: {count} 行\n" + + conn.close() + + ttk.Label(db_frame, text=db_text, justify=LEFT).pack(anchor=W, pady=5) + except Exception as e: + ttk.Label(db_frame, text=f"無法讀取數據庫: {str(e)}", justify=LEFT).pack(anchor=W, pady=5) + else: + ttk.Label(db_frame, text="數據庫文件不存在", justify=LEFT).pack(anchor=W, pady=5) + + # 按鈕 + btn_frame = ttk.Frame(frame) + btn_frame.pack(fill=X, pady=(20, 0)) + + ttk.Button( + btn_frame, + text="關閉", + command=dialog.destroy + ).pack(side=RIGHT) + + ttk.Button( + btn_frame, + text="還原此備份", + command=lambda: [dialog.destroy(), self.restore_selected_backup()] + ).pack(side=RIGHT, padx=5) + + def show_backup_context_menu(self, event): + """顯示備份上下文選單""" + selection = self.backups_tree.selection() + if selection: + self.backup_context_menu.post(event.x_root, event.y_root) + + def restore_backup_dialog(self): + """顯示還原備份對話框""" + selection = self.backups_tree.selection() + if not selection: + messagebox.showinfo("提示", "請先選擇要還原的備份") + return + + # 獲取選定項的索引 + item_id = selection[0] + item_index = self.backups_tree.index(item_id) + + # 確保索引有效 + if item_index >= len(self.backup.backups): + return + + backup = self.backup.backups[item_index] + + # 創建對話框 + dialog = tk.Toplevel(self.root) + dialog.title("還原備份") + dialog.geometry("500x250") + dialog.resizable(False, False) + dialog.grab_set() + + frame = ttk.Frame(dialog, padding=20) + frame.pack(fill=BOTH, expand=YES) + + ttk.Label( + frame, + text="還原選項", + font=("TkDefaultFont", 14, "bold") + ).pack(anchor=W, pady=(0, 10)) + + ttk.Label( + frame, + text=f"選定的備份: {backup['name']} ({backup['formatted_date']})" + ).pack(anchor=W, pady=(0, 20)) + + # 還原選項 + options_frame = ttk.Frame(frame) + options_frame.pack(fill=X, pady=(0, 20)) + + restore_option = tk.StringVar(value="source") + + ttk.Radiobutton( + options_frame, + text="還原到源數據庫位置", + variable=restore_option, + value="source" + ).pack(anchor=W, pady=2) + + ttk.Radiobutton( + options_frame, + text="還原到自訂位置", + variable=restore_option, + value="custom" + ).pack(anchor=W, pady=2) + + custom_frame = ttk.Frame(options_frame) + custom_frame.pack(fill=X, pady=(5, 0), padx=(20, 0)) + + custom_path_var = tk.StringVar() + + ttk.Entry(custom_frame, textvariable=custom_path_var).pack(side=LEFT, fill=X, expand=YES) + ttk.Button( + custom_frame, + text="瀏覽", + command=lambda: custom_path_var.set(filedialog.askdirectory( + title="選擇還原目標目錄", + initialdir=str(Path.home()) + )) + ).pack(side=LEFT, padx=(5, 0)) + + # 警告信息 + ttk.Label( + frame, + text="警告: 還原操作將覆蓋目標位置的現有數據。過程中會創建安全備份。", + foreground="red", + wraplength=460 + ).pack(anchor=W, pady=(0, 20)) + + # 按鈕 + btn_frame = ttk.Frame(frame) + btn_frame.pack(fill=X) + + ttk.Button( + btn_frame, + text="取消", + command=dialog.destroy + ).pack(side=RIGHT) + + ttk.Button( + btn_frame, + text="還原", + style="Accent.TButton", + command=lambda: self.restore_backup(item_index, restore_option.get(), custom_path_var.get(), dialog) + ).pack(side=RIGHT, padx=5) + + def restore_selected_backup(self): + """還原選中的備份""" + selection = self.backups_tree.selection() + if not selection: + messagebox.showinfo("提示", "請先選擇要還原的備份") + return + + self.restore_backup_dialog() + + def restore_backup(self, backup_index, option, custom_path, dialog): + """執行備份還原""" + dialog.destroy() + + # 確認還原路徑 + restore_path = None + if option == "source": + restore_path = self.backup.source_db_path + elif option == "custom" and custom_path: + restore_path = custom_path + else: + messagebox.showerror("錯誤", "請指定有效的還原目標路徑") + return + + # 確認還原操作 + if not messagebox.askyesno("確認還原", + f"確定要還原此備份到 {restore_path}?\n\n" + f"警告: 此操作將覆蓋目標位置的所有現有數據!"): + return + + self.status_var.set("正在還原備份...") + self.root.update_idletasks() + + def restore_thread(): + success = self.backup.restore_backup(backup_index, restore_path) + self.root.after(0, lambda: self.finalize_backup_restore(success, restore_path)) + + threading.Thread(target=restore_thread).start() + + def finalize_backup_restore(self, success, restore_path): + """完成備份還原""" + if success: + self.status_var.set("備份還原成功") + messagebox.showinfo("成功", f"備份已成功還原到 {restore_path}") + else: + self.status_var.set("備份還原失敗") + messagebox.showerror("錯誤", "還原備份時發生錯誤,請查看日誌了解詳情") + + def delete_backup_dialog(self): + """顯示刪除備份確認對話框""" + selection = self.backups_tree.selection() + if not selection: + messagebox.showinfo("提示", "請先選擇要刪除的備份") + return + + # 獲取選定項的索引 + item_id = selection[0] + item_index = self.backups_tree.index(item_id) + + # 確保索引有效 + if item_index >= len(self.backup.backups): + return + + backup = self.backup.backups[item_index] + + # 確認刪除 + if messagebox.askyesno("確認刪除", + f"確定要刪除備份 '{backup['name']}' ({backup['formatted_date']})?\n\n" + f"警告: 此操作無法撤銷!"): + self.delete_backup(item_index) + + def delete_selected_backup(self): + """刪除選中的備份""" + self.delete_backup_dialog() + + def delete_backup(self, backup_index): + """執行備份刪除""" + self.status_var.set("正在刪除備份...") + self.root.update_idletasks() + + success = self.backup.delete_backup(backup_index) + + if success: + self.status_var.set("備份已刪除") + self.refresh_ui() + else: + self.status_var.set("刪除備份失敗") + messagebox.showerror("錯誤", "刪除備份時發生錯誤") + + def export_backup_dialog(self): + """顯示導出備份對話框""" + selection = self.backups_tree.selection() + if not selection: + messagebox.showinfo("提示", "請先選擇要導出的備份") + return + + # 獲取選定項的索引 + item_id = selection[0] + item_index = self.backups_tree.index(item_id) + + # 確保索引有效 + if item_index >= len(self.backup.backups): + return + + backup = self.backup.backups[item_index] + + # 詢問保存位置 + file_path = filedialog.asksaveasfilename( + title="導出備份", + initialfile=f"{backup['name']}.zip", + defaultextension=".zip", + filetypes=[("ZIP文件", "*.zip")] + ) + + if file_path: + self.export_backup(item_index, file_path) + + def export_selected_backup(self): + """導出選中的備份""" + self.export_backup_dialog() + + def export_backup(self, backup_index, file_path): + """執行備份導出""" + self.status_var.set("正在導出備份...") + self.root.update_idletasks() + + def export_thread(): + success = self.backup.export_backup(backup_index, file_path) + self.root.after(0, lambda: self.finalize_backup_export(success, file_path)) + + threading.Thread(target=export_thread).start() + + def finalize_backup_export(self, success, file_path): + """完成備份導出""" + if success: + self.status_var.set("備份導出成功") + messagebox.showinfo("成功", f"備份已成功導出到 {file_path}") + else: + self.status_var.set("備份導出失敗") + messagebox.showerror("錯誤", "導出備份時發生錯誤") + + def import_backup_dialog(self): + """顯示導入備份對話框""" + # 詢問ZIP文件位置 + file_path = filedialog.askopenfilename( + title="導入備份", + filetypes=[("ZIP文件", "*.zip"), ("所有文件", "*.*")] + ) + + if file_path: + self.import_backup(file_path) + + def import_backup(self, file_path): + """執行備份導入""" + self.status_var.set("正在導入備份...") + self.root.update_idletasks() + + def import_thread(): + success = self.backup.import_backup(file_path) + self.root.after(0, lambda: self.finalize_backup_import(success)) + + threading.Thread(target=import_thread).start() + + def finalize_backup_import(self, success): + """完成備份導入""" + if success: + self.status_var.set("備份導入成功") + self.refresh_ui() + messagebox.showinfo("成功", "備份已成功導入") + else: + self.status_var.set("備份導入失敗") + messagebox.showerror("錯誤", "導入備份時發生錯誤,請確保ZIP文件包含有效的ChromaDB備份") + + def schedule_backup_dialog(self): + """顯示排程備份對話框""" + if not self.backup.source_db_path or not self.backup.backup_dir: + messagebox.showwarning("警告", "請先設置源數據庫和備份目錄") + return + + # 創建對話框 + dialog = tk.Toplevel(self.root) + dialog.title("排程備份") + dialog.geometry("450x450") # 增加高度確保所有元素可見 + dialog.resizable(False, False) + dialog.grab_set() + + # 使用主框架 + main_frame = ttk.Frame(dialog, padding=20) + main_frame.pack(fill=BOTH, expand=YES) + + # 標題 + ttk.Label( + main_frame, + text="排程設置", + font=("TkDefaultFont", 14, "bold") + ).pack(anchor=W, pady=(0, 15)) + + # 間隔選擇 + interval_frame = ttk.Frame(main_frame) + interval_frame.pack(fill=X, pady=(0, 15)) + + ttk.Label(interval_frame, text="備份間隔:").pack(anchor=W) + + interval_var = tk.StringVar(value="daily") + + intervals = [ + ("每小時", "hourly"), + ("每天", "daily"), + ("每週", "weekly"), + ("每月", "monthly") + ] + + for text, value in intervals: + ttk.Radiobutton( + interval_frame, + text=text, + variable=interval_var, + value=value + ).pack(anchor=W, padx=(20, 0), pady=2) + + # 描述 + ttk.Label(main_frame, text="備份描述:").pack(anchor=W, pady=(0, 5)) + + description_var = tk.StringVar(value="排程備份") + ttk.Entry(main_frame, textvariable=description_var, width=40).pack(fill=X, pady=(0, 15)) + + # 保留數量 + keep_frame = ttk.Frame(main_frame) + keep_frame.pack(fill=X, pady=(0, 15)) + + ttk.Label(keep_frame, text="最多保留備份數量:").pack(side=LEFT) + + keep_count_var = tk.StringVar(value="7") + ttk.Spinbox( + keep_frame, + from_=0, + to=100, + textvariable=keep_count_var, + width=5 + ).pack(side=LEFT, padx=(5, 0)) + + ttk.Label( + keep_frame, + text="(0表示不限制)" + ).pack(side=LEFT, padx=(5, 0)) + + # 分隔線 + ttk.Separator(main_frame, orient=HORIZONTAL).pack(fill=X, pady=15) + + # 底部按鈕區 - 使用標準按鈕並確保可見性 + btn_frame = ttk.Frame(main_frame) + btn_frame.pack(fill=X, pady=(10, 5)) + + # 取消按鈕 - 使用標準樣式 + cancel_btn = ttk.Button( + btn_frame, + text="取消", + command=dialog.destroy, + width=12 + ) + cancel_btn.pack(side=LEFT, padx=(0, 10)) + + # 確認按鈕 - 使用標準樣式,避免自定義樣式可能的問題 + create_btn = ttk.Button( + btn_frame, + text="加入排程", + width=15, + command=lambda: self.create_schedule( + interval_var.get(), + description_var.get(), + keep_count_var.get(), + dialog + ) + ) + create_btn.pack(side=LEFT) + + # 額外提示以確保用戶知道如何完成操作 + note_frame = ttk.Frame(main_frame) + note_frame.pack(fill=X, pady=(15, 0)) + + ttk.Label( + note_frame, + text="請確保點擊「加入排程」按鈕完成設置", + foreground="blue" + ).pack() + + def create_schedule(self, interval, description, keep_count_str, dialog): + """創建備份排程""" + dialog.destroy() + + try: + keep_count = int(keep_count_str) + except ValueError: + keep_count = 0 + + success = self.backup.schedule_backup(interval, description, keep_count) + + if success: + self.status_var.set(f"已創建 {interval} 備份排程") + self.refresh_scheduled_jobs() + messagebox.showinfo("成功", f"已成功創建 {interval} 備份排程") + else: + self.status_var.set("創建排程失敗") + messagebox.showerror("錯誤", "無法創建備份排程") + + def quick_schedule(self, interval): + """快速創建排程備份""" + if not self.backup.source_db_path or not self.backup.backup_dir: + messagebox.showwarning("警告", "請先設置源數據庫和備份目錄") + return + + # 根據間隔設置描述和保留數量 + if interval == "hourly": + description = "每小時自動備份" + keep_count = 24 + elif interval == "daily": + description = "每日自動備份" + keep_count = 7 + elif interval == "weekly": + description = "每週自動備份" + keep_count = 4 + else: + description = "自動備份" + keep_count = 5 + + # 確認創建 + if messagebox.askyesno("確認", f"確定要創建 {description} 排程?\n\n將保留最新的 {keep_count} 個備份"): + success = self.backup.schedule_backup(interval, description, keep_count) + + if success: + self.status_var.set(f"已創建 {interval} 備份排程") + self.refresh_scheduled_jobs() + messagebox.showinfo("成功", f"已成功創建 {interval} 備份排程") + else: + self.status_var.set("創建排程失敗") + messagebox.showerror("錯誤", "無法創建備份排程") + + def cancel_selected_job(self): + """取消選中的排程任務""" + selection = self.jobs_tree.selection() + if not selection: + messagebox.showinfo("提示", "請先選擇要取消的排程任務") + return + + # 獲取任務ID + job_id = selection[0] + + # 確認取消 + if messagebox.askyesno("確認", "確定要取消此排程任務?"): + success = self.backup.cancel_scheduled_backup(job_id) + + if success: + self.status_var.set("已取消排程任務") + self.refresh_scheduled_jobs() + else: + self.status_var.set("取消排程任務失敗") + messagebox.showerror("錯誤", "無法取消排程任務") + + def run_selected_job(self): + """立即執行選中的排程任務""" + selection = self.jobs_tree.selection() + if not selection: + messagebox.showinfo("提示", "請先選擇要執行的排程任務") + return + + # 獲取任務ID + job_id = selection[0] + + # 確認執行 + if messagebox.askyesno("確認", "確定要立即執行此排程任務?"): + # 獲取任務信息 + job_info = self.backup.scheduled_jobs.get(job_id) + if not job_info: + messagebox.showerror("錯誤", "找不到排程任務信息") + return + + self.status_var.set("正在執行排程備份...") + self.root.update_idletasks() + + def run_job_thread(): + success = self.backup._run_scheduled_backup( + job_id, + job_info["description"], + job_info["interval"] + ) + self.root.after(0, lambda: self.finalize_job_execution(success)) + + threading.Thread(target=run_job_thread).start() + + def finalize_job_execution(self, success): + """完成排程任務執行""" + if success: + self.status_var.set("排程備份執行完成") + self.refresh_ui() + messagebox.showinfo("成功", "排程備份任務已成功執行") + else: + self.status_var.set("排程備份執行失敗") + messagebox.showerror("錯誤", "執行排程備份時發生錯誤") + + def view_scheduled_jobs(self): + """查看所有排程任務""" + jobs = self.backup.get_scheduled_jobs_info() + + if not jobs: + messagebox.showinfo("排程任務", "當前沒有活動的排程任務") + return + + # 創建對話框 + dialog = tk.Toplevel(self.root) + dialog.title("排程任務列表") + dialog.geometry("600x400") + dialog.grab_set() + + frame = ttk.Frame(dialog, padding=20) + frame.pack(fill=BOTH, expand=YES) + + ttk.Label( + frame, + text="排程備份任務", + font=("TkDefaultFont", 14, "bold") + ).pack(anchor=W, pady=(0, 15)) + + # 創建表格 + columns = ("id", "interval", "description", "next_run", "keep_count") + tree = ttk.Treeview(frame, columns=columns, show="headings", height=10) + + tree.heading("id", text="任務ID") + tree.heading("interval", text="間隔") + tree.heading("description", text="描述") + tree.heading("next_run", text="下次執行") + tree.heading("keep_count", text="保留數量") + + tree.column("id", width=150) + tree.column("interval", width=80) + tree.column("description", width=150) + tree.column("next_run", width=150) + tree.column("keep_count", width=80) + + # 添加數據 + for job in jobs: + tree.insert( + "", "end", + values=( + job["id"], + job["interval"], + job["description"], + job["next_run"], + job["keep_count"] + ) + ) + + # 添加滾動條 + scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=tree.yview) + tree.configure(yscrollcommand=scrollbar.set) + + tree.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=RIGHT, fill=Y) + + # 按鈕 + btn_frame = ttk.Frame(dialog) + btn_frame.pack(fill=X, pady=10, padx=20) + + ttk.Button( + btn_frame, + text="關閉", + command=dialog.destroy + ).pack(side=RIGHT) + + ttk.Button( + btn_frame, + text="新增排程", + command=lambda: [dialog.destroy(), self.schedule_backup_dialog()] + ).pack(side=RIGHT, padx=5) + + def view_backup_history(self): + """查看備份歷史""" + history = self.backup.backup_history + + # 創建對話框 + dialog = tk.Toplevel(self.root) + dialog.title("備份歷史") + dialog.geometry("600x400") + dialog.grab_set() + + frame = ttk.Frame(dialog, padding=20) + frame.pack(fill=BOTH, expand=YES) + + ttk.Label( + frame, + text="備份操作歷史", + font=("TkDefaultFont", 14, "bold") + ).pack(anchor=W, pady=(0, 15)) + + # 創建表格 + columns = ("date", "name", "status", "description") + tree = ttk.Treeview(frame, columns=columns, show="headings", height=10) + + tree.heading("date", text="日期") + tree.heading("name", text="名稱") + tree.heading("status", text="狀態") + tree.heading("description", text="描述") + + tree.column("date", width=150) + tree.column("name", width=200) + tree.column("status", width=80) + tree.column("description", width=200) + + # 添加數據 + for entry in sorted(history, key=lambda x: x["date"], reverse=True): + tree.insert( + "", "end", + values=( + entry["date"].strftime("%Y-%m-%d %H:%M:%S"), + entry["name"], + entry["status"], + entry.get("description", "") + ), + tags=(entry["status"],) + ) + + # 設置標籤顏色 + tree.tag_configure("success", background="#e6ffe6") + tree.tag_configure("failed", background="#ffe6e6") + + # 添加滾動條 + scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=tree.yview) + tree.configure(yscrollcommand=scrollbar.set) + + tree.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=RIGHT, fill=Y) + + # 按鈕 + btn_frame = ttk.Frame(dialog) + btn_frame.pack(fill=X, pady=10, padx=20) + + ttk.Button( + btn_frame, + text="關閉", + command=dialog.destroy + ).pack(side=RIGHT) + + def view_db_info(self): + """查看數據庫詳細信息""" + if not self.backup.source_db_path: + messagebox.showinfo("提示", "請先設置源數據庫") + return + + info = self.backup.get_db_info() + + # 創建對話框 + dialog = tk.Toplevel(self.root) + dialog.title("數據庫信息") + dialog.geometry("500x400") + dialog.grab_set() + + frame = ttk.Frame(dialog, padding=20) + frame.pack(fill=BOTH, expand=YES) + + ttk.Label( + frame, + text="數據庫詳細信息", + font=("TkDefaultFont", 14, "bold") + ).pack(anchor=W, pady=(0, 15)) + + if info["status"] == "ok": + # 基本信息 + basic_frame = ttk.LabelFrame(frame, text="基本信息", padding=10) + basic_frame.pack(fill=X, pady=(0, 10)) + + basic_text = f"路徑: {info['path']}\n" + basic_text += f"大小: {info['size']}\n" + basic_text += f"最後修改: {info['last_modified']}\n" + basic_text += f"數據庫版本: {info['db_version']}" + + ttk.Label(basic_frame, text=basic_text, justify=LEFT).pack(anchor=W) + + # 表格信息 + tables_frame = ttk.LabelFrame(frame, text="表格信息", padding=10) + tables_frame.pack(fill=BOTH, expand=YES, pady=(0, 10)) + + # 創建表格 + columns = ("table", "count") + tree = ttk.Treeview(tables_frame, columns=columns, show="headings", height=8) + + tree.heading("table", text="表名") + tree.heading("count", text="行數") + + tree.column("table", width=200) + tree.column("count", width=100) + + # 添加數據 + for table, count in info["tables"].items(): + tree.insert( + "", "end", + values=(table, count) + ) + + # 添加滾動條 + scrollbar = ttk.Scrollbar(tables_frame, orient=VERTICAL, command=tree.yview) + tree.configure(yscrollcommand=scrollbar.set) + + tree.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=RIGHT, fill=Y) + + # 嵌入向量信息 + embeddings_frame = ttk.LabelFrame(frame, text="嵌入向量", padding=10) + embeddings_frame.pack(fill=X) + + embedding_text = f"嵌入向量數量: {info['embeddings_count']}" + + ttk.Label(embeddings_frame, text=embedding_text, justify=LEFT).pack(anchor=W) + else: + # 錯誤信息 + error_text = f"獲取數據庫信息時出錯:\n{info.get('error', '未知錯誤')}" + + ttk.Label(frame, text=error_text, foreground="red").pack(anchor=W) + + # 按鈕 + btn_frame = ttk.Frame(dialog) + btn_frame.pack(fill=X, pady=10) + + ttk.Button( + btn_frame, + text="關閉", + command=dialog.destroy + ).pack(side=RIGHT) + + ttk.Button( + btn_frame, + text="刷新", + command=lambda: [dialog.destroy(), self.view_db_info()] + ).pack(side=RIGHT, padx=5) + + def toggle_theme(self): + """切換深色/淺色主題""" + if self.current_theme == "darkly": + self.current_theme = "cosmo" # 淺色主題 + ttk.Style().theme_use("cosmo") + else: + self.current_theme = "darkly" # 深色主題 + ttk.Style().theme_use("darkly") + + # 保存配置 + self.config["theme"] = self.current_theme + self.save_config() + + def show_about(self): + """顯示關於對話框""" + about_text = "ChromaDB 備份工具\n\n" + about_text += "版本: 1.0.0\n\n" + about_text += "這是一個用於備份和管理ChromaDB數據庫的工具,支持手動和排程備份、還原、導入/導出等功能。\n\n" + about_text += "功能包括:\n" + about_text += "- 手動和排程備份\n" + about_text += "- 備份還原\n" + about_text += "- 備份導入/導出\n" + about_text += "- 備份管理\n" + about_text += "- 數據庫統計\n" + + messagebox.showinfo("關於", about_text) + + def open_log_file(self): + """打開日誌文件""" + log_path = "chroma_backup.log" + + if os.path.exists(log_path): + # 創建日誌查看器窗口 + log_window = tk.Toplevel(self.root) + log_window.title("日誌查看器") + log_window.geometry("800x600") + + frame = ttk.Frame(log_window, padding=10) + frame.pack(fill=BOTH, expand=YES) + + # 添加日誌內容 + text_area = tk.Text(frame, wrap=tk.WORD) + + try: + with open(log_path, "r", encoding="utf-8") as f: + log_content = f.read() + except UnicodeDecodeError: + try: + with open(log_path, "r", encoding="gbk") as f: + log_content = f.read() + except: + log_content = "無法讀取日誌文件" + + text_area.insert(tk.END, log_content) + text_area.config(state=tk.DISABLED) + + scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=text_area.yview) + text_area.configure(yscrollcommand=scrollbar.set) + + text_area.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=LEFT, fill=Y) + + # 添加刷新和清空按鈕 + button_frame = ttk.Frame(log_window) + button_frame.pack(fill=X, pady=10) + + ttk.Button( + button_frame, + text="刷新", + command=lambda: self.refresh_log_view(text_area, log_path) + ).pack(side=LEFT, padx=5) + + ttk.Button( + button_frame, + text="清空日誌", + command=lambda: self.clear_log_file(text_area, log_path) + ).pack(side=LEFT, padx=5) + else: + messagebox.showinfo("提示", "日誌文件不存在") + + def refresh_log_view(self, text_area, log_path): + """刷新日誌查看器內容""" + try: + with open(log_path, "r", encoding="utf-8") as f: + log_content = f.read() + except UnicodeDecodeError: + try: + with open(log_path, "r", encoding="gbk") as f: + log_content = f.read() + except: + log_content = "無法讀取日誌文件" + + text_area.config(state=tk.NORMAL) + text_area.delete("1.0", tk.END) + text_area.insert(tk.END, log_content) + text_area.config(state=tk.DISABLED) + + def clear_log_file(self, text_area, log_path): + """清空日誌文件""" + if messagebox.askyesno("確認", "確定要清空日誌文件嗎?"): + try: + with open(log_path, "w") as f: + f.write("") + + text_area.config(state=tk.NORMAL) + text_area.delete("1.0", tk.END) + text_area.config(state=tk.DISABLED) + + messagebox.showinfo("成功", "日誌文件已清空") + except Exception as e: + messagebox.showerror("錯誤", f"清空日誌文件時出錯: {str(e)}") + + def open_backup_reader(self): + """打開備份閱讀器""" + try: + import subprocess + import sys + + # 啟動備份閱讀器 + subprocess.Popen([sys.executable, "chroma_view2.py"]) + + self.status_var.set("已啟動備份閱讀器") + except Exception as e: + self.status_var.set("啟動備份閱讀器失敗") + messagebox.showerror("錯誤", f"無法啟動備份閱讀器: {str(e)}") + + def load_config(self): + """載入配置""" + default_config = { + "last_source_db": "", + "last_backup_dir": "", + "theme": "darkly" + } + + if os.path.exists(self.config_path): + try: + with open(self.config_path, "r", encoding="utf-8") as f: + return json.load(f) + except: + return default_config + + return default_config + + def save_config(self): + """保存配置""" + try: + with open(self.config_path, "w", encoding="utf-8") as f: + json.dump(self.config, f, indent=4) + except Exception as e: + self.backup.logger.error(f"保存配置時出錯: {str(e)}") + + def run_scheduler(self): + """運行排程器線程""" + while self.scheduler_running: + self.backup.run_scheduler() + time.sleep(1) + + +def main(): + """程序入口點""" + root = ttk.Window(themename="darkly") + app = ChromaDBBackupUI(root) + root.mainloop() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tools/chroma_view.py b/tools/chroma_view.py new file mode 100644 index 0000000..0c627df --- /dev/null +++ b/tools/chroma_view.py @@ -0,0 +1,1253 @@ +import os +import tkinter as tk +from tkinter import filedialog, messagebox +import json +import chromadb +import datetime +import pandas as pd +import threading +from pathlib import Path +import matplotlib.pyplot as plt +from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg +import ttkbootstrap as ttk +from ttkbootstrap.constants import * +from ttkbootstrap.scrolled import ScrolledFrame +import numpy as np +import logging +from typing import List, Dict, Any, Optional, Union, Tuple + +class ChromaDBReader: + """ChromaDB備份讀取器的主數據模型""" + + def __init__(self): + self.backups_dir = "" + self.backups = [] # 所有備份的列表 + self.current_backup = None # 當前選擇的備份 + self.current_collection = None # 當前選擇的集合 + self.collection_names = [] # 當前備份中的集合列表 + self.query_results = [] # 當前查詢結果 + self.chroma_client = None # ChromaDB客戶端 + + # 設置日誌 + logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler("chroma_reader.log", encoding='utf-8'), + logging.StreamHandler() + ] + ) + self.logger = logging.getLogger("ChromaDBReader") + + def set_backups_directory(self, directory_path: str) -> bool: + """設置備份目錄並掃描備份""" + if not os.path.exists(directory_path): + self.logger.error(f"備份目錄不存在: {directory_path}") + return False + + self.backups_dir = directory_path + return self.scan_backups() + + def scan_backups(self) -> bool: + """掃描備份目錄中的所有備份""" + self.backups = [] + + try: + # 查找所有以chroma_backup_開頭的目錄 + for item in os.listdir(self.backups_dir): + item_path = os.path.join(self.backups_dir, item) + if os.path.isdir(item_path) and item.startswith("chroma_backup_"): + # 提取備份日期時間 + try: + date_str = item.replace("chroma_backup_", "") + date_obj = datetime.datetime.strptime(date_str, "%Y-%m-%d_%H-%M-%S") + + backup_info = { + "name": item, + "path": item_path, + "date": date_obj, + "formatted_date": date_obj.strftime("%Y年%m月%d日 %H:%M:%S") + } + + # 檢查是否是有效的ChromaDB目錄 + if self._is_valid_chroma_backup(item_path): + self.backups.append(backup_info) + except Exception as e: + self.logger.warning(f"無法解析備份 {item}: {str(e)}") + + # 按日期排序,最新的排在前面 + self.backups.sort(key=lambda x: x["date"], reverse=True) + self.logger.info(f"找到 {len(self.backups)} 個備份") + return True + + except Exception as e: + self.logger.error(f"掃描備份時出錯: {str(e)}") + return False + + def _is_valid_chroma_backup(self, backup_path: str) -> bool: + """檢查目錄是否為有效的ChromaDB備份""" + # 檢查是否存在關鍵ChromaDB文件 + sqlite_path = os.path.join(backup_path, "chroma.sqlite3") + return os.path.exists(sqlite_path) + + def load_backup(self, backup_index: int) -> bool: + """加載指定的備份""" + if backup_index < 0 or backup_index >= len(self.backups): + self.logger.error(f"無效的備份索引: {backup_index}") + return False + + try: + self.current_backup = self.backups[backup_index] + backup_path = self.current_backup["path"] + + # 初始化ChromaDB客戶端 + self.chroma_client = chromadb.PersistentClient(path=backup_path) + + # 獲取所有集合名稱 + self.collection_names = self.chroma_client.list_collections() + self.current_collection = None + self.query_results = [] + + self.logger.info(f"已加載備份: {self.current_backup['name']}") + self.logger.info(f"找到 {len(self.collection_names)} 個集合") + return True + + except Exception as e: + self.logger.error(f"加載備份時出錯: {str(e)}") + self.current_backup = None + self.chroma_client = None + self.collection_names = [] + return False + + def load_collection(self, collection_name: str) -> bool: + """加載指定的集合""" + if not self.chroma_client or not collection_name: + return False + + try: + self.current_collection = self.chroma_client.get_collection(collection_name) + self.logger.info(f"已加載集合: {collection_name}") + return True + except Exception as e: + self.logger.error(f"加載集合時出錯: {str(e)}") + self.current_collection = None + return False + + def execute_query(self, query_text: str, n_results: int = 5) -> List[Dict]: + """執行查詢並返回結果""" + if not self.current_collection or not query_text: + return [] + + try: + results = self.current_collection.query( + query_texts=[query_text], + n_results=n_results + ) + + # 轉換結果為更易用的格式 + processed_results = [] + for i, (doc_id, document, metadata, distance) in enumerate(zip( + results['ids'][0], + results['documents'][0], + results['metadatas'][0] if 'metadatas' in results and results['metadatas'][0] else [{}] * len(results['ids'][0]), + results['distances'][0] if 'distances' in results else [0] * len(results['ids'][0]) + )): + # 計算相似度分數 (將距離轉換為相似度: 1 - 歸一化距離) + # 注意: 根據ChromaDB使用的距離度量可能需要調整 + similarity = 1.0 - min(distance, 1.0) # 確保值在0-1之間 + + processed_results.append({ + "rank": i + 1, + "id": doc_id, + "document": document, + "metadata": metadata, + "similarity": similarity, + "distance": distance + }) + + self.query_results = processed_results + self.logger.info(f"查詢完成,找到 {len(processed_results)} 個結果") + return processed_results + + except Exception as e: + self.logger.error(f"執行查詢時出錯: {str(e)}") + self.query_results = [] + return [] + + def get_collection_info(self, collection_name: str) -> Dict: + """獲取集合的詳細信息""" + if not self.chroma_client: + return {} + + try: + collection = self.chroma_client.get_collection(collection_name) + count = collection.count() + + # 獲取一個樣本來確定向量維度 + sample = collection.peek(1) + dimension = len(sample['embeddings'][0]) if 'embeddings' in sample and sample['embeddings'] else "未知" + + return { + "name": collection_name, + "document_count": count, + "dimension": dimension + } + except Exception as e: + self.logger.error(f"獲取集合信息時出錯: {str(e)}") + return { + "name": collection_name, + "document_count": "未知", + "dimension": "未知" + } + + def export_results(self, file_path: str, format: str = "csv") -> bool: + """導出查詢結果""" + if not self.query_results: + return False + + try: + df = pd.DataFrame(self.query_results) + + # 根據格式導出 + if format.lower() == "csv": + df.to_csv(file_path, index=False, encoding='utf-8-sig') + elif format.lower() == "json": + df.to_json(file_path, orient='records', force_ascii=False, indent=4) + elif format.lower() == "excel": + df.to_excel(file_path, index=False) + else: + return False + + self.logger.info(f"結果已導出到: {file_path}") + return True + except Exception as e: + self.logger.error(f"導出結果時出錯: {str(e)}") + return False + + +class ChromaDBReaderUI: + """ChromaDB備份讀取器的用戶界面""" + + def __init__(self, root): + self.root = root + self.reader = ChromaDBReader() + + # 設置窗口 + self.root.title("ChromaDB 備份讀取器") + self.root.geometry("1280x800") + self.setup_ui() + + # 默認主題 + self.current_theme = "darkly" # ttkbootstrap的深色主題 + + # 存儲配置 + self.config_path = os.path.join(str(Path.home()), ".chroma_reader_config.json") + self.config = self.load_config() + + # 應用保存的配置 + if self.config.get("last_backups_dir"): + self.backups_dir_var.set(self.config["last_backups_dir"]) + self.load_backups_directory() + + def setup_ui(self): + """設置用戶界面""" + # 創建主佈局 + self.main_frame = ttk.Frame(self.root, padding=10) + self.main_frame.pack(fill=BOTH, expand=YES) + + # 左側面板 (備份和集合選擇) + self.left_panel = ttk.Frame(self.main_frame, width=300) + self.left_panel.pack(side=LEFT, fill=Y, padx=(0, 10)) + + # 右側面板 (查詢和結果) + self.right_panel = ttk.Frame(self.main_frame) + self.right_panel.pack(side=LEFT, fill=BOTH, expand=YES) + + # 設置左側面板 + self.setup_directory_frame() + self.setup_backups_frame() + self.setup_collections_frame() + + # 設置右側面板 + self.setup_query_frame() + self.setup_results_frame() + + # 設置狀態欄 + self.setup_status_bar() + + # 設置菜單 + self.setup_menu() + + def setup_menu(self): + """設置菜單欄""" + menubar = tk.Menu(self.root) + self.root.config(menu=menubar) + + # 文件菜單 + file_menu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label="文件", menu=file_menu) + file_menu.add_command(label="選擇備份目錄", command=self.browse_directory) + file_menu.add_command(label="刷新備份列表", command=self.refresh_backups) + file_menu.add_separator() + file_menu.add_command(label="導出結果...", command=self.export_results_dialog) + file_menu.add_separator() + file_menu.add_command(label="退出", command=self.root.quit) + + # 視圖菜單 + view_menu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label="視圖", menu=view_menu) + view_menu.add_command(label="切換深色/淺色主題", command=self.toggle_theme) + + # 幫助菜單 + help_menu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label="幫助", menu=help_menu) + help_menu.add_command(label="關於", command=self.show_about) + help_menu.add_command(label="查看日誌", command=self.open_log_file) + + def setup_directory_frame(self): + """設置目錄選擇框架""" + dir_frame = ttk.LabelFrame(self.left_panel, text="備份目錄", padding=10) + dir_frame.pack(fill=X, pady=(0, 10)) + + self.backups_dir_var = tk.StringVar() + + ttk.Entry(dir_frame, textvariable=self.backups_dir_var).pack(side=LEFT, fill=X, expand=YES) + ttk.Button(dir_frame, text="瀏覽", command=self.browse_directory).pack(side=LEFT, padx=(5, 0)) + ttk.Button(dir_frame, text="載入", command=self.load_backups_directory).pack(side=LEFT, padx=(5, 0)) + + def setup_backups_frame(self): + """設置備份列表框架""" + backups_frame = ttk.LabelFrame(self.left_panel, text="備份列表", padding=10) + backups_frame.pack(fill=BOTH, expand=YES, pady=(0, 10)) + + # 備份搜索 + search_frame = ttk.Frame(backups_frame) + search_frame.pack(fill=X, pady=(0, 5)) + + self.backup_search_var = tk.StringVar() + self.backup_search_var.trace("w", self.filter_backups) + + ttk.Label(search_frame, text="搜索:").pack(side=LEFT) + ttk.Entry(search_frame, textvariable=self.backup_search_var).pack(side=LEFT, fill=X, expand=YES) + + # 備份列表 + list_frame = ttk.Frame(backups_frame) + list_frame.pack(fill=BOTH, expand=YES) + + columns = ("name", "date") + self.backups_tree = ttk.Treeview(list_frame, columns=columns, show="headings", height=10) + self.backups_tree.heading("name", text="名稱") + self.backups_tree.heading("date", text="日期") + self.backups_tree.column("name", width=100) + self.backups_tree.column("date", width=150) + + scrollbar = ttk.Scrollbar(list_frame, orient=VERTICAL, command=self.backups_tree.yview) + self.backups_tree.configure(yscrollcommand=scrollbar.set) + + self.backups_tree.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=LEFT, fill=Y) + + self.backups_tree.bind("<>", self.on_backup_selected) + + def setup_collections_frame(self): + """設置集合列表框架""" + collections_frame = ttk.LabelFrame(self.left_panel, text="集合列表", padding=10) + collections_frame.pack(fill=BOTH, expand=YES) + + # 集合搜索 + search_frame = ttk.Frame(collections_frame) + search_frame.pack(fill=X, pady=(0, 5)) + + self.collection_search_var = tk.StringVar() + self.collection_search_var.trace("w", self.filter_collections) + + ttk.Label(search_frame, text="搜索:").pack(side=LEFT) + ttk.Entry(search_frame, textvariable=self.collection_search_var).pack(side=LEFT, fill=X, expand=YES) + + # 集合列表 + list_frame = ttk.Frame(collections_frame) + list_frame.pack(fill=BOTH, expand=YES) + + columns = ("name", "count") + self.collections_tree = ttk.Treeview(list_frame, columns=columns, show="headings", height=10) + self.collections_tree.heading("name", text="名稱") + self.collections_tree.heading("count", text="文檔數") + self.collections_tree.column("name", width=150) + self.collections_tree.column("count", width=100) + + scrollbar = ttk.Scrollbar(list_frame, orient=VERTICAL, command=self.collections_tree.yview) + self.collections_tree.configure(yscrollcommand=scrollbar.set) + + self.collections_tree.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=LEFT, fill=Y) + + self.collections_tree.bind("<>", self.on_collection_selected) + + def setup_query_frame(self): + """設置查詢框架""" + query_frame = ttk.LabelFrame(self.right_panel, text="查詢", padding=10) + query_frame.pack(fill=X, pady=(0, 10)) + + # 查詢文本輸入 + ttk.Label(query_frame, text="查詢文本:").pack(anchor=W) + self.query_text = tk.Text(query_frame, height=4, width=50) + self.query_text.pack(fill=X, pady=5) + + # 查詢參數 + params_frame = ttk.Frame(query_frame) + params_frame.pack(fill=X) + + ttk.Label(params_frame, text="結果數量:").pack(side=LEFT) + self.n_results_var = tk.StringVar(value="5") + ttk.Spinbox(params_frame, from_=1, to=100, textvariable=self.n_results_var, width=5).pack(side=LEFT, padx=(5, 20)) + + # 查詢按鈕 + ttk.Button( + query_frame, + text="執行查詢", + command=self.execute_query, + style="Accent.TButton" + ).pack(pady=10) + + def setup_results_frame(self): + """設置結果顯示框架""" + self.results_notebook = ttk.Notebook(self.right_panel) + self.results_notebook.pack(fill=BOTH, expand=YES) + + # 列表視圖 - 使用標準 Frame 作為容器 + list_frame = ttk.Frame(self.results_notebook) + self.results_notebook.add(list_frame, text="列表視圖") + self.list_view = ttk.Frame(list_frame) + self.list_view.pack(fill=BOTH, expand=YES) + + # 詳細視圖 - 使用標準 Frame 作為容器 + detail_frame = ttk.Frame(self.results_notebook) + self.results_notebook.add(detail_frame, text="詳細視圖") + self.detail_view = ttk.Frame(detail_frame) + self.detail_view.pack(fill=BOTH, expand=YES) + + # 可視化視圖 + self.visual_view = ttk.Frame(self.results_notebook) + self.results_notebook.add(self.visual_view, text="可視化") + + # 比較視圖 + self.compare_view = ttk.Frame(self.results_notebook) + self.results_notebook.add(self.compare_view, text="比較視圖") + + def setup_status_bar(self): + """設置狀態欄""" + status_frame = ttk.Frame(self.root) + status_frame.pack(side=BOTTOM, fill=X) + + self.status_var = tk.StringVar(value="就緒") + status_label = ttk.Label(status_frame, textvariable=self.status_var, relief=tk.SUNKEN, anchor=W) + status_label.pack(fill=X) + + def browse_directory(self): + """瀏覽選擇備份目錄""" + directory = filedialog.askdirectory( + title="選擇ChromaDB備份目錄", + initialdir=self.backups_dir_var.get() or str(Path.home()) + ) + + if directory: + self.backups_dir_var.set(directory) + self.load_backups_directory() + + def load_backups_directory(self): + """加載備份目錄""" + directory = self.backups_dir_var.get() + if not directory: + return + + self.status_var.set("正在掃描備份...") + self.root.update_idletasks() + + if self.reader.set_backups_directory(directory): + self.refresh_backups_list() + self.status_var.set(f"已找到 {len(self.reader.backups)} 個備份") + + # 保存配置 + self.config["last_backups_dir"] = directory + self.save_config() + else: + self.status_var.set("無法掃描備份目錄") + messagebox.showerror("錯誤", f"無法掃描備份目錄: {directory}") + + def refresh_backups(self): + """刷新備份列表""" + if not self.reader.backups_dir: + messagebox.showinfo("提示", "請先選擇備份目錄") + return + + self.status_var.set("正在刷新備份...") + self.root.update_idletasks() + + if self.reader.scan_backups(): + self.refresh_backups_list() + self.status_var.set(f"已刷新,找到 {len(self.reader.backups)} 個備份") + else: + self.status_var.set("刷新備份失敗") + messagebox.showerror("錯誤", "無法刷新備份列表") + + def refresh_backups_list(self): + """刷新備份列表顯示""" + # 清空列表 + for item in self.backups_tree.get_children(): + self.backups_tree.delete(item) + + # 添加備份 + for backup in self.reader.backups: + self.backups_tree.insert( + "", "end", + values=(backup["name"], backup["formatted_date"]) + ) + + def filter_backups(self, *args): + """根據搜索條件過濾備份列表""" + search_text = self.backup_search_var.get().lower() + + # 清空列表 + for item in self.backups_tree.get_children(): + self.backups_tree.delete(item) + + # 添加匹配的備份 + for backup in self.reader.backups: + if search_text in backup["name"].lower() or search_text in backup["formatted_date"].lower(): + self.backups_tree.insert( + "", "end", + values=(backup["name"], backup["formatted_date"]) + ) + + def on_backup_selected(self, event): + """處理備份選擇事件""" + selection = self.backups_tree.selection() + if not selection: + return + + # 獲取選定項的索引 + item_id = selection[0] + item_index = self.backups_tree.index(item_id) + + # 獲取所有顯示的備份項目 + visible_items = self.backups_tree.get_children() + if item_index >= len(visible_items): + return + + # 查找此顯示項對應的實際備份索引 + backup_name = self.backups_tree.item(visible_items[item_index])["values"][0] + backup_index = next((i for i, b in enumerate(self.reader.backups) if b["name"] == backup_name), -1) + + if backup_index == -1: + return + + # 載入備份 + self.status_var.set(f"正在載入備份: {backup_name}...") + self.root.update_idletasks() + + def load_backup_thread(): + success = self.reader.load_backup(backup_index) + self.root.after(0, lambda: self.finalize_backup_loading(success, backup_name)) + + threading.Thread(target=load_backup_thread).start() + + def finalize_backup_loading(self, success: bool, backup_name: str): + """完成備份載入處理""" + if success: + self.refresh_collections_list() + self.status_var.set(f"已載入備份: {backup_name}") + else: + self.status_var.set(f"載入備份失敗: {backup_name}") + messagebox.showerror("錯誤", f"無法載入備份: {backup_name}") + + def refresh_collections_list(self): + """刷新集合列表顯示""" + # 清空列表 + for item in self.collections_tree.get_children(): + self.collections_tree.delete(item) + + # 添加集合 + for collection in self.reader.collection_names: + info = self.reader.get_collection_info(collection.name) + self.collections_tree.insert( + "", "end", + values=(collection.name, info["document_count"]) + ) + + def filter_collections(self, *args): + """根據搜索條件過濾集合列表""" + search_text = self.collection_search_var.get().lower() + + # 清空列表 + for item in self.collections_tree.get_children(): + self.collections_tree.delete(item) + + # 添加匹配的集合 + for collection in self.reader.collection_names: + if search_text in collection.name.lower(): + info = self.reader.get_collection_info(collection.name) + self.collections_tree.insert( + "", "end", + values=(collection.name, info["document_count"]) + ) + + def on_collection_selected(self, event): + """處理集合選擇事件""" + selection = self.collections_tree.selection() + if not selection: + return + + # 獲取選定項的集合名稱 + item_id = selection[0] + collection_name = self.collections_tree.item(item_id)["values"][0] + + # 載入集合 + self.status_var.set(f"正在載入集合: {collection_name}...") + self.root.update_idletasks() + + def load_collection_thread(): + success = self.reader.load_collection(collection_name) + self.root.after(0, lambda: self.finalize_collection_loading(success, collection_name)) + + threading.Thread(target=load_collection_thread).start() + + def finalize_collection_loading(self, success: bool, collection_name: str): + """完成集合載入處理""" + if success: + self.status_var.set(f"已載入集合: {collection_name}") + # 獲取集合詳細信息並顯示 + info = self.reader.get_collection_info(collection_name) + info_text = f"集合: {info['name']}\n文檔數: {info['document_count']}\n向量維度: {info['dimension']}" + messagebox.showinfo("集合信息", info_text) + else: + self.status_var.set(f"載入集合失敗: {collection_name}") + messagebox.showerror("錯誤", f"無法載入集合: {collection_name}") + + def execute_query(self): + """執行向量查詢""" + if not self.reader.current_collection: + messagebox.showinfo("提示", "請先選擇一個集合") + return + + query_text = self.query_text.get("1.0", tk.END).strip() + if not query_text: + messagebox.showinfo("提示", "請輸入查詢文本") + return + + try: + n_results = int(self.n_results_var.get()) + except ValueError: + messagebox.showerror("錯誤", "結果數量必須是整數") + return + + self.status_var.set("正在執行查詢...") + self.root.update_idletasks() + + def query_thread(): + results = self.reader.execute_query(query_text, n_results) + self.root.after(0, lambda: self.display_results(results)) + + threading.Thread(target=query_thread).start() + + def display_results(self, results): + """顯示查詢結果""" + if not results: + self.status_var.set("查詢完成,未找到結果") + messagebox.showinfo("查詢結果", "未找到匹配的結果") + return + + self.status_var.set(f"查詢完成,找到 {len(results)} 個結果") + + # 清空所有視圖 (這部分由各個顯示函數內部處理) + + # 顯示列表視圖 + self.display_list_view(results) + + # 顯示詳細視圖 + self.display_detail_view(results) + + # 顯示可視化視圖 + self.display_visual_view(results) + + # 顯示比較視圖 + self.display_compare_view(results) + + def display_list_view(self, results): + """顯示列表視圖""" + # 清空現有內容 + for widget in self.list_view.winfo_children(): + widget.destroy() + + # 創建表格 + columns = ("rank", "similarity", "id", "document") + tree = ttk.Treeview(self.list_view, columns=columns, show="headings") + tree.heading("rank", text="#") + tree.heading("similarity", text="相似度") + tree.heading("id", text="文檔ID") + tree.heading("document", text="文檔內容") + + tree.column("rank", width=50, anchor=CENTER) + tree.column("similarity", width=100, anchor=CENTER) + tree.column("id", width=200) + tree.column("document", width=600) + + # 添加結果到表格 + for result in results: + tree.insert( + "", "end", + values=( + result["rank"], + f"{result['similarity']:.4f}", + result["id"], + result["document"][:100] + ("..." if len(result["document"]) > 100 else "") + ) + ) + + # 添加滾動條 + scrollbar = ttk.Scrollbar(self.list_view, orient=VERTICAL, command=tree.yview) + tree.configure(yscrollcommand=scrollbar.set) + + # 雙擊項目顯示完整內容 + tree.bind("", lambda event: self.show_full_document(tree)) + + # 使用 Frame 容器來實現滾動功能 + # 佈局 + tree.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=RIGHT, fill=Y) + + def show_full_document(self, tree): + """顯示完整的文檔內容""" + selection = tree.selection() + if not selection: + return + + item_id = selection[0] + rank_str = tree.item(item_id)["values"][0] + + try: + rank = int(rank_str) + if 1 <= rank <= len(self.reader.query_results): + result = self.reader.query_results[rank - 1] + + # 創建詳細內容窗口 + details_window = tk.Toplevel(self.root) + details_window.title(f"文檔詳細內容 - {result['id']}") + details_window.geometry("800x600") + + frame = ttk.Frame(details_window, padding=10) + frame.pack(fill=BOTH, expand=YES) + + # 添加文檔信息 + info_text = f"文檔ID: {result['id']}\n" + info_text += f"相似度: {result['similarity']:.4f}\n" + + if result['metadata']: + info_text += "\n元數據:\n" + for key, value in result['metadata'].items(): + info_text += f"{key}: {value}\n" + + ttk.Label(frame, text=info_text, justify=LEFT).pack(anchor=W, pady=(0, 10)) + + # 添加文檔內容 + ttk.Label(frame, text="文檔內容:", justify=LEFT).pack(anchor=W) + + text_area = tk.Text(frame, wrap=tk.WORD) + text_area.insert(tk.END, result['document']) + text_area.config(state=tk.DISABLED) + + scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=text_area.yview) + text_area.configure(yscrollcommand=scrollbar.set) + + text_area.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=LEFT, fill=Y) + + # 添加複製按鈕 + ttk.Button( + details_window, + text="複製內容", + command=lambda: self.copy_to_clipboard(result['document']) + ).pack(pady=10) + + except (ValueError, IndexError): + pass + + def copy_to_clipboard(self, text): + """複製文本到剪貼板""" + self.root.clipboard_clear() + self.root.clipboard_append(text) + self.status_var.set("已複製到剪貼板") + + def display_detail_view(self, results): + """顯示詳細視圖""" + # 清空現有內容 + for widget in self.detail_view.winfo_children(): + widget.destroy() + + # 創建滾動區域 + canvas = tk.Canvas(self.detail_view) + scrollbar = ttk.Scrollbar(self.detail_view, orient="vertical", command=canvas.yview) + scrollable_frame = ttk.Frame(canvas) + + scrollable_frame.bind( + "", + lambda e: canvas.configure(scrollregion=canvas.bbox("all")) + ) + + canvas.create_window((0, 0), window=scrollable_frame, anchor="nw") + canvas.configure(yscrollcommand=scrollbar.set) + + # 為每個結果創建一個卡片 + for i, result in enumerate(results): + # 創建卡片框架 + card = ttk.Frame(scrollable_frame, padding=10, relief="solid", borderwidth=1) + card.pack(fill=X, padx=10, pady=5, anchor=W) + + # 卡片標題 + title_frame = ttk.Frame(card) + title_frame.pack(fill=X) + + ttk.Label( + title_frame, + text=f"#{result['rank']} - 相似度: {result['similarity']:.4f}", + font=("TkDefaultFont", 10, "bold") + ).pack(side=LEFT) + + ttk.Label( + title_frame, + text=f"ID: {result['id']}", + font=("TkDefaultFont", 8) + ).pack(side=RIGHT) + + ttk.Separator(card, orient=HORIZONTAL).pack(fill=X, pady=5) + + # 文檔內容 + content_frame = ttk.Frame(card) + content_frame.pack(fill=X) + + doc_text = tk.Text(content_frame, wrap=tk.WORD, height=4) + doc_text.insert(tk.END, result['document']) + doc_text.config(state=tk.DISABLED) + doc_text.pack(fill=X) + + # 如果有元數據,顯示元數據 + if result['metadata'] and len(result['metadata']) > 0: + ttk.Separator(card, orient=HORIZONTAL).pack(fill=X, pady=5) + + metadata_frame = ttk.Frame(card) + metadata_frame.pack(fill=X) + + ttk.Label( + metadata_frame, + text="元數據:", + font=("TkDefaultFont", 9) + ).pack(anchor=W) + + for key, value in result['metadata'].items(): + ttk.Label( + metadata_frame, + text=f"{key}: {value}", + font=("TkDefaultFont", 8) + ).pack(anchor=W, padx=10) + + # 操作按鈕 + button_frame = ttk.Frame(card) + button_frame.pack(fill=X, pady=(5, 0)) + + ttk.Button( + button_frame, + text="查看完整內容", + command=lambda r=result: self.show_full_document_from_result(r) + ).pack(side=LEFT, padx=5) + + ttk.Button( + button_frame, + text="複製內容", + command=lambda d=result['document']: self.copy_to_clipboard(d) + ).pack(side=LEFT, padx=5) + + # 配置滾動區域 + canvas.pack(side=LEFT, fill=BOTH, expand=True) + scrollbar.pack(side=RIGHT, fill=Y) + + def show_full_document_from_result(self, result): + """從結果直接顯示完整的文檔內容""" + # 創建詳細內容窗口 + details_window = tk.Toplevel(self.root) + details_window.title(f"文檔詳細內容 - {result['id']}") + details_window.geometry("800x600") + + frame = ttk.Frame(details_window, padding=10) + frame.pack(fill=BOTH, expand=YES) + + # 添加文檔信息 + info_text = f"文檔ID: {result['id']}\n" + info_text += f"相似度: {result['similarity']:.4f}\n" + + if result['metadata']: + info_text += "\n元數據:\n" + for key, value in result['metadata'].items(): + info_text += f"{key}: {value}\n" + + ttk.Label(frame, text=info_text, justify=LEFT).pack(anchor=W, pady=(0, 10)) + + # 添加文檔內容 + ttk.Label(frame, text="文檔內容:", justify=LEFT).pack(anchor=W) + + text_area = tk.Text(frame, wrap=tk.WORD) + text_area.insert(tk.END, result['document']) + text_area.config(state=tk.DISABLED) + + scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=text_area.yview) + text_area.configure(yscrollcommand=scrollbar.set) + + text_area.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=LEFT, fill=Y) + + # 添加複製按鈕 + ttk.Button( + details_window, + text="複製內容", + command=lambda: self.copy_to_clipboard(result['document']) + ).pack(pady=10) + + def display_visual_view(self, results): + """顯示可視化視圖""" + # 清空現有內容 + for widget in self.visual_view.winfo_children(): + widget.destroy() + + if len(results) == 0: + return + + # 創建框架 + figure_frame = ttk.Frame(self.visual_view) + figure_frame.pack(fill=BOTH, expand=YES, padx=10, pady=10) + + # 創建圖表 + fig = plt.Figure(figsize=(10, 6), dpi=100) + + # 相似度柱狀圖 + ax1 = fig.add_subplot(121) + + # 提取數據 + ranks = [r["rank"] for r in results] + similarities = [r["similarity"] for r in results] + + # 繪製相似度柱狀圖 + bars = ax1.bar(ranks, similarities, color='skyblue') + + # 添加數據標籤 + for bar in bars: + height = bar.get_height() + ax1.text( + bar.get_x() + bar.get_width()/2., + height + 0.01, + f'{height:.3f}', + ha='center', va='bottom', + rotation=0, + fontsize=8 + ) + + ax1.set_xlabel('排名') + ax1.set_ylabel('相似度') + ax1.set_title('查詢結果相似度') + ax1.set_ylim(0, 1) + ax1.set_xticks(ranks) + + # 相似度曲線圖 + ax2 = fig.add_subplot(122) + ax2.plot(ranks, similarities, 'o-', color='orange') + + # 添加數據標籤 + for i, (x, y) in enumerate(zip(ranks, similarities)): + ax2.text(x, y + 0.02, f'{y:.3f}', ha='center', va='bottom', fontsize=8) + + ax2.set_xlabel('排名') + ax2.set_ylabel('相似度') + ax2.set_title('相似度分佈曲線') + ax2.set_ylim(0, 1) + ax2.set_xticks(ranks) + + # 調整佈局 + fig.tight_layout() + + # 將圖表嵌入到 Tkinter 窗口 + canvas = FigureCanvasTkAgg(fig, figure_frame) + canvas.draw() + canvas.get_tk_widget().pack(fill=BOTH, expand=YES) + + def display_compare_view(self, results): + """顯示比較視圖""" + # 清空現有內容 + for widget in self.compare_view.winfo_children(): + widget.destroy() + + if len(results) < 2: + ttk.Label( + self.compare_view, + text="需要至少2個結果才能進行比較", + font=("TkDefaultFont", 12) + ).pack(pady=20) + return + + # 創建比較視圖 + ttk.Label( + self.compare_view, + text="結果比較", + font=("TkDefaultFont", 14, "bold") + ).pack(pady=(10, 20)) + + # 創建比較表格 + columns = ["特性"] + [f"#{r['rank']}" for r in results] + + # 創建框架以包含表格和滾動條 + table_frame = ttk.Frame(self.compare_view) + table_frame.pack(fill=BOTH, expand=YES, padx=10, pady=10) + + tree = ttk.Treeview(table_frame, columns=columns, show="headings") + + for col in columns: + tree.heading(col, text=col) + tree.column(col, width=100, anchor=CENTER) + + # 相似度行 + tree.insert( + "", "end", + values=["相似度"] + [f"{r['similarity']:.4f}" for r in results] + ) + + # 文檔ID行 + tree.insert( + "", "end", + values=["文檔ID"] + [r['id'] for r in results] + ) + + # 文檔長度行 + tree.insert( + "", "end", + values=["文檔長度"] + [len(r['document']) for r in results] + ) + + # 從元數據提取共同鍵 + all_keys = set() + for result in results: + if result['metadata']: + for key in result['metadata'].keys(): + all_keys.add(key) + + # 為每個元數據鍵添加一行 + for key in sorted(all_keys): + values = ["元數據: " + key] + for result in results: + if result['metadata'] and key in result['metadata']: + values.append(str(result['metadata'][key])) + else: + values.append("-") + tree.insert("", "end", values=values) + + # 添加垂直滾動條 + vsb = ttk.Scrollbar(table_frame, orient="vertical", command=tree.yview) + tree.configure(yscrollcommand=vsb.set) + + # 添加水平滾動條 + hsb = ttk.Scrollbar(table_frame, orient="horizontal", command=tree.xview) + tree.configure(xscrollcommand=hsb.set) + + # 放置表格和滾動條 + tree.grid(column=0, row=0, sticky='nsew') + vsb.grid(column=1, row=0, sticky='ns') + hsb.grid(column=0, row=1, sticky='ew') + + # 配置表格框架的網格 + table_frame.columnconfigure(0, weight=1) + table_frame.rowconfigure(0, weight=1) + + def export_results_dialog(self): + """顯示導出結果對話框""" + if not self.reader.query_results: + messagebox.showinfo("提示", "沒有可導出的結果") + return + + # 詢問導出格式和文件路徑 + formats = [ + ("CSV 文件", "*.csv"), + ("JSON 文件", "*.json"), + ("Excel 文件", "*.xlsx") + ] + + file_path = filedialog.asksaveasfilename( + title="導出結果", + filetypes=formats, + defaultextension=".csv" + ) + + if not file_path: + return + + # 確定導出格式 + ext = os.path.splitext(file_path)[1].lower() + format_map = { + ".csv": "csv", + ".json": "json", + ".xlsx": "excel" + } + + format_type = format_map.get(ext, "csv") + + # 執行導出 + success = self.reader.export_results(file_path, format_type) + + if success: + messagebox.showinfo("導出成功", f"結果已成功導出到: {file_path}") + else: + messagebox.showerror("導出失敗", "導出結果時發生錯誤") + + def toggle_theme(self): + """切換深色/淺色主題""" + if self.current_theme == "darkly": + self.current_theme = "cosmo" # 淺色主題 + ttk.Style().theme_use("cosmo") + else: + self.current_theme = "darkly" # 深色主題 + ttk.Style().theme_use("darkly") + + # 保存配置 + self.config["theme"] = self.current_theme + self.save_config() + + def show_about(self): + """顯示關於對話框""" + about_text = "ChromaDB 備份讀取器\n\n" + about_text += "版本: 1.0.0\n\n" + about_text += "這是一個用於讀取和查詢ChromaDB備份的工具,支持相似度搜索和結果可視化。\n\n" + about_text += "功能包括:\n" + about_text += "- 讀取備份目錄\n" + about_text += "- 查詢集合數據\n" + about_text += "- 多種視圖顯示結果\n" + about_text += "- 結果導出\n" + + messagebox.showinfo("關於", about_text) + + def open_log_file(self): + """打開日誌文件""" + log_path = "chroma_reader.log" + + if os.path.exists(log_path): + # 創建日誌查看器窗口 + log_window = tk.Toplevel(self.root) + log_window.title("日誌查看器") + log_window.geometry("800x600") + + frame = ttk.Frame(log_window, padding=10) + frame.pack(fill=BOTH, expand=YES) + + # 添加日誌內容 + text_area = tk.Text(frame, wrap=tk.WORD) + + try: + with open(log_path, "r", encoding="utf-8") as f: + log_content = f.read() + except UnicodeDecodeError: + try: + with open(log_path, "r", encoding="gbk") as f: + log_content = f.read() + except: + log_content = "無法讀取日誌文件" + + text_area.insert(tk.END, log_content) + text_area.config(state=tk.DISABLED) + + scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=text_area.yview) + text_area.configure(yscrollcommand=scrollbar.set) + + text_area.pack(side=LEFT, fill=BOTH, expand=YES) + scrollbar.pack(side=LEFT, fill=Y) + + # 添加刷新和清空按鈕 + button_frame = ttk.Frame(log_window) + button_frame.pack(fill=X, pady=10) + + ttk.Button( + button_frame, + text="刷新", + command=lambda: self.refresh_log_view(text_area) + ).pack(side=LEFT, padx=5) + + ttk.Button( + button_frame, + text="清空日誌", + command=lambda: self.clear_log_file(text_area) + ).pack(side=LEFT, padx=5) + else: + messagebox.showinfo("提示", "日誌文件不存在") + + def refresh_log_view(self, text_area): + """刷新日誌查看器內容""" + log_path = "chroma_reader.log" + + if os.path.exists(log_path): + try: + with open(log_path, "r", encoding="utf-8") as f: + log_content = f.read() + except UnicodeDecodeError: + try: + with open(log_path, "r", encoding="gbk") as f: + log_content = f.read() + except: + log_content = "無法讀取日誌文件" + + text_area.config(state=tk.NORMAL) + text_area.delete("1.0", tk.END) + text_area.insert(tk.END, log_content) + text_area.config(state=tk.DISABLED) + + def clear_log_file(self, text_area): + """清空日誌文件""" + if messagebox.askyesno("確認", "確定要清空日誌文件嗎?"): + log_path = "chroma_reader.log" + + try: + with open(log_path, "w") as f: + f.write("") + + text_area.config(state=tk.NORMAL) + text_area.delete("1.0", tk.END) + text_area.config(state=tk.DISABLED) + + messagebox.showinfo("成功", "日誌文件已清空") + except Exception as e: + messagebox.showerror("錯誤", f"清空日誌文件時出錯: {str(e)}") + + def load_config(self): + """載入配置""" + default_config = { + "last_backups_dir": "", + "theme": "darkly" + } + + if os.path.exists(self.config_path): + try: + with open(self.config_path, "r", encoding="utf-8") as f: + return json.load(f) + except: + return default_config + + return default_config + + def save_config(self): + """保存配置""" + try: + with open(self.config_path, "w", encoding="utf-8") as f: + json.dump(self.config, f, indent=4) + except Exception as e: + self.logger.error(f"保存配置時出錯: {str(e)}") + + +def main(): + """程序入口點""" + root = ttk.Window(themename="darkly") + app = ChromaDBReaderUI(root) + root.mainloop() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ui_interaction.py b/ui_interaction.py index eac3e12..867e056 100644 --- a/ui_interaction.py +++ b/ui_interaction.py @@ -1120,6 +1120,7 @@ def run_ui_monitoring_loop(trigger_queue: queue.Queue, command_queue: queue.Queu last_processed_bubble_info = None # Store the whole dict now recent_texts = collections.deque(maxlen=RECENT_TEXT_HISTORY_MAXLEN) # Context-specific history needed screenshot_counter = 0 # Initialize counter for debug screenshots + main_screen_click_counter = 0 # Counter for consecutive main screen clicks while True: # --- Process ALL Pending Commands First --- @@ -1220,17 +1221,31 @@ def run_ui_monitoring_loop(trigger_queue: queue.Queue, command_queue: queue.Queu base_locs = detector._find_template('base_screen', confidence=0.8) map_locs = detector._find_template('world_map_screen', confidence=0.8) if base_locs or map_locs: - print("UI Thread: Detected main screen (Base or World Map). Clicking to return to chat...") - # Coordinates provided by user (adjust if needed based on actual screen resolution/layout) - # IMPORTANT: Ensure these coordinates are correct for the target window/resolution - target_x, target_y = 600, 1300 - interactor.click_at(target_x, target_y) - time.sleep(0.1) # Short delay after click - print("UI Thread: Clicked to return to chat. Re-checking screen state...") - continue # Skip the rest of the loop and re-evaluate + print(f"UI Thread: Detected main screen (Base or World Map). Counter: {main_screen_click_counter}") + if main_screen_click_counter < 5: + main_screen_click_counter += 1 + print(f"UI Thread: Attempting click #{main_screen_click_counter}/5 to return to chat...") + # Coordinates provided by user (adjust if needed based on actual screen resolution/layout) + target_x, target_y = 600, 1300 + interactor.click_at(target_x, target_y) + time.sleep(0.1) # Short delay after click + print("UI Thread: Clicked. Re-checking screen state...") + else: + print("UI Thread: Clicked 5 times, still on main screen. Pressing ESC...") + interactor.press_key('esc') + main_screen_click_counter = 0 # Reset counter after ESC + time.sleep(0.05) # Wait a bit longer after ESC + print("UI Thread: ESC pressed. Re-checking screen state...") + continue # Skip the rest of the loop and re-evaluate state + else: + # Reset counter if not on the main screen + if main_screen_click_counter > 0: + print("UI Thread: Not on main screen, resetting click counter.") + main_screen_click_counter = 0 except Exception as nav_err: print(f"UI Thread: Error during main screen navigation check: {nav_err}") # Decide if you want to continue or pause after error + main_screen_click_counter = 0 # Reset counter on error too # --- Process Commands Second (Non-blocking) --- # This block seems redundant now as commands are processed at the start of the loop.