import logging
import asyncio
import atexit
from typing import Set
import signal
import subprocess
import sys
import os

class DockerContainerRegistry:
    """Global Docker container registry (singleton).

    Tracks every container this process starts and guarantees they are
    killed when the process exits — via atexit on a normal exit, or via
    SIGINT/SIGTERM handlers on interruption.
    """
    _instance = None
    _containers: Set[str] = set()       # names of containers not yet cleaned up
    _cleanup_lock = asyncio.Lock()      # serializes async_cleanup_all
    _initialized = False                # initialize() has installed the hooks
    _cleanup_in_progress = False        # re-entrancy guard for cleanup
    _signal_count = 0                   # number of termination signals received

    def __new__(cls):
        # Classic singleton: all state is class-level, so every instance shares it.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def initialize(cls):
        """Install the global cleanup hooks (atexit + signal handlers). Idempotent."""
        if cls._initialized:
            return

        instance = cls()

        atexit.register(instance._sync_cleanup_all)

        def signal_handler(signum, frame):
            instance._handle_signal(signum)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        cls._initialized = True
        logging.info("Docker容器注册表已初始化")

    def _handle_signal(self, signum):
        """Handle a termination signal: run cleanup once, escalate on repeats."""
        self._signal_count += 1

        if self._cleanup_in_progress:
            if self._signal_count <= 2:
                logging.info(f"清理正在进行中,请稍候... (信号计数: {self._signal_count})")
                return
            elif self._signal_count <= 5:
                logging.warning(f"强制中断清理过程... (信号计数: {self._signal_count})")
                return
            else:
                # Too many interrupts: bail out immediately, skipping atexit.
                logging.error("多次中断信号,强制退出程序")
                os._exit(1)

        logging.info(f"接收到信号 {signum},开始清理Docker容器...")
        # BUG FIX: the original set `self._cleanup_in_progress = True` here,
        # but _sync_cleanup_all() begins with
        # `if not self._containers or self._cleanup_in_progress: return`,
        # so the signal-triggered cleanup never actually ran and containers
        # leaked on Ctrl-C. _sync_cleanup_all() manages the flag itself.
        try:
            self._sync_cleanup_all()
        except Exception as e:
            logging.error(f"清理过程中出错: {e}")
        finally:
            logging.info("程序退出")
            sys.exit(0)

    def register_container(self, container_name: str):
        """Record a container so it will be cleaned up on exit."""
        self._containers.add(container_name)
        logging.debug(f"注册Docker容器: {container_name}")

    def unregister_container(self, container_name: str):
        """Forget a container (e.g. after it was shut down cleanly)."""
        self._containers.discard(container_name)
        logging.debug(f"注销Docker容器: {container_name}")

    def _sync_cleanup_all(self):
        """Synchronously kill every registered container (atexit/signal path)."""
        if not self._containers or self._cleanup_in_progress:
            return

        self._cleanup_in_progress = True

        try:
            logging.info(f"开始清理 {len(self._containers)} 个Docker容器...")
            containers_to_clean = self._containers.copy()

            for container_name in containers_to_clean:
                try:
                    result = subprocess.run(
                        ["docker", "kill", container_name],
                        capture_output=True,
                        text=True,
                        timeout=3,  # short timeout: the exit path must stay fast
                    )
                    if result.returncode == 0:
                        logging.info(f"成功清理容器: {container_name}")
                        self._containers.discard(container_name)
                    else:
                        logging.warning(f"清理容器失败: {container_name}, {result.stderr}")
                except subprocess.TimeoutExpired:
                    logging.warning(f"清理容器 {container_name} 超时,跳过")
                except Exception as e:
                    logging.error(f"清理容器 {container_name} 出错: {e}")

            # Fall back to `docker rm -f` for anything `docker kill` missed.
            if self._containers:
                logging.info("尝试强制清理剩余容器...")
                for container_name in list(self._containers):
                    try:
                        subprocess.run(
                            ["docker", "rm", "-f", container_name],
                            capture_output=True,
                            text=True,
                            timeout=2,
                        )
                        self._containers.discard(container_name)
                        logging.info(f"强制清理容器: {container_name}")
                    except Exception as e:
                        logging.error(f"强制清理容器 {container_name} 失败: {e}")

            logging.info("Docker容器清理完成")

        finally:
            self._cleanup_in_progress = False

    async def async_cleanup_all(self):
        """Asynchronously kill every registered container, bounded by a 10s budget."""
        if not self._containers:
            return

        async with self._cleanup_lock:
            if self._cleanup_in_progress:
                return

            self._cleanup_in_progress = True

            try:
                containers_to_clean = list(self._containers)

                # Kill all containers concurrently.
                tasks = [
                    asyncio.create_task(self._async_kill_container(name))
                    for name in containers_to_clean
                ]

                if tasks:
                    try:
                        results = await asyncio.wait_for(
                            asyncio.gather(*tasks, return_exceptions=True),
                            timeout=10,  # overall budget for all containers
                        )
                        for container_name, result in zip(containers_to_clean, results):
                            if isinstance(result, Exception):
                                logging.error(f"异步清理容器 {container_name} 失败: {result}")
                            elif result:
                                # BUG FIX: the original unregistered on ANY
                                # non-exception result, including a failed kill
                                # (result False) — the container then escaped
                                # the atexit fallback. Only unregister on success.
                                self.unregister_container(container_name)
                            # A False result was already logged by _async_kill_container.
                    except asyncio.TimeoutError:
                        logging.warning("异步清理容器超时")
            finally:
                self._cleanup_in_progress = False

    async def _async_kill_container(self, container_name: str):
        """Run `docker kill <name>` in a worker thread; return True on success."""
        loop = asyncio.get_event_loop()
        try:
            result = await loop.run_in_executor(
                None,
                lambda: subprocess.run(
                    ["docker", "kill", container_name],
                    capture_output=True,
                    text=True,
                    timeout=5,  # per-container timeout
                ),
            )
            if result.returncode == 0:
                logging.info(f"异步清理容器成功: {container_name}")
                return True
            logging.warning(f"异步清理容器失败: {container_name}, {result.stderr}")
            return False
        except Exception as e:
            logging.error(f"异步清理容器 {container_name} 出错: {e}")
            return False
