diff --git a/mcp_center/run.sh b/mcp_center/run.sh index d77f47737bb8ae1254d885570a696158c65dc27b..ddf5936e00d311bd9de48ae86561da11ab975416 100755 --- a/mcp_center/run.sh +++ b/mcp_center/run.sh @@ -2,7 +2,7 @@ SERVICE_DIR="/usr/lib/euler-copilot-framework/mcp_center/service" -/usr/lib/euler-copilot-framework/mcp_center/servers/oe-cli-mcp-server/run.sh +/usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/run.sh for service_file in "$SERVICE_DIR"/*.service; do if [ -f "$service_file" ]; then diff --git a/mcp_center/servers/oe_cli_mcp_server/client/client.py b/mcp_center/servers/oe_cli_mcp_server/client/client.py index ed2b13ffcfa4a883eef1dcb49efdf8bb49694036..8135527a14b9d203bac9e87021955b89aa81d7ce 100644 --- a/mcp_center/servers/oe_cli_mcp_server/client/client.py +++ b/mcp_center/servers/oe_cli_mcp_server/client/client.py @@ -7,7 +7,7 @@ from contextlib import AsyncExitStack from typing import TYPE_CHECKING, Union from pydantic import BaseModel, Field from enum import Enum -from mcp import ClientSession, StdioServerParameters +from mcp import ClientSession from mcp.client.sse import sse_client from mcp.client.stdio import stdio_client @@ -120,12 +120,91 @@ class MCPClient: async def main() -> None: """测试MCP Client""" - url = "http://0.0.0.0:8002/sse" + url = "http://0.0.0.0:12555/sse" headers = {} client = MCPClient(url, headers) await client.init() - result = await client.call_tool("nvidia_smi_status", {}) + + # 初始化时多余的调用移除,保留下方有序测试用例 + # ================================== + # 1. sys_info_tool 测试用例(3个,修复无效枚举值) + # ================================== + print("\n" + "="*60) + print("1. sys_info_tool - 采集CPU+内存+磁盘+系统信息") + print("="*60) + result = await client.call_tool("sys_info_tool", {"info_types": ["cpu", "mem", "disk", "os"]}) + print(result) + + print("\n" + "="*60) + print("2. sys_info_tool - 单独采集网络信息(IP/网卡)") + print("="*60) + result = await client.call_tool("sys_info_tool", {"info_types": ["net"]}) + print(result) + + print("\n" + "="*60) + print("3. sys_info_tool - 采集安全信息(SELinux+防火墙)") + print("="*60) + result = await client.call_tool("sys_info_tool", {"info_types": ["selinux", "firewall"]}) + print(result) + + # 移除无效的 "kernel" 和 "all" 类型测试(工具不支持) + + # ================================== + # 2. file_tool 测试用例(4个,修复枚举值、参数名) + # ================================== + print("\n" + "="*60) + print("4. file_tool - 列出 /etc 目录下的 .conf 配置文件(过滤关键词)") + print("="*60) + # 用 ls + 后续过滤实现(工具无find枚举,参数名改为file_path) + result = await client.call_tool("file_tool", { + "action": "ls", + "file_path": "/etc", + "detail": False, + "encoding": "utf-8" + }) print(result) + + print("\n" + "="*60) + print("5. file_tool - 读取 /etc/os-release 文件内容(系统版本)") + print("="*60) + # action改为cat,参数名改为file_path + result = await client.call_tool("file_tool", { + "action": "cat", + "file_path": "/etc/os-release", + "encoding": "utf-8" + }) + print(result) + + print("\n" + "="*60) + print("6. file_tool - 新建临时文件并写入内容") + print("="*60) + # 工具无find/mtime枚举,替换为add+edit实用场景 + result = await client.call_tool("file_tool", { + "action": "add", + "file_path": "/tmp/file_tool_test.txt", + "overwrite": True + }) + print("新建文件结果:", result) + result = await client.call_tool("file_tool", { + "action": "edit", + "file_path": "/tmp/file_tool_test.txt", + "content": "file_tool测试内容\n系统版本:Ubuntu 22.04", + "encoding": "utf-8" + }) + print("写入内容结果:", result) + + print("\n" + "="*60) + print("7. file_tool - 修改 /tmp/file_tool_test.txt 权限为755") + print("="*60) + # action改为chmod,参数名改为file_path + result = await client.call_tool("file_tool", { + "action": "chmod", + "file_path": "/tmp/file_tool_test.txt", + "mode": "755" + }) + print(result) + + await client.stop() if __name__ == "__main__": diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py b/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py index 7d319b5add139444eb948709f4e7b1daa81560b1..dc0a65fc1b92bcd5da3b275938cfa600ee28beca 100755 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3 +#!/usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/venv/global/bin/python3 import logging import sys with open("/etc/systemd/system/mcp-server.service", "r") as f: diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_server/mcp_manager.py b/mcp_center/servers/oe_cli_mcp_server/mcp_server/mcp_manager.py index 7269a006be10fa1b9f01f0244730faf56a1da577..4aff4122298d5845f421538584f1bb68fc52ef8a 100644 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_server/mcp_manager.py +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_server/mcp_manager.py @@ -79,7 +79,7 @@ class McpServer(ToolManager): if os.path.basename(mcp_collection)[:-4] == ".zip": unzip_tool(mcp_collection) package_name = os.path.basename(mcp_collection)[:-4] - package_dir = os.path.join(get_project_root(), "mcp_tools/personal_tools", package_name) + package_dir = os.path.join(get_project_root(), "mcp_tools/personal_tools", package_name)#存放到个性化目录下 else: package_name = mcp_collection package_dir = self.get_package_path(package_name) diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py new file mode 100644 index 0000000000000000000000000000000000000000..9e2c98c72a432870fdf4940fb3ca7638a82eae0e --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/base.py @@ -0,0 +1,240 @@ +import logging +import os +import shutil +from enum import Enum +from typing import Dict, List +from pydantic import Field + +from config.public.base_config_loader import BaseConfig, LanguageEnum + +# 初始化日志 +logger = logging.getLogger("file_tool") +logger.setLevel(logging.INFO) +lang = BaseConfig().get_config().public_config.language +# ========== 枚举类定义(提升大模型识别性) ========== +class FileActionEnum(str, Enum): + """文件操作类型枚举""" + LS = "ls" # 查看文件/目录 + CAT = "cat" # 读取文件内容 + ADD = "add" # 新建空文件 + APPEND = "append" # 追加内容 + EDIT = "edit" # 覆盖内容 + RENAME = "rename" # 重命名 + CHMOD = "chmod" # 修改权限 + DELETE = "delete" # 删除文件/目录 + +class FileEncodingEnum(str, Enum): + """文件编码枚举""" + UTF8 = "utf-8" + GBK = "gbk" + GB2312 = "gb2312" + ASCII = "ascii" + +class CacheTypeEnum(str, Enum): + """缓存类型枚举(预留扩展)""" + ALL = "all" + PACKAGES = "packages" + METADATA = "metadata" + +# ========== 通用工具函数 ========== +def get_language() -> bool: + """获取语言配置:True=中文,False=英文""" + return BaseConfig().get_config().public_config.language == LanguageEnum.ZH + +def init_result_dict( + target_host: str = "127.0.0.1", + result_type: str = "list", + include_file_path: bool = True +) -> Dict: + """初始化返回结果字典(默认包含file_path字段)""" + result = { + "success": False, + "message": "", + "result": [] if result_type == "list" else "", + "target": target_host, + "file_path": "" + } + return result + +# ========== 文件管理核心类 ========== +class FileManager: + """文件管理核心类(Python原生实现,无shell依赖)""" + def __init__(self, lang: LanguageEnum = LanguageEnum.ZH): + self.is_zh = lang + + def _get_error_msg(self, zh_msg: str, en_msg: str) -> str: + """多语言错误提示""" + return zh_msg if self.is_zh else en_msg + + def ls(self, file_path: str, detail: bool = Field(False, description="是否显示详细信息")) -> List[Dict]: + """ + 查看文件/目录状态(Python实现ls功能) + :param file_path: 文件/目录路径(必填) + :param detail: 是否显示详细信息(默认False) + :return: 结构化文件信息列表 + """ + if not os.path.exists(file_path): + raise FileNotFoundError(self._get_error_msg(f"路径不存在:{file_path}", f"Path not found: {file_path}")) + + result = [] + if os.path.isfile(file_path): + # 单个文件 + stat = os.stat(file_path) + file_info = { + "name": os.path.basename(file_path), + "path": file_path, + "size": stat.st_size, + "mtime": stat.st_mtime, + "mode": oct(stat.st_mode)[-3:], + "type": "file" + } + result.append(file_info) + else: + # 目录 + for item in os.listdir(file_path): + full_path = os.path.join(file_path, item) + stat = os.stat(full_path) + item_info = { + "name": item, + "path": full_path, + "size": stat.st_size, + "mtime": stat.st_mtime, + "mode": oct(stat.st_mode)[-3:], + "type": "file" if os.path.isfile(full_path) else "dir" + } + result.append(item_info) + return result + + def cat(self, + file_path: str, + encoding: FileEncodingEnum = Field(FileEncodingEnum.UTF8, description="文件编码")) -> List[str]: + """ + 读取文件内容(Python实现cat功能) + :param file_path: 文件路径(必填) + :param encoding: 文件编码(默认utf-8) + :return: 按行拆分的内容列表 + """ + if not os.path.exists(file_path): + raise FileNotFoundError(self._get_error_msg(f"文件不存在:{file_path}", f"File not found: {file_path}")) + if os.path.isdir(file_path): + raise IsADirectoryError(self._get_error_msg(f"路径是目录,无法读取内容:{file_path}", f"Path is directory, cannot read content: {file_path}")) + + with open(file_path, "r", encoding=encoding.value, errors="ignore") as f: + content = [line.rstrip("\n") for line in f.readlines()] + return content + + def add(self, + file_path: str, + overwrite: bool = Field(False, description="是否覆盖已存在文件")) -> None: + """ + 新建空文件(Python实现touch功能) + :param file_path: 文件路径(必填) + :param overwrite: 已存在时是否覆盖(默认False) + """ + if os.path.exists(file_path) and not overwrite: + logger.info(self._get_error_msg(f"文件已存在,跳过创建:{file_path}", f"File exists, skip creation: {file_path}")) + return + + # 确保父目录存在 + parent_dir = os.path.dirname(file_path) + if parent_dir and not os.path.exists(parent_dir): + os.makedirs(parent_dir, exist_ok=True) + + with open(file_path, "w", encoding=FileEncodingEnum.UTF8.value) as f: + pass + logger.info(self._get_error_msg(f"文件创建成功:{file_path}", f"File created successfully: {file_path}")) + + def append(self, + file_path: str, + content: str = Field(..., description="追加内容"), + encoding: FileEncodingEnum = Field(FileEncodingEnum.UTF8, description="文件编码")) -> None: + """ + 追加内容到文件 + :param file_path: 文件路径(必填) + :param content: 追加内容(必填) + :param encoding: 文件编码(默认utf-8) + """ + if not content: + raise ValueError(self._get_error_msg("追加内容不能为空", "Append content cannot be empty")) + + parent_dir = os.path.dirname(file_path) + if parent_dir and not os.path.exists(parent_dir): + os.makedirs(parent_dir, exist_ok=True) + + with open(file_path, "a", encoding=encoding.value) as f: + f.write(content + "\n") + logger.info(self._get_error_msg(f"内容追加成功:{file_path}", f"Content appended successfully: {file_path}")) + + def edit(self, + file_path: str, + content: str = Field(..., description="覆盖内容"), + encoding: FileEncodingEnum = Field(FileEncodingEnum.UTF8, description="文件编码")) -> None: + """ + 覆盖写入文件内容 + :param file_path: 文件路径(必填) + :param content: 覆盖内容(必填) + :param encoding: 文件编码(默认utf-8) + """ + if not content: + raise ValueError(self._get_error_msg("覆盖内容不能为空", "Edit content cannot be empty")) + + parent_dir = os.path.dirname(file_path) + if parent_dir and not os.path.exists(parent_dir): + os.makedirs(parent_dir, exist_ok=True) + + with open(file_path, "w", encoding=encoding.value) as f: + f.write(content) + logger.info(self._get_error_msg(f"文件内容覆盖成功:{file_path}", f"File content overwritten successfully: {file_path}")) + + def rename(self, + old_path: str, + new_path: str = Field(..., description="新路径")) -> None: + """ + 重命名文件/目录 + :param old_path: 原路径(必填) + :param new_path: 新路径(必填) + """ + if not os.path.exists(old_path): + raise FileNotFoundError(self._get_error_msg(f"原路径不存在:{old_path}", f"Old path not found: {old_path}")) + if os.path.exists(new_path): + raise FileExistsError(self._get_error_msg(f"新路径已存在:{new_path}", f"New path already exists: {new_path}")) + + shutil.move(old_path, new_path) + logger.info(self._get_error_msg(f"文件重命名成功:{old_path} → {new_path}", f"File renamed successfully: {old_path} → {new_path}")) + + def chmod(self, + file_path: str, + mode: str = Field(..., description="权限模式(如755)")) -> None: + """ + 修改文件权限 + :param file_path: 文件路径(必填) + :param mode: 权限模式(必填,如755) + """ + if not os.path.exists(file_path): + raise FileNotFoundError(self._get_error_msg(f"文件不存在:{file_path}", f"File not found: {file_path}")) + + try: + numeric_mode = int(mode, 8) + os.chmod(file_path, numeric_mode) + logger.info(self._get_error_msg(f"权限修改成功:{file_path} → {mode}", f"Permission modified successfully: {file_path} → {mode}")) + except ValueError: + raise ValueError(self._get_error_msg(f"权限格式错误(需为8进制,如755):{mode}", f"Invalid mode format (must be octal, e.g.755): {mode}")) + + def delete(self, + file_path: str, + recursive: bool = Field(False, description="是否递归删除目录")) -> None: + """ + 删除文件/目录 + :param file_path: 文件/目录路径(必填) + :param recursive: 是否递归删除目录(默认False) + """ + if not os.path.exists(file_path): + raise FileNotFoundError(self._get_error_msg(f"路径不存在:{file_path}", f"Path not found: {file_path}")) + + if os.path.isfile(file_path): + os.remove(file_path) + else: + if not recursive: + raise IsADirectoryError(self._get_error_msg(f"路径是目录,需开启recursive=True递归删除:{file_path}", f"Path is directory, set recursive=True to delete: {file_path}")) + shutil.rmtree(file_path) + logger.info(self._get_error_msg(f"路径删除成功:{file_path}", f"Path deleted successfully: {file_path}")) \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/config.json b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/config.json new file mode 100644 index 0000000000000000000000000000000000000000..93845a4dbb493ad1e263da557a7ceeb7ef89a297 --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/config.json @@ -0,0 +1,8 @@ +{ + "tools": { + "file_tool": { + "zh": "【统一文件管理工具】\n功能:支持本地文件/目录的查、增、改、删全操作(纯Python实现,无shell依赖)\n\n【核心提示】\n1. 操作类型(action)必须从以下枚举值中选择:ls/cat/add/append/edit/rename/chmod/delete,不可传入其他值;\n2. 文件编码(encoding)必须从以下枚举值中选择:utf-8/gbk/gb2312/ascii,默认值为utf-8;\n3. 不同操作类型(action)对应不同必填参数,未满足则执行失败,请严格遵守:\n - ls(查看列表)/cat(读取内容)/delete(删除):仅需传入 file_path(文件/目录路径);\n - add(新建文件):需传入 file_path,可选传入 overwrite(是否覆盖已存在文件,默认False);\n - append(追加内容)/edit(覆盖内容):必须传入 file_path + content(写入/追加的内容);\n - rename(重命名):必须传入 file_path(原路径) + new_path(新路径);\n - chmod(修改权限):必须传入 file_path + mode(权限模式,如755/644,需为8进制格式);\n4. 非必填参数默认值:detail(ls是否显示详细信息)=False,recursive(删除目录是否递归)=False;\n5. 路径格式要求:file_path/new_path需传入绝对路径(如\"/tmp/test.txt\"),避免相对路径导致的找不到文件问题。\n\n【枚举类定义(必须遵守)】\n- FileActionEnum(操作类型枚举):ls / cat / add / append / edit / rename / chmod / delete\n- FileEncodingEnum(文件编码枚举):utf-8 / gbk / gb2312 / ascii\n\n【参数详情】\n- action:操作类型(必填,枚举值见上方)\n- file_path:目标文件/目录路径(必填,绝对路径)\n- content:写入/追加内容(add/edit/append操作必填)\n- new_path:重命名后的新路径(rename操作必填,绝对路径)\n- mode:权限模式(chmod操作必填,如755)\n- detail:ls操作是否显示详细信息(默认False)\n- overwrite:add操作是否覆盖已存在文件(默认False)\n- recursive:delete操作是否递归删除目录(默认False)\n- encoding:文件编码(枚举值见上方,默认utf-8)\n【返回值说明】\n- success:执行结果(True=成功,False=失败)\n- message:执行信息/错误提示(多语言)\n- result:操作结果(ls/cat返回结构化列表,其他操作返回空列表)\n- file_path:操作的文件路径(rename后自动更新为新路径)\n- target:执行目标(固定为127.0.0.1,本地执行)", + "en": "【Unified File Management Tool】\nFunction: Supports query/add/edit/delete operations for local files/directories (Python native implementation, no shell dependency)\n\n【Core Guidelines】\n1. Operation type (action) must be selected from the following enum values: ls/cat/add/append/edit/rename/chmod/delete, other values are not allowed;\n2. File encoding (encoding) must be selected from the following enum values: utf-8/gbk/gb2312/ascii, default value is utf-8;\n3. Different action types correspond to different required parameters, execution will fail if not met, please strictly follow:\n - ls (list files)/cat (read content)/delete (delete): Only need to pass file_path (file/directory path);\n - add (create empty file): Need to pass file_path, optionally pass overwrite (whether to overwrite existing file, default False);\n - append (append content)/edit (overwrite content): Must pass file_path + content (content to write/append);\n - rename (rename file/directory): Must pass file_path (old path) + new_path (new path);\n - chmod (modify permission): Must pass file_path + mode (permission mode, e.g.755/644, must be octal format);\n4. Default values for optional parameters: detail (whether to show detailed info for ls)=False, recursive (whether to delete directory recursively)=False;\n5. Path format requirement: file_path/new_path must be absolute path (e.g.\"/tmp/test.txt\"), avoid file not found due to relative path.\n\n【Enum Class Definition (Must Follow)】\n- FileActionEnum (Operation Type Enum): ls / cat / add / append / edit / rename / chmod / delete\n- FileEncodingEnum (File Encoding Enum): utf-8 / gbk / gb2312 / ascii\n\n【Parameter Details】\n- action: Operation type (required, enum values see above)\n- file_path: Target file/directory path (required, absolute path)\n- content: Content to write/append (required for add/edit/append)\n- new_path: New path after rename (required for rename, absolute path)\n- mode: Permission mode (required for chmod, e.g.755)\n- detail: Whether to show detailed info for ls (default False)\n- overwrite: Whether to overwrite existing file for add (default False)\n- recursive: Whether to delete directory recursively for delete (default False)\n- encoding: File encoding (enum values see above, default utf-8)\n- lang: Language (enum values: ZH/EN, default ZH)\n\n【Return Value Explanation】\n- success: Execution result (True=success, False=failure)\n- message: Execution info/error prompt (multilingual)\n- result: Operation result (structured list for ls/cat, empty list for others)\n- file_path: Operated file path (automatically updated to new path after rename)\n- target: Execution target (fixed as 127.0.0.1, local execution)" + } + } +} \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/deps.toml b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/deps.toml new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py new file mode 100644 index 0000000000000000000000000000000000000000..aa627d95f6e165cc6bfe73711c708dd24d47e518 --- /dev/null +++ b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tool/tool.py @@ -0,0 +1,96 @@ +from typing import Dict, Optional +from pydantic import Field +from config.public.base_config_loader import LanguageEnum, BaseConfig +from servers.oe_cli_mcp_server.mcp_tools.base_tools.file_tool.base import ( + init_result_dict, + FileManager, + FileActionEnum, + FileEncodingEnum, + logger, lang +) + +def file_tool( + action: FileActionEnum = Field(..., description="操作类型(枚举:ls/cat/add/append/edit/rename/chmod/delete)"), + file_path: str = Field(..., description="文件/目录路径(必填)"), + content: Optional[str] = Field(None, description="写入/追加内容(add/edit/append必填)"), + new_path: Optional[str] = Field(None, description="新路径(rename必填)"), + mode: Optional[str] = Field(None, description="权限模式(chmod必填,如755)"), + detail: bool = Field(False, description="ls是否显示详细信息"), + overwrite: bool = Field(False, description="add是否覆盖已存在文件"), + recursive: bool = Field(False, description="delete是否递归删除目录"), + encoding: FileEncodingEnum = Field(FileEncodingEnum.UTF8, description="文件编码(默认utf-8)"), +) -> Dict: + """ + 统一文件管理工具(精简参数+枚举入参,适配大模型识别) + """ + # 初始化结果字典 + result = init_result_dict() + result["file_path"] = file_path.strip() + fm = FileManager(lang=lang) + is_zh = lang == LanguageEnum.ZH + + # 1. 核心参数校验 + if not file_path.strip(): + result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" + return result + + # 2. 按枚举操作类型执行 + try: + if action == FileActionEnum.LS: + result["result"] = fm.ls(file_path.strip(), detail=detail) + result["success"] = True + result["message"] = f"本地文件列表查询完成(路径:{file_path})" if is_zh else f"Local file list queried (path: {file_path})" + + elif action == FileActionEnum.CAT: + result["result"] = fm.cat(file_path.strip(), encoding=encoding) + result["success"] = True + result["message"] = f"本地文件内容读取完成(路径:{file_path})" if is_zh else f"Local file content read (path: {file_path})" + + elif action == FileActionEnum.ADD: + fm.add(file_path.strip(), overwrite=overwrite) + result["success"] = True + result["message"] = f"本地文件创建成功(路径:{file_path})" if is_zh else f"Local file created (path: {file_path})" + + elif action == FileActionEnum.APPEND: + if not content: + result["message"] = "追加内容不能为空" if is_zh else "Append content cannot be empty" + return result + fm.append(file_path.strip(), content.strip(), encoding=encoding) + result["success"] = True + result["message"] = f"本地文件内容追加成功(路径:{file_path})" if is_zh else f"Local file content appended (path: {file_path})" + + elif action == FileActionEnum.EDIT: + if not content: + result["message"] = "覆盖内容不能为空" if is_zh else "Edit content cannot be empty" + return result + fm.edit(file_path.strip(), content.strip(), encoding=encoding) + result["success"] = True + result["message"] = f"本地文件内容覆盖成功(路径:{file_path})" if is_zh else f"Local file content overwritten (path: {file_path})" + + elif action == FileActionEnum.RENAME: + if not new_path: + result["message"] = "新路径不能为空" if is_zh else "New path cannot be empty" + return result + fm.rename(file_path.strip(), new_path.strip()) + result["success"] = True + result["file_path"] = new_path.strip() + result["message"] = f"本地文件重命名成功({file_path} → {new_path})" if is_zh else f"Local file renamed ({file_path} → {new_path})" + + elif action == FileActionEnum.CHMOD: + if not mode: + result["message"] = "权限模式不能为空(如755)" if is_zh else "Permission mode cannot be empty (e.g.755)" + return result + fm.chmod(file_path.strip(), mode.strip()) + result["success"] = True + result["message"] = f"本地文件权限修改成功(路径:{file_path},权限:{mode})" if is_zh else f"Local file permission modified (path: {file_path}, mode: {mode})" + + elif action == FileActionEnum.DELETE: + fm.delete(file_path.strip(), recursive=recursive) + result["success"] = True + result["message"] = f"本地文件删除成功(路径:{file_path})" if is_zh else f"Local file deleted (path: {file_path})" + + except Exception as e: + result["message"] = f"操作失败:{str(e)}" if is_zh else f"Operation failed: {str(e)}" + logger.error(f"File manager error: {str(e)}") + + return result \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/base.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/base.py deleted file mode 100644 index 1a018f99ae5e4dfa88d72400615fe9773527acb0..0000000000000000000000000000000000000000 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/base.py +++ /dev/null @@ -1,152 +0,0 @@ -import logging -import subprocess -import paramiko -from typing import Dict, Optional - -from servers.oe_cli_mcp_server.config.base_config_loader import BaseConfig, LanguageEnum - -# 初始化日志(保持原逻辑) -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) - -def get_language() -> bool: - """获取语言配置:True=中文,False=英文""" - return BaseConfig().get_config().public_config.language == LanguageEnum.ZH - -def get_remote_auth(ip: str) -> Optional[Dict]: - """ - 获取服务器认证信息:匹配IP/主机名对应的连接配置 - """ - for host_config in BaseConfig().get_config().public_config.remote_hosts: - if ip in [host_config.host, host_config.name]: - return { - "host": host_config.host, - "port": host_config.port, - "username": host_config.username, - "password": host_config.password - } - return None - -def escape_shell_content(content: str) -> str: - """ - 转义shell命令中的特殊字符(避免注入和语法错误) - """ - # 转义单引号(最关键,因为命令中用单引号包裹参数) - return content.replace("'", "'\\''") - -def init_result_dict( - target_host: str, - result_type: str = "list", # result字段类型:list/str - include_file_path: bool = False -) -> Dict: - """ - 初始化返回结果字典(统一结构) - """ - result = { - "success": False, - "message": "", - "result": [] if result_type == "list" else "", - "target": target_host - } - if include_file_path: - result["file_path"] = "" - return result - -def run_local_command( - cmd: str, - result: Dict, - success_msg_zh: str, - success_msg_en: str, - is_list_result: bool = True, - in_place: bool = False -) -> Dict: - """ - 执行本地命令并处理结果 - """ - try: - logger.info(f"执行本地命令:{cmd}") - output = subprocess.check_output( - cmd, shell=True, text=True, stderr=subprocess.STDOUT - ) - result["success"] = True - result["message"] = success_msg_zh if get_language() else success_msg_en - # 处理返回结果(in_place=True时不返回内容) - if not in_place: - if is_list_result: - result["result"] = output.strip().split("\n") if output.strip() else [] - else: - result["result"] = output.strip() - # grep无匹配时退出码1,特殊处理 - except subprocess.CalledProcessError as e: - if e.returncode == 1 and "grep" in cmd: - result["success"] = True - result["message"] = "未找到匹配内容" if get_language() else "No matching content found" - else: - result["message"] = f"本地执行失败:{e.output.strip()}" if get_language() else f"Local execution failed: {e.output.strip()}" - logger.error(result["message"]) - except Exception as e: - result["message"] = f"本地处理异常:{str(e)}" if get_language() else f"Local processing exception: {str(e)}" - logger.error(result["message"]) - return result - -def run_remote_command( - cmd: str, - remote_auth: Dict, - result: Dict, - success_msg_zh: str, - success_msg_en: str, - is_list_result: bool = True, - in_place: bool = False -) -> Dict: - """ - 执行远程命令并处理结果(SSH连接封装) - """ - ssh_conn: Optional[paramiko.SSHClient] = None - try: - # 初始化SSH客户端 - ssh_conn = paramiko.SSHClient() - ssh_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - ssh_conn.connect( - hostname=remote_auth["host"], - port=remote_auth["port"], - username=remote_auth["username"], - password=remote_auth["password"], - timeout=10, - banner_timeout=10 - ) - logger.info(f"已连接远程主机:{remote_auth['host']},执行命令:{cmd}") - - # 执行命令 - stdin, stdout, stderr = ssh_conn.exec_command(cmd) - stdout_msg = stdout.read().decode("utf-8", errors="replace").strip() - stderr_msg = stderr.read().decode("utf-8", errors="replace").strip() - - # 处理结果 - if stderr_msg: - result["message"] = f"远程执行失败:{stderr_msg}" if get_language() else f"Remote execution failed: {stderr_msg}" - logger.error(result["message"]) - else: - result["success"] = True - result["message"] = success_msg_zh if get_language() else success_msg_en - if not in_place: - if is_list_result: - result["result"] = stdout_msg.split("\n") if stdout_msg else [] - else: - result["result"] = stdout_msg.strip() - except paramiko.AuthenticationException: - result["message"] = "SSH认证失败,请检查用户名和密码" if get_language() else "SSH authentication failed, check username and password" - logger.error(result["message"]) - except TimeoutError: - result["message"] = "SSH连接超时,请检查网络或主机状态" if get_language() else "SSH connection timed out, check network or host status" - logger.error(result["message"]) - except Exception as e: - result["message"] = f"远程处理异常:{str(e)}" if get_language() else f"Remote processing exception: {str(e)}" - logger.error(result["message"]) - finally: - # 关闭SSH连接 - if ssh_conn: - transport = ssh_conn.get_transport() - if transport and transport.is_active(): - ssh_conn.close() - logger.info(f"已关闭与{remote_auth['host']}的SSH连接") - return result \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/config.json b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/config.json deleted file mode 100644 index dd08cfb5c160f80644914db7b7149ac4cb2703b6..0000000000000000000000000000000000000000 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/config.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "tools": { - "file_grep_tool": { - "zh": "搜索文件中匹配指定模式的内容(本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -file_path: 目标文件路径(必填,如\"/var/log/syslog\")\n -pattern: 搜索模式(支持正则,必填,如\"error\")\n -options: grep可选参数(如\"-i\"忽略大小写、\"-n\"显示行号)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -result: 匹配结果列表(成功时返回)\n -target: 执行目标主机", - "en": "Search for content matching the specified pattern in the file (supports local/remote)\n\nParameters:\n -target: Target host IP/hostname, None for local\n -file_path: Target file path (required, e.g. \"/var/log/syslog\")\n -pattern: Search pattern (supports regex, required, e.g. \"error\")\n -options: Optional grep parameters (e.g. \"-i\" ignore case, \"-n\" show line numbers)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -result: List of matching results (returned on success)\n -target: Target host for execution" - }, - "file_sed_tool": { - "zh": "替换文件中匹配的内容(本地/远程均支持,默认不修改原文件)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -file_path: 目标文件路径(必填)\n -pattern: 匹配模式(如\"s/old/new/g\",必填,g表示全局替换)\n -in_place: 是否直接修改原文件(True/False,默认False,仅输出结果)\n -options: sed可选参数(如\"-i.bak\"备份原文件)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -result: 替换后内容(in_place=False时返回)\n -target: 执行目标主机", - "en": "Replace matching content in the file (supports local/remote, no original file modification by default)\n\nParameters:\n -target: Target host IP/hostname, None for local\n -file_path: Target file path (required)\n -pattern: Matching pattern (e.g. \"s/old/new/g\", required, g for global replacement)\n -in_place: Whether to modify the original file directly (True/False, default False, only output result)\n -options: Optional sed parameters (e.g. \"-i.bak\" backup original file)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -result: Content after replacement (returned when in_place=False)\n -target: Target host for execution" - }, - "file_awk_tool": { - "zh": "用awk处理文本文件(支持列提取、条件过滤等,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地(127.0.0.1)\n -file_path: 目标文件路径(必填,如\"/etc/passwd\"或\"/var/log/access.log\")\n -script: awk脚本(必填,示例:\"'{print $1,$3}'\"提取1、3列;\"'$3>100 {print $0}'\"过滤第3列大于100的行)\n -options: awk可选参数(示例:\"-F:\"指定分隔符为冒号;\"-v OFS=,\"指定输出分隔符为逗号)\n\n返回:\n -success: 执行结果(True=成功,False=失败)\n -message: 执行状态描述/错误提示(中文/英文根据配置自动切换)\n -result: awk处理结果列表(每行一个元素,无结果时返回空列表)\n -target: 实际执行的目标主机IP/hostname", - "en": "Process text files with awk (supports column extraction, condition filtering, local/remote execution)\n\nParameters:\n -target: Target host IP/hostname, None for localhost (127.0.0.1)\n -file_path: Target file path (required, e.g. \"/etc/passwd\" or \"/var/log/access.log\")\n -script: awk script (required, example: \"'{print $1,$3}'\" extract column 1&3; \"'$3>100 {print $0}'\" filter rows where column3>100)\n -options: Optional awk parameters (example: \"-F:\" set delimiter to colon; \"-v OFS=,\" set output delimiter to comma)\n\nReturns:\n -success: Execution result (True=success, False=failure)\n -message: Execution status/error prompt (auto-switch between Chinese/English)\n -result: List of awk processing results (one element per line, empty list if no result)\n -target: Actual target host IP/hostname for execution" - }, - "file_sort_tool": { - "zh": "对文本文件进行排序(支持按列、升序/降序、去重等,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地(127.0.0.1)\n -file_path: 目标文件路径(必填,如\"/tmp/logs.txt\")\n -options: sort可选参数(示例:\n \"-n\"按数字排序;\"-k3\"按第3列排序;\n \"-r\"降序排列;\"-u\"去重后排序;\n \"-t,\"指定逗号为分隔符)\n -output_file: 排序结果输出路径(可选,默认不保存到文件,仅返回结果)\n\n返回:\n -success: 执行结果(True=成功,False=失败)\n -message: 执行状态描述/错误提示(根据语言配置自动切换)\n -result: 排序结果列表(output_file为空时返回,每行一个元素)\n -target: 实际执行的目标主机", - "en": "Sort text files (supports column-based sorting, ascending/descending, deduplication, local/remote)\n\nParameters:\n -target: Target host IP/hostname, None for localhost (127.0.0.1)\n -file_path: Target file path (required, e.g. \"/tmp/logs.txt\")\n -options: Optional sort parameters (examples:\n \"-n\" numeric sort; \"-k3\" sort by 3rd column;\n \"-r\" reverse order; \"-u\" unique then sort;\n \"-t,\" set comma as delimiter)\n -output_file: Output path for sorted results (optional, default returns in result without saving)\n\nReturns:\n -success: Execution result (True=success, False=failure)\n -message: Execution status/error prompt (auto-switch language)\n -result: List of sorted results (returned if output_file is empty, one element per line)\n -target: Actual target host for execution" - }, - "file_unique_tool": { - "zh": "对文本文件进行去重处理(通常与sort配合使用,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -file_path: 目标文件路径(必填,如\"/tmp/duplicates.txt\")\n -options: unique可选参数(示例:\n \"-u\"仅显示唯一行;\"-d\"仅显示重复行;\n \"-c\"显示每行出现的次数;\"-i\"忽略大小写)\n -output_file: 去重结果输出路径(可选,默认不保存到文件)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -result: 去重结果列表(output_file为空时返回)\n -target: 执行目标主机", - "en": "Deduplicate text files (usually used with sort, supports local/remote execution)\n\nParameters:\n -target: Target host IP/hostname, None for localhost\n -file_path: Target file path (required, e.g. \"/tmp/duplicates.txt\")\n -options: Optional unique parameters (examples:\n \"-u\" show only unique lines; \"-d\" show only duplicate lines;\n \"-c\" show count of each line; \"-i\" ignore case)\n -output_file: Output path for deduplicated results (optional, default returns in result)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -result: List of deduplicated results (returned if output_file is empty)\n -target: Target host for execution" - }, - "file_echo_tool": { - "zh": "向文件写入内容(支持创建文件、追加内容,本地/远程均支持)\n\n参数:\n -target: 目标主机IP/hostname,None表示本地\n -content: 要写入的内容(必填,如\"Hello World\")\n -file_path: 目标文件路径(必填,如\"/tmp/message.txt\")\n -append: 是否追加内容(True=追加,False=覆盖,默认False)\n\n返回:\n -success: 执行结果(True/False)\n -message: 执行信息/错误提示\n -target: 执行目标主机\n -file_path: 实际写入的文件路径", - "en": "Write content to file (supports file creation, appending content, local/remote execution)\n\nParameters:\n -target: Target host IP/hostname, None for localhost\n -content: Content to write (required, e.g. \"Hello World\")\n -file_path: Target file path (required, e.g. \"/tmp/message.txt\")\n -append: Whether to append content (True=append, False=overwrite, default False)\n\nReturns:\n -success: Execution result (True/False)\n -message: Execution info/error prompt\n -target: Target host for execution\n -file_path: Actual file path written to" - } - } -} \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/deps.toml b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/deps.toml deleted file mode 100644 index dc15ebc850e9392a48a23cc4b7850f95b2587a6d..0000000000000000000000000000000000000000 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/deps.toml +++ /dev/null @@ -1,20 +0,0 @@ -[system] -# 系统基础工具(grep/sed/awk/sort/uniq/echo 通常系统自带,此处列出需确保安装的依赖) -#required_tools = [ -# "yum install -y grep sed gawk coreutils" -#] -# SSH客户端依赖(部分系统可能未预装) -#ssh_client = [ -# "yum install -y openssh-clients" -#] -[pip_config] -index_url = "https://pypi.tuna.tsinghua.edu.cn/simple" - -[pip] -# Python依赖包 -paramiko = "==4.0.0" # SSH远程连接 -typing_extensions = "==4.12.2" -psutil = "==7.0.0" -toml = "== 0.10.2" -mcp = "== 1.9.4" -scp = "== 0.15.0" # 类型提示兼容(如需) \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/tool.py b/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/tool.py deleted file mode 100644 index 84a978a15338b9b41e48b72031ba62118e5ad59b..0000000000000000000000000000000000000000 --- a/mcp_center/servers/oe_cli_mcp_server/mcp_tools/base_tools/file_tools/tool.py +++ /dev/null @@ -1,342 +0,0 @@ -from typing import Dict, Optional -from servers.oe_cli_mcp_server.config.base_config_loader import LanguageEnum -from servers.oe_cli_mcp_server.mcp_tools. base_tools. file_tools. base import ( - get_remote_auth, - run_local_command, - run_remote_command, - init_result_dict, - escape_shell_content -) - -def file_grep_tool( - target: Optional[str] = None, - file_path: str = "", - pattern: str = "", - options: str = "", - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host) - - # 基础参数校验 - if not file_path.strip(): - result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" - return result - if not pattern.strip(): - result["message"] = "搜索模式不能为空" if is_zh else "Search pattern cannot be empty" - return result - - # 构建grep命令(内容转义,避免shell注入) - escaped_pattern = escape_shell_content(pattern.strip()) - escaped_file = escape_shell_content(file_path.strip()) - grep_cmd = f"grep {options.strip()} '{escaped_pattern}' {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - return run_local_command( - cmd=grep_cmd, - result=result, - success_msg_zh=f"本地文件搜索完成(路径:{file_path})", - success_msg_en=f"Local file search completed (path: {file_path})", - is_list_result=True - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth: - result["message"] = f"未找到远程主机({target_host})的认证配置" if is_zh else f"Authentication config for remote host ({target_host}) not found" - return result - if not (remote_auth["username"] and remote_auth["password"]): - result["message"] = "远程执行需用户名和密码" if is_zh else "Username and password required for remote execution" - return result - - return run_remote_command( - cmd=grep_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=f"远程文件搜索完成(主机:{target_host},路径:{file_path})", - success_msg_en=f"Remote file search completed (host: {target_host}, path: {file_path})", - is_list_result=True - ) - -def file_sed_tool( - target: Optional[str] = None, - file_path: str = "", - pattern: str = "", - in_place: bool = False, - options: str = "", - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host, result_type="str") - - # 基础校验 - if not file_path.strip(): - result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" - return result - if not pattern.strip() or "s/" not in pattern: - result["message"] = "替换模式格式错误(需含s/)" if is_zh else "Replacement pattern format error (must contain s/)" - return result - - # 构建sed命令(内容转义) - escaped_pattern = escape_shell_content(pattern.strip()) - escaped_file = escape_shell_content(file_path.strip()) - in_place_opt = "-i" if in_place else "" - sed_cmd = f"sed {options.strip()} {in_place_opt} '{escaped_pattern}' {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - msg_zh = "原文件已修改" if in_place else "替换后内容已输出" - msg_en = "original file modified" if in_place else "replaced content output" - return run_local_command( - cmd=sed_cmd, - result=result, - success_msg_zh=f"本地sed执行成功({msg_zh},路径:{file_path})", - success_msg_en=f"Local sed executed successfully ({msg_en}, path: {file_path})", - is_list_result=False, - in_place=in_place - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth or not (remote_auth["username"] and remote_auth["password"]): - result["message"] = "远程认证配置缺失" if is_zh else "Remote authentication config missing" - return result - - msg_zh = "原文件已修改" if in_place else "替换后内容已输出" - msg_en = "original file modified" if in_place else "replaced content output" - return run_remote_command( - cmd=sed_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=f"远程sed执行成功({msg_zh},主机:{target_host})", - success_msg_en=f"Remote sed executed successfully ({msg_en}, host: {target_host})", - is_list_result=False, - in_place=in_place - ) - -def file_awk_tool( - target: Optional[str] = None, - file_path: str = "", - script: str = "", - options: str = "", - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host) - - # 基础参数校验 - if not file_path.strip(): - result["message"] = "文件路径不能为空,请传入有效的文件路径(如\"/etc/passwd\")" if is_zh else "File path cannot be empty, please pass a valid path (e.g. \"/etc/passwd\")" - return result - if not script.strip(): - result["message"] = "awk脚本不能为空,请传入有效的处理逻辑(如\"'{print $1,$3}'\")" if is_zh else "awk script cannot be empty, please pass valid logic (e.g. \"'{print $1,$3}'\")" - return result - - # 构建awk命令(内容转义) - escaped_script = escape_shell_content(script.strip()) - escaped_file = escape_shell_content(file_path.strip()) - options_clean = options.strip() - awk_cmd = f"awk {options_clean} {escaped_script} {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - return run_local_command( - cmd=awk_cmd, - result=result, - success_msg_zh=f"本地awk处理成功(文件:{file_path.strip()})", - success_msg_en=f"Local awk processing succeeded (file: {file_path.strip()})", - is_list_result=True - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth: - result["message"] = f"未找到远程主机({target_host})的认证配置,请检查配置文件中的remote_hosts" if is_zh else f"Authentication config for remote host ({target_host}) not found, check remote_hosts in config" - return result - if not (remote_auth.get("username") and remote_auth.get("password")): - result["message"] = f"远程主机({target_host})的认证配置缺失用户名或密码,无法建立SSH连接" if is_zh else f"Remote host ({target_host}) auth config lacks username/password, cannot establish SSH connection" - return result - - return run_remote_command( - cmd=awk_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=f"远程awk处理成功(主机:{target_host},文件:{file_path.strip()})", - success_msg_en=f"Remote awk processing succeeded (host: {target_host}, file: {file_path.strip()})", - is_list_result=True - ) - -def file_sort_tool( - target: Optional[str] = None, - file_path: str = "", - options: str = "", - output_file: str = "", - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host) - - # 参数校验 - if not file_path.strip(): - result["message"] = "文件路径不能为空,请提供有效的文件路径" if is_zh else "File path cannot be empty, please provide a valid path" - return result - - # 构建sort命令(内容转义) - escaped_file = escape_shell_content(file_path.strip()) - escaped_output = escape_shell_content(output_file.strip()) if output_file.strip() else "" - options_clean = options.strip() - if output_file.strip(): - sort_cmd = f"sort {options_clean} {escaped_file} -o {escaped_output}" - else: - sort_cmd = f"sort {options_clean} {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - success_msg_zh = f"本地排序完成,结果已保存至:{output_file.strip()}" if output_file.strip() else f"本地排序完成" - success_msg_en = f"Local sort completed, result saved to: {output_file.strip()}" if output_file.strip() else f"Local sort completed" - return run_local_command( - cmd=sort_cmd, - result=result, - success_msg_zh=success_msg_zh, - success_msg_en=success_msg_en, - is_list_result=True, - in_place=bool(output_file.strip()) - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth: - result["message"] = f"未找到远程主机({target_host})的认证配置" if is_zh else f"Authentication config for remote host ({target_host}) not found" - return result - if not (remote_auth.get("username") and remote_auth.get("password")): - result["message"] = f"远程主机({target_host})的认证信息不完整(缺少用户名或密码)" if is_zh else f"Remote host ({target_host}) auth info incomplete (missing username/password)" - return result - - success_msg_zh = f"远程排序完成,结果已保存至:{output_file.strip()}(主机:{target_host})" if output_file.strip() else f"远程排序完成(主机:{target_host})" - success_msg_en = f"Remote sort completed, result saved to: {output_file.strip()} (host: {target_host})" if output_file.strip() else f"Remote sort completed (host: {target_host})" - return run_remote_command( - cmd=sort_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=success_msg_zh, - success_msg_en=success_msg_en, - is_list_result=True, - in_place=bool(output_file.strip()) - ) - -def file_unique_tool( - target: Optional[str] = None, - file_path: str = "", - options: str = "", - output_file: str = "", - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host) - - # 参数校验 - if not file_path.strip(): - result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" - return result - - # 构建unique命令(内容转义) - escaped_file = escape_shell_content(file_path.strip()) - escaped_output = escape_shell_content(output_file.strip()) if output_file.strip() else "" - options_clean = options.strip() - if output_file.strip(): - unique_cmd = f"uniq {options_clean} {escaped_file} {escaped_output}" - else: - unique_cmd = f"uniq {options_clean} {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - success_msg_zh = f"本地去重完成,结果已保存至:{output_file.strip()}" if output_file.strip() else f"本地去重完成" - success_msg_en = f"Local deduplication completed, result saved to: {output_file.strip()}" if output_file.strip() else f"Local deduplication completed" - return run_local_command( - cmd=unique_cmd, - result=result, - success_msg_zh=success_msg_zh, - success_msg_en=success_msg_en, - is_list_result=True, - in_place=bool(output_file.strip()) - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth or not (remote_auth["username"] and remote_auth["password"]): - result["message"] = "远程认证配置缺失" if is_zh else "Remote auth config missing" - return result - - success_msg_zh = f"远程去重完成,结果已保存至:{output_file.strip()}(主机:{target_host})" if output_file.strip() else f"远程去重完成(主机:{target_host})" - success_msg_en = f"Remote deduplication completed, result saved to: {output_file.strip()} (host: {target_host})" if output_file.strip() else f"Remote deduplication completed (host: {target_host})" - return run_remote_command( - cmd=unique_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=success_msg_zh, - success_msg_en=success_msg_en, - is_list_result=True, - in_place=bool(output_file.strip()) - ) - -def file_echo_tool( - target: Optional[str] = None, - content: str = "", - file_path: str = "", - append: bool = False, - lang: Optional[LanguageEnum] = LanguageEnum.ZH -) -> Dict: - is_zh = lang - target_host = target.strip() if (target and isinstance(target, str)) else "127.0.0.1" - result = init_result_dict(target_host, result_type="str", include_file_path=True) - - # 参数校验 - if not content.strip(): - result["message"] = "写入内容不能为空" if is_zh else "Content to write cannot be empty" - return result - if not file_path.strip(): - result["message"] = "文件路径不能为空" if is_zh else "File path cannot be empty" - return result - - # 构建echo命令(内容转义) - escaped_content = escape_shell_content(content.strip()) - escaped_file = escape_shell_content(file_path.strip()) - redirect = ">>" if append else ">" - echo_cmd = f"echo '{escaped_content}' {redirect} {escaped_file}" - - # 本地执行 - if target_host == "127.0.0.1": - action = "追加" if append else "写入" - action_en = "appended" if append else "written" - return run_local_command( - cmd=echo_cmd, - result=result, - success_msg_zh=f"本地{action}成功,文件路径:{file_path.strip()}", - success_msg_en=f"Local {action_en} successfully, file path: {file_path.strip()}", - is_list_result=False, - in_place=True - ) - - # 远程执行 - remote_auth = get_remote_auth(target_host) - if not remote_auth or not (remote_auth["username"] and remote_auth["password"]): - result["message"] = "远程认证配置缺失" if is_zh else "Remote auth config missing" - return result - - action = "追加" if append else "写入" - action_en = "appended" if append else "written" - return run_remote_command( - cmd=echo_cmd, - remote_auth=remote_auth, - result=result, - success_msg_zh=f"远程{action}成功(主机:{target_host}),文件路径:{file_path.strip()}", - success_msg_en=f"Remote {action_en} successfully (host: {target_host}), file path: {file_path.strip()}", - is_list_result=False, - in_place=True - ) \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/requirements.txt b/mcp_center/servers/oe_cli_mcp_server/requirements.txt index d6df13ee848c6e598c0156cd6378d8b275603c58..bd108e4f916bb7f8b3bda062dd6f84d7089b496a 100644 --- a/mcp_center/servers/oe_cli_mcp_server/requirements.txt +++ b/mcp_center/servers/oe_cli_mcp_server/requirements.txt @@ -7,4 +7,5 @@ mcp==1.9.4 scp==0.15.0 fastapi>=0.122.0 uvicorn>=0.38.0 -requests==2.31.0 \ No newline at end of file +requests==2.31.0 +pydantic >=2.7.2 \ No newline at end of file diff --git a/mcp_center/servers/oe_cli_mcp_server/run.sh b/mcp_center/servers/oe_cli_mcp_server/run.sh index bce85a1ab720348c65e6f8d12b25abc1a948ed31..dcffd746dc5b90434fe2eaeef4e6f945693a9b3c 100755 --- a/mcp_center/servers/oe_cli_mcp_server/run.sh +++ b/mcp_center/servers/oe_cli_mcp_server/run.sh @@ -15,12 +15,12 @@ pip install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple pip install -r "$REQUIREMENTS" -i https://pypi.tuna.tsinghua.edu.cn/simple # 3. 部署systemd服务 -cp /home/tsn/framework-dev-with-mcp/mcp_center/servers/oe_cli_mcp_server/mcp-server.service /etc/systemd/system/ +cp /usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/mcp-server.service /etc/systemd/system/ systemctl daemon-reload systemctl enable mcp-server --now -systemctl status mcp-server + # 4. 全局命令链接 chmod +x /usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py rm -f /usr/local/bin/mcp-server -ln -s /usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py /usr/local/bin/mcp-server \ No newline at end of file +ln -s /usr/lib/euler-copilot-framework/mcp_center/servers/oe_cli_mcp_server/mcp_server/cli.py /usr/local/bin/mcp-server