1832 lines
76 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import tkinter as tk
from tkinter import filedialog, messagebox
import json
import chromadb
from chromadb.utils import embedding_functions # 新增導入
import datetime
import pandas as pd
import threading
from pathlib import Path
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import ttkbootstrap as ttk
from ttkbootstrap.constants import *
from ttkbootstrap.scrolled import ScrolledFrame
import numpy as np
import logging
from typing import List, Dict, Any, Optional, Union, Tuple
import inspect # 用於檢查函數簽名,判斷是否支持混合搜索
import re # 新增導入 for ID parsing in UI
class ChromaDBReader:
"""ChromaDB備份讀取器的主數據模型"""
def __init__(self):
self.backups_dir = ""
self.backups = [] # 所有備份的列表
self.current_backup = None # 當前選擇的備份
self.current_collection = None # 當前選擇的集合
self.collection_names = [] # 當前備份中的集合列表
self.query_results = [] # 當前查詢結果
self.chroma_client = None # ChromaDB客戶端
self.selected_embedding_model_name = "default" # 用於查詢的嵌入模型
self.query_embedding_function = None # 實例化的查詢嵌入函數, None 表示使用集合內部預設
# 設置日誌
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("chroma_reader.log", encoding='utf-8'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger("ChromaDBReader")
def set_backups_directory(self, directory_path: str) -> bool:
"""設置備份目錄並掃描備份"""
if not os.path.exists(directory_path):
self.logger.error(f"備份目錄不存在: {directory_path}")
return False
self.backups_dir = directory_path
return self.scan_backups()
def scan_backups(self) -> bool:
"""掃描備份目錄中的所有備份"""
self.backups = []
try:
# 查找所有以chroma_backup_開頭的目錄
for item in os.listdir(self.backups_dir):
item_path = os.path.join(self.backups_dir, item)
if os.path.isdir(item_path) and item.startswith("chroma_backup_"):
# 提取備份日期時間
try:
date_str = item.replace("chroma_backup_", "")
date_obj = datetime.datetime.strptime(date_str, "%Y-%m-%d_%H-%M-%S")
backup_info = {
"name": item,
"path": item_path,
"date": date_obj,
"formatted_date": date_obj.strftime("%Y年%m月%d%H:%M:%S")
}
# 檢查是否是有效的ChromaDB目錄
if self._is_valid_chroma_backup(item_path):
self.backups.append(backup_info)
except Exception as e:
self.logger.warning(f"無法解析備份 {item}: {str(e)}")
# 按日期排序,最新的排在前面
self.backups.sort(key=lambda x: x["date"], reverse=True)
self.logger.info(f"找到 {len(self.backups)} 個備份")
return True
except Exception as e:
self.logger.error(f"掃描備份時出錯: {str(e)}")
return False
def _is_valid_chroma_backup(self, backup_path: str) -> bool:
"""檢查目錄是否為有效的ChromaDB備份"""
# 檢查是否存在關鍵ChromaDB文件
sqlite_path = os.path.join(backup_path, "chroma.sqlite3")
return os.path.exists(sqlite_path)
def load_backup(self, backup_index: int) -> bool:
"""加載指定的備份"""
if backup_index < 0 or backup_index >= len(self.backups):
self.logger.error(f"無效的備份索引: {backup_index}")
return False
try:
self.current_backup = self.backups[backup_index]
backup_path = self.current_backup["path"]
# 初始化ChromaDB客戶端
self.chroma_client = chromadb.PersistentClient(path=backup_path)
# 獲取所有集合名稱
self.collection_names = self.chroma_client.list_collections()
self.current_collection = None
self.query_results = []
self.logger.info(f"已加載備份: {self.current_backup['name']}")
self.logger.info(f"找到 {len(self.collection_names)} 個集合")
return True
except Exception as e:
self.logger.error(f"加載備份時出錯: {str(e)}")
self.current_backup = None
self.chroma_client = None
self.collection_names = []
return False
def set_query_embedding_model(self, model_name: str):
"""設置查詢時使用的嵌入模型"""
self.selected_embedding_model_name = model_name
if model_name == "default":
self.query_embedding_function = None # 表示使用集合的內部嵌入函數
self.logger.info("查詢將使用集合內部嵌入模型。")
elif model_name == "all-MiniLM-L6-v2":
try:
# 注意: sentence-transformers 庫需要安裝
self.query_embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")
self.logger.info(f"查詢將使用外部嵌入模型: {model_name}")
except Exception as e:
self.logger.error(f"無法加載 SentenceTransformer all-MiniLM-L6-v2: {e}。將使用集合內部模型。")
self.query_embedding_function = None
elif model_name == "paraphrase-multilingual-MiniLM-L12-v2":
try:
# 注意: sentence-transformers 庫需要安裝
self.query_embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="paraphrase-multilingual-MiniLM-L12-v2")
self.logger.info(f"查詢將使用外部嵌入模型: {model_name}")
except Exception as e:
self.logger.error(f"無法加載 SentenceTransformer paraphrase-multilingual-MiniLM-L12-v2: {e}。將使用集合內部模型。")
self.query_embedding_function = None
# 添加新的模型支持
elif model_name == "paraphrase-multilingual-mpnet-base-v2":
try:
# 注意: sentence-transformers 庫需要安裝
self.query_embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="sentence-transformers/paraphrase-multilingual-mpnet-base-v2")
self.logger.info(f"查詢將使用外部嵌入模型: {model_name}")
except Exception as e:
self.logger.error(f"無法加載 SentenceTransformer paraphrase-multilingual-mpnet-base-v2: {e}。將使用集合內部模型。")
self.query_embedding_function = None
else:
self.logger.warning(f"未知的查詢嵌入模型: {model_name}, 將使用集合內部模型。")
self.query_embedding_function = None
def load_collection(self, collection_name: str) -> bool:
"""加載指定的集合"""
if not self.chroma_client or not collection_name:
return False
try:
# 獲取集合時,如果需要指定 embedding_function (通常在創建時指定)
# 此處是讀取,所以集合的 embedding_function 已經固定
# 我們將在查詢時使用 self.query_embedding_function 來生成 query_embeddings
self.current_collection = self.chroma_client.get_collection(collection_name)
self.logger.info(f"已加載集合: {collection_name}")
return True
except Exception as e:
self.logger.error(f"加載集合時出錯: {str(e)}")
self.current_collection = None
return False
def execute_query(self, query_text: str, n_results: int = 5,
query_type: str = "basic",
where: Dict = None,
where_document: Dict = None,
include: List[str] = None,
metadata_filter: Dict = None,
hybrid_alpha: float = None) -> List[Dict]:
"""執行查詢並返回結果
參數:
query_text: 查詢文本
n_results: 返回結果數量
query_type: 查詢類型 (basic, metadata, hybrid, multi_vector)
where: where 過濾條件
where_document: 文檔內容過濾條件
include: 指定包含的文檔 ID
metadata_filter: 元數據過濾條件
hybrid_alpha: 混合搜索的權重參數0-1之間越大越傾向關鍵詞搜索
"""
if not self.current_collection or not query_text:
return []
try:
query_params = {
"n_results": n_results
}
# 基本查詢處理邏輯
if query_type == "basic":
query_params["query_texts"] = [query_text]
# 多向量查詢(用於比較多個查詢之間的相似性)
elif query_type == "multi_vector":
# 支持以 "|||" 或換行符分隔的多個查詢文本
if "|||" in query_text:
query_texts = [text.strip() for text in query_text.split("|||")]
else:
query_texts = [text.strip() for text in query_text.splitlines() if text.strip()]
query_params["query_texts"] = query_texts
# 添加其他查詢參數
if where:
query_params["where"] = where
if where_document:
query_params["where_document"] = where_document
if include:
query_params["include"] = include
if metadata_filter:
# 直接將元數據過濾條件轉換為 where 條件
if "where" not in query_params:
query_params["where"] = {}
query_params["where"].update(metadata_filter)
# 混合搜索處理
if query_type == "hybrid" and hybrid_alpha is not None:
# 檢查 ChromaDB 版本是否支持混合搜索
if hasattr(self.current_collection, "query") and "alpha" in inspect.signature(self.current_collection.query).parameters:
query_params["alpha"] = hybrid_alpha
# 混合搜索通常需要 query_texts
if "query_texts" not in query_params:
query_params["query_texts"] = [query_text]
else:
self.logger.warning("當前 ChromaDB 版本不支持混合搜索,將使用基本查詢")
query_type = "basic" # 降級為基本查詢
query_params["query_texts"] = [query_text]
elif query_type == "hybrid" and hybrid_alpha is None:
# 如果是混合搜索但未提供 alpha則默認為基本搜索
self.logger.warning("混合搜索未提供 Alpha 值,將使用基本查詢")
query_type = "basic"
query_params["query_texts"] = [query_text]
# 如果 query_type 不是 multi_vector 且 query_texts 未設置,則設置
if query_type not in ["multi_vector", "hybrid"] and "query_texts" not in query_params:
query_params["query_texts"] = [query_text]
# 如果選擇了外部嵌入模型且不是混合查詢,則生成查詢嵌入
if query_type != "hybrid" and \
"query_texts" in query_params and \
self.query_embedding_function:
texts_to_embed = query_params["query_texts"]
try:
# self.query_embedding_function 接受 List[str] 返回 List[List[float]]
generated_embeddings = self.query_embedding_function(texts_to_embed)
if generated_embeddings and all(isinstance(emb, list) for emb in generated_embeddings):
query_params["query_embeddings"] = generated_embeddings
if "query_texts" in query_params: # 確保它存在才刪除
del query_params["query_texts"]
self.logger.info(f"使用 {self.selected_embedding_model_name} 生成了 {len(generated_embeddings)} 個查詢嵌入。")
else:
self.logger.warning(f"未能使用 {self.selected_embedding_model_name} 為所有查詢文本生成有效嵌入。將回退到使用集合預設嵌入函數進行文本查詢。嵌入結果: {generated_embeddings}")
except Exception as e:
self.logger.error(f"使用 {self.selected_embedding_model_name} 生成查詢嵌入時出錯: {e}。將回退到使用集合預設嵌入函數進行文本查詢。")
# 執行查詢
results = self.current_collection.query(**query_params)
# 處理結果
processed_results = []
# 獲取查詢返回的所有結果列表
ids_list = results.get('ids', [[]])
documents_list = results.get('documents', [[]])
metadatas_list = results.get('metadatas', [[]])
distances_list = results.get('distances', [[]])
# 確保列表長度一致,並為空列表提供默認值
num_queries = len(ids_list)
if not documents_list or len(documents_list) != num_queries:
documents_list = [[] for _ in range(num_queries)]
if not metadatas_list or len(metadatas_list) != num_queries:
metadatas_list = [[{}] * len(ids_list[i]) for i in range(num_queries)]
if not distances_list or len(distances_list) != num_queries:
distances_list = [[0.0] * len(ids_list[i]) for i in range(num_queries)]
# 對於多查詢文本的情況,需要分別處理每個查詢的結果
for query_idx, (ids, documents, metadatas, distances) in enumerate(zip(
ids_list,
documents_list,
metadatas_list,
distances_list
)):
# 處理每個查詢結果
for i, (doc_id, document, metadata, distance) in enumerate(zip(
ids, documents,
metadatas if metadatas else [{}] * len(ids), # 再次確保元數據存在
distances if distances else [0.0] * len(ids) # 再次確保距離存在
)):
# 計算相似度分數
similarity = 1.0 - min(float(distance) if distance is not None else 1.0, 1.0)
result_item = {
"rank": i + 1,
"query_index": query_idx,
"id": doc_id,
"document": document,
"metadata": metadata if metadata else {}, # 確保 metadata 是字典
"similarity": similarity,
"distance": float(distance) if distance is not None else 0.0,
"query_type": query_type
}
if query_type == "hybrid":
result_item["hybrid_alpha"] = hybrid_alpha
processed_results.append(result_item)
self.query_results = processed_results
self.logger.info(f"查詢完成,找到 {len(processed_results)} 個結果,查詢類型: {query_type}")
return processed_results
except Exception as e:
self.logger.error(f"執行查詢時出錯: {str(e)}")
self.query_results = []
return []
def get_documents_by_ids(self, doc_ids: List[str]) -> List[Dict]:
"""按文檔ID列表獲取文檔"""
if not self.current_collection:
self.logger.warning("沒有選擇集合,無法按 ID 獲取文檔。")
return []
if not doc_ids:
self.logger.warning("未提供文檔 ID。")
return []
try:
results = self.current_collection.get(
ids=doc_ids,
include=["documents", "metadatas"]
)
processed_results = []
retrieved_ids = results.get('ids', [])
retrieved_documents = results.get('documents', [])
retrieved_metadatas = results.get('metadatas', [])
# 創建一個字典以便快速查找已檢索到的文檔信息
found_docs_map = {}
for i, r_id in enumerate(retrieved_ids):
found_docs_map[r_id] = {
"document": retrieved_documents[i] if i < len(retrieved_documents) else None,
"metadata": retrieved_metadatas[i] if i < len(retrieved_metadatas) else {}
}
rank_counter = 1
for original_id in doc_ids: # 遍歷原始請求的ID以保持某種順序感並標記未找到的
if original_id in found_docs_map:
doc_data = found_docs_map[original_id]
if doc_data["document"] is not None:
processed_results.append({
"rank": rank_counter,
"id": original_id,
"document": doc_data["document"],
"metadata": doc_data["metadata"],
"similarity": None, # Not applicable
"distance": None, # Not applicable
"query_type": "id_lookup"
})
rank_counter += 1
else: # ID 存在但文檔為空(理論上不應發生在 get 中,除非 include 設置問題)
self.logger.warning(f"ID {original_id} 找到但文檔內容為空。")
# else: # ID 未在返回結果中找到,可以選擇不添加到 processed_results 或添加一個標記
# self.logger.info(f"ID {original_id} 未在集合中找到。")
self.query_results = processed_results
self.logger.info(f"按 ID 查詢完成,從請求的 {len(doc_ids)} 個ID中實際找到 {len(processed_results)} 個文檔。")
return processed_results
except Exception as e:
self.logger.error(f"按 ID 獲取文檔時出錯: {str(e)}")
# traceback.print_exc() # For debugging
self.query_results = []
return []
def get_collection_info(self, collection_name: str) -> Dict:
"""獲取集合的詳細信息"""
if not self.chroma_client:
return {}
try:
collection = self.chroma_client.get_collection(collection_name)
count = collection.count()
# 獲取一個樣本來確定向量維度
sample = collection.peek(1)
dimension = len(sample['embeddings'][0]) if 'embeddings' in sample and sample['embeddings'] else "未知"
return {
"name": collection_name,
"document_count": count,
"dimension": dimension
}
except Exception as e:
self.logger.error(f"獲取集合信息時出錯: {str(e)}")
return {
"name": collection_name,
"document_count": "未知",
"dimension": "未知"
}
def export_results(self, file_path: str, format: str = "csv") -> bool:
"""導出查詢結果"""
if not self.query_results:
return False
try:
df = pd.DataFrame(self.query_results)
# 根據格式導出
if format.lower() == "csv":
df.to_csv(file_path, index=False, encoding='utf-8-sig')
elif format.lower() == "json":
df.to_json(file_path, orient='records', force_ascii=False, indent=4)
elif format.lower() == "excel":
df.to_excel(file_path, index=False)
else:
return False
self.logger.info(f"結果已導出到: {file_path}")
return True
except Exception as e:
self.logger.error(f"導出結果時出錯: {str(e)}")
return False
class ChromaDBReaderUI:
"""ChromaDB備份讀取器的用戶界面"""
def __init__(self, root):
self.root = root
self.reader = ChromaDBReader()
# 設置窗口
self.root.title("ChromaDB 備份讀取器")
self.root.geometry("1280x800")
# 初始化嵌入模型相關變量
self.embedding_model_var = tk.StringVar(value="預設 (ChromaDB)") # 顯示名稱
self.embedding_models = {
"預設 (ChromaDB)": "default",
"all-MiniLM-L6-v2 (ST)": "all-MiniLM-L6-v2",
"paraphrase-multilingual-MiniLM-L12-v2 (ST)": "paraphrase-multilingual-MiniLM-L12-v2",
"paraphrase-multilingual-mpnet-base-v2 (ST)": "paraphrase-multilingual-mpnet-base-v2" # 添加新的模型選項
}
self.setup_ui()
# 默認主題
self.current_theme = "darkly" # ttkbootstrap的深色主題
# 存儲配置
self.config_path = os.path.join(str(Path.home()), ".chroma_reader_config.json")
self.config = self.load_config()
# 應用保存的配置
if self.config.get("last_backups_dir"):
self.backups_dir_var.set(self.config["last_backups_dir"])
self.load_backups_directory()
def setup_ui(self):
"""設置用戶界面"""
# 創建主佈局
self.main_frame = ttk.Frame(self.root, padding=10)
self.main_frame.pack(fill=BOTH, expand=YES)
# 左側面板 (備份和集合選擇)
self.left_panel = ttk.Frame(self.main_frame, width=300)
self.left_panel.pack(side=LEFT, fill=Y, padx=(0, 10))
# 右側面板 (查詢和結果)
self.right_panel = ttk.Frame(self.main_frame)
self.right_panel.pack(side=LEFT, fill=BOTH, expand=YES)
# 設置狀態欄 (提前,以確保 self.status_var 在其他地方使用前已定義)
self.setup_status_bar()
# 設置左側面板
self.setup_directory_frame()
self.setup_embedding_model_frame() # 新增嵌入模型選擇框架
self.setup_backups_frame()
self.setup_collections_frame()
# 設置右側面板
self.setup_query_frame()
self.setup_results_frame()
# 設置菜單
self.setup_menu()
def setup_menu(self):
"""設置菜單欄"""
menubar = tk.Menu(self.root)
self.root.config(menu=menubar)
# 文件菜單
file_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="文件", menu=file_menu)
file_menu.add_command(label="選擇備份目錄", command=self.browse_directory)
file_menu.add_command(label="刷新備份列表", command=self.refresh_backups)
file_menu.add_separator()
file_menu.add_command(label="導出結果...", command=self.export_results_dialog)
file_menu.add_separator()
file_menu.add_command(label="退出", command=self.root.quit)
# 視圖菜單
view_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="視圖", menu=view_menu)
view_menu.add_command(label="切換深色/淺色主題", command=self.toggle_theme)
# 幫助菜單
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="幫助", menu=help_menu)
help_menu.add_command(label="關於", command=self.show_about)
help_menu.add_command(label="查看日誌", command=self.open_log_file)
def setup_directory_frame(self):
"""設置目錄選擇框架"""
dir_frame = ttk.LabelFrame(self.left_panel, text="備份目錄", padding=10)
dir_frame.pack(fill=X, pady=(0, 10))
self.backups_dir_var = tk.StringVar()
ttk.Entry(dir_frame, textvariable=self.backups_dir_var).pack(side=LEFT, fill=X, expand=YES)
ttk.Button(dir_frame, text="瀏覽", command=self.browse_directory).pack(side=LEFT, padx=(5, 0))
ttk.Button(dir_frame, text="載入", command=self.load_backups_directory).pack(side=LEFT, padx=(5, 0))
def setup_embedding_model_frame(self):
"""設置查詢嵌入模型選擇框架"""
embedding_frame = ttk.LabelFrame(self.left_panel, text="查詢嵌入模型", padding=10)
embedding_frame.pack(fill=X, pady=(0, 10))
self.embedding_model_combo = ttk.Combobox(
embedding_frame,
textvariable=self.embedding_model_var,
values=list(self.embedding_models.keys()),
state="readonly"
)
self.embedding_model_combo.pack(fill=X, expand=YES)
self.embedding_model_combo.set(list(self.embedding_models.keys())[0]) # 設置預設顯示值
self.embedding_model_combo.bind("<<ComboboxSelected>>", self.on_embedding_model_changed)
# 初始化Reader中的嵌入模型選擇
self.on_embedding_model_changed()
def setup_backups_frame(self):
"""設置備份列表框架"""
backups_frame = ttk.LabelFrame(self.left_panel, text="備份列表", padding=10)
backups_frame.pack(fill=BOTH, expand=YES, pady=(0, 10))
# 備份搜索
search_frame = ttk.Frame(backups_frame)
search_frame.pack(fill=X, pady=(0, 5))
self.backup_search_var = tk.StringVar()
self.backup_search_var.trace("w", self.filter_backups)
ttk.Label(search_frame, text="搜索:").pack(side=LEFT)
ttk.Entry(search_frame, textvariable=self.backup_search_var).pack(side=LEFT, fill=X, expand=YES)
# 備份列表
list_frame = ttk.Frame(backups_frame)
list_frame.pack(fill=BOTH, expand=YES)
columns = ("name", "date")
self.backups_tree = ttk.Treeview(list_frame, columns=columns, show="headings", height=10)
self.backups_tree.heading("name", text="名稱")
self.backups_tree.heading("date", text="日期")
self.backups_tree.column("name", width=100)
self.backups_tree.column("date", width=150)
scrollbar = ttk.Scrollbar(list_frame, orient=VERTICAL, command=self.backups_tree.yview)
self.backups_tree.configure(yscrollcommand=scrollbar.set)
self.backups_tree.pack(side=LEFT, fill=BOTH, expand=YES)
scrollbar.pack(side=LEFT, fill=Y)
self.backups_tree.bind("<<TreeviewSelect>>", self.on_backup_selected)
def setup_collections_frame(self):
"""設置集合列表框架"""
collections_frame = ttk.LabelFrame(self.left_panel, text="集合列表", padding=10)
collections_frame.pack(fill=BOTH, expand=YES)
# 集合搜索
search_frame = ttk.Frame(collections_frame)
search_frame.pack(fill=X, pady=(0, 5))
self.collection_search_var = tk.StringVar()
self.collection_search_var.trace("w", self.filter_collections)
ttk.Label(search_frame, text="搜索:").pack(side=LEFT)
ttk.Entry(search_frame, textvariable=self.collection_search_var).pack(side=LEFT, fill=X, expand=YES)
# 集合列表
list_frame = ttk.Frame(collections_frame)
list_frame.pack(fill=BOTH, expand=YES)
columns = ("name", "count")
self.collections_tree = ttk.Treeview(list_frame, columns=columns, show="headings", height=10)
self.collections_tree.heading("name", text="名稱")
self.collections_tree.heading("count", text="文檔數")
self.collections_tree.column("name", width=150)
self.collections_tree.column("count", width=100)
scrollbar = ttk.Scrollbar(list_frame, orient=VERTICAL, command=self.collections_tree.yview)
self.collections_tree.configure(yscrollcommand=scrollbar.set)
self.collections_tree.pack(side=LEFT, fill=BOTH, expand=YES)
scrollbar.pack(side=LEFT, fill=Y)
self.collections_tree.bind("<<TreeviewSelect>>", self.on_collection_selected)
def setup_query_frame(self):
"""設置查詢框架"""
query_frame = ttk.LabelFrame(self.right_panel, text="查詢", padding=10)
query_frame.pack(fill=X, pady=(0, 10))
# 創建一個 Notebook 以包含不同的查詢類型標籤頁
self.query_notebook = ttk.Notebook(query_frame)
self.query_notebook.pack(fill=X, pady=5)
# 基本查詢標籤頁
self.basic_query_frame = ttk.Frame(self.query_notebook)
self.query_notebook.add(self.basic_query_frame, text="基本查詢")
# 元數據查詢標籤頁
self.metadata_query_frame = ttk.Frame(self.query_notebook)
self.query_notebook.add(self.metadata_query_frame, text="元數據查詢")
# 混合查詢標籤頁
self.hybrid_query_frame = ttk.Frame(self.query_notebook)
self.query_notebook.add(self.hybrid_query_frame, text="混合查詢")
# 多向量查詢標籤頁
self.multi_vector_frame = ttk.Frame(self.query_notebook)
self.query_notebook.add(self.multi_vector_frame, text="多向量查詢")
# ID 查詢標籤頁 (新增)
self.id_query_frame = ttk.Frame(self.query_notebook)
self.query_notebook.add(self.id_query_frame, text="ID 查詢")
# 設置基本查詢頁面
self.setup_basic_query_tab()
# 設置元數據查詢頁面
self.setup_metadata_query_tab()
# 設置混合查詢頁面
self.setup_hybrid_query_tab()
# 設置多向量查詢頁面
self.setup_multi_vector_tab()
# 設置 ID 查詢頁面 (新增)
self.setup_id_query_tab()
# 查詢參數(共用部分)
params_frame = ttk.Frame(query_frame)
params_frame.pack(fill=X)
ttk.Label(params_frame, text="結果數量:").pack(side=LEFT)
self.n_results_var = tk.StringVar(value="5")
ttk.Spinbox(params_frame, from_=1, to=100, textvariable=self.n_results_var, width=5).pack(side=LEFT, padx=(5, 20))
# 查詢按鈕
ttk.Button(
query_frame,
text="執行查詢",
command=self.execute_query, # 注意:這個 execute_query 方法將被新的替換
style="Accent.TButton"
).pack(pady=10)
def setup_basic_query_tab(self):
"""設置基本查詢標籤頁"""
ttk.Label(self.basic_query_frame, text="查詢文本:").pack(anchor=W)
self.basic_query_text = tk.Text(self.basic_query_frame, height=4, width=50)
self.basic_query_text.pack(fill=X, pady=5)
def setup_metadata_query_tab(self):
"""設置元數據查詢標籤頁"""
ttk.Label(self.metadata_query_frame, text="查詢文本:").pack(anchor=W)
self.metadata_query_text = tk.Text(self.metadata_query_frame, height=4, width=50)
self.metadata_query_text.pack(fill=X, pady=5)
ttk.Label(self.metadata_query_frame, text="元數據過濾條件 (JSON 格式):").pack(anchor=W)
self.metadata_filter_text = tk.Text(self.metadata_query_frame, height=4, width=50)
self.metadata_filter_text.pack(fill=X, pady=5)
self.metadata_filter_text.insert("1.0", '{"key": "value"}')
# 添加一個幫助按鈕,顯示元數據過濾語法的說明
ttk.Button(
self.metadata_query_frame,
text="?",
width=2,
command=self.show_metadata_help
).pack(anchor=E)
def setup_hybrid_query_tab(self):
"""設置混合查詢標籤頁"""
ttk.Label(self.hybrid_query_frame, text="查詢文本:").pack(anchor=W)
self.hybrid_query_text = tk.Text(self.hybrid_query_frame, height=4, width=50)
self.hybrid_query_text.pack(fill=X, pady=5)
alpha_frame = ttk.Frame(self.hybrid_query_frame)
alpha_frame.pack(fill=X)
ttk.Label(alpha_frame, text="Alpha 值 (0-1):").pack(side=LEFT)
self.hybrid_alpha_var = tk.DoubleVar(value=0.5)
ttk.Scale(
alpha_frame,
from_=0.0, to=1.0,
variable=self.hybrid_alpha_var,
orient=tk.HORIZONTAL,
length=200
).pack(side=LEFT, padx=5, fill=X, expand=YES)
# 創建一個Label來顯示Scale的當前值
self.hybrid_alpha_label = ttk.Label(alpha_frame, text=f"{self.hybrid_alpha_var.get():.2f}")
self.hybrid_alpha_label.pack(side=LEFT)
# 綁定Scale的變動到更新Label的函數
self.hybrid_alpha_var.trace_add("write", lambda *args: self.hybrid_alpha_label.config(text=f"{self.hybrid_alpha_var.get():.2f}"))
ttk.Label(self.hybrid_query_frame, text="注意: Alpha=0 完全使用向量搜索Alpha=1 完全使用關鍵詞搜索").pack(pady=2)
ttk.Label(self.hybrid_query_frame, text="混合查詢將使用集合原始嵌入模型,忽略上方選擇的查詢嵌入模型。", font=("TkDefaultFont", 8)).pack(pady=2)
def setup_multi_vector_tab(self):
"""設置多向量查詢標籤頁"""
ttk.Label(self.multi_vector_frame, text="多個查詢文本 (每行一個,或使用 ||| 分隔):").pack(anchor=W)
self.multi_vector_text = tk.Text(self.multi_vector_frame, height=6, width=50)
self.multi_vector_text.pack(fill=X, pady=5)
self.multi_vector_text.insert("1.0", "查詢文本 1\n|||查詢文本 2\n|||查詢文本 3")
ttk.Label(self.multi_vector_frame, text="用於比較多個查詢之間的相似性").pack(pady=5)
def setup_id_query_tab(self):
"""設置ID查詢標籤頁"""
ttk.Label(self.id_query_frame, text="文檔 ID (每行一個,或用逗號/空格分隔):").pack(anchor=tk.W)
self.id_query_text = tk.Text(self.id_query_frame, height=6, width=50)
self.id_query_text.pack(fill=tk.X, pady=5)
self.id_query_text.insert("1.0", "id1\nid2,id3 id4") # 示例
ttk.Label(self.id_query_frame, text="此查詢將獲取指定ID的文檔忽略上方“結果數量”設置。").pack(pady=5)
def show_metadata_help(self):
"""顯示元數據過濾語法說明"""
help_text = """元數據過濾語法示例:
基本過濾:
{"category": "文章"} # 精確匹配
範圍過濾:
{"date": {"$gt": "2023-01-01"}} # 大於
{"date": {"$lt": "2023-12-31"}} # 小於
{"count": {"$gte": 10}} # 大於等於
{"count": {"$lte": 100}} # 小於等於
多條件過濾:
{"$and": [{"category": "文章"}, {"author": "張三"}]} # AND 條件
{"$or": [{"category": "文章"}, {"category": "新聞"}]} # OR 條件
注意: 此處語法遵循 ChromaDB 的過濾語法,非標準 JSON 查詢語法。
"""
messagebox.showinfo("元數據過濾語法說明", help_text)
def setup_results_frame(self):
"""設置結果顯示框架"""
self.results_notebook = ttk.Notebook(self.right_panel)
self.results_notebook.pack(fill=BOTH, expand=YES)
# 列表視圖 - 使用標準 Frame 作為容器
list_frame = ttk.Frame(self.results_notebook)
self.results_notebook.add(list_frame, text="列表視圖")
self.list_view = ttk.Frame(list_frame)
self.list_view.pack(fill=BOTH, expand=YES)
# 詳細視圖 - 使用標準 Frame 作為容器
detail_frame = ttk.Frame(self.results_notebook)
self.results_notebook.add(detail_frame, text="詳細視圖")
self.detail_view = ttk.Frame(detail_frame)
self.detail_view.pack(fill=BOTH, expand=YES)
# 可視化視圖
self.visual_view = ttk.Frame(self.results_notebook)
self.results_notebook.add(self.visual_view, text="可視化")
# 比較視圖
self.compare_view = ttk.Frame(self.results_notebook)
self.results_notebook.add(self.compare_view, text="比較視圖")
def setup_status_bar(self):
"""設置狀態欄"""
status_frame = ttk.Frame(self.root)
status_frame.pack(side=BOTTOM, fill=X)
self.status_var = tk.StringVar(value="就緒")
status_label = ttk.Label(status_frame, textvariable=self.status_var, relief=tk.SUNKEN, anchor=W)
status_label.pack(fill=X)
def on_embedding_model_changed(self, event=None):
"""處理查詢嵌入模型選擇變更事件"""
selected_display_name = self.embedding_model_var.get()
model_name_key = self.embedding_models.get(selected_display_name, "default")
if hasattr(self, 'reader') and self.reader:
self.reader.set_query_embedding_model(model_name_key) # 更新Reader中的模型
# 更新狀態欄提示
if model_name_key == "default":
self.status_var.set("查詢將使用集合內部嵌入模型。")
elif self.reader.query_embedding_function: # 檢查模型是否成功加載
self.status_var.set(f"查詢將使用外部模型: {selected_display_name}")
else: # 加載失敗
self.status_var.set(f"模型 {selected_display_name} 加載失敗/無效,將使用集合內部模型。")
else:
# Reader尚未初始化這通常在UI初始化早期發生
# self.reader.set_query_embedding_model 會在 setup_embedding_model_frame 中首次調用時處理
pass
def browse_directory(self):
"""瀏覽選擇備份目錄"""
directory = filedialog.askdirectory(
title="選擇ChromaDB備份目錄",
initialdir=self.backups_dir_var.get() or str(Path.home())
)
if directory:
self.backups_dir_var.set(directory)
self.load_backups_directory()
def load_backups_directory(self):
"""加載備份目錄"""
directory = self.backups_dir_var.get()
if not directory:
return
self.status_var.set("正在掃描備份...")
self.root.update_idletasks()
if self.reader.set_backups_directory(directory):
self.refresh_backups_list()
self.status_var.set(f"已找到 {len(self.reader.backups)} 個備份")
# 保存配置
self.config["last_backups_dir"] = directory
self.save_config()
else:
self.status_var.set("無法掃描備份目錄")
messagebox.showerror("錯誤", f"無法掃描備份目錄: {directory}")
def refresh_backups(self):
"""刷新備份列表"""
if not self.reader.backups_dir:
messagebox.showinfo("提示", "請先選擇備份目錄")
return
self.status_var.set("正在刷新備份...")
self.root.update_idletasks()
if self.reader.scan_backups():
self.refresh_backups_list()
self.status_var.set(f"已刷新,找到 {len(self.reader.backups)} 個備份")
else:
self.status_var.set("刷新備份失敗")
messagebox.showerror("錯誤", "無法刷新備份列表")
def refresh_backups_list(self):
"""刷新備份列表顯示"""
# 清空列表
for item in self.backups_tree.get_children():
self.backups_tree.delete(item)
# 添加備份
for backup in self.reader.backups:
self.backups_tree.insert(
"", "end",
values=(backup["name"], backup["formatted_date"])
)
def filter_backups(self, *args):
"""根據搜索條件過濾備份列表"""
search_text = self.backup_search_var.get().lower()
# 清空列表
for item in self.backups_tree.get_children():
self.backups_tree.delete(item)
# 添加匹配的備份
for backup in self.reader.backups:
if search_text in backup["name"].lower() or search_text in backup["formatted_date"].lower():
self.backups_tree.insert(
"", "end",
values=(backup["name"], backup["formatted_date"])
)
def on_backup_selected(self, event):
"""處理備份選擇事件"""
selection = self.backups_tree.selection()
if not selection:
return
# 獲取選定項的索引
item_id = selection[0]
# item_index = self.backups_tree.index(item_id) # 這個索引是相對於當前顯示的項目的
# 直接從 Treeview item 中獲取備份名稱,然後在 self.reader.backups 中查找
try:
backup_name_from_tree = self.backups_tree.item(item_id)["values"][0]
except IndexError:
self.logger.error("無法從 Treeview 獲取備份名稱")
return
actual_backup_index = -1
for i, backup_info in enumerate(self.reader.backups):
if backup_info["name"] == backup_name_from_tree:
actual_backup_index = i
break
if actual_backup_index == -1:
self.logger.error(f"在備份列表中未找到名為 {backup_name_from_tree} 的備份")
return
# 載入備份
self.status_var.set(f"正在載入備份: {backup_name_from_tree}...")
self.root.update_idletasks()
# 確保 Reader 中的嵌入模型是最新的 (雖然 on_embedding_model_changed 應該已經處理了)
# selected_display_name = self.embedding_model_var.get()
# model_key = self.embedding_models.get(selected_display_name, "default")
# self.reader.set_query_embedding_model(model_key) # 這行不需要,因為模型選擇是獨立的
def load_backup_thread():
# load_backup 不再需要 embedding_model_name 參數,因為嵌入模型選擇是針對查詢的
success = self.reader.load_backup(actual_backup_index)
self.root.after(0, lambda: self.finalize_backup_loading(success, backup_name_from_tree))
threading.Thread(target=load_backup_thread).start()
def finalize_backup_loading(self, success: bool, backup_name: str):
"""完成備份載入處理"""
if success:
self.refresh_collections_list()
self.status_var.set(f"已載入備份: {backup_name}")
else:
self.status_var.set(f"載入備份失敗: {backup_name}")
messagebox.showerror("錯誤", f"無法載入備份: {backup_name}")
def refresh_collections_list(self):
"""刷新集合列表顯示"""
# 清空列表
for item in self.collections_tree.get_children():
self.collections_tree.delete(item)
# 添加集合
for collection in self.reader.collection_names:
info = self.reader.get_collection_info(collection.name)
self.collections_tree.insert(
"", "end",
values=(collection.name, info["document_count"])
)
def filter_collections(self, *args):
"""根據搜索條件過濾集合列表"""
search_text = self.collection_search_var.get().lower()
# 清空列表
for item in self.collections_tree.get_children():
self.collections_tree.delete(item)
# 添加匹配的集合
for collection in self.reader.collection_names:
if search_text in collection.name.lower():
info = self.reader.get_collection_info(collection.name)
self.collections_tree.insert(
"", "end",
values=(collection.name, info["document_count"])
)
def on_collection_selected(self, event):
"""處理集合選擇事件"""
selection = self.collections_tree.selection()
if not selection:
return
# 獲取選定項的集合名稱
item_id = selection[0]
collection_name = self.collections_tree.item(item_id)["values"][0]
# 載入集合
self.status_var.set(f"正在載入集合: {collection_name}...")
self.root.update_idletasks()
def load_collection_thread():
success = self.reader.load_collection(collection_name)
self.root.after(0, lambda: self.finalize_collection_loading(success, collection_name))
threading.Thread(target=load_collection_thread).start()
def finalize_collection_loading(self, success: bool, collection_name: str):
"""完成集合載入處理"""
if success:
self.status_var.set(f"已載入集合: {collection_name}")
# 獲取集合詳細信息並顯示
info = self.reader.get_collection_info(collection_name)
info_text = f"集合: {info['name']}\n文檔數: {info['document_count']}\n向量維度: {info['dimension']}"
# messagebox.showinfo("集合信息", info_text) # 暫時註解掉,避免每次選集合都彈窗
else:
self.status_var.set(f"載入集合失敗: {collection_name}")
messagebox.showerror("錯誤", f"無法載入集合: {collection_name}")
def execute_query(self):
"""執行向量查詢"""
if not self.reader.current_collection:
messagebox.showinfo("提示", "請先選擇一個集合")
return
# 根據當前選擇的標籤頁確定查詢類型
try:
current_tab_widget = self.query_notebook.nametowidget(self.query_notebook.select())
if current_tab_widget == self.basic_query_frame:
current_tab = 0
elif current_tab_widget == self.metadata_query_frame:
current_tab = 1
elif current_tab_widget == self.hybrid_query_frame:
current_tab = 2
elif current_tab_widget == self.multi_vector_frame:
current_tab = 3
elif current_tab_widget == self.id_query_frame: # 新增 ID 查詢頁判斷
current_tab = 4
else:
messagebox.showerror("錯誤", "未知的查詢標籤頁")
return
except tk.TclError: # Notebook可能還沒有任何分頁被選中
messagebox.showerror("錯誤", "請選擇一個查詢類型標籤頁")
return
# 獲取查詢參數
try:
n_results = int(self.n_results_var.get())
except ValueError:
messagebox.showerror("錯誤", "結果數量必須是整數")
return
# 執行不同類型的查詢
if current_tab == 0: # 基本查詢
query_text = self.basic_query_text.get("1.0", tk.END).strip()
if not query_text:
messagebox.showinfo("提示", "請輸入查詢文本")
return
self.status_var.set("正在執行基本查詢...")
self.execute_basic_query(query_text, n_results)
elif current_tab == 1: # 元數據查詢
query_text = self.metadata_query_text.get("1.0", tk.END).strip()
metadata_filter_text = self.metadata_filter_text.get("1.0", tk.END).strip()
if not query_text: # 元數據查詢的文本也可以是空的如果只想用metadata_filter
# messagebox.showinfo("提示", "請輸入查詢文本")
# return
pass # 允許空查詢文本
try:
metadata_filter = json.loads(metadata_filter_text) if metadata_filter_text else None
except json.JSONDecodeError:
messagebox.showerror("錯誤", "元數據過濾條件必須是有效的 JSON 格式")
return
if not query_text and not metadata_filter:
messagebox.showinfo("提示", "請輸入查詢文本或元數據過濾條件")
return
self.status_var.set("正在執行元數據查詢...")
self.execute_metadata_query(query_text, n_results, metadata_filter)
elif current_tab == 2: # 混合查詢
query_text = self.hybrid_query_text.get("1.0", tk.END).strip()
hybrid_alpha = self.hybrid_alpha_var.get()
if not query_text:
messagebox.showinfo("提示", "請輸入查詢文本")
return
self.status_var.set("正在執行混合查詢...")
self.execute_hybrid_query(query_text, n_results, hybrid_alpha)
elif current_tab == 3: # 多向量查詢
query_text = self.multi_vector_text.get("1.0", tk.END).strip()
if not query_text:
messagebox.showinfo("提示", "請輸入查詢文本")
return
self.status_var.set("正在執行多向量查詢...")
self.execute_multi_vector_query(query_text, n_results)
elif current_tab == 4: # ID 查詢
id_input_str = self.id_query_text.get("1.0", tk.END).strip()
if not id_input_str:
messagebox.showinfo("提示", "請輸入文檔 ID。")
return
# 解析 ID: 支持逗號、空格、換行符分隔
doc_ids = [id_val.strip() for id_val in re.split(r'[,\s\n]+', id_input_str) if id_val.strip()]
if not doc_ids:
messagebox.showinfo("提示", "未解析到有效的文檔 ID。")
return
self.status_var.set("正在按 ID 獲取文檔...")
self.execute_id_lookup_query(doc_ids)
def execute_basic_query(self, query_text, n_results):
"""執行基本查詢"""
self.status_var.set(f"正在執行基本查詢: {query_text[:30]}...")
self.root.update_idletasks()
def query_thread():
results = self.reader.execute_query(
query_text=query_text,
n_results=n_results,
query_type="basic"
)
self.root.after(0, lambda: self.display_results(results))
threading.Thread(target=query_thread, daemon=True).start()
def execute_metadata_query(self, query_text, n_results, metadata_filter):
"""執行元數據查詢"""
self.status_var.set(f"正在執行元數據查詢: {query_text[:30]}...")
self.root.update_idletasks()
def query_thread():
results = self.reader.execute_query(
query_text=query_text,
n_results=n_results,
query_type="metadata", # 這裡應該是 "metadata" 但後端邏輯會轉為 where
metadata_filter=metadata_filter
)
self.root.after(0, lambda: self.display_results(results))
threading.Thread(target=query_thread, daemon=True).start()
def execute_hybrid_query(self, query_text, n_results, hybrid_alpha):
"""執行混合查詢"""
self.status_var.set(f"正在執行混合查詢 (α={hybrid_alpha:.2f}): {query_text[:30]}...")
self.root.update_idletasks()
def query_thread():
results = self.reader.execute_query(
query_text=query_text,
n_results=n_results,
query_type="hybrid",
hybrid_alpha=hybrid_alpha
)
self.root.after(0, lambda: self.display_results(results))
threading.Thread(target=query_thread, daemon=True).start()
def execute_multi_vector_query(self, query_text, n_results):
"""執行多向量查詢"""
self.status_var.set(f"正在執行多向量查詢: {query_text.splitlines()[0][:30] if query_text.splitlines() else ''}...")
self.root.update_idletasks()
def query_thread():
results = self.reader.execute_query(
query_text=query_text,
n_results=n_results,
query_type="multi_vector"
)
self.root.after(0, lambda: self.display_results(results))
threading.Thread(target=query_thread, daemon=True).start()
def execute_id_lookup_query(self, doc_ids: List[str]):
"""執行ID查找查詢"""
self.status_var.set(f"正在按 ID 獲取 {len(doc_ids)} 個文檔...")
self.root.update_idletasks()
def query_thread():
results = self.reader.get_documents_by_ids(doc_ids)
self.root.after(0, lambda: self.display_results(results))
threading.Thread(target=query_thread, daemon=True).start()
def display_results(self, results):
"""顯示查詢結果"""
if not results:
self.status_var.set("查詢完成,未找到結果")
messagebox.showinfo("查詢結果", "未找到匹配的結果")
return
self.status_var.set(f"查詢完成,找到 {len(results)} 個結果")
# 清空所有視圖 (這部分由各個顯示函數內部處理)
# 顯示列表視圖
self.display_list_view(results)
# 顯示詳細視圖
self.display_detail_view(results)
# 顯示可視化視圖
self.display_visual_view(results)
# 顯示比較視圖
self.display_compare_view(results)
def display_list_view(self, results):
"""顯示列表視圖"""
# 清空現有內容
for widget in self.list_view.winfo_children():
widget.destroy()
# 創建表格
columns = ("rank", "similarity", "query_type", "id", "document")
tree = ttk.Treeview(self.list_view, columns=columns, show="headings")
tree.heading("rank", text="#")
tree.heading("similarity", text="相似度")
tree.heading("query_type", text="查詢類型")
tree.heading("id", text="文檔ID")
tree.heading("document", text="文檔內容")
tree.column("rank", width=50, anchor=CENTER)
tree.column("similarity", width=100, anchor=CENTER)
tree.column("query_type", width=120, anchor=CENTER) # 調整寬度以適應更長的類型名稱
tree.column("id", width=150)
tree.column("document", width=530) # 調整寬度
# 確定查詢類型名稱映射
query_type_names = {
"basic": "基本查詢",
"metadata": "元數據查詢",
"hybrid": "混合查詢",
"multi_vector": "多向量查詢",
"id_lookup": "ID 查詢" # 新增
}
# 添加結果到表格
for result in results:
raw_query_type = result.get("query_type", "basic")
display_query_type = query_type_names.get(raw_query_type, raw_query_type.capitalize())
if raw_query_type == "hybrid" and "hybrid_alpha" in result:
display_query_type += f" (α={result['hybrid_alpha']:.2f})"
if raw_query_type == "multi_vector" and "query_index" in result:
display_query_type += f" (Q{result['query_index']+1})"
similarity_display = f"{result.get('similarity', 0.0):.4f}" if result.get('similarity') is not None else "N/A"
tree.insert(
"", "end",
values=(
result.get("rank", "-"),
similarity_display,
display_query_type,
result.get("id", "N/A"),
result.get("document", "")[:100] + ("..." if len(result.get("document", "")) > 100 else "")
)
)
# 添加滾動條
scrollbar = ttk.Scrollbar(self.list_view, orient=VERTICAL, command=tree.yview)
tree.configure(yscrollcommand=scrollbar.set)
# 雙擊項目顯示完整內容
tree.bind("<Double-1>", lambda event: self.show_full_document(tree))
# 佈局
tree.pack(side=LEFT, fill=BOTH, expand=YES)
scrollbar.pack(side=RIGHT, fill=Y)
def show_full_document(self, tree):
"""顯示完整的文檔內容"""
selection = tree.selection()
if not selection:
return
item_id = selection[0]
rank_str = tree.item(item_id)["values"][0]
try:
rank = int(rank_str)
if 1 <= rank <= len(self.reader.query_results):
result = self.reader.query_results[rank - 1]
# 創建詳細內容窗口
details_window = tk.Toplevel(self.root)
details_window.title(f"文檔詳細內容 - {result['id']}")
details_window.geometry("800x600")
frame = ttk.Frame(details_window, padding=10)
frame.pack(fill=BOTH, expand=YES)
# 添加文檔信息
info_text = f"文檔ID: {result['id']}\n"
if result.get('similarity') is not None:
info_text += f"相似度: {result['similarity']:.4f}\n"
else:
info_text += "相似度: N/A\n"
if result['metadata']:
info_text += "\n元數據:\n"
for key, value in result['metadata'].items():
info_text += f"{key}: {value}\n"
ttk.Label(frame, text=info_text, justify=LEFT).pack(anchor=W, pady=(0, 10))
# 添加文檔內容
ttk.Label(frame, text="文檔內容:", justify=LEFT).pack(anchor=W)
text_area = tk.Text(frame, wrap=tk.WORD)
text_area.insert(tk.END, result['document'])
text_area.config(state=tk.DISABLED)
scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=text_area.yview)
text_area.configure(yscrollcommand=scrollbar.set)
text_area.pack(side=LEFT, fill=BOTH, expand=YES)
scrollbar.pack(side=LEFT, fill=Y)
# 添加複製按鈕
ttk.Button(
details_window,
text="複製內容",
command=lambda: self.copy_to_clipboard(result['document'])
).pack(pady=10)
except (ValueError, IndexError):
pass
def copy_to_clipboard(self, text):
"""複製文本到剪貼板"""
self.root.clipboard_clear()
self.root.clipboard_append(text)
self.status_var.set("已複製到剪貼板")
def display_detail_view(self, results):
"""顯示詳細視圖"""
# 清空現有內容
for widget in self.detail_view.winfo_children():
widget.destroy()
# 創建滾動區域
canvas = tk.Canvas(self.detail_view)
scrollbar = ttk.Scrollbar(self.detail_view, orient="vertical", command=canvas.yview)
scrollable_frame = ttk.Frame(canvas)
scrollable_frame.bind(
"<Configure>",
lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
)
canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
canvas.configure(yscrollcommand=scrollbar.set)
# 為每個結果創建一個卡片
for i, result in enumerate(results):
# 創建卡片框架
card = ttk.Frame(scrollable_frame, padding=10, relief="solid", borderwidth=1)
card.pack(fill=X, padx=10, pady=5, anchor=W)
# 卡片標題
title_frame = ttk.Frame(card)
title_frame.pack(fill=X)
similarity_text_detail = f"{result['similarity']:.4f}" if result.get('similarity') is not None else "N/A"
ttk.Label(
title_frame,
text=f"#{result['rank']} - 相似度: {similarity_text_detail}",
font=("TkDefaultFont", 10, "bold")
).pack(side=LEFT)
ttk.Label(
title_frame,
text=f"ID: {result['id']}",
font=("TkDefaultFont", 8)
).pack(side=RIGHT)
ttk.Separator(card, orient=HORIZONTAL).pack(fill=X, pady=5)
# 文檔內容
content_frame = ttk.Frame(card)
content_frame.pack(fill=X)
doc_text = tk.Text(content_frame, wrap=tk.WORD, height=4)
doc_text.insert(tk.END, result['document'])
doc_text.config(state=tk.DISABLED)
doc_text.pack(fill=X)
# 如果有元數據,顯示元數據
if result['metadata'] and len(result['metadata']) > 0:
ttk.Separator(card, orient=HORIZONTAL).pack(fill=X, pady=5)
metadata_frame = ttk.Frame(card)
metadata_frame.pack(fill=X)
ttk.Label(
metadata_frame,
text="元數據:",
font=("TkDefaultFont", 9)
).pack(anchor=W)
for key, value in result['metadata'].items():
ttk.Label(
metadata_frame,
text=f"{key}: {value}",
font=("TkDefaultFont", 8)
).pack(anchor=W, padx=10)
# 操作按鈕
button_frame = ttk.Frame(card)
button_frame.pack(fill=X, pady=(5, 0))
ttk.Button(
button_frame,
text="查看完整內容",
command=lambda r=result: self.show_full_document_from_result(r)
).pack(side=LEFT, padx=5)
ttk.Button(
button_frame,
text="複製內容",
command=lambda d=result['document']: self.copy_to_clipboard(d)
).pack(side=LEFT, padx=5)
# 配置滾動區域
canvas.pack(side=LEFT, fill=BOTH, expand=True)
scrollbar.pack(side=RIGHT, fill=Y)
def show_full_document_from_result(self, result):
"""從結果直接顯示完整的文檔內容"""
# 創建詳細內容窗口
details_window = tk.Toplevel(self.root)
details_window.title(f"文檔詳細內容 - {result['id']}")
details_window.geometry("800x600")
frame = ttk.Frame(details_window, padding=10)
frame.pack(fill=BOTH, expand=YES)
# 添加文檔信息
info_text = f"文檔ID: {result['id']}\n"
if result.get('similarity') is not None:
info_text += f"相似度: {result['similarity']:.4f}\n"
else:
info_text += "相似度: N/A\n"
if result['metadata']:
info_text += "\n元數據:\n"
for key, value in result['metadata'].items():
info_text += f"{key}: {value}\n"
ttk.Label(frame, text=info_text, justify=LEFT).pack(anchor=W, pady=(0, 10))
# 添加文檔內容
ttk.Label(frame, text="文檔內容:", justify=LEFT).pack(anchor=W)
text_area = tk.Text(frame, wrap=tk.WORD)
text_area.insert(tk.END, result['document'])
text_area.config(state=tk.DISABLED)
scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=text_area.yview)
text_area.configure(yscrollcommand=scrollbar.set)
text_area.pack(side=LEFT, fill=BOTH, expand=YES)
scrollbar.pack(side=LEFT, fill=Y)
# 添加複製按鈕
ttk.Button(
details_window,
text="複製內容",
command=lambda: self.copy_to_clipboard(result['document'])
).pack(pady=10)
def display_visual_view(self, results):
"""顯示可視化視圖"""
# 清空現有內容
for widget in self.visual_view.winfo_children():
widget.destroy()
if len(results) == 0:
return
# 創建框架
figure_frame = ttk.Frame(self.visual_view)
figure_frame.pack(fill=BOTH, expand=YES, padx=10, pady=10)
# 創建圖表
fig = plt.Figure(figsize=(10, 6), dpi=100)
# 相似度柱狀圖
ax1 = fig.add_subplot(121)
# 提取數據
ranks = [r["rank"] for r in results]
similarities = [r["similarity"] for r in results]
# 繪製相似度柱狀圖
bars = ax1.bar(ranks, similarities, color='skyblue')
# 添加數據標籤
for bar in bars:
height = bar.get_height()
ax1.text(
bar.get_x() + bar.get_width()/2.,
height + 0.01,
f'{height:.3f}',
ha='center', va='bottom',
rotation=0,
fontsize=8
)
ax1.set_xlabel('排名')
ax1.set_ylabel('相似度')
ax1.set_title('查詢結果相似度')
ax1.set_ylim(0, 1)
ax1.set_xticks(ranks)
# 相似度曲線圖
ax2 = fig.add_subplot(122)
ax2.plot(ranks, similarities, 'o-', color='orange')
# 添加數據標籤
for i, (x, y) in enumerate(zip(ranks, similarities)):
ax2.text(x, y + 0.02, f'{y:.3f}', ha='center', va='bottom', fontsize=8)
ax2.set_xlabel('排名')
ax2.set_ylabel('相似度')
ax2.set_title('相似度分佈曲線')
ax2.set_ylim(0, 1)
ax2.set_xticks(ranks)
# 調整佈局
fig.tight_layout()
# 將圖表嵌入到 Tkinter 窗口
canvas = FigureCanvasTkAgg(fig, figure_frame)
canvas.draw()
canvas.get_tk_widget().pack(fill=BOTH, expand=YES)
def display_compare_view(self, results):
"""顯示比較視圖"""
# 清空現有內容
for widget in self.compare_view.winfo_children():
widget.destroy()
if len(results) < 2:
ttk.Label(
self.compare_view,
text="需要至少2個結果才能進行比較",
font=("TkDefaultFont", 12)
).pack(pady=20)
return
# 創建比較視圖
ttk.Label(
self.compare_view,
text="結果比較",
font=("TkDefaultFont", 14, "bold")
).pack(pady=(10, 20))
# 創建比較表格
columns = ["特性"] + [f"#{r['rank']}" for r in results]
# 創建框架以包含表格和滾動條
table_frame = ttk.Frame(self.compare_view)
table_frame.pack(fill=BOTH, expand=YES, padx=10, pady=10)
tree = ttk.Treeview(table_frame, columns=columns, show="headings")
for col in columns:
tree.heading(col, text=col)
tree.column(col, width=100, anchor=CENTER)
# 相似度行
tree.insert(
"", "end",
values=["相似度"] + [f"{r['similarity']:.4f}" for r in results]
)
# 文檔ID行
tree.insert(
"", "end",
values=["文檔ID"] + [r['id'] for r in results]
)
# 文檔長度行
tree.insert(
"", "end",
values=["文檔長度"] + [len(r['document']) for r in results]
)
# 從元數據提取共同鍵
all_keys = set()
for result in results:
if result['metadata']:
for key in result['metadata'].keys():
all_keys.add(key)
# 為每個元數據鍵添加一行
for key in sorted(all_keys):
values = ["元數據: " + key]
for result in results:
if result['metadata'] and key in result['metadata']:
values.append(str(result['metadata'][key]))
else:
values.append("-")
tree.insert("", "end", values=values)
# 添加垂直滾動條
vsb = ttk.Scrollbar(table_frame, orient="vertical", command=tree.yview)
tree.configure(yscrollcommand=vsb.set)
# 添加水平滾動條
hsb = ttk.Scrollbar(table_frame, orient="horizontal", command=tree.xview)
tree.configure(xscrollcommand=hsb.set)
# 放置表格和滾動條
tree.grid(column=0, row=0, sticky='nsew')
vsb.grid(column=1, row=0, sticky='ns')
hsb.grid(column=0, row=1, sticky='ew')
# 配置表格框架的網格
table_frame.columnconfigure(0, weight=1)
table_frame.rowconfigure(0, weight=1)
def export_results_dialog(self):
"""顯示導出結果對話框"""
if not self.reader.query_results:
messagebox.showinfo("提示", "沒有可導出的結果")
return
# 詢問導出格式和文件路徑
formats = [
("CSV 文件", "*.csv"),
("JSON 文件", "*.json"),
("Excel 文件", "*.xlsx")
]
file_path = filedialog.asksaveasfilename(
title="導出結果",
filetypes=formats,
defaultextension=".csv"
)
if not file_path:
return
# 確定導出格式
ext = os.path.splitext(file_path)[1].lower()
format_map = {
".csv": "csv",
".json": "json",
".xlsx": "excel"
}
format_type = format_map.get(ext, "csv")
# 執行導出
success = self.reader.export_results(file_path, format_type)
if success:
messagebox.showinfo("導出成功", f"結果已成功導出到: {file_path}")
else:
messagebox.showerror("導出失敗", "導出結果時發生錯誤")
def toggle_theme(self):
"""切換深色/淺色主題"""
if self.current_theme == "darkly":
self.current_theme = "cosmo" # 淺色主題
ttk.Style().theme_use("cosmo")
else:
self.current_theme = "darkly" # 深色主題
ttk.Style().theme_use("darkly")
# 保存配置
self.config["theme"] = self.current_theme
self.save_config()
def show_about(self):
"""顯示關於對話框"""
about_text = "ChromaDB 備份讀取器\n\n"
about_text += "版本: 1.0.0\n\n"
about_text += "這是一個用於讀取和查詢ChromaDB備份的工具支持相似度搜索和結果可視化。\n\n"
about_text += "功能包括:\n"
about_text += "- 讀取備份目錄\n"
about_text += "- 查詢集合數據\n"
about_text += "- 多種視圖顯示結果\n"
about_text += "- 結果導出\n"
messagebox.showinfo("關於", about_text)
def open_log_file(self):
"""打開日誌文件"""
log_path = "chroma_reader.log"
if os.path.exists(log_path):
# 創建日誌查看器窗口
log_window = tk.Toplevel(self.root)
log_window.title("日誌查看器")
log_window.geometry("800x600")
frame = ttk.Frame(log_window, padding=10)
frame.pack(fill=BOTH, expand=YES)
# 添加日誌內容
text_area = tk.Text(frame, wrap=tk.WORD)
try:
with open(log_path, "r", encoding="utf-8") as f:
log_content = f.read()
except UnicodeDecodeError:
try:
with open(log_path, "r", encoding="gbk") as f:
log_content = f.read()
except:
log_content = "無法讀取日誌文件"
text_area.insert(tk.END, log_content)
text_area.config(state=tk.DISABLED)
scrollbar = ttk.Scrollbar(frame, orient=VERTICAL, command=text_area.yview)
text_area.configure(yscrollcommand=scrollbar.set)
text_area.pack(side=LEFT, fill=BOTH, expand=YES)
scrollbar.pack(side=LEFT, fill=Y)
# 添加刷新和清空按鈕
button_frame = ttk.Frame(log_window)
button_frame.pack(fill=X, pady=10)
ttk.Button(
button_frame,
text="刷新",
command=lambda: self.refresh_log_view(text_area)
).pack(side=LEFT, padx=5)
ttk.Button(
button_frame,
text="清空日誌",
command=lambda: self.clear_log_file(text_area)
).pack(side=LEFT, padx=5)
else:
messagebox.showinfo("提示", "日誌文件不存在")
def refresh_log_view(self, text_area):
"""刷新日誌查看器內容"""
log_path = "chroma_reader.log"
if os.path.exists(log_path):
try:
with open(log_path, "r", encoding="utf-8") as f:
log_content = f.read()
except UnicodeDecodeError:
try:
with open(log_path, "r", encoding="gbk") as f:
log_content = f.read()
except:
log_content = "無法讀取日誌文件"
text_area.config(state=tk.NORMAL)
text_area.delete("1.0", tk.END)
text_area.insert(tk.END, log_content)
text_area.config(state=tk.DISABLED)
def clear_log_file(self, text_area):
"""清空日誌文件"""
if messagebox.askyesno("確認", "確定要清空日誌文件嗎?"):
log_path = "chroma_reader.log"
try:
with open(log_path, "w") as f:
f.write("")
text_area.config(state=tk.NORMAL)
text_area.delete("1.0", tk.END)
text_area.config(state=tk.DISABLED)
messagebox.showinfo("成功", "日誌文件已清空")
except Exception as e:
messagebox.showerror("錯誤", f"清空日誌文件時出錯: {str(e)}")
def load_config(self):
"""載入配置"""
default_config = {
"last_backups_dir": "",
"theme": "darkly"
}
if os.path.exists(self.config_path):
try:
with open(self.config_path, "r", encoding="utf-8") as f:
return json.load(f)
except:
return default_config
return default_config
def save_config(self):
"""保存配置"""
try:
with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(self.config, f, indent=4)
except Exception as e:
self.logger.error(f"保存配置時出錯: {str(e)}")
def main():
"""程序入口點"""
root = ttk.Window(themename="darkly")
app = ChromaDBReaderUI(root)
root.mainloop()
if __name__ == "__main__":
main()