diff --git a/mcp_center/servers/oe-cli-mcp-server/mcp_tools/rag_tools/config.json b/mcp_center/servers/oe-cli-mcp-server/mcp_tools/rag_tools/config.json new file mode 100644 index 0000000000000000000000000000000000000000..776f4c252380ca6abf07caa43931c3ebcea5e256 --- /dev/null +++ b/mcp_center/servers/oe-cli-mcp-server/mcp_tools/rag_tools/config.json @@ -0,0 +1,48 @@ +{ + "tools": { + "create_knowledge_base": { + "zh": "创建一个新的知识库。知识库是文档的容器,每个知识库可以有自己的chunk_size和embedding配置。创建后需要调用select_knowledge_base来选择该知识库才能使用。\n\n参数说明:\n- kb_name:知识库名称(必填,必须唯一)\n- chunk_size:chunk大小,单位token(必填,例如512、1024)\n- embedding_model:向量化模型名称(可选,例如text-embedding-ada-002)\n- embedding_endpoint:向量化服务端点URL(可选)\n- embedding_api_key:向量化服务API Key(可选)\n\n返回值:\n- success:布尔值,表示是否成功\n- message:字符串,描述操作结果\n- data:字典,包含创建结果\n - kb_id:知识库ID\n - kb_name:知识库名称\n - chunk_size:chunk大小", + "en": "Create a new knowledge base. A knowledge base is a container for documents, and each knowledge base can have its own chunk_size and embedding configuration. 
After creation, you need to call select_knowledge_base to select this knowledge base before using it.\n\nParameters:\n- kb_name: Knowledge base name (required, must be unique)\n- chunk_size: Chunk size in tokens (required, e.g., 512, 1024)\n- embedding_model: Embedding model name (optional, e.g., text-embedding-ada-002)\n- embedding_endpoint: Embedding service endpoint URL (optional)\n- embedding_api_key: Embedding service API Key (optional)\n\nReturn value:\n- success: Boolean, indicating whether the operation was successful\n- message: String, describing the operation result\n- data: Dictionary containing creation results\n - kb_id: Knowledge base ID\n - kb_name: Knowledge base name\n - chunk_size: Chunk size" + }, + "delete_knowledge_base": { + "zh": "删除指定的知识库。不能删除当前正在使用的知识库。删除知识库会级联删除该知识库下的所有文档和chunks。\n\n参数说明:\n- kb_name:知识库名称(必填)\n\n返回值:\n- success:布尔值,表示是否成功\n- message:字符串,描述操作结果\n- data:字典,包含删除结果\n - kb_name:已删除的知识库名称", + "en": "Delete the specified knowledge base. Cannot delete the currently active knowledge base. Deleting a knowledge base will cascade delete all documents and chunks under it.\n\nParameters:\n- kb_name: Knowledge base name (required)\n\nReturn value:\n- success: Boolean, indicating whether the operation was successful\n- message: String, describing the operation result\n- data: Dictionary containing deletion results\n - kb_name: Deleted knowledge base name" + }, + "list_knowledge_bases": { + "zh": "列出所有可用的知识库。返回所有知识库的详细信息,包括当前选中的知识库。\n\n参数说明:\n无参数\n\n返回值:\n- success:布尔值,表示是否成功\n- message:字符串,描述操作结果\n- data:字典,包含知识库列表\n - knowledge_bases:知识库列表,每个知识库包含:\n - id:知识库ID\n - name:知识库名称\n - chunk_size:chunk大小\n - embedding_model:向量化模型\n - created_at:创建时间\n - is_current:是否为当前选中的知识库\n - count:知识库数量\n - current_kb_id:当前选中的知识库ID", + "en": "List all available knowledge bases. 
Returns detailed information about all knowledge bases, including the currently selected one.\n\nParameters:\nNo parameters\n\nReturn value:\n- success: Boolean, indicating whether the operation was successful\n- message: String, describing the operation result\n- data: Dictionary containing knowledge base list\n - knowledge_bases: List of knowledge bases, each containing:\n - id: Knowledge base ID\n - name: Knowledge base name\n - chunk_size: Chunk size\n - embedding_model: Embedding model\n - created_at: Creation time\n - is_current: Whether this is the currently selected knowledge base\n - count: Number of knowledge bases\n - current_kb_id: Currently selected knowledge base ID" + }, + "select_knowledge_base": { + "zh": "选择一个知识库作为当前使用的知识库。选择后,后续的文档导入、查询等操作都会在该知识库中进行。\n\n参数说明:\n- kb_name:知识库名称(必填)\n\n返回值:\n- success:布尔值,表示是否成功\n- message:字符串,描述操作结果\n- data:字典,包含选择结果\n - kb_id:知识库ID\n - kb_name:知识库名称\n - document_count:该知识库下的文档数量", + "en": "Select a knowledge base as the currently active one. 
After selection, subsequent operations such as document import and search will be performed in this knowledge base.\n\nParameters:\n- kb_name: Knowledge base name (required)\n\nReturn value:\n- success: Boolean, indicating whether the operation was successful\n- message: String, describing the operation result\n- data: Dictionary containing selection results\n - kb_id: Knowledge base ID\n - kb_name: Knowledge base name\n - document_count: Number of documents in this knowledge base" + }, + "import_document": { + "zh": "导入文档到当前选中的知识库(支持多文件并发导入)。支持TXT、DOCX、DOC格式。文档会被解析、切分为chunks,并异步批量生成向量存储到数据库中。多个文档会并发处理,提高导入效率。如果文档名称已存在,会自动添加时间戳避免冲突。\n\n参数说明:\n- file_paths:文件路径列表(绝对路径),支持1~n个文件(必填)\n- chunk_size:chunk大小,单位token(可选,默认使用知识库的chunk_size)\n\n返回值:\n- success:布尔值,表示是否成功(只要有文件成功导入即为true)\n- message:字符串,描述操作结果(包含成功和失败的数量)\n- data:字典,包含导入结果\n - total:总文件数\n - success_count:成功导入的文件数\n - failed_count:失败的文件数\n - success_files:成功导入的文件列表,每个包含:\n - file_path:文件路径\n - doc_name:文档名称\n - chunk_count:chunk数量\n - failed_files:失败的文件列表,每个包含:\n - file_path:文件路径\n - error:错误信息", + "en": "Import documents into the currently selected knowledge base (supports concurrent import of multiple files). Supports TXT, DOCX, and DOC formats. Documents will be parsed, split into chunks, and vectors will be generated asynchronously in batch and stored in the database. Multiple documents are processed concurrently to improve import efficiency. 
If the document name already exists, a timestamp will be automatically added to avoid conflicts.\n\nParameters:\n- file_paths: List of file paths (absolute paths), supports 1~n files (required)\n- chunk_size: Chunk size in tokens (optional, defaults to knowledge base's chunk_size)\n\nReturn value:\n- success: Boolean, indicating whether the operation was successful (true if any file was successfully imported)\n- message: String, describing the operation result (includes counts of successful and failed imports)\n- data: Dictionary containing import results\n - total: Total number of files\n - success_count: Number of successfully imported files\n - failed_count: Number of failed files\n - success_files: List of successfully imported files, each containing:\n - file_path: File path\n - doc_name: Document name\n - chunk_count: Number of chunks\n - failed_files: List of failed files, each containing:\n - file_path: File path\n - error: Error message" + }, + "search": { + "zh": "在当前选中的知识库中进行混合检索。结合关键词检索(FTS5)和向量检索(sqlite-vec),使用加权方式合并结果(关键词权重0.3,向量权重0.7),去重后使用Jaccard相似度重排序,返回最相关的top-k个结果。\n\n参数说明:\n- query:查询文本(必填)\n- top_k:返回数量(可选,默认从配置读取,通常为5)\n\n返回值:\n- success:布尔值,表示是否成功\n- message:字符串,描述检索结果\n- data:字典,包含检索结果\n - chunks:chunk列表,每个chunk包含:\n - id:chunk ID\n - doc_id:文档ID\n - content:chunk内容\n - tokens:token数量\n - chunk_index:chunk索引\n - doc_name:文档名称\n - score:综合检索分数\n - count:结果数量", + "en": "Perform hybrid search in the currently selected knowledge base. 
Combines keyword search (FTS5) and vector search (sqlite-vec), merges results using weighted approach (keyword weight 0.3, vector weight 0.7), deduplicates, reranks using Jaccard similarity, and returns the top-k most relevant results.\n\nParameters:\n- query: Query text (required)\n- top_k: Number of results to return (optional, default from config, usually 5)\n\nReturn value:\n- success: Boolean, indicating whether the search was successful\n- message: String, describing the search result\n- data: Dictionary containing search results\n - chunks: List of chunks, each containing:\n - id: Chunk ID\n - doc_id: Document ID\n - content: Chunk content\n - tokens: Number of tokens\n - chunk_index: Chunk index\n - doc_name: Document name\n - score: Combined search score\n - count: Number of results" + }, + "list_documents": { + "zh": "查看当前选中的知识库下的所有文档列表。返回文档的详细信息。\n\n参数说明:\n无参数\n\n返回值:\n- success:布尔值,表示是否成功\n- message:字符串,描述操作结果\n- data:字典,包含文档列表\n - documents:文档列表,每个文档包含:\n - id:文档ID\n - name:文档名称\n - file_path:文件路径\n - file_type:文件类型\n - chunk_size:chunk大小\n - created_at:创建时间\n - updated_at:更新时间\n - count:文档数量", + "en": "List all documents in the currently selected knowledge base. 
Returns detailed information about the documents.\n\nParameters:\nNo parameters\n\nReturn value:\n- success: Boolean, indicating whether the operation was successful\n- message: String, describing the operation result\n- data: Dictionary containing document list\n - documents: List of documents, each containing:\n - id: Document ID\n - name: Document name\n - file_path: File path\n - file_type: File type\n - chunk_size: Chunk size\n - created_at: Creation time\n - updated_at: Update time\n - count: Number of documents" + }, + "delete_document": { + "zh": "删除当前选中的知识库下的指定文档。删除文档会级联删除该文档的所有chunks。\n\n参数说明:\n- doc_name:文档名称(必填)\n\n返回值:\n- success:布尔值,表示是否成功\n- message:字符串,描述操作结果\n- data:字典,包含删除结果\n - doc_name:已删除的文档名称", + "en": "Delete the specified document from the currently selected knowledge base. Deleting a document will cascade delete all chunks of that document.\n\nParameters:\n- doc_name: Document name (required)\n\nReturn value:\n- success: Boolean, indicating whether the operation was successful\n- message: String, describing the operation result\n- data: Dictionary containing deletion results\n - doc_name: Deleted document name" + }, + "update_document": { + "zh": "修改文档的chunk_size并重新解析文档。会删除原有的chunks,使用新的chunk_size重新切分文档,并异步批量生成新的向量。\n\n参数说明:\n- doc_name:文档名称(必填)\n- chunk_size:新的chunk大小,单位token(必填)\n\n返回值:\n- success:布尔值,表示是否成功\n- message:字符串,描述操作结果\n- data:字典,包含修改结果\n - doc_id:文档ID\n - doc_name:文档名称\n - chunk_count:新的chunk数量\n - chunk_size:新的chunk大小", + "en": "Update the document's chunk_size and re-parse the document. 
Will delete existing chunks, re-split the document using the new chunk_size, and asynchronously generate new vectors in batch.\n\nParameters:\n- doc_name: Document name (required)\n- chunk_size: New chunk size in tokens (required)\n\nReturn value:\n- success: Boolean, indicating whether the operation was successful\n- message: String, describing the operation result\n- data: Dictionary containing update results\n - doc_id: Document ID\n - doc_name: Document name\n - chunk_count: New number of chunks\n - chunk_size: New chunk size" + }, + "export_database": { + "zh": "导出整个kb.db数据库文件到指定路径。\n\n参数说明:\n- export_path:导出路径(绝对路径,必填)\n\n返回值:\n- success:布尔值,表示是否成功\n- message:字符串,描述操作结果\n- data:字典,包含导出结果\n - source_path:源数据库路径\n - export_path:导出路径", + "en": "Export the entire kb.db database file to the specified path.\n\nParameters:\n- export_path: Export path (absolute path, required)\n\nReturn value:\n- success: Boolean, indicating whether the operation was successful\n- message: String, describing the operation result\n- data: Dictionary containing export results\n - source_path: Source database path\n - export_path: Export path" + }, + "import_database": { + "zh": "导入一个.db数据库文件,将其中的内容合并到kb.db中。导入时会自动处理重名冲突,为知识库和文档名称添加时间戳。\n\n参数说明:\n- source_db_path:源数据库文件路径(绝对路径,必填)\n\n返回值:\n- success:布尔值,表示是否成功\n- message:字符串,描述操作结果\n- data:字典,包含导入结果\n - source_path:源数据库路径\n - imported_kb_count:导入的知识库数量\n - imported_doc_count:导入的文档数量", + "en": "Import a .db database file and merge its contents into kb.db. 
Import will automatically handle name conflicts by adding timestamps to knowledge base and document names.\n\nParameters:\n- source_db_path: Source database file path (absolute path, required)\n\nReturn value:\n- success: Boolean, indicating whether the operation was successful\n- message: String, describing the operation result\n- data: Dictionary containing import results\n - source_path: Source database path\n - imported_kb_count: Number of imported knowledge bases\n - imported_doc_count: Number of imported documents" + } + } +} diff --git a/mcp_center/servers/oe-cli-mcp-server/mcp_tools/rag_tools/deps.toml b/mcp_center/servers/oe-cli-mcp-server/mcp_tools/rag_tools/deps.toml new file mode 100644 index 0000000000000000000000000000000000000000..1b44d05199626d5da47504a2e7105b183c657db9 --- /dev/null +++ b/mcp_center/servers/oe-cli-mcp-server/mcp_tools/rag_tools/deps.toml @@ -0,0 +1,15 @@ +[system] + +[pip] +tiktoken = ">=0.8.0" +python-docx = ">=1.1.0" +chardet = ">=5.2.0" +jieba = ">=0.42.1" +aiohttp = ">=3.9.0" +sqlite-vec = ">=0.1.6" +sqlalchemy = ">=2.0.0" +mcp = ">=0.1.0" +fastapi = ">=0.100.0" +uvicorn = ">=0.23.0" +python-multipart = ">=0.0.6" +PyMuPDF = ">=1.23.0" diff --git a/mcp_center/servers/oe-cli-mcp-server/mcp_tools/rag_tools/rag_config.json b/mcp_center/servers/oe-cli-mcp-server/mcp_tools/rag_tools/rag_config.json new file mode 100644 index 0000000000000000000000000000000000000000..4bed2cec5be31e6c8db495827792d17a48335847 --- /dev/null +++ b/mcp_center/servers/oe-cli-mcp-server/mcp_tools/rag_tools/rag_config.json @@ -0,0 +1,20 @@ +{ + "embedding": { + "type": "openai", + "api_key": "", + "endpoint": "https://dashscope.aliyuncs.com/compatible-mode/v1/embeddings", + "model_name": "text-embedding-v4", + "timeout": 30, + "vector_dimension": 1024 + }, + "token": { + "model": "gpt-4", + "max_tokens": 8192, + "default_chunk_size": 1024 + }, + "search": { + "default_top_k": 5, + "max_top_k": 100 + } +} + diff --git 
"""MCP RAG tools: knowledge-base management, document import and hybrid search.

All state lives in a single SQLite database (kb.db) next to this file. A
module-level ``_current_kb_id`` tracks the knowledge base selected via
``select_knowledge_base``; document/search operations require it to be set.
"""

import os
import sys
import uuid
import shutil
import logging
import asyncio
from typing import Optional, Dict, Any, List

current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
    sys.path.append(current_dir)

from base.manager.database_manager import Database
from base.manager.document_manager import DocumentManager, import_document as _import_document, update_document as _update_document
from base.config import get_default_top_k
from base.models import KnowledgeBase
from base.search.weighted_keyword_and_vector_search import weighted_keyword_and_vector_search

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d - %(message)s'
)
logger = logging.getLogger(__name__)

# Lazily created singleton Database bound to the fixed kb.db path.
_db_instance: Optional[Database] = None
_db_path = os.path.join(current_dir, "database", "kb.db")
# ID of the knowledge base selected by select_knowledge_base(); None until set.
_current_kb_id: Optional[str] = None


def _get_db() -> Database:
    """Return the shared Database instance (always kb.db), creating it lazily."""
    global _db_instance
    if _db_instance is None:
        # exist_ok=True makes the prior os.path.exists() check redundant.
        os.makedirs(os.path.dirname(_db_path), exist_ok=True)
        _db_instance = Database(_db_path)
    return _db_instance


def _ensure_active_kb(result: Dict[str, Any]) -> Optional[str]:
    """Return the current knowledge-base ID, or set an error message and return None.

    :param result: response dict to mutate when no knowledge base is selected
    """
    if not _current_kb_id:
        result["message"] = "请先选择知识库"
        return None
    return _current_kb_id


def create_knowledge_base(
    kb_name: str,
    chunk_size: int,
    embedding_model: Optional[str] = None,
    embedding_endpoint: Optional[str] = None,
    embedding_api_key: Optional[str] = None
) -> Dict[str, Any]:
    """Create a new knowledge base.

    :param kb_name: knowledge base name (must be unique)
    :param chunk_size: chunk size in tokens
    :param embedding_model: embedding model name (optional)
    :param embedding_endpoint: embedding service endpoint (optional)
    :param embedding_api_key: embedding service API key (optional)
    :return: result dict with ``success``/``message``/``data``
    """
    result = {
        "success": False,
        "message": "",
        "data": {}
    }

    try:
        db = _get_db()
        # NOTE: the Database helpers manage their own sessions; the unused
        # session that was previously opened here has been removed.
        if db.get_knowledge_base(kb_name):
            result["message"] = f"知识库 '{kb_name}' 已存在"
            return result

        kb_id = str(uuid.uuid4())
        if db.add_knowledge_base(kb_id, kb_name, chunk_size,
                                 embedding_model, embedding_endpoint, embedding_api_key):
            result["success"] = True
            result["message"] = f"成功创建知识库: {kb_name}"
            result["data"] = {
                "kb_id": kb_id,
                "kb_name": kb_name,
                "chunk_size": chunk_size
            }
        else:
            result["message"] = "创建知识库失败"
    except Exception as e:
        logger.exception(f"[create_knowledge_base] 创建知识库失败: {e}")
        result["message"] = "创建知识库失败"

    return result


def delete_knowledge_base(kb_name: str) -> Dict[str, Any]:
    """Delete a knowledge base (cascades to its documents and chunks).

    The currently selected knowledge base cannot be deleted.

    :param kb_name: knowledge base name
    :return: result dict with ``success``/``message``/``data``
    """
    result = {
        "success": False,
        "message": "",
        "data": {}
    }

    try:
        db = _get_db()
        kb = db.get_knowledge_base(kb_name)
        if not kb:
            result["message"] = f"知识库 '{kb_name}' 不存在"
            return result

        # Refuse to delete the knowledge base that is currently in use.
        if _current_kb_id == kb.id:
            result["message"] = "不能删除当前正在使用的知识库"
            return result

        if db.delete_knowledge_base(kb.id):
            result["success"] = True
            result["message"] = f"成功删除知识库: {kb_name}"
            result["data"] = {"kb_name": kb_name}
        else:
            result["message"] = "删除知识库失败"
    except Exception as e:
        logger.exception(f"[delete_knowledge_base] 删除知识库失败: {e}")
        result["message"] = "删除知识库失败"

    return result


def list_knowledge_bases() -> Dict[str, Any]:
    """List all knowledge bases, flagging the currently selected one.

    :return: result dict; ``data`` carries ``knowledge_bases``/``count``/``current_kb_id``
    """
    result = {
        "success": False,
        "message": "",
        "data": {}
    }

    try:
        db = _get_db()
        knowledge_bases = [
            {
                "id": kb.id,
                "name": kb.name,
                "chunk_size": kb.chunk_size,
                "embedding_model": kb.embedding_model,
                "created_at": kb.created_at.isoformat() if kb.created_at else None,
                "is_current": _current_kb_id == kb.id
            }
            for kb in db.list_knowledge_bases()
        ]

        result["success"] = True
        result["message"] = f"找到 {len(knowledge_bases)} 个知识库"
        result["data"] = {
            "knowledge_bases": knowledge_bases,
            "count": len(knowledge_bases),
            "current_kb_id": _current_kb_id
        }
    except Exception as e:
        logger.exception(f"[list_knowledge_bases] 获取知识库列表失败: {e}")
        result["message"] = "获取知识库列表失败"

    return result


def select_knowledge_base(kb_name: str) -> Dict[str, Any]:
    """Select a knowledge base as the active one for subsequent operations.

    :param kb_name: knowledge base name
    :return: result dict; ``data`` carries ``kb_id``/``kb_name``/``document_count``
    """
    result = {
        "success": False,
        "message": "",
        "data": {}
    }

    try:
        db = _get_db()
        kb = db.get_knowledge_base(kb_name)
        if not kb:
            result["message"] = f"知识库 '{kb_name}' 不存在"
            return result

        global _current_kb_id
        _current_kb_id = kb.id

        session = db.get_session()
        try:
            manager = DocumentManager(session)
            doc_count = len(manager.list_documents_by_kb(kb.id))
        finally:
            session.close()

        result["success"] = True
        result["message"] = f"成功选择知识库,共 {doc_count} 个文档"
        result["data"] = {
            "kb_id": kb.id,
            "kb_name": kb.name,
            "document_count": doc_count
        }
    except Exception as e:
        logger.exception(f"[select_knowledge_base] 选择知识库失败: {e}")
        result["message"] = "选择知识库失败"

    return result


async def import_document(file_paths: List[str], chunk_size: Optional[int] = None) -> Dict[str, Any]:
    """Import documents into the active knowledge base (concurrent, async).

    :param file_paths: absolute file paths, 1..n files
    :param chunk_size: chunk size in tokens; defaults to the knowledge base's chunk_size
    :return: result dict; ``success`` is True if at least one file imported
    """
    result = {
        "success": False,
        "message": "",
        "data": {}
    }

    try:
        kb_id = _ensure_active_kb(result)
        if not kb_id:
            return result

        if not file_paths:
            result["message"] = "文件路径列表为空"
            return result

        # Validate every path up front so we can report all missing files at once.
        invalid_paths = [path for path in file_paths if not os.path.exists(path)]
        if invalid_paths:
            result["message"] = f"以下文件路径不存在: {', '.join(invalid_paths)}"
            return result

        db = _get_db()
        # Resolve the knowledge base first to pick up its default chunk_size.
        session = db.get_session()
        try:
            kb = session.query(KnowledgeBase).filter_by(id=kb_id).first()
            if not kb:
                result["message"] = "知识库不存在"
                return result

            if chunk_size is None:
                chunk_size = kb.chunk_size
        finally:
            session.close()

        async def import_single_file(file_path: str):
            """Import one file with its own session (sessions are not concurrency-safe)."""
            file_session = db.get_session()
            try:
                return await _import_document(file_session, kb_id, file_path, chunk_size)
            finally:
                file_session.close()

        # return_exceptions=True keeps one failing file from aborting the batch.
        results = await asyncio.gather(
            *(import_single_file(path) for path in file_paths),
            return_exceptions=True
        )

        success_files = []
        failed_files = []
        for file_path, res in zip(file_paths, results):
            if isinstance(res, Exception):
                failed_files.append({
                    "file_path": file_path,
                    "error": str(res)
                })
                # Not inside an except block, so logger.exception() would log
                # "NoneType: None"; attach the gathered exception explicitly.
                logger.error(f"[import_document] 导入文件失败: {file_path}, 错误: {res}",
                             exc_info=res)
            else:
                success, message, data = res
                if success:
                    success_files.append({
                        "file_path": file_path,
                        "doc_name": data.get("doc_name") if data else os.path.basename(file_path),
                        "chunk_count": data.get("chunk_count", 0) if data else 0
                    })
                else:
                    failed_files.append({
                        "file_path": file_path,
                        "error": message
                    })

        success_count = len(success_files)
        failed_count = len(failed_files)
        result["success"] = success_count > 0
        result["message"] = f"成功导入 {success_count} 个文档,失败 {failed_count} 个"
        result["data"] = {
            "total": len(file_paths),
            "success_count": success_count,
            "failed_count": failed_count,
            "success_files": success_files,
            "failed_files": failed_files
        }
    except Exception as e:
        logger.exception(f"[import_document] 导入文档失败: {e}")
        result["message"] = f"导入文档失败: {str(e)}"

    return result


async def search(query: str, top_k: Optional[int] = None) -> Dict[str, Any]:
    """Hybrid search (keyword 0.3 + vector 0.7) in the active knowledge base.

    :param query: query text
    :param top_k: number of results; defaults to the configured value
    :return: result dict; ``data`` carries ``chunks``/``count``
    """
    result = {
        "success": False,
        "message": "",
        "data": {}
    }

    if top_k is None:
        top_k = get_default_top_k()

    kb_id = _ensure_active_kb(result)
    if not kb_id:
        return result

    # Fixed fusion weights for keyword (FTS5) vs. vector (sqlite-vec) scores.
    weight_keyword = 0.3
    weight_vector = 0.7

    try:
        db = _get_db()
        session = db.get_session()
        try:
            # Restrict the search to documents of the active knowledge base.
            manager = DocumentManager(session)
            doc_ids = [doc.id for doc in manager.list_documents_by_kb(kb_id)]

            if not doc_ids:
                result["message"] = "当前知识库中没有文档"
                result["data"] = {"chunks": []}
                return result

            conn = session.connection()
            chunks = await weighted_keyword_and_vector_search(
                conn, query, top_k, weight_keyword, weight_vector, doc_ids
            )
        finally:
            session.close()

        if not chunks:
            result["message"] = "未找到相关结果"
            result["data"] = {"chunks": []}
            return result

        result["success"] = True
        result["message"] = f"找到 {len(chunks)} 个相关结果"
        result["data"] = {
            "chunks": chunks,
            "count": len(chunks)
        }
    except Exception as e:
        logger.exception(f"[search] 搜索失败: {e}")
        result["message"] = "搜索失败"

    return result


def list_documents() -> Dict[str, Any]:
    """List all documents of the active knowledge base.

    :return: result dict; ``data`` carries ``documents``/``count``
    """
    result = {
        "success": False,
        "message": "",
        "data": {}
    }

    try:
        kb_id = _ensure_active_kb(result)
        if not kb_id:
            return result

        db = _get_db()
        session = db.get_session()
        try:
            manager = DocumentManager(session)
            # Serialize while the session is open: the ORM rows become detached
            # after close() and attribute access could then fail.
            documents = [
                {
                    "id": doc.id,
                    "name": doc.name,
                    "file_path": doc.file_path,
                    "file_type": doc.file_type,
                    "chunk_size": doc.chunk_size,
                    "created_at": doc.created_at.isoformat() if doc.created_at else None,
                    "updated_at": doc.updated_at.isoformat() if doc.updated_at else None
                }
                for doc in manager.list_documents_by_kb(kb_id)
            ]
        finally:
            session.close()

        result["success"] = True
        result["message"] = f"找到 {len(documents)} 个文档"
        result["data"] = {
            "documents": documents,
            "count": len(documents)
        }
    except Exception as e:
        logger.exception(f"[list_documents] 获取文档列表失败: {e}")
        result["message"] = "获取文档列表失败"

    return result


def delete_document(doc_name: str) -> Dict[str, Any]:
    """Delete a document (and its chunks) from the active knowledge base.

    :param doc_name: document name
    :return: result dict with ``success``/``message``/``data``
    """
    result = {
        "success": False,
        "message": "",
        "data": {}
    }

    try:
        kb_id = _ensure_active_kb(result)
        if not kb_id:
            return result

        db = _get_db()
        session = db.get_session()
        try:
            manager = DocumentManager(session)
            if manager.delete_document(kb_id, doc_name):
                result["success"] = True
                result["message"] = f"成功删除文档: {doc_name}"
                result["data"] = {"doc_name": doc_name}
            else:
                result["message"] = f"文档 '{doc_name}' 不存在或删除失败"
        finally:
            session.close()
    except Exception as e:
        logger.exception(f"[delete_document] 删除文档失败: {e}")
        result["message"] = "删除文档失败"

    return result


async def update_document(doc_name: str, chunk_size: int) -> Dict[str, Any]:
    """Change a document's chunk_size and re-parse it (async).

    :param doc_name: document name
    :param chunk_size: new chunk size in tokens
    :return: result dict with ``success``/``message``/``data``
    """
    result = {
        "success": False,
        "message": "",
        "data": {}
    }

    try:
        kb_id = _ensure_active_kb(result)
        if not kb_id:
            return result

        db = _get_db()
        session = db.get_session()
        try:
            success, message, data = await _update_document(session, kb_id, doc_name, chunk_size)
            result["success"] = success
            result["message"] = message
            result["data"] = data or {}
        finally:
            session.close()
    except Exception as e:
        logger.exception(f"[update_document] 修改文档失败: {e}")
        result["message"] = "修改文档失败"

    return result


def export_database(export_path: str) -> Dict[str, Any]:
    """Copy the whole kb.db database file to *export_path*.

    :param export_path: absolute destination path (``.db`` appended if missing)
    :return: result dict with ``success``/``message``/``data``
    """
    result = {
        "success": False,
        "message": "",
        "data": {}
    }

    try:
        if not os.path.exists(_db_path):
            result["message"] = "数据库文件不存在"
            return result

        if not export_path:
            result["message"] = "导出路径不能为空"
            return result

        # Normalize the destination to a database-file extension.
        if not export_path.endswith(('.db', '.sqlite', '.sqlite3')):
            export_path += '.db'

        # Create the destination directory if needed.
        export_dir = os.path.dirname(export_path)
        if export_dir:
            os.makedirs(export_dir, exist_ok=True)

        # copy2 preserves file metadata alongside the contents.
        shutil.copy2(_db_path, export_path)

        result["success"] = True
        result["message"] = f"成功导出数据库到: {export_path}"
        result["data"] = {
            "source_path": _db_path,
            "export_path": export_path
        }
    except Exception as e:
        logger.exception(f"[export_database] 导出数据库失败: {e}")
        result["message"] = f"导出数据库失败: {str(e)}"

    return result


def import_database(source_db_path: str) -> Dict[str, Any]:
    """Merge the contents of another .db file into kb.db.

    Name conflicts are handled by the Database layer (timestamp suffixes).

    :param source_db_path: absolute path of the source database file
    :return: result dict with ``success``/``message``/``data``
    """
    result = {
        "success": False,
        "message": "",
        "data": {}
    }

    try:
        if not source_db_path:
            result["message"] = "源数据库路径不能为空"
            return result

        if not os.path.exists(source_db_path):
            result["message"] = f"源数据库文件不存在: {source_db_path}"
            return result

        db = _get_db()
        imported_kb_count, imported_doc_count = db.import_database(source_db_path)

        result["success"] = True
        result["message"] = f"成功导入,共 {imported_kb_count} 个知识库,{imported_doc_count} 个文档"
        result["data"] = {
            "source_path": source_db_path,
            "imported_kb_count": imported_kb_count,
            "imported_doc_count": imported_doc_count
        }
    except Exception as e:
        logger.exception(f"[import_database] 导入数据库失败: {e}")
        result["message"] = f"导入数据库失败: {str(e)}"

    return result