# csagent-framework **Repository Path**: liuxiang13/csagent-framework ## Basic Information - **Project Name**: csagent-framework - **Description**: agent framework - **Primary Language**: Python - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 2 - **Created**: 2025-07-24 - **Last Updated**: 2025-08-11 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # CSAgent - 智能对话Agent框架
![Python](https://img.shields.io/badge/Python-3.8+-blue.svg) ![Framework](https://img.shields.io/badge/Framework-LangGraph-green.svg) ![License](https://img.shields.io/badge/License-MIT-yellow.svg) **基于图执行引擎的企业级智能Agent开发框架** [特性](#-特性) • [安装](#-安装) • [快速开始](#-快速开始) • [API文档](#-api文档) • [架构](#-架构设计)
## 📋 项目简介 CSAgent 是一个灵活、可扩展的智能对话Agent框架,采用**基于图的执行引擎**设计,支持构建复杂的多轮对话、知识问答、工作流自动化等AI应用。 与dify/coze定位的区别: dify和coze是开发agent应用的低代码平台 本框架是面向研发的轻量级agent服务开发框架 ### 🎯 特性 - **📊 图式架构**: 基于LangGraph的状态图管理,支持复杂的执行流程 - **🔌 节点化设计**: 可插拔的节点系统,支持自定义业务逻辑 - **🌊 流式响应**: 实时流式输出,提供流畅的对话体验 - **🧠 智能记忆**: 支持短期(Redis)和长期(ElasticSearch)记忆系统 - **⚙️ 配置驱动**: 通过YAML配置文件快速定义Agent行为 - **🔄 并发执行**: 支持节点并行处理,提升响应效率 - **💻 统一服务**: 提供统一的异步http-server ## 🚀 快速开始 ### 环境要求 - Python 3.8+ - langgraph==0.5.1 tornado==6.5.1 - cookiecutter - Redis (可选,用于记忆功能) - ElasticSearch (可选,用于长期记忆) ### 1. 环境准备 ```bash # 安装脚手架工具和关键依赖 pip install cookiecutter # 安装核心依赖 pip install langgraph==0.5.1 tornado==6.5.1 ``` ### 2. 生成app ```bash # 生成的app会放在apps目录下 cookiecutter https://gitee.com/liuxiang13/csagent-framework.git -f -o apps # 第一步:填入你的app name, 如:hello_world [1/2] project_name (your app name): hello_world # 第二步: 填入你的app agent配置路径(不填默认./conf/agent/hello_world.yaml) [2/2] agent_yaml (./conf/agent/hello_world.yaml): ``` ![alt text](images/build.png) ### 3. 启动服务 ```bash cd apps/hello_world/ python3 bin/http_server.py --port=8901 ``` ### 4. 测试服务 ```bash curl --location '127.0.0.1:8901/api/hello_world/v1/chat' \ --header 'Content-Type: application/json' \ --data '{ "session_id": "1", "log_id":"vcbfbg349984590hohn", "user_id": "7c096c176e614ebaaa00457d86c26379", "messages": [ { "role": "user", "content": "你好" } ], "params": { } }' ``` ```bash { "code": 0, "err_msg": "", "user_id": "7c096c176e614ebaaa00457d86c26379", "session_id": "1", "log_id": "vcbfbg349984590hohn", "is_delta": false, "messages": [ { "role": "user", "content": "你好" }, { "role": "assistant", "content": "hello world" } ], "extra": { "用户任意定义的key": "hello world" } } ``` ## 📡 API文档 ### 健康检查 ```http GET /api/{{app-name}}/health ``` **响应:** ```json {"code": 0} ``` ### agent服务接口 ```http POST /api/{{app-name}}/v1/chat Content-Type: application/json ``` **请求参数:** ```json { "user_id": "", # 用户id, 必填 "session_id": "", # 会话id, 必填 "log_id":"", # logid, 必填 "messages": [ {"role": "user", "content": "高血压有什么症状?"} ], # 对话列表,规范与openai大模型对话一致, 非必填 "stream": false, # 是否sse流式返回, 非必填, 默认false "params":{ } # 用户自定义的其他请求参数, 根据业务场景自行定义,非必填 } ``` **响应:** ```json { "code": 0, # 错误码, 0-OK, 非0-异常错误 "err_msg": "", # 错误信息 "user_id": "", # 用户id "session_id": "", # 会话id "is_delta": false, # 是否为流式模式的增量返回结果 "messages": [ {"role": "user", "content": "高血压有什么症状?"}, {"role": "assistant", "content": "高血压的主要症状包括..."} ], # 对话列表,规范与openai大模型对话一致 "extra": {} # 返回结果的额外参数, 根据业务场景自行定义 } ``` ### 流式响应 设置 `"stream": true` 启用流式响应: ```bash curl -X POST http://localhost:8901/api/{{app-name}}/v1/chat \ -H "Content-Type: application/json" \ -d '{ "user_id": "", # 用户id, 必填 "session_id": "", # 会话id, 必填 "log_id":"", # logid, 必填 "messages": [ {"role": "user", "content": "高血压有什么症状?"} ], # 对话列表,规范与openai大模型对话一致, 非必填 "stream": true, # 是否sse流式返回, 非必填, 默认false "params":{ } # 用户自定义的其他请求参数, 根据业务场景自行定义,非必填 }' ``` **流式响应格式:** ``` data: {"code": 0, "is_delta": true, "messages": [...], ...} data: {"code": 0, "is_delta": true, "messages": [...], ...} ... ``` ## 🏗️ 架构设计 ### 系统架构 ``` ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ HTTP Client │───▶│ Tornado Web │───▶│ AgentService │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ ┌─────────────────┐ ▼ │ Memory Sys │ ┌─────────────────┐ │ ┌─────────────┐│ │ CSAgent Core │ │ │ Redis ││◀───┤ │ │ │(Short Term) ││ │ ┌──────────────┐ │ └─────────────┘│ │ │ LangGraph │ │ ┌─────────────┐│ │ │ State Engine │ │ │ElasticSearch││ │ └──────────────┘ │ │(Long Term) ││ └─────────────────┘ │ └─────────────┘│ │ └─────────────────┘ ▼ ┌─────────────────┐ │ Node Graph │ │ ┌─────┐ ┌─────┐ │ │ │ RAG │▶│ LLM │ │ │ └─────┘ └─────┘ │ └─────────────────┘ ``` ### 项目结构 ``` {{app-name}}/ ├── csagent/ # 核心agent构建框架 │ └── core/ │ ├── agent.py # Agent主类 │ ├── builder/ # agent构建器 │ ├── node/ # 基础节点定义 │ └── config/ # 配置管理 ├── src/ # 业务实现 │ ├── bootstrap/ # 启动初始化 │ ├── controller/ # HTTP控制器 │ ├── common/ # 通用组建 │ ├── dao/ # 数据资源访问 │ ├── error/ # 错误定义 │ ├── resource/ # 资源定义, 数据资源/服务资源等 │ ├── service/ # 业务服务层 │ └── node/ # 自定义节点 │ └── demo_stream/ # demo_stream agent │ ├── rag.py # 知识检索节点 │ └── llm.py # LLM推理节点 ├── conf/ # 配置文件 │ ├── agent/ # Agent配置 │ ├── node/ # agent节点自定义配置 │ └── log.yaml # 日志配置 ├── bin/ # 启动脚本 │ └── http_server.py # HTTP服务器 └── requirements.txt # 依赖 ``` ### 执行流程(示例) ```mermaid graph TD A[用户请求] --> B[AgentHandler] B --> C[AgentService] C --> D[Agent.run] D --> E[GraphBuilder] E --> F[START节点] F --> G[RAG节点
知识检索] G --> H[LLM节点
AI生成] H --> I[END节点] I --> J[返回响应] K[Memory System] --> D L[Config System] --> E style G fill:#e1f5fe style H fill:#f3e5f5 style K fill:#fff3e0 ``` ## ⚙️ 配置说明 ### Agent配置 ```yaml name: agent_name # desc: # agent描述 memory: # 记忆配置 short_term: # 短期记忆配置 read: false # 是否读取短期记忆, 默认关闭 write: false # 是否写入短期记忆, 默认关闭 ttl: 604800 # 可选,短期记忆ttl,默认7天,0表示永久 long_term: # 长期记忆配置,当前长期记忆使用在线用户画像,暂不支持agent写入长期记忆 read: false # 是否读取长期记忆, 默认关闭 graph: # agent执行图配置 node: # 节点定义 - name: rag # 节点名称 desc: 知识库搜索 # 节点描述 conf: "./conf/node/rag.yaml" # 节点自定义配置, 可选, 若配置则文件必须存在 instance: "src.node.demo_stream.rag.Node" # 节点实例化路径 - name: llm desc: 大模型调用 instance: "src.node.demo_stream.llm.Node" edge: # fanout边定义, 必填 START: # 必填,START为保留字,表示图起点 - name: rag # 必填,目的节点,表示START到rag存在一条边,即rag是一个起始节点 rag: # 必填,起始节点名称 - name: llm # 必填,目的节点 ``` ### 节点路由配置 #### 固定串行路由 ```yaml graph: node: - name: node1 ... - name: node2 ... edge: START: - name: node1 node1: # 必填,起始节点名称 - name: node2 # 必填,目的节点 ``` ```mermaid graph TD A[START] --> B[node1] B --> C[node2] C --> D[END] ``` #### 固定并行路由 ```yaml graph: node: - name: node1 ... - name: node2 ... - name: node3 ... edge: START: - name: node1 node1: # 必填,起始节点名称 - name: node2 # 必填,目的节点 - name: node3 ``` ```mermaid graph TD A[START] --> B[node1] B --> C[node2] B --> D[node3] C --> E[END] D --> E ``` #### 条件路由 ```yaml graph: node: - name: node1 ... - name: node2 ... edge: START: - name: node1 node1: # 必填,起始节点名称 - name: node2 # 必填,目的节点 condition: "'messages' in node_context and len(node_context['messages'])>0 and len(node_context['messages'][-1].get('tool_calls',[]))>0" # 条件定义,上下文变量来自(##-节点上下文AgentState) - name: END # END为配置保留字,表示终止节点 condition: default # 选填,default表示没有其他其他节点可路由时的默认路由节点(为了方便配置使用) node2: - name: node1 ``` ```mermaid graph TD A[START] --> B[node1] B -."condition".-> C[node2] B -."condition".-> D[END] C --> B ``` #### map-reduce路由(并行处理) ```yaml graph: node: - name: mapper ... - name: reducer ... - name: aggregation ... edge: START: - name: mapper mapper: - name: reducer parallel: # 并行处理相关配置, 适用场景例子:报告解读中对报告进行分片并行处理 enable: True # 开关 batch: 3 # 每个并行执行流的batch reducer: - name: aggregation ``` ```mermaid graph TD A[START] --> B[mapper] B --"切片并行处理(map-reduce)"--> C[reducer] C --> D[aggregation] D --> E[END] ``` ### 节点上下文AgentState `AgentState包含agent执行过程中所有的上下文信息, 包括中间结果,记忆等,每个节点都可以更新状态,获取其他节点更新的状态,并且状态更新是并行安全的,定义如下:` ```python class AgentState(TypedDict): code: Annotated[int, replace] err_msg: Annotated[str, replace] debug: Annotated[bool, replace] log_id: Annotated[str, replace] session_id: Annotated[str, replace] user_id: Annotated[str, replace] stream: Annotated[bool, replace] is_delta: Annotated[bool, replace] messages: Annotated[List[Dict[str, Any]], replace] short_term_memory: Annotated[Dict[str, Any], deep_update] long_term_memory: Annotated[Dict[str, Any], deep_update] params: Annotated[Dict[str, Any], deep_update] extra: Annotated[Dict[str, Any], deep_update] node_context: Annotated[Dict[str, Any], deep_update] parallel_info: Annotated[Dict[str, Any], deep_update] ``` `各字段定义:` `code`: agent执行过程中的状态码,对应最终response中的code,若执行过程中出现异常,业务节点可设置成预定义错误码后框架会直接返回异常 `err_msg`: agent执行过程中的错误信息 `debug`: 是否为debug状态,对应request中的debug字段 `log_id`: log_id,对应request中的log_id字段 `session_id`: 会话id,对应request中的session_id字段 `user_id`: 用户id,对应request中的user_id字段 `stream`: 是否流式,对应request中的stream字段 `is_delta`: 是否为流式增量返回的结果,对应response中的is_delta字段。一个使用场景是同一个接口可能同时支持流式或非流式,比如一个意图后面接问答(流式)或推荐(非流式),此时接口统一走流式,上层不做处理的话返回结果为:问答(delta1,delta2,...完整结果finalres),推荐(完整结果finalres)。业务层需要根据不同的意图来区分返回给前端渲染的结果,问答意图可能需要丢弃final_res(因为前端已经根据所有的delta内容全部渲染完成)。 `messages`: agent对话列表,规范同openai大模型对话规范。原始对话内容从request中传入,回答结果append到列表中作为response返回。`messages不支持多个节点并行写入!` `short_term_memory`: 短期记忆,若开启短期记忆功能框架会将记忆对应字段放到里面 `long_term_memory`: 长期记忆,若开启长期记忆功能框架会将记忆对应字段放到里面 `params`: 额外的请求参数,对应request中的params字段,除了对话列表外业务层可自定义额外的请求参数,框架会透传给节点 `extra`: 额外的响应参数,对应response中的extra字段,除了对话列表外业务层可自定义额外的响应参数,框架会透传给调用方 `node_context`: 框架为节点预留的上下文字段,作为各个节点执行过程中内部信息的传递,对调用者屏蔽,`node_context支持多个节点并行写入`。 `parallel_info`: 框架预留字段用来支持mapreduce功能,mapper节点切分的分片信息必须写入该字段。 ### 自定义节点 ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState import time class Node(BaseNode): def initialize(self): """初始化节点""" # 若节点需要进行某些初始化工作,则实现该函数,否则忽略 # self.conf为node配置中conf文件的内容,若无配置则为{} # self.node_config为节点的元信息配置 print(self.conf) print(f'{self.node_config.name}初始化成功') def execute(self, state: AgentState, stream_writer=None) -> AgentState: """返回问候语 Args: state: agent执行状态上下文 stream_writer: 流式输出writer,可以支持流式输出任意中间结果 Returns: 更新后的agent执行状态上下文 """ print(f"{self.node_config.name}开始执行") # 根据agent状态上下文进行计算 messages = state['messages'] params = state['params'] short_term_memory = state['short_term_memory'] long_term_memory = state['long_term_memory'] # 模拟节点处理过程 time.sleep(1) state['messages'].append({"role":"assistant", "content":"hello"}) print(f"{self.node_config.name}执行完成") return state ``` ## 🔧 开发指南 ### 添加你自己的agent配置 路径一般为`conf/agent/your_agent.yaml` ### 添加新节点 1. **创建节点文件** `src/node/your_module/your_node.py` 2. **实现节点类** 继承 `BaseNode` 3. **配置agent** 在 `conf/agent/your_agent.yaml` 中添加节点和路由配置 4. **测试节点** 运行测试验证功能 ### 修改初始化时设置的agent配置 修改src.bootstrap.bootstrap.py中的代码 ```python service.agent_service = AgentService(agent_yaml="{{cookiecutter.agent_yaml}}") ``` ### 配置记忆功能 ```bash # 配置redis环境变量 (短期记忆) export REDIS_ADDR="your redis addr". export REDIS_PORT="your redis port" export REDIS_PASSWORD="your redis password" # 配置记忆生成大模型信息 (若开启短期记忆生成) export CS_MEMORY_API_KEY="your llm api-key" export CS_MEMORY_BASE_URL="your llm api baseurl" export CS_MEMORY_MODEL="your llm model id" # 配置长期记忆读取es地址 (当前长期记忆来自在线存储的用户画像) export ES_HOST="http://xxx:9200" export ES_USER="your es user" export ES_PASSWORD="your es password" # 更新Agent配置启用记忆 memory: short_term: read: true write: true ttl: 604800 # 7天 long_term: read: true ``` ### 短期记忆使用 前提:开启了短期记忆功能。 使用方式:直接读取agent state ```python class Node(BaseNode): def execute(self, state: AgentState, stream_writer=None) -> AgentState: short_term_memory = state['short_term_memory] print(f"短期记忆:{short_term_memory}") return state ``` ### 自定义短期记忆实现 实现方式: 1. agent记忆配置中设置strategy 2. 实现strategy,实现方式参考csagent/utils.py:default_short_memory ```yaml memory: short_term: read: True # 可选,默认False, agent开始执行时是否读取memory,同步 write: True # 可选,默认False,agent执行成功后是否更新memory,异步 strategy: "src.memory.short_memory" #可选,不填使用默认策略 ``` ### 长期记忆使用 前提:开启了长期记忆功能。 使用方式:直接读取agent state ```python class Node(BaseNode): def execute(self, state: AgentState, stream_writer=None) -> AgentState: long_term_memory = state['long_term_memory] print(f"长期记忆:{long_term_memory}") return state ``` ### 长期记忆更新(暂不支持) ## 🔧 应用开发示例 ### react(deepresearch) agent配置: ```yaml name: react # 必填,agent名字 desc: 具备react机制的agent,循环调用llm和工具 # 必填,agent详细描述 version: "1.0" # 选填,agent版本 graph: # agent的执行图 node: # 图节点 - name: llm # 必填,节点名称 desc: 执行大模型调用 # 必填, 节点描述 instance: "src.node.react.llm.Node" # 必填,节点实现逻辑,Node实现必须遵循标准 - name: tools desc: 基于大模型调用结果执行工具调用 instance: "src.node.react.tools.Node" # 必填,节点实现逻辑,Node实现必须遵循标准 edge: # fanout START: # 必填,START为保留字,表示图起点 - name: llm # 必填,目的节点,表示START到llm存在一条边,即llm是一个起始节点 llm: # 必填,起始节点名称 - name: tools # 必填,目的节点 condition: "'messages' in node_context and len(node_context['messages'])>0 and len(node_context['messages'][-1].get('tool_calls',[]))>0" # 选填,表示从起始节点到目的节点的路由条件,支持表达式计算,其中使用的字段均来自于图计算过程中使用的上下文 - name: END # END为结束节点的保留字,用户不可用END命名自己的节点 condition: default # 选填,default表示没有其他其他节点可路由时的默认路由节点 tools: - name: llm #表示tools到llm有一条固定边 ``` `src.node.react.llm.py` ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState from openai import OpenAI import time import copy class Node(BaseNode): def initialize(self): """初始化节点""" print(self.conf) print(f"{self.node_config.name}初始化成功") def execute(self, state: AgentState, stream_writer=None) -> AgentState: print(f"{self.node_config.name}开始执行") query = state['messages'][-1]['content'] print(f"query:{query}") # 构造llm request with tools # 模拟大模型调用 time.sleep(3) # 中间的tool call过程不放到主message列表中,所以单独copy了一份出来放在了node_context里 if "messages" not in state["node_context"]: state["node_context"]["messages"] = copy.deepcopy(state['messages']) # 下面为模拟了一次工具调用过程 if len(state["node_context"]["messages"]) == 1: state["node_context"]["messages"].append( { "role":"assistant", "content":'', "tool_calls":[ { "id": "call_6596dafa2a6a46f7a217da", "function": { "arguments": "{\"query\": \"xxxx\"}", "name": "search_web" }, "type": "function", "index": 0 } ] } ) else: # 已经执行了一次工具调用,返回的最终结果放到主messages中 state['messages'].append( { "role":"assistant", "content":'这是大模型的最终回答', } ) return state ``` `src.node.react.tools.py` ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState from openai import OpenAI import time import os import copy class Node(BaseNode): def initialize(self): """初始化节点""" print(self.conf) print(f"{self.node_config.name}初始化成功") def execute(self, state: AgentState, stream_writer=None) -> AgentState: print(f"{self.node_config.name}开始执行") tool_call = state["node_context"]["messages"][-1]['tool_calls'][0] function_name = tool_call['function']['name'] args = tool_call['function']['arguments'] print(f"执行{function_name}工具调用,args:{args}") #模拟大模型调用过程 time.sleep(1) state["node_context"]["messages"].append( { "role":"tool", "content":'{\"res\":\"call res\"}', "tool_call_id":tool_call['id'] } ) return state ``` app构建: ```bash cookiecutter https://gitee.com/liuxiang13/csagent-framework.git -f -o apps [1/2] project_name (your app name): react [2/2] agent_yaml (./conf/agent/hello_world.yaml): ./conf/agent/react.yaml ``` request: ```bash curl --location '127.0.0.1:8901/api/react/v1/chat' \ --header 'Content-Type: application/json' \ --data '{ "session_id": "1", "log_id":"vcbfbg349984590hohn", "user_id": "7c096c176e614ebaaa00457d86c26379", "messages": [ { "role": "user", "content": "你好" } ], "params": { } }' ``` response: ```json { "code": 0, "err_msg": "", "user_id": "7c096c176e614ebaaa00457d86c26379", "session_id": "1", "log_id": "vcbfbg349984590hohn", "is_delta": false, "messages": [ { "role": "user", "content": "你好" }, { "role": "assistant", "content": "这是大模型的最终回答" } ], "extra": {} } ``` ### 并行健康推荐 agent配置: ```yaml name: health_recommend # 必填,agent名字 desc: 健康推荐agent,基于用户输入和用户画像推荐配餐和药物 # 必填,agent详细描述 version: "1.0" # 选填,agent版本 graph: # agent的执行图 node: # 图节点 - name: user_feature # 必填,节点名称 desc: 获取并计算用户画像特征 # 必填, 节点描述 instance: "src.node.health_recommend.user_feature.Node" # 必填,节点实现逻辑,Node实现必须遵循标准 - name: drug_recommend desc: 药物推荐 instance: "src.node.health_recommend.drug_recommend.Node" # 必填,节点实现逻辑,Node实现必须遵循标准 - name: food_recommend desc: 食物推荐 instance: "src.node.health_recommend.food_recommend.Node" # 必填,节点实现逻辑,Node实现必须遵循标准 - name: merge desc: 多路推荐融合 instance: "src.node.health_recommend.merge.Node" # 必填,节点实现逻辑,Node实现必须遵循标准 edge: # fanout START: # 必填,START为保留字,表示图起点 - name: user_feature # 必填,目的节点,表示START到user_feature存在一条边,即user_feature是一个起始节点 user_feature: # 必填,起始节点名称 - name: drug_recommend # 必填,目的节点 - name: food_recommend # 必填,目的节点 drug_recommend: - name: merge food_recommend: - name: merge ``` `src.node.health_recommend.user_feature.py` ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState import time class Node(BaseNode): def initialize(self): """初始化节点""" print(self.conf) print('user feature Node初始化成功') def execute(self, state: AgentState, stream_writer=None) -> AgentState: print(f"[用户画像]user feature node开始执行") query = state['messages'][-1]['content'] print(f"[用户画像]query:{query}") print("[用户画像]画像生成中") time.sleep(1) state['node_context']['user_feature'] = "用户画像" print(f"[用户画像]user feature node执行完成") return state ``` `src.node.health_recommend.drug_recommend.py` ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState import time class Node(BaseNode): def initialize(self): """初始化节点""" print(self.conf) print('[药品推荐]drug recommend Node初始化成功') def execute(self, state: AgentState, stream_writer=None) -> AgentState: print(f"[药品推荐]drug recommend node开始执行") query = state['messages'][-1]['content'] print(f"[药品推荐]query:{query}") print(f"[药品推荐]用户画像:{state['node_context']['user_feature']}") print("[药品推荐]药物推荐中......") time.sleep(1) state['node_context']['drug_recommend'] = "阿莫西林" print(f"[药品推荐]drug recommend node执行完成") return state ``` `src.node.health_recommend.food_recommend.py` ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState import time class Node(BaseNode): def initialize(self): """初始化节点""" print(self.conf) print('food recommend Node初始化成功') def execute(self, state: AgentState, stream_writer=None) -> AgentState: print(f"[食品推荐]food recommend node开始执行") query = state['messages'][-1]['content'] print(f"[食品推荐]query:{query}") print(f"[食品推荐]用户画像:{state['node_context']['user_feature']}") print("[食品推荐]食物推荐中......") time.sleep(1) state['node_context']['food_recommend'] = "宫保鸡丁" print(f"[食品推荐]food recommend node执行完成") return state ``` `src.node.health_recommend.merge.py` ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState import time class Node(BaseNode): def initialize(self): """初始化节点""" print(self.conf) print('[推荐融合]merge Node初始化成功') def execute(self, state: AgentState, stream_writer=None) -> AgentState: print(f"[推荐融合]merge node开始执行") query = state['messages'][-1]['content'] print("[推荐融合]推荐结果聚合中......") time.sleep(1) state['extra']['recommend'] = {} state['extra']['recommend']['drug'] = state['node_context']['drug_recommend'] state['extra']['recommend']['food'] = state['node_context']['food_recommend'] print(f"[推荐融合]merge node执行完成") return state ``` app构建: ```bash cookiecutter https://gitee.com/liuxiang13/csagent-framework.git -f -o apps [1/2] project_name (your app name): health_recommend [2/2] agent_yaml (./conf/agent/hello_world.yaml): ./conf/agent/health_recommend.yaml ``` request: ```bash curl --location '127.0.0.1:8901/api/health_recommend/v1/chat' \ --header 'Content-Type: application/json' \ --data '{ "session_id": "1", "log_id":"vcbfbg349984590hohn", "user_id": "7c096c176e614ebaaa00457d86c26379", "messages": [ { "role": "user", "content": "我感冒了" } ], "params": { # 也可将参数放在这里,根据需要自行定义 } }' ``` response: ```json { "code": 0, "err_msg": "", "user_id": "7c096c176e614ebaaa00457d86c26379", "session_id": "1", "log_id": "vcbfbg349984590hohn", "is_delta": false, "messages": [ { "role": "user", "content": "我感冒了" } ], "extra": { # 这里将推荐结果放在了extra, 也可根据实际需求放在message列表 "recommend": { "drug": "阿莫西林", "food": "宫保鸡丁" } } } ``` ### map-reduce(切分报告解读) agent配置: ```yaml name: report_analysis # 必填,agent名字 desc: 体检报告解读,指标抽取,指标分级 # 必填,agent详细描述 version: "1.0" # 选填,agent版本 graph: # agent的执行图 node: # 图节点 - name: doc_trunk desc: 基于某些策略对文档进行分片 instance: "src.node.report_analysis.doc_trunk.Node" - name: doc_analysis desc: 解析 instance: "src.node.report_analysis.doc_analysis.Node" - name: aggregation desc: 聚合 instance: "src.node.report_analysis.aggregation.Node" edge: # fanout START: # 必填,START为保留字,表示图起点 - name: doc_trunk # 必填,目的节点 doc_trunk: # 必填,起始节点名称 - name: doc_analysis # 必填,目的节点 parallel: # 并行处理相关配置 enable: True # 开关 batch: 3 # 每个并行执行流的batch doc_analysis: - name: aggregation ``` `src.node.report_analysis.doc_trunk.py` ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState import time class Node(BaseNode): def initialize(self): """初始化节点""" print(self.conf) print(f"{self.node_config.name} Node初始化成功") def _parse_doc(self, url): time.sleep(1) markdown = '章节内容1\n章节内容2\n章节内容3\n章节内容4\n章节内容5\n章节内容6\n章节内容7\n章节内容8' return markdown def execute(self, state: AgentState, stream_writer=None) -> AgentState: print(f"{self.node_config.name} node开始执行") doc_url = state['params']['url'] markdown = self._parse_doc(doc_url) print(f"markdown:{markdown}") trunk_list = markdown.split('\n') for index, item in enumerate(trunk_list): # 需要存在特定的parallel_info字段中,kv格式 state['parallel_info'][str(index)] = {'doc_trunk':item} print(f"{self.node_config.name} node执行完成") return state ``` `src.node.report_analysis.doc_analysis.py` ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState import time class Node(BaseNode): def initialize(self): """初始化节点""" print(self.conf) print(f"{self.node_config.name} Node初始化成功") def execute(self, state: AgentState, stream_writer=None) -> AgentState: print(f"{self.node_config.name} node开始执行") # parallel_info中存在batch_size个trunk parallel_info = state['parallel_info'] print(f"parallel_info:{parallel_info}") print("文档解读中") time.sleep(1) for id, item in parallel_info.items(): item['result'] = "解读结果" print(f"{self.node_config.name} node执行完成") return state ``` `src.node.report_analysis.aggregation.py` ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState import copy import time class Node(BaseNode): def initialize(self): """初始化节点""" print(self.conf) print(f"{self.node_config.name} Node初始化成功") def execute(self, state: AgentState, stream_writer) -> AgentState: print(f"{self.node_config.name} node开始执行") parallel_info = state['parallel_info'] print(f"parallel_info:{parallel_info}") print("文档解读结果聚合中") time.sleep(1) state['extra']['doc_analysis_res'] = [] for id, item in parallel_info.items(): state['extra']['doc_analysis_res'].append(item) return state ``` app构建: ```bash cookiecutter https://gitee.com/liuxiang13/csagent-framework.git -f -o apps [1/2] project_name (your app name): report_analysis [2/2] agent_yaml (./conf/agent/hello_world.yaml): ./conf/agent/report_analysis.yaml ``` request: ```bash curl --location '127.0.0.1:8901/api/report_analysis/v1/chat' \ --header 'Content-Type: application/json' \ --data '{ "session_id": "1", "log_id":"vcbfbg349984590hohn", "user_id": "7c096c176e614ebaaa00457d86c26379", "messages": [ ], "params": { "url":"Xxx" } }' ``` response: ```json { "code": 0, "err_msg": "", "user_id": "7c096c176e614ebaaa00457d86c26379", "session_id": "1", "log_id": "vcbfbg349984590hohn", "is_delta": false, "messages": [], "extra": { "doc_analysis_res": [ { "doc_trunk": "章节内容1", "result": "解读结果" }, { "doc_trunk": "章节内容2", "result": "解读结果" }, { "doc_trunk": "章节内容3", "result": "解读结果" }, { "doc_trunk": "章节内容4", "result": "解读结果" }, { "doc_trunk": "章节内容5", "result": "解读结果" }, { "doc_trunk": "章节内容6", "result": "解读结果" }, { "doc_trunk": "章节内容7", "result": "解读结果" }, { "doc_trunk": "章节内容8", "result": "解读结果" } ] } } ``` ### 流式返回 agent配置: ```yaml name: demo_stream # 必填,agent名字 desc: 健康问答agent,回答一些健康科普问题 # 必填,agent详细描述 version: "1.0" # 选填,agent版本 graph: # agent的执行图 node: # 图节点 - name: rag # 必填,节点名称 desc: 知识库搜索 # 必填, 节点描述 instance: "src.node.demo_stream.rag.Node" # 必填,节点实现逻辑,Node实现必须遵循标准 - name: llm desc: 大模型调用 instance: "src.node.demo_stream.llm.Node" # 必填,节点实现逻辑,Node实现必须遵循标准 edge: # fanout START: # 必填,START为保留字,表示图起点 - name: rag # 必填,目的节点,表示START到rag存在一条边,即rag是一个起始节点 rag: # 必填,起始节点名称 - name: llm # 必填,目的节点 ``` `src.node.demo_stream.rag.py` ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState import time class Node(BaseNode): def initialize(self): """初始化节点""" print(self.conf) print('rag Node初始化成功') def execute(self, state: AgentState, stream_writer=None) -> AgentState: print(f"rag node开始执行") query = state['messages'][-1]['content'] print(f"query:{query}") print("检索中") time.sleep(1) state['node_context']['evi'] = "检索到的证据" print(f"rag node执行完成") return state ``` `src.node.demo_stream.llm.py` ```python from csagent.core.node.base_node import BaseNode from csagent.core.context import AgentState from openai import OpenAI import time import os import copy class Node(BaseNode): def initialize(self): """初始化节点""" print(self.conf) print('llm Node初始化成功') def execute(self, state: AgentState, stream_writer=None) -> AgentState: print(f"llm node开始执行") query = state['messages'][-1]['content'] client = OpenAI( # 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx", api_key=os.getenv("CS_API_KEY"), base_url=os.getenv("CS_BASE_URL"), ) start = int(time.time() * 1000) response = client.chat.completions.create( model="qwen-turbo", messages=[ {"role": "user", "content": query} ], stream=True # 关键参数,启用流式响应 ) all_content = "" #流式处理时记录的完整content for chunk in response: if chunk.choices[0].delta.content is not None: # 增量返回时必须copy一份数据 delta = copy.deepcopy(state) # 设置增量结果标记 delta['is_delta'] = True delta['messages'].append({"role":"assistant", "content":chunk.choices[0].delta.content}) # 流式返回增量结果 stream_writer(delta) all_content +=chunk.choices[0].delta.content end = int(time.time() * 1000) print(f"llm node cost:%d" % (end-start)) # 完整结果记录 state['messages'].append({"role":"assistant", "content":all_content}) return state app构建: ```bash cookiecutter https://gitee.com/liuxiang13/csagent-framework.git -f -o apps [1/2] project_name (your app name): demo_stream [2/2] agent_yaml (./conf/agent/hello_world.yaml): ./conf/agent/demo_stream.yaml ``` ``` request: ```bash curl --location '127.0.0.1:8901/api/demo_stream/v1/chat' \ --header 'Content-Type: application/json' \ --data '{ "session_id": "1", "log_id":"ytu6u56j6j76jj76j5g6", "user_id": "7c096c176e614ebaaa00457d86c26379", "stream":true, # stream设置为true "messages": [ { "role": "user", "content": "高血压的症状" } ], "params": { } }' ``` response: 请注意!! 流式返回结果中最后一个is_delta==false的片段代表完整的全量结果。 需要在http层根据实际业务场景考虑要不要过滤(代码位置在http服务层service.agent.py:run_stream函数中,见代码注释)或者调用方决定是否过滤。 ![alt text](./images/stream.png) ## 🤔 常见问题 ### Q: 启动时报"模块找不到"错误? A: 确保在项目根目录运行,Python路径会自动设置。 ### Q: 记忆功能连接失败? A: 记忆功能是可选的,可以在配置中禁用,或确保对应环境变量&Redis/ES服务正常运行。 ### Q: 如何添加新的业务逻辑? A: 创建新的节点类,在Agent配置中定义执行图即可。 ## 📄 许可证 MIT License - 详见 [LICENSE](LICENSE) 文件。 ## 🤝 贡献 欢迎提交 Issue 和 Pull Request! ## 📞 联系 如有问题,请在项目中提交 Issue。 ---
⭐ 如果这个项目对你有帮助,请给个 Star!⭐