diff --git a/oe-cli-mcp-server/.gitignore b/oe-cli-mcp-server/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..e2b1089851263d8283a3eaa188bbf2892e17411d --- /dev/null +++ b/oe-cli-mcp-server/.gitignore @@ -0,0 +1,17 @@ +# 虚拟环境(本地开发环境,无需提交) +venv/ +env/ +ENV/ +.venv/ +virtualenv/ +*_venv/ # 匹配自定义虚拟环境名称(如 target_venv) + +# 编译产物 +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python + +test_tool +data/tool_state.json diff --git a/oe-cli-mcp-server/README.en.md b/oe-cli-mcp-server/README.en.md new file mode 100644 index 0000000000000000000000000000000000000000..46d9216f7bcd64b826e1ca7fb265dab76552c9d9 --- /dev/null +++ b/oe-cli-mcp-server/README.en.md @@ -0,0 +1,36 @@ +# oe-cli-mcp-server + +#### Description +{**When you're done, you can delete the content in this README and update the file with details for others getting started with your repository**} + +#### Software Architecture +Software architecture description + +#### Installation + +1. xxxx +2. xxxx +3. xxxx + +#### Instructions + +1. xxxx +2. xxxx +3. xxxx + +#### Contribution + +1. Fork the repository +2. Create Feat_xxx branch +3. Commit your code +4. Create Pull Request + + +#### Gitee Feature + +1. You can use Readme\_XXX.md to support different languages, such as Readme\_en.md, Readme\_zh.md +2. Gitee blog [blog.gitee.com](https://blog.gitee.com) +3. Explore open source project [https://gitee.com/explore](https://gitee.com/explore) +4. The most valuable open source project [GVP](https://gitee.com/gvp) +5. The manual of Gitee [https://gitee.com/help](https://gitee.com/help) +6. The most popular members [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/oe-cli-mcp-server/README.md b/oe-cli-mcp-server/README.md new file mode 100644 index 0000000000000000000000000000000000000000..029f872ca685ab1fb7abda847b2365faa2df7ec1 --- /dev/null +++ b/oe-cli-mcp-server/README.md @@ -0,0 +1,39 @@ +# oe-cli-mcp-server + +#### 介绍 +{**以下是 Gitee 平台说明,您可以替换此简介** +Gitee 是 OSCHINA 推出的基于 Git 的代码托管平台(同时支持 SVN)。专为开发者提供稳定、高效、安全的云端软件开发协作平台 +无论是个人、团队、或是企业,都能够用 Gitee 实现代码托管、项目管理、协作开发。企业项目请看 [https://gitee.com/enterprises](https://gitee.com/enterprises)} + +#### 软件架构 +软件架构说明 + + +#### 安装教程 + +1. xxxx +2. xxxx +3. xxxx + +#### 使用说明 + +1. export PYTHONPATH=/home/tsn/oe-cli-mcp-server:$PYTHONPATH +2. xxxx +3. xxxx + +#### 参与贡献 + +1. Fork 本仓库 +2. 新建 Feat_xxx 分支 +3. 提交代码 +4. 新建 Pull Request + + +#### 特技 + +1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md +2. Gitee 官方博客 [blog.gitee.com](https://blog.gitee.com) +3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解 Gitee 上的优秀开源项目 +4. [GVP](https://gitee.com/gvp) 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目 +5. Gitee 官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) +6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/) diff --git a/oe-cli-mcp-server/client/client.py b/oe-cli-mcp-server/client/client.py new file mode 100644 index 0000000000000000000000000000000000000000..69e390c4259cde2b7ff48e49c2dfa80f1aa56c62 --- /dev/null +++ b/oe-cli-mcp-server/client/client.py @@ -0,0 +1,131 @@ +# Copyright (c) Huawei Technologies Co., Ltd. 2023-2025. All rights reserved. +"""MCP Client""" + +import asyncio +import logging +from contextlib import AsyncExitStack +from typing import Union +from enum import Enum +from mcp import ClientSession +from mcp.client.sse import sse_client +from numpy.f2py.crackfortran import reset_global_f2py_vars + +logger = logging.getLogger(__name__) + + +class MCPStatus(str, Enum): + """MCP状态枚举""" + UNINITIALIZED = "UNINITIALIZED" + RUNNING = "RUNNING" + STOPPED = "STOPPED" + ERROR = "ERROR" + + +class MCPClient: + """MCP客户端基类""" + + def __init__(self, url: str, headers: dict[str, str]) -> None: + """初始化MCP Client""" + self.url = url + self.headers = headers + self.client: Union[ClientSession, None] = None + self.status = MCPStatus.UNINITIALIZED + + async def _main_loop( + self + ) -> None: + """ + 创建MCP Client + + 抽象函数;作用为在初始化的时候使用MCP SDK创建Client + 由于目前MCP的实现中Client和Session是1:1的关系,所以直接创建了 :class:`~mcp.ClientSession` + """ + # 创建Client + try: + client = sse_client( + url=self.url, + headers=self.headers + ) + except Exception as e: + self.error_sign.set() + err = f"创建Client失败,错误信息:{e}" + print(err) + raise Exception(err) + # 创建Client、Session + try: + exit_stack = AsyncExitStack() + read, write = await exit_stack.enter_async_context(client) + self.client = ClientSession(read, write) + session = await exit_stack.enter_async_context(self.client) + # 初始化Client + await session.initialize() + except Exception: + self.error_sign.set() + self.status = MCPStatus.STOPPED + err = f"初始化Client失败,错误信息:{e}" + print(err) + raise + + self.ready_sign.set() + self.status = MCPStatus.RUNNING + # 等待关闭信号 + await self.stop_sign.wait() + + # 关闭Client + try: + await exit_stack.aclose() # type: ignore[attr-defined] + self.status = MCPStatus.STOPPED + except Exception: + print(f"关闭Client失败,错误信息:{e}") + + async def init(self) -> None: + """ + 初始化 MCP Client类 + :return: None + """ + # 初始化变量 + self.ready_sign = asyncio.Event() + self.error_sign = asyncio.Event() + self.stop_sign = asyncio.Event() + + # 创建协程 + self.task = asyncio.create_task(self._main_loop()) + + # 等待初始化完成 + done, pending = await asyncio.wait( + [asyncio.create_task(self.ready_sign.wait()), + asyncio.create_task(self.error_sign.wait())], + return_when=asyncio.FIRST_COMPLETED + ) + if self.error_sign.is_set(): + self.status = MCPStatus.ERROR + print("MCP Client 初始化失败") + raise Exception("MCP Client 初始化失败") + + async def call_tool(self, tool_name: str, params: dict) -> "CallToolResult": + """调用MCP Server的工具""" + return await self.client.call_tool(tool_name, params) + + async def stop(self) -> None: + """停止MCP Client""" + self.stop_sign.set() + try: + await self.task + except Exception as e: + err = f"关闭MCP Client失败,错误信息:{e}" + print(err) + + +async def main() -> None: + """测试MCP Client""" + url = "http://0.0.0.0:8002/sse" + headers = {} + client = MCPClient(url, headers) + await client.init() + #result = await client.call_tool("list_knowledge_bases", {}) + result = await client.call_tool("nvidia_smi_status", {}) + print(result) + await client.stop() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/oe-cli-mcp-server/mcp-server.service b/oe-cli-mcp-server/mcp-server.service new file mode 100644 index 0000000000000000000000000000000000000000..74a8514a0a848ff0de9dab8dd720679ef5ddcd9c --- /dev/null +++ b/oe-cli-mcp-server/mcp-server.service @@ -0,0 +1,33 @@ +[Unit] +Description=MCP Tool Registration Service +After=network.target +After=multi-user.target +# 新增:确保文件系统就绪(避免工作目录未挂载) +RequiresMountsFor=/home/tsn/oe-cli-mcp-server + +[Service] +User=root +Group=root +# 工作目录:必须是项目根目录(已正确配置,确保 server.py 中相对路径生效) +WorkingDirectory=/home/tsn/oe-cli-mcp-server + +Environment="PATH=/home/tsn/oe-cli-mcp-server/venv/global/bin:$PATH" + +ExecStart=/home/tsn/oe-cli-mcp-server/venv/global/bin/python /home/tsn/oe-cli-mcp-server/mcp_server/server.py + +# 原有合理配置保留 +Restart=always +RestartSec=5 +KillMode=control-group +Environment="LANGUAGE=zh" +Environment="LOG_LEVEL=INFO" +StandardOutput=journal+console +StandardError=journal+console + +# 新增:限制服务资源(可选,避免占用过多CPU/内存) +LimitCPU=4 +LimitMEMLOCK=infinity +LimitNOFILE=65535 + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_server/MCP_TOOLS.json b/oe-cli-mcp-server/mcp_server/MCP_TOOLS.json new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/oe-cli-mcp-server/mcp_server/api_server.py b/oe-cli-mcp-server/mcp_server/api_server.py new file mode 100644 index 0000000000000000000000000000000000000000..fe4f8ab7ef507f0be38fe62eb41e8badff3eb721 --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/api_server.py @@ -0,0 +1,149 @@ +# mcp_server/api_server.py +from fastapi import FastAPI, HTTPException, Query +from pydantic import BaseModel +import uvicorn +import threading +import logging + +# 导入单例 McpServer(用于调用核心业务逻辑) +from mcp_server.server import McpServer +from mcp_tools.tool_type import ToolType + +# 日志配置(与主服务保持一致) +logger = logging.getLogger("McpApiServer") + +# -------------------------- FastAPI 初始化 -------------------------- +def create_fastapi_app() -> FastAPI: + """创建 FastAPI 实例并定义接口(独立于 McpServer 类)""" + app = FastAPI( + title="MCP Tool Manager API", + description="用于管理 MCP 工具包的 HTTP 接口(替代原 Socket 服务)", + version="1.0.0" + ) + + # -------------------------- 数据模型(参数校验)-------------------------- + class AddToolRequest(BaseModel): + type: str = Query(..., description="工具类型:system(系统包)/ custom(自定义包)") + value: str = Query(..., description="系统包填 ToolType 值,自定义包填 zip 路径或包名") + + class RemoveToolRequest(BaseModel): + type: str = Query(..., description="工具类型:system(系统包)/ custom(自定义包)") + value: str = Query(..., description="系统包填 ToolType 值,自定义包填包名") + + # -------------------------- 核心:获取 McpServer 单例实例 -------------------------- + def get_mcp_server() -> McpServer: + """获取 McpServer 单例(确保与主服务使用同一个实例)""" + try: + return McpServer._instance # 直接获取单例(依赖原 singleton 装饰器的 _instance 属性) + except AttributeError: + raise HTTPException(status_code=503, detail="MCP 主服务未初始化") + + # -------------------------- API 接口定义 -------------------------- + @app.post("/tool/add", summary="新增工具包") + def add_tool(type: str = Query(...), value: str = Query(...)): + """新增工具包(system/custom 类型)""" + mcp_server = get_mcp_server() + try: + if type == "system": + # 转换为 ToolType 枚举 + try: + tool_type = ToolType(value) + except ValueError: + raise HTTPException(status_code=400, detail=f"不支持的系统工具类型:{value}(参考 ToolType 枚举)") + mcp_server.load(tool_type) + elif type == "custom": + mcp_server.load(value) + else: + raise HTTPException(status_code=400, detail="type 只能是 system 或 custom") + return {"success": True, "message": f"新增 {value} 成功"} + except HTTPException: + raise # 直接抛出已定义的 HTTP 异常 + except Exception as e: + logger.error(f"新增工具包失败:{str(e)}") + raise HTTPException(status_code=500, detail=f"新增失败:{str(e)}") + + @app.post("/tool/remove", summary="删除工具包") + def remove_tool(type: str = Query(...), value: str = Query(...)): + """删除工具包(system/custom 类型)""" + mcp_server = get_mcp_server() + try: + if type == "system": + try: + tool_type = ToolType(value) + except ValueError: + raise HTTPException(status_code=400, detail=f"不支持的系统工具类型:{value}") + mcp_server.remove(tool_type) + elif type == "custom": + mcp_server.remove(value) + else: + raise HTTPException(status_code=400, detail="type 只能是 system 或 custom") + return {"success": True, "message": f"删除 {value} 成功,重启后生效"} + except HTTPException: + raise + except Exception as e: + logger.error(f"删除工具包失败:{str(e)}") + raise HTTPException(status_code=500, detail=f"删除失败:{str(e)}") + + @app.get("/tool/list", summary="查询所有已加载工具包") + def list_tools(): + """查询所有工具包及对应的函数""" + mcp_server = get_mcp_server() + try: + pkg_funcs = {} + for pkg in mcp_server.list_packages(): + pkg_funcs[pkg] = mcp_server.list_funcs(pkg) + return { + "success": True, + "data": { + "pkg_funcs": pkg_funcs, + "total_packages": len(pkg_funcs) + } + } + except Exception as e: + logger.error(f"查询工具包失败:{str(e)}") + raise HTTPException(status_code=500, detail=f"查询失败:{str(e)}") + + @app.post("/tool/init", summary="初始化系统(仅保留基础运维包)") + def init_system(): + """初始化系统,卸载所有包并仅保留基础运维包""" + mcp_server = get_mcp_server() + try: + mcp_server.init() + return {"success": True, "message": "初始化成功(仅保留基础运维包)"} + except Exception as e: + logger.error(f"初始化系统失败:{str(e)}") + raise HTTPException(status_code=500, detail=f"初始化失败:{str(e)}") + + @app.post("/tool/restart", summary="重启 MCP 服务") + def restart_service(): + """重启 MCP 服务(重新加载所有工具包)""" + mcp_server = get_mcp_server() + try: + mcp_server.restart() + return {"success": True, "message": "服务重启成功"} + except Exception as e: + logger.error(f"重启服务失败:{str(e)}") + raise HTTPException(status_code=500, detail=f"重启失败:{str(e)}") + + return app + +# -------------------------- 启动 FastAPI 服务(独立线程)-------------------------- +def start_fastapi_server(host: str = "0.0.0.0", port: int = 8003): + """在独立线程中启动 FastAPI 服务(不阻塞主服务)""" + app = create_fastapi_app() + logger.info(f"FastAPI 服务启动中:http://{host}:{port}") + logger.info(f"接口文档地址:http://{host}:{port}/docs") + + def run_server(): + uvicorn.run( + app, + host=host, + port=port, + log_level="warning", # 仅输出警告以上日志 + access_log=False # 关闭访问日志(避免刷屏) + ) + + # 启动独立线程(daemon=True:主进程退出时,API 服务也退出) + api_thread = threading.Thread(target=run_server, daemon=True) + api_thread.start() + return api_thread \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_server/cli.py b/oe-cli-mcp-server/mcp_server/cli.py new file mode 100755 index 0000000000000000000000000000000000000000..029bd722a0e4d88fa7c692c38600059573036126 --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/cli.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3 +import logging +import os +import sys +with open("/etc/systemd/system/mcp-server.service", "r") as f: + for line in f: + if line.strip().startswith("WorkingDirectory="): + PROJECT_ROOT = line.strip().split("=", 1)[1] + break + +# 加入 sys.path +sys.path.insert(0, PROJECT_ROOT) +from mcp_server.cli.parse_args import parse_args +from mcp_server.cli.handle import ( + handle_add, handle_remove, handle_tool, handle_init, + handle_start, handle_log, handle_llm, handle_config, handle_stop +) + +# 日志极简配置 +logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + +def main(): + args = parse_args() + success = False + + # 命令调度(直接映射,无冗余) + if args.add: + success = handle_add(args.add) + elif args.remove: + success = handle_remove(args.remove) + elif args.tool: + success = handle_tool() + elif args.init: + success = handle_init() + elif args.start: + success = handle_start() + elif args.log: + success = handle_log() + elif args.llm: + success = handle_llm(args.model, args.apikey, args.name) + elif args.stop: + success = handle_stop() + elif args.config: + success = handle_config(args.config) + + raise SystemExit(0 if success else 1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_server/cli/__init__.py b/oe-cli-mcp-server/mcp_server/cli/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/oe-cli-mcp-server/mcp_server/cli/handle.py b/oe-cli-mcp-server/mcp_server/cli/handle.py new file mode 100644 index 0000000000000000000000000000000000000000..ef2838aaca5286c91f435d6825d44571166b6bb4 --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/cli/handle.py @@ -0,0 +1,162 @@ +import logging +import os +import subprocess +import toml +from mcp_tools.tool_type import ToolType +from util.get_project_root import get_project_root +from util.test_llm_valid import is_llm_config_valid +from .socket import send_socket_request + +# 路径配置(直接硬编码,简化) +PUBLIC_CONFIG_PATH = os.path.join(get_project_root(), "config/public_config.toml") + +logger = logging.getLogger(__name__) + +# -------------------------- 工具包操作 -------------------------- +def handle_add(pkg_input): + """处理 -add 命令""" + type_map = {"智能运维": ToolType.BASE.value, "智算调优": ToolType.AI.value, + "通算调优": ToolType.CAL.value, "镜像运维": ToolType.MIRROR.value, "个性化": ToolType.PERSONAL.value} + + if pkg_input in type_map: + params = {"type": "system", "value": type_map[pkg_input]} + elif os.path.isfile(pkg_input) and pkg_input.endswith(".zip"): + params = {"type": "custom", "value": os.path.abspath(pkg_input)} + else: + print(f"❌ 不支持的包类型:{pkg_input}") + raise SystemExit(1) + + result = send_socket_request("add", params) + print(f"✅ {result['message']}" if result["success"] else f"❌ {result['message']}") + return result["success"] + +def handle_remove(pkg_input): + """处理 -remove 命令""" + type_map = {"智能运维": ToolType.BASE.value, "智算调优": ToolType.AI.value, + "通算调优": ToolType.CAL.value, "镜像运维": ToolType.MIRROR.value, "个性化": ToolType.PERSONAL.value} + + params = {"type": "system" if pkg_input in type_map else "custom", + "value": type_map.get(pkg_input, pkg_input)} + result = send_socket_request("remove", params) + print(f"✅ {result['message']}" if result["success"] else f"❌ {result['message']}") + return result["success"] + +def handle_tool(): + """处理 -tool 命令""" + result = send_socket_request("list") + if not result["success"]: + print(f"❌ {result['message']}") + return False + + print(f"\n📋 当前已加载工具包(共{result['total']}个):") + for pkg, funcs in result["pkg_funcs"].items(): + print(f"- {pkg}:{len(funcs)}个工具 → {', '.join(funcs)}") + return True + +def handle_init(): + """处理 -init 命令""" + result = send_socket_request("init") + print(f"✅ {result['message']}" if result["success"] else f"❌ {result['message']}") + return result["success"] + +# -------------------------- 服务操作 -------------------------- +def handle_start(): + """处理 -start 命令""" + try: + subprocess.run(["sudo", "systemctl", "start", "mcp-server"], check=True) + print("✅ 服务启动成功") + return True + except Exception as e: + print(f"❌ 启动失败:{str(e)}") + return False + +def handle_stop(): + """处理 -stop 命令""" + try: + subprocess.run(["sudo", "systemctl", "stop", "mcp-server"], check=True) + print("✅ 服务终止成功") + return True + except Exception as e: + print(f"❌ 终止失败:{str(e)}") + return False + +def handle_restart(): + """处理 -restart 命令""" + try: + subprocess.run(["sudo", "systemctl", "restart", "mcp-server"], check=True) + print("✅ 服务重启成功") + return True + except Exception as e: + print(f"❌ 重启失败:{str(e)}") + return False + +def handle_log(): + """处理 -log 命令""" + try: + subprocess.run(["sudo", "journalctl", "-u", "mcp-server", "-f"], check=True) + except KeyboardInterrupt: + print("\n📌 日志查看退出") + except Exception as e: + print(f"❌ 查看失败:{str(e)}") + return True + +# -------------------------- 配置操作 -------------------------- +def handle_llm(model, apikey, name): + """处理 -llm 命令""" + if not all([model, apikey, name]): + print("❌ 缺少参数:--model、--apikey、--name 必须同时指定") + return False + + if not is_llm_config_valid(model, apikey, name): + print("❌ 大模型校验失败") + return False + + try: + with open(PUBLIC_CONFIG_PATH, "r") as f: + config = toml.load(f) + config["llm_remote"] = model + config["llm_api_key"] = apikey + config["llm_model"] = name + with open(PUBLIC_CONFIG_PATH, "w") as f: + toml.dump(config, f) + + subprocess.run(["sudo", "systemctl", "restart", "mcp-server"], check=True) + print(f"✅ 大模型配置成功(模型:{name})") + return True + except Exception as e: + print(f"❌ 配置失败:{str(e)}") + return False + +def handle_config(key_value): + """处理 -config 命令""" + if "=" not in key_value: + print("❌ 格式错误:需为 键=值(如 -config port=8002)") + return False + + key, value = key_value.split("=", 1) + supported = ["language", "max_tokens", "temperature", "port"] + if key not in supported: + print(f"❌ 不支持的键:{key}(支持:{', '.join(supported)})") + return False + + try: + with open(PUBLIC_CONFIG_PATH, "r") as f: + config = toml.load(f) + # 类型转换 + if key in ["max_tokens", "port"]: + value = int(value) + elif key == "temperature": + value = float(value) + config[key] = value + with open(PUBLIC_CONFIG_PATH, "w") as f: + toml.dump(config, f) + + if key == "port": + subprocess.run(["sudo", "systemctl", "restart", "mcp-server"], check=True) + print(f"✅ 配置 {key}={value}(已重启服务)") + else: + print(f"✅ 配置 {key}={value}(下次重启生效)") + return True + except Exception as e: + print(f"❌ 配置失败:{str(e)}") + return False \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_server/cli/parse_args.py b/oe-cli-mcp-server/mcp_server/cli/parse_args.py new file mode 100644 index 0000000000000000000000000000000000000000..d55c243ed2153fa77e57e61f9ba4ca40cd7142ad --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/cli/parse_args.py @@ -0,0 +1,25 @@ +import argparse + +def parse_args(): + """解析命令行参数(仅保留核心参数,对齐原有命令)""" + parser = argparse.ArgumentParser(description="mcp-server 命令行工具") + + # 互斥命令组(一次一个核心操作) + command_group = parser.add_mutually_exclusive_group(required=True) + command_group.add_argument("-add", metavar="包名/zip路径", help="新增工具包(示例:-add 智算调优 或 -add ./custom.zip)") + command_group.add_argument("-remove", metavar="包名", help="删除工具包(示例:-remove 智算调优)") + command_group.add_argument("-tool", action="store_true", help="查看已加载工具包") + command_group.add_argument("-init", action="store_true", help="初始化服务(仅保留基础运维包)") + command_group.add_argument("-log", action="store_true", help="查看服务实时日志") + command_group.add_argument("-start", action="store_true", help="启动 mcp-server 服务") + command_group.add_argument("-stop", action="store_true", help="终止 mcp-server 服务") + command_group.add_argument("-restart", action="store_true", help="重启 mcp-server 服务") + command_group.add_argument("-llm", action="store_true", help="配置大模型(需配合 --model/--apikey/--name)") + command_group.add_argument("-config", metavar="键=值", help="修改公共配置(示例:-config language=en)") + + # 大模型配置附属参数 + parser.add_argument("--model", help="大模型地址(如 http://127.0.0.1:8000)") + parser.add_argument("--apikey", help="大模型 API 密钥") + parser.add_argument("--name", help="大模型名称(如 qwen、gpt-3.5)") + + return parser.parse_args() \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_server/cli/socket.py b/oe-cli-mcp-server/mcp_server/cli/socket.py new file mode 100644 index 0000000000000000000000000000000000000000..6dd683d0b2d544a8cdff4af75dee38017a6504e8 --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/cli/socket.py @@ -0,0 +1,32 @@ +import socket +import json +import logging +from typing import Dict, Any + +# Socket配置(直接硬编码,简化配置) +SOCKET_HOST = "127.0.0.1" +SOCKET_PORT = 8003 +SOCKET_TIMEOUT = 5 + +logger = logging.getLogger(__name__) + +def send_socket_request(action: str, params: Dict[str, Any] = None) -> Dict[str, Any]: + """发送Socket请求到服务端(极简封装,失败直接返回错误)""" + params = params or {} + request = json.dumps({"action": action, "params": params}, ensure_ascii=False).encode("utf-8") + + try: + client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client.settimeout(SOCKET_TIMEOUT) + client.connect((SOCKET_HOST, SOCKET_PORT)) + client.send(request) + response = json.loads(client.recv(4096).decode("utf-8")) + client.close() + return response + except socket.timeout: + return {"success": False, "message": "连接服务超时"} + except ConnectionRefusedError: + return {"success": False, "message": "服务未启动,请先执行 -start"} + except Exception as e: + logger.error(f"Socket通信失败:{str(e)}") + return {"success": False, "message": f"通信失败:{str(e)}"} \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_server/dependency.py b/oe-cli-mcp-server/mcp_server/dependency.py new file mode 100644 index 0000000000000000000000000000000000000000..0c06aea398bd9a22811c099f5ed10946f3109b92 --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/dependency.py @@ -0,0 +1,265 @@ +import os +import subprocess +import toml +import sys +import logging +from typing import Any, Optional, Dict +from pkg_resources import get_distribution, DistributionNotFound +from util.get_project_root import get_project_root + +logger = logging.getLogger(__name__) +class DepVenvManager: + """ + 依赖与虚拟环境管理类 + 聚合功能:虚拟环境生命周期管理 + 系统/Python依赖安装 + 依赖冲突检测 + """ + def __init__(self): + """初始化:绑定虚拟环境路径、初始化日志器(公共属性一次定义)""" + + self.logger = logger + self.logger.setLevel(logging.INFO) + + # 1. 虚拟环境路径(固定规范,适配 openEuler) + self.project_root = get_project_root() + if not self.project_root: + self.project_root = os.getcwd() + self.logger.warning(f"无法获取项目根目录,使用当前工作目录:{self.project_root}") + + self.venv_root = os.path.join(self.project_root, "venv") + self.global_venv_path = os.path.join(self.venv_root, "global") + self.isolated_venv_root = os.path.join(self.venv_root, "isolated") + + # 2. 初始化目录(确保根目录存在) + os.makedirs(self.venv_root, exist_ok=True) + os.makedirs(self.isolated_venv_root, exist_ok=True) + + def _get_installed_packages(self, venv_path: str) -> Dict[str, str]: + """辅助方法:获取指定虚拟环境中已安装的Python包(包名→版本号)""" + pip_path = self.get_venv_pip(venv_path) + try: + # 用 pip list --format=json 获取结构化输出,解析效率高 + result = subprocess.run( + [pip_path, "list", "--format=json"], + capture_output=True, text=True, check=True + ) + packages = json.loads(result.stdout) + # 转为字典:{包名: 版本号}(忽略大小写,比如 requests → Requests) + return {pkg["name"].lower(): pkg["version"] for pkg in packages} + except Exception as e: + self.logger.error(f"获取已安装包失败:{str(e)}") + return {} # 异常时返回空字典,降级为全量安装(避免卡住) + + + def _get_venv_site_packages(self, venv_path: str) -> str: + """私有辅助方法:获取虚拟环境 site-packages 路径""" + python_version = f"python{sys.version_info.major}.{sys.version_info.minor}" + return os.path.join(venv_path, "lib", python_version, "site-packages") + + # -------------------------- 1. 虚拟环境管理方法 -------------------------- + def create_global_venv(self) -> str: + """创建全局共用虚拟环境""" + if os.path.exists(self.global_venv_path): + return self.global_venv_path + + self.logger.info(f"创建全局虚拟环境:{self.global_venv_path}") + subprocess.run( + [sys.executable, "-m", "venv", self.global_venv_path], + check=True, capture_output=True, text=True + ) + self.logger.info("全局虚拟环境创建完成") + return self.global_venv_path + + def create_isolated_venv(self, tool_id: str) -> str: + """创建独立虚拟环境(用于依赖冲突的 tool)""" + venv_path = os.path.join(self.isolated_venv_root, tool_id) + if os.path.exists(venv_path): + return venv_path + + self.logger.warning(f"依赖冲突,创建独立虚拟环境:{venv_path}") + subprocess.run( + [sys.executable, "-m", "venv", venv_path], + check=True, capture_output=True, text=True + ) + self.logger.info("独立虚拟环境创建完成") + return venv_path + + def delete_isolated_venv(self, tool_id: str) -> bool: + """删除指定 tool 的独立虚拟环境(保护全局环境)""" + venv_path = os.path.join(self.isolated_venv_root, tool_id) + if not os.path.exists(venv_path): + self.logger.debug(f"独立虚拟环境不存在:{venv_path}") + return True + + try: + import shutil + shutil.rmtree(venv_path) + self.logger.info(f"删除独立虚拟环境:{venv_path}") + return True + except Exception as e: + self.logger.error(f"删除独立虚拟环境失败:{str(e)}") + return False + + def get_venv_pip(self, venv_path: Optional[str] = None) -> str: + """获取指定虚拟环境的 pip 路径""" + # 优先使用指定环境,否则使用当前激活环境 + target_venv = venv_path or os.getenv("VIRTUAL_ENV") + if not target_venv: + raise Exception("未激活虚拟环境,请先执行 source ./venv/global/bin/activate") + return os.path.join(target_venv, "bin", "pip") + + # -------------------------- 2. 依赖冲突检测方法 -------------------------- + def check_pip_compatibility(self, pip_deps: Dict[str, str], venv_path: str) -> bool: + """检测 Python 依赖与目标环境的兼容性 """ + site_packages = self._get_venv_site_packages(venv_path) + sys.path.insert(0, site_packages) + + try: + for dep_name, ver_constraint in pip_deps.items(): + if not ver_constraint.strip(): + continue + + # 检查依赖是否已安装 + try: + installed_ver = get_distribution(dep_name).version + except DistributionNotFound: + continue # 未安装,无冲突 + + # 版本约束校验(==/>=/<=) + if ver_constraint.startswith("==") and installed_ver != ver_constraint[2:].strip(): + self.logger.debug(f"版本不兼容:{dep_name}(需{ver_constraint},当前{installed_ver})") + return False + if ver_constraint.startswith(">=") and installed_ver < ver_constraint[2:].strip(): + self.logger.debug(f"版本过低:{dep_name}(需{ver_constraint},当前{installed_ver})") + return False + if ver_constraint.startswith("<=") and installed_ver > ver_constraint[2:].strip(): + self.logger.debug(f"版本过高:{dep_name}(需{ver_constraint},当前{installed_ver})") + return False + return True + finally: + sys.path.remove(site_packages) + + # -------------------------- 3. 依赖安装方法 -------------------------- + def install_system_deps(self, system_deps: Dict[str, str]) -> Any: + """安装系统依赖""" + result = {"success": [], "failed": []} + if not system_deps: + return result + + self.logger.info("=== 开始安装系统依赖(yum)===") + for dep_name, yum_cmd in system_deps.items(): + # 检查是否已安装 + verify_cmd = f"{dep_name} --version" if dep_name != "docker" else "docker --version" + if subprocess.run(verify_cmd, shell=True, capture_output=True).returncode == 0: + self.logger.debug(f"系统依赖[{dep_name}]已安装,跳过") + result["success"].append(dep_name) + continue + + # 执行 yum 安装 + try: + self.logger.info(f"安装:{dep_name} → 命令:{yum_cmd}") + subprocess.run(yum_cmd, shell=True, check=True, text=True) + result["success"].append(dep_name) + except subprocess.CalledProcessError as e: + err_msg = f"返回码{e.returncode}:{e.stderr.strip()}" + self.logger.error(f"系统依赖[{dep_name}]安装失败:{err_msg}") + result["failed"].append(f"{dep_name}({err_msg})") + return result + + def install_pip_deps(self, pip_deps: Dict[str, str], venv_path: str, pip_index_url: Optional[str] = None) -> Any: + """安装Python依赖(新增:先检查已安装包,仅安装缺失的)""" + self.logger.info(f"开始处理依赖 | 环境:{os.path.basename(venv_path)} | 需检查依赖:{list(pip_deps.keys())}") + result = {"success": [], "failed": [], "skipped": []} # 新增 skipped 记录跳过的依赖 + if not pip_deps: + self.logger.warning("无需要安装的Python依赖") + return result + + pip_path = self.get_venv_pip(venv_path) + # 关键步骤1:获取当前环境已安装的包(格式:{包名: 版本号}) + installed_pkgs = self._get_installed_packages(venv_path) + + # 关键步骤2:筛选出「未安装」或「版本不匹配」的依赖 + need_install = {} + for dep_name, ver_constraint in pip_deps.items(): + # 处理版本约束(比如 ver_constraint 是 "==3.4.0",提取版本号 "3.4.0") + ver_wanted = ver_constraint.strip().lstrip("==") if ver_constraint.strip() else None + if dep_name not in installed_pkgs: + need_install[dep_name] = ver_constraint # 未安装,需要安装 + self.logger.info(f"包 {dep_name} 未安装,需安装版本:{ver_constraint}") + else: + ver_installed = installed_pkgs[dep_name] + if ver_wanted and ver_installed != ver_wanted: + need_install[dep_name] = ver_constraint # 版本不匹配,需要更新 + self.logger.info(f"包 {dep_name} 已安装版本 {ver_installed},需更新为:{ver_wanted}") + else: + result["skipped"].append(f"{dep_name}=={ver_installed}") # 已安装且版本匹配,跳过 + self.logger.info(f"包 {dep_name}=={ver_installed} 已存在,跳过安装") + + # 无需安装新依赖,直接返回 + if not need_install: + self.logger.info("所有依赖均已安装,无需额外操作") + return result + + # 关键步骤3:仅安装筛选出的「需要安装/更新」的依赖 + self.logger.info(f"开始安装缺失/不匹配的依赖:{need_install}") + for dep_name, ver_constraint in need_install.items(): + dep_spec = f"{dep_name}{ver_constraint.strip()}" if ver_constraint.strip() else dep_name + install_cmd = [pip_path, "install", "-q", "--no-cache-dir"] + if pip_index_url: + trusted_host = pip_index_url.split("://")[-1].split("/")[0] + install_cmd.extend(["--index-url", pip_index_url, "--trusted-host", trusted_host]) + install_cmd.append(dep_spec) + + try: + subprocess.run(install_cmd, check=True, capture_output=True, text=True) + result["success"].append(dep_spec) + self.logger.info(f"✅ 安装成功:{dep_spec}") + except subprocess.CalledProcessError as e: + err_msg = e.stderr.strip() + self.logger.error(f"❌ 安装失败:{dep_spec} | 错误:{err_msg[:100]}") + result["failed"].append(f"{dep_spec}({err_msg})") + + self.logger.info(f"依赖处理完成 | 成功:{len(result['success'])} 个 | 失败:{len(result['failed'])} 个 | 跳过:{len(result['skipped'])} 个") + return result + + def execute_deps_script(self, deps_script_path: str, venv_path: str) -> Any: + """执行完整依赖脚本(系统+Python依赖)""" + if not os.path.exists(deps_script_path): + raise FileNotFoundError(f"依赖脚本不存在:{deps_script_path}") + + # 读取依赖脚本 + with open(deps_script_path, "r", encoding="utf-8") as f: + deps_data = toml.load(f) + + # 读取是否有配置pip源 + pip_index_url = deps_data.get("pip_config", {}).get("index_url") + + # 安装系统依赖 + Python依赖 + system_result = self.install_system_deps(deps_data.get("system", {})) + pip_result = self.install_pip_deps(deps_data.get("pip", {}), venv_path, pip_index_url) + + # 输出结果 + total_ok = len(system_result["success"]) + len(pip_result["success"]) + total_fail = len(system_result["failed"]) + len(pip_result["failed"]) + self.logger.info(f"=== 依赖脚本执行完成 == 成功:{total_ok} 个 | 失败:{total_fail} 个 ===") + + return {"system": system_result, "pip": pip_result} + + # -------------------------- 4. tool的执行环境设置 -------------------------- + def select_venv_for_tool(self, tool_id: str, deps_script_path: Optional[str] = None) -> str: + """为 tool 选择合适的虚拟环境(全局优先,冲突则用独立环境)""" + # 无依赖 → 全局环境 + if not deps_script_path or not os.path.exists(deps_script_path): + self.logger.debug(f"tool[{tool_id}]无依赖,使用全局环境") + return self.create_global_venv() + + # 有依赖 → 先检查全局兼容性 + with open(deps_script_path, "r", encoding="utf-8") as f: + pip_deps = toml.load(f).get("pip_deps", {}) + + global_venv = self.create_global_venv() + if self.check_pip_compatibility(pip_deps, global_venv): + self.logger.debug(f"tool[{tool_id}]依赖与全局环境兼容") + return global_venv + + # 不兼容 → 独立环境 + return self.create_isolated_venv(tool_id) \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_server/manager/manager.py b/oe-cli-mcp-server/mcp_server/manager/manager.py new file mode 100644 index 0000000000000000000000000000000000000000..ba710c2dbf774995eb67dba0b58782aad4a3a2e2 --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/manager/manager.py @@ -0,0 +1,321 @@ +# mcp_server/manager/manager.py +""" +Tool 全局管理器(对外统一API层) +核心职责:封装内部组件,对外提供简洁、统一的Tool操作API +- 角色:协调者(不包含业务逻辑,仅转发调用+参数校验) +- 设计原则:对外透明、API语义化、兼容原有使用习惯、屏蔽内部实现 +- 依赖:PackageLoader(加载)、PackageUnloader(卸载)、ToolRepository(查询) +""" +import json +import logging +import os +from typing import Dict, List, Optional, Any +from config.base_config_loader import get_tool_state_path +from mcp_server.manager.tool_repository import ToolRepository, tool_repository as default_repo +from mcp_server.manager.package_loader import PackageLoader, package_loader as default_loader +from mcp_server.manager.package_unloader import PackageUnloader, package_unloader as default_unloader + +logger = logging.getLogger(__name__) + +# 类型别名:与内部组件保持一致,提升API可读性 +ToolType = str +PackageName = str +PackageDir = str +FuncName = str + + +class ToolManager: + """Tool全局管理器:对外提供加载/卸载/查询一站式API""" + + def __init__(self, + tool_repository: Optional[ToolRepository] = None, + package_loader: Optional[PackageLoader] = None, + package_unloader: Optional[PackageUnloader] = None): + """ + 初始化管理器(依赖注入,便于测试和组件替换) + :param tool_repository: 数据仓库组件(默认使用全局单例) + :param package_loader: 包加载器组件(默认使用全局单例) + :param package_unloader: 包卸载器组件(默认使用全局单例) + """ + self._repo = tool_repository or default_repo + self._loader = package_loader or default_loader + self._unloader = package_unloader or default_unloader + self._state_file_path = get_tool_state_path() + logger.info("ToolManager 初始化完成 | 内部组件已就绪") + + # ------------------------------------------------------------------------- + # 加载操作API(转发给PackageLoader) + # ------------------------------------------------------------------------- + def load_package(self, package_dir: PackageDir) -> Optional[PackageName]: + """ + 加载单个包(最小操作单元) + :param package_dir: 包目录绝对路径 + :return: 成功返回包名,失败返回None + """ + if not package_dir: + logger.error("[Load Package Failed] 原因:包目录不能为空") + return None + result = self._loader.load_package(package_dir) + if result: + self.persist_tool_state() + return result + + def load_tool_type(self, tool_type: ToolType) -> Dict[str, Any]: + """ + 加载指定分类下所有包(批量加载) + :param tool_type: 分类名(支持字符串/Enum) + :return: 加载结果统计(总包数/成功数/失败数等) + """ + if not tool_type: + logger.error("[Load ToolType Failed] 原因:分类名不能为空") + return self._init_empty_result("分类名不能为空") + result = self._loader.load_tool_type(tool_type) + self.persist_tool_state() + return result + + # ------------------------------------------------------------------------- + # 卸载操作API(转发给PackageUnloader) + # ------------------------------------------------------------------------- + def unload_package(self, package_name: PackageName, delete_env: bool = True) -> bool: + """ + 卸载单个包(最小操作单元) + :param package_name: 要卸载的包名 + :param delete_env: 是否删除独立环境(默认True,全局环境不删除) + :return: 卸载成功返回True,失败返回False + """ + if not package_name: + logger.error("[Unload Package Failed] 原因:包名不能为空") + return False + result = self._unloader.unload_package(package_name, delete_env) + if result: + self.persist_tool_state() + return result + + def unload_tool_type(self, tool_type: ToolType, delete_env: bool = True) -> bool: + """ + 卸载指定分类下所有包(批量卸载) + :param tool_type: 分类名(支持字符串/Enum) + :param delete_env: 是否删除独立环境(默认True) + :return: 整体卸载成功返回True(所有包都卸载成功),部分失败返回False + """ + if not tool_type: + logger.error("[Unload ToolType Failed] 原因:分类名不能为空") + return False + + result = self._unloader.unload_tool_type(tool_type, delete_env) + if result: + self.persist_tool_state() + return result + + # ------------------------------------------------------------------------- + # 查询操作API(转发给ToolRepository) + # ------------------------------------------------------------------------- + def get_package_info(self, package_name: PackageName) -> Optional[Dict[str, Any]]: + """ + 查询包完整信息(含环境、函数列表等) + :param package_name: 包名 + :return: 包信息字典,包不存在返回None + """ + if not package_name: + logger.error("[Get Package Info Failed] 原因:包名不能为空") + return None + return self._repo.get_package(package_name) + + def get_func_info(self, func_name: FuncName) -> Optional[Dict[str, Any]]: + """ + 查询函数详情(含所属包/分类/环境等关联信息) + :param func_name: 函数名(全局唯一) + :return: 函数信息字典,函数不存在返回None + """ + if not func_name: + logger.error("[Get Func Info Failed] 原因:函数名不能为空") + return None + return self._repo.get_func(func_name) + + def get_tool_type_info(self, tool_type: ToolType) -> Optional[Dict[str, Any]]: + """ + 查询分类详情(含下属包列表、统计信息等) + :param tool_type: 分类名 + :return: 分类信息字典,分类不存在返回None + """ + if not tool_type: + logger.error("[Get ToolType Info Failed] 原因:分类名不能为空") + return None + return self._repo.get_tool_type(tool_type) + + # ------------------------------------------------------------------------- + # 列表查询API(转发给ToolRepository) + # ------------------------------------------------------------------------- + def list_packages(self, tool_type: Optional[ToolType] = None) -> List[PackageName]: + """ + 列出所有包名(可选按分类过滤) + :param tool_type: 分类名(None表示列出所有包) + :return: 包名列表(按添加时间升序排列) + """ + return self._repo.list_packages(tool_type) + + def list_funcs(self, package_name: Optional[PackageName] = None) -> List[FuncName]: + """ + 列出所有函数名(可选按包过滤) + :param package_name: 包名(None表示列出所有函数) + :return: 函数名列表 + """ + return self._repo.list_funcs(package_name) + + def list_tool_types(self) -> List[ToolType]: + """列出所有已注册的分类名""" + return self._repo.list_tool_types() + + # ------------------------------------------------------------------------- + # 持久化相关API(转发给ToolRepository) + # ------------------------------------------------------------------------- + def get_serializable_data(self) -> Dict[str, Any]: + """获取可序列化数据(用于持久化存储)""" + data = self._repo.get_serializable_data() + logger.debug(f"[Serializable Data Got] 包数:{len(data.get('serializable_packages', {}))}") + return data + + def get_package_path(self,package_name): + data = self._repo.get_serializable_data() + packages_info = data.get('serializable_packages', {}) + return packages_info[package_name]["package_dir"] + + def load_serializable_data(self, data: Dict[str, Any]) -> Dict[str, int]: + """ + 加载序列化数据(用于重启后恢复关联关系) + :param data: 序列化数据(来自持久化文件) + :return: 恢复结果统计 + """ + if not isinstance(data, dict): + logger.error("[Load Serializable Data Failed] 原因:数据格式必须是字典") + return {"total_package": 0, "success_package": 0, "fail_package": 0} + return self._repo.load_serializable_data(data) + + def reload_package_functions(self) -> Dict[str, Any]: + """ + 重新导入所有已加载包的函数对象(持久化恢复后必须执行) + 逻辑:从 ToolRepository 获取包目录 → 用 PackageLoader 重新导入函数 → 更新到仓库 + :return: 重新导入结果统计 + """ + result = { + "total_package": 0, + "success_package": 0, + "fail_package": [], + "total_func": 0 + } + + # 1. 从仓库获取所有已恢复元信息的包 + package_names = self.list_packages() + result["total_package"] = len(package_names) + if result["total_package"] == 0: + logger.info("[Reload Functions] 无已加载的包,跳过重新导入") + return result + + # 2. 逐个包重新导入函数 + for pkg_name in package_names: + pkg_info = self.get_package_info(pkg_name) + if not pkg_info or not pkg_info.get("package_dir"): + logger.error(f"[Reload Functions Failed] 包 {pkg_name} 无有效目录信息") + result["fail_package"].append(pkg_name) + continue + + try: + # 关键:调用 load_package 重新导入(依赖 PackageLoader 内部兼容“已存在包”) + loaded_pkg_name = self.load_package(pkg_info["package_dir"]) + + if loaded_pkg_name: + # 重新导入成功:统计包数和函数数 + result["success_package"] += 1 + # 获取该包重新导入后的函数数,更新统计 + funcs = self.list_funcs(pkg_name) + result["total_func"] += len(funcs) + logger.info(f"[Reload Functions Success] 包 {pkg_name}:{len(funcs)} 个函数") + else: + # load_package 返回 None:可能是包已存在但函数导入失败,或其他错误 + logger.warning(f"[Reload Functions Warning] 包 {pkg_name} 未成功重新导入") + result["fail_package"].append(pkg_name) + except Exception as e: + logger.error(f"[Reload Functions Failed] 包 {pkg_name}:{str(e)}", exc_info=True) + result["fail_package"].append(pkg_name) + + # 补充日志:明确最终结果 + logger.info( + f"[Reload Functions Done] 总包数:{result['total_package']} | " + f"成功:{result['success_package']} | 失败:{len(result['fail_package'])} | " + f"总函数数:{result['total_func']}" + ) + return result + + # ------------------------------------------------------------------------- + # 持久化核心API(对外暴露,支持手动触发) + # ------------------------------------------------------------------------- + def persist_tool_state(self) -> bool: + """ + 持久化Tool状态(将内存中的三级关联数据写入文件) + :return: 持久化成功返回True,失败返回False + """ + try: + # 1. 从仓库获取可序列化数据 + serializable_data = self._repo.get_serializable_data() + # 2. 写入文件(保证原子性:先写临时文件,再替换目标文件) + temp_file = f"{self._state_file_path}.tmp" + with open(temp_file, "w", encoding="utf-8") as f: + json.dump(serializable_data, f, ensure_ascii=False, indent=2) + # 3. 替换目标文件(避免写入过程中断导致文件损坏) + os.replace(temp_file, self._state_file_path) + logger.info(f"[Tool State Persisted] 成功写入持久化文件 | 包数:{len(serializable_data.get('serializable_packages', {}))}") + return True + except Exception as e: + logger.error(f"[Tool State Persist Failed] 原因:{str(e)}", exc_info=True) + # 清理临时文件 + if os.path.exists(f"{self._state_file_path}.tmp"): + os.remove(f"{self._state_file_path}.tmp") + return False + + def restore_tool_state(self) -> Dict[str, int]: + """ + 恢复Tool状态(从持久化文件加载数据到内存) + :return: 恢复结果统计(总包数/成功数/失败数) + """ + # 1. 校验文件是否存在 + if not os.path.exists(self._state_file_path): + logger.warning(f"[Tool State Restore Failed] 原因:持久化文件不存在 - {self._state_file_path}") + return {"total_package": 0, "success_package": 0, "fail_package": 0} + try: + # 2. 读取文件数据 + with open(self._state_file_path, "r", encoding="utf-8") as f: + serializable_data = json.load(f) + # 3. 调用仓库加载数据到内存 + result = self._repo.load_serializable_data(serializable_data) + logger.info(f"[Tool State Restored] 从持久化文件恢复 | 结果:{result}") + return result + except json.JSONDecodeError: + logger.error(f"[Tool State Restore Failed] 原因:持久化文件格式错误 - {self._state_file_path}") + # 可选:备份损坏的文件 + if os.path.exists(self._state_file_path): + backup_file = f"{self._state_file_path}.corrupt.{os.path.getmtime(self._state_file_path)}" + os.rename(self._state_file_path, backup_file) + logger.info(f"[Corrupt File Backed Up] 备份路径:{backup_file}") + return {"total_package": 0, "success_package": 0, "fail_package": 0} + except Exception as e: + logger.error(f"[Tool State Restore Failed] 原因:{str(e)}", exc_info=True) + return {"total_package": 0, "success_package": 0, "fail_package": 0} + + # ------------------------------------------------------------------------- + # 内部辅助方法(私有,单一职责) + # ------------------------------------------------------------------------- + @staticmethod + def _init_empty_result(fail_reason: str) -> Dict[str, Any]: + """初始化空的加载结果统计""" + return { + "tool_type": "", + "total_package": 0, + "success_package": 0, + "fail_package": [], + "success_func": 0, + "fail_reason": fail_reason + } + + +# 全局单例实例(对外暴露的唯一入口,保持原有使用习惯) +tool_manager = ToolManager() diff --git a/oe-cli-mcp-server/mcp_server/manager/package_loader.py b/oe-cli-mcp-server/mcp_server/manager/package_loader.py new file mode 100644 index 0000000000000000000000000000000000000000..c79a471cb6dd47e9ced8f1067d3013c17f0051d7 --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/manager/package_loader.py @@ -0,0 +1,340 @@ +# mcp_server/manager/package_loader.py +""" +包加载器(纯加载流程层) +核心职责:仅负责「包/分类级」加载流程,最小操作单元为包 +- 流程:校验包合法性 → 准备环境 → 安装依赖 → 导入模块 → 校验函数 → 调用仓库存储 +- 设计原则:无状态、纯流程、依赖注入、结果导向 +- 依赖:DepVenvManager(环境/依赖管理)、ToolRepository(数据存储) +""" + +import os +import sys +import json +import logging +from typing import Dict, List, Optional, Any +from importlib.util import spec_from_file_location, module_from_spec + +from mcp_server.dependency import DepVenvManager +from util.get_project_root import get_project_root +from util.tool_package_file_check import tool_package_file_check +from mcp_server.manager.tool_repository import ToolRepository, tool_repository as default_repo + +logger = logging.getLogger(__name__) + +# 类型别名:与ToolRepository保持一致 +ToolType = str +PackageName = str +PackageDir = str +FuncName = str +FuncDetail = Dict[str, Any] + + +class PackageLoader: + """包加载器:封装包/分类级加载流程,不直接操作数据存储""" + + def __init__(self, + dep_manager: Optional[DepVenvManager] = None, + tool_repository: Optional[ToolRepository] = None): + """ + 初始化加载器(依赖注入,便于测试和替换) + :param dep_manager: 环境/依赖管理器(默认自动创建) + :param tool_repository: 数据仓库(默认使用全局单例) + """ + self._dep_manager = dep_manager or DepVenvManager() + self._tool_repo = tool_repository or default_repo + # 初始化全局环境(确保全局虚拟环境存在) + self._dep_manager.create_global_venv() + logger.info("PackageLoader 初始化完成 | 全局虚拟环境已就绪") + + # ------------------------------------------------------------------------- + # 公开API(对外暴露的加载方法) + # ------------------------------------------------------------------------- + def load_package(self, package_dir: PackageDir) -> Optional[PackageName]: + """ + 加载单个包(核心流程) + :param package_dir: 包目录绝对路径 + :return: 成功返回包名,失败返回None + """ + logger.info(f"[Package Load Start] 包目录:{package_dir}") + + # 1. 前置准备:路径标准化 + 包合法性校验 + normalized_dir = self._normalize_path(package_dir) + if not self._validate_package_dir(normalized_dir): + logger.error(f"[Package Load Failed] 包目录:{normalized_dir} | 原因:包合法性校验失败") + return None + + # 2. 解析包基础信息 + package_name = self._get_package_name(normalized_dir) + tool_type = self._get_tool_type_by_package_dir(normalized_dir) + func_configs = self._load_func_configs(normalized_dir) + if not func_configs: + logger.error(f"[Package Load Failed] 包名:{package_name} | 原因:无有效函数配置") + return None + + # 3. 环境准备:选择全局/独立环境 + env_result = self._prepare_package_env(package_name, normalized_dir) + if not env_result: + logger.error(f"[Package Load Failed] 包名:{package_name} | 原因:环境准备失败") + return None + venv_path, venv_type = env_result + + # 4. 依赖安装:安装包所需系统/Python依赖 + if not package_name in self._tool_repo.list_packages(): + + if not self._install_package_deps(package_name, normalized_dir, venv_path): + logger.error(f"[Package Load Failed] 包名:{package_name} | 原因:依赖安装失败") + return None + + # 5. 模块导入:加载tool.py模块(注入环境依赖) + tool_module = self._load_tool_module(package_name, normalized_dir, venv_path) + if not tool_module: + logger.error(f"[Package Load Failed] 包名:{package_name} | 原因:模块导入失败") + return None + + # 6. 函数校验:校验函数可调用性,组装函数详情 + valid_funcs = self._validate_and_assemble_funcs(package_name, tool_module, func_configs) + if not valid_funcs: + logger.error(f"[Package Load Failed] 包名:{package_name} | 原因:无有效可调用函数") + return None + + # 7. 数据存储:调用仓库保存包+函数信息 + if self._tool_repo.add_package( + package_name=package_name, + tool_type=tool_type, + package_dir=normalized_dir, + venv_path=venv_path, + venv_type=venv_type, + funcs=valid_funcs + ): + logger.info(f"[Package Load Success] 包名:{package_name} | 分类:{tool_type} | 有效函数数:{len(valid_funcs)}") + return package_name + else: + logger.error(f"[Package Load Failed] 包名:{package_name} | 原因:仓库存储失败") + return None + + def load_tool_type(self, tool_type: ToolType) -> Dict[str, Any]: + """ + 加载指定分类下所有包(批量加载) + :param tool_type: 分类名(支持字符串/Enum) + :return: 加载结果统计 + """ + # 标准化分类名(支持Enum类型) + tool_type_str = self._normalize_tool_type(tool_type) + result = self._init_load_result(tool_type_str) + + # 获取分类目录 + type_dir = self._get_tool_type_dir(tool_type_str) + if not os.path.isdir(type_dir): + result["fail_reason"] = f"分类目录不存在:{type_dir}" + logger.error(f"[ToolType Load Failed] 分类:{tool_type_str} | 原因:{result['fail_reason']}") + return self._log_and_return_result(result) + + # 遍历分类下所有包目录 + for item in os.listdir(type_dir): + item_path = os.path.join(type_dir, item) + if not os.path.isdir(item_path): + logger.debug(f"[ToolType Load Skip] 分类:{tool_type_str} | 原因:非目录 - {item_path}") + continue + + result["total_package"] += 1 + package_name = self.load_package(item_path) + + if package_name: + result["success_package"] += 1 + result["success_func"] += len(self._tool_repo.list_funcs(package_name)) + else: + result["fail_package"].append(item) + + return self._log_and_return_result(result) + + # ------------------------------------------------------------------------- + # 内部辅助方法(私有,单一职责) + # ------------------------------------------------------------------------- + @staticmethod + def _normalize_path(path: str) -> PackageDir: + """标准化路径(绝对路径+去除冗余)""" + return os.path.abspath(os.path.normpath(path)) + + @staticmethod + def _normalize_tool_type(tool_type: ToolType) -> str: + """标准化分类名(支持Enum类型)""" + return tool_type.value if hasattr(tool_type, "value") else str(tool_type) + + @staticmethod + def _get_package_name(package_dir: PackageDir) -> PackageName: + """从包目录获取包名(目录名)""" + return os.path.basename(package_dir) + + @staticmethod + def _get_tool_type_by_package_dir(package_dir: PackageDir) -> ToolType: + """从包目录获取所属分类(父目录名)""" + return os.path.basename(os.path.dirname(package_dir)) + + def _get_tool_type_dir(self, tool_type: str) -> PackageDir: + """获取分类目录路径""" + root_dir = get_project_root() + if not root_dir: + logger.error("[ToolType Dir Get Failed] 原因:tool_package根目录未配置") + return "" + return os.path.join(self._normalize_path(root_dir), "mcp_tools", tool_type) + + def _validate_package_dir(self, package_dir: PackageDir) -> bool: + """校验包目录合法性(存在tool.py + 通过基础校验)""" + if not tool_package_file_check(package_dir): + logger.error(f"[Package Validate Failed] 包目录:{package_dir} | 原因:包文件校验失败") + return False + return True + + def _load_func_configs(self, package_dir: PackageDir) -> Dict[FuncName, str]: + """从config.json加载函数配置(函数名→描述)""" + config_path = os.path.join(package_dir, "config.json") + package_name = self._get_package_name(package_dir) + + try: + with open(config_path, "r", encoding="utf-8") as f: + config_data = json.load(f) + # 提取tools节点下的函数配置(key=函数名,value=描述) + func_configs = config_data.get("tools", {}) + if not isinstance(func_configs, dict) or len(func_configs) == 0: + logger.warning(f"[Func Config Load Empty] 包名:{package_name} | 原因:config.json中tools节点为空") + return {} + logger.debug(f"[Func Config Load Success] 包名:{package_name} | 加载函数配置数:{len(func_configs)}") + return func_configs + except FileNotFoundError: + logger.error(f"[Func Config Load Failed] 包名:{package_name} | 原因:config.json不存在 - {config_path}") + return {} + except json.JSONDecodeError: + logger.error(f"[Func Config Load Failed] 包名:{package_name} | 原因:config.json格式错误") + return {} + except Exception as e: + logger.error(f"[Func Config Load Failed] 包名:{package_name} | 原因:{str(e)}", exc_info=True) + return {} + + def _prepare_package_env(self, package_name: PackageName, package_dir: PackageDir) -> Optional[tuple[str, str]]: + """为包准备环境(选择全局/独立环境)""" + try: + deps_path = os.path.join(package_dir, "deps.toml") + deps_exists = os.path.exists(deps_path) + + # 调用环境管理器选择环境(包名为环境标识) + venv_path = self._dep_manager.select_venv_for_tool( + tool_id=package_name, + deps_script_path=deps_path if deps_exists else None + ) + venv_type = "global" if venv_path == self._dep_manager.global_venv_path else "isolated" + logger.debug(f"[Package Env Prepared] 包名:{package_name} | 环境路径:{venv_path} | 环境类型:{venv_type}") + return venv_path, venv_type + except Exception as e: + logger.error(f"[Package Env Prepare Failed] 包名:{package_name} | 原因:{str(e)}", exc_info=True) + return None + + def _install_package_deps(self, package_name: PackageName, package_dir: PackageDir, venv_path: str) -> bool: + """安装包的依赖(系统依赖+Python依赖)""" + deps_path = os.path.join(package_dir, "deps.toml") + if not os.path.exists(deps_path): + logger.debug(f"[Package Deps Skip] 包名:{package_name} | 原因:无deps.toml依赖配置") + return True + + try: + # 调用环境管理器执行依赖安装 + logger.info("-------------正在安装相关依赖---------------") + install_result = self._dep_manager.execute_deps_script( + deps_script_path=deps_path, + venv_path=venv_path + ) + # 收集失败的依赖(仅报警,不中断流程) + failed_deps = install_result["system"]["failed"] + install_result["pip"]["failed"] + if failed_deps: + logger.warning(f"[Package Deps Partial Failed] 包名:{package_name} | 失败依赖:{failed_deps}") + logger.debug(f"[Package Deps Installed] 包名:{package_name} | 依赖安装完成") + return True + except Exception as e: + logger.error(f"[Package Deps Install Failed] 包名:{package_name} | 原因:{str(e)}", exc_info=True) + return False + + def _load_tool_module(self, package_name: PackageName, package_dir: PackageDir, venv_path: str) -> Optional[Any]: + """加载tool.py模块(注入环境的site-packages路径)""" + tool_py_path = os.path.join(package_dir, "tool.py") + module_name = f"package_{package_name}_module" # 唯一模块名,避免冲突 + + # 注入环境的site-packages路径(确保依赖可导入) + site_packages = self._dep_manager._get_venv_site_packages(venv_path) + sys.path.insert(0, site_packages) + + try: + # 清理旧模块缓存(避免重复加载导致的问题) + if module_name in sys.modules: + del sys.modules[module_name] + + # 动态导入模块 + spec = spec_from_file_location(module_name, tool_py_path) + if not spec or not spec.loader: + raise ValueError("模块规范无效,无法加载") + + module = module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + + logger.debug(f"[Tool Module Loaded] 包名:{package_name} | 模块名:{module_name}") + return module + except Exception as e: + logger.error(f"[Tool Module Load Failed] 包名:{package_name} | 原因:{str(e)}", exc_info=True) + return None + finally: + # 清理临时路径,避免污染全局sys.path + if site_packages in sys.path: + sys.path.remove(site_packages) + + def _validate_and_assemble_funcs(self, + package_name: PackageName, + tool_module: Any, + func_configs: Dict[FuncName, str]) -> Dict[FuncName, FuncDetail]: + """校验函数可调用性,组装函数详情字典""" + valid_funcs = {} + for func_name, description in func_configs.items(): + # 校验函数是否存在且可调用 + if not hasattr(tool_module, func_name): + logger.warning(f"[Func Validate Skipped] 包名:{package_name} | 函数名:{func_name} | 原因:模块中不存在该函数") + continue + func_obj = getattr(tool_module, func_name) + if not callable(func_obj): + logger.warning(f"[Func Validate Skipped] 包名:{package_name} | 函数名:{func_name} | 原因:非可调用对象") + continue + + # 组装函数详情(深拷贝避免外部修改) + valid_funcs[func_name] = { + "func": func_obj, + "description": description + } + + logger.debug(f"[Func Validate Completed] 包名:{package_name} | 有效函数数:{len(valid_funcs)} | 总配置数:{len(func_configs)}") + return valid_funcs + + @staticmethod + def _init_load_result(tool_type: str) -> Dict[str, Any]: + """初始化分类加载结果统计字典""" + return { + "tool_type": tool_type, + "total_package": 0, + "success_package": 0, + "fail_package": [], + "success_func": 0, + "fail_reason": "" + } + + @staticmethod + def _log_and_return_result(result: Dict[str, Any]) -> Dict[str, Any]: + """日志输出加载结果并返回""" + logger.info( + f"[ToolType Load Completed] 分类:{result['tool_type']} | " + f"总包数:{result['total_package']} | " + f"成功包数:{result['success_package']} | " + f"失败包数:{len(result['fail_package'])} | " + f"成功函数数:{result['success_func']} | " + f"失败原因:{result['fail_reason']}" + ) + return result + + +# 单例实例(默认使用全局依赖和仓库) +package_loader = PackageLoader() \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_server/manager/package_unloader.py b/oe-cli-mcp-server/mcp_server/manager/package_unloader.py new file mode 100644 index 0000000000000000000000000000000000000000..060e8a12664958e8d0949dbc7ec9b9d5b782ff38 --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/manager/package_unloader.py @@ -0,0 +1,140 @@ +# mcp_server/manager/package_unloader.py +""" +包卸载器(纯卸载流程层) +核心职责:仅负责「包/分类级」卸载流程,最小操作单元为包 +- 流程:查询包信息 → 清理函数关联 → 删除独立环境 → 调用仓库删除数据 +- 设计原则:无状态、纯流程、依赖注入、结果导向 +- 依赖:DepVenvManager(环境删除)、ToolRepository(数据查询/删除) +""" + +import logging +from typing import Dict, List, Optional, Any + +from mcp_server.dependency import DepVenvManager +from mcp_server.manager.tool_repository import ToolRepository, tool_repository as default_repo + +logger = logging.getLogger(__name__) + +# 类型别名:与ToolRepository、PackageLoader保持一致 +ToolType = str +PackageName = str + + +class PackageUnloader: + """包卸载器:封装包/分类级卸载流程,不直接操作数据存储""" + + def __init__(self, + dep_manager: Optional[DepVenvManager] = None, + tool_repository: Optional[ToolRepository] = None): + """ + 初始化卸载器(依赖注入,便于测试和替换) + :param dep_manager: 环境/依赖管理器(默认自动创建) + :param tool_repository: 数据仓库(默认使用全局单例) + """ + self._dep_manager = dep_manager or DepVenvManager() + self._tool_repo = tool_repository or default_repo + logger.info("PackageUnloader 初始化完成") + + # ------------------------------------------------------------------------- + # 公开API(对外暴露的卸载方法) + # ------------------------------------------------------------------------- + def unload_package(self, package_name: PackageName, delete_env: bool = True) -> bool: + """ + 卸载单个包(核心流程) + :param package_name: 要卸载的包名 + :param delete_env: 是否删除独立环境(默认True,全局环境不删除) + :return: 卸载成功返回True,失败返回False + """ + logger.info(f"[Package Unload Start] 包名:{package_name} | 删除环境:{delete_env}") + + # 1. 查询包信息(校验存在性+获取关联数据) + package_info = self._tool_repo.get_package(package_name) + if not package_info: + logger.error(f"[Package Unload Failed] 包名:{package_name} | 原因:包不存在") + return False + + venv_type = package_info["venv_type"] + venv_path = package_info["venv_path"] + tool_type = package_info["tool_type"] + + # 2. 条件删除独立环境(全局环境不删除) + if delete_env and venv_type == "isolated": + if not self._delete_isolated_env(package_name, venv_path): + # 环境删除失败不中断包卸载,仅报警 + logger.warning(f"[Package Env Delete Failed] 包名:{package_name} | 环境路径:{venv_path}") + + # 3. 调用仓库删除包数据(连带函数/分类关联) + if self._tool_repo.delete_package(package_name): + logger.info(f"[Package Unload Success] 包名:{package_name} | 分类:{tool_type} | 环境类型:{venv_type}") + return True + else: + logger.error(f"[Package Unload Failed] 包名:{package_name} | 原因:仓库删除失败") + return False + + def unload_tool_type(self, tool_type: ToolType, delete_env: bool = True) -> bool: + """ + 卸载指定分类下所有包(批量卸载) + :param tool_type: 分类名(支持字符串/Enum) + :param delete_env: 是否删除独立环境(默认True) + :return: 整体卸载成功返回True(所有包都卸载成功),部分失败返回False + """ + # 标准化分类名(支持Enum类型) + tool_type_str = self._normalize_tool_type(tool_type) + logger.info(f"[ToolType Unload Start] 分类:{tool_type_str} | 删除环境:{delete_env}") + + # 1. 查询分类信息(校验存在性+获取下属包) + type_info = self._tool_repo.get_tool_type(tool_type_str) + if not type_info: + logger.error(f"[ToolType Unload Failed] 分类:{tool_type_str} | 原因:分类不存在") + return False + + package_names = type_info["package_names"] + if not package_names: + logger.info(f"[ToolType Unload Completed] 分类:{tool_type_str} | 原因:无下属包") + return True + + # 2. 批量卸载下属包 + unload_results = [] + for pkg_name in package_names: + result = self.unload_package(pkg_name, delete_env) + unload_results.append(result) + if not result: + logger.error(f"[ToolType Unload Package Failed] 分类:{tool_type_str} | 包名:{pkg_name}") + + # 3. 校验整体结果(所有包都卸载成功才返回True) + all_success = all(unload_results) + logger.info( + f"[ToolType Unload Completed] 分类:{tool_type_str} | " + f"处理包数:{len(package_names)} | " + f"成功数:{sum(unload_results)} | " + f"失败数:{len(unload_results) - sum(unload_results)} | " + f"整体成功:{all_success}" + ) + return all_success + + # ------------------------------------------------------------------------- + # 内部辅助方法(私有,单一职责) + # ------------------------------------------------------------------------- + @staticmethod + def _normalize_tool_type(tool_type: ToolType) -> str: + """标准化分类名(支持Enum类型)""" + return tool_type.value if hasattr(tool_type, "value") else str(tool_type) + + def _delete_isolated_env(self, package_name: PackageName, venv_path: str) -> bool: + """删除包的独立环境(封装环境管理器调用)""" + try: + # 调用环境管理器删除独立环境(包名为环境标识) + delete_success = self._dep_manager.delete_isolated_venv(tool_id=package_name) + if delete_success: + logger.debug(f"[Isolated Env Deleted] 包名:{package_name} | 环境路径:{venv_path}") + return delete_success + except Exception as e: + logger.error( + f"[Isolated Env Delete Failed] 包名:{package_name} | 环境路径:{venv_path} | 原因:{str(e)}", + exc_info=True + ) + return False + + +# 单例实例(默认使用全局依赖和仓库) +package_unloader = PackageUnloader() \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_server/manager/tool_repository.py b/oe-cli-mcp-server/mcp_server/manager/tool_repository.py new file mode 100644 index 0000000000000000000000000000000000000000..5aadfeecd8ee2103f276d45c96ca0e3d5fa9008e --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/manager/tool_repository.py @@ -0,0 +1,452 @@ +# mcp_server/manager/tool_repository.py +""" +Tool 数据仓库(纯数据操作层) +核心职责:维护「分类→包→函数」三级关联数据,提供原子化增删改查方法 +- 核心操作单元:包(函数为包的附属资源,不支持单独操作) +- 设计原则:纯数据处理、无业务逻辑、强数据一致性、API语义化 +- 数据隔离:对外暴露数据均为深拷贝,避免外部修改内部状态 +""" + +import logging +from typing import Dict, List, Optional, Any +from datetime import datetime +from copy import deepcopy + +logger = logging.getLogger(__name__) + +# 类型别名:提升代码可读性与类型一致性 +ToolType = str +PackageName = str +PackageDir = str +FuncName = str +FuncDetail = Dict[str, Any] # 格式:{"func": callable, "description": str} + + +class ToolRepository: + """Tool数据仓库:封装三级关联数据的原子操作,确保数据一致性""" + + def __init__(self): + """初始化内存存储结构(三级关联,仅内存持有)""" + # 1. 包核心存储:key=包名,value=包完整信息(含函数详情) + self._packages: Dict[PackageName, Dict[str, Any]] = {} + # 2. 分类-包映射:key=分类名,value=包名列表(优化分类查询性能) + self._type_package_map: Dict[ToolType, List[PackageName]] = {} + # 3. 函数-包映射:key=函数名,value=包名(快速反向查询函数所属包) + self._func_package_map: Dict[FuncName, PackageName] = {} + + # ------------------------------------------------------------------------- + # 包级核心操作(核心API:添加/删除/查询/列表) + # ------------------------------------------------------------------------- + def add_package(self, + package_name: PackageName, + tool_type: ToolType, + package_dir: PackageDir, + venv_path: str, + venv_type: str, + funcs: Dict[FuncName, FuncDetail]) -> bool: + """ + 原子化添加包(含下属函数),确保三级关联数据同步更新 + :param package_name: 包名(全局唯一) + :param tool_type: 所属分类(如 base/docker) + :param package_dir: 包目录绝对路径 + :param venv_path: 包关联的虚拟环境路径 + :param venv_type: 环境类型(global/isolated) + :param funcs: 包内函数详情字典(key=函数名,value=FuncDetail) + :return: 添加成功返回True,失败返回False + """ + # 1. 前置参数校验(避免无效数据入库) + if not self._validate_add_package_params(package_name, tool_type, package_dir, venv_path, venv_type, funcs): + return False + + # 2. 原子化添加(所有关联操作要么全成功,要么全回滚) + try: + # 2.1 存储包完整信息(深拷贝避免外部修改影响内部状态) + self._packages[package_name] = { + "package_name": package_name, + "tool_type": tool_type, + "package_dir": package_dir, + "venv_path": venv_path, + "venv_type": venv_type, + "funcs": deepcopy(funcs), + "create_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + + # 2.2 更新分类-包关联映射 + self._add_to_type_package_map(tool_type, package_name) + + # 2.3 更新函数-包关联映射 + self._add_to_func_package_map(funcs.keys(), package_name) + + logger.info( + f"[Package Added] 包名:{package_name} | 分类:{tool_type} | 函数数:{len(funcs)} | 环境类型:{venv_type}" + ) + return True + except Exception as e: + logger.error(f"[Package Add Failed] 包名:{package_name} | 原因:{str(e)}", exc_info=True) + self._rollback_add_package(package_name, tool_type, funcs.keys()) + return False + + def delete_package(self, package_name: PackageName) -> bool: + """ + 原子化删除包(连带删除下属函数、分类关联) + :param package_name: 要删除的包名 + :return: 删除成功返回True,失败返回False + """ + # 1. 校验包是否存在 + if package_name not in self._packages: + logger.warning(f"[Package Delete Failed] 包名:{package_name} | 原因:包不存在") + return False + + # 2. 缓存关联数据(用于后续清理映射) + package = self._packages[package_name] + tool_type = package["tool_type"] + func_names = list(package["funcs"].keys()) + + # 3. 原子化删除操作 + try: + # 3.1 删除包核心数据 + del self._packages[package_name] + + # 3.2 清理分类-包关联(空分类自动删除) + self._remove_from_type_package_map(tool_type, package_name) + + # 3.3 清理函数-包关联 + self._remove_from_func_package_map(func_names) + + logger.info( + f"[Package Deleted] 包名:{package_name} | 分类:{tool_type} | 连带删除函数数:{len(func_names)}" + ) + return True + except Exception as e: + logger.error(f"[Package Delete Failed] 包名:{package_name} | 原因:{str(e)}", exc_info=True) + return False + + def get_package(self, package_name: PackageName) -> Optional[Dict[str, Any]]: + """ + 查询包完整信息(含下属函数),返回深拷贝避免外部修改 + :param package_name: 包名 + :return: 包完整信息字典,包不存在返回None + """ + package = self._packages.get(package_name) + return deepcopy(package) if package else None + + def list_packages(self, tool_type: Optional[ToolType] = None) -> List[PackageName]: + """ + 列出包名列表(可选按分类过滤),按添加时间升序排列 + :param tool_type: 分类名(None表示列出所有包) + :return: 包名列表(拷贝版,避免外部修改) + """ + if tool_type: + # 按分类过滤,返回拷贝避免外部修改内部列表 + return self._type_package_map.get(tool_type, []).copy() + # 列出所有包,按创建时间升序排序 + return sorted( + self._packages.keys(), + key=lambda pkg_name: self._packages[pkg_name]["create_time"] + ) + + # ------------------------------------------------------------------------- + # 函数级查询操作(附属API:仅查询,无添加/删除) + # ------------------------------------------------------------------------- + def get_func(self, func_name: FuncName) -> Optional[Dict[str, Any]]: + """ + 查询函数详情(含所属包/分类信息) + :param func_name: 函数名(全局唯一) + :return: 函数详情字典,函数不存在返回None + """ + # 1. 快速定位所属包 + package_name = self._func_package_map.get(func_name) + if not package_name: + logger.debug(f"[Func Query Failed] 函数名:{func_name} | 原因:函数不存在") + return None + + # 2. 校验数据一致性(避免映射失效导致的脏数据) + package = self._packages.get(package_name) + if not package or func_name not in package["funcs"]: + logger.warning(f"[Data Inconsistent] 函数名:{func_name} | 原因:函数-包映射失效,已清理") + self._remove_from_func_package_map([func_name]) + return None + + # 3. 组装函数完整信息(含关联的包/分类信息) + func_detail = package["funcs"][func_name] + return deepcopy({ + "func_name": func_name, + "func": func_detail["func"], + "description": func_detail["description"], + "package_name": package_name, + "tool_type": package["tool_type"], + "package_dir": package["package_dir"], + "venv_path": package["venv_path"], + "venv_type": package["venv_type"] + }) + + def list_funcs(self, package_name: Optional[PackageName] = None) -> List[FuncName]: + """ + 列出函数名列表(可选按包过滤) + :param package_name: 包名(None表示列出所有函数) + :return: 函数名列表(拷贝版) + """ + if package_name: + package = self._packages.get(package_name) + return list(package["funcs"].keys()) if package else [] + # 列出所有函数,按所属包的创建时间排序 + sorted_packages = self.list_packages() + all_funcs = [] + for pkg_name in sorted_packages: + pkg_funcs = list(self._packages[pkg_name]["funcs"].keys()) + all_funcs.extend(pkg_funcs) + return all_funcs + + # ------------------------------------------------------------------------- + # 分类级操作(聚合API:查询/删除) + # ------------------------------------------------------------------------- + def get_tool_type(self, tool_type: ToolType) -> Optional[Dict[str, Any]]: + """ + 查询分类详情(含下属包列表、统计信息) + :param tool_type: 分类名 + :return: 分类详情字典,分类不存在返回None + """ + package_names = self._type_package_map.get(tool_type) + if not package_names: + logger.debug(f"[ToolType Query Failed] 分类名:{tool_type} | 原因:分类不存在") + return None + + # 统计分类下的有效包数和函数总数 + valid_packages = [pkg_name for pkg_name in package_names if pkg_name in self._packages] + total_funcs = 0 + for pkg_name in valid_packages: + total_funcs += len(self._packages[pkg_name]["funcs"]) + + return { + "tool_type": tool_type, + "package_names": valid_packages.copy(), + "total_package": len(valid_packages), + "total_func": total_funcs + } + + def list_tool_types(self) -> List[ToolType]: + """列出所有已注册的分类名""" + return list(self._type_package_map.keys()) + + def delete_tool_type(self, tool_type: ToolType) -> bool: + """ + 原子化删除分类(连带删除下属所有包) + :param tool_type: 分类名 + :return: 整体删除成功返回True,部分失败返回False + """ + # 1. 校验分类是否存在 + if tool_type not in self._type_package_map: + logger.warning(f"[ToolType Delete Failed] 分类名:{tool_type} | 原因:分类不存在") + return False + + # 2. 缓存分类下所有包(避免删除过程中映射变化) + package_names = self._type_package_map[tool_type].copy() + if not package_names: + logger.info(f"[ToolType Delete] 分类名:{tool_type} | 无下属包,直接删除分类") + del self._type_package_map[tool_type] + return True + + # 3. 批量删除下属包 + delete_results = [self.delete_package(pkg_name) for pkg_name in package_names] + all_success = all(delete_results) + + logger.info( + f"[ToolType Delete Completed] 分类名:{tool_type} | 处理包数:{len(package_names)} | 全部成功:{all_success}" + ) + return all_success + + # ------------------------------------------------------------------------- + # 序列化/反序列化(用于持久化与重启恢复) + # ------------------------------------------------------------------------- + def get_serializable_data(self) -> Dict[str, Any]: + """ + 获取可序列化的数据(用于持久化) + 说明:剔除不可序列化的函数对象,仅保留元信息 + """ + serializable_packages = {} + + for pkg_name, pkg_info in self._packages.items(): + serializable_packages[pkg_name] = { + "package_name": pkg_name, + "tool_type": pkg_info["tool_type"], + "package_dir": pkg_info["package_dir"], + "venv_path": pkg_info["venv_path"], + "venv_type": pkg_info["venv_type"], + "func_names": list(pkg_info["funcs"].keys()), + "create_time": pkg_info["create_time"] + } + + return { + "serializable_packages": serializable_packages, + "type_package_map": self._type_package_map.copy(), + "func_package_map": self._func_package_map.copy(), + "serialize_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + + def load_serializable_data(self, data: Dict[str, Any]) -> Dict[str, int]: + """ + 加载序列化数据(用于重启后恢复关联关系) + 说明:仅恢复元信息和关联映射,函数对象需通过PackageLoader重新导入 + :param data: 序列化数据(来自持久化文件) + :return: 恢复结果统计(成功/失败包数) + """ + result = { + "total_package": 0, + "success_package": 0, + "fail_package": 0 + } + + try: + serializable_packages = data.get("serializable_packages", {}) + type_package_map = data.get("type_package_map", {}) + func_package_map = data.get("func_package_map", {}) + + result["total_package"] = len(serializable_packages) + + # 恢复包元信息(无函数对象) + for pkg_name, pkg_meta in serializable_packages.items(): + self._packages[pkg_name] = { + "package_name": pkg_name, + "tool_type": pkg_meta["tool_type"], + "package_dir": pkg_meta["package_dir"], + "venv_path": pkg_meta["venv_path"], + "venv_type": pkg_meta["venv_type"], + "funcs": {}, # 函数对象后续通过加载包补充 + "create_time": pkg_meta["create_time"] + } + result["success_package"] += 1 + + # 恢复分类-包映射 + self._type_package_map = deepcopy(type_package_map) + + # 恢复函数-包映射 + self._func_package_map = deepcopy(func_package_map) + + result["fail_package"] = result["total_package"] - result["success_package"] + logger.info( + f"[Serializable Data Loaded] 总包数:{result['total_package']} | " + f"成功恢复:{result['success_package']} | 失败:{result['fail_package']}" + ) + return result + except Exception as e: + logger.error(f"[Serializable Data Load Failed] 原因:{str(e)}", exc_info=True) + # 异常时清空已恢复数据,避免脏数据 + self._reset_all_data() + return result + + def update_package_functions(self, package_name: str, funcs: Dict[str, callable]): + """ + 更新包的函数对象(持久化恢复后补充) + :param package_name: 包名 + :param funcs: 函数名→函数对象的映射 + """ + if package_name not in self._packages: + raise ValueError(f"包 {package_name} 不存在于仓库中") + # 更新包的函数对象(覆盖原有空的 funcs) + self._packages[package_name]["funcs"] = funcs + # 同步更新函数-包映射(确保 get_func 能找到函数) + for func_name in funcs.keys(): + self._func_package_map[func_name] = package_name + + # ------------------------------------------------------------------------- + # 内部辅助方法(私有,不对外暴露) + # ------------------------------------------------------------------------- + def _validate_add_package_params(self, + package_name: PackageName, + tool_type: ToolType, + package_dir: PackageDir, + venv_path: str, + venv_type: str, + funcs: Dict[FuncName, FuncDetail]) -> bool: + """校验添加包的参数合法性""" + # 1. 非空校验 + if not all([package_name, tool_type, package_dir, venv_path, venv_type]): + logger.error(f"[Param Invalid] 包名:{package_name} | 原因:必填参数不能为空") + return False + + # 2. 包名唯一性校验 + if package_name in self._packages: + logger.warning(f"[Param Invalid] 包名:{package_name} | 原因:包名已存在") + #return False + + # 3. 环境类型合法性校验 + if venv_type not in ["global", "isolated"]: + logger.error(f"[Param Invalid] 包名:{package_name} | 原因:环境类型必须是 global/isolated") + return False + + # 4. 函数参数校验 + if not isinstance(funcs, dict) or len(funcs) == 0: + logger.error(f"[Param Invalid] 包名:{package_name} | 原因:函数列表不能为空字典") + return False + + # 5. 函数名唯一性校验(全局唯一) + duplicate_funcs = [func_name for func_name in funcs if func_name in self._func_package_map] + if duplicate_funcs: + logger.warning(f"[Param Invalid] 包名:{package_name} | 原因:函数名重复 - {duplicate_funcs}") + #return False + + # 6. 函数详情格式校验 + for func_name, func_detail in funcs.items(): + if not isinstance(func_detail, dict) or "func" not in func_detail or "description" not in func_detail: + logger.error(f"[Param Invalid] 包名:{package_name} | 原因:函数[{func_name}]格式错误(需包含func和description)") + return False + if not callable(func_detail["func"]): + logger.error(f"[Param Invalid] 包名:{package_name} | 原因:函数[{func_name}]必须是可调用对象") + return False + + return True + + def _add_to_type_package_map(self, tool_type: ToolType, package_name: PackageName) -> None: + """添加分类-包关联""" + if tool_type not in self._type_package_map: + self._type_package_map[tool_type] = [] + if package_name not in self._type_package_map[tool_type]: + self._type_package_map[tool_type].append(package_name) + + def _remove_from_type_package_map(self, tool_type: ToolType, package_name: PackageName) -> None: + """移除分类-包关联(空分类自动删除)""" + if tool_type not in self._type_package_map: + return + if package_name in self._type_package_map[tool_type]: + self._type_package_map[tool_type].remove(package_name) + # 空分类自动清理 + if not self._type_package_map[tool_type]: + del self._type_package_map[tool_type] + logger.debug(f"[ToolType Cleaned] 分类名:{tool_type} | 原因:无下属包") + + def _add_to_func_package_map(self, func_names: List[FuncName], package_name: PackageName) -> None: + """添加函数-包关联""" + for func_name in func_names: + self._func_package_map[func_name] = package_name + + def _remove_from_func_package_map(self, func_names: List[FuncName]) -> None: + """移除函数-包关联""" + for func_name in func_names: + if func_name in self._func_package_map: + del self._func_package_map[func_name] + + def _rollback_add_package(self, + package_name: PackageName, + tool_type: ToolType, + func_names: List[FuncName]) -> None: + """添加包失败时回滚数据,确保数据一致性""" + logger.debug(f"[Add Package Rollback] 包名:{package_name} | 开始回滚数据") + # 回滚包数据 + if package_name in self._packages: + del self._packages[package_name] + # 回滚分类-包关联 + self._remove_from_type_package_map(tool_type, package_name) + # 回滚函数-包关联 + self._remove_from_func_package_map(func_names) + logger.debug(f"[Add Package Rollback] 包名:{package_name} | 回滚完成") + + def _reset_all_data(self) -> None: + """重置所有数据(异常恢复时使用)""" + self._packages.clear() + self._type_package_map.clear() + self._func_package_map.clear() + logger.warning("[Data Reset] 所有存储数据已清空") + + +# 单例实例(全局共享一个数据仓库,确保数据一致性) +tool_repository = ToolRepository() \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_server/mcp_manager.py b/oe-cli-mcp-server/mcp_server/mcp_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..50ade9fe70712fd9f124bfa4a7b0f7fe86d10e3b --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/mcp_manager.py @@ -0,0 +1,153 @@ +# mcp_server/server.py +import json +import os.path +import threading +from functools import wraps +from typing import Dict, Any, Optional +from mcp.server import FastMCP +from config.base_config_loader import BaseConfig +from mcp_server.manager.manager import ToolManager, logger +from mcp_tools.tool_type import ToolType +from util.get_project_root import get_project_root +from util.zip_tool_util import unzip_tool + +# -------------------------- 导入独立的 FastAPI 启动函数 -------------------------- +from mcp_server.api_server import start_fastapi_server + +PUBLIC_CONFIG_PATH = os.path.join(get_project_root(), "config/public_config.toml") +PERSIST_FILE = os.path.join(get_project_root(), "data/tool_state.json") + +def singleton(cls): + """线程安全的单例装饰器,不侵入原有类逻辑""" + _instance = None + _lock = threading.Lock() + + @wraps(cls) + def wrapper(*args, **kwargs): + nonlocal _instance + if _instance is None: + with _lock: + if _instance is None: + _instance = cls(*args, **kwargs) + return _instance + + def reset_instance(): + nonlocal _instance + with _lock: + _instance = None + + wrapper.reset_instance = reset_instance + wrapper._instance = _instance # 暴露 _instance 属性,供 API 服务调用 + return wrapper + +@singleton +class McpServer(ToolManager): + def __init__(self, name, host, port): + super().__init__() + self.mcp = FastMCP(name, host=host, port=port) + self.host = host + self.port = port + self.language = BaseConfig().get_config().public_config.language + self.PERSIST_FILE = PERSIST_FILE + + # -------------------------- 原有核心业务方法不变 -------------------------- + def _mcp_register(self, packages=None): + if not packages: + packages = self.list_packages() + for package_name in packages: + func_names = self.list_funcs(package_name) + for func_name in func_names: + func_info = self.get_func_info(func_name) + tool_func = func_info["func"] + description = func_info["description"]["zh"] if self.language == "zh" else func_info["description"]["en"] + self.mcp.tool(name=func_name, description=description)(tool_func) + logger.info("tool 注册成功") + return self.mcp + + def _reset(self): + del self.mcp + self.mcp = FastMCP("mcp实例", host=self.host, port=self.port) + self.restore_tool_state() + self.reload_package_functions() + for pkg in self.list_packages(): + self.load(pkg) + + def load(self, mcp_collection: ToolType | str): + before_pkg_count = len(self.list_packages()) + if isinstance(mcp_collection, str): + if os.path.basename(mcp_collection)[:-4] == ".zip": + unzip_tool(mcp_collection) + package_name = os.path.basename(mcp_collection)[:-4] + package_dir = os.path.join(get_project_root(), "mcp_tools/personal_tools", package_name) + else: + package_name = mcp_collection + package_dir = self.get_package_path(package_name) + if package_dir in self.list_packages(): + logger.warning(f"自定义包 {package_name} 已加载,无需重复添加") + return + result = self.load_package(package_dir) + if not result: + logger.error(f"自定义包 {package_name} 加载失败") + return + packages_to_register = [package_name] + elif isinstance(mcp_collection, ToolType): + tool_type_value = mcp_collection.value + if tool_type_value in self.list_tool_types(): + logger.warning(f"系统包类型 {tool_type_value} 已加载,无需重复添加") + return + load_result = self.load_tool_type(tool_type_value) + if load_result["success_package"] == 0: + logger.error(f"系统包类型 {tool_type_value} 加载失败:{load_result['fail_reason']}") + return + packages_to_register = [pkg for pkg in self.list_packages(tool_type_value) + if pkg not in self.list_packages()[:before_pkg_count]] + else: + logger.error(f"不支持的加载类型:{type(mcp_collection)}") + return + self._mcp_register(packages_to_register) + after_pkg_count = len(self.list_packages()) + logger.info(f"加载成功:原有 {before_pkg_count} 个包,新增 {after_pkg_count - before_pkg_count} 个包,当前共 {after_pkg_count} 个包") + + def remove(self, mcp_collection): + if isinstance(mcp_collection, ToolType): + self.unload_tool_type(mcp_collection.value) + else: + self.unload_package(mcp_collection) + self._reset() + + def init(self): + del self.mcp + all_types = self.list_tool_types() + for mcp_type in all_types: + self.unload_tool_type(mcp_type) + BaseConfig().update_config(default=True) + self.reload_config() + self.mcp = FastMCP("mcp实例", host=self.host, port=self.port) + self.load(ToolType.BASE) + logger.info(f"初始化完成:仅保留基础运维包") + + def reload_config(self): + import toml + BaseConfig().update_config() + with open(PUBLIC_CONFIG_PATH, "r", encoding="utf-8") as f: + config = toml.load(f) + self.port = config["port"] + self.host = config["host"] + + def restart(self): + self._reset() + self.start() + + # -------------------------- 简化 start 方法:调用独立的 FastAPI 启动函数 -------------------------- + def start(self): + # 1. 启动独立的 FastAPI 服务(线程不阻塞) + start_fastapi_server(host="0.0.0.0", port=8003) + # 2. 启动 FastMCP 主服务(原有逻辑不变) + self._reset() + self.mcp.run(transport='sse') + +# -------------------------- 启动服务(原有逻辑不变)-------------------------- +if __name__ == "__main__": + config = BaseConfig().get_config().public_config + mcp_server = McpServer("MCP_Tool_Service", config.host, config.port) + mcp_server.start() \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_server/server.py b/oe-cli-mcp-server/mcp_server/server.py new file mode 100755 index 0000000000000000000000000000000000000000..1e8e5a4af0e6f2005b88df222b16ec75401b5411 --- /dev/null +++ b/oe-cli-mcp-server/mcp_server/server.py @@ -0,0 +1,16 @@ +# server.py +import os +import sys +PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if PROJECT_ROOT not in sys.path: + sys.path.insert(0, PROJECT_ROOT) +from config.base_config_loader import BaseConfig +from mcp_server.mcp_manager import McpServer + +config = BaseConfig().get_config().public_config + + + +if __name__ == "__main__": + server = McpServer("mcp实例", host=config.host, port=config.port) + server.start() diff --git a/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/base.py b/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/base.py new file mode 100644 index 0000000000000000000000000000000000000000..e8c74dabba14c15aed05d650ac3848f357a3267d --- /dev/null +++ b/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/base.py @@ -0,0 +1,152 @@ +import logging +import subprocess +import paramiko +from typing import Dict, Optional + +from config.base_config_loader import BaseConfig +from config.base_config_loader import LanguageEnum +# 初始化日志(保持原逻辑) +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +def get_language() -> bool: + """获取语言配置:True=中文,False=英文""" + return BaseConfig().get_config().public_config.language == LanguageEnum.ZH + +def get_remote_auth(ip: str) -> Optional[Dict]: + """ + 获取服务器认证信息:匹配IP/主机名对应的连接配置 + """ + for host_config in BaseConfig().get_config().public_config.remote_hosts: + if ip in [host_config.host, host_config.name]: + return { + "host": host_config.host, + "port": host_config.port, + "username": host_config.username, + "password": host_config.password + } + return None + +def escape_shell_content(content: str) -> str: + """ + 转义shell命令中的特殊字符(避免注入和语法错误) + """ + # 转义单引号(最关键,因为命令中用单引号包裹参数) + return content.replace("'", "'\\''") + +def init_result_dict( + target_host: str, + result_type: str = "list", # result字段类型:list/str + include_file_path: bool = False +) -> Dict: + """ + 初始化返回结果字典(统一结构) + """ + result = { + "success": False, + "message": "", + "result": [] if result_type == "list" else "", + "target": target_host + } + if include_file_path: + result["file_path"] = "" + return result + +def run_local_command( + cmd: str, + result: Dict, + success_msg_zh: str, + success_msg_en: str, + is_list_result: bool = True, + in_place: bool = False +) -> Dict: + """ + 执行本地命令并处理结果 + """ + try: + logger.info(f"执行本地命令:{cmd}") + output = subprocess.check_output( + cmd, shell=True, text=True, stderr=subprocess.STDOUT + ) + result["success"] = True + result["message"] = success_msg_zh if get_language() else success_msg_en + # 处理返回结果(in_place=True时不返回内容) + if not in_place: + if is_list_result: + result["result"] = output.strip().split("\n") if output.strip() else [] + else: + result["result"] = output.strip() + # grep无匹配时退出码1,特殊处理 + except subprocess.CalledProcessError as e: + if e.returncode == 1 and "grep" in cmd: + result["success"] = True + result["message"] = "未找到匹配内容" if get_language() else "No matching content found" + else: + result["message"] = f"本地执行失败:{e.output.strip()}" if get_language() else f"Local execution failed: {e.output.strip()}" + logger.error(result["message"]) + except Exception as e: + result["message"] = f"本地处理异常:{str(e)}" if get_language() else f"Local processing exception: {str(e)}" + logger.error(result["message"]) + return result + +def run_remote_command( + cmd: str, + remote_auth: Dict, + result: Dict, + success_msg_zh: str, + success_msg_en: str, + is_list_result: bool = True, + in_place: bool = False +) -> Dict: + """ + 执行远程命令并处理结果(SSH连接封装) + """ + ssh_conn: Optional[paramiko.SSHClient] = None + try: + # 初始化SSH客户端 + ssh_conn = paramiko.SSHClient() + ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh_conn.connect( + hostname=remote_auth["host"], + port=remote_auth["port"], + username=remote_auth["username"], + password=remote_auth["password"], + timeout=10, + banner_timeout=10 + ) + logger.info(f"已连接远程主机:{remote_auth['host']},执行命令:{cmd}") + + # 执行命令 + stdin, stdout, stderr = ssh_conn.exec_command(cmd) + stdout_msg = stdout.read().decode("utf-8", errors="replace").strip() + stderr_msg = stderr.read().decode("utf-8", errors="replace").strip() + + # 处理结果 + if stderr_msg: + result["message"] = f"远程执行失败:{stderr_msg}" if get_language() else f"Remote execution failed: {stderr_msg}" + logger.error(result["message"]) + else: + result["success"] = True + result["message"] = success_msg_zh if get_language() else success_msg_en + if not in_place: + if is_list_result: + result["result"] = stdout_msg.split("\n") if stdout_msg else [] + else: + result["result"] = stdout_msg.strip() + except paramiko.AuthenticationException: + result["message"] = "SSH认证失败,请检查用户名和密码" if get_language() else "SSH authentication failed, check username and password" + logger.error(result["message"]) + except TimeoutError: + result["message"] = "SSH连接超时,请检查网络或主机状态" if get_language() else "SSH connection timed out, check network or host status" + logger.error(result["message"]) + except Exception as e: + result["message"] = f"远程处理异常:{str(e)}" if get_language() else f"Remote processing exception: {str(e)}" + logger.error(result["message"]) + finally: + # 关闭SSH连接 + if ssh_conn: + transport = ssh_conn.get_transport() + if transport and transport.is_active(): + ssh_conn.close() + logger.info(f"已关闭与{remote_auth['host']}的SSH连接") + return result \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/config.json b/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/config.json new file mode 100644 index 0000000000000000000000000000000000000000..dd08cfb5c160f80644914db7b7149ac4cb2703b6 --- /dev/null +++ b/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/config.json @@ -0,0 +1,28 @@ +{ + "tools": { + "file_grep_tool": { + "zh": "搜索文件中匹配指定模式的内容(本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -file_path: 目标文件路径(必填,如\"/var/log/syslog\")\n -pattern: 搜索模式(支持正则,必填,如\"error\")\n -options: grep可选参数(如\"-i\"忽略大小写、\"-n\"显示行号)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -result: 匹配结果列表(成功时返回)\n -target: 执行目标主机", + "en": "Search for content matching the specified pattern in the file (supports local/remote)\n\nParameters:\n -target: Target host IP/hostname, None for local\n -file_path: Target file path (required, e.g. \"/var/log/syslog\")\n -pattern: Search pattern (supports regex, required, e.g. \"error\")\n -options: Optional grep parameters (e.g. \"-i\" ignore case, \"-n\" show line numbers)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -result: List of matching results (returned on success)\n -target: Target host for execution" + }, + "file_sed_tool": { + "zh": "替换文件中匹配的内容(本地/远程均支持,默认不修改原文件)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -file_path: 目标文件路径(必填)\n -pattern: 匹配模式(如\"s/old/new/g\",必填,g表示全局替换)\n -in_place: 是否直接修改原文件(True/False,默认False,仅输出结果)\n -options: sed可选参数(如\"-i.bak\"备份原文件)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -result: 替换后内容(in_place=False时返回)\n -target: 执行目标主机", + "en": "Replace matching content in the file (supports local/remote, no original file modification by default)\n\nParameters:\n -target: Target host IP/hostname, None for local\n -file_path: Target file path (required)\n -pattern: Matching pattern (e.g. \"s/old/new/g\", required, g for global replacement)\n -in_place: Whether to modify the original file directly (True/False, default False, only output result)\n -options: Optional sed parameters (e.g. \"-i.bak\" backup original file)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -result: Content after replacement (returned when in_place=False)\n -target: Target host for execution" + }, + "file_awk_tool": { + "zh": "用awk处理文本文件(支持列提取、条件过滤等,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地(127.0.0.1)\n -file_path: 目标文件路径(必填,如\"/etc/passwd\"或\"/var/log/access.log\")\n -script: awk脚本(必填,示例:\"'{print $1,$3}'\"提取1、3列;\"'$3>100 {print $0}'\"过滤第3列大于100的行)\n -options: awk可选参数(示例:\"-F:\"指定分隔符为冒号;\"-v OFS=,\"指定输出分隔符为逗号)\n\n返回:\n -success: 执行结果(True=成功,False=失败)\n -message: 执行状态描述/错误提示(中文/英文根据配置自动切换)\n -result: awk处理结果列表(每行一个元素,无结果时返回空列表)\n -target: 实际执行的目标主机IP/hostname", + "en": "Process text files with awk (supports column extraction, condition filtering, local/remote execution)\n\nParameters:\n -target: Target host IP/hostname, None for localhost (127.0.0.1)\n -file_path: Target file path (required, e.g. \"/etc/passwd\" or \"/var/log/access.log\")\n -script: awk script (required, example: \"'{print $1,$3}'\" extract column 1&3; \"'$3>100 {print $0}'\" filter rows where column3>100)\n -options: Optional awk parameters (example: \"-F:\" set delimiter to colon; \"-v OFS=,\" set output delimiter to comma)\n\nReturns:\n -success: Execution result (True=success, False=failure)\n -message: Execution status/error prompt (auto-switch between Chinese/English)\n -result: List of awk processing results (one element per line, empty list if no result)\n -target: Actual target host IP/hostname for execution" + }, + "file_sort_tool": { + "zh": "对文本文件进行排序(支持按列、升序/降序、去重等,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地(127.0.0.1)\n -file_path: 目标文件路径(必填,如\"/tmp/logs.txt\")\n -options: sort可选参数(示例:\n \"-n\"按数字排序;\"-k3\"按第3列排序;\n \"-r\"降序排列;\"-u\"去重后排序;\n \"-t,\"指定逗号为分隔符)\n -output_file: 排序结果输出路径(可选,默认不保存到文件,仅返回结果)\n\n返回:\n -success: 执行结果(True=成功,False=失败)\n -message: 执行状态描述/错误提示(根据语言配置自动切换)\n -result: 排序结果列表(output_file为空时返回,每行一个元素)\n -target: 实际执行的目标主机", + "en": "Sort text files (supports column-based sorting, ascending/descending, deduplication, local/remote)\n\nParameters:\n -target: Target host IP/hostname, None for localhost (127.0.0.1)\n -file_path: Target file path (required, e.g. \"/tmp/logs.txt\")\n -options: Optional sort parameters (examples:\n \"-n\" numeric sort; \"-k3\" sort by 3rd column;\n \"-r\" reverse order; \"-u\" unique then sort;\n \"-t,\" set comma as delimiter)\n -output_file: Output path for sorted results (optional, default returns in result without saving)\n\nReturns:\n -success: Execution result (True=success, False=failure)\n -message: Execution status/error prompt (auto-switch language)\n -result: List of sorted results (returned if output_file is empty, one element per line)\n -target: Actual target host for execution" + }, + "file_unique_tool": { + "zh": "对文本文件进行去重处理(通常与sort配合使用,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -file_path: 目标文件路径(必填,如\"/tmp/duplicates.txt\")\n -options: unique可选参数(示例:\n \"-u\"仅显示唯一行;\"-d\"仅显示重复行;\n \"-c\"显示每行出现的次数;\"-i\"忽略大小写)\n -output_file: 去重结果输出路径(可选,默认不保存到文件)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -result: 去重结果列表(output_file为空时返回)\n -target: 执行目标主机", + "en": "Deduplicate text files (usually used with sort, supports local/remote execution)\n\nParameters:\n -target: Target host IP/hostname, None for localhost\n -file_path: Target file path (required, e.g. \"/tmp/duplicates.txt\")\n -options: Optional unique parameters (examples:\n \"-u\" show only unique lines; \"-d\" show only duplicate lines;\n \"-c\" show count of each line; \"-i\" ignore case)\n -output_file: Output path for deduplicated results (optional, default returns in result)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -result: List of deduplicated results (returned if output_file is empty)\n -target: Target host for execution" + }, + "file_echo_tool": { + "zh": "向文件写入内容(支持创建文件、追加内容,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -content: 要写入的内容(必填,如\"Hello World\")\n -file_path: 目标文件路径(必填,如\"/tmp/message.txt\")\n -append: 是否追加内容(True=追加,False=覆盖,默认False)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -target: 执行目标主机\n -file_path: 实际写入的文件路径", + "en": "Write content to file (supports file creation, appending content, local/remote execution)\n\nParameters:\n -target: Target host IP/hostname, None for localhost\n -content: Content to write (required, e.g. \"Hello World\")\n -file_path: Target file path (required, e.g. \"/tmp/message.txt\")\n -append: Whether to append content (True=append, False=overwrite, default False)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -target: Target host for execution\n -file_path: Actual file path written to" + } + } +} \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/deps.toml b/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/deps.toml new file mode 100644 index 0000000000000000000000000000000000000000..f6d4265365bc6855e8541d9083bc6311931c3b28 --- /dev/null +++ b/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/deps.toml @@ -0,0 +1,18 @@ +[system] +# 系统基础工具(grep/sed/awk/sort/uniq/echo 通常系统自带,此处列出需确保安装的依赖) +#required_tools = [ +# "yum install -y grep sed gawk coreutils" +#] +# SSH客户端依赖(部分系统可能未预装) +#ssh_client = [ +# "yum install -y openssh-clients" +#] + +[pip] +# Python依赖包 +paramiko = "==4.0.0" # SSH远程连接 +typing_extensions = "==4.12.2" +psutil = "==7.0.0" +toml = "== 0.10.2" +mcp = "== 1.9.4" +scp = "== 0.15.0" # 类型提示兼容(如需) \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/tool.py b/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/tool.py new file mode 100644 index 0000000000000000000000000000000000000000..97fc334c4d77b7b5830f24292721bb83ef03e917 --- /dev/null +++ b/oe-cli-mcp-server/mcp_tools/base_tools/file_tools/tool.py @@ -0,0 +1,343 @@ +from typing import Dict, Optional +from config.base_config_loader import LanguageEnum +from mcp_tools. base_tools. file_tools. base import ( + get_language, + get_remote_auth, + run_local_command, + run_remote_command, + init_result_dict, + escape_shell_content +) + +def file_grep_tool( + target: Optional[str] = None, + file_path: str = "", + pattern: str = "", + options: str = "", + lang: Optional[LanguageEnum] = LanguageEnum.ZH +) -> Dict: + is_zh = lang + target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" + result = init_result_dict(target_host) + + # 基础参数校验 + if not file_path.strip(): + result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" + return result + if not pattern.strip(): + result["message"] = "搜索模式不能为空" if is_zh else "Search pattern cannot be empty" + return result + + # 构建grep命令(内容转义,避免shell注入) + escaped_pattern = escape_shell_content(pattern.strip()) + escaped_file = escape_shell_content(file_path.strip()) + grep_cmd = f"grep {options.strip()} '{escaped_pattern}' {escaped_file}" + + # 本地执行 + if target_host == "127.0.0.1": + return run_local_command( + cmd=grep_cmd, + result=result, + success_msg_zh=f"本地文件搜索完成(路径:{file_path})", + success_msg_en=f"Local file search completed (path: {file_path})", + is_list_result=True + ) + + # 远程执行 + remote_auth = get_remote_auth(target_host) + if not remote_auth: + result["message"] = f"未找到远程主机({target_host})的认证配置" if is_zh else f"Authentication config for remote host ({target_host}) not found" + return result + if not (remote_auth["username"] and remote_auth["password"]): + result["message"] = "远程执行需用户名和密码" if is_zh else "Username and password required for remote execution" + return result + + return run_remote_command( + cmd=grep_cmd, + remote_auth=remote_auth, + result=result, + success_msg_zh=f"远程文件搜索完成(主机:{target_host},路径:{file_path})", + success_msg_en=f"Remote file search completed (host: {target_host}, path: {file_path})", + is_list_result=True + ) + +def file_sed_tool( + target: Optional[str] = None, + file_path: str = "", + pattern: str = "", + in_place: bool = False, + options: str = "", + lang: Optional[LanguageEnum] = LanguageEnum.ZH +) -> Dict: + is_zh = lang + target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" + result = init_result_dict(target_host, result_type="str") + + # 基础校验 + if not file_path.strip(): + result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" + return result + if not pattern.strip() or "s/" not in pattern: + result["message"] = "替换模式格式错误(需含s/)" if is_zh else "Replacement pattern format error (must contain s/)" + return result + + # 构建sed命令(内容转义) + escaped_pattern = escape_shell_content(pattern.strip()) + escaped_file = escape_shell_content(file_path.strip()) + in_place_opt = "-i" if in_place else "" + sed_cmd = f"sed {options.strip()} {in_place_opt} '{escaped_pattern}' {escaped_file}" + + # 本地执行 + if target_host == "127.0.0.1": + msg_zh = "原文件已修改" if in_place else "替换后内容已输出" + msg_en = "original file modified" if in_place else "replaced content output" + return run_local_command( + cmd=sed_cmd, + result=result, + success_msg_zh=f"本地sed执行成功({msg_zh},路径:{file_path})", + success_msg_en=f"Local sed executed successfully ({msg_en}, path: {file_path})", + is_list_result=False, + in_place=in_place + ) + + # 远程执行 + remote_auth = get_remote_auth(target_host) + if not remote_auth or not (remote_auth["username"] and remote_auth["password"]): + result["message"] = "远程认证配置缺失" if is_zh else "Remote authentication config missing" + return result + + msg_zh = "原文件已修改" if in_place else "替换后内容已输出" + msg_en = "original file modified" if in_place else "replaced content output" + return run_remote_command( + cmd=sed_cmd, + remote_auth=remote_auth, + result=result, + success_msg_zh=f"远程sed执行成功({msg_zh},主机:{target_host})", + success_msg_en=f"Remote sed executed successfully ({msg_en}, host: {target_host})", + is_list_result=False, + in_place=in_place + ) + +def file_awk_tool( + target: Optional[str] = None, + file_path: str = "", + script: str = "", + options: str = "", + lang: Optional[LanguageEnum] = LanguageEnum.ZH +) -> Dict: + is_zh = lang + target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" + result = init_result_dict(target_host) + + # 基础参数校验 + if not file_path.strip(): + result["message"] = "文件路径不能为空,请传入有效的文件路径(如\"/etc/passwd\")" if is_zh else "File path cannot be empty, please pass a valid path (e.g. \"/etc/passwd\")" + return result + if not script.strip(): + result["message"] = "awk脚本不能为空,请传入有效的处理逻辑(如\"'{print $1,$3}'\")" if is_zh else "awk script cannot be empty, please pass valid logic (e.g. \"'{print $1,$3}'\")" + return result + + # 构建awk命令(内容转义) + escaped_script = escape_shell_content(script.strip()) + escaped_file = escape_shell_content(file_path.strip()) + options_clean = options.strip() + awk_cmd = f"awk {options_clean} {escaped_script} {escaped_file}" + + # 本地执行 + if target_host == "127.0.0.1": + return run_local_command( + cmd=awk_cmd, + result=result, + success_msg_zh=f"本地awk处理成功(文件:{file_path.strip()})", + success_msg_en=f"Local awk processing succeeded (file: {file_path.strip()})", + is_list_result=True + ) + + # 远程执行 + remote_auth = get_remote_auth(target_host) + if not remote_auth: + result["message"] = f"未找到远程主机({target_host})的认证配置,请检查配置文件中的remote_hosts" if is_zh else f"Authentication config for remote host ({target_host}) not found, check remote_hosts in config" + return result + if not (remote_auth.get("username") and remote_auth.get("password")): + result["message"] = f"远程主机({target_host})的认证配置缺失用户名或密码,无法建立SSH连接" if is_zh else f"Remote host ({target_host}) auth config lacks username/password, cannot establish SSH connection" + return result + + return run_remote_command( + cmd=awk_cmd, + remote_auth=remote_auth, + result=result, + success_msg_zh=f"远程awk处理成功(主机:{target_host},文件:{file_path.strip()})", + success_msg_en=f"Remote awk processing succeeded (host: {target_host}, file: {file_path.strip()})", + is_list_result=True + ) + +def file_sort_tool( + target: Optional[str] = None, + file_path: str = "", + options: str = "", + output_file: str = "", + lang: Optional[LanguageEnum] = LanguageEnum.ZH +) -> Dict: + is_zh = lang + target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" + result = init_result_dict(target_host) + + # 参数校验 + if not file_path.strip(): + result["message"] = "文件路径不能为空,请提供有效的文件路径" if is_zh else "File path cannot be empty, please provide a valid path" + return result + + # 构建sort命令(内容转义) + escaped_file = escape_shell_content(file_path.strip()) + escaped_output = escape_shell_content(output_file.strip()) if output_file.strip() else "" + options_clean = options.strip() + if output_file.strip(): + sort_cmd = f"sort {options_clean} {escaped_file} -o {escaped_output}" + else: + sort_cmd = f"sort {options_clean} {escaped_file}" + + # 本地执行 + if target_host == "127.0.0.1": + success_msg_zh = f"本地排序完成,结果已保存至:{output_file.strip()}" if output_file.strip() else f"本地排序完成" + success_msg_en = f"Local sort completed, result saved to: {output_file.strip()}" if output_file.strip() else f"Local sort completed" + return run_local_command( + cmd=sort_cmd, + result=result, + success_msg_zh=success_msg_zh, + success_msg_en=success_msg_en, + is_list_result=True, + in_place=bool(output_file.strip()) + ) + + # 远程执行 + remote_auth = get_remote_auth(target_host) + if not remote_auth: + result["message"] = f"未找到远程主机({target_host})的认证配置" if is_zh else f"Authentication config for remote host ({target_host}) not found" + return result + if not (remote_auth.get("username") and remote_auth.get("password")): + result["message"] = f"远程主机({target_host})的认证信息不完整(缺少用户名或密码)" if is_zh else f"Remote host ({target_host}) auth info incomplete (missing username/password)" + return result + + success_msg_zh = f"远程排序完成,结果已保存至:{output_file.strip()}(主机:{target_host})" if output_file.strip() else f"远程排序完成(主机:{target_host})" + success_msg_en = f"Remote sort completed, result saved to: {output_file.strip()} (host: {target_host})" if output_file.strip() else f"Remote sort completed (host: {target_host})" + return run_remote_command( + cmd=sort_cmd, + remote_auth=remote_auth, + result=result, + success_msg_zh=success_msg_zh, + success_msg_en=success_msg_en, + is_list_result=True, + in_place=bool(output_file.strip()) + ) + +def file_unique_tool( + target: Optional[str] = None, + file_path: str = "", + options: str = "", + output_file: str = "", + lang: Optional[LanguageEnum] = LanguageEnum.ZH +) -> Dict: + is_zh = lang + target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" + result = init_result_dict(target_host) + + # 参数校验 + if not file_path.strip(): + result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" + return result + + # 构建unique命令(内容转义) + escaped_file = escape_shell_content(file_path.strip()) + escaped_output = escape_shell_content(output_file.strip()) if output_file.strip() else "" + options_clean = options.strip() + if output_file.strip(): + unique_cmd = f"uniq {options_clean} {escaped_file} {escaped_output}" + else: + unique_cmd = f"uniq {options_clean} {escaped_file}" + + # 本地执行 + if target_host == "127.0.0.1": + success_msg_zh = f"本地去重完成,结果已保存至:{output_file.strip()}" if output_file.strip() else f"本地去重完成" + success_msg_en = f"Local deduplication completed, result saved to: {output_file.strip()}" if output_file.strip() else f"Local deduplication completed" + return run_local_command( + cmd=unique_cmd, + result=result, + success_msg_zh=success_msg_zh, + success_msg_en=success_msg_en, + is_list_result=True, + in_place=bool(output_file.strip()) + ) + + # 远程执行 + remote_auth = get_remote_auth(target_host) + if not remote_auth or not (remote_auth["username"] and remote_auth["password"]): + result["message"] = "远程认证配置缺失" if is_zh else "Remote auth config missing" + return result + + success_msg_zh = f"远程去重完成,结果已保存至:{output_file.strip()}(主机:{target_host})" if output_file.strip() else f"远程去重完成(主机:{target_host})" + success_msg_en = f"Remote deduplication completed, result saved to: {output_file.strip()} (host: {target_host})" if output_file.strip() else f"Remote deduplication completed (host: {target_host})" + return run_remote_command( + cmd=unique_cmd, + remote_auth=remote_auth, + result=result, + success_msg_zh=success_msg_zh, + success_msg_en=success_msg_en, + is_list_result=True, + in_place=bool(output_file.strip()) + ) + +def file_echo_tool( + target: Optional[str] = None, + content: str = "", + file_path: str = "", + append: bool = False, + lang: Optional[LanguageEnum] = LanguageEnum.ZH +) -> Dict: + is_zh = lang + target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" + result = init_result_dict(target_host, result_type="str", include_file_path=True) + + # 参数校验 + if not content.strip(): + result["message"] = "写入内容不能为空" if is_zh else "Content to write cannot be empty" + return result + if not file_path.strip(): + result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" + return result + + # 构建echo命令(内容转义) + escaped_content = escape_shell_content(content.strip()) + escaped_file = escape_shell_content(file_path.strip()) + redirect = ">>" if append else ">" + echo_cmd = f"echo '{escaped_content}' {redirect} {escaped_file}" + + # 本地执行 + if target_host == "127.0.0.1": + action = "追加" if append else "写入" + action_en = "appended" if append else "written" + return run_local_command( + cmd=echo_cmd, + result=result, + success_msg_zh=f"本地{action}成功,文件路径:{file_path.strip()}", + success_msg_en=f"Local {action_en} successfully, file path: {file_path.strip()}", + is_list_result=False, + in_place=True + ) + + # 远程执行 + remote_auth = get_remote_auth(target_host) + if not remote_auth or not (remote_auth["username"] and remote_auth["password"]): + result["message"] = "远程认证配置缺失" if is_zh else "Remote auth config missing" + return result + + action = "追加" if append else "写入" + action_en = "appended" if append else "written" + return run_remote_command( + cmd=echo_cmd, + remote_auth=remote_auth, + result=result, + success_msg_zh=f"远程{action}成功(主机:{target_host}),文件路径:{file_path.strip()}", + success_msg_en=f"Remote {action_en} successfully (host: {target_host}), file path: {file_path.strip()}", + is_list_result=False, + in_place=True + ) \ No newline at end of file diff --git a/oe-cli-mcp-server/mcp_tools/tool_type.py b/oe-cli-mcp-server/mcp_tools/tool_type.py new file mode 100644 index 0000000000000000000000000000000000000000..0d90d873e5bfb6507fbea8c8faac1d013b51bf0d --- /dev/null +++ b/oe-cli-mcp-server/mcp_tools/tool_type.py @@ -0,0 +1,9 @@ +from enum import Enum + + +class ToolType(Enum): + BASE = "base_tools" + PERSONAL = "personal_tools" + AI = "AI_tools" + MIRROR = "mirror_tools" + CAL = "cal_tools" diff --git a/oe-cli-mcp-server/requirements.txt b/oe-cli-mcp-server/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..23a5e665ae7db56b7a49bb31f9941fe89c12a674 --- /dev/null +++ b/oe-cli-mcp-server/requirements.txt @@ -0,0 +1,9 @@ +# requirements.txt +paramiko==3.4.0 +typing_extensions>=4.12.2 +psutil==7.0.0 +toml==0.10.2 +mcp==1.9.4 +scp==0.15.0 +fastapi>=0.122.0 +uvicorn>=0.38.0 \ No newline at end of file diff --git a/oe-cli-mcp-server/run.sh b/oe-cli-mcp-server/run.sh new file mode 100755 index 0000000000000000000000000000000000000000..a368a26557f4f1954099e5b11208817df634c410 --- /dev/null +++ b/oe-cli-mcp-server/run.sh @@ -0,0 +1,13 @@ +cp mcp-server.service /etc/systemd/system/ + +source venv/global/bin/activate + + +systemctl daemon-reload +systemctl enable mcp-server.service +systemctl start mcp-server.service +systemctl status mcp-server + +chmod +x /home/tsn/oe-cli-mcp-server/mcp_server/cli.py +rm -f /usr/local/bin/mcp-server +sudo ln -s /home/tsn/oe-cli-mcp-server/mcp_server/cli.py /usr/local/bin/mcp-server \ No newline at end of file diff --git a/oe-cli-mcp-server/setup.py b/oe-cli-mcp-server/setup.py new file mode 100644 index 0000000000000000000000000000000000000000..59149dfe4ff86c0b5559f8ecd4e4a5a798957f4c --- /dev/null +++ b/oe-cli-mcp-server/setup.py @@ -0,0 +1,14 @@ +from setuptools import setup, find_packages + +setup( + name="mcp-server", # 命令名 + version="0.1.0", + packages=find_packages(), # 自动找到 mcp_server 模块 + entry_points={ + "console_scripts": [ + # 配置命令:mcp-server → 指向 cli.py 的 main 函数 + "mcp-server = mcp_server.cli.cli:main", + ], + }, + python_requires=">=3.9", # 你的 Python 版本要求 +) \ No newline at end of file diff --git a/oe-cli-mcp-server/util/get_project_root.py b/oe-cli-mcp-server/util/get_project_root.py new file mode 100644 index 0000000000000000000000000000000000000000..f5e68f58697c7a3ae881a76848f7ac2037bddf29 --- /dev/null +++ b/oe-cli-mcp-server/util/get_project_root.py @@ -0,0 +1,20 @@ +import os + +def get_project_root() -> str: + """ + 基于当前文件路径,向上查找项目根目录(默认找 .git 文件夹,可修改标志文件) + :return: 项目根目录的绝对路径 + """ + # 获取当前脚本的绝对路径(__file__ 是当前文件的相对路径,abspath 转为绝对路径) + current_path = os.path.abspath(__file__) + + # 向上递归查找,直到找到包含 .git 的目录(即项目根目录) + while not os.path.exists(os.path.join(current_path, "mcp_server")): + # 向上跳一级目录 + parent_path = os.path.dirname(current_path) + # 防止递归到系统根目录(如 / 或 C:\) + if parent_path == current_path: + raise FileNotFoundError("未找到项目根目录(未发现 mcp_server 文件夹)") + current_path = parent_path + + return current_path diff --git a/oe-cli-mcp-server/util/get_type.py b/oe-cli-mcp-server/util/get_type.py new file mode 100644 index 0000000000000000000000000000000000000000..0744eb0f55a5ebd33905e1c296728addd18aada0 --- /dev/null +++ b/oe-cli-mcp-server/util/get_type.py @@ -0,0 +1,14 @@ +from mcp_tools.tool_type import ToolType + + +def get_type(package : str): + type_map = { + "base_tools" : ToolType.BASE, + "personal_tools" : ToolType.BASE, + "AI_tools" : ToolType.BASE, + "mirror_tools" : ToolType.BASE, + "cal_tools" : ToolType.BASE + } + if not package in type_map: + return ToolType.BASE + return type_map[package] \ No newline at end of file diff --git a/oe-cli-mcp-server/util/test.py b/oe-cli-mcp-server/util/test.py new file mode 100644 index 0000000000000000000000000000000000000000..413065913871131a0eda78255c36f4bff11cf309 --- /dev/null +++ b/oe-cli-mcp-server/util/test.py @@ -0,0 +1,3 @@ +import os + +print(os.path.basename("/home/tsn/oe-cli-mcp-server/mcp_tools/personal_tools/test_tool/base.py")) \ No newline at end of file diff --git a/oe-cli-mcp-server/util/test_llm_valid.py b/oe-cli-mcp-server/util/test_llm_valid.py new file mode 100644 index 0000000000000000000000000000000000000000..0d04fba403c3d754c9b62f5de59c8e92f1d5b949 --- /dev/null +++ b/oe-cli-mcp-server/util/test_llm_valid.py @@ -0,0 +1,35 @@ +import requests + +def is_llm_config_valid(API_URL: str, API_KEY: str = "", MODEL_NAME: str = "") -> bool: + """ + 极简验证大模型配置是否通畅 + :param API_URL: 模型 API 地址 + :param API_KEY: 可选 API Key + :param MODEL_NAME: 模型名 + :return: True=通畅,False=不通 + """ + try: + # 极简请求体(满足接口最基本要求) + payload = { + "model": MODEL_NAME, + "messages": [{"role": "user", "content": "hi"}], # 最短输入 + "max_tokens": 10, # 最少生成字数,加快速度 + "temperature": 0.0 + } + # 请求头 + headers = {"Content-Type": "application/json"} + if API_KEY: + headers["Authorization"] = f"Bearer {API_KEY}" + # 发送请求(超时 5 秒,快速失败) + response = requests.post( + API_URL, + json=payload, + headers=headers, + timeout=5, + verify=False # 忽略 SSL 证书校验(可选,简化验证) + ) + # 只要状态码 2xx,且返回有 choices,就认为通 + return response.status_code == 200 and "choices" in response.json() + except: + # 任何异常都视为“不通” + return False \ No newline at end of file diff --git a/oe-cli-mcp-server/util/tool_package_file_check.py b/oe-cli-mcp-server/util/tool_package_file_check.py new file mode 100644 index 0000000000000000000000000000000000000000..fbe13b7f8b1d888761b799e33dbb85706eaded1d --- /dev/null +++ b/oe-cli-mcp-server/util/tool_package_file_check.py @@ -0,0 +1,18 @@ +import os +from asyncio.log import logger + + +def tool_package_file_check(path: str) -> bool: + """ + 检查工具包文件是否存在 + :param path: 工具包文件路径 + :return: 是否存在 + """ + config_path = os.path.join(path, "config.json") + tool_path = os.path.join(path, "tool.py") + if not os.path.exists(config_path): + return False + if not os.path.exists(tool_path): + return False + return True + \ No newline at end of file diff --git a/oe-cli-mcp-server/util/venv_util.py b/oe-cli-mcp-server/util/venv_util.py new file mode 100644 index 0000000000000000000000000000000000000000..651699502c1a0268bdb042c49456c9fa8dd5347a --- /dev/null +++ b/oe-cli-mcp-server/util/venv_util.py @@ -0,0 +1,66 @@ +# util/venv_util.py(适配 openEuler 系统,仅保留 yum 逻辑) +import logging +import os +import subprocess +import toml + +def get_current_venv_pip() -> str: + """ + 获取当前激活的mcp虚拟环境的pip路径(仅适配 openEuler/Linux 系统) + :return: pip可执行文件的绝对路径 + :raises Exception: 未激活虚拟环境时抛出异常 + """ + + # 逻辑:通过VIRTUAL_ENV环境变量定位虚拟环境(激活后自动生成该变量) + venv_path = os.getenv("VIRTUAL_ENV") + if not venv_path: + raise Exception("未激活mcp虚拟环境,请先执行 source ./venv/global/bin/activate(文档2-142节)") + + return os.path.join(venv_path, "bin", "pip") + +def execute_simple_deps_script(deps_script_path: str): + """ + 执行简化版deps.toml脚本 + 安装逻辑:先装系统依赖(yum),再装Python依赖(当前虚拟环境pip) + :param deps_script_path: 简化版deps.toml的路径 + :raises FileNotFoundError: 依赖脚本不存在时抛出异常 + :raises subprocess.CalledProcessError: 依赖安装命令执行失败时抛出异常 + """ + + # 1. 读取deps.toml内容 + if not os.path.exists(deps_script_path): + raise FileNotFoundError(f"依赖脚本不存在:{deps_script_path}") + with open(deps_script_path, "r", encoding="utf-8") as f: + deps_data = toml.load(f) + + # 2. 安装系统依赖 + system_deps = deps_data.get("system_deps", {}) + if system_deps: + logging.info("=== 开始安装系统依赖(openEuler yum)===") + for dep_name, yum_cmd in system_deps.items(): + # 检查依赖是否已安装(通过 --version 或专用命令验证) + verify_cmd = f"{dep_name} --version" if dep_name != "docker" else "docker --version" + # 静默执行验证命令,返回码为0表示已安装 + if subprocess.run(verify_cmd, shell=True, capture_output=True, text=True).returncode == 0: + logging.info(f"系统依赖[{dep_name}]已安装,跳过") + continue + + # 执行 yum 安装命令(openEuler 专用) + logging.info(f"正在安装系统依赖[{dep_name}]:{yum_cmd}") + # check=True 确保命令失败时抛异常,便于上层捕获 + subprocess.run(yum_cmd, shell=True, check=True, text=True) + logging.info(f"系统依赖[{dep_name}]安装完成\n") + + # 3. 安装Python依赖 + pip_deps = deps_data.get("pip_deps", {}) + if pip_deps: + logging.info("=== 开始安装Python依赖(当前虚拟环境)===") + pip_path = get_current_venv_pip() + for dep_name, version in pip_deps.items(): + # 构造pip安装命令 + install_cmd = [pip_path, "install", "-q", f"{dep_name}{version}"] # -q 静默安装,减少输出 + logging.info(f"正在安装Python依赖[{dep_name}]:{' '.join(install_cmd)}") + subprocess.run(install_cmd, check=True, text=True) + logging.info(f"Python依赖[{dep_name}]安装完成\n") + + logging.info(f"所有依赖安装完成(依赖脚本:{deps_script_path})") \ No newline at end of file diff --git a/oe-cli-mcp-server/util/zip_tool_util.py b/oe-cli-mcp-server/util/zip_tool_util.py new file mode 100644 index 0000000000000000000000000000000000000000..4bf9643b52f5b63c2d52c7e9fdcfe5d9ccef1597 --- /dev/null +++ b/oe-cli-mcp-server/util/zip_tool_util.py @@ -0,0 +1,118 @@ +import os +import zipfile +import logging +from typing import Optional +import shutil + +from util.get_project_root import get_project_root + +# 全局目标目录(转为绝对路径,避免相对路径混乱) +target_dir = os.path.join(get_project_root(),"mcp_tools/personal_tools/") + +def clean_zip_extract_dir(dir_path: str) -> None: + """清理指定目录(递归删除,确保无残留)""" + if not os.path.exists(dir_path): + return + try: + shutil.rmtree(dir_path) + logging.info(f"已清理旧目录:{dir_path}") + except Exception as e: + logging.error(f"清理目录 {dir_path} 失败:{e}") + raise # 清理失败终止解压,避免残留干扰 + +def unzip_tool(zip_path: str, extract_to: Optional[str] = None) -> bool: + """ + 解压工具包到指定目录(仅一层路径,无嵌套) + :param zip_path: 工具包zip文件路径(相对/绝对路径) + :param extract_to: 解压目标目录(可选),默认 target_dir + :return: 解压是否成功 + """ + # 1. 基础校验:ZIP文件是否存在 + if not os.path.exists(zip_path): + logging.error(f"错误:工具包文件 {zip_path} 不存在。") + return False + + # 2. 确定最终解压根目录(默认 target_dir,支持自定义) + extract_root_dir = os.path.abspath(extract_to) if extract_to else target_dir + + # 3. 提取ZIP文件名(无后缀),作为最终的「一层路径」名称 + zip_filename = os.path.basename(zip_path) + final_extract_dir = os.path.join(extract_root_dir, os.path.splitext(zip_filename)[0]) + + # 4. 清理旧目录(若已存在,避免文件冲突) + if os.path.exists(final_extract_dir): + clean_zip_extract_dir(final_extract_dir) + + # 5. 创建最终解压目录(确保父目录存在) + try: + os.makedirs(final_extract_dir, exist_ok=True) + logging.info(f"已创建解压目录:{final_extract_dir}") + except PermissionError as e: + logging.error(f"错误:创建目录 {final_extract_dir} 权限不足:{e}") + return False + except Exception as e: + logging.error(f"错误:创建目录 {final_extract_dir} 失败:{e}") + return False + + # 6. 核心逻辑:解压并跳过ZIP内部顶层目录,直接提取内容到最终目录 + try: + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + # 获取ZIP内部所有文件的路径 + zip_file_list = zip_ref.namelist() + if not zip_file_list: + logging.error(f"错误:ZIP文件 {zip_path} 为空。") + clean_zip_extract_dir(final_extract_dir) + return False + + # 找到ZIP内部的顶层目录(假设压缩时是「文件夹+内容」,顶层目录唯一) + # 例如:ZIP内部路径是「custom_gpu_tool/xxx.py」,顶层目录就是「custom_gpu_tool/」 + top_level_dirs = set() + for file_path in zip_file_list: + # 分割路径,取第一级目录(如「custom_gpu_tool/xxx.py」→ 「custom_gpu_tool」) + top_level = file_path.split(os.sep)[0] + if top_level: # 排除空路径(避免异常) + top_level_dirs.add(top_level) + + # 处理两种情况:ZIP内部有顶层目录 / 无顶层目录(直接是文件) + if len(top_level_dirs) == 1: + # 情况1:有唯一顶层目录(大多数Linux zip压缩的情况) + top_dir = next(iter(top_level_dirs)) + os.sep # 拼接路径分隔符(如「custom_gpu_tool/」) + for file_path in zip_file_list: + # 跳过顶层目录本身(只提取其下内容) + if file_path == top_dir: + continue + # 构建目标路径:去掉顶层目录前缀,直接放到 final_extract_dir 下 + target_file_path = os.path.join(final_extract_dir, file_path[len(top_dir):]) + # 创建目标文件的父目录(避免因子目录不存在报错) + os.makedirs(os.path.dirname(target_file_path), exist_ok=True) + # 提取文件并写入目标路径 + with zip_ref.open(file_path) as source, open(target_file_path, 'wb') as target: + shutil.copyfileobj(source, target) + logging.debug(f"已提取:{file_path} → {target_file_path}") + else: + # 情况2:ZIP内部无顶层目录(直接是文件/多个子目录),直接提取所有内容 + zip_ref.extractall(path=final_extract_dir) + logging.debug(f"ZIP无统一顶层目录,直接提取所有内容到 {final_extract_dir}") + + logging.info(f"成功:文件 {zip_path} 已解压到 {final_extract_dir}(仅一层路径)") + return True + + except zipfile.BadZipFile: + logging.error(f"错误:文件 {zip_path} 不是有效的ZIP文件。") + except PermissionError as e: + logging.error(f"错误:解压权限不足(目标目录不可写或文件被占用):{e}") + except Exception as e: + logging.error(f"错误:解压文件 {zip_path} 时发生异常:{e}") + + # 解压失败,清理临时目录 + clean_zip_extract_dir(final_extract_dir) + return False + +# 示例调用 +if __name__ == "__main__": + # 配置日志(按需调整级别) + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + + # 测试:解压 custom_gpu_tool.zip,最终路径为 mcp_tools/personal_tools/custom_gpu_tool/xxx + result = unzip_tool(zip_path="/home/tsn/cli_mcp_server/test_tool.zip") + print(f"解压结果:{'成功' if result else '失败'}") \ No newline at end of file