From 408c6cce7e71da4a3a49978d04aacf9b8694c6f6 Mon Sep 17 00:00:00 2001 From: cui-gaoleng <562344211@qq.com> Date: Fri, 5 Dec 2025 17:41:35 +0800 Subject: [PATCH 01/10] =?UTF-8?q?mcp=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../oe_cli_mcp_server/client/client.py | 175 ++++++++- .../mcp_server/mcp_manager.py | 2 +- .../mcp_tools/base_tools/file_tools/base.py | 152 -------- .../base_tools/file_tools/config.json | 28 -- .../mcp_tools/base_tools/file_tools/deps.toml | 20 - .../mcp_tools/base_tools/file_tools/tool.py | 342 ------------------ .../oe_cli_mcp_server/requirements.txt | 3 +- 7 files changed, 175 insertions(+), 547 deletions(-) delete mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/base.py delete mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/config.json delete mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/deps.toml delete mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/tool.py diff --git a/mcp_center/servers/oe_cli_mcp_server/client/client.py b/mcp_center/servers/oe_cli_mcp_server/client/client.py index ed2b13ffcf..3aa4b6c476 100644 --- a/mcp_center/servers/oe_cli_mcp_server/client/client.py +++ b/mcp_center/servers/oe_cli_mcp_server/client/client.py @@ -7,7 +7,7 @@ from contextlib import AsyncExitStack from typing import TYPE_CHECKING, Union from pydantic import BaseModel, Field from enum import Enum -from mcp import ClientSession, StdioServerParameters +from mcp import ClientSession from mcp.client.sse import sse_client from mcp.client.stdio import stdio_client @@ -120,12 +120,181 @@ class MCPClient: async def main() -> None: """测试MCP Client""" - url = "http://0.0.0.0:8002/sse" + url = "http://0.0.0.0:12555/sse" headers = {} client = MCPClient(url, headers) await client.init() - result = await client.call_tool("nvidia_smi_status", {}) + + # 初始化时多余的调用移除,保留下方有序测试用例 + # ================================== + # 1. sys_info_tool 测试用例(3个,修复无效枚举值) + # ================================== + print("\n" + "="*60) + print("1. sys_info_tool - 采集CPU+内存+磁盘+系统信息") + print("="*60) + result = await client.call_tool("sys_info_tool", {"info_types": ["cpu", "mem", "disk", "os"]}) + print(result) + + print("\n" + "="*60) + print("2. sys_info_tool - 单独采集网络信息(IP/网卡)") + print("="*60) + result = await client.call_tool("sys_info_tool", {"info_types": ["net"]}) + print(result) + + print("\n" + "="*60) + print("3. sys_info_tool - 采集安全信息(SELinux+防火墙)") + print("="*60) + result = await client.call_tool("sys_info_tool", {"info_types": ["selinux", "firewall"]}) + print(result) + + # 移除无效的 "kernel" 和 "all" 类型测试(工具不支持) + + # ================================== + # 2. file_tool 测试用例(4个,修复枚举值、参数名) + # ================================== + print("\n" + "="*60) + print("4. file_tool - 列出 /etc 目录下的 .conf 配置文件(过滤关键词)") + print("="*60) + # 用 ls + 后续过滤实现(工具无find枚举,参数名改为file_path) + result = await client.call_tool("file_tool", { + "action": "ls", + "file_path": "/etc", + "detail": False, + "encoding": "utf-8" + }) + print(result) + + print("\n" + "="*60) + print("5. file_tool - 读取 /etc/os-release 文件内容(系统版本)") + print("="*60) + # action改为cat,参数名改为file_path + result = await client.call_tool("file_tool", { + "action": "cat", + "file_path": "/etc/os-release", + "encoding": "utf-8" + }) + print(result) + + print("\n" + "="*60) + print("6. file_tool - 新建临时文件并写入内容") + print("="*60) + # 工具无find/mtime枚举,替换为add+edit实用场景 + result = await client.call_tool("file_tool", { + "action": "add", + "file_path": "/tmp/file_tool_test.txt", + "overwrite": True + }) + print("新建文件结果:", result) + result = await client.call_tool("file_tool", { + "action": "edit", + "file_path": "/tmp/file_tool_test.txt", + "content": "file_tool测试内容\n系统版本:Ubuntu 22.04", + "encoding": "utf-8" + }) + print("写入内容结果:", result) + + print("\n" + "="*60) + print("7. file_tool - 修改 /tmp/file_tool_test.txt 权限为755") + print("="*60) + # action改为chmod,参数名改为file_path + result = await client.call_tool("file_tool", { + "action": "chmod", + "file_path": "/tmp/file_tool_test.txt", + "mode": "755" + }) + print(result) + + # ================================== + # 3. pkg_tool 测试用例(4个,修复无效枚举、参数) + # ================================== + print("\n" + "="*60) + print("8. pkg_tool - 列出已安装的所有 nginx 相关包") + print("="*60) + result = await client.call_tool("pkg_tool", { + "action": "list", + "filter_key": "nginx" + }) + print(result) + + print("\n" + "="*60) + print("9. pkg_tool - 查询 openssh-server 包详情(版本/依赖)") + print("="*60) + result = await client.call_tool("pkg_tool", { + "action": "info", + "pkg_name": "openssh-server" + }) + print(result) + + print("\n" + "="*60) + print("10. pkg_tool - 仅更新系统安全补丁(替代check-update)") + print("="*60) + # 工具无check-update枚举,替换为update-sec(安全更新) + result = await client.call_tool("pkg_tool", { + "action": "update-sec", + "yes": True + }) + print(result) + + print("\n" + "="*60) + print("11. pkg_tool - 清理 yum/dnf 包缓存(all类型)") + print("="*60) + result = await client.call_tool("pkg_tool", { + "action": "clean", + "cache_type": "all", + "yes": True + }) + print(result) + + # ================================== + # 4. proc_tool 测试用例(4个,修复无效枚举、参数) + # ================================== + print("\n" + "="*60) + print("12. proc_tool - 查找所有 systemd 相关进程") + print("="*60) + result = await client.call_tool("proc_tool", { + "proc_actions": ["find"], + "proc_name": "systemd" + }) + print(result) + + print("\n" + "="*60) + print("13. proc_tool - 查询 PID=1 进程(systemd)资源占用") + print("="*60) + result = await client.call_tool("proc_tool", { + "proc_actions": ["stat"], + "pid": 1 + }) + print(result) + + print("\n" + "="*60) + print("14. proc_tool - 列出所有进程(后续可筛选CPU占用前5)") + print("="*60) + # 工具无top枚举,用list获取所有进程(业务层可筛选) + result = await client.call_tool("proc_tool", { + "proc_actions": ["list"] + }) + print(result) + + print("\n" + "="*60) + print("15. proc_tool - 重启 sshd 服务(systemd服务)") + print("="*60) + # 工具无tree枚举,替换为restart实用场景 + result = await client.call_tool("proc_tool", { + "proc_actions": ["restart"], + "service_name": "ssh" # Ubuntu中sshd服务名为ssh + }) + print(result) + + # 清理临时文件 + print("\n" + "="*60) + print("16. file_tool - 删除临时测试文件") + print("="*60) + result = await client.call_tool("file_tool", { + "action": "delete", + "file_path": "/tmp/file_tool_test.txt" + }) print(result) + await client.stop() if __name__ == "__main__": diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_server/mcp_manager.py b/mcp_center/servers/oe_cli_mcp_server/mcp_server/mcp_manager.py index 7269a006be..4aff412229 100644 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_server/mcp_manager.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_server/mcp_manager.py @@ -79,7 +79,7 @@ class McpServer(ToolManager): if os.path.basename(mcp_collection)[:-4] == ".zip": unzip_tool(mcp_collection) package_name = os.path.basename(mcp_collection)[:-4] - package_dir = os.path.join(get_project_root(), "mcp_tools/personal_tools", package_name) + package_dir = os.path.join(get_project_root(), "mcp_tools/personal_tools", package_name)#存放到个性化目录下 else: package_name = mcp_collection package_dir = self.get_package_path(package_name) diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/base.py deleted file mode 100644 index 1a018f99ae..0000000000 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/base.py +++ /dev/null @@ -1,152 +0,0 @@ -import logging -import subprocess -import paramiko -from typing import Dict, Optional - -from servers.oe_cli_mcp_server.config.base_config_loader import BaseConfig, LanguageEnum - -# 初始化日志(保持原逻辑) -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) - -def get_language() -> bool: - """获取语言配置:True=中文,False=英文""" - return BaseConfig().get_config().public_config.language == LanguageEnum.ZH - -def get_remote_auth(ip: str) -> Optional[Dict]: - """ - 获取服务器认证信息:匹配IP/主机名对应的连接配置 - """ - for host_config in BaseConfig().get_config().public_config.remote_hosts: - if ip in [host_config.host, host_config.name]: - return { - "host": host_config.host, - "port": host_config.port, - "username": host_config.username, - "password": host_config.password - } - return None - -def escape_shell_content(content: str) -> str: - """ - 转义shell命令中的特殊字符(避免注入和语法错误) - """ - # 转义单引号(最关键,因为命令中用单引号包裹参数) - return content.replace("'", "'\\''") - -def init_result_dict( - target_host: str, - result_type: str = "list", # result字段类型:list/str - include_file_path: bool = False -) -> Dict: - """ - 初始化返回结果字典(统一结构) - """ - result = { - "success": False, - "message": "", - "result": [] if result_type == "list" else "", - "target": target_host - } - if include_file_path: - result["file_path"] = "" - return result - -def run_local_command( - cmd: str, - result: Dict, - success_msg_zh: str, - success_msg_en: str, - is_list_result: bool = True, - in_place: bool = False -) -> Dict: - """ - 执行本地命令并处理结果 - """ - try: - logger.info(f"执行本地命令:{cmd}") - output = subprocess.check_output( - cmd, shell=True, text=True, stderr=subprocess.STDOUT - ) - result["success"] = True - result["message"] = success_msg_zh if get_language() else success_msg_en - # 处理返回结果(in_place=True时不返回内容) - if not in_place: - if is_list_result: - result["result"] = output.strip().split("\n") if output.strip() else [] - else: - result["result"] = output.strip() - # grep无匹配时退出码1,特殊处理 - except subprocess.CalledProcessError as e: - if e.returncode == 1 and "grep" in cmd: - result["success"] = True - result["message"] = "未找到匹配内容" if get_language() else "No matching content found" - else: - result["message"] = f"本地执行失败:{e.output.strip()}" if get_language() else f"Local execution failed: {e.output.strip()}" - logger.error(result["message"]) - except Exception as e: - result["message"] = f"本地处理异常:{str(e)}" if get_language() else f"Local processing exception: {str(e)}" - logger.error(result["message"]) - return result - -def run_remote_command( - cmd: str, - remote_auth: Dict, - result: Dict, - success_msg_zh: str, - success_msg_en: str, - is_list_result: bool = True, - in_place: bool = False -) -> Dict: - """ - 执行远程命令并处理结果(SSH连接封装) - """ - ssh_conn: Optional[paramiko.SSHClient] = None - try: - # 初始化SSH客户端 - ssh_conn = paramiko.SSHClient() - ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh_conn.connect( - hostname=remote_auth["host"], - port=remote_auth["port"], - username=remote_auth["username"], - password=remote_auth["password"], - timeout=10, - banner_timeout=10 - ) - logger.info(f"已连接远程主机:{remote_auth['host']},执行命令:{cmd}") - - # 执行命令 - stdin, stdout, stderr = ssh_conn.exec_command(cmd) - stdout_msg = stdout.read().decode("utf-8", errors="replace").strip() - stderr_msg = stderr.read().decode("utf-8", errors="replace").strip() - - # 处理结果 - if stderr_msg: - result["message"] = f"远程执行失败:{stderr_msg}" if get_language() else f"Remote execution failed: {stderr_msg}" - logger.error(result["message"]) - else: - result["success"] = True - result["message"] = success_msg_zh if get_language() else success_msg_en - if not in_place: - if is_list_result: - result["result"] = stdout_msg.split("\n") if stdout_msg else [] - else: - result["result"] = stdout_msg.strip() - except paramiko.AuthenticationException: - result["message"] = "SSH认证失败,请检查用户名和密码" if get_language() else "SSH authentication failed, check username and password" - logger.error(result["message"]) - except TimeoutError: - result["message"] = "SSH连接超时,请检查网络或主机状态" if get_language() else "SSH connection timed out, check network or host status" - logger.error(result["message"]) - except Exception as e: - result["message"] = f"远程处理异常:{str(e)}" if get_language() else f"Remote processing exception: {str(e)}" - logger.error(result["message"]) - finally: - # 关闭SSH连接 - if ssh_conn: - transport = ssh_conn.get_transport() - if transport and transport.is_active(): - ssh_conn.close() - logger.info(f"已关闭与{remote_auth['host']}的SSH连接") - return result \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/config.json b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/config.json deleted file mode 100644 index dd08cfb5c1..0000000000 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/config.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "tools": { - "file_grep_tool": { - "zh": "搜索文件中匹配指定模式的内容(本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -file_path: 目标文件路径(必填,如\"/var/log/syslog\")\n -pattern: 搜索模式(支持正则,必填,如\"error\")\n -options: grep可选参数(如\"-i\"忽略大小写、\"-n\"显示行号)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -result: 匹配结果列表(成功时返回)\n -target: 执行目标主机", - "en": "Search for content matching the specified pattern in the file (supports local/remote)\n\nParameters:\n -target: Target host IP/hostname, None for local\n -file_path: Target file path (required, e.g. \"/var/log/syslog\")\n -pattern: Search pattern (supports regex, required, e.g. \"error\")\n -options: Optional grep parameters (e.g. \"-i\" ignore case, \"-n\" show line numbers)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -result: List of matching results (returned on success)\n -target: Target host for execution" - }, - "file_sed_tool": { - "zh": "替换文件中匹配的内容(本地/远程均支持,默认不修改原文件)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -file_path: 目标文件路径(必填)\n -pattern: 匹配模式(如\"s/old/new/g\",必填,g表示全局替换)\n -in_place: 是否直接修改原文件(True/False,默认False,仅输出结果)\n -options: sed可选参数(如\"-i.bak\"备份原文件)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -result: 替换后内容(in_place=False时返回)\n -target: 执行目标主机", - "en": "Replace matching content in the file (supports local/remote, no original file modification by default)\n\nParameters:\n -target: Target host IP/hostname, None for local\n -file_path: Target file path (required)\n -pattern: Matching pattern (e.g. \"s/old/new/g\", required, g for global replacement)\n -in_place: Whether to modify the original file directly (True/False, default False, only output result)\n -options: Optional sed parameters (e.g. \"-i.bak\" backup original file)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -result: Content after replacement (returned when in_place=False)\n -target: Target host for execution" - }, - "file_awk_tool": { - "zh": "用awk处理文本文件(支持列提取、条件过滤等,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地(127.0.0.1)\n -file_path: 目标文件路径(必填,如\"/etc/passwd\"或\"/var/log/access.log\")\n -script: awk脚本(必填,示例:\"'{print $1,$3}'\"提取1、3列;\"'$3>100 {print $0}'\"过滤第3列大于100的行)\n -options: awk可选参数(示例:\"-F:\"指定分隔符为冒号;\"-v OFS=,\"指定输出分隔符为逗号)\n\n返回:\n -success: 执行结果(True=成功,False=失败)\n -message: 执行状态描述/错误提示(中文/英文根据配置自动切换)\n -result: awk处理结果列表(每行一个元素,无结果时返回空列表)\n -target: 实际执行的目标主机IP/hostname", - "en": "Process text files with awk (supports column extraction, condition filtering, local/remote execution)\n\nParameters:\n -target: Target host IP/hostname, None for localhost (127.0.0.1)\n -file_path: Target file path (required, e.g. \"/etc/passwd\" or \"/var/log/access.log\")\n -script: awk script (required, example: \"'{print $1,$3}'\" extract column 1&3; \"'$3>100 {print $0}'\" filter rows where column3>100)\n -options: Optional awk parameters (example: \"-F:\" set delimiter to colon; \"-v OFS=,\" set output delimiter to comma)\n\nReturns:\n -success: Execution result (True=success, False=failure)\n -message: Execution status/error prompt (auto-switch between Chinese/English)\n -result: List of awk processing results (one element per line, empty list if no result)\n -target: Actual target host IP/hostname for execution" - }, - "file_sort_tool": { - "zh": "对文本文件进行排序(支持按列、升序/降序、去重等,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地(127.0.0.1)\n -file_path: 目标文件路径(必填,如\"/tmp/logs.txt\")\n -options: sort可选参数(示例:\n \"-n\"按数字排序;\"-k3\"按第3列排序;\n \"-r\"降序排列;\"-u\"去重后排序;\n \"-t,\"指定逗号为分隔符)\n -output_file: 排序结果输出路径(可选,默认不保存到文件,仅返回结果)\n\n返回:\n -success: 执行结果(True=成功,False=失败)\n -message: 执行状态描述/错误提示(根据语言配置自动切换)\n -result: 排序结果列表(output_file为空时返回,每行一个元素)\n -target: 实际执行的目标主机", - "en": "Sort text files (supports column-based sorting, ascending/descending, deduplication, local/remote)\n\nParameters:\n -target: Target host IP/hostname, None for localhost (127.0.0.1)\n -file_path: Target file path (required, e.g. \"/tmp/logs.txt\")\n -options: Optional sort parameters (examples:\n \"-n\" numeric sort; \"-k3\" sort by 3rd column;\n \"-r\" reverse order; \"-u\" unique then sort;\n \"-t,\" set comma as delimiter)\n -output_file: Output path for sorted results (optional, default returns in result without saving)\n\nReturns:\n -success: Execution result (True=success, False=failure)\n -message: Execution status/error prompt (auto-switch language)\n -result: List of sorted results (returned if output_file is empty, one element per line)\n -target: Actual target host for execution" - }, - "file_unique_tool": { - "zh": "对文本文件进行去重处理(通常与sort配合使用,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -file_path: 目标文件路径(必填,如\"/tmp/duplicates.txt\")\n -options: unique可选参数(示例:\n \"-u\"仅显示唯一行;\"-d\"仅显示重复行;\n \"-c\"显示每行出现的次数;\"-i\"忽略大小写)\n -output_file: 去重结果输出路径(可选,默认不保存到文件)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -result: 去重结果列表(output_file为空时返回)\n -target: 执行目标主机", - "en": "Deduplicate text files (usually used with sort, supports local/remote execution)\n\nParameters:\n -target: Target host IP/hostname, None for localhost\n -file_path: Target file path (required, e.g. \"/tmp/duplicates.txt\")\n -options: Optional unique parameters (examples:\n \"-u\" show only unique lines; \"-d\" show only duplicate lines;\n \"-c\" show count of each line; \"-i\" ignore case)\n -output_file: Output path for deduplicated results (optional, default returns in result)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -result: List of deduplicated results (returned if output_file is empty)\n -target: Target host for execution" - }, - "file_echo_tool": { - "zh": "向文件写入内容(支持创建文件、追加内容,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -content: 要写入的内容(必填,如\"Hello World\")\n -file_path: 目标文件路径(必填,如\"/tmp/message.txt\")\n -append: 是否追加内容(True=追加,False=覆盖,默认False)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -target: 执行目标主机\n -file_path: 实际写入的文件路径", - "en": "Write content to file (supports file creation, appending content, local/remote execution)\n\nParameters:\n -target: Target host IP/hostname, None for localhost\n -content: Content to write (required, e.g. \"Hello World\")\n -file_path: Target file path (required, e.g. \"/tmp/message.txt\")\n -append: Whether to append content (True=append, False=overwrite, default False)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -target: Target host for execution\n -file_path: Actual file path written to" - } - } -} \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/deps.toml b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/deps.toml deleted file mode 100644 index dc15ebc850..0000000000 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/deps.toml +++ /dev/null @@ -1,20 +0,0 @@ -[system] -# 系统基础工具(grep/sed/awk/sort/uniq/echo 通常系统自带,此处列出需确保安装的依赖) -#required_tools = [ -# "yum install -y grep sed gawk coreutils" -#] -# SSH客户端依赖(部分系统可能未预装) -#ssh_client = [ -# "yum install -y openssh-clients" -#] -[pip_config] -index_url = "https://pypi.tuna.tsinghua.edu.cn/simple" - -[pip] -# Python依赖包 -paramiko = "==4.0.0" # SSH远程连接 -typing_extensions = "==4.12.2" -psutil = "==7.0.0" -toml = "== 0.10.2" -mcp = "== 1.9.4" -scp = "== 0.15.0" # 类型提示兼容(如需) \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/tool.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/tool.py deleted file mode 100644 index 84a978a153..0000000000 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/tool.py +++ /dev/null @@ -1,342 +0,0 @@ -from typing import Dict, Optional -from servers.oe_cli_mcp_server.config.base_config_loader import LanguageEnum -from servers.oe_cli_mcp_server.mcp_tools. base_tools. file_tools. base import ( - get_remote_auth, - run_local_command, - run_remote_command, - init_result_dict, - escape_shell_content -) - -def file_grep_tool( - target: Optional[str] = None, - file_path: str = "", - pattern: str = "", - options: str = "", - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host) - - # 基础参数校验 - if not file_path.strip(): - result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" - return result - if not pattern.strip(): - result["message"] = "搜索模式不能为空" if is_zh else "Search pattern cannot be empty" - return result - - # 构建grep命令(内容转义,避免shell注入) - escaped_pattern = escape_shell_content(pattern.strip()) - escaped_file = escape_shell_content(file_path.strip()) - grep_cmd = f"grep {options.strip()} '{escaped_pattern}' {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - return run_local_command( - cmd=grep_cmd, - result=result, - success_msg_zh=f"本地文件搜索完成(路径:{file_path})", - success_msg_en=f"Local file search completed (path: {file_path})", - is_list_result=True - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth: - result["message"] = f"未找到远程主机({target_host})的认证配置" if is_zh else f"Authentication config for remote host ({target_host}) not found" - return result - if not (remote_auth["username"] and remote_auth["password"]): - result["message"] = "远程执行需用户名和密码" if is_zh else "Username and password required for remote execution" - return result - - return run_remote_command( - cmd=grep_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=f"远程文件搜索完成(主机:{target_host},路径:{file_path})", - success_msg_en=f"Remote file search completed (host: {target_host}, path: {file_path})", - is_list_result=True - ) - -def file_sed_tool( - target: Optional[str] = None, - file_path: str = "", - pattern: str = "", - in_place: bool = False, - options: str = "", - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host, result_type="str") - - # 基础校验 - if not file_path.strip(): - result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" - return result - if not pattern.strip() or "s/" not in pattern: - result["message"] = "替换模式格式错误(需含s/)" if is_zh else "Replacement pattern format error (must contain s/)" - return result - - # 构建sed命令(内容转义) - escaped_pattern = escape_shell_content(pattern.strip()) - escaped_file = escape_shell_content(file_path.strip()) - in_place_opt = "-i" if in_place else "" - sed_cmd = f"sed {options.strip()} {in_place_opt} '{escaped_pattern}' {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - msg_zh = "原文件已修改" if in_place else "替换后内容已输出" - msg_en = "original file modified" if in_place else "replaced content output" - return run_local_command( - cmd=sed_cmd, - result=result, - success_msg_zh=f"本地sed执行成功({msg_zh},路径:{file_path})", - success_msg_en=f"Local sed executed successfully ({msg_en}, path: {file_path})", - is_list_result=False, - in_place=in_place - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth or not (remote_auth["username"] and remote_auth["password"]): - result["message"] = "远程认证配置缺失" if is_zh else "Remote authentication config missing" - return result - - msg_zh = "原文件已修改" if in_place else "替换后内容已输出" - msg_en = "original file modified" if in_place else "replaced content output" - return run_remote_command( - cmd=sed_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=f"远程sed执行成功({msg_zh},主机:{target_host})", - success_msg_en=f"Remote sed executed successfully ({msg_en}, host: {target_host})", - is_list_result=False, - in_place=in_place - ) - -def file_awk_tool( - target: Optional[str] = None, - file_path: str = "", - script: str = "", - options: str = "", - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host) - - # 基础参数校验 - if not file_path.strip(): - result["message"] = "文件路径不能为空,请传入有效的文件路径(如\"/etc/passwd\")" if is_zh else "File path cannot be empty, please pass a valid path (e.g. \"/etc/passwd\")" - return result - if not script.strip(): - result["message"] = "awk脚本不能为空,请传入有效的处理逻辑(如\"'{print $1,$3}'\")" if is_zh else "awk script cannot be empty, please pass valid logic (e.g. \"'{print $1,$3}'\")" - return result - - # 构建awk命令(内容转义) - escaped_script = escape_shell_content(script.strip()) - escaped_file = escape_shell_content(file_path.strip()) - options_clean = options.strip() - awk_cmd = f"awk {options_clean} {escaped_script} {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - return run_local_command( - cmd=awk_cmd, - result=result, - success_msg_zh=f"本地awk处理成功(文件:{file_path.strip()})", - success_msg_en=f"Local awk processing succeeded (file: {file_path.strip()})", - is_list_result=True - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth: - result["message"] = f"未找到远程主机({target_host})的认证配置,请检查配置文件中的remote_hosts" if is_zh else f"Authentication config for remote host ({target_host}) not found, check remote_hosts in config" - return result - if not (remote_auth.get("username") and remote_auth.get("password")): - result["message"] = f"远程主机({target_host})的认证配置缺失用户名或密码,无法建立SSH连接" if is_zh else f"Remote host ({target_host}) auth config lacks username/password, cannot establish SSH connection" - return result - - return run_remote_command( - cmd=awk_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=f"远程awk处理成功(主机:{target_host},文件:{file_path.strip()})", - success_msg_en=f"Remote awk processing succeeded (host: {target_host}, file: {file_path.strip()})", - is_list_result=True - ) - -def file_sort_tool( - target: Optional[str] = None, - file_path: str = "", - options: str = "", - output_file: str = "", - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host) - - # 参数校验 - if not file_path.strip(): - result["message"] = "文件路径不能为空,请提供有效的文件路径" if is_zh else "File path cannot be empty, please provide a valid path" - return result - - # 构建sort命令(内容转义) - escaped_file = escape_shell_content(file_path.strip()) - escaped_output = escape_shell_content(output_file.strip()) if output_file.strip() else "" - options_clean = options.strip() - if output_file.strip(): - sort_cmd = f"sort {options_clean} {escaped_file} -o {escaped_output}" - else: - sort_cmd = f"sort {options_clean} {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - success_msg_zh = f"本地排序完成,结果已保存至:{output_file.strip()}" if output_file.strip() else f"本地排序完成" - success_msg_en = f"Local sort completed, result saved to: {output_file.strip()}" if output_file.strip() else f"Local sort completed" - return run_local_command( - cmd=sort_cmd, - result=result, - success_msg_zh=success_msg_zh, - success_msg_en=success_msg_en, - is_list_result=True, - in_place=bool(output_file.strip()) - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth: - result["message"] = f"未找到远程主机({target_host})的认证配置" if is_zh else f"Authentication config for remote host ({target_host}) not found" - return result - if not (remote_auth.get("username") and remote_auth.get("password")): - result["message"] = f"远程主机({target_host})的认证信息不完整(缺少用户名或密码)" if is_zh else f"Remote host ({target_host}) auth info incomplete (missing username/password)" - return result - - success_msg_zh = f"远程排序完成,结果已保存至:{output_file.strip()}(主机:{target_host})" if output_file.strip() else f"远程排序完成(主机:{target_host})" - success_msg_en = f"Remote sort completed, result saved to: {output_file.strip()} (host: {target_host})" if output_file.strip() else f"Remote sort completed (host: {target_host})" - return run_remote_command( - cmd=sort_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=success_msg_zh, - success_msg_en=success_msg_en, - is_list_result=True, - in_place=bool(output_file.strip()) - ) - -def file_unique_tool( - target: Optional[str] = None, - file_path: str = "", - options: str = "", - output_file: str = "", - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host) - - # 参数校验 - if not file_path.strip(): - result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" - return result - - # 构建unique命令(内容转义) - escaped_file = escape_shell_content(file_path.strip()) - escaped_output = escape_shell_content(output_file.strip()) if output_file.strip() else "" - options_clean = options.strip() - if output_file.strip(): - unique_cmd = f"uniq {options_clean} {escaped_file} {escaped_output}" - else: - unique_cmd = f"uniq {options_clean} {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - success_msg_zh = f"本地去重完成,结果已保存至:{output_file.strip()}" if output_file.strip() else f"本地去重完成" - success_msg_en = f"Local deduplication completed, result saved to: {output_file.strip()}" if output_file.strip() else f"Local deduplication completed" - return run_local_command( - cmd=unique_cmd, - result=result, - success_msg_zh=success_msg_zh, - success_msg_en=success_msg_en, - is_list_result=True, - in_place=bool(output_file.strip()) - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth or not (remote_auth["username"] and remote_auth["password"]): - result["message"] = "远程认证配置缺失" if is_zh else "Remote auth config missing" - return result - - success_msg_zh = f"远程去重完成,结果已保存至:{output_file.strip()}(主机:{target_host})" if output_file.strip() else f"远程去重完成(主机:{target_host})" - success_msg_en = f"Remote deduplication completed, result saved to: {output_file.strip()} (host: {target_host})" if output_file.strip() else f"Remote deduplication completed (host: {target_host})" - return run_remote_command( - cmd=unique_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=success_msg_zh, - success_msg_en=success_msg_en, - is_list_result=True, - in_place=bool(output_file.strip()) - ) - -def file_echo_tool( - target: Optional[str] = None, - content: str = "", - file_path: str = "", - append: bool = False, - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host, result_type="str", include_file_path=True) - - # 参数校验 - if not content.strip(): - result["message"] = "写入内容不能为空" if is_zh else "Content to write cannot be empty" - return result - if not file_path.strip(): - result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" - return result - - # 构建echo命令(内容转义) - escaped_content = escape_shell_content(content.strip()) - escaped_file = escape_shell_content(file_path.strip()) - redirect = ">>" if append else ">" - echo_cmd = f"echo '{escaped_content}' {redirect} {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - action = "追加" if append else "写入" - action_en = "appended" if append else "written" - return run_local_command( - cmd=echo_cmd, - result=result, - success_msg_zh=f"本地{action}成功,文件路径:{file_path.strip()}", - success_msg_en=f"Local {action_en} successfully, file path: {file_path.strip()}", - is_list_result=False, - in_place=True - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth or not (remote_auth["username"] and remote_auth["password"]): - result["message"] = "远程认证配置缺失" if is_zh else "Remote auth config missing" - return result - - action = "追加" if append else "写入" - action_en = "appended" if append else "written" - return run_remote_command( - cmd=echo_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=f"远程{action}成功(主机:{target_host}),文件路径:{file_path.strip()}", - success_msg_en=f"Remote {action_en} successfully (host: {target_host}), file path: {file_path.strip()}", - is_list_result=False, - in_place=True - ) \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/requirements.txt b/mcp_center/servers/oe_cli_mcp_server/requirements.txt index d6df13ee84..dfaaed8a1d 100644 --- a/mcp_center/servers/oe_cli_mcp_server/requirements.txt +++ b/mcp_center/servers/oe_cli_mcp_server/requirements.txt @@ -7,4 +7,5 @@ mcp==1.9.4 scp==0.15.0 fastapi>=0.122.0 uvicorn>=0.38.0 -requests==2.31.0 \ No newline at end of file +requests==2.31.0 +pydantic ==2.5.2 \ No newline at end of file -- Gitee From 0b5a3cb0c0a66ce72f8b6ad1b49d2e3a6d253539 Mon Sep 17 00:00:00 2001 From: cui-gaoleng <562344211@qq.com> Date: Mon, 8 Dec 2025 10:22:48 +0800 Subject: [PATCH 02/10] =?UTF-8?q?mcp=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mcp_center/run.sh | 2 +- mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py | 2 +- mcp_center/servers/oe_cli_mcp_server/requirements.txt | 2 +- mcp_center/servers/oe_cli_mcp_server/run.sh | 6 +++--- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/mcp_center/run.sh b/mcp_center/run.sh index d77f47737b..ddf5936e00 100755 --- a/mcp_center/run.sh +++ b/mcp_center/run.sh @@ -2,7 +2,7 @@ SERVICE_DIR="/usr/lib/euler-copilot-framework/mcp_center/service" -/usr/lib/euler-copilot-framework/mcp_center/servers/oe-cli-mcp-server/run.sh +/usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/run.sh for service_file in "$SERVICE_DIR"/*.service; do if [ -f "$service_file" ]; then diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py b/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py index 7d319b5add..dc0a65fc1b 100755 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3 +#!/usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/venv/global/bin/python3 import logging import sys with open("/etc/systemd/system/mcp-server.service", "r") as f: diff --git a/mcp_center/servers/oe_cli_mcp_server/requirements.txt b/mcp_center/servers/oe_cli_mcp_server/requirements.txt index dfaaed8a1d..bd108e4f91 100644 --- a/mcp_center/servers/oe_cli_mcp_server/requirements.txt +++ b/mcp_center/servers/oe_cli_mcp_server/requirements.txt @@ -8,4 +8,4 @@ scp==0.15.0 fastapi>=0.122.0 uvicorn>=0.38.0 requests==2.31.0 -pydantic ==2.5.2 \ No newline at end of file +pydantic >=2.7.2 \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/run.sh b/mcp_center/servers/oe_cli_mcp_server/run.sh index bce85a1ab7..dcffd746dc 100755 --- a/mcp_center/servers/oe_cli_mcp_server/run.sh +++ b/mcp_center/servers/oe_cli_mcp_server/run.sh @@ -15,12 +15,12 @@ pip install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple pip install -r "$REQUIREMENTS" -i https://pypi.tuna.tsinghua.edu.cn/simple # 3. 部署systemd服务 -cp /home/tsn/framework-dev-with-mcp/mcp_center/servers/oe_cli_mcp_server/mcp-server.service /etc/systemd/system/ +cp /usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/mcp-server.service /etc/systemd/system/ systemctl daemon-reload systemctl enable mcp-server --now -systemctl status mcp-server + # 4. 全局命令链接 chmod +x /usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py rm -f /usr/local/bin/mcp-server -ln -s /usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py /usr/local/bin/mcp-server \ No newline at end of file +ln -s /usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py /usr/local/bin/mcp-server -- Gitee From 851c3d3067723d334778fbf5f0b3d663b8c3c453 Mon Sep 17 00:00:00 2001 From: cui-gaoleng <562344211@qq.com> Date: Mon, 8 Dec 2025 10:30:35 +0800 Subject: [PATCH 03/10] =?UTF-8?q?oe-cli-mcp-server=20=E7=BB=93=E6=9E=84?= =?UTF-8?q?=E8=B0=83=E6=95=B4+=E6=A0=87=E5=87=86=E5=8C=96oe-cli-mcp-server?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../base_tools/sys_info_tool/base.py | 1 + .../base_tools/sys_info_tool/config.json | 8 +++ .../base_tools/sys_info_tool/deps.toml | 0 .../base_tools/sys_info_tool/tool.py | 50 +++++++++++++++++++ 4 files changed, 59 insertions(+) create mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py create mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/config.json create mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/deps.toml create mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/tool.py diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py new file mode 100644 index 0000000000..ea0c8a85cb --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py @@ -0,0 +1 @@ +q \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/config.json b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/config.json new file mode 100644 index 0000000000..56df6d526b --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/config.json @@ -0,0 +1,8 @@ +{ + "tools": { + "sys_info_tool": { + "zh": "【系统/硬件/安全信息采集工具】\n功能:批量采集OpenEuler系统的系统、硬件、安全类信息(纯Python+系统命令安全调用)\n\n【核心提示】\n1. 信息类型(info_types)为枚举值列表,支持多选,可选枚举值如下:\n - 系统类:os(版本/内核)、load(负载)、uptime(运行时间)\n - 硬件类:cpu(型号/核心)、mem(内存占用)、disk(分区/使用率)、gpu(显卡状态)、net(网卡/IP)\n - 安全类:selinux(状态)、firewall(防火墙规则)\n2. 输入格式示例:[\"cpu\", \"mem\", \"disk\"](必须为列表,单个类型也需用列表包裹,如[\"os\"]);\n3. 无额外参数,仅需传入info_types列表即可批量采集对应信息;\n4. 语言配置:自动读取系统全局配置(中文/英文),不提供外部设置接口;\n5. 依赖说明:GPU信息采集需安装nvidia-smi(仅支持NVIDIA显卡),防火墙信息需安装firewalld。\n\n【枚举类定义(必须遵守)】\n- InfoTypeEnum(信息类型枚举):os / load / uptime / cpu / mem / disk / gpu / net / selinux / firewall\n\n【参数详情】\n- info_types:信息类型列表(必填,枚举值见上方,支持多选,格式为列表)\n\n【返回值说明】\n- success:采集结果(True=成功,False=失败)\n- message:采集信息/错误提示(根据全局语言配置自动切换)\n- result:采集结果(结构化嵌套字典,key为信息类型,value为对应采集数据)\n- target:执行目标(固定为127.0.0.1,本地执行)\n- info_types:已采集的信息类型列表", + "en": "【System/Hardware/Security Info Collector】\nFunction: Batch collect system, hardware, security info of OpenEuler (Python native + safe system command call)\n\n【Core Guidelines】\n1. Info types (info_types) is an enum value list, supports multiple selection, optional enum values:\n - System: os (version/kernel), load (system load), uptime (system uptime)\n - Hardware: cpu (model/core), mem (memory usage), disk (partition/usage), gpu (GPU status), net (network card/IP)\n - Security: selinux (SELinux status), firewall (firewall rules)\n2. Input format example: [\"cpu\", \"mem\", \"disk\"] (must be a list, single type also needs to be wrapped in a list, e.g. [\"os\"]);\n3. No additional parameters, just pass info_types list to batch collect corresponding info;\n4. Language configuration: Automatically read global system configuration (Chinese/English), no external setting interface;\n5. Dependency note: GPU info requires nvidia-smi (NVIDIA GPU only), firewall info requires firewalld.\n\n【Enum Class Definition (Must Follow)】\n- InfoTypeEnum (Info Type Enum): os / load / uptime / cpu / mem / disk / gpu / net / selinux / firewall\n\n【Parameter Details】\n- info_types: Info type list (required, enum values see above, supports multiple selection, format is list)\n\n【Return Value Explanation】\n- success: Collection result (True=success, False=failure)\n- message: Collection info/error prompt (automatically switch based on global config)\n- result: Collected info (structured nested dict, key is info type, value is corresponding collected data)\n- target: Execution target (fixed as 127.0.0.1, local execution)\n- info_types: Collected info type list" + } + } +} \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/deps.toml b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/deps.toml new file mode 100644 index 0000000000..e69de29bb2 diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/tool.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/tool.py new file mode 100644 index 0000000000..626027ff1e --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/tool.py @@ -0,0 +1,50 @@ +from typing import Dict, List +from pydantic import Field + +from servers.oe_cli_mcp_server.mcp_tools.base_tools.sys_info_tool.base import ( + init_result_dict, + SystemInfoCollector, + InfoTypeEnum, + parse_info_types, + is_zh, + logger +) + +def sys_info_tool( + info_types: List[str] = Field( + ..., + description="信息类型列表(枚举值可选:os/load/uptime/cpu/mem/disk/gpu/net/selinux/firewall,支持多选)" + ) +) -> Dict: + """ + 系统/硬件/安全信息采集工具(支持批量采集多类信息) + 语言配置从全局BaseConfig读取,不提供外部接口 + """ + # 初始化结果字典 + result = init_result_dict() + collector = SystemInfoCollector() + use_zh = is_zh() + + # 解析信息类型列表(字符串列表→枚举列表) + try: + parsed_info_types = parse_info_types(info_types) + result["info_types"] = [t.value for t in parsed_info_types] # 记录原始输入的类型列表 + except ValueError as e: + result["message"] = str(e) + return result + + # 批量采集信息 + try: + batch_result = collector.collect_batch(parsed_info_types) + result["result"] = batch_result + result["success"] = True + + # 生成提示信息 + info_type_str = ",".join(result["info_types"]) + result["message"] = f"以下信息采集完成:{info_type_str}" if use_zh else f"Collected info types: {info_type_str}" + + except Exception as e: + result["message"] = f"信息采集失败:{str(e)}" if use_zh else f"Info collection failed: {str(e)}" + logger.error(f"System info batch collection error: {str(e)}") + + return result \ No newline at end of file -- Gitee From 74313ceb32f0c65a796b21c6985bf187f7a089fb Mon Sep 17 00:00:00 2001 From: cui-gaoleng <562344211@qq.com> Date: Mon, 8 Dec 2025 10:40:15 +0800 Subject: [PATCH 04/10] =?UTF-8?q?oe-cli-mcp-server=20mcp=5Fcenter=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mcp_tools/base_tools/file_tool/base.py | 239 ++++++++++++++++++ .../base_tools/file_tool/config.json | 8 + .../mcp_tools/base_tools/file_tool/deps.toml | 0 .../mcp_tools/base_tools/file_tool/tool.py | 96 +++++++ 4 files changed, 343 insertions(+) create mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py create mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/config.json create mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/deps.toml create mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py new file mode 100644 index 0000000000..b4cbfd03d4 --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py @@ -0,0 +1,239 @@ +import logging +import os +import shutil +from enum import Enum +from typing import Dict, List +from pydantic import Field +from servers.oe_cli_mcp_server.config.base_config_loader import BaseConfig, LanguageEnum + +# 初始化日志 +logger = logging.getLogger("file_tool") +logger.setLevel(logging.INFO) +lang = BaseConfig().get_config().public_config.language +# ========== 枚举类定义(提升大模型识别性) ========== +class FileActionEnum(str, Enum): + """文件操作类型枚举""" + LS = "ls" # 查看文件/目录 + CAT = "cat" # 读取文件内容 + ADD = "add" # 新建空文件 + APPEND = "append" # 追加内容 + EDIT = "edit" # 覆盖内容 + RENAME = "rename" # 重命名 + CHMOD = "chmod" # 修改权限 + DELETE = "delete" # 删除文件/目录 + +class FileEncodingEnum(str, Enum): + """文件编码枚举""" + UTF8 = "utf-8" + GBK = "gbk" + GB2312 = "gb2312" + ASCII = "ascii" + +class CacheTypeEnum(str, Enum): + """缓存类型枚举(预留扩展)""" + ALL = "all" + PACKAGES = "packages" + METADATA = "metadata" + +# ========== 通用工具函数 ========== +def get_language() -> bool: + """获取语言配置:True=中文,False=英文""" + return BaseConfig().get_config().public_config.language == LanguageEnum.ZH + +def init_result_dict( + target_host: str = "127.0.0.1", + result_type: str = "list", + include_file_path: bool = True +) -> Dict: + """初始化返回结果字典(默认包含file_path字段)""" + result = { + "success": False, + "message": "", + "result": [] if result_type == "list" else "", + "target": target_host, + "file_path": "" + } + return result + +# ========== 文件管理核心类 ========== +class FileManager: + """文件管理核心类(Python原生实现,无shell依赖)""" + def __init__(self, lang: LanguageEnum = LanguageEnum.ZH): + self.is_zh = lang + + def _get_error_msg(self, zh_msg: str, en_msg: str) -> str: + """多语言错误提示""" + return zh_msg if self.is_zh else en_msg + + def ls(self, file_path: str, detail: bool = Field(False, description="是否显示详细信息")) -> List[Dict]: + """ + 查看文件/目录状态(Python实现ls功能) + :param file_path: 文件/目录路径(必填) + :param detail: 是否显示详细信息(默认False) + :return: 结构化文件信息列表 + """ + if not os.path.exists(file_path): + raise FileNotFoundError(self._get_error_msg(f"路径不存在:{file_path}", f"Path not found: {file_path}")) + + result = [] + if os.path.isfile(file_path): + # 单个文件 + stat = os.stat(file_path) + file_info = { + "name": os.path.basename(file_path), + "path": file_path, + "size": stat.st_size, + "mtime": stat.st_mtime, + "mode": oct(stat.st_mode)[-3:], + "type": "file" + } + result.append(file_info) + else: + # 目录 + for item in os.listdir(file_path): + full_path = os.path.join(file_path, item) + stat = os.stat(full_path) + item_info = { + "name": item, + "path": full_path, + "size": stat.st_size, + "mtime": stat.st_mtime, + "mode": oct(stat.st_mode)[-3:], + "type": "file" if os.path.isfile(full_path) else "dir" + } + result.append(item_info) + return result + + def cat(self, + file_path: str, + encoding: FileEncodingEnum = Field(FileEncodingEnum.UTF8, description="文件编码")) -> List[str]: + """ + 读取文件内容(Python实现cat功能) + :param file_path: 文件路径(必填) + :param encoding: 文件编码(默认utf-8) + :return: 按行拆分的内容列表 + """ + if not os.path.exists(file_path): + raise FileNotFoundError(self._get_error_msg(f"文件不存在:{file_path}", f"File not found: {file_path}")) + if os.path.isdir(file_path): + raise IsADirectoryError(self._get_error_msg(f"路径是目录,无法读取内容:{file_path}", f"Path is directory, cannot read content: {file_path}")) + + with open(file_path, "r", encoding=encoding.value, errors="ignore") as f: + content = [line.rstrip("\n") for line in f.readlines()] + return content + + def add(self, + file_path: str, + overwrite: bool = Field(False, description="是否覆盖已存在文件")) -> None: + """ + 新建空文件(Python实现touch功能) + :param file_path: 文件路径(必填) + :param overwrite: 已存在时是否覆盖(默认False) + """ + if os.path.exists(file_path) and not overwrite: + logger.info(self._get_error_msg(f"文件已存在,跳过创建:{file_path}", f"File exists, skip creation: {file_path}")) + return + + # 确保父目录存在 + parent_dir = os.path.dirname(file_path) + if parent_dir and not os.path.exists(parent_dir): + os.makedirs(parent_dir, exist_ok=True) + + with open(file_path, "w", encoding=FileEncodingEnum.UTF8.value) as f: + pass + logger.info(self._get_error_msg(f"文件创建成功:{file_path}", f"File created successfully: {file_path}")) + + def append(self, + file_path: str, + content: str = Field(..., description="追加内容"), + encoding: FileEncodingEnum = Field(FileEncodingEnum.UTF8, description="文件编码")) -> None: + """ + 追加内容到文件 + :param file_path: 文件路径(必填) + :param content: 追加内容(必填) + :param encoding: 文件编码(默认utf-8) + """ + if not content: + raise ValueError(self._get_error_msg("追加内容不能为空", "Append content cannot be empty")) + + parent_dir = os.path.dirname(file_path) + if parent_dir and not os.path.exists(parent_dir): + os.makedirs(parent_dir, exist_ok=True) + + with open(file_path, "a", encoding=encoding.value) as f: + f.write(content + "\n") + logger.info(self._get_error_msg(f"内容追加成功:{file_path}", f"Content appended successfully: {file_path}")) + + def edit(self, + file_path: str, + content: str = Field(..., description="覆盖内容"), + encoding: FileEncodingEnum = Field(FileEncodingEnum.UTF8, description="文件编码")) -> None: + """ + 覆盖写入文件内容 + :param file_path: 文件路径(必填) + :param content: 覆盖内容(必填) + :param encoding: 文件编码(默认utf-8) + """ + if not content: + raise ValueError(self._get_error_msg("覆盖内容不能为空", "Edit content cannot be empty")) + + parent_dir = os.path.dirname(file_path) + if parent_dir and not os.path.exists(parent_dir): + os.makedirs(parent_dir, exist_ok=True) + + with open(file_path, "w", encoding=encoding.value) as f: + f.write(content) + logger.info(self._get_error_msg(f"文件内容覆盖成功:{file_path}", f"File content overwritten successfully: {file_path}")) + + def rename(self, + old_path: str, + new_path: str = Field(..., description="新路径")) -> None: + """ + 重命名文件/目录 + :param old_path: 原路径(必填) + :param new_path: 新路径(必填) + """ + if not os.path.exists(old_path): + raise FileNotFoundError(self._get_error_msg(f"原路径不存在:{old_path}", f"Old path not found: {old_path}")) + if os.path.exists(new_path): + raise FileExistsError(self._get_error_msg(f"新路径已存在:{new_path}", f"New path already exists: {new_path}")) + + shutil.move(old_path, new_path) + logger.info(self._get_error_msg(f"文件重命名成功:{old_path} → {new_path}", f"File renamed successfully: {old_path} → {new_path}")) + + def chmod(self, + file_path: str, + mode: str = Field(..., description="权限模式(如755)")) -> None: + """ + 修改文件权限 + :param file_path: 文件路径(必填) + :param mode: 权限模式(必填,如755) + """ + if not os.path.exists(file_path): + raise FileNotFoundError(self._get_error_msg(f"文件不存在:{file_path}", f"File not found: {file_path}")) + + try: + numeric_mode = int(mode, 8) + os.chmod(file_path, numeric_mode) + logger.info(self._get_error_msg(f"权限修改成功:{file_path} → {mode}", f"Permission modified successfully: {file_path} → {mode}")) + except ValueError: + raise ValueError(self._get_error_msg(f"权限格式错误(需为8进制,如755):{mode}", f"Invalid mode format (must be octal, e.g.755): {mode}")) + + def delete(self, + file_path: str, + recursive: bool = Field(False, description="是否递归删除目录")) -> None: + """ + 删除文件/目录 + :param file_path: 文件/目录路径(必填) + :param recursive: 是否递归删除目录(默认False) + """ + if not os.path.exists(file_path): + raise FileNotFoundError(self._get_error_msg(f"路径不存在:{file_path}", f"Path not found: {file_path}")) + + if os.path.isfile(file_path): + os.remove(file_path) + else: + if not recursive: + raise IsADirectoryError(self._get_error_msg(f"路径是目录,需开启recursive=True递归删除:{file_path}", f"Path is directory, set recursive=True to delete: {file_path}")) + shutil.rmtree(file_path) + logger.info(self._get_error_msg(f"路径删除成功:{file_path}", f"Path deleted successfully: {file_path}")) \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/config.json b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/config.json new file mode 100644 index 0000000000..93845a4dbb --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/config.json @@ -0,0 +1,8 @@ +{ + "tools": { + "file_tool": { + "zh": "【统一文件管理工具】\n功能:支持本地文件/目录的查、增、改、删全操作(纯Python实现,无shell依赖)\n\n【核心提示】\n1. 操作类型(action)必须从以下枚举值中选择:ls/cat/add/append/edit/rename/chmod/delete,不可传入其他值;\n2. 文件编码(encoding)必须从以下枚举值中选择:utf-8/gbk/gb2312/ascii,默认值为utf-8;\n3. 不同操作类型(action)对应不同必填参数,未满足则执行失败,请严格遵守:\n - ls(查看列表)/cat(读取内容)/delete(删除):仅需传入 file_path(文件/目录路径);\n - add(新建文件):需传入 file_path,可选传入 overwrite(是否覆盖已存在文件,默认False);\n - append(追加内容)/edit(覆盖内容):必须传入 file_path + content(写入/追加的内容);\n - rename(重命名):必须传入 file_path(原路径) + new_path(新路径);\n - chmod(修改权限):必须传入 file_path + mode(权限模式,如755/644,需为8进制格式);\n4. 非必填参数默认值:detail(ls是否显示详细信息)=False,recursive(删除目录是否递归)=False;\n5. 路径格式要求:file_path/new_path需传入绝对路径(如\"/tmp/test.txt\"),避免相对路径导致的找不到文件问题。\n\n【枚举类定义(必须遵守)】\n- FileActionEnum(操作类型枚举):ls / cat / add / append / edit / rename / chmod / delete\n- FileEncodingEnum(文件编码枚举):utf-8 / gbk / gb2312 / ascii\n\n【参数详情】\n- action:操作类型(必填,枚举值见上方)\n- file_path:目标文件/目录路径(必填,绝对路径)\n- content:写入/追加内容(add/edit/append操作必填)\n- new_path:重命名后的新路径(rename操作必填,绝对路径)\n- mode:权限模式(chmod操作必填,如755)\n- detail:ls操作是否显示详细信息(默认False)\n- overwrite:add操作是否覆盖已存在文件(默认False)\n- recursive:delete操作是否递归删除目录(默认False)\n- encoding:文件编码(枚举值见上方,默认utf-8)\n【返回值说明】\n- success:执行结果(True=成功,False=失败)\n- message:执行信息/错误提示(多语言)\n- result:操作结果(ls/cat返回结构化列表,其他操作返回空列表)\n- file_path:操作的文件路径(rename后自动更新为新路径)\n- target:执行目标(固定为127.0.0.1,本地执行)", + "en": "【Unified File Management Tool】\nFunction: Supports query/add/edit/delete operations for local files/directories (Python native implementation, no shell dependency)\n\n【Core Guidelines】\n1. Operation type (action) must be selected from the following enum values: ls/cat/add/append/edit/rename/chmod/delete, other values are not allowed;\n2. File encoding (encoding) must be selected from the following enum values: utf-8/gbk/gb2312/ascii, default value is utf-8;\n3. Different action types correspond to different required parameters, execution will fail if not met, please strictly follow:\n - ls (list files)/cat (read content)/delete (delete): Only need to pass file_path (file/directory path);\n - add (create empty file): Need to pass file_path, optionally pass overwrite (whether to overwrite existing file, default False);\n - append (append content)/edit (overwrite content): Must pass file_path + content (content to write/append);\n - rename (rename file/directory): Must pass file_path (old path) + new_path (new path);\n - chmod (modify permission): Must pass file_path + mode (permission mode, e.g.755/644, must be octal format);\n4. Default values for optional parameters: detail (whether to show detailed info for ls)=False, recursive (whether to delete directory recursively)=False;\n5. Path format requirement: file_path/new_path must be absolute path (e.g.\"/tmp/test.txt\"), avoid file not found due to relative path.\n\n【Enum Class Definition (Must Follow)】\n- FileActionEnum (Operation Type Enum): ls / cat / add / append / edit / rename / chmod / delete\n- FileEncodingEnum (File Encoding Enum): utf-8 / gbk / gb2312 / ascii\n\n【Parameter Details】\n- action: Operation type (required, enum values see above)\n- file_path: Target file/directory path (required, absolute path)\n- content: Content to write/append (required for add/edit/append)\n- new_path: New path after rename (required for rename, absolute path)\n- mode: Permission mode (required for chmod, e.g.755)\n- detail: Whether to show detailed info for ls (default False)\n- overwrite: Whether to overwrite existing file for add (default False)\n- recursive: Whether to delete directory recursively for delete (default False)\n- encoding: File encoding (enum values see above, default utf-8)\n- lang: Language (enum values: ZH/EN, default ZH)\n\n【Return Value Explanation】\n- success: Execution result (True=success, False=failure)\n- message: Execution info/error prompt (multilingual)\n- result: Operation result (structured list for ls/cat, empty list for others)\n- file_path: Operated file path (automatically updated to new path after rename)\n- target: Execution target (fixed as 127.0.0.1, local execution)" + } + } +} \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/deps.toml b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/deps.toml new file mode 100644 index 0000000000..e69de29bb2 diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py new file mode 100644 index 0000000000..cc456412e6 --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py @@ -0,0 +1,96 @@ +from typing import Dict, Optional +from pydantic import Field +from servers.oe_cli_mcp_server.config.base_config_loader import LanguageEnum +from servers.oe_cli_mcp_server.mcp_tools.base_tools.file_tool.base import ( + init_result_dict, + FileManager, + FileActionEnum, + FileEncodingEnum, + logger, lang +) + +def file_tool( + action: FileActionEnum = Field(..., description="操作类型(枚举:ls/cat/add/append/edit/rename/chmod/delete)"), + file_path: str = Field(..., description="文件/目录路径(必填)"), + content: Optional[str] = Field(None, description="写入/追加内容(add/edit/append必填)"), + new_path: Optional[str] = Field(None, description="新路径(rename必填)"), + mode: Optional[str] = Field(None, description="权限模式(chmod必填,如755)"), + detail: bool = Field(False, description="ls是否显示详细信息"), + overwrite: bool = Field(False, description="add是否覆盖已存在文件"), + recursive: bool = Field(False, description="delete是否递归删除目录"), + encoding: FileEncodingEnum = Field(FileEncodingEnum.UTF8, description="文件编码(默认utf-8)"), +) -> Dict: + """ + 统一文件管理工具(精简参数+枚举入参,适配大模型识别) + """ + # 初始化结果字典 + result = init_result_dict() + result["file_path"] = file_path.strip() + fm = FileManager(lang=lang) + is_zh = lang == LanguageEnum.ZH + + # 1. 核心参数校验 + if not file_path.strip(): + result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" + return result + + # 2. 按枚举操作类型执行 + try: + if action == FileActionEnum.LS: + result["result"] = fm.ls(file_path.strip(), detail=detail) + result["success"] = True + result["message"] = f"本地文件列表查询完成(路径:{file_path})" if is_zh else f"Local file list queried (path: {file_path})" + + elif action == FileActionEnum.CAT: + result["result"] = fm.cat(file_path.strip(), encoding=encoding) + result["success"] = True + result["message"] = f"本地文件内容读取完成(路径:{file_path})" if is_zh else f"Local file content read (path: {file_path})" + + elif action == FileActionEnum.ADD: + fm.add(file_path.strip(), overwrite=overwrite) + result["success"] = True + result["message"] = f"本地文件创建成功(路径:{file_path})" if is_zh else f"Local file created (path: {file_path})" + + elif action == FileActionEnum.APPEND: + if not content: + result["message"] = "追加内容不能为空" if is_zh else "Append content cannot be empty" + return result + fm.append(file_path.strip(), content.strip(), encoding=encoding) + result["success"] = True + result["message"] = f"本地文件内容追加成功(路径:{file_path})" if is_zh else f"Local file content appended (path: {file_path})" + + elif action == FileActionEnum.EDIT: + if not content: + result["message"] = "覆盖内容不能为空" if is_zh else "Edit content cannot be empty" + return result + fm.edit(file_path.strip(), content.strip(), encoding=encoding) + result["success"] = True + result["message"] = f"本地文件内容覆盖成功(路径:{file_path})" if is_zh else f"Local file content overwritten (path: {file_path})" + + elif action == FileActionEnum.RENAME: + if not new_path: + result["message"] = "新路径不能为空" if is_zh else "New path cannot be empty" + return result + fm.rename(file_path.strip(), new_path.strip()) + result["success"] = True + result["file_path"] = new_path.strip() + result["message"] = f"本地文件重命名成功({file_path} → {new_path})" if is_zh else f"Local file renamed ({file_path} → {new_path})" + + elif action == FileActionEnum.CHMOD: + if not mode: + result["message"] = "权限模式不能为空(如755)" if is_zh else "Permission mode cannot be empty (e.g.755)" + return result + fm.chmod(file_path.strip(), mode.strip()) + result["success"] = True + result["message"] = f"本地文件权限修改成功(路径:{file_path},权限:{mode})" if is_zh else f"Local file permission modified (path: {file_path}, mode: {mode})" + + elif action == FileActionEnum.DELETE: + fm.delete(file_path.strip(), recursive=recursive) + result["success"] = True + result["message"] = f"本地文件删除成功(路径:{file_path})" if is_zh else f"Local file deleted (path: {file_path})" + + except Exception as e: + result["message"] = f"操作失败:{str(e)}" if is_zh else f"Operation failed: {str(e)}" + logger.error(f"File manager error: {str(e)}") + + return result \ No newline at end of file -- Gitee From ff3881a4bd2c850dfe49b44886d008fd74f2640a Mon Sep 17 00:00:00 2001 From: cui-gaoleng <562344211@qq.com> Date: Mon, 8 Dec 2025 10:49:31 +0800 Subject: [PATCH 05/10] =?UTF-8?q?mcp=5Fcenter=20oe-cli-mcp-server=20?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E4=BF=A1=E6=81=AF=E6=9F=A5=E8=AF=A2mcp?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../base_tools/sys_info_tool/base.py | 432 +++++++++++++++++- 1 file changed, 431 insertions(+), 1 deletion(-) diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py index ea0c8a85cb..ee4c19ab24 100644 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py @@ -1 +1,431 @@ -q \ No newline at end of file +import logging +import os +import platform +import subprocess +import re +from enum import Enum +from typing import Dict, List, Optional + +from servers.oe_cli_mcp_server.config.base_config_loader import BaseConfig, LanguageEnum + +# 初始化日志 +logger = logging.getLogger("sys_info_tool") +logger.setLevel(logging.INFO) + +# ========== 枚举类定义(不变) ========== +class InfoTypeEnum(str, Enum): + """信息类型枚举(对应需求的3大类)""" + # 系统类 + OS = "os" # 版本/内核 + LOAD = "load" # 系统负载 + UPTIME = "uptime" # 运行时间 + # 硬件类 + CPU = "cpu" # CPU信息 + MEM = "mem" # 内存占用 + DISK = "disk" # 磁盘分区/使用率 + GPU = "gpu" # 显卡状态 + NET = "net" # 网卡/IP + # 安全类 + SELINUX = "selinux"# SELinux状态 + FIREWALL = "firewall" # 防火墙规则 + +# ========== 通用工具函数(不变) ========== +def get_language() -> LanguageEnum: + """获取全局语言配置""" + return BaseConfig().get_config().public_config.language + +def is_zh() -> bool: + """判断是否为中文环境""" + return get_language() == LanguageEnum.ZH + +def init_result_dict( + target_host: str = "127.0.0.1", + result_type: str = "dict" +) -> Dict: + """初始化返回结果字典(result为嵌套字典,key为信息类型)""" + return { + "success": False, + "message": "", + "result": {}, # 格式:{"cpu": {...}, "mem": {...}, ...} + "target": target_host, + "info_types": [] # 记录已采集的信息类型列表 + } + +def parse_info_types(info_type_list: List[str]) -> List[InfoTypeEnum]: + """解析字符串列表为InfoTypeEnum列表(适配大模型输入)""" + if not info_type_list: + valid_values = [e.value for e in InfoTypeEnum] + zh_msg = f"信息类型列表不能为空,可选枚举值:{','.join(valid_values)}" + en_msg = f"Info type list cannot be empty, optional enum values: {','.join(valid_values)}" + raise ValueError(zh_msg if is_zh() else en_msg) + + parsed_enums = [] + valid_values = [e.value for e in InfoTypeEnum] + for info_type_str in info_type_list: + try: + parsed_enums.append(InfoTypeEnum(info_type_str.strip().lower())) + except ValueError: + zh_msg = f"无效的信息类型:{info_type_str},可选枚举值:{','.join(valid_values)}" + en_msg = f"Invalid info type: {info_type_str}, optional enum values: {','.join(valid_values)}" + raise ValueError(zh_msg if is_zh() else en_msg) + return parsed_enums + +# ========== 系统信息采集核心类(修复命令不存在问题) ========== +class SystemInfoCollector: + """系统/硬件/安全信息采集类(优先Python原生,必要时调用系统命令)""" + def __init__(self): + self.lang = get_language() + + def _get_msg(self, zh_msg: str, en_msg: str) -> str: + """多语言提示""" + return zh_msg if self.lang == LanguageEnum.ZH else en_msg + + def _run_cmd_safe(self, cmd: List[str]) -> Optional[str]: + """安全执行系统命令(命令不存在/执行失败返回None,不抛出异常)""" + # 检查命令是否存在(获取绝对路径) + cmd_name = cmd[0] + cmd_path = self._find_cmd_absolute_path(cmd_name) + if not cmd_path: + logger.warning(self._get_msg(f"命令不存在:{cmd_name}", f"Command not found: {cmd_name}")) + return None + + # 替换为绝对路径执行 + cmd[0] = cmd_path + try: + logger.info(f"执行系统命令:{' '.join(cmd)}") + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + check=True, + timeout=10 # 超时保护:10秒 + ) + return result.stdout.strip() + except subprocess.CalledProcessError as e: + error_msg = e.stderr.strip() or e.stdout.strip() + logger.error(self._get_msg(f"命令执行失败:{cmd_name},错误:{error_msg}", f"Command failed: {cmd_name}, error: {error_msg}")) + return None + except subprocess.TimeoutExpired: + logger.error(self._get_msg(f"命令执行超时:{cmd_name}", f"Command timeout: {cmd_name}")) + return None + except Exception as e: + logger.error(self._get_msg(f"系统调用异常:{cmd_name},错误:{str(e)}", f"System call exception: {cmd_name}, error: {str(e)}")) + return None + + def _find_cmd_absolute_path(self, cmd: str) -> Optional[str]: + """辅助函数:查找命令的绝对路径(兼容特殊情况)""" + try: + # 用 which 命令查找(若 which 存在) + which_result = self._run_cmd_safe(["which", cmd]) + if which_result: + return which_result.strip() + except Exception: + pass + + # which 不存在时,遍历常见路径 + common_paths = ["/usr/bin", "/bin", "/usr/sbin", "/sbin", "/usr/local/bin"] + for path in common_paths: + cmd_path = os.path.join(path, cmd) + if os.path.exists(cmd_path) and os.access(cmd_path, os.X_OK): + return cmd_path + return None + + # ---------- 系统类信息 ---------- + def get_os_info(self) -> Dict: + """获取系统版本/内核信息(无命令依赖,不变)""" + os_info = { + "system": platform.system(), + "release": platform.release(), + "version": platform.version(), + "kernel": platform.uname().release, + "architecture": platform.machine() + } + # 补充OpenEuler版本(通过/etc/os-release) + if os.path.exists("/etc/os-release"): + try: + with open("/etc/os-release", "r", encoding="utf-8") as f: + for line in f: + if line.startswith("PRETTY_NAME="): + os_info["pretty_name"] = line.strip().split("=")[1].strip('"') + elif line.startswith("VERSION_ID="): + os_info["version_id"] = line.strip().split("=")[1].strip('"') + except Exception as e: + logger.error(self._get_msg(f"读取os-release失败:{str(e)}", f"Failed to read os-release: {str(e)}")) + return os_info + + def get_load_info(self) -> Dict: + """获取系统负载(无命令依赖,不变)""" + try: + load_avg = os.getloadavg() + return { + "1min": load_avg[0], + "5min": load_avg[1], + "15min": load_avg[2], + "cpu_count": os.cpu_count() or 0 + } + except Exception as e: + logger.error(self._get_msg(f"获取系统负载失败:{str(e)}", f"Failed to get load avg: {str(e)}")) + return {"error": self._get_msg("获取系统负载失败", "Failed to get load average")} + + def get_uptime_info(self) -> Dict: + """获取系统运行时间(修复uptime命令不存在问题)""" + uptime_output = self._run_cmd_safe(["uptime", "-p"]) + if uptime_output: + return {"uptime": uptime_output} + else: + # 命令不存在时,通过/proc/uptime计算(Python原生方式) + try: + with open("/proc/uptime", "r", encoding="utf-8") as f: + total_seconds = float(f.readline().split()[0]) + hours = int(total_seconds // 3600) + minutes = int((total_seconds % 3600) // 60) + uptime_str = self._get_msg(f"运行 {hours} 小时 {minutes} 分钟", f"up {hours} hours, {minutes} minutes") + return {"uptime": uptime_str, "note": self._get_msg("基于/proc/uptime计算", "Calculated from /proc/uptime")} + except Exception as e: + logger.error(self._get_msg(f"获取运行时间失败:{str(e)}", f"Failed to get uptime: {str(e)}")) + return {"error": self._get_msg("获取运行时间失败", "Failed to get uptime")} + + # ---------- 硬件类信息 ---------- + def get_cpu_info(self) -> Dict: + """获取CPU型号/核心信息(无命令依赖,不变)""" + cpu_info = {} + try: + # 通过/proc/cpuinfo获取 + if os.path.exists("/proc/cpuinfo"): + with open("/proc/cpuinfo", "r", encoding="utf-8") as f: + for line in f: + if line.startswith("model name"): + cpu_info["model"] = line.strip().split(":")[1].strip() + break + # 获取CPU核心数 + cpu_info["physical_cores"] = os.cpu_count() or 0 + return cpu_info + except Exception as e: + logger.error(self._get_msg(f"获取CPU信息失败:{str(e)}", f"Failed to get CPU info: {str(e)}")) + return {"error": self._get_msg("获取CPU信息失败", "Failed to get CPU information")} + + def get_mem_info(self) -> Dict: + """获取内存占用信息(无命令依赖,不变)""" + mem_info = {} + try: + if os.path.exists("/proc/meminfo"): + with open("/proc/meminfo", "r", encoding="utf-8") as f: + for line in f: + if line.startswith("MemTotal:"): + mem_info["total_mb"] = int(line.strip().split()[1]) // 1024 + elif line.startswith("MemFree:"): + mem_info["free_mb"] = int(line.strip().split()[1]) // 1024 + elif line.startswith("MemAvailable:"): + mem_info["available_mb"] = int(line.strip().split()[1]) // 1024 + if "total_mb" in mem_info and "free_mb" in mem_info: + mem_info["used_mb"] = mem_info["total_mb"] - mem_info["free_mb"] + mem_info["used_percent"] = round(mem_info["used_mb"] / mem_info["total_mb"] * 100, 2) + return mem_info + except Exception as e: + logger.error(self._get_msg(f"获取内存信息失败:{str(e)}", f"Failed to get memory info: {str(e)}")) + return {"error": self._get_msg("获取内存信息失败", "Failed to get memory information")} + + def get_disk_info(self) -> List[Dict]: + """获取磁盘分区/使用率信息(优化命令检查)""" + disk_list = [] + # 直接指定 df 绝对路径(OpenEuler 99% 情况下的路径) + df_cmd = self._find_cmd_absolute_path("df") + if not df_cmd: + logger.error(self._get_msg("未找到df命令,无法获取磁盘信息", "df command not found, cannot get disk info")) + return [{"error": self._get_msg("未找到df命令,请安装coreutils包", "df command not found, please install coreutils")}] + + try: + # 用绝对路径执行 df 命令(避免 PATH 问题) + disk_output = self._run_cmd_safe([ + df_cmd, "-h", "-T", + "--output=source,fstype,size,used,avail,pcent,mountpoint" + ]) + if not disk_output: + return [{"error": self._get_msg("df命令执行失败,无法获取磁盘信息", "df command execution failed")}] + + # 解析输出 + lines = [line.strip() for line in disk_output.splitlines() if line.strip()] + if len(lines) < 2: + logger.warning(self._get_msg("df未返回有效磁盘信息", "df returned no valid disk info")) + return [{"error": self._get_msg("df未返回有效磁盘信息", "df returned no valid disk information")}] + + for line in lines[1:]: + parts = re.split(r"\s+", line, maxsplit=6) + if len(parts) != 7: + logger.debug(f"跳过无效行:{line}") + continue + used_percent = parts[5].strip("%") if "%" in parts[5] else parts[5] + disk_list.append({ + "device": parts[0], + "fstype": parts[1], + "size": parts[2], + "used": parts[3], + "avail": parts[4], + "used_percent": used_percent, + "mountpoint": parts[6] + }) + return disk_list if disk_list else [{"error": self._get_msg("未检测到有效磁盘分区", "No valid disk partitions detected")}] + except Exception as e: + logger.error(self._get_msg(f"获取磁盘信息失败:{str(e)}", f"Failed to get disk info: {str(e)}")) + return [{"error": self._get_msg("获取磁盘信息失败", "Failed to get disk information")}] + + def get_gpu_info(self) -> List[Dict]: + """获取显卡状态(修复nvidia-smi命令不存在问题)""" + gpu_list = [] + # 检查nvidia-smi是否存在 + nvidia_smi_path = self._find_cmd_absolute_path("nvidia-smi") + if not nvidia_smi_path: + logger.warning(self._get_msg("未检测到nvidia-smi命令,跳过GPU信息采集", "nvidia-smi not found, skip GPU info collection")) + return [{"note": self._get_msg("未检测到nvidia-smi命令(无NVIDIA显卡或未安装驱动)", "nvidia-smi not found (no NVIDIA GPU or driver)")}] + + # 执行命令 + gpu_output = self._run_cmd_safe([ + nvidia_smi_path, + "--query-gpu=name,memory.total,memory.used,utilization.gpu", + "--format=csv,noheader,nounits" + ]) + if gpu_output: + for line in gpu_output.splitlines(): + if not line.strip(): + continue + parts = line.strip().split(", ") + if len(parts) != 4: + logger.debug(f"跳过无效GPU行:{line}") + continue + name, mem_total, mem_used, util = parts + gpu_list.append({ + "name": name.strip(), + "memory_total_mb": int(mem_total) if mem_total.isdigit() else 0, + "memory_used_mb": int(mem_used) if mem_used.isdigit() else 0, + "utilization_percent": int(util) if util.isdigit() else 0 + }) + return gpu_list if gpu_list else [{"note": self._get_msg("未检测到可用GPU", "No available GPU detected")}] + else: + return [{"error": self._get_msg("GPU信息采集失败", "Failed to collect GPU information")}] + + def get_net_info(self) -> List[Dict]: + """获取网卡/IP信息(修复ip命令不存在问题)""" + # 检查ip命令是否存在 + ip_cmd_path = self._find_cmd_absolute_path("ip") + if not ip_cmd_path: + logger.warning(self._get_msg("未找到ip命令,尝试通过/proc/net/dev获取网卡信息", "ip command not found, try to get net info from /proc/net/dev")) + # 降级方案:通过/proc/net/dev获取网卡名称(无IP信息) + net_list = [] + try: + with open("/proc/net/dev", "r", encoding="utf-8") as f: + lines = [line.strip() for line in f.readlines() if line.strip() and not line.startswith("Inter-|") and not line.startswith(" face |")] + for line in lines: + if ":" in line: + iface = line.split(":")[0].strip() + net_list.append({ + "interface": iface, + "ips": [{"note": self._get_msg("ip命令不存在,无法获取IP信息", "ip command not found, cannot get IP info")}] + }) + return net_list + except Exception as e: + logger.error(self._get_msg(f"获取网络信息失败:{str(e)}", f"Failed to get net info: {str(e)}")) + return [{"error": self._get_msg("获取网络信息失败(ip命令不存在)", "Failed to get network information (ip command not found)")}] + + # ip命令存在,正常采集 + net_output = self._run_cmd_safe([ip_cmd_path, "addr", "show"]) + if not net_output: + return [{"error": self._get_msg("ip命令执行失败,无法获取网络信息", "ip command execution failed")}] + + net_list = [] + current_iface = None + for line in net_output.splitlines(): + line = line.strip() + if line.startswith(("1:", "2:", "3:", "4:", "5:")): # 网卡名称行 + current_iface = line.split(":")[1].strip() + net_list.append({"interface": current_iface, "ips": []}) + elif current_iface and line.startswith("inet "): # IPv4地址 + ip_part = line.split()[1] + net_list[-1]["ips"].append({"type": "ipv4", "address": ip_part}) + elif current_iface and line.startswith("inet6 "): # IPv6地址 + ip_part = line.split()[1] + net_list[-1]["ips"].append({"type": "ipv6", "address": ip_part}) + return net_list if net_list else [{"error": self._get_msg("未检测到有效网卡信息", "No valid network interface detected")}] + + # ---------- 安全类信息 ---------- + def get_selinux_info(self) -> Dict: + """获取SELinux状态(修复getenforce命令不存在问题)""" + # 检查getenforce命令是否存在 + getenforce_path = self._find_cmd_absolute_path("getenforce") + if getenforce_path: + selinux_output = self._run_cmd_safe([getenforce_path]) + if selinux_output: + return {"status": selinux_output.strip()} + + # 命令不存在时,通过配置文件判断 + logger.warning(self._get_msg("未找到getenforce命令,尝试通过配置文件判断SELinux状态", "getenforce not found, try to judge SELinux status from config")) + try: + with open("/etc/selinux/config", "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line.startswith("SELINUX=") and not line.startswith("#"): + selinux_mode = line.split("=")[1].strip() + return { + "status": selinux_mode.upper(), + "note": self._get_msg("基于/etc/selinux/config配置判断", "Judged from /etc/selinux/config") + } + return {"status": self._get_msg("未知", "Unknown"), "note": self._get_msg("未找到SELinux配置", "SELinux config not found")} + except Exception as e: + logger.error(self._get_msg(f"获取SELinux状态失败:{str(e)}", f"Failed to get SELinux status: {str(e)}")) + return {"error": self._get_msg("获取SELinux状态失败", "Failed to get SELinux status")} + + def get_firewall_info(self) -> Dict: + """获取防火墙规则(修复firewalld相关命令不存在问题)""" + firewall_info = {} + # 检查systemctl命令是否存在 + systemctl_path = self._find_cmd_absolute_path("systemctl") + if not systemctl_path: + logger.warning(self._get_msg("未找到systemctl命令,无法检查防火墙状态", "systemctl not found, cannot check firewall status")) + firewall_info["status"] = self._get_msg("未知(无systemctl)", "Unknown (systemctl not found)") + return firewall_info + + # 检查firewalld状态 + status_output = self._run_cmd_safe([systemctl_path, "is-active", "firewalld"]) + if status_output == "active": + firewall_info["status"] = "active" + # 检查firewall-cmd命令是否存在 + firewall_cmd_path = self._find_cmd_absolute_path("firewall-cmd") + if firewall_cmd_path: + ports_output = self._run_cmd_safe([firewall_cmd_path, "--list-ports"]) + firewall_info["open_ports"] = ports_output.strip().split() if ports_output and ports_output.strip() else [] + else: + firewall_info["open_ports"] = [self._get_msg("firewall-cmd命令不存在,无法获取开放端口", "firewall-cmd not found, cannot get open ports")] + elif status_output == "inactive": + firewall_info["status"] = "inactive" + else: + # 未安装firewalld或命令执行失败 + firewall_info["status"] = self._get_msg("未安装firewalld或状态未知", "firewalld not installed or status unknown") + return firewall_info + + # ---------- 批量采集多类型信息 ---------- + def collect_batch(self, info_types: List[InfoTypeEnum]) -> Dict[str, Dict]: + """ + 批量采集多类信息 + :param info_types: InfoTypeEnum列表 + :return: 结构化结果(key为信息类型字符串,value为对应采集结果) + """ + batch_result = {} + info_type_map = { + InfoTypeEnum.OS: self.get_os_info, + InfoTypeEnum.LOAD: self.get_load_info, + InfoTypeEnum.UPTIME: self.get_uptime_info, + InfoTypeEnum.CPU: self.get_cpu_info, + InfoTypeEnum.MEM: self.get_mem_info, + InfoTypeEnum.DISK: self.get_disk_info, + InfoTypeEnum.GPU: self.get_gpu_info, + InfoTypeEnum.NET: self.get_net_info, + InfoTypeEnum.SELINUX: self.get_selinux_info, + InfoTypeEnum.FIREWALL: self.get_firewall_info + } + for info_type in info_types: + try: + batch_result[info_type.value] = info_type_map[info_type]() + except Exception as e: + logger.error(self._get_msg(f"采集{info_type.value}信息失败:{str(e)}", f"Failed to collect {info_type.value} info: {str(e)}")) + batch_result[info_type.value] = {"error": self._get_msg(f"采集{info_type.value}信息失败", f"Failed to collect {info_type.value} information")} + return batch_result \ No newline at end of file -- Gitee From 14f998f848339f2a7ebf3719d78d0f5f5fae3ab4 Mon Sep 17 00:00:00 2001 From: cui-gaoleng <562344211@qq.com> Date: Mon, 8 Dec 2025 11:01:55 +0800 Subject: [PATCH 06/10] =?UTF-8?q?mcp=5Fcenter=20fix=EF=BC=8C=E6=96=87?= =?UTF-8?q?=E4=BB=B6=E3=80=82=E7=B3=BB=E7=BB=9F=E4=BF=A1=E6=81=AFmcp?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py | 3 ++- .../mcp_tools/base_tools/sys_info_tool/base.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py index b4cbfd03d4..9e2c98c72a 100644 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py @@ -4,7 +4,8 @@ import shutil from enum import Enum from typing import Dict, List from pydantic import Field -from servers.oe_cli_mcp_server.config.base_config_loader import BaseConfig, LanguageEnum + +from config.public.base_config_loader import BaseConfig, LanguageEnum # 初始化日志 logger = logging.getLogger("file_tool") diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py index ee4c19ab24..323373ff2d 100644 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py @@ -6,7 +6,7 @@ import re from enum import Enum from typing import Dict, List, Optional -from servers.oe_cli_mcp_server.config.base_config_loader import BaseConfig, LanguageEnum +from config.public.base_config_loader import LanguageEnum, BaseConfig # 初始化日志 logger = logging.getLogger("sys_info_tool") -- Gitee From 2f1b2f4475e82a19ecc3f5b2ce21b1f11b42f1eb Mon Sep 17 00:00:00 2001 From: cui-gaoleng <562344211@qq.com> Date: Mon, 8 Dec 2025 11:05:41 +0800 Subject: [PATCH 07/10] =?UTF-8?q?mcp=5Fcenter=20oe-cli-mcp-server=20fix?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E7=AE=A1=E7=90=86mcp?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py index cc456412e6..aa627d95f6 100644 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py @@ -1,6 +1,6 @@ from typing import Dict, Optional from pydantic import Field -from servers.oe_cli_mcp_server.config.base_config_loader import LanguageEnum +from config.public.base_config_loader import LanguageEnum, BaseConfig from servers.oe_cli_mcp_server.mcp_tools.base_tools.file_tool.base import ( init_result_dict, FileManager, -- Gitee From 58948b80ab4b12c83caadb05ea49073599fdded8 Mon Sep 17 00:00:00 2001 From: cui-gaoleng <562344211@qq.com> Date: Mon, 8 Dec 2025 11:22:07 +0800 Subject: [PATCH 08/10] =?UTF-8?q?mcp=5Fcenter=20fix=EF=BC=8C=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E4=BF=A1=E6=81=AFmcp?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mcp_tools/base_tools/sys_info_tool/base.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py index 323373ff2d..15fb85dd53 100644 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py @@ -115,15 +115,6 @@ class SystemInfoCollector: def _find_cmd_absolute_path(self, cmd: str) -> Optional[str]: """辅助函数:查找命令的绝对路径(兼容特殊情况)""" - try: - # 用 which 命令查找(若 which 存在) - which_result = self._run_cmd_safe(["which", cmd]) - if which_result: - return which_result.strip() - except Exception: - pass - - # which 不存在时,遍历常见路径 common_paths = ["/usr/bin", "/bin", "/usr/sbin", "/sbin", "/usr/local/bin"] for path in common_paths: cmd_path = os.path.join(path, cmd) @@ -237,10 +228,8 @@ class SystemInfoCollector: try: # 用绝对路径执行 df 命令(避免 PATH 问题) - disk_output = self._run_cmd_safe([ - df_cmd, "-h", "-T", - "--output=source,fstype,size,used,avail,pcent,mountpoint" - ]) + disk_output = self._run_cmd_safe([df_cmd, "-h", "-T"]) + if not disk_output: return [{"error": self._get_msg("df命令执行失败,无法获取磁盘信息", "df command execution failed")}] -- Gitee From ab7004dd3bed382969fdd6ff5b568b575f179619 Mon Sep 17 00:00:00 2001 From: cui-gaoleng <562344211@qq.com> Date: Mon, 8 Dec 2025 11:24:49 +0800 Subject: [PATCH 09/10] =?UTF-8?q?mcp=5Fcenter=20oe-cli-mcp-server=20?= =?UTF-8?q?=E8=B0=83=E6=95=B4=E6=8F=90=E4=BA=A4mcp?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../base_tools/sys_info_tool/base.py | 420 ------------------ .../base_tools/sys_info_tool/config.json | 8 - .../base_tools/sys_info_tool/deps.toml | 0 .../base_tools/sys_info_tool/tool.py | 50 --- 4 files changed, 478 deletions(-) delete mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py delete mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/config.json delete mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/deps.toml delete mode 100644 mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/tool.py diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py deleted file mode 100644 index 15fb85dd53..0000000000 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/base.py +++ /dev/null @@ -1,420 +0,0 @@ -import logging -import os -import platform -import subprocess -import re -from enum import Enum -from typing import Dict, List, Optional - -from config.public.base_config_loader import LanguageEnum, BaseConfig - -# 初始化日志 -logger = logging.getLogger("sys_info_tool") -logger.setLevel(logging.INFO) - -# ========== 枚举类定义(不变) ========== -class InfoTypeEnum(str, Enum): - """信息类型枚举(对应需求的3大类)""" - # 系统类 - OS = "os" # 版本/内核 - LOAD = "load" # 系统负载 - UPTIME = "uptime" # 运行时间 - # 硬件类 - CPU = "cpu" # CPU信息 - MEM = "mem" # 内存占用 - DISK = "disk" # 磁盘分区/使用率 - GPU = "gpu" # 显卡状态 - NET = "net" # 网卡/IP - # 安全类 - SELINUX = "selinux"# SELinux状态 - FIREWALL = "firewall" # 防火墙规则 - -# ========== 通用工具函数(不变) ========== -def get_language() -> LanguageEnum: - """获取全局语言配置""" - return BaseConfig().get_config().public_config.language - -def is_zh() -> bool: - """判断是否为中文环境""" - return get_language() == LanguageEnum.ZH - -def init_result_dict( - target_host: str = "127.0.0.1", - result_type: str = "dict" -) -> Dict: - """初始化返回结果字典(result为嵌套字典,key为信息类型)""" - return { - "success": False, - "message": "", - "result": {}, # 格式:{"cpu": {...}, "mem": {...}, ...} - "target": target_host, - "info_types": [] # 记录已采集的信息类型列表 - } - -def parse_info_types(info_type_list: List[str]) -> List[InfoTypeEnum]: - """解析字符串列表为InfoTypeEnum列表(适配大模型输入)""" - if not info_type_list: - valid_values = [e.value for e in InfoTypeEnum] - zh_msg = f"信息类型列表不能为空,可选枚举值:{','.join(valid_values)}" - en_msg = f"Info type list cannot be empty, optional enum values: {','.join(valid_values)}" - raise ValueError(zh_msg if is_zh() else en_msg) - - parsed_enums = [] - valid_values = [e.value for e in InfoTypeEnum] - for info_type_str in info_type_list: - try: - parsed_enums.append(InfoTypeEnum(info_type_str.strip().lower())) - except ValueError: - zh_msg = f"无效的信息类型:{info_type_str},可选枚举值:{','.join(valid_values)}" - en_msg = f"Invalid info type: {info_type_str}, optional enum values: {','.join(valid_values)}" - raise ValueError(zh_msg if is_zh() else en_msg) - return parsed_enums - -# ========== 系统信息采集核心类(修复命令不存在问题) ========== -class SystemInfoCollector: - """系统/硬件/安全信息采集类(优先Python原生,必要时调用系统命令)""" - def __init__(self): - self.lang = get_language() - - def _get_msg(self, zh_msg: str, en_msg: str) -> str: - """多语言提示""" - return zh_msg if self.lang == LanguageEnum.ZH else en_msg - - def _run_cmd_safe(self, cmd: List[str]) -> Optional[str]: - """安全执行系统命令(命令不存在/执行失败返回None,不抛出异常)""" - # 检查命令是否存在(获取绝对路径) - cmd_name = cmd[0] - cmd_path = self._find_cmd_absolute_path(cmd_name) - if not cmd_path: - logger.warning(self._get_msg(f"命令不存在:{cmd_name}", f"Command not found: {cmd_name}")) - return None - - # 替换为绝对路径执行 - cmd[0] = cmd_path - try: - logger.info(f"执行系统命令:{' '.join(cmd)}") - result = subprocess.run( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - check=True, - timeout=10 # 超时保护:10秒 - ) - return result.stdout.strip() - except subprocess.CalledProcessError as e: - error_msg = e.stderr.strip() or e.stdout.strip() - logger.error(self._get_msg(f"命令执行失败:{cmd_name},错误:{error_msg}", f"Command failed: {cmd_name}, error: {error_msg}")) - return None - except subprocess.TimeoutExpired: - logger.error(self._get_msg(f"命令执行超时:{cmd_name}", f"Command timeout: {cmd_name}")) - return None - except Exception as e: - logger.error(self._get_msg(f"系统调用异常:{cmd_name},错误:{str(e)}", f"System call exception: {cmd_name}, error: {str(e)}")) - return None - - def _find_cmd_absolute_path(self, cmd: str) -> Optional[str]: - """辅助函数:查找命令的绝对路径(兼容特殊情况)""" - common_paths = ["/usr/bin", "/bin", "/usr/sbin", "/sbin", "/usr/local/bin"] - for path in common_paths: - cmd_path = os.path.join(path, cmd) - if os.path.exists(cmd_path) and os.access(cmd_path, os.X_OK): - return cmd_path - return None - - # ---------- 系统类信息 ---------- - def get_os_info(self) -> Dict: - """获取系统版本/内核信息(无命令依赖,不变)""" - os_info = { - "system": platform.system(), - "release": platform.release(), - "version": platform.version(), - "kernel": platform.uname().release, - "architecture": platform.machine() - } - # 补充OpenEuler版本(通过/etc/os-release) - if os.path.exists("/etc/os-release"): - try: - with open("/etc/os-release", "r", encoding="utf-8") as f: - for line in f: - if line.startswith("PRETTY_NAME="): - os_info["pretty_name"] = line.strip().split("=")[1].strip('"') - elif line.startswith("VERSION_ID="): - os_info["version_id"] = line.strip().split("=")[1].strip('"') - except Exception as e: - logger.error(self._get_msg(f"读取os-release失败:{str(e)}", f"Failed to read os-release: {str(e)}")) - return os_info - - def get_load_info(self) -> Dict: - """获取系统负载(无命令依赖,不变)""" - try: - load_avg = os.getloadavg() - return { - "1min": load_avg[0], - "5min": load_avg[1], - "15min": load_avg[2], - "cpu_count": os.cpu_count() or 0 - } - except Exception as e: - logger.error(self._get_msg(f"获取系统负载失败:{str(e)}", f"Failed to get load avg: {str(e)}")) - return {"error": self._get_msg("获取系统负载失败", "Failed to get load average")} - - def get_uptime_info(self) -> Dict: - """获取系统运行时间(修复uptime命令不存在问题)""" - uptime_output = self._run_cmd_safe(["uptime", "-p"]) - if uptime_output: - return {"uptime": uptime_output} - else: - # 命令不存在时,通过/proc/uptime计算(Python原生方式) - try: - with open("/proc/uptime", "r", encoding="utf-8") as f: - total_seconds = float(f.readline().split()[0]) - hours = int(total_seconds // 3600) - minutes = int((total_seconds % 3600) // 60) - uptime_str = self._get_msg(f"运行 {hours} 小时 {minutes} 分钟", f"up {hours} hours, {minutes} minutes") - return {"uptime": uptime_str, "note": self._get_msg("基于/proc/uptime计算", "Calculated from /proc/uptime")} - except Exception as e: - logger.error(self._get_msg(f"获取运行时间失败:{str(e)}", f"Failed to get uptime: {str(e)}")) - return {"error": self._get_msg("获取运行时间失败", "Failed to get uptime")} - - # ---------- 硬件类信息 ---------- - def get_cpu_info(self) -> Dict: - """获取CPU型号/核心信息(无命令依赖,不变)""" - cpu_info = {} - try: - # 通过/proc/cpuinfo获取 - if os.path.exists("/proc/cpuinfo"): - with open("/proc/cpuinfo", "r", encoding="utf-8") as f: - for line in f: - if line.startswith("model name"): - cpu_info["model"] = line.strip().split(":")[1].strip() - break - # 获取CPU核心数 - cpu_info["physical_cores"] = os.cpu_count() or 0 - return cpu_info - except Exception as e: - logger.error(self._get_msg(f"获取CPU信息失败:{str(e)}", f"Failed to get CPU info: {str(e)}")) - return {"error": self._get_msg("获取CPU信息失败", "Failed to get CPU information")} - - def get_mem_info(self) -> Dict: - """获取内存占用信息(无命令依赖,不变)""" - mem_info = {} - try: - if os.path.exists("/proc/meminfo"): - with open("/proc/meminfo", "r", encoding="utf-8") as f: - for line in f: - if line.startswith("MemTotal:"): - mem_info["total_mb"] = int(line.strip().split()[1]) // 1024 - elif line.startswith("MemFree:"): - mem_info["free_mb"] = int(line.strip().split()[1]) // 1024 - elif line.startswith("MemAvailable:"): - mem_info["available_mb"] = int(line.strip().split()[1]) // 1024 - if "total_mb" in mem_info and "free_mb" in mem_info: - mem_info["used_mb"] = mem_info["total_mb"] - mem_info["free_mb"] - mem_info["used_percent"] = round(mem_info["used_mb"] / mem_info["total_mb"] * 100, 2) - return mem_info - except Exception as e: - logger.error(self._get_msg(f"获取内存信息失败:{str(e)}", f"Failed to get memory info: {str(e)}")) - return {"error": self._get_msg("获取内存信息失败", "Failed to get memory information")} - - def get_disk_info(self) -> List[Dict]: - """获取磁盘分区/使用率信息(优化命令检查)""" - disk_list = [] - # 直接指定 df 绝对路径(OpenEuler 99% 情况下的路径) - df_cmd = self._find_cmd_absolute_path("df") - if not df_cmd: - logger.error(self._get_msg("未找到df命令,无法获取磁盘信息", "df command not found, cannot get disk info")) - return [{"error": self._get_msg("未找到df命令,请安装coreutils包", "df command not found, please install coreutils")}] - - try: - # 用绝对路径执行 df 命令(避免 PATH 问题) - disk_output = self._run_cmd_safe([df_cmd, "-h", "-T"]) - - if not disk_output: - return [{"error": self._get_msg("df命令执行失败,无法获取磁盘信息", "df command execution failed")}] - - # 解析输出 - lines = [line.strip() for line in disk_output.splitlines() if line.strip()] - if len(lines) < 2: - logger.warning(self._get_msg("df未返回有效磁盘信息", "df returned no valid disk info")) - return [{"error": self._get_msg("df未返回有效磁盘信息", "df returned no valid disk information")}] - - for line in lines[1:]: - parts = re.split(r"\s+", line, maxsplit=6) - if len(parts) != 7: - logger.debug(f"跳过无效行:{line}") - continue - used_percent = parts[5].strip("%") if "%" in parts[5] else parts[5] - disk_list.append({ - "device": parts[0], - "fstype": parts[1], - "size": parts[2], - "used": parts[3], - "avail": parts[4], - "used_percent": used_percent, - "mountpoint": parts[6] - }) - return disk_list if disk_list else [{"error": self._get_msg("未检测到有效磁盘分区", "No valid disk partitions detected")}] - except Exception as e: - logger.error(self._get_msg(f"获取磁盘信息失败:{str(e)}", f"Failed to get disk info: {str(e)}")) - return [{"error": self._get_msg("获取磁盘信息失败", "Failed to get disk information")}] - - def get_gpu_info(self) -> List[Dict]: - """获取显卡状态(修复nvidia-smi命令不存在问题)""" - gpu_list = [] - # 检查nvidia-smi是否存在 - nvidia_smi_path = self._find_cmd_absolute_path("nvidia-smi") - if not nvidia_smi_path: - logger.warning(self._get_msg("未检测到nvidia-smi命令,跳过GPU信息采集", "nvidia-smi not found, skip GPU info collection")) - return [{"note": self._get_msg("未检测到nvidia-smi命令(无NVIDIA显卡或未安装驱动)", "nvidia-smi not found (no NVIDIA GPU or driver)")}] - - # 执行命令 - gpu_output = self._run_cmd_safe([ - nvidia_smi_path, - "--query-gpu=name,memory.total,memory.used,utilization.gpu", - "--format=csv,noheader,nounits" - ]) - if gpu_output: - for line in gpu_output.splitlines(): - if not line.strip(): - continue - parts = line.strip().split(", ") - if len(parts) != 4: - logger.debug(f"跳过无效GPU行:{line}") - continue - name, mem_total, mem_used, util = parts - gpu_list.append({ - "name": name.strip(), - "memory_total_mb": int(mem_total) if mem_total.isdigit() else 0, - "memory_used_mb": int(mem_used) if mem_used.isdigit() else 0, - "utilization_percent": int(util) if util.isdigit() else 0 - }) - return gpu_list if gpu_list else [{"note": self._get_msg("未检测到可用GPU", "No available GPU detected")}] - else: - return [{"error": self._get_msg("GPU信息采集失败", "Failed to collect GPU information")}] - - def get_net_info(self) -> List[Dict]: - """获取网卡/IP信息(修复ip命令不存在问题)""" - # 检查ip命令是否存在 - ip_cmd_path = self._find_cmd_absolute_path("ip") - if not ip_cmd_path: - logger.warning(self._get_msg("未找到ip命令,尝试通过/proc/net/dev获取网卡信息", "ip command not found, try to get net info from /proc/net/dev")) - # 降级方案:通过/proc/net/dev获取网卡名称(无IP信息) - net_list = [] - try: - with open("/proc/net/dev", "r", encoding="utf-8") as f: - lines = [line.strip() for line in f.readlines() if line.strip() and not line.startswith("Inter-|") and not line.startswith(" face |")] - for line in lines: - if ":" in line: - iface = line.split(":")[0].strip() - net_list.append({ - "interface": iface, - "ips": [{"note": self._get_msg("ip命令不存在,无法获取IP信息", "ip command not found, cannot get IP info")}] - }) - return net_list - except Exception as e: - logger.error(self._get_msg(f"获取网络信息失败:{str(e)}", f"Failed to get net info: {str(e)}")) - return [{"error": self._get_msg("获取网络信息失败(ip命令不存在)", "Failed to get network information (ip command not found)")}] - - # ip命令存在,正常采集 - net_output = self._run_cmd_safe([ip_cmd_path, "addr", "show"]) - if not net_output: - return [{"error": self._get_msg("ip命令执行失败,无法获取网络信息", "ip command execution failed")}] - - net_list = [] - current_iface = None - for line in net_output.splitlines(): - line = line.strip() - if line.startswith(("1:", "2:", "3:", "4:", "5:")): # 网卡名称行 - current_iface = line.split(":")[1].strip() - net_list.append({"interface": current_iface, "ips": []}) - elif current_iface and line.startswith("inet "): # IPv4地址 - ip_part = line.split()[1] - net_list[-1]["ips"].append({"type": "ipv4", "address": ip_part}) - elif current_iface and line.startswith("inet6 "): # IPv6地址 - ip_part = line.split()[1] - net_list[-1]["ips"].append({"type": "ipv6", "address": ip_part}) - return net_list if net_list else [{"error": self._get_msg("未检测到有效网卡信息", "No valid network interface detected")}] - - # ---------- 安全类信息 ---------- - def get_selinux_info(self) -> Dict: - """获取SELinux状态(修复getenforce命令不存在问题)""" - # 检查getenforce命令是否存在 - getenforce_path = self._find_cmd_absolute_path("getenforce") - if getenforce_path: - selinux_output = self._run_cmd_safe([getenforce_path]) - if selinux_output: - return {"status": selinux_output.strip()} - - # 命令不存在时,通过配置文件判断 - logger.warning(self._get_msg("未找到getenforce命令,尝试通过配置文件判断SELinux状态", "getenforce not found, try to judge SELinux status from config")) - try: - with open("/etc/selinux/config", "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if line.startswith("SELINUX=") and not line.startswith("#"): - selinux_mode = line.split("=")[1].strip() - return { - "status": selinux_mode.upper(), - "note": self._get_msg("基于/etc/selinux/config配置判断", "Judged from /etc/selinux/config") - } - return {"status": self._get_msg("未知", "Unknown"), "note": self._get_msg("未找到SELinux配置", "SELinux config not found")} - except Exception as e: - logger.error(self._get_msg(f"获取SELinux状态失败:{str(e)}", f"Failed to get SELinux status: {str(e)}")) - return {"error": self._get_msg("获取SELinux状态失败", "Failed to get SELinux status")} - - def get_firewall_info(self) -> Dict: - """获取防火墙规则(修复firewalld相关命令不存在问题)""" - firewall_info = {} - # 检查systemctl命令是否存在 - systemctl_path = self._find_cmd_absolute_path("systemctl") - if not systemctl_path: - logger.warning(self._get_msg("未找到systemctl命令,无法检查防火墙状态", "systemctl not found, cannot check firewall status")) - firewall_info["status"] = self._get_msg("未知(无systemctl)", "Unknown (systemctl not found)") - return firewall_info - - # 检查firewalld状态 - status_output = self._run_cmd_safe([systemctl_path, "is-active", "firewalld"]) - if status_output == "active": - firewall_info["status"] = "active" - # 检查firewall-cmd命令是否存在 - firewall_cmd_path = self._find_cmd_absolute_path("firewall-cmd") - if firewall_cmd_path: - ports_output = self._run_cmd_safe([firewall_cmd_path, "--list-ports"]) - firewall_info["open_ports"] = ports_output.strip().split() if ports_output and ports_output.strip() else [] - else: - firewall_info["open_ports"] = [self._get_msg("firewall-cmd命令不存在,无法获取开放端口", "firewall-cmd not found, cannot get open ports")] - elif status_output == "inactive": - firewall_info["status"] = "inactive" - else: - # 未安装firewalld或命令执行失败 - firewall_info["status"] = self._get_msg("未安装firewalld或状态未知", "firewalld not installed or status unknown") - return firewall_info - - # ---------- 批量采集多类型信息 ---------- - def collect_batch(self, info_types: List[InfoTypeEnum]) -> Dict[str, Dict]: - """ - 批量采集多类信息 - :param info_types: InfoTypeEnum列表 - :return: 结构化结果(key为信息类型字符串,value为对应采集结果) - """ - batch_result = {} - info_type_map = { - InfoTypeEnum.OS: self.get_os_info, - InfoTypeEnum.LOAD: self.get_load_info, - InfoTypeEnum.UPTIME: self.get_uptime_info, - InfoTypeEnum.CPU: self.get_cpu_info, - InfoTypeEnum.MEM: self.get_mem_info, - InfoTypeEnum.DISK: self.get_disk_info, - InfoTypeEnum.GPU: self.get_gpu_info, - InfoTypeEnum.NET: self.get_net_info, - InfoTypeEnum.SELINUX: self.get_selinux_info, - InfoTypeEnum.FIREWALL: self.get_firewall_info - } - for info_type in info_types: - try: - batch_result[info_type.value] = info_type_map[info_type]() - except Exception as e: - logger.error(self._get_msg(f"采集{info_type.value}信息失败:{str(e)}", f"Failed to collect {info_type.value} info: {str(e)}")) - batch_result[info_type.value] = {"error": self._get_msg(f"采集{info_type.value}信息失败", f"Failed to collect {info_type.value} information")} - return batch_result \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/config.json b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/config.json deleted file mode 100644 index 56df6d526b..0000000000 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/config.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "tools": { - "sys_info_tool": { - "zh": "【系统/硬件/安全信息采集工具】\n功能:批量采集OpenEuler系统的系统、硬件、安全类信息(纯Python+系统命令安全调用)\n\n【核心提示】\n1. 信息类型(info_types)为枚举值列表,支持多选,可选枚举值如下:\n - 系统类:os(版本/内核)、load(负载)、uptime(运行时间)\n - 硬件类:cpu(型号/核心)、mem(内存占用)、disk(分区/使用率)、gpu(显卡状态)、net(网卡/IP)\n - 安全类:selinux(状态)、firewall(防火墙规则)\n2. 输入格式示例:[\"cpu\", \"mem\", \"disk\"](必须为列表,单个类型也需用列表包裹,如[\"os\"]);\n3. 无额外参数,仅需传入info_types列表即可批量采集对应信息;\n4. 语言配置:自动读取系统全局配置(中文/英文),不提供外部设置接口;\n5. 依赖说明:GPU信息采集需安装nvidia-smi(仅支持NVIDIA显卡),防火墙信息需安装firewalld。\n\n【枚举类定义(必须遵守)】\n- InfoTypeEnum(信息类型枚举):os / load / uptime / cpu / mem / disk / gpu / net / selinux / firewall\n\n【参数详情】\n- info_types:信息类型列表(必填,枚举值见上方,支持多选,格式为列表)\n\n【返回值说明】\n- success:采集结果(True=成功,False=失败)\n- message:采集信息/错误提示(根据全局语言配置自动切换)\n- result:采集结果(结构化嵌套字典,key为信息类型,value为对应采集数据)\n- target:执行目标(固定为127.0.0.1,本地执行)\n- info_types:已采集的信息类型列表", - "en": "【System/Hardware/Security Info Collector】\nFunction: Batch collect system, hardware, security info of OpenEuler (Python native + safe system command call)\n\n【Core Guidelines】\n1. Info types (info_types) is an enum value list, supports multiple selection, optional enum values:\n - System: os (version/kernel), load (system load), uptime (system uptime)\n - Hardware: cpu (model/core), mem (memory usage), disk (partition/usage), gpu (GPU status), net (network card/IP)\n - Security: selinux (SELinux status), firewall (firewall rules)\n2. Input format example: [\"cpu\", \"mem\", \"disk\"] (must be a list, single type also needs to be wrapped in a list, e.g. [\"os\"]);\n3. No additional parameters, just pass info_types list to batch collect corresponding info;\n4. Language configuration: Automatically read global system configuration (Chinese/English), no external setting interface;\n5. Dependency note: GPU info requires nvidia-smi (NVIDIA GPU only), firewall info requires firewalld.\n\n【Enum Class Definition (Must Follow)】\n- InfoTypeEnum (Info Type Enum): os / load / uptime / cpu / mem / disk / gpu / net / selinux / firewall\n\n【Parameter Details】\n- info_types: Info type list (required, enum values see above, supports multiple selection, format is list)\n\n【Return Value Explanation】\n- success: Collection result (True=success, False=failure)\n- message: Collection info/error prompt (automatically switch based on global config)\n- result: Collected info (structured nested dict, key is info type, value is corresponding collected data)\n- target: Execution target (fixed as 127.0.0.1, local execution)\n- info_types: Collected info type list" - } - } -} \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/deps.toml b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/deps.toml deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/tool.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/tool.py deleted file mode 100644 index 626027ff1e..0000000000 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/sys_info_tool/tool.py +++ /dev/null @@ -1,50 +0,0 @@ -from typing import Dict, List -from pydantic import Field - -from servers.oe_cli_mcp_server.mcp_tools.base_tools.sys_info_tool.base import ( - init_result_dict, - SystemInfoCollector, - InfoTypeEnum, - parse_info_types, - is_zh, - logger -) - -def sys_info_tool( - info_types: List[str] = Field( - ..., - description="信息类型列表(枚举值可选:os/load/uptime/cpu/mem/disk/gpu/net/selinux/firewall,支持多选)" - ) -) -> Dict: - """ - 系统/硬件/安全信息采集工具(支持批量采集多类信息) - 语言配置从全局BaseConfig读取,不提供外部接口 - """ - # 初始化结果字典 - result = init_result_dict() - collector = SystemInfoCollector() - use_zh = is_zh() - - # 解析信息类型列表(字符串列表→枚举列表) - try: - parsed_info_types = parse_info_types(info_types) - result["info_types"] = [t.value for t in parsed_info_types] # 记录原始输入的类型列表 - except ValueError as e: - result["message"] = str(e) - return result - - # 批量采集信息 - try: - batch_result = collector.collect_batch(parsed_info_types) - result["result"] = batch_result - result["success"] = True - - # 生成提示信息 - info_type_str = ",".join(result["info_types"]) - result["message"] = f"以下信息采集完成:{info_type_str}" if use_zh else f"Collected info types: {info_type_str}" - - except Exception as e: - result["message"] = f"信息采集失败:{str(e)}" if use_zh else f"Info collection failed: {str(e)}" - logger.error(f"System info batch collection error: {str(e)}") - - return result \ No newline at end of file -- Gitee From d5e17c6a49beadcf74d92dc9e8e8d83922f11eee Mon Sep 17 00:00:00 2001 From: cui-gaoleng <562344211@qq.com> Date: Mon, 8 Dec 2025 11:29:09 +0800 Subject: [PATCH 10/10] =?UTF-8?q?mcp=5Fcenter=20oe-cli-mcp-server=20?= =?UTF-8?q?=E6=B5=8B=E8=AF=95client=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../oe_cli_mcp_server/client/client.py | 90 ------------------- 1 file changed, 90 deletions(-) diff --git a/mcp_center/servers/oe_cli_mcp_server/client/client.py b/mcp_center/servers/oe_cli_mcp_server/client/client.py index 3aa4b6c476..8135527a14 100644 --- a/mcp_center/servers/oe_cli_mcp_server/client/client.py +++ b/mcp_center/servers/oe_cli_mcp_server/client/client.py @@ -204,96 +204,6 @@ async def main() -> None: }) print(result) - # ================================== - # 3. pkg_tool 测试用例(4个,修复无效枚举、参数) - # ================================== - print("\n" + "="*60) - print("8. pkg_tool - 列出已安装的所有 nginx 相关包") - print("="*60) - result = await client.call_tool("pkg_tool", { - "action": "list", - "filter_key": "nginx" - }) - print(result) - - print("\n" + "="*60) - print("9. pkg_tool - 查询 openssh-server 包详情(版本/依赖)") - print("="*60) - result = await client.call_tool("pkg_tool", { - "action": "info", - "pkg_name": "openssh-server" - }) - print(result) - - print("\n" + "="*60) - print("10. pkg_tool - 仅更新系统安全补丁(替代check-update)") - print("="*60) - # 工具无check-update枚举,替换为update-sec(安全更新) - result = await client.call_tool("pkg_tool", { - "action": "update-sec", - "yes": True - }) - print(result) - - print("\n" + "="*60) - print("11. pkg_tool - 清理 yum/dnf 包缓存(all类型)") - print("="*60) - result = await client.call_tool("pkg_tool", { - "action": "clean", - "cache_type": "all", - "yes": True - }) - print(result) - - # ================================== - # 4. proc_tool 测试用例(4个,修复无效枚举、参数) - # ================================== - print("\n" + "="*60) - print("12. proc_tool - 查找所有 systemd 相关进程") - print("="*60) - result = await client.call_tool("proc_tool", { - "proc_actions": ["find"], - "proc_name": "systemd" - }) - print(result) - - print("\n" + "="*60) - print("13. proc_tool - 查询 PID=1 进程(systemd)资源占用") - print("="*60) - result = await client.call_tool("proc_tool", { - "proc_actions": ["stat"], - "pid": 1 - }) - print(result) - - print("\n" + "="*60) - print("14. proc_tool - 列出所有进程(后续可筛选CPU占用前5)") - print("="*60) - # 工具无top枚举,用list获取所有进程(业务层可筛选) - result = await client.call_tool("proc_tool", { - "proc_actions": ["list"] - }) - print(result) - - print("\n" + "="*60) - print("15. proc_tool - 重启 sshd 服务(systemd服务)") - print("="*60) - # 工具无tree枚举,替换为restart实用场景 - result = await client.call_tool("proc_tool", { - "proc_actions": ["restart"], - "service_name": "ssh" # Ubuntu中sshd服务名为ssh - }) - print(result) - - # 清理临时文件 - print("\n" + "="*60) - print("16. file_tool - 删除临时测试文件") - print("="*60) - result = await client.call_tool("file_tool", { - "action": "delete", - "file_path": "/tmp/file_tool_test.txt" - }) - print(result) await client.stop() -- Gitee