Fix encoding issues, enhance ChromaDB reader with ID query and embedding model selection

This commit is contained in:
z060142 2025-05-09 11:29:56 +08:00
parent 2a68f04e87
commit 65df12a20e
3 changed files with 729 additions and 84 deletions

View File

@ -946,13 +946,18 @@ class WolfChatSetup(tk.Tk):
logger.info(f"Starting bot: {sys.executable} {bot_script_name}")
# Ensure CWD is script's directory if main.py relies on relative paths
script_dir = os.path.dirname(os.path.abspath(__file__))
current_env = os.environ.copy()
current_env["PYTHONIOENCODING"] = "utf-8"
self.bot_process_instance = subprocess.Popen(
[sys.executable, bot_script_name],
cwd=script_dir, # Run main.py from its directory
stdout=subprocess.PIPE, # Capture output
stderr=subprocess.STDOUT, # Redirect stderr to stdout
text=True,
bufsize=1 # Line buffered
encoding='utf-8', # Specify UTF-8 encoding
errors='replace', # Handle potential encoding errors
bufsize=1, # Line buffered
env=current_env # Set PYTHONIOENCODING
)
bot_process_instance = self.bot_process_instance # Update global
@ -1852,7 +1857,21 @@ class WolfChatSetup(tk.Tk):
messagebox.showwarning("Already Running", "Another process is already running. Please stop it first.")
return
self.running_process = subprocess.Popen([sys.executable, "main.py"])
# Run main.py, capturing output with UTF-8 encoding and setting PYTHONIOENCODING
current_env = os.environ.copy()
current_env["PYTHONIOENCODING"] = "utf-8"
self.running_process = subprocess.Popen(
[sys.executable, "main.py"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding='utf-8',
errors='replace',
bufsize=1,
env=current_env # Set PYTHONIOENCODING
)
# Start a thread to log bot's output for this independent run as well
threading.Thread(target=self._log_subprocess_output, args=(self.running_process, "ChatBot"), daemon=True).start()
print("Attempting to start main.py...")
self.update_run_button_states(False) # Disable run buttons, enable stop
except Exception as e:

View File

@ -3,6 +3,7 @@ import tkinter as tk
from tkinter import filedialog, messagebox
import json
import chromadb
from chromadb.utils import embedding_functions # 新增導入
import datetime
import pandas as pd
import threading
@ -15,6 +16,8 @@ from ttkbootstrap.scrolled import ScrolledFrame
import numpy as np
import logging
from typing import List, Dict, Any, Optional, Union, Tuple
import inspect # 用於檢查函數簽名,判斷是否支持混合搜索
import re # 新增導入 for ID parsing in UI
class ChromaDBReader:
"""ChromaDB備份讀取器的主數據模型"""
@ -28,6 +31,9 @@ class ChromaDBReader:
self.query_results = [] # 當前查詢結果
self.chroma_client = None # ChromaDB客戶端
self.selected_embedding_model_name = "default" # 用於查詢的嵌入模型
self.query_embedding_function = None # 實例化的查詢嵌入函數, None 表示使用集合內部預設
# 設置日誌
logging.basicConfig(
level=logging.INFO,
@ -119,12 +125,41 @@ class ChromaDBReader:
self.collection_names = []
return False
def set_query_embedding_model(self, model_name: str):
"""設置查詢時使用的嵌入模型"""
self.selected_embedding_model_name = model_name
if model_name == "default":
self.query_embedding_function = None # 表示使用集合的內部嵌入函數
self.logger.info("查詢將使用集合內部嵌入模型。")
elif model_name == "all-MiniLM-L6-v2":
try:
# 注意: sentence-transformers 庫需要安裝
self.query_embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")
self.logger.info(f"查詢將使用外部嵌入模型: {model_name}")
except Exception as e:
self.logger.error(f"無法加載 SentenceTransformer all-MiniLM-L6-v2: {e}。將使用集合內部模型。")
self.query_embedding_function = None
elif model_name == "paraphrase-multilingual-MiniLM-L12-v2":
try:
# 注意: sentence-transformers 庫需要安裝
self.query_embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="paraphrase-multilingual-MiniLM-L12-v2")
self.logger.info(f"查詢將使用外部嵌入模型: {model_name}")
except Exception as e:
self.logger.error(f"無法加載 SentenceTransformer paraphrase-multilingual-MiniLM-L12-v2: {e}。將使用集合內部模型。")
self.query_embedding_function = None
else:
self.logger.warning(f"未知的查詢嵌入模型: {model_name}, 將使用集合內部模型。")
self.query_embedding_function = None
def load_collection(self, collection_name: str) -> bool:
"""加載指定的集合"""
if not self.chroma_client or not collection_name:
return False
try:
# 獲取集合時,如果需要指定 embedding_function (通常在創建時指定)
# 此處是讀取,所以集合的 embedding_function 已經固定
# 我們將在查詢時使用 self.query_embedding_function 來生成 query_embeddings
self.current_collection = self.chroma_client.get_collection(collection_name)
self.logger.info(f"已加載集合: {collection_name}")
return True
@ -133,40 +168,156 @@ class ChromaDBReader:
self.current_collection = None
return False
def execute_query(self, query_text: str, n_results: int = 5) -> List[Dict]:
"""執行查詢並返回結果"""
def execute_query(self, query_text: str, n_results: int = 5,
query_type: str = "basic",
where: Dict = None,
where_document: Dict = None,
include: List[str] = None,
metadata_filter: Dict = None,
hybrid_alpha: float = None) -> List[Dict]:
"""執行查詢並返回結果
參數:
query_text: 查詢文本
n_results: 返回結果數量
query_type: 查詢類型 (basic, metadata, hybrid, multi_vector)
where: where 過濾條件
where_document: 文檔內容過濾條件
include: 指定包含的文檔 ID
metadata_filter: 元數據過濾條件
hybrid_alpha: 混合搜索的權重參數0-1之間越大越傾向關鍵詞搜索
"""
if not self.current_collection or not query_text:
return []
try:
results = self.current_collection.query(
query_texts=[query_text],
n_results=n_results
)
query_params = {
"n_results": n_results
}
# 轉換結果為更易用的格式
# 基本查詢處理邏輯
if query_type == "basic":
query_params["query_texts"] = [query_text]
# 多向量查詢(用於比較多個查詢之間的相似性)
elif query_type == "multi_vector":
# 支持以 "|||" 或換行符分隔的多個查詢文本
if "|||" in query_text:
query_texts = [text.strip() for text in query_text.split("|||")]
else:
query_texts = [text.strip() for text in query_text.splitlines() if text.strip()]
query_params["query_texts"] = query_texts
# 添加其他查詢參數
if where:
query_params["where"] = where
if where_document:
query_params["where_document"] = where_document
if include:
query_params["include"] = include
if metadata_filter:
# 直接將元數據過濾條件轉換為 where 條件
if "where" not in query_params:
query_params["where"] = {}
query_params["where"].update(metadata_filter)
# 混合搜索處理
if query_type == "hybrid" and hybrid_alpha is not None:
# 檢查 ChromaDB 版本是否支持混合搜索
if hasattr(self.current_collection, "query") and "alpha" in inspect.signature(self.current_collection.query).parameters:
query_params["alpha"] = hybrid_alpha
# 混合搜索通常需要 query_texts
if "query_texts" not in query_params:
query_params["query_texts"] = [query_text]
else:
self.logger.warning("當前 ChromaDB 版本不支持混合搜索,將使用基本查詢")
query_type = "basic" # 降級為基本查詢
query_params["query_texts"] = [query_text]
elif query_type == "hybrid" and hybrid_alpha is None:
# 如果是混合搜索但未提供 alpha則默認為基本搜索
self.logger.warning("混合搜索未提供 Alpha 值,將使用基本查詢")
query_type = "basic"
query_params["query_texts"] = [query_text]
# 如果 query_type 不是 multi_vector 且 query_texts 未設置,則設置
if query_type not in ["multi_vector", "hybrid"] and "query_texts" not in query_params:
query_params["query_texts"] = [query_text]
# 如果選擇了外部嵌入模型且不是混合查詢,則生成查詢嵌入
if query_type != "hybrid" and \
"query_texts" in query_params and \
self.query_embedding_function:
texts_to_embed = query_params["query_texts"]
try:
# self.query_embedding_function 接受 List[str] 返回 List[List[float]]
generated_embeddings = self.query_embedding_function(texts_to_embed)
if generated_embeddings and all(isinstance(emb, list) for emb in generated_embeddings):
query_params["query_embeddings"] = generated_embeddings
if "query_texts" in query_params: # 確保它存在才刪除
del query_params["query_texts"]
self.logger.info(f"使用 {self.selected_embedding_model_name} 生成了 {len(generated_embeddings)} 個查詢嵌入。")
else:
self.logger.warning(f"未能使用 {self.selected_embedding_model_name} 為所有查詢文本生成有效嵌入。將回退到使用集合預設嵌入函數進行文本查詢。嵌入結果: {generated_embeddings}")
except Exception as e:
self.logger.error(f"使用 {self.selected_embedding_model_name} 生成查詢嵌入時出錯: {e}。將回退到使用集合預設嵌入函數進行文本查詢。")
# 執行查詢
results = self.current_collection.query(**query_params)
# 處理結果
processed_results = []
for i, (doc_id, document, metadata, distance) in enumerate(zip(
results['ids'][0],
results['documents'][0],
results['metadatas'][0] if 'metadatas' in results and results['metadatas'][0] else [{}] * len(results['ids'][0]),
results['distances'][0] if 'distances' in results else [0] * len(results['ids'][0])
)):
# 計算相似度分數 (將距離轉換為相似度: 1 - 歸一化距離)
# 注意: 根據ChromaDB使用的距離度量可能需要調整
similarity = 1.0 - min(distance, 1.0) # 確保值在0-1之間
processed_results.append({
# 獲取查詢返回的所有結果列表
ids_list = results.get('ids', [[]])
documents_list = results.get('documents', [[]])
metadatas_list = results.get('metadatas', [[]])
distances_list = results.get('distances', [[]])
# 確保列表長度一致,並為空列表提供默認值
num_queries = len(ids_list)
if not documents_list or len(documents_list) != num_queries:
documents_list = [[] for _ in range(num_queries)]
if not metadatas_list or len(metadatas_list) != num_queries:
metadatas_list = [[{}] * len(ids_list[i]) for i in range(num_queries)]
if not distances_list or len(distances_list) != num_queries:
distances_list = [[0.0] * len(ids_list[i]) for i in range(num_queries)]
# 對於多查詢文本的情況,需要分別處理每個查詢的結果
for query_idx, (ids, documents, metadatas, distances) in enumerate(zip(
ids_list,
documents_list,
metadatas_list,
distances_list
)):
# 處理每個查詢結果
for i, (doc_id, document, metadata, distance) in enumerate(zip(
ids, documents,
metadatas if metadatas else [{}] * len(ids), # 再次確保元數據存在
distances if distances else [0.0] * len(ids) # 再次確保距離存在
)):
# 計算相似度分數
similarity = 1.0 - min(float(distance) if distance is not None else 1.0, 1.0)
result_item = {
"rank": i + 1,
"query_index": query_idx,
"id": doc_id,
"document": document,
"metadata": metadata,
"metadata": metadata if metadata else {}, # 確保 metadata 是字典
"similarity": similarity,
"distance": distance
})
"distance": float(distance) if distance is not None else 0.0,
"query_type": query_type
}
if query_type == "hybrid":
result_item["hybrid_alpha"] = hybrid_alpha
processed_results.append(result_item)
self.query_results = processed_results
self.logger.info(f"查詢完成,找到 {len(processed_results)} 個結果")
self.logger.info(f"查詢完成,找到 {len(processed_results)} 個結果,查詢類型: {query_type}")
return processed_results
except Exception as e:
@ -174,6 +325,64 @@ class ChromaDBReader:
self.query_results = []
return []
def get_documents_by_ids(self, doc_ids: List[str]) -> List[Dict]:
"""按文檔ID列表獲取文檔"""
if not self.current_collection:
self.logger.warning("沒有選擇集合,無法按 ID 獲取文檔。")
return []
if not doc_ids:
self.logger.warning("未提供文檔 ID。")
return []
try:
results = self.current_collection.get(
ids=doc_ids,
include=["documents", "metadatas"]
)
processed_results = []
retrieved_ids = results.get('ids', [])
retrieved_documents = results.get('documents', [])
retrieved_metadatas = results.get('metadatas', [])
# 創建一個字典以便快速查找已檢索到的文檔信息
found_docs_map = {}
for i, r_id in enumerate(retrieved_ids):
found_docs_map[r_id] = {
"document": retrieved_documents[i] if i < len(retrieved_documents) else None,
"metadata": retrieved_metadatas[i] if i < len(retrieved_metadatas) else {}
}
rank_counter = 1
for original_id in doc_ids: # 遍歷原始請求的ID以保持某種順序感並標記未找到的
if original_id in found_docs_map:
doc_data = found_docs_map[original_id]
if doc_data["document"] is not None:
processed_results.append({
"rank": rank_counter,
"id": original_id,
"document": doc_data["document"],
"metadata": doc_data["metadata"],
"similarity": None, # Not applicable
"distance": None, # Not applicable
"query_type": "id_lookup"
})
rank_counter += 1
else: # ID 存在但文檔為空(理論上不應發生在 get 中,除非 include 設置問題)
self.logger.warning(f"ID {original_id} 找到但文檔內容為空。")
# else: # ID 未在返回結果中找到,可以選擇不添加到 processed_results 或添加一個標記
# self.logger.info(f"ID {original_id} 未在集合中找到。")
self.query_results = processed_results
self.logger.info(f"按 ID 查詢完成,從請求的 {len(doc_ids)} 個ID中實際找到 {len(processed_results)} 個文檔。")
return processed_results
except Exception as e:
self.logger.error(f"按 ID 獲取文檔時出錯: {str(e)}")
# traceback.print_exc() # For debugging
self.query_results = []
return []
def get_collection_info(self, collection_name: str) -> Dict:
"""獲取集合的詳細信息"""
if not self.chroma_client:
@ -235,7 +444,19 @@ class ChromaDBReaderUI:
# 設置窗口
self.root.title("ChromaDB 備份讀取器")
self.root.geometry("1280x800")
self.setup_ui()
# 初始化嵌入模型相關變量
self.embedding_model_var = tk.StringVar(value="預設 (ChromaDB)") # 顯示名稱
self.embedding_models = {
"預設 (ChromaDB)": "default",
"all-MiniLM-L6-v2 (ST)": "all-MiniLM-L6-v2",
"paraphrase-multilingual-MiniLM-L12-v2 (ST)": "paraphrase-multilingual-MiniLM-L12-v2"
}
# 初始化 reader 中的嵌入模型 (確保 reader 實例已創建)
# self.reader.set_query_embedding_model(self.embedding_models[self.embedding_model_var.get()])
# ^^^ 這行需要在 setup_ui 之後,或者在 on_embedding_model_changed 中處理首次設置
self.setup_ui() # setup_ui 會創建 reader 實例
# 默認主題
self.current_theme = "darkly" # ttkbootstrap的深色主題
@ -263,8 +484,12 @@ class ChromaDBReaderUI:
self.right_panel = ttk.Frame(self.main_frame)
self.right_panel.pack(side=LEFT, fill=BOTH, expand=YES)
# 設置狀態欄 (提前,以確保 self.status_var 在其他地方使用前已定義)
self.setup_status_bar()
# 設置左側面板
self.setup_directory_frame()
self.setup_embedding_model_frame() # 新增嵌入模型選擇框架
self.setup_backups_frame()
self.setup_collections_frame()
@ -272,9 +497,6 @@ class ChromaDBReaderUI:
self.setup_query_frame()
self.setup_results_frame()
# 設置狀態欄
self.setup_status_bar()
# 設置菜單
self.setup_menu()
@ -315,6 +537,24 @@ class ChromaDBReaderUI:
ttk.Button(dir_frame, text="瀏覽", command=self.browse_directory).pack(side=LEFT, padx=(5, 0))
ttk.Button(dir_frame, text="載入", command=self.load_backups_directory).pack(side=LEFT, padx=(5, 0))
def setup_embedding_model_frame(self):
"""設置查詢嵌入模型選擇框架"""
embedding_frame = ttk.LabelFrame(self.left_panel, text="查詢嵌入模型", padding=10)
embedding_frame.pack(fill=X, pady=(0, 10))
self.embedding_model_combo = ttk.Combobox(
embedding_frame,
textvariable=self.embedding_model_var,
values=list(self.embedding_models.keys()),
state="readonly"
)
self.embedding_model_combo.pack(fill=X, expand=YES)
self.embedding_model_combo.set(list(self.embedding_models.keys())[0]) # 設置預設顯示值
self.embedding_model_combo.bind("<<ComboboxSelected>>", self.on_embedding_model_changed)
# 初始化Reader中的嵌入模型選擇
self.on_embedding_model_changed()
def setup_backups_frame(self):
"""設置備份列表框架"""
backups_frame = ttk.LabelFrame(self.left_panel, text="備份列表", padding=10)
@ -388,12 +628,46 @@ class ChromaDBReaderUI:
query_frame = ttk.LabelFrame(self.right_panel, text="查詢", padding=10)
query_frame.pack(fill=X, pady=(0, 10))
# 查詢文本輸入
ttk.Label(query_frame, text="查詢文本:").pack(anchor=W)
self.query_text = tk.Text(query_frame, height=4, width=50)
self.query_text.pack(fill=X, pady=5)
# 創建一個 Notebook 以包含不同的查詢類型標籤頁
self.query_notebook = ttk.Notebook(query_frame)
self.query_notebook.pack(fill=X, pady=5)
# 查詢參數
# 基本查詢標籤頁
self.basic_query_frame = ttk.Frame(self.query_notebook)
self.query_notebook.add(self.basic_query_frame, text="基本查詢")
# 元數據查詢標籤頁
self.metadata_query_frame = ttk.Frame(self.query_notebook)
self.query_notebook.add(self.metadata_query_frame, text="元數據查詢")
# 混合查詢標籤頁
self.hybrid_query_frame = ttk.Frame(self.query_notebook)
self.query_notebook.add(self.hybrid_query_frame, text="混合查詢")
# 多向量查詢標籤頁
self.multi_vector_frame = ttk.Frame(self.query_notebook)
self.query_notebook.add(self.multi_vector_frame, text="多向量查詢")
# ID 查詢標籤頁 (新增)
self.id_query_frame = ttk.Frame(self.query_notebook)
self.query_notebook.add(self.id_query_frame, text="ID 查詢")
# 設置基本查詢頁面
self.setup_basic_query_tab()
# 設置元數據查詢頁面
self.setup_metadata_query_tab()
# 設置混合查詢頁面
self.setup_hybrid_query_tab()
# 設置多向量查詢頁面
self.setup_multi_vector_tab()
# 設置 ID 查詢頁面 (新增)
self.setup_id_query_tab()
# 查詢參數(共用部分)
params_frame = ttk.Frame(query_frame)
params_frame.pack(fill=X)
@ -405,10 +679,103 @@ class ChromaDBReaderUI:
ttk.Button(
query_frame,
text="執行查詢",
command=self.execute_query,
command=self.execute_query, # 注意:這個 execute_query 方法將被新的替換
style="Accent.TButton"
).pack(pady=10)
def setup_basic_query_tab(self):
"""設置基本查詢標籤頁"""
ttk.Label(self.basic_query_frame, text="查詢文本:").pack(anchor=W)
self.basic_query_text = tk.Text(self.basic_query_frame, height=4, width=50)
self.basic_query_text.pack(fill=X, pady=5)
def setup_metadata_query_tab(self):
"""設置元數據查詢標籤頁"""
ttk.Label(self.metadata_query_frame, text="查詢文本:").pack(anchor=W)
self.metadata_query_text = tk.Text(self.metadata_query_frame, height=4, width=50)
self.metadata_query_text.pack(fill=X, pady=5)
ttk.Label(self.metadata_query_frame, text="元數據過濾條件 (JSON 格式):").pack(anchor=W)
self.metadata_filter_text = tk.Text(self.metadata_query_frame, height=4, width=50)
self.metadata_filter_text.pack(fill=X, pady=5)
self.metadata_filter_text.insert("1.0", '{"key": "value"}')
# 添加一個幫助按鈕,顯示元數據過濾語法的說明
ttk.Button(
self.metadata_query_frame,
text="?",
width=2,
command=self.show_metadata_help
).pack(anchor=E)
def setup_hybrid_query_tab(self):
"""設置混合查詢標籤頁"""
ttk.Label(self.hybrid_query_frame, text="查詢文本:").pack(anchor=W)
self.hybrid_query_text = tk.Text(self.hybrid_query_frame, height=4, width=50)
self.hybrid_query_text.pack(fill=X, pady=5)
alpha_frame = ttk.Frame(self.hybrid_query_frame)
alpha_frame.pack(fill=X)
ttk.Label(alpha_frame, text="Alpha 值 (0-1):").pack(side=LEFT)
self.hybrid_alpha_var = tk.DoubleVar(value=0.5)
ttk.Scale(
alpha_frame,
from_=0.0, to=1.0,
variable=self.hybrid_alpha_var,
orient=tk.HORIZONTAL,
length=200
).pack(side=LEFT, padx=5, fill=X, expand=YES)
# 創建一個Label來顯示Scale的當前值
self.hybrid_alpha_label = ttk.Label(alpha_frame, text=f"{self.hybrid_alpha_var.get():.2f}")
self.hybrid_alpha_label.pack(side=LEFT)
# 綁定Scale的變動到更新Label的函數
self.hybrid_alpha_var.trace_add("write", lambda *args: self.hybrid_alpha_label.config(text=f"{self.hybrid_alpha_var.get():.2f}"))
ttk.Label(self.hybrid_query_frame, text="注意: Alpha=0 完全使用向量搜索Alpha=1 完全使用關鍵詞搜索").pack(pady=2)
ttk.Label(self.hybrid_query_frame, text="混合查詢將使用集合原始嵌入模型,忽略上方選擇的查詢嵌入模型。", font=("TkDefaultFont", 8)).pack(pady=2)
def setup_multi_vector_tab(self):
"""設置多向量查詢標籤頁"""
ttk.Label(self.multi_vector_frame, text="多個查詢文本 (每行一個,或使用 ||| 分隔):").pack(anchor=W)
self.multi_vector_text = tk.Text(self.multi_vector_frame, height=6, width=50)
self.multi_vector_text.pack(fill=X, pady=5)
self.multi_vector_text.insert("1.0", "查詢文本 1\n|||查詢文本 2\n|||查詢文本 3")
ttk.Label(self.multi_vector_frame, text="用於比較多個查詢之間的相似性").pack(pady=5)
def setup_id_query_tab(self):
"""設置ID查詢標籤頁"""
ttk.Label(self.id_query_frame, text="文檔 ID (每行一個,或用逗號/空格分隔):").pack(anchor=tk.W)
self.id_query_text = tk.Text(self.id_query_frame, height=6, width=50)
self.id_query_text.pack(fill=tk.X, pady=5)
self.id_query_text.insert("1.0", "id1\nid2,id3 id4") # 示例
ttk.Label(self.id_query_frame, text="此查詢將獲取指定ID的文檔忽略上方“結果數量”設置。").pack(pady=5)
def show_metadata_help(self):
"""顯示元數據過濾語法說明"""
help_text = """元數據過濾語法示例:
基本過濾:
{"category": "文章"} # 精確匹配
範圍過濾:
{"date": {"$gt": "2023-01-01"}} # 大於
{"date": {"$lt": "2023-12-31"}} # 小於
{"count": {"$gte": 10}} # 大於等於
{"count": {"$lte": 100}} # 小於等於
多條件過濾:
{"$and": [{"category": "文章"}, {"author": "張三"}]} # AND 條件
{"$or": [{"category": "文章"}, {"category": "新聞"}]} # OR 條件
注意: 此處語法遵循 ChromaDB 的過濾語法非標準 JSON 查詢語法
"""
messagebox.showinfo("元數據過濾語法說明", help_text)
def setup_results_frame(self):
"""設置結果顯示框架"""
self.results_notebook = ttk.Notebook(self.right_panel)
@ -443,6 +810,26 @@ class ChromaDBReaderUI:
status_label = ttk.Label(status_frame, textvariable=self.status_var, relief=tk.SUNKEN, anchor=W)
status_label.pack(fill=X)
def on_embedding_model_changed(self, event=None):
"""處理查詢嵌入模型選擇變更事件"""
selected_display_name = self.embedding_model_var.get()
model_name_key = self.embedding_models.get(selected_display_name, "default")
if hasattr(self, 'reader') and self.reader:
self.reader.set_query_embedding_model(model_name_key) # 更新Reader中的模型
# 更新狀態欄提示
if model_name_key == "default":
self.status_var.set("查詢將使用集合內部嵌入模型。")
elif self.reader.query_embedding_function: # 檢查模型是否成功加載
self.status_var.set(f"查詢將使用外部模型: {selected_display_name}")
else: # 加載失敗
self.status_var.set(f"模型 {selected_display_name} 加載失敗/無效,將使用集合內部模型。")
else:
# Reader尚未初始化這通常在UI初始化早期發生
# self.reader.set_query_embedding_model 會在 setup_embedding_model_frame 中首次調用時處理
pass
def browse_directory(self):
"""瀏覽選擇備份目錄"""
directory = filedialog.askdirectory(
@ -527,27 +914,38 @@ class ChromaDBReaderUI:
# 獲取選定項的索引
item_id = selection[0]
item_index = self.backups_tree.index(item_id)
# item_index = self.backups_tree.index(item_id) # 這個索引是相對於當前顯示的項目的
# 獲取所有顯示的備份項目
visible_items = self.backups_tree.get_children()
if item_index >= len(visible_items):
# 直接從 Treeview item 中獲取備份名稱,然後在 self.reader.backups 中查找
try:
backup_name_from_tree = self.backups_tree.item(item_id)["values"][0]
except IndexError:
self.logger.error("無法從 Treeview 獲取備份名稱")
return
# 查找此顯示項對應的實際備份索引
backup_name = self.backups_tree.item(visible_items[item_index])["values"][0]
backup_index = next((i for i, b in enumerate(self.reader.backups) if b["name"] == backup_name), -1)
actual_backup_index = -1
for i, backup_info in enumerate(self.reader.backups):
if backup_info["name"] == backup_name_from_tree:
actual_backup_index = i
break
if backup_index == -1:
if actual_backup_index == -1:
self.logger.error(f"在備份列表中未找到名為 {backup_name_from_tree} 的備份")
return
# 載入備份
self.status_var.set(f"正在載入備份: {backup_name}...")
self.status_var.set(f"正在載入備份: {backup_name_from_tree}...")
self.root.update_idletasks()
# 確保 Reader 中的嵌入模型是最新的 (雖然 on_embedding_model_changed 應該已經處理了)
# selected_display_name = self.embedding_model_var.get()
# model_key = self.embedding_models.get(selected_display_name, "default")
# self.reader.set_query_embedding_model(model_key) # 這行不需要,因為模型選擇是獨立的
def load_backup_thread():
success = self.reader.load_backup(backup_index)
self.root.after(0, lambda: self.finalize_backup_loading(success, backup_name))
# load_backup 不再需要 embedding_model_name 參數,因為嵌入模型選擇是針對查詢的
success = self.reader.load_backup(actual_backup_index)
self.root.after(0, lambda: self.finalize_backup_loading(success, backup_name_from_tree))
threading.Thread(target=load_backup_thread).start()
@ -618,7 +1016,7 @@ class ChromaDBReaderUI:
# 獲取集合詳細信息並顯示
info = self.reader.get_collection_info(collection_name)
info_text = f"集合: {info['name']}\n文檔數: {info['document_count']}\n向量維度: {info['dimension']}"
messagebox.showinfo("集合信息", info_text)
# messagebox.showinfo("集合信息", info_text) # 暫時註解掉,避免每次選集合都彈窗
else:
self.status_var.set(f"載入集合失敗: {collection_name}")
messagebox.showerror("錯誤", f"無法載入集合: {collection_name}")
@ -629,25 +1027,170 @@ class ChromaDBReaderUI:
messagebox.showinfo("提示", "請先選擇一個集合")
return
query_text = self.query_text.get("1.0", tk.END).strip()
if not query_text:
messagebox.showinfo("提示", "請輸入查詢文本")
# 根據當前選擇的標籤頁確定查詢類型
try:
current_tab_widget = self.query_notebook.nametowidget(self.query_notebook.select())
if current_tab_widget == self.basic_query_frame:
current_tab = 0
elif current_tab_widget == self.metadata_query_frame:
current_tab = 1
elif current_tab_widget == self.hybrid_query_frame:
current_tab = 2
elif current_tab_widget == self.multi_vector_frame:
current_tab = 3
elif current_tab_widget == self.id_query_frame: # 新增 ID 查詢頁判斷
current_tab = 4
else:
messagebox.showerror("錯誤", "未知的查詢標籤頁")
return
except tk.TclError: # Notebook可能還沒有任何分頁被選中
messagebox.showerror("錯誤", "請選擇一個查詢類型標籤頁")
return
# 獲取查詢參數
try:
n_results = int(self.n_results_var.get())
except ValueError:
messagebox.showerror("錯誤", "結果數量必須是整數")
return
self.status_var.set("正在執行查詢...")
self.root.update_idletasks()
# 執行不同類型的查詢
if current_tab == 0: # 基本查詢
query_text = self.basic_query_text.get("1.0", tk.END).strip()
if not query_text:
messagebox.showinfo("提示", "請輸入查詢文本")
return
self.status_var.set("正在執行基本查詢...")
self.execute_basic_query(query_text, n_results)
elif current_tab == 1: # 元數據查詢
query_text = self.metadata_query_text.get("1.0", tk.END).strip()
metadata_filter_text = self.metadata_filter_text.get("1.0", tk.END).strip()
if not query_text: # 元數據查詢的文本也可以是空的如果只想用metadata_filter
# messagebox.showinfo("提示", "請輸入查詢文本")
# return
pass # 允許空查詢文本
try:
metadata_filter = json.loads(metadata_filter_text) if metadata_filter_text else None
except json.JSONDecodeError:
messagebox.showerror("錯誤", "元數據過濾條件必須是有效的 JSON 格式")
return
if not query_text and not metadata_filter:
messagebox.showinfo("提示", "請輸入查詢文本或元數據過濾條件")
return
self.status_var.set("正在執行元數據查詢...")
self.execute_metadata_query(query_text, n_results, metadata_filter)
elif current_tab == 2: # 混合查詢
query_text = self.hybrid_query_text.get("1.0", tk.END).strip()
hybrid_alpha = self.hybrid_alpha_var.get()
if not query_text:
messagebox.showinfo("提示", "請輸入查詢文本")
return
self.status_var.set("正在執行混合查詢...")
self.execute_hybrid_query(query_text, n_results, hybrid_alpha)
elif current_tab == 3: # 多向量查詢
query_text = self.multi_vector_text.get("1.0", tk.END).strip()
if not query_text:
messagebox.showinfo("提示", "請輸入查詢文本")
return
self.status_var.set("正在執行多向量查詢...")
self.execute_multi_vector_query(query_text, n_results)
elif current_tab == 4: # ID 查詢
id_input_str = self.id_query_text.get("1.0", tk.END).strip()
if not id_input_str:
messagebox.showinfo("提示", "請輸入文檔 ID。")
return
# 解析 ID: 支持逗號、空格、換行符分隔
doc_ids = [id_val.strip() for id_val in re.split(r'[,\s\n]+', id_input_str) if id_val.strip()]
if not doc_ids:
messagebox.showinfo("提示", "未解析到有效的文檔 ID。")
return
self.status_var.set("正在按 ID 獲取文檔...")
self.execute_id_lookup_query(doc_ids)
def execute_basic_query(self, query_text, n_results):
"""執行基本查詢"""
self.status_var.set(f"正在執行基本查詢: {query_text[:30]}...")
self.root.update_idletasks()
def query_thread():
results = self.reader.execute_query(query_text, n_results)
results = self.reader.execute_query(
query_text=query_text,
n_results=n_results,
query_type="basic"
)
self.root.after(0, lambda: self.display_results(results))
threading.Thread(target=query_thread).start()
threading.Thread(target=query_thread, daemon=True).start()
def execute_metadata_query(self, query_text, n_results, metadata_filter):
"""執行元數據查詢"""
self.status_var.set(f"正在執行元數據查詢: {query_text[:30]}...")
self.root.update_idletasks()
def query_thread():
results = self.reader.execute_query(
query_text=query_text,
n_results=n_results,
query_type="metadata", # 這裡應該是 "metadata" 但後端邏輯會轉為 where
metadata_filter=metadata_filter
)
self.root.after(0, lambda: self.display_results(results))
threading.Thread(target=query_thread, daemon=True).start()
def execute_hybrid_query(self, query_text, n_results, hybrid_alpha):
"""執行混合查詢"""
self.status_var.set(f"正在執行混合查詢 (α={hybrid_alpha:.2f}): {query_text[:30]}...")
self.root.update_idletasks()
def query_thread():
results = self.reader.execute_query(
query_text=query_text,
n_results=n_results,
query_type="hybrid",
hybrid_alpha=hybrid_alpha
)
self.root.after(0, lambda: self.display_results(results))
threading.Thread(target=query_thread, daemon=True).start()
def execute_multi_vector_query(self, query_text, n_results):
"""執行多向量查詢"""
self.status_var.set(f"正在執行多向量查詢: {query_text.splitlines()[0][:30] if query_text.splitlines() else ''}...")
self.root.update_idletasks()
def query_thread():
results = self.reader.execute_query(
query_text=query_text,
n_results=n_results,
query_type="multi_vector"
)
self.root.after(0, lambda: self.display_results(results))
threading.Thread(target=query_thread, daemon=True).start()
def execute_id_lookup_query(self, doc_ids: List[str]):
"""執行ID查找查詢"""
self.status_var.set(f"正在按 ID 獲取 {len(doc_ids)} 個文檔...")
self.root.update_idletasks()
def query_thread():
results = self.reader.get_documents_by_ids(doc_ids)
self.root.after(0, lambda: self.display_results(results))
threading.Thread(target=query_thread, daemon=True).start()
def display_results(self, results):
"""顯示查詢結果"""
@ -679,27 +1222,49 @@ class ChromaDBReaderUI:
widget.destroy()
# 創建表格
columns = ("rank", "similarity", "id", "document")
columns = ("rank", "similarity", "query_type", "id", "document")
tree = ttk.Treeview(self.list_view, columns=columns, show="headings")
tree.heading("rank", text="#")
tree.heading("similarity", text="相似度")
tree.heading("query_type", text="查詢類型")
tree.heading("id", text="文檔ID")
tree.heading("document", text="文檔內容")
tree.column("rank", width=50, anchor=CENTER)
tree.column("similarity", width=100, anchor=CENTER)
tree.column("id", width=200)
tree.column("document", width=600)
tree.column("query_type", width=120, anchor=CENTER) # 調整寬度以適應更長的類型名稱
tree.column("id", width=150)
tree.column("document", width=530) # 調整寬度
# 確定查詢類型名稱映射
query_type_names = {
"basic": "基本查詢",
"metadata": "元數據查詢",
"hybrid": "混合查詢",
"multi_vector": "多向量查詢",
"id_lookup": "ID 查詢" # 新增
}
# 添加結果到表格
for result in results:
raw_query_type = result.get("query_type", "basic")
display_query_type = query_type_names.get(raw_query_type, raw_query_type.capitalize())
if raw_query_type == "hybrid" and "hybrid_alpha" in result:
display_query_type += f" (α={result['hybrid_alpha']:.2f})"
if raw_query_type == "multi_vector" and "query_index" in result:
display_query_type += f" (Q{result['query_index']+1})"
similarity_display = f"{result.get('similarity', 0.0):.4f}" if result.get('similarity') is not None else "N/A"
tree.insert(
"", "end",
values=(
result["rank"],
f"{result['similarity']:.4f}",
result["id"],
result["document"][:100] + ("..." if len(result["document"]) > 100 else "")
result.get("rank", "-"),
similarity_display,
display_query_type,
result.get("id", "N/A"),
result.get("document", "")[:100] + ("..." if len(result.get("document", "")) > 100 else "")
)
)
@ -710,7 +1275,6 @@ class ChromaDBReaderUI:
# 雙擊項目顯示完整內容
tree.bind("<Double-1>", lambda event: self.show_full_document(tree))
# 使用 Frame 容器來實現滾動功能
# 佈局
tree.pack(side=LEFT, fill=BOTH, expand=YES)
scrollbar.pack(side=RIGHT, fill=Y)
@ -739,7 +1303,10 @@ class ChromaDBReaderUI:
# 添加文檔信息
info_text = f"文檔ID: {result['id']}\n"
if result.get('similarity') is not None:
info_text += f"相似度: {result['similarity']:.4f}\n"
else:
info_text += "相似度: N/A\n"
if result['metadata']:
info_text += "\n元數據:\n"
@ -806,9 +1373,10 @@ class ChromaDBReaderUI:
title_frame = ttk.Frame(card)
title_frame.pack(fill=X)
similarity_text_detail = f"{result['similarity']:.4f}" if result.get('similarity') is not None else "N/A"
ttk.Label(
title_frame,
text=f"#{result['rank']} - 相似度: {result['similarity']:.4f}",
text=f"#{result['rank']} - 相似度: {similarity_text_detail}",
font=("TkDefaultFont", 10, "bold")
).pack(side=LEFT)
@ -881,7 +1449,10 @@ class ChromaDBReaderUI:
# 添加文檔信息
info_text = f"文檔ID: {result['id']}\n"
if result.get('similarity') is not None:
info_text += f"相似度: {result['similarity']:.4f}\n"
else:
info_text += "相似度: N/A\n"
if result['metadata']:
info_text += "\n元數據:\n"

View File

@ -4,6 +4,8 @@
import pyautogui
import cv2 # opencv-python
import numpy as np
import sys # Added for special character handling
import io # Added for special character handling
import pyperclip
import time
import os
@ -22,6 +24,26 @@ import math # Added for distance calculation in dual method
# Or could use threading.Event()
monitoring_paused_flag = [False] # List containing a boolean
# --- Global Error Handling Setup for Text Encoding ---
def handle_text_encoding(text, default_text="[無法處理的文字]"):
"""安全處理任何文字,確保不會因編碼問題而崩潰程序"""
if text is None:
return default_text
try:
# 嘗試使用 utf-8 編碼
return text
except UnicodeEncodeError:
try:
# 嘗試將特殊字符替換為可顯示字符
return text.encode('utf-8', errors='replace').decode('utf-8')
except:
# 最後手段:忽略任何無法處理的字符
try:
return text.encode('utf-8', errors='ignore').decode('utf-8')
except:
return default_text
# --- Color Config Loading ---
def load_bubble_colors(config_path='bubble_colors.json'):
"""Loads bubble color configuration from a JSON file."""
@ -1068,7 +1090,13 @@ class InteractionModule:
if copied and copied_text and copied_text != "___MCP_CLEAR___":
print(f"Successfully copied text, length: {len(copied_text)}")
return copied_text.strip()
# 添加編碼安全處理
try:
safe_text = handle_text_encoding(copied_text.strip())
return safe_text
except Exception as e:
print(f"Error handling copied text encoding: {str(e)}")
return copied_text.strip() # 即使有問題也嘗試返回原始文字
else:
print("Error: Copy operation unsuccessful or clipboard content invalid.")
return None
@ -2115,17 +2143,31 @@ def run_ui_monitoring_loop(trigger_queue: queue.Queue, command_queue: queue.Queu
# 7. Send Trigger Info to Main Thread
print("\n>>> Putting trigger info in Queue <<<")
print(f" Sender: {sender_name}")
print(f" Content: {bubble_text[:100]}...")
try:
# 安全地處理和顯示發送者名稱
safe_sender_display = handle_text_encoding(sender_name, "[未知發送者]")
print(f" Sender: {safe_sender_display}")
# 安全地處理和顯示消息內容
if bubble_text:
display_text = bubble_text[:100] + "..." if len(bubble_text) > 100 else bubble_text
safe_content_display = handle_text_encoding(display_text, "[無法處理的文字內容]")
print(f" Content: {safe_content_display}")
else:
print(" Content: [空]")
except Exception as e_display:
print(f"Error displaying message info: {str(e_display)}")
print(f" Bubble Region: {bubble_region}") # Original region for context
print(f" Reply Context Activated: {reply_context_activated}")
try:
# 確保所有文字數據都經過安全處理
data_to_send = {
'sender': sender_name,
'text': bubble_text,
'bubble_region': bubble_region, # Send original region for context if needed
'sender': handle_text_encoding(sender_name, "[未知發送者]"),
'text': handle_text_encoding(bubble_text, "[無法處理的文字內容]"),
'bubble_region': bubble_region,
'reply_context_activated': reply_context_activated,
'bubble_snapshot': bubble_snapshot, # Send the snapshot used
'bubble_snapshot': bubble_snapshot,
'search_area': search_area
}
trigger_queue.put(data_to_send)
@ -2136,13 +2178,26 @@ def run_ui_monitoring_loop(trigger_queue: queue.Queue, command_queue: queue.Queu
break # Exit the 'for target_bubble_info in sorted_bubbles' loop
except Exception as q_err:
print(f"Error putting data in Queue: {q_err}")
# Don't break if queue put fails, maybe try next bubble? Or log and break?
print(f"Error preparing or enqueueing data: {q_err}")
# 嘗試使用最小數據集合保證功能性
try:
minimal_data = {
'sender': "[數據處理錯誤]",
'text': handle_text_encoding(bubble_text[:100] if bubble_text else "[內容獲取失敗]"), # Apply encoding here too
'bubble_region': bubble_region,
'reply_context_activated': False, # Sensible default
'bubble_snapshot': bubble_snapshot, # Keep snapshot if available
'search_area': search_area
}
trigger_queue.put(minimal_data)
print("Minimal fallback data placed in Queue after error.")
except Exception as min_q_err:
print(f"Critical failure: Could not place any data in queue: {min_q_err}")
# Let's break here too, as something is wrong.
print("Breaking scan cycle due to queue error.")
break
# End of keyword found block (if keyword_coords:)
# End of keyword found block (if result:)
# End of loop through sorted bubbles (for target_bubble_info...)
# If the loop finished without breaking (i.e., no trigger processed), wait the full interval.