stdio_client +from ..utils.parse_json import parse_evaluation_json +from .DockerRegistry import DockerContainerRegistry + +class MCPClient: + """改进的MCP服务器管理器,支持可靠的Docker生命周期管理""" + + def __init__(self, name: str, config: dict[str, Any], env_script: str = "", use_docker: bool = False) -> None: + self.name: str = name + self.config: dict[str, Any] = config + self.session: Optional[ClientSession] = None + self._cleanup_lock: asyncio.Lock = asyncio.Lock() + self.exit_stack: AsyncExitStack = AsyncExitStack() + + self.env_script = env_script + + + # 状态管理 + self._is_initialized = False + self._is_cleaning_up = False + self._cleanup_completed = asyncio.Event() + + # Docker相关配置 + self.use_docker = use_docker + self.abs_script_path = self.get_command_script_path() + self.host_mcp_path = self.abs_script_path.split('src')[0] if self.abs_script_path else "" + self.container_mcp_path = "/app/" + self.server_port = config.get("port", 8080) + + # Docker进程管理 + self.docker_process = None + self.container_id = None + self.container_name = None + + self.client = docker.from_env() + + # 初始化全局容器注册表 + if use_docker: + DockerContainerRegistry.initialize() + self._registry = DockerContainerRegistry() + + def get_command_script_path(self) -> str: + """获取命令脚本路径""" + try: + server_args = self.config['args'] + source_file = None + work_dir = os.getcwd() + for i, arg in enumerate(server_args): + if arg == "--directory" and i + 1 < len(server_args): + work_dir = server_args[i + 1] + work_dir = os.path.abspath(work_dir) + break + + for arg in server_args: + if arg.endswith(".py"): + source_file = arg + break + + if not source_file: + logging.warning("未在 args 中找到 .py 源代码文件") + return None + + if os.path.isabs(source_file): + absolute_path = source_file + else: + absolute_path = os.path.join(work_dir, source_file) + + if os.path.exists(absolute_path): + return absolute_path + else: + logging.error(f"源代码文件不存在:{absolute_path}") + return None + except Exception as e: + logging.error(f"获取脚本路径出错: {e}") 
+ return None + + async def initialize(self) -> None: + """初始化服务器""" + if self._is_initialized: + logging.warning(f"服务器 {self.name} 已经初始化") + return + + try: + logging.info(f"开始初始化服务器 {self.name}") + + if self.use_docker: + await self._initialize_docker() + else: + await self._initialize_host_server() + + self._is_initialized = True + + except Exception as e: + logging.error(f"初始化失败: {e}") + await self.cleanup() + raise + + async def _initialize_host_server(self) -> None: + """在主机上启动MCP服务器""" + command = shutil.which("npx") if self.config["command"] == "npx" else self.config["command"] + if command is None: + raise ValueError(f"主机命令不存在: {self.config['command']}") + + server_params = StdioServerParameters( + command=command, + args=self.config["args"], + env={**os.environ, **self.config["env"]} if self.config.get("env") else None, + ) + + try: + stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params)) + read, write = stdio_transport + session = await self.exit_stack.enter_async_context(ClientSession(read, write)) + await session.initialize() + self.session = session + logging.info(f"主机上的MCP服务器 {self.name} 已初始化") + except Exception as e: + logging.error(f"主机服务器初始化失败: {e}") + raise + + def _build_docker_command(self) -> list[str]: + """构建Docker运行命令""" + self.container_name = f"mcp-server-{self.name}-{int(time.time())}" + + docker_cmd = [ + "docker", "run", + "--rm", + "-i", + "--name", self.container_name, + "--workdir", self.container_mcp_path, + ] + + # 挂载主机MCP代码目录到容器 + docker_cmd.extend([ + "-v", f"{self.host_mcp_path}:{self.container_mcp_path}" + ]) + + # 添加环境变量 + env_vars = { + "PYTHONPATH": self.container_mcp_path, + "PYTHONUNBUFFERED": "1", + "PIP_ROOT_USER_ACTION": "ignore", + } + env_vars.update(self.config.get("env", {})) + + for key, value in env_vars.items(): + docker_cmd.extend(["-e", f"{key}={value}"]) + + docker_cmd.extend(["-a", "stdout", "-a", "stderr"]) + + # 添加Docker镜像 + self.docker_image = "val:latest" + 
docker_cmd.append(self.docker_image) + + startup_script = self._build_correct_bash_script() + docker_cmd.extend(["bash", "-c", startup_script]) + + return docker_cmd + + def _build_correct_bash_script(self) -> str: + """构建启动脚本""" + container_command = self._get_container_command() + script = f'''set -e + + echo "=== 快速环境检查 ===" + echo "Python版本: $(python --version)" + echo "工作目录: $(pwd)" + echo "文件列表:" + ls -la + echo "" + + # 只安装项目特定的新依赖 + echo "=== 检查并安装项目依赖 ===" + if [ -f requirements.txt ]; then + echo "发现requirements.txt文件" + pip install -qq --upgrade-strategy only-if-needed -r requirements.txt + if [ $? -eq 0 ]; then + echo "依赖安装完成(无异常)" + else + echo "依赖安装失败!(可去掉 -qq 参数重新执行查看详细错误)" + fi + else + echo "未找到requirements.txt文件" + fi + + echo "=== 执行自定义环境部署 ===" + {self.env_script} + + echo "=== 启动MCP服务器 ===" + echo "执行命令: {container_command}" + exec {container_command}''' + + return script + + def _get_container_command(self) -> str: + """获取容器内的命令字符串""" + command = self.config.get("command", "python") + if command == "uv": + command = "uv run" + script_rel_path = 'src'+self.abs_script_path.split('src')[-1] + return command + " " + script_rel_path + + + async def _initialize_docker(self): + """初始化Docker中的MCP服务器,支持输出显示""" + original_docker_command = self._build_docker_command() + + # 修改Docker命令,使用tee同时输出到终端和MCP + docker_command = self._add_output_redirection(original_docker_command) + + logging.info(f"启动Docker命令: {' '.join(docker_command)}") + + # 注册容器到全局注册表 + if self.container_name: + self._registry.register_container(self.container_name) + + # 清理可能存在的同名容器 + await self._cleanup_existing_container() + + # 使用修改后的命令建立MCP连接(只启动一个进程) + server_params = StdioServerParameters( + command=docker_command[0], + args=docker_command[1:], + env=None + ) + + try: + stdio_transport = await self.exit_stack.enter_async_context( + stdio_client(server_params)) + + read, write = stdio_transport + session = await self.exit_stack.enter_async_context( + ClientSession(read, write) + ) + + 
# 等待容器稳定 + await asyncio.sleep(3) + self.search_for_container() + + # 启动监控任务 + monitor_task = await self._start_container_monitoring(session) + + # 注册清理回调 + self._register_cleanup_callback(monitor_task) + + self.session = session + logging.info(f"Docker MCP服务器 {self.name} 已初始化") + + return session + + except Exception as e: + logging.error(f"Docker MCP服务器初始化失败: {str(e)}") + raise + + def _add_output_redirection(self, docker_command): + """为Docker命令添加输出重定向""" + # 找到bash脚本部分 + if len(docker_command) >= 3 and docker_command[-2] == "-c": + # 修改bash脚本,添加tee命令 + original_script = docker_command[-1] + + # 将输出同时发送到stderr(显示在终端)和stdout(给MCP) + modified_script = f''' + # 设置输出重定向 + exec > >(tee /dev/stderr) + exec 2>&1 + + # 原始脚本 + {original_script} + ''' + new_command = docker_command[:-1] + [modified_script] + return new_command + + return docker_command + + async def _cleanup_existing_container(self): + """清理可能存在的同名容器""" + if not self.container_name: + return + + try: + import subprocess + stop_cmd = f"docker stop {self.container_name} 2>/dev/null || true" + remove_cmd = f"docker rm {self.container_name} 2>/dev/null || true" + + subprocess.run(stop_cmd, shell=True, capture_output=True) + subprocess.run(remove_cmd, shell=True, capture_output=True) + + logging.debug(f"已清理可能存在的容器: {self.container_name}") + except Exception as e: + logging.warning(f"清理现有容器时出错: {str(e)}") + + async def _start_container_monitoring(self, session): + """启动容器监控和会话初始化""" + try: + # 创建监控任务 + async def monitor(): + """容器状态监控循环""" + try: + while True: + await asyncio.sleep(2) + self.search_for_container() + except asyncio.CancelledError: + logging.debug("监控任务被取消") + raise + except Exception as e: + logging.error(f"监控任务错误: {str(e)}") + raise + + # 启动初始化和监控任务 + init_task = asyncio.create_task(session.initialize()) + monitor_task = asyncio.create_task(monitor()) + + try: + # 等待任务完成 + done, pending = await asyncio.wait( + [init_task, monitor_task], + return_when=asyncio.FIRST_COMPLETED + ) + + # 取消还在运行的任务 
+ for task in pending: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # 检查完成的任务是否有异常 + for task in done: + await task + + # 返回监控任务引用 + return monitor_task + + except Exception as e: + # 清理任务 + for task in [init_task, monitor_task]: + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + raise + + except Exception as e: + logging.error(f"启动容器监控失败: {str(e)}") + raise + + def _register_cleanup_callback(self, monitor_task): + """注册清理回调函数""" + def cleanup(): + """清理所有资源""" + try: + # 清理监控任务 + if monitor_task and not monitor_task.done(): + monitor_task.cancel() + except Exception as cleanup_error: + logging.warning(f"清理过程中出现错误: {str(cleanup_error)}") + + self.exit_stack.callback(cleanup) + + def search_for_container(self): + try: + container = self.client.containers.get(self.container_name) + if container.status != "running": + raise RuntimeError(f"容器{self.container_name}未处于运行状态,当前状态: {container.status}") + except NotFound: + raise RuntimeError(f"容器{self.container_name}不存在") + except APIError as e: + raise RuntimeError(f"Docker API错误: {str(e)}") + + async def list_tools(self) -> list[Any]: + """列出可用工具""" + if not self.session: + raise RuntimeError(f"Server {self.name} not initialized") + + tools_response = await self.session.list_tools() + tools = [] + + for item in tools_response: + if isinstance(item, tuple) and item[0] == "tools": + tools.extend(Tool(tool.name, tool.description, tool.inputSchema) for tool in item[1]) + + return tools + + async def execute_tool( + self, + tool_name: str, + arguments: dict[str, Any], + retries: int = 1, + delay: float = 1.0, + ) -> list[Any]: + """执行工具""" + if not self.session: + raise RuntimeError(f"Server {self.name} not initialized") + + attempt = 0 + while attempt < retries: + try: + logging.info(f"Executing {tool_name}...") + result = await self.session.call_tool(tool_name, arguments) + tool_result = [] + for rc in result.content: + if rc.type == "text": + 
if '{' and '}' in rc.text: + try: + # 假设parse_evaluation_json函数存在 + rc_text_json = parse_evaluation_json(rc.text) + tool_result.append(rc_text_json) + except: + tool_result.append(rc.text) + else: + tool_result.append(rc.text) + elif rc.type == "image": + logging.warning("Image result is not supported yet") + elif rc.type == "resource": + logging.warning("Resource result is not supported yet") + return tool_result + except Exception as e: + attempt += 1 + logging.warning(f"Error executing tool: {e}. Attempt {attempt} of {retries}.") + if attempt < retries: + await asyncio.sleep(delay) + else: + logging.error("Max retries reached. Failing.") + raise + + async def _force_kill_docker_container_async(self) -> bool: + """异步强制终止Docker容器""" + if not self.container_name: + return True + + loop = asyncio.get_event_loop() + try: + # 使用线程池执行同步的docker命令 + result = await loop.run_in_executor( + None, + lambda: subprocess.run( + ["docker", "kill", self.container_name], + capture_output=True, + text=True, + timeout=10 + ) + ) + + if result.returncode == 0: + logging.info(f"成功强制终止容器: {self.container_name}") + return True + else: + logging.warning(f"终止容器失败: {result.stderr}") + return False + + except Exception as e: + logging.error(f"强制终止容器出错: {e}") + return False + + async def cleanup(self) -> None: + """清理服务器资源""" + async with self._cleanup_lock: + if self._is_cleaning_up: + # 等待之前的清理完成 + await self._cleanup_completed.wait() + return + + self._is_cleaning_up = True + self._cleanup_completed.clear() + + try: + logging.info(f"开始清理服务器 {self.name}") + + # 1. 标记为未初始化 + self._is_initialized = False + + # 2. 
如果是Docker模式,先强制终止容器 + if self.use_docker and self.container_name: + success = await self._force_kill_docker_container_async() + if success: + # 从注册表中移除 + self._registry.unregister_container(self.container_name) + await asyncio.sleep(0.5) + + self.session = None + self.stdio_context = None + + try: + await self.exit_stack.aclose() + logging.debug("exit_stack清理完成") + except Exception as e: + logging.warning(f"exit_stack清理出错: {e}") + finally: + self.exit_stack = AsyncExitStack() + + self.docker_process = None + self.container_id = None + if self.use_docker: + self.container_name = None # 重置容器名,允许下次重新创建 + + logging.info(f"服务器 {self.name} 清理完成") + + except Exception as e: + logging.error(f"清理过程出错: {e}") + finally: + self._is_cleaning_up = False + self._cleanup_completed.set() + + async def wait_for_cleanup(self) -> None: + """等待清理完成""" + if self._is_cleaning_up: + await self._cleanup_completed.wait() + + def is_ready_for_reuse(self) -> bool: + """检查是否可以重新使用""" + return not self._is_cleaning_up and not self._is_initialized + + def __del__(self): + """析构函数,确保Docker容器被清理""" + if self.use_docker and self.container_name: + try: + subprocess.run( + ["docker", "kill", self.container_name], + capture_output=True, + timeout=5 + ) + # 从注册表中移除 + self._registry.unregister_container(self.container_name) + except: + pass + +class Tool: + """Represents a tool with its properties and formatting.""" + + def __init__( + self, + name: str, + description: str, + input_schema: dict[str, Any], + title: str | None = None, + ) -> None: + self.name: str = name + self.title: str | None = title + self.description: str = description + self.input_schema: dict[str, Any] = input_schema + + def format_for_llm(self) -> str: + """Format tool information for LLM. + + Returns: + A formatted string describing the tool. 
+ """ + args_desc = [] + if "properties" in self.input_schema: + for param_name, param_info in self.input_schema["properties"].items(): + arg_desc = f"- {param_name}: {param_info.get('description', 'No description')}" + if param_name in self.input_schema.get("required", []): + arg_desc += " (required)" + args_desc.append(arg_desc) + + # Build the formatted output with title as a separate field + output = f"Tool: {self.name}\n" + + # Add human-readable title if available + if self.title: + output += f"User-readable title: {self.title}\n" + + output += f"""Description: {self.description} +Arguments: +{chr(10).join(args_desc)} +""" + + return output \ No newline at end of file diff --git a/src/client/Session.py b/src/client/Session.py new file mode 100644 index 0000000000000000000000000000000000000000..87f014198133c339ec208b639a1e750c93859079 --- /dev/null +++ b/src/client/Session.py @@ -0,0 +1,148 @@ +import json +import logging +import os +from typing import Any +from dotenv import load_dotenv +from ..llm.LLM import LLMClient +from .MCPClient import MCPClient +from ..utils.parse_json import parse_evaluation_json + +# Configure logging +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + + +class Configuration: + """Manages configuration and environment variables for the MCP client.""" + + def __init__(self) -> None: + """Initialize configuration with environment variables.""" + self.load_env() + self.api_key = os.getenv("LLM_API_KEY") + + @staticmethod + def load_env() -> None: + """Load environment variables from .env file.""" + load_dotenv() + + @staticmethod + def load_config(file_path: str) -> dict[str, Any]: + """Load server configuration from JSON file. + + Args: + file_path: Path to the JSON configuration file. + + Returns: + Dict containing server configuration. + + Raises: + FileNotFoundError: If configuration file doesn't exist. + JSONDecodeError: If configuration file is invalid JSON. 
+ """ + with open(file_path, "r") as f: + return json.load(f) + + @property + def llm_api_key(self) -> str: + """Get the LLM API key. + + Returns: + The API key as a string. + + Raises: + ValueError: If the API key is not found in environment variables. + """ + if not self.api_key: + raise ValueError("LLM_API_KEY not found in environment variables") + return self.api_key + + +class ChatSession: + def __init__(self, server: MCPClient, llm_client: LLMClient) -> None: + self.server = server + self.llm_client = llm_client + + async def process_llm_response(self, llm_response: str) -> str: + """Process the LLM response and execute tools if needed. + + Args: + llm_response: The response from the LLM. + + Returns: + The result of tool execution or the original response. + """ + tool_info = { + "tool_name": "", + "arguments": {}, + } + try: + tool_call = parse_evaluation_json(llm_response) + if tool_call and "tool" in tool_call and "arguments" in tool_call: + print(f"Executing tool: {tool_call['tool']}") + print(f"With arguments: {tool_call['arguments']}") + tool_info["tool_name"] = tool_call["tool"] + tool_info["arguments"] = tool_call["arguments"] + + tools = await self.server.list_tools() + if any(tool.name == tool_call["tool"] for tool in tools): + try: + result = await self.server.execute_tool(tool_call["tool"], tool_call["arguments"]) + result_str = f"{result}" + if len(result_str) > 500: + logging.info(f"The output of tool execution is too long. Only show part of it: {result[:400]}... 
{result[-100:]}") + else: + logging.info(f"Tool execution result: {result}") + return tool_info, f"Tool execution result: {result}" ###这里有问题,把输出变成str了 + except Exception as e: + error_msg = f"Error executing tool: {str(e)}" + print(error_msg) + return tool_info, error_msg + + return tool_info, f"No server found with tool: {tool_call['tool']}" + return tool_info, f"tool call json decode error: {llm_response}" + except json.JSONDecodeError: + return tool_info, llm_response + + async def handle_query(self, query) -> None: + all_tools = [] + # for server in self.servers: + tools = await self.server.list_tools() + all_tools.extend(tools) + + tools_description = "\n".join([tool.format_for_llm() for tool in all_tools]) + + system_message = f"""You are a helpful assistant with access to these tools: +{tools_description} +Choose the appropriate tool based on the user's question. If no tool is needed, reply directly. +IMPORTANT: When you need to use a tool, you must ONLY respond with the exact JSON object format below, nothing else: +```json +{{ + "tool": "tool-name", + "arguments": {{ + "argument-name": "value" + }} +}} +``` +After receiving a tool's response: +1. Transform the raw data into a natural, conversational response +2. Keep responses concise but informative +3. Focus on the most relevant information +4. Use appropriate context from the user's question +5. 
Avoid simply repeating the raw data + +Please use only the tools that are explicitly defined above.""" + + + messages = [{"role": "system", "content": system_message}] + messages.append({"role": "user", "content": "User query:"+query}) + + llm_response = self.llm_client.get_response(messages) + print("\nAssistant: %s", llm_response) + + tool_info, tool_result = await self.process_llm_response(llm_response) + tool_included_or_not = True if tool_result != llm_response else False + if tool_included_or_not: + return tool_included_or_not, tool_info, tool_result + else: + return tool_included_or_not, tool_info, 'No tool was used, here is the direct response: '+ llm_response + +