import os import tkinter as tk from tkinter import filedialog, messagebox import json import chromadb from chromadb.utils import embedding_functions # 新增導入 import datetime import pandas as pd import threading from pathlib import Path import matplotlib.pyplot as plt from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg import ttkbootstrap as ttk from ttkbootstrap.constants import * from ttkbootstrap.scrolled import ScrolledFrame import numpy as np import logging from typing import List, Dict, Any, Optional, Union, Tuple import inspect # 用於檢查函數簽名,判斷是否支持混合搜索 import re # 新增導入 for ID parsing in UI class ChromaDBReader: """ChromaDB備份讀取器的主數據模型""" def __init__(self): self.backups_dir = "" self.backups = [] # 所有備份的列表 self.current_backup = None # 當前選擇的備份 self.current_collection = None # 當前選擇的集合 self.collection_names = [] # 當前備份中的集合列表 self.query_results = [] # 當前查詢結果 self.chroma_client = None # ChromaDB客戶端 self.selected_embedding_model_name = "default" # 用於查詢的嵌入模型 self.query_embedding_function = None # 實例化的查詢嵌入函數, None 表示使用集合內部預設 # 設置日誌 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("chroma_reader.log", encoding='utf-8'), logging.StreamHandler() ] ) self.logger = logging.getLogger("ChromaDBReader") def set_backups_directory(self, directory_path: str) -> bool: """設置備份目錄並掃描備份""" if not os.path.exists(directory_path): self.logger.error(f"備份目錄不存在: {directory_path}") return False self.backups_dir = directory_path return self.scan_backups() def scan_backups(self) -> bool: """掃描備份目錄中的所有備份""" self.backups = [] try: # 查找所有以chroma_backup_開頭的目錄 for item in os.listdir(self.backups_dir): item_path = os.path.join(self.backups_dir, item) if os.path.isdir(item_path) and item.startswith("chroma_backup_"): # 提取備份日期時間 try: date_str = item.replace("chroma_backup_", "") date_obj = datetime.datetime.strptime(date_str, "%Y-%m-%d_%H-%M-%S") backup_info = { "name": item, "path": item_path, "date": date_obj, "formatted_date": date_obj.strftime("%Y年%m月%d日 %H:%M:%S") } # 檢查是否是有效的ChromaDB目錄 if self._is_valid_chroma_backup(item_path): self.backups.append(backup_info) except Exception as e: self.logger.warning(f"無法解析備份 {item}: {str(e)}") # 按日期排序,最新的排在前面 self.backups.sort(key=lambda x: x["date"], reverse=True) self.logger.info(f"找到 {len(self.backups)} 個備份") return True except Exception as e: self.logger.error(f"掃描備份時出錯: {str(e)}") return False def _is_valid_chroma_backup(self, backup_path: str) -> bool: """檢查目錄是否為有效的ChromaDB備份""" # 檢查是否存在關鍵ChromaDB文件 sqlite_path = os.path.join(backup_path, "chroma.sqlite3") return os.path.exists(sqlite_path) def load_backup(self, backup_index: int) -> bool: """加載指定的備份""" if backup_index < 0 or backup_index >= len(self.backups): self.logger.error(f"無效的備份索引: {backup_index}") return False try: self.current_backup = self.backups[backup_index] backup_path = self.current_backup["path"] # 初始化ChromaDB客戶端 self.chroma_client = chromadb.PersistentClient(path=backup_path) # 獲取所有集合名稱 self.collection_names = self.chroma_client.list_collections() self.current_collection = None self.query_results = [] self.logger.info(f"已加載備份: {self.current_backup['name']}") self.logger.info(f"找到 {len(self.collection_names)} 個集合") return True except Exception as e: self.logger.error(f"加載備份時出錯: {str(e)}") self.current_backup = None self.chroma_client = None self.collection_names = [] return False def set_query_embedding_model(self, model_name: str): """設置查詢時使用的嵌入模型""" self.selected_embedding_model_name = model_name if model_name == "default": self.query_embedding_function = None # 表示使用集合的內部嵌入函數 self.logger.info("查詢將使用集合內部嵌入模型。") elif model_name == "all-MiniLM-L6-v2": try: # 注意: sentence-transformers 庫需要安裝 self.query_embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2") self.logger.info(f"查詢將使用外部嵌入模型: {model_name}") except Exception as e: self.logger.error(f"無法加載 SentenceTransformer all-MiniLM-L6-v2: {e}。將使用集合內部模型。") self.query_embedding_function = None elif model_name == "paraphrase-multilingual-MiniLM-L12-v2": try: # 注意: sentence-transformers 庫需要安裝 self.query_embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="paraphrase-multilingual-MiniLM-L12-v2") self.logger.info(f"查詢將使用外部嵌入模型: {model_name}") except Exception as e: self.logger.error(f"無法加載 SentenceTransformer paraphrase-multilingual-MiniLM-L12-v2: {e}。將使用集合內部模型。") self.query_embedding_function = None # 添加新的模型支持 elif model_name == "paraphrase-multilingual-mpnet-base-v2": try: # 注意: sentence-transformers 庫需要安裝 self.query_embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2") self.logger.info(f"查詢將使用外部嵌入模型: {model_name}") except Exception as e: self.logger.error(f"無法加載 SentenceTransformer paraphrase-multilingual-mpnet-base-v2: {e}。將使用集合內部模型。") self.query_embedding_function = None else: self.logger.warning(f"未知的查詢嵌入模型: {model_name}, 將使用集合內部模型。") self.query_embedding_function = None def load_collection(self, collection_name: str) -> bool: """加載指定的集合""" if not self.chroma_client or not collection_name: return False try: # 獲取集合時,如果需要指定 embedding_function (通常在創建時指定) # 此處是讀取,所以集合的 embedding_function 已經固定 # 我們將在查詢時使用 self.query_embedding_function 來生成 query_embeddings self.current_collection = self.chroma_client.get_collection(collection_name) self.logger.info(f"已加載集合: {collection_name}") return True except Exception as e: self.logger.error(f"加載集合時出錯: {str(e)}") self.current_collection = None return False def execute_query(self, query_text: str, n_results: int = 5, query_type: str = "basic", where: Dict = None, where_document: Dict = None, include: List[str] = None, metadata_filter: Dict = None, hybrid_alpha: float = None) -> List[Dict]: """執行查詢並返回結果 參數: query_text: 查詢文本 n_results: 返回結果數量 query_type: 查詢類型 (basic, metadata, hybrid, multi_vector) where: where 過濾條件 where_document: 文檔內容過濾條件 include: 指定包含的文檔 ID metadata_filter: 元數據過濾條件 hybrid_alpha: 混合搜索的權重參數(0-1之間,越大越傾向關鍵詞搜索) """ if not self.current_collection or not query_text: return [] try: query_params = { "n_results": n_results } # 基本查詢處理邏輯 if query_type == "basic": query_params["query_texts"] = [query_text] # 多向量查詢(用於比較多個查詢之間的相似性) elif query_type == "multi_vector": # 支持以 "|||" 或換行符分隔的多個查詢文本 if "|||" in query_text: query_texts = [text.strip() for text in query_text.split("|||")] else: query_texts = [text.strip() for text in query_text.splitlines() if text.strip()] query_params["query_texts"] = query_texts # 添加其他查詢參數 if where: query_params["where"] = where if where_document: query_params["where_document"] = where_document if include: query_params["include"] = include if metadata_filter: # 直接將元數據過濾條件轉換為 where 條件 if "where" not in query_params: query_params["where"] = {} query_params["where"].update(metadata_filter) # 混合搜索處理 if query_type == "hybrid" and hybrid_alpha is not None: # 檢查 ChromaDB 版本是否支持混合搜索 if hasattr(self.current_collection, "query") and "alpha" in inspect.signature(self.current_collection.query).parameters: query_params["alpha"] = hybrid_alpha # 混合搜索通常需要 query_texts if "query_texts" not in query_params: query_params["query_texts"] = [query_text] else: self.logger.warning("當前 ChromaDB 版本不支持混合搜索,將使用基本查詢") query_type = "basic" # 降級為基本查詢 query_params["query_texts"] = [query_text] elif query_type == "hybrid" and hybrid_alpha is None: # 如果是混合搜索但未提供 alpha,則默認為基本搜索 self.logger.warning("混合搜索未提供 Alpha 值,將使用基本查詢") query_type = "basic" query_params["query_texts"] = [query_text] # 如果 query_type 不是 multi_vector 且 query_texts 未設置,則設置 if query_type not in ["multi_vector", "hybrid"] and "query_texts" not in query_params: query_params["query_texts"] = [query_text] # 如果選擇了外部嵌入模型且不是混合查詢,則生成查詢嵌入 if query_type != "hybrid" and \ "query_texts" in query_params and \ self.query_embedding_function: texts_to_embed = query_params["query_texts"] try: # self.query_embedding_function 接受 List[str] 返回 List[List[float]] generated_embeddings = self.query_embedding_function(texts_to_embed) if generated_embeddings and all(isinstance(emb, list) for emb in generated_embeddings): query_params["query_embeddings"] = generated_embeddings if "query_texts" in query_params: # 確保它存在才刪除 del query_params["query_texts"] self.logger.info(f"使用 {self.selected_embedding_model_name} 生成了 {len(generated_embeddings)} 個查詢嵌入。") else: self.logger.warning(f"未能使用 {self.selected_embedding_model_name} 為所有查詢文本生成有效嵌入。將回退到使用集合預設嵌入函數進行文本查詢。嵌入結果: {generated_embeddings}") except Exception as e: self.logger.error(f"使用 {self.selected_embedding_model_name} 生成查詢嵌入時出錯: {e}。將回退到使用集合預設嵌入函數進行文本查詢。") # 執行查詢 results = self.current_collection.query(**query_params) # 處理結果 processed_results = [] # 獲取查詢返回的所有結果列表 ids_list = results.get('ids', [[]]) documents_list = results.get('documents', [[]]) metadatas_list = results.get('metadatas', [[]]) distances_list = results.get('distances', [[]]) # 確保列表長度一致,並為空列表提供默認值 num_queries = len(ids_list) if not documents_list or len(documents_list) != num_queries: documents_list = [[] for _ in range(num_queries)] if not metadatas_list or len(metadatas_list) != num_queries: metadatas_list = [[{}] * len(ids_list[i]) for i in range(num_queries)] if not distances_list or len(distances_list) != num_queries: distances_list = [[0.0] * len(ids_list[i]) for i in range(num_queries)] # 對於多查詢文本的情況,需要分別處理每個查詢的結果 for query_idx, (ids, documents, metadatas, distances) in enumerate(zip( ids_list, documents_list, metadatas_list, distances_list )): # 處理每個查詢結果 for i, (doc_id, document, metadata, distance) in enumerate(zip( ids, documents, metadatas if metadatas else [{}] * len(ids), # 再次確保元數據存在 distances if distances else [0.0] * len(ids) # 再次確保距離存在 )): # 計算相似度分數 similarity = 1.0 - min(float(distance) if distance is not None else 1.0, 1.0) result_item = { "rank": i + 1, "query_index": query_idx, "id": doc_id, "document": document, "metadata": metadata if metadata else {}, # 確保 metadata 是字典 "similarity": similarity, "distance": float(distance) if distance is not None else 0.0, "query_type": query_type } if query_type == "hybrid": result_item["hybrid_alpha"] = hybrid_alpha processed_results.append(result_item) self.query_results = processed_results self.logger.info(f"查詢完成,找到 {len(processed_results)} 個結果,查詢類型: {query_type}") return processed_results except Exception as e: self.logger.error(f"執行查詢時出錯: {str(e)}") self.query_results = [] return [] def get_documents_by_ids(self, doc_ids: List[str]) -> List[Dict]: """按文檔ID列表獲取文檔""" if not self.current_collection: self.logger.warning("沒有選擇集合,無法按 ID 獲取文檔。") return [] if not doc_ids: self.logger.warning("未提供文檔 ID。") return [] try: results = self.current_collection.get( ids=doc_ids, include=["documents", "metadatas"] ) processed_results = [] retrieved_ids = results.get('ids', []) retrieved_documents = results.get('documents', []) retrieved_metadatas = results.get('metadatas', []) # 創建一個字典以便快速查找已檢索到的文檔信息 found_docs_map = {} for i, r_id in enumerate(retrieved_ids): found_docs_map[r_id] = { "document": retrieved_documents[i] if i < len(retrieved_documents) else None, "metadata": retrieved_metadatas[i] if i < len(retrieved_metadatas) else {} } rank_counter = 1 for original_id in doc_ids: # 遍歷原始請求的ID,以保持某種順序感,並標記未找到的 if original_id in found_docs_map: doc_data = found_docs_map[original_id] if doc_data["document"] is not None: processed_results.append({ "rank": rank_counter, "id": original_id, "document": doc_data["document"], "metadata": doc_data["metadata"], "similarity": None, # Not applicable "distance": None, # Not applicable "query_type": "id_lookup" }) rank_counter += 1 else: # ID 存在但文檔為空(理論上不應發生在 get 中,除非 include 設置問題) self.logger.warning(f"ID {original_id} 找到但文檔內容為空。") # else: # ID 未在返回結果中找到,可以選擇不添加到 processed_results 或添加一個標記 # self.logger.info(f"ID {original_id} 未在集合中找到。") self.query_results = processed_results self.logger.info(f"按 ID 查詢完成,從請求的 {len(doc_ids)} 個ID中,實際找到 {len(processed_results)} 個文檔。") return processed_results except Exception as e: self.logger.error(f"按 ID 獲取文檔時出錯: {str(e)}") # traceback.print_exc() # For debugging self.query_results = [] return [] def get_collection_info(self, collection_name: str) -> Dict: """獲取集合的詳細信息""" if not self.chroma_client: return {} try: collection = self.chroma_client.get_collection(collection_name) count = collection.count() # 獲取一個樣本來確定向量維度 sample = collection.peek(1) dimension = len(sample['embeddings'][0]) if 'embeddings' in sample and sample['embeddings'] else "未知" return { "name": collection_name, "document_count": count, "dimension": dimension } except Exception as e: self.logger.error(f"獲取集合信息時出錯: {str(e)}") return { "name": collection_name, "document_count": "未知", "dimension": "未知" } def export_results(self, file_path: str, format: str = "csv") -> bool: """導出查詢結果""" if not self.query_results: return False try: df = pd.DataFrame(self.query_results) # 根據格式導出 if format.lower() == "csv": df.to_csv(file_path, index=False, encoding='utf-8-sig') elif format.lower() == "json": df.to_json(file_path, orient='records', force_ascii=False, indent=4) elif format.lower() == "excel": df.to_excel(file_path, index=False) else: return False self.logger.info(f"結果已導出到: {file_path}") return True except Exception as e: self.logger.error(f"導出結果時出錯: {str(e)}") return False class ChromaDBReaderUI: """ChromaDB備份讀取器的用戶界面""" def __init__(self, root): self.root = root self.reader = ChromaDBReader() # 設置窗口 self.root.title("ChromaDB 備份讀取器") self.root.geometry("1280x800") # 初始化嵌入模型相關變量 self.embedding_model_var = tk.StringVar(value="預設 (ChromaDB)") # 顯示名稱 self.embedding_models = { "預設 (ChromaDB)": "default", "all-MiniLM-L6-v2 (ST)": "all-MiniLM-L6-v2", "paraphrase-multilingual-MiniLM-L12-v2 (ST)": "paraphrase-multilingual-MiniLM-L12-v2", "paraphrase-multilingual-mpnet-base-v2 (ST)": "paraphrase-multilingual-mpnet-base-v2" # 添加新的模型選項 } self.setup_ui() # 默認主題 self.current_theme = "darkly" # ttkbootstrap的深色主題 # 存儲配置 self.config_path = os.path.join(str(Path.home()), ".chroma_reader_config.json") self.config = self.load_config() # 應用保存的配置 if self.config.get("last_backups_dir"): self.backups_dir_var.set(self.config["last_backups_dir"]) self.load_backups_directory() def setup_ui(self): """設置用戶界面""" # 創建主佈局 self.main_frame = ttk.Frame(self.root, padding=10) self.main_frame.pack(fill=BOTH, expand=YES) # 左側面板 (備份和集合選擇) self.left_panel = ttk.Frame(self.main_frame, width=300) self.left_panel.pack(side=LEFT, fill=Y, padx=(0, 10)) # 右側面板 (查詢和結果) self.right_panel = ttk.Frame(self.main_frame) self.right_panel.pack(side=LEFT, fill=BOTH, expand=YES) # 設置狀態欄 (提前,以確保 self.status_var 在其他地方使用前已定義) self.setup_status_bar() # 設置左側面板 self.setup_directory_frame() self.setup_embedding_model_frame() # 新增嵌入模型選擇框架 self.setup_backups_frame() self.setup_collections_frame() # 設置右側面板 self.setup_query_frame() self.setup_results_frame() # 設置菜單 self.setup_menu() def setup_menu(self): """設置菜單欄""" menubar = tk.Menu(self.root) self.root.config(menu=menubar) # 文件菜單 file_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="文件", menu=file_menu) file_menu.add_command(label="選擇備份目錄", command=self.browse_directory) file_menu.add_command(label="刷新備份列表", command=self.refresh_backups) file_menu.add_separator() file_menu.add_command(label="導出結果...", command=self.export_results_dialog) file_menu.add_separator() file_menu.add_command(label="退出", command=self.root.quit) # 視圖菜單 view_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="視圖", menu=view_menu) view_menu.add_command(label="切換深色/淺色主題", command=self.toggle_theme) # 幫助菜單 help_menu = tk.Menu(menubar, tearoff=0) menubar.add_cascade(label="幫助", menu=help_menu) help_menu.add_command(label="關於", command=self.show_about) help_menu.add_command(label="查看日誌", command=self.open_log_file) def setup_directory_frame(self): """設置目錄選擇框架""" dir_frame = ttk.LabelFrame(self.left_panel, text="備份目錄", padding=10) dir_frame.pack(fill=X, pady=(0, 10)) self.backups_dir_var = tk.StringVar() ttk.Entry(dir_frame, textvariable=self.backups_dir_var).pack(side=LEFT, fill=X, expand=YES) ttk.Button(dir_frame, text="瀏覽", command=self.browse_directory).pack(side=LEFT, padx=(5, 0)) ttk.Button(dir_frame, text="載入", command=self.load_backups_directory).pack(side=LEFT, padx=(5, 0)) def setup_embedding_model_frame(self): """設置查詢嵌入模型選擇框架""" embedding_frame = ttk.LabelFrame(self.left_panel, text="查詢嵌入模型", padding=10) embedding_frame.pack(fill=X, pady=(0, 10)) self.embedding_model_combo = ttk.Combobox( embedding_frame, textvariable=self.embedding_model_var, values=list(self.embedding_models.keys()), state="readonly" ) self.embedding_model_combo.pack(fill=X, expand=YES) self.embedding_model_combo.set(list(self.embedding_models.keys())[0]) # 設置預設顯示值 self.embedding_model_combo.bind("<>", self.on_embedding_model_changed) # 初始化Reader中的嵌入模型選擇 self.on_embedding_model_changed() def setup_backups_frame(self): """設置備份列表框架""" backups_frame = ttk.LabelFrame(self.left_panel, text="備份列表", padding=10) backups_frame.pack(fill=BOTH, expand=YES, pady=(0, 10)) # 備份搜索 search_frame = ttk.Frame(backups_frame) search_frame.pack(fill=X, pady=(0, 5)) self.backup_search_var = tk.StringVar() self.backup_search_var.trace("w", self.filter_backups) ttk.Label(search_frame, text="搜索:").pack(side=LEFT) ttk.Entry(search_frame, textvariable=self.backup_search_var).pack(side=LEFT, fill=X, expand=YES) # 備份列表 list_frame = ttk.Frame(backups_frame) list_frame.pack(fill=BOTH, expand=YES) columns = ("name", "date") self.backups_tree = ttk.Treeview(list_frame, columns=columns, show="headings", height=10) self.backups_tree.heading("name", text="名稱") self.backups_tree.heading("date", text="日期") self.backups_tree.column("name", width=100) self.backups_tree.column("date", width=150) scrollbar = ttk.Scrollbar(list_frame, orient=VERTICAL, command=self.backups_tree.yview) self.backups_tree.configure(yscrollcommand=scrollbar.set) self.backups_tree.pack(side=LEFT, fill=BOTH, expand=YES) scrollbar.pack(side=LEFT, fill=Y) self.backups_tree.bind("<>", self.on_backup_selected) def setup_collections_frame(self): """設置集合列表框架""" collections_frame = ttk.LabelFrame(self.left_panel, text="集合列表", padding=10) collections_frame.pack(fill=BOTH, expand=YES) # 集合搜索 search_frame = ttk.Frame(collections_frame) search_frame.pack(fill=X, pady=(0, 5)) self.collection_search_var = tk.StringVar() self.collection_search_var.trace("w", self.filter_collections) ttk.Label(search_frame, text="搜索:").pack(side=LEFT) ttk.Entry(search_frame, textvariable=self.collection_search_var).pack(side=LEFT, fill=X, expand=YES) # 集合列表 list_frame = ttk.Frame(collections_frame) list_frame.pack(fill=BOTH, expand=YES) columns = ("name", "count") self.collections_tree = ttk.Treeview(list_frame, columns=columns, show="headings", height=10) self.collections_tree.heading("name", text="名稱") self.collections_tree.heading("count", text="文檔數") self.collections_tree.column("name", width=150) self.collections_tree.column("count", width=100) scrollbar = ttk.Scrollbar(list_frame, orient=VERTICAL, command=self.collections_tree.yview) self.collections_tree.configure(yscrollcommand=scrollbar.set) self.collections_tree.pack(side=LEFT, fill=BOTH, expand=YES) scrollbar.pack(side=LEFT, fill=Y) self.collections_tree.bind("<>", self.on_collection_selected) def setup_query_frame(self): """設置查詢框架""" query_frame = ttk.LabelFrame(self.right_panel, text="查詢", padding=10) query_frame.pack(fill=X, pady=(0, 10)) # 創建一個 Notebook 以包含不同的查詢類型標籤頁 self.query_notebook = ttk.Notebook(query_frame) self.query_notebook.pack(fill=X, pady=5) # 基本查詢標籤頁 self.basic_query_frame = ttk.Frame(self.query_notebook) self.query_notebook.add(self.basic_query_frame, text="基本查詢") # 元數據查詢標籤頁 self.metadata_query_frame = ttk.Frame(self.query_notebook) self.query_notebook.add(self.metadata_query_frame, text="元數據查詢") # 混合查詢標籤頁 self.hybrid_query_frame = ttk.Frame(self.query_notebook) self.query_notebook.add(self.hybrid_query_frame, text="混合查詢") # 多向量查詢標籤頁 self.multi_vector_frame = ttk.Frame(self.query_notebook) self.query_notebook.add(self.multi_vector_frame, text="多向量查詢") # ID 查詢標籤頁 (新增) self.id_query_frame = ttk.Frame(self.query_notebook) self.query_notebook.add(self.id_query_frame, text="ID 查詢") # 設置基本查詢頁面 self.setup_basic_query_tab() # 設置元數據查詢頁面 self.setup_metadata_query_tab() # 設置混合查詢頁面 self.setup_hybrid_query_tab() # 設置多向量查詢頁面 self.setup_multi_vector_tab() # 設置 ID 查詢頁面 (新增) self.setup_id_query_tab() # 查詢參數(共用部分) params_frame = ttk.Frame(query_frame) params_frame.pack(fill=X) ttk.Label(params_frame, text="結果數量:").pack(side=LEFT) self.n_results_var = tk.StringVar(value="5") ttk.Spinbox(params_frame, from_=1, to=100, textvariable=self.n_results_var, width=5).pack(side=LEFT, padx=(5, 20)) # 查詢按鈕 ttk.Button( query_frame, text="執行查詢", command=self.execute_query, # 注意:這個 execute_query 方法將被新的替換 style="Accent.TButton" ).pack(pady=10) def setup_basic_query_tab(self): """設置基本查詢標籤頁""" ttk.Label(self.basic_query_frame, text="查詢文本:").pack(anchor=W) self.basic_query_text = tk.Text(self.basic_query_frame, height=4, width=50) self.basic_query_text.pack(fill=X, pady=5) def setup_metadata_query_tab(self): """設置元數據查詢標籤頁""" ttk.Label(self.metadata_query_frame, text="查詢文本:").pack(anchor=W) self.metadata_query_text = tk.Text(self.metadata_query_frame, height=4, width=50) self.metadata_query_text.pack(fill=X, pady=5) ttk.Label(self.metadata_query_frame, text="元數據過濾條件 (JSON 格式):").pack(anchor=W) self.metadata_filter_text = tk.Text(self.metadata_query_frame, height=4, width=50) self.metadata_filter_text.pack(fill=X, pady=5) self.metadata_filter_text.insert("1.0", '{"key": "value"}') # 添加一個幫助按鈕,顯示元數據過濾語法的說明 ttk.Button( self.metadata_query_frame, text="?", width=2, command=self.show_metadata_help ).pack(anchor=E) def setup_hybrid_query_tab(self): """設置混合查詢標籤頁""" ttk.Label(self.hybrid_query_frame, text="查詢文本:").pack(anchor=W) self.hybrid_query_text = tk.Text(self.hybrid_query_frame, height=4, width=50) self.hybrid_query_text.pack(fill=X, pady=5) alpha_frame = ttk.Frame(self.hybrid_query_frame) alpha_frame.pack(fill=X) ttk.Label(alpha_frame, text="Alpha 值 (0-1):").pack(side=LEFT) self.hybrid_alpha_var = tk.DoubleVar(value=0.5) ttk.Scale( alpha_frame, from_=0.0, to=1.0, variable=self.hybrid_alpha_var, orient=tk.HORIZONTAL, length=200 ).pack(side=LEFT, padx=5, fill=X, expand=YES) # 創建一個Label來顯示Scale的當前值 self.hybrid_alpha_label = ttk.Label(alpha_frame, text=f"{self.hybrid_alpha_var.get():.2f}") self.hybrid_alpha_label.pack(side=LEFT) # 綁定Scale的變動到更新Label的函數 self.hybrid_alpha_var.trace_add("write", lambda *args: self.hybrid_alpha_label.config(text=f"{self.hybrid_alpha_var.get():.2f}")) ttk.Label(self.hybrid_query_frame, text="注意: Alpha=0 完全使用向量搜索,Alpha=1 完全使用關鍵詞搜索").pack(pady=2) ttk.Label(self.hybrid_query_frame, text="混合查詢將使用集合原始嵌入模型,忽略上方選擇的查詢嵌入模型。", font=("TkDefaultFont", 8)).pack(pady=2) def setup_multi_vector_tab(self): """設置多向量查詢標籤頁""" ttk.Label(self.multi_vector_frame, text="多個查詢文本 (每行一個,或使用 ||| 分隔):").pack(anchor=W) self.multi_vector_text = tk.Text(self.multi_vector_frame, height=6, width=50) self.multi_vector_text.pack(fill=X, pady=5) self.multi_vector_text.insert("1.0", "查詢文本 1\n|||查詢文本 2\n|||查詢文本 3") ttk.Label(self.multi_vector_frame, text="用於比較多個查詢之間的相似性").pack(pady=5) def setup_id_query_tab(self): """設置ID查詢標籤頁""" ttk.Label(self.id_query_frame, text="文檔 ID (每行一個,或用逗號/空格分隔):").pack(anchor=tk.W) self.id_query_text = tk.Text(self.id_query_frame, height=6, width=50) self.id_query_text.pack(fill=tk.X, pady=5) self.id_query_text.insert("1.0", "id1\nid2,id3 id4") # 示例 ttk.Label(self.id_query_frame, text="此查詢將獲取指定ID的文檔,忽略上方“結果數量”設置。").pack(pady=5) def show_metadata_help(self): """顯示元數據過濾語法說明""" help_text = """元數據過濾語法示例: 基本過濾: {"category": "文章"} # 精確匹配 範圍過濾: {"date": {"$gt": "2023-01-01"}} # 大於 {"date": {"$lt": "2023-12-31"}} # 小於 {"count": {"$gte": 10}} # 大於等於 {"count": {"$lte": 100}} # 小於等於 多條件過濾: {"$and": [{"category": "文章"}, {"author": "張三"}]} # AND 條件 {"$or": [{"category": "文章"}, {"category": "新聞"}]} # OR 條件 注意: 此處語法遵循 ChromaDB 的過濾語法,非標準 JSON 查詢語法。 """ messagebox.showinfo("元數據過濾語法說明", help_text) def setup_results_frame(self): """設置結果顯示框架""" self.results_notebook = ttk.Notebook(self.right_panel) self.results_notebook.pack(fill=BOTH, expand=YES) # 列表視圖 - 使用標準 Frame 作為容器 list_frame = ttk.Frame(self.results_notebook) self.results_notebook.add(list_frame, text="列表視圖") self.list_view = ttk.Frame(list_frame) self.list_view.pack(fill=BOTH, expand=YES) # 詳細視圖 - 使用標準 Frame 作為容器 detail_frame = ttk.Frame(self.results_notebook) self.results_notebook.add(detail_frame, text="詳細視圖") self.detail_view = ttk.Frame(detail_frame) self.detail_view.pack(fill=BOTH, expand=YES) # 可視化視圖 self.visual_view = ttk.Frame(self.results_notebook) self.results_notebook.add(self.visual_view, text="可視化") # 比較視圖 self.compare_view = ttk.Frame(self.results_notebook) self.results_notebook.add(self.compare_view, text="比較視圖") def setup_status_bar(self): """設置狀態欄""" status_frame = ttk.Frame(self.root) status_frame.pack(side=BOTTOM, fill=X) self.status_var = tk.StringVar(value="就緒") status_label = ttk.Label(status_frame, textvariable=self.status_var, relief=tk.SUNKEN, anchor=W) status_label.pack(fill=X) def on_embedding_model_changed(self, event=None): """處理查詢嵌入模型選擇變更事件""" selected_display_name = self.embedding_model_var.get() model_name_key = self.embedding_models.get(selected_display_name, "default") if hasattr(self, 'reader') and self.reader: self.reader.set_query_embedding_model(model_name_key) # 更新Reader中的模型 # 更新狀態欄提示 if model_name_key == "default": self.status_var.set("查詢將使用集合內部嵌入模型。") elif self.reader.query_embedding_function: # 檢查模型是否成功加載 self.status_var.set(f"查詢將使用外部模型: {selected_display_name}") else: # 加載失敗 self.status_var.set(f"模型 {selected_display_name} 加載失敗/無效,將使用集合內部模型。") else: # Reader尚未初始化,這通常在UI初始化早期發生 # self.reader.set_query_embedding_model 會在 setup_embedding_model_frame 中首次調用時處理 pass def browse_directory(self): """瀏覽選擇備份目錄""" directory = filedialog.askdirectory( title="選擇ChromaDB備份目錄", initialdir=self.backups_dir_var.get() or str(Path.home()) ) if directory: self.backups_dir_var.set(directory) self.load_backups_directory() def load_backups_directory(self): """加載備份目錄""" directory = self.backups_dir_var.get() if not directory: return self.status_var.set("正在掃描備份...") self.root.update_idletasks() if self.reader.set_backups_directory(directory): self.refresh_backups_list() self.status_var.set(f"已找到 {len(self.reader.backups)} 個備份") # 保存配置 self.config["last_backups_dir"] = directory self.save_config() else: self.status_var.set("無法掃描備份目錄") messagebox.showerror("錯誤", f"無法掃描備份目錄: {directory}") def refresh_backups(self): """刷新備份列表""" if not self.reader.backups_dir: messagebox.showinfo("提示", "請先選擇備份目錄") return self.status_var.set("正在刷新備份...") self.root.update_idletasks() if self.reader.scan_backups(): self.refresh_backups_list() self.status_var.set(f"已刷新,找到 {len(self.reader.backups)} 個備份") else: self.status_var.set("刷新備份失敗") messagebox.showerror("錯誤", "無法刷新備份列表") def refresh_backups_list(self): """刷新備份列表顯示""" # 清空列表 for item in self.backups_tree.get_children(): self.backups_tree.delete(item) # 添加備份 for backup in self.reader.backups: self.backups_tree.insert( "", "end", values=(backup["name"], backup["formatted_date"]) ) def filter_backups(self, *args): """根據搜索條件過濾備份列表""" search_text = self.backup_search_var.get().lower() # 清空列表 for item in self.backups_tree.get_children(): self.backups_tree.delete(item) # 添加匹配的備份 for backup in self.reader.backups: if search_text in backup["name"].lower() or search_text in backup["formatted_date"].lower(): self.backups_tree.insert( "", "end", values=(backup["name"], backup["formatted_date"]) ) def on_backup_selected(self, event): """處理備份選擇事件""" selection = self.backups_tree.selection() if not selection: return # 獲取選定項的索引 item_id = selection[0] # item_index = self.backups_tree.index(item_id) # 這個索引是相對於當前顯示的項目的 # 直接從 Treeview item 中獲取備份名稱,然後在 self.reader.backups 中查找 try: backup_name_from_tree = self.backups_tree.item(item_id)["values"][0] except IndexError: self.logger.error("無法從 Treeview 獲取備份名稱") return actual_backup_index = -1 for i, backup_info in enumerate(self.reader.backups): if backup_info["name"] == backup_name_from_tree: actual_backup_index = i break if actual_backup_index == -1: self.logger.error(f"在備份列表中未找到名為 {backup_name_from_tree} 的備份") return # 載入備份 self.status_var.set(f"正在載入備份: {backup_name_from_tree}...") self.root.update_idletasks() # 確保 Reader 中的嵌入模型是最新的 (雖然 on_embedding_model_changed 應該已經處理了) # selected_display_name = self.embedding_model_var.get() # model_key = self.embedding_models.get(selected_display_name, "default") # self.reader.set_query_embedding_model(model_key) # 這行不需要,因為模型選擇是獨立的 def load_backup_thread(): # load_backup 不再需要 embedding_model_name 參數,因為嵌入模型選擇是針對查詢的 success = self.reader.load_backup(actual_backup_index) self.root.after(0, lambda: self.finalize_backup_loading(success, backup_name_from_tree)) threading.Thread(target=load_backup_thread).start() def finalize_backup_loading(self, success: bool, backup_name: str): """完成備份載入處理""" if success: self.refresh_collections_list() self.status_var.set(f"已載入備份: {backup_name}") else: self.status_var.set(f"載入備份失敗: {backup_name}") messagebox.showerror("錯誤", f"無法載入備份: {backup_name}") def refresh_collections_list(self): """刷新集合列表顯示""" # 清空列表 for item in self.collections_tree.get_children(): self.collections_tree.delete(item) # 添加集合 for collection in self.reader.collection_names: info = self.reader.get_collection_info(collection.name) self.collections_tree.insert( "", "end", values=(collection.name, info["document_count"]) ) def filter_collections(self, *args): """根據搜索條件過濾集合列表""" search_text = self.collection_search_var.get().lower() # 清空列表 for item in self.collections_tree.get_children(): self.collections_tree.delete(item) # 添加匹配的集合 for collection in self.reader.collection_names: if search_text in collection.name.lower(): info = self.reader.get_collection_info(collection.name) self.collections_tree.insert( "", "end", values=(collection.name, info["document_count"]) ) def on_collection_selected(self, event): """處理集合選擇事件""" selection = self.collections_tree.selection() if not selection: return # 獲取選定項的集合名稱 item_id = selection[0] collection_name = self.collections_tree.item(item_id)["values"][0] # 載入集合 self.status_var.set(f"正在載入集合: {collection_name}...") self.root.update_idletasks() def load_collection_thread(): success = self.reader.load_collection(collection_name) self.root.after(0, lambda: self.finalize_collection_loading(success, collection_name)) threading.Thread(target=load_collection_thread).start() def finalize_collection_loading(self, success: bool, collection_name: str): """完成集合載入處理""" if success: self.status_var.set(f"已載入集合: {collection_name}") # 獲取集合詳細信息並顯示 info = self.reader.get_collection_info(collection_name) info_text = f"集合: {info['name']}\n文檔數: {info['document_count']}\n向量維度: {info['dimension']}" # messagebox.showinfo("集合信息", info_text) # 暫時註解掉,避免每次選集合都彈窗 else: self.status_var.set(f"載入集合失敗: {collection_name}") messagebox.showerror("錯誤", f"無法載入集合: {collection_name}") def execute_query(self): """執行向量查詢""" if not self.reader.current_collection: messagebox.showinfo("提示", "請先選擇一個集合") return # 根據當前選擇的標籤頁確定查詢類型 try: current_tab_widget = self.query_notebook.nametowidget(self.query_notebook.select()) if current_tab_widget == self.basic_query_frame: current_tab = 0 elif current_tab_widget == self.metadata_query_frame: current_tab = 1 elif current_tab_widget == self.hybrid_query_frame: current_tab = 2 elif current_tab_widget == self.multi_vector_frame: current_tab = 3 elif current_tab_widget == self.id_query_frame: # 新增 ID 查詢頁判斷 current_tab = 4 else: messagebox.showerror("錯誤", "未知的查詢標籤頁") return except tk.TclError: # Notebook可能還沒有任何分頁被選中 messagebox.showerror("錯誤", "請選擇一個查詢類型標籤頁") return # 獲取查詢參數 try: n_results = int(self.n_results_var.get()) except ValueError: messagebox.showerror("錯誤", "結果數量必須是整數") return # 執行不同類型的查詢 if current_tab == 0: # 基本查詢 query_text = self.basic_query_text.get("1.0", tk.END).strip() if not query_text: messagebox.showinfo("提示", "請輸入查詢文本") return self.status_var.set("正在執行基本查詢...") self.execute_basic_query(query_text, n_results) elif current_tab == 1: # 元數據查詢 query_text = self.metadata_query_text.get("1.0", tk.END).strip() metadata_filter_text = self.metadata_filter_text.get("1.0", tk.END).strip() if not query_text: # 元數據查詢的文本也可以是空的,如果只想用metadata_filter # messagebox.showinfo("提示", "請輸入查詢文本") # return pass # 允許空查詢文本 try: metadata_filter = json.loads(metadata_filter_text) if metadata_filter_text else None except json.JSONDecodeError: messagebox.showerror("錯誤", "元數據過濾條件必須是有效的 JSON 格式") return if not query_text and not metadata_filter: messagebox.showinfo("提示", "請輸入查詢文本或元數據過濾條件") return self.status_var.set("正在執行元數據查詢...") self.execute_metadata_query(query_text, n_results, metadata_filter) elif current_tab == 2: # 混合查詢 query_text = self.hybrid_query_text.get("1.0", tk.END).strip() hybrid_alpha = self.hybrid_alpha_var.get() if not query_text: messagebox.showinfo("提示", "請輸入查詢文本") return self.status_var.set("正在執行混合查詢...") self.execute_hybrid_query(query_text, n_results, hybrid_alpha) elif current_tab == 3: # 多向量查詢 query_text = self.multi_vector_text.get("1.0", tk.END).strip() if not query_text: messagebox.showinfo("提示", "請輸入查詢文本") return self.status_var.set("正在執行多向量查詢...") self.execute_multi_vector_query(query_text, n_results) elif current_tab == 4: # ID 查詢 id_input_str = self.id_query_text.get("1.0", tk.END).strip() if not id_input_str: messagebox.showinfo("提示", "請輸入文檔 ID。") return # 解析 ID: 支持逗號、空格、換行符分隔 doc_ids = [id_val.strip() for id_val in re.split(r'[,\s\n]+', id_input_str) if id_val.strip()] if not doc_ids: messagebox.showinfo("提示", "未解析到有效的文檔 ID。") return self.status_var.set("正在按 ID 獲取文檔...") self.execute_id_lookup_query(doc_ids) def execute_basic_query(self, query_text, n_results): """執行基本查詢""" self.status_var.set(f"正在執行基本查詢: {query_text[:30]}...") self.root.update_idletasks() def query_thread(): results = self.reader.execute_query( query_text=query_text, n_results=n_results, query_type="basic" ) self.root.after(0, lambda: self.display_results(results)) threading.Thread(target=query_thread, daemon=True).start() def execute_metadata_query(self, query_text, n_results, metadata_filter): """執行元數據查詢""" self.status_var.set(f"正在執行元數據查詢: {query_text[:30]}...") self.root.update_idletasks() def query_thread(): results = self.reader.execute_query( query_text=query_text, n_results=n_results, query_type="metadata", # 這裡應該是 "metadata" 但後端邏輯會轉為 where metadata_filter=metadata_filter ) self.root.after(0, lambda: self.display_results(results)) threading.Thread(target=query_thread, daemon=True).start() def execute_hybrid_query(self, query_text, n_results, hybrid_alpha): """執行混合查詢""" self.status_var.set(f"正在執行混合查詢 (α={hybrid_alpha:.2f}): {query_text[:30]}...") self.root.update_idletasks() def query_thread(): results = self.reader.execute_query( query_text=query_text, n_results=n_results, query_type="hybrid", hybrid_alpha=hybrid_alpha ) self.root.after(0, lambda: self.display_results(results)) threading.Thread(target=query_thread, daemon=True).start() def execute_multi_vector_query(self, query_text, n_results): """執行多向量查詢""" self.status_var.set(f"正在執行多向量查詢: {query_text.splitlines()[0][:30] if query_text.splitlines() else ''}...") self.root.update_idletasks() def query_thread(): results = self.reader.execute_query( query_text=query_text, n_results=n_results, query_type="multi_vector" ) self.root.after(0, lambda: self.display_results(results)) threading.Thread(target=query_thread, daemon=True).start() def execute_id_lookup_query(self, doc_ids: List[str]): """執行ID查找查詢""" self.status_var.set(f"正在按 ID 獲取 {len(doc_ids)} 個文檔...") self.root.update_idletasks() def query_thread(): results = self.reader.get_documents_by_ids(doc_ids) self.root.after(0, lambda: self.display_results(results)) threading.Thread(target=query_thread, daemon=True).start() def display_results(self, results): """顯示查詢結果""" if not results: self.status_var.set("查詢完成,未找到結果") messagebox.showinfo("查詢結果", "未找到匹配的結果") return self.status_var.set(f"查詢完成,找到 {len(results)} 個結果") # 清空所有視圖 (這部分由各個顯示函數內部處理) # 顯示列表視圖 self.display_list_view(results) # 顯示詳細視圖 self.display_detail_view(results) # 顯示可視化視圖 self.display_visual_view(results) # 顯示比較視圖 self.display_compare_view(results) def display_list_view(self, results): """顯示列表視圖""" # 清空現有內容 for widget in self.list_view.winfo_children(): widget.destroy() # 創建表格 columns = ("rank", "similarity", "query_type", "id", "document") tree = ttk.Treeview(self.list_view, columns=columns, show="headings") tree.heading("rank", text="#") tree.heading("similarity", text="相似度") tree.heading("query_type", text="查詢類型") tree.heading("id", text="文檔ID") tree.heading("document", text="文檔內容") tree.column("rank", width=50, anchor=CENTER) tree.column("similarity", width=100, anchor=CENTER) tree.column("query_type", width=120, anchor=CENTER) # 調整寬度以適應更長的類型名稱 tree.column("id", width=150) tree.column("document", width=530) # 調整寬度 # 確定查詢類型名稱映射 query_type_names = { "basic": "基本查詢", "metadata": "元數據查詢", "hybrid": "混合查詢", "multi_vector": "多向量查詢", "id_lookup": "ID 查詢" # 新增 } # 添加結果到表格 for result in results: raw_query_type = result.get("query_type", "basic") display_query_type = query_type_names.get(raw_query_type, raw_query_type.capitalize()) if raw_query_type == "hybrid" and "hybrid_alpha" in result: display_query_type += f" (α={result['hybrid_alpha']:.2f})" if raw_query_type == "multi_vector" and "query_index" in result: display_query_type += f" (Q{result['query_index']+1})" similarity_display = f"{result.get('similarity', 0.0):.4f}" if result.get('similarity') is not None else "N/A" tree.insert( "", "end", values=( result.get("rank", "-"), similarity_display, display_query_type, result.get("id", "N/A"), result.get("document", "")[:100] + ("..." if len(result.get("document", "")) > 100 else "") ) ) # 添加滾動條 scrollbar = ttk.Scrollbar(self.list_view, orient=VERTICAL, command=tree.yview) tree.configure(yscrollcommand=scrollbar.set) # 雙擊項目顯示完整內容 tree.bind("", lambda event: self.show_full_document(tree)) # 佈局 tree.pack(side=LEFT, fill=BOTH, expand=YES) scrollbar.pack(side=RIGHT, fill=Y) def show_full_document(self, tree): """顯示完整的文檔內容""" selection = tree.selection() if not selection: return item_id = selection[0] rank_str = tree.item(item_id)["values"][0] try: rank = int(rank_str) if 1 <= rank <= len(self.reader.query_results): result = self.reader.query_results[rank - 1] # 創建詳細內容窗口 details_window = tk.Toplevel(self.root) details_window.title(f"文檔詳細內容 - {result['id']}") details_window.geometry("800x600") frame = ttk.Frame(details_window, padding=10) frame.pack(fill=BOTH, expand=YES) # 添加文檔信息 info_text = f"文檔ID: {result['id']}\n" if result.get('similarity') is not None: info_text += f"相似度: {result['similarity']:.4f}\n" else: info_text += "相似度: N/A\n" if result['metadata']: info_text += "\n元數據:\n" for key, value in result['metadata'].items(): info_text += f"{key}: {value}\n" ttk.Label(frame, text=info_text, justify=LEFT).pack(anchor=W, pady=(0, 10)) # 添加文檔內容 ttk.Label(frame, text="文檔內容:", justify=LEFT).pack(anchor=W) text_area = tk.Text(frame, wrap=tk.WORD) text_area.insert(tk.END, result['document']) text_area.config(state=tk.DISABLED) scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=text_area.yview) text_area.configure(yscrollcommand=scrollbar.set) text_area.pack(side=LEFT, fill=BOTH, expand=YES) scrollbar.pack(side=LEFT, fill=Y) # 添加複製按鈕 ttk.Button( details_window, text="複製內容", command=lambda: self.copy_to_clipboard(result['document']) ).pack(pady=10) except (ValueError, IndexError): pass def copy_to_clipboard(self, text): """複製文本到剪貼板""" self.root.clipboard_clear() self.root.clipboard_append(text) self.status_var.set("已複製到剪貼板") def display_detail_view(self, results): """顯示詳細視圖""" # 清空現有內容 for widget in self.detail_view.winfo_children(): widget.destroy() # 創建滾動區域 canvas = tk.Canvas(self.detail_view) scrollbar = ttk.Scrollbar(self.detail_view, orient="vertical", command=canvas.yview) scrollable_frame = ttk.Frame(canvas) scrollable_frame.bind( "", lambda e: canvas.configure(scrollregion=canvas.bbox("all")) ) canvas.create_window((0, 0), window=scrollable_frame, anchor="nw") canvas.configure(yscrollcommand=scrollbar.set) # 為每個結果創建一個卡片 for i, result in enumerate(results): # 創建卡片框架 card = ttk.Frame(scrollable_frame, padding=10, relief="solid", borderwidth=1) card.pack(fill=X, padx=10, pady=5, anchor=W) # 卡片標題 title_frame = ttk.Frame(card) title_frame.pack(fill=X) similarity_text_detail = f"{result['similarity']:.4f}" if result.get('similarity') is not None else "N/A" ttk.Label( title_frame, text=f"#{result['rank']} - 相似度: {similarity_text_detail}", font=("TkDefaultFont", 10, "bold") ).pack(side=LEFT) ttk.Label( title_frame, text=f"ID: {result['id']}", font=("TkDefaultFont", 8) ).pack(side=RIGHT) ttk.Separator(card, orient=HORIZONTAL).pack(fill=X, pady=5) # 文檔內容 content_frame = ttk.Frame(card) content_frame.pack(fill=X) doc_text = tk.Text(content_frame, wrap=tk.WORD, height=4) doc_text.insert(tk.END, result['document']) doc_text.config(state=tk.DISABLED) doc_text.pack(fill=X) # 如果有元數據,顯示元數據 if result['metadata'] and len(result['metadata']) > 0: ttk.Separator(card, orient=HORIZONTAL).pack(fill=X, pady=5) metadata_frame = ttk.Frame(card) metadata_frame.pack(fill=X) ttk.Label( metadata_frame, text="元數據:", font=("TkDefaultFont", 9) ).pack(anchor=W) for key, value in result['metadata'].items(): ttk.Label( metadata_frame, text=f"{key}: {value}", font=("TkDefaultFont", 8) ).pack(anchor=W, padx=10) # 操作按鈕 button_frame = ttk.Frame(card) button_frame.pack(fill=X, pady=(5, 0)) ttk.Button( button_frame, text="查看完整內容", command=lambda r=result: self.show_full_document_from_result(r) ).pack(side=LEFT, padx=5) ttk.Button( button_frame, text="複製內容", command=lambda d=result['document']: self.copy_to_clipboard(d) ).pack(side=LEFT, padx=5) # 配置滾動區域 canvas.pack(side=LEFT, fill=BOTH, expand=True) scrollbar.pack(side=RIGHT, fill=Y) def show_full_document_from_result(self, result): """從結果直接顯示完整的文檔內容""" # 創建詳細內容窗口 details_window = tk.Toplevel(self.root) details_window.title(f"文檔詳細內容 - {result['id']}") details_window.geometry("800x600") frame = ttk.Frame(details_window, padding=10) frame.pack(fill=BOTH, expand=YES) # 添加文檔信息 info_text = f"文檔ID: {result['id']}\n" if result.get('similarity') is not None: info_text += f"相似度: {result['similarity']:.4f}\n" else: info_text += "相似度: N/A\n" if result['metadata']: info_text += "\n元數據:\n" for key, value in result['metadata'].items(): info_text += f"{key}: {value}\n" ttk.Label(frame, text=info_text, justify=LEFT).pack(anchor=W, pady=(0, 10)) # 添加文檔內容 ttk.Label(frame, text="文檔內容:", justify=LEFT).pack(anchor=W) text_area = tk.Text(frame, wrap=tk.WORD) text_area.insert(tk.END, result['document']) text_area.config(state=tk.DISABLED) scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=text_area.yview) text_area.configure(yscrollcommand=scrollbar.set) text_area.pack(side=LEFT, fill=BOTH, expand=YES) scrollbar.pack(side=LEFT, fill=Y) # 添加複製按鈕 ttk.Button( details_window, text="複製內容", command=lambda: self.copy_to_clipboard(result['document']) ).pack(pady=10) def display_visual_view(self, results): """顯示可視化視圖""" # 清空現有內容 for widget in self.visual_view.winfo_children(): widget.destroy() if len(results) == 0: return # 創建框架 figure_frame = ttk.Frame(self.visual_view) figure_frame.pack(fill=BOTH, expand=YES, padx=10, pady=10) # 創建圖表 fig = plt.Figure(figsize=(10, 6), dpi=100) # 相似度柱狀圖 ax1 = fig.add_subplot(121) # 提取數據 ranks = [r["rank"] for r in results] similarities = [r["similarity"] for r in results] # 繪製相似度柱狀圖 bars = ax1.bar(ranks, similarities, color='skyblue') # 添加數據標籤 for bar in bars: height = bar.get_height() ax1.text( bar.get_x() + bar.get_width()/2., height + 0.01, f'{height:.3f}', ha='center', va='bottom', rotation=0, fontsize=8 ) ax1.set_xlabel('排名') ax1.set_ylabel('相似度') ax1.set_title('查詢結果相似度') ax1.set_ylim(0, 1) ax1.set_xticks(ranks) # 相似度曲線圖 ax2 = fig.add_subplot(122) ax2.plot(ranks, similarities, 'o-', color='orange') # 添加數據標籤 for i, (x, y) in enumerate(zip(ranks, similarities)): ax2.text(x, y + 0.02, f'{y:.3f}', ha='center', va='bottom', fontsize=8) ax2.set_xlabel('排名') ax2.set_ylabel('相似度') ax2.set_title('相似度分佈曲線') ax2.set_ylim(0, 1) ax2.set_xticks(ranks) # 調整佈局 fig.tight_layout() # 將圖表嵌入到 Tkinter 窗口 canvas = FigureCanvasTkAgg(fig, figure_frame) canvas.draw() canvas.get_tk_widget().pack(fill=BOTH, expand=YES) def display_compare_view(self, results): """顯示比較視圖""" # 清空現有內容 for widget in self.compare_view.winfo_children(): widget.destroy() if len(results) < 2: ttk.Label( self.compare_view, text="需要至少2個結果才能進行比較", font=("TkDefaultFont", 12) ).pack(pady=20) return # 創建比較視圖 ttk.Label( self.compare_view, text="結果比較", font=("TkDefaultFont", 14, "bold") ).pack(pady=(10, 20)) # 創建比較表格 columns = ["特性"] + [f"#{r['rank']}" for r in results] # 創建框架以包含表格和滾動條 table_frame = ttk.Frame(self.compare_view) table_frame.pack(fill=BOTH, expand=YES, padx=10, pady=10) tree = ttk.Treeview(table_frame, columns=columns, show="headings") for col in columns: tree.heading(col, text=col) tree.column(col, width=100, anchor=CENTER) # 相似度行 tree.insert( "", "end", values=["相似度"] + [f"{r['similarity']:.4f}" for r in results] ) # 文檔ID行 tree.insert( "", "end", values=["文檔ID"] + [r['id'] for r in results] ) # 文檔長度行 tree.insert( "", "end", values=["文檔長度"] + [len(r['document']) for r in results] ) # 從元數據提取共同鍵 all_keys = set() for result in results: if result['metadata']: for key in result['metadata'].keys(): all_keys.add(key) # 為每個元數據鍵添加一行 for key in sorted(all_keys): values = ["元數據: " + key] for result in results: if result['metadata'] and key in result['metadata']: values.append(str(result['metadata'][key])) else: values.append("-") tree.insert("", "end", values=values) # 添加垂直滾動條 vsb = ttk.Scrollbar(table_frame, orient="vertical", command=tree.yview) tree.configure(yscrollcommand=vsb.set) # 添加水平滾動條 hsb = ttk.Scrollbar(table_frame, orient="horizontal", command=tree.xview) tree.configure(xscrollcommand=hsb.set) # 放置表格和滾動條 tree.grid(column=0, row=0, sticky='nsew') vsb.grid(column=1, row=0, sticky='ns') hsb.grid(column=0, row=1, sticky='ew') # 配置表格框架的網格 table_frame.columnconfigure(0, weight=1) table_frame.rowconfigure(0, weight=1) def export_results_dialog(self): """顯示導出結果對話框""" if not self.reader.query_results: messagebox.showinfo("提示", "沒有可導出的結果") return # 詢問導出格式和文件路徑 formats = [ ("CSV 文件", "*.csv"), ("JSON 文件", "*.json"), ("Excel 文件", "*.xlsx") ] file_path = filedialog.asksaveasfilename( title="導出結果", filetypes=formats, defaultextension=".csv" ) if not file_path: return # 確定導出格式 ext = os.path.splitext(file_path)[1].lower() format_map = { ".csv": "csv", ".json": "json", ".xlsx": "excel" } format_type = format_map.get(ext, "csv") # 執行導出 success = self.reader.export_results(file_path, format_type) if success: messagebox.showinfo("導出成功", f"結果已成功導出到: {file_path}") else: messagebox.showerror("導出失敗", "導出結果時發生錯誤") def toggle_theme(self): """切換深色/淺色主題""" if self.current_theme == "darkly": self.current_theme = "cosmo" # 淺色主題 ttk.Style().theme_use("cosmo") else: self.current_theme = "darkly" # 深色主題 ttk.Style().theme_use("darkly") # 保存配置 self.config["theme"] = self.current_theme self.save_config() def show_about(self): """顯示關於對話框""" about_text = "ChromaDB 備份讀取器\n\n" about_text += "版本: 1.0.0\n\n" about_text += "這是一個用於讀取和查詢ChromaDB備份的工具,支持相似度搜索和結果可視化。\n\n" about_text += "功能包括:\n" about_text += "- 讀取備份目錄\n" about_text += "- 查詢集合數據\n" about_text += "- 多種視圖顯示結果\n" about_text += "- 結果導出\n" messagebox.showinfo("關於", about_text) def open_log_file(self): """打開日誌文件""" log_path = "chroma_reader.log" if os.path.exists(log_path): # 創建日誌查看器窗口 log_window = tk.Toplevel(self.root) log_window.title("日誌查看器") log_window.geometry("800x600") frame = ttk.Frame(log_window, padding=10) frame.pack(fill=BOTH, expand=YES) # 添加日誌內容 text_area = tk.Text(frame, wrap=tk.WORD) try: with open(log_path, "r", encoding="utf-8") as f: log_content = f.read() except UnicodeDecodeError: try: with open(log_path, "r", encoding="gbk") as f: log_content = f.read() except: log_content = "無法讀取日誌文件" text_area.insert(tk.END, log_content) text_area.config(state=tk.DISABLED) scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=text_area.yview) text_area.configure(yscrollcommand=scrollbar.set) text_area.pack(side=LEFT, fill=BOTH, expand=YES) scrollbar.pack(side=LEFT, fill=Y) # 添加刷新和清空按鈕 button_frame = ttk.Frame(log_window) button_frame.pack(fill=X, pady=10) ttk.Button( button_frame, text="刷新", command=lambda: self.refresh_log_view(text_area) ).pack(side=LEFT, padx=5) ttk.Button( button_frame, text="清空日誌", command=lambda: self.clear_log_file(text_area) ).pack(side=LEFT, padx=5) else: messagebox.showinfo("提示", "日誌文件不存在") def refresh_log_view(self, text_area): """刷新日誌查看器內容""" log_path = "chroma_reader.log" if os.path.exists(log_path): try: with open(log_path, "r", encoding="utf-8") as f: log_content = f.read() except UnicodeDecodeError: try: with open(log_path, "r", encoding="gbk") as f: log_content = f.read() except: log_content = "無法讀取日誌文件" text_area.config(state=tk.NORMAL) text_area.delete("1.0", tk.END) text_area.insert(tk.END, log_content) text_area.config(state=tk.DISABLED) def clear_log_file(self, text_area): """清空日誌文件""" if messagebox.askyesno("確認", "確定要清空日誌文件嗎?"): log_path = "chroma_reader.log" try: with open(log_path, "w") as f: f.write("") text_area.config(state=tk.NORMAL) text_area.delete("1.0", tk.END) text_area.config(state=tk.DISABLED) messagebox.showinfo("成功", "日誌文件已清空") except Exception as e: messagebox.showerror("錯誤", f"清空日誌文件時出錯: {str(e)}") def load_config(self): """載入配置""" default_config = { "last_backups_dir": "", "theme": "darkly" } if os.path.exists(self.config_path): try: with open(self.config_path, "r", encoding="utf-8") as f: return json.load(f) except: return default_config return default_config def save_config(self): """保存配置""" try: with open(self.config_path, "w", encoding="utf-8") as f: json.dump(self.config, f, indent=4) except Exception as e: self.logger.error(f"保存配置時出錯: {str(e)}") def main(): """程序入口點""" root = ttk.Window(themename="darkly") app = ChromaDBReaderUI(root) root.mainloop() if __name__ == "__main__": main()