# csagent-framework
**Repository Path**: liuxiang13/csagent-framework
## Basic Information
- **Project Name**: csagent-framework
- **Description**: agent framework
- **Primary Language**: Python
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 2
- **Created**: 2025-07-24
- **Last Updated**: 2025-08-11
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# CSAgent - 智能对话Agent框架



**基于图执行引擎的企业级智能Agent开发框架**
[特性](#-特性) • [安装](#-安装) • [快速开始](#-快速开始) • [API文档](#-api文档) • [架构](#-架构设计)
## 📋 项目简介
CSAgent 是一个灵活、可扩展的智能对话Agent框架,采用**基于图的执行引擎**设计,支持构建复杂的多轮对话、知识问答、工作流自动化等AI应用。
与dify/coze定位的区别:
dify和coze是开发agent应用的低代码平台
本框架是面向研发的轻量级agent服务开发框架
### 🎯 特性
- **📊 图式架构**: 基于LangGraph的状态图管理,支持复杂的执行流程
- **🔌 节点化设计**: 可插拔的节点系统,支持自定义业务逻辑
- **🌊 流式响应**: 实时流式输出,提供流畅的对话体验
- **🧠 智能记忆**: 支持短期(Redis)和长期(ElasticSearch)记忆系统
- **⚙️ 配置驱动**: 通过YAML配置文件快速定义Agent行为
- **🔄 并发执行**: 支持节点并行处理,提升响应效率
- **💻 统一服务**: 提供统一的异步http-server
## 🚀 快速开始
### 环境要求
- Python 3.8+
- langgraph==0.5.1 tornado==6.5.1
- cookiecutter
- Redis (可选,用于记忆功能)
- ElasticSearch (可选,用于长期记忆)
### 1. 环境准备
```bash
# 安装脚手架工具和关键依赖
pip install cookiecutter
# 安装核心依赖
pip install langgraph==0.5.1 tornado==6.5.1
```
### 2. 生成app
```bash
# 生成的app会放在apps目录下
cookiecutter https://gitee.com/liuxiang13/csagent-framework.git -f -o apps
# 第一步:填入你的app name, 如:hello_world
[1/2] project_name (your app name): hello_world
# 第二步: 填入你的app agent配置路径(不填默认./conf/agent/hello_world.yaml)
[2/2] agent_yaml (./conf/agent/hello_world.yaml):
```

### 3. 启动服务
```bash
cd apps/hello_world/
python3 bin/http_server.py --port=8901
```
### 4. 测试服务
```bash
curl --location '127.0.0.1:8901/api/hello_world/v1/chat' \
--header 'Content-Type: application/json' \
--data '{
"session_id": "1",
"log_id":"vcbfbg349984590hohn",
"user_id": "7c096c176e614ebaaa00457d86c26379",
"messages": [
{
"role": "user",
"content": "你好"
}
],
"params": {
}
}'
```
```bash
{
"code": 0,
"err_msg": "",
"user_id": "7c096c176e614ebaaa00457d86c26379",
"session_id": "1",
"log_id": "vcbfbg349984590hohn",
"is_delta": false,
"messages": [
{
"role": "user",
"content": "你好"
},
{
"role": "assistant",
"content": "hello world"
}
],
"extra": {
"用户任意定义的key": "hello world"
}
}
```
## 📡 API文档
### 健康检查
```http
GET /api/{{app-name}}/health
```
**响应:**
```json
{"code": 0}
```
### agent服务接口
```http
POST /api/{{app-name}}/v1/chat
Content-Type: application/json
```
**请求参数:**
```json
{
"user_id": "", # 用户id, 必填
"session_id": "", # 会话id, 必填
"log_id":"", # logid, 必填
"messages": [
{"role": "user", "content": "高血压有什么症状?"}
], # 对话列表,规范与openai大模型对话一致, 非必填
"stream": false, # 是否sse流式返回, 非必填, 默认false
"params":{
} # 用户自定义的其他请求参数, 根据业务场景自行定义,非必填
}
```
**响应:**
```json
{
"code": 0, # 错误码, 0-OK, 非0-异常错误
"err_msg": "", # 错误信息
"user_id": "", # 用户id
"session_id": "", # 会话id
"is_delta": false, # 是否为流式模式的增量返回结果
"messages": [
{"role": "user", "content": "高血压有什么症状?"},
{"role": "assistant", "content": "高血压的主要症状包括..."}
], # 对话列表,规范与openai大模型对话一致
"extra": {} # 返回结果的额外参数, 根据业务场景自行定义
}
```
### 流式响应
设置 `"stream": true` 启用流式响应:
```bash
curl -X POST http://localhost:8901/api/{{app-name}}/v1/chat \
-H "Content-Type: application/json" \
-d '{
"user_id": "", # 用户id, 必填
"session_id": "", # 会话id, 必填
"log_id":"", # logid, 必填
"messages": [
{"role": "user", "content": "高血压有什么症状?"}
], # 对话列表,规范与openai大模型对话一致, 非必填
"stream": true, # 是否sse流式返回, 非必填, 默认false
"params":{
} # 用户自定义的其他请求参数, 根据业务场景自行定义,非必填
}'
```
**流式响应格式:**
```
data: {"code": 0, "is_delta": true, "messages": [...], ...}
data: {"code": 0, "is_delta": true, "messages": [...], ...}
...
```
## 🏗️ 架构设计
### 系统架构
```
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ HTTP Client │───▶│ Tornado Web │───▶│ AgentService │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
┌─────────────────┐ ▼
│ Memory Sys │ ┌─────────────────┐
│ ┌─────────────┐│ │ CSAgent Core │
│ │ Redis ││◀───┤ │
│ │(Short Term) ││ │ ┌──────────────┐
│ └─────────────┘│ │ │ LangGraph │
│ ┌─────────────┐│ │ │ State Engine │
│ │ElasticSearch││ │ └──────────────┘
│ │(Long Term) ││ └─────────────────┘
│ └─────────────┘│ │
└─────────────────┘ ▼
┌─────────────────┐
│ Node Graph │
│ ┌─────┐ ┌─────┐ │
│ │ RAG │▶│ LLM │ │
│ └─────┘ └─────┘ │
└─────────────────┘
```
### 项目结构
```
{{app-name}}/
├── csagent/ # 核心agent构建框架
│ └── core/
│ ├── agent.py # Agent主类
│ ├── builder/ # agent构建器
│ ├── node/ # 基础节点定义
│ └── config/ # 配置管理
├── src/ # 业务实现
│ ├── bootstrap/ # 启动初始化
│ ├── controller/ # HTTP控制器
│ ├── common/ # 通用组建
│ ├── dao/ # 数据资源访问
│ ├── error/ # 错误定义
│ ├── resource/ # 资源定义, 数据资源/服务资源等
│ ├── service/ # 业务服务层
│ └── node/ # 自定义节点
│ └── demo_stream/ # demo_stream agent
│ ├── rag.py # 知识检索节点
│ └── llm.py # LLM推理节点
├── conf/ # 配置文件
│ ├── agent/ # Agent配置
│ ├── node/ # agent节点自定义配置
│ └── log.yaml # 日志配置
├── bin/ # 启动脚本
│ └── http_server.py # HTTP服务器
└── requirements.txt # 依赖
```
### 执行流程(示例)
```mermaid
graph TD
A[用户请求] --> B[AgentHandler]
B --> C[AgentService]
C --> D[Agent.run]
D --> E[GraphBuilder]
E --> F[START节点]
F --> G[RAG节点
知识检索]
G --> H[LLM节点
AI生成]
H --> I[END节点]
I --> J[返回响应]
K[Memory System] --> D
L[Config System] --> E
style G fill:#e1f5fe
style H fill:#f3e5f5
style K fill:#fff3e0
```
## ⚙️ 配置说明
### Agent配置
```yaml
name: agent_name #
desc: # agent描述
memory: # 记忆配置
short_term: # 短期记忆配置
read: false # 是否读取短期记忆, 默认关闭
write: false # 是否写入短期记忆, 默认关闭
ttl: 604800 # 可选,短期记忆ttl,默认7天,0表示永久
long_term: # 长期记忆配置,当前长期记忆使用在线用户画像,暂不支持agent写入长期记忆
read: false # 是否读取长期记忆, 默认关闭
graph: # agent执行图配置
node: # 节点定义
- name: rag # 节点名称
desc: 知识库搜索 # 节点描述
conf: "./conf/node/rag.yaml" # 节点自定义配置, 可选, 若配置则文件必须存在
instance: "src.node.demo_stream.rag.Node" # 节点实例化路径
- name: llm
desc: 大模型调用
instance: "src.node.demo_stream.llm.Node"
edge: # fanout边定义, 必填
START: # 必填,START为保留字,表示图起点
- name: rag # 必填,目的节点,表示START到rag存在一条边,即rag是一个起始节点
rag: # 必填,起始节点名称
- name: llm # 必填,目的节点
```
### 节点路由配置
#### 固定串行路由
```yaml
graph:
node:
- name: node1
...
- name: node2
...
edge:
START:
- name: node1
node1: # 必填,起始节点名称
- name: node2 # 必填,目的节点
```
```mermaid
graph TD
A[START] --> B[node1]
B --> C[node2]
C --> D[END]
```
#### 固定并行路由
```yaml
graph:
node:
- name: node1
...
- name: node2
...
- name: node3
...
edge:
START:
- name: node1
node1: # 必填,起始节点名称
- name: node2 # 必填,目的节点
- name: node3
```
```mermaid
graph TD
A[START] --> B[node1]
B --> C[node2]
B --> D[node3]
C --> E[END]
D --> E
```
#### 条件路由
```yaml
graph:
node:
- name: node1
...
- name: node2
...
edge:
START:
- name: node1
node1: # 必填,起始节点名称
- name: node2 # 必填,目的节点
condition: "'messages' in node_context and len(node_context['messages'])>0 and len(node_context['messages'][-1].get('tool_calls',[]))>0" # 条件定义,上下文变量来自(##-节点上下文AgentState)
- name: END # END为配置保留字,表示终止节点
condition: default # 选填,default表示没有其他其他节点可路由时的默认路由节点(为了方便配置使用)
node2:
- name: node1
```
```mermaid
graph TD
A[START] --> B[node1]
B -."condition".-> C[node2]
B -."condition".-> D[END]
C --> B
```
#### map-reduce路由(并行处理)
```yaml
graph:
node:
- name: mapper
...
- name: reducer
...
- name: aggregation
...
edge:
START:
- name: mapper
mapper:
- name: reducer
parallel: # 并行处理相关配置, 适用场景例子:报告解读中对报告进行分片并行处理
enable: True # 开关
batch: 3 # 每个并行执行流的batch
reducer:
- name: aggregation
```
```mermaid
graph TD
A[START] --> B[mapper]
B --"切片并行处理(map-reduce)"--> C[reducer]
C --> D[aggregation]
D --> E[END]
```
### 节点上下文AgentState
`AgentState包含agent执行过程中所有的上下文信息, 包括中间结果,记忆等,每个节点都可以更新状态,获取其他节点更新的状态,并且状态更新是并行安全的,定义如下:`
```python
class AgentState(TypedDict):
code: Annotated[int, replace]
err_msg: Annotated[str, replace]
debug: Annotated[bool, replace]
log_id: Annotated[str, replace]
session_id: Annotated[str, replace]
user_id: Annotated[str, replace]
stream: Annotated[bool, replace]
is_delta: Annotated[bool, replace]
messages: Annotated[List[Dict[str, Any]], replace]
short_term_memory: Annotated[Dict[str, Any], deep_update]
long_term_memory: Annotated[Dict[str, Any], deep_update]
params: Annotated[Dict[str, Any], deep_update]
extra: Annotated[Dict[str, Any], deep_update]
node_context: Annotated[Dict[str, Any], deep_update]
parallel_info: Annotated[Dict[str, Any], deep_update]
```
`各字段定义:`
`code`: agent执行过程中的状态码,对应最终response中的code,若执行过程中出现异常,业务节点可设置成预定义错误码后框架会直接返回异常
`err_msg`: agent执行过程中的错误信息
`debug`: 是否为debug状态,对应request中的debug字段
`log_id`: log_id,对应request中的log_id字段
`session_id`: 会话id,对应request中的session_id字段
`user_id`: 用户id,对应request中的user_id字段
`stream`: 是否流式,对应request中的stream字段
`is_delta`: 是否为流式增量返回的结果,对应response中的is_delta字段。一个使用场景是同一个接口可能同时支持流式或非流式,比如一个意图后面接问答(流式)或推荐(非流式),此时接口统一走流式,上层不做处理的话返回结果为:问答(delta1,delta2,...完整结果finalres),推荐(完整结果finalres)。业务层需要根据不同的意图来区分返回给前端渲染的结果,问答意图可能需要丢弃final_res(因为前端已经根据所有的delta内容全部渲染完成)。
`messages`: agent对话列表,规范同openai大模型对话规范。原始对话内容从request中传入,回答结果append到列表中作为response返回。`messages不支持多个节点并行写入!`
`short_term_memory`: 短期记忆,若开启短期记忆功能框架会将记忆对应字段放到里面
`long_term_memory`: 长期记忆,若开启长期记忆功能框架会将记忆对应字段放到里面
`params`: 额外的请求参数,对应request中的params字段,除了对话列表外业务层可自定义额外的请求参数,框架会透传给节点
`extra`: 额外的响应参数,对应response中的extra字段,除了对话列表外业务层可自定义额外的响应参数,框架会透传给调用方
`node_context`: 框架为节点预留的上下文字段,作为各个节点执行过程中内部信息的传递,对调用者屏蔽,`node_context支持多个节点并行写入`。
`parallel_info`: 框架预留字段用来支持mapreduce功能,mapper节点切分的分片信息必须写入该字段。
### 自定义节点
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
import time
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
# 若节点需要进行某些初始化工作,则实现该函数,否则忽略
# self.conf为node配置中conf文件的内容,若无配置则为{}
# self.node_config为节点的元信息配置
print(self.conf)
print(f'{self.node_config.name}初始化成功')
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
"""返回问候语
Args:
state: agent执行状态上下文
stream_writer: 流式输出writer,可以支持流式输出任意中间结果
Returns:
更新后的agent执行状态上下文
"""
print(f"{self.node_config.name}开始执行")
# 根据agent状态上下文进行计算
messages = state['messages']
params = state['params']
short_term_memory = state['short_term_memory']
long_term_memory = state['long_term_memory']
# 模拟节点处理过程
time.sleep(1)
state['messages'].append({"role":"assistant", "content":"hello"})
print(f"{self.node_config.name}执行完成")
return state
```
## 🔧 开发指南
### 添加你自己的agent配置
路径一般为`conf/agent/your_agent.yaml`
### 添加新节点
1. **创建节点文件** `src/node/your_module/your_node.py`
2. **实现节点类** 继承 `BaseNode`
3. **配置agent** 在 `conf/agent/your_agent.yaml` 中添加节点和路由配置
4. **测试节点** 运行测试验证功能
### 修改初始化时设置的agent配置
修改src.bootstrap.bootstrap.py中的代码
```python
service.agent_service = AgentService(agent_yaml="{{cookiecutter.agent_yaml}}")
```
### 配置记忆功能
```bash
# 配置redis环境变量 (短期记忆)
export REDIS_ADDR="your redis addr".
export REDIS_PORT="your redis port"
export REDIS_PASSWORD="your redis password"
# 配置记忆生成大模型信息 (若开启短期记忆生成)
export CS_MEMORY_API_KEY="your llm api-key"
export CS_MEMORY_BASE_URL="your llm api baseurl"
export CS_MEMORY_MODEL="your llm model id"
# 配置长期记忆读取es地址 (当前长期记忆来自在线存储的用户画像)
export ES_HOST="http://xxx:9200"
export ES_USER="your es user"
export ES_PASSWORD="your es password"
# 更新Agent配置启用记忆
memory:
short_term:
read: true
write: true
ttl: 604800 # 7天
long_term:
read: true
```
### 短期记忆使用
前提:开启了短期记忆功能。
使用方式:直接读取agent state
```python
class Node(BaseNode):
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
short_term_memory = state['short_term_memory]
print(f"短期记忆:{short_term_memory}")
return state
```
### 自定义短期记忆实现
实现方式:
1. agent记忆配置中设置strategy
2. 实现strategy,实现方式参考csagent/utils.py:default_short_memory
```yaml
memory:
short_term:
read: True # 可选,默认False, agent开始执行时是否读取memory,同步
write: True # 可选,默认False,agent执行成功后是否更新memory,异步
strategy: "src.memory.short_memory" #可选,不填使用默认策略
```
### 长期记忆使用
前提:开启了长期记忆功能。
使用方式:直接读取agent state
```python
class Node(BaseNode):
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
long_term_memory = state['long_term_memory]
print(f"长期记忆:{long_term_memory}")
return state
```
### 长期记忆更新(暂不支持)
## 🔧 应用开发示例
### react(deepresearch)
agent配置:
```yaml
name: react # 必填,agent名字
desc: 具备react机制的agent,循环调用llm和工具 # 必填,agent详细描述
version: "1.0" # 选填,agent版本
graph: # agent的执行图
node: # 图节点
- name: llm # 必填,节点名称
desc: 执行大模型调用 # 必填, 节点描述
instance: "src.node.react.llm.Node" # 必填,节点实现逻辑,Node实现必须遵循标准
- name: tools
desc: 基于大模型调用结果执行工具调用
instance: "src.node.react.tools.Node" # 必填,节点实现逻辑,Node实现必须遵循标准
edge: # fanout
START: # 必填,START为保留字,表示图起点
- name: llm # 必填,目的节点,表示START到llm存在一条边,即llm是一个起始节点
llm: # 必填,起始节点名称
- name: tools # 必填,目的节点
condition: "'messages' in node_context and len(node_context['messages'])>0 and len(node_context['messages'][-1].get('tool_calls',[]))>0" # 选填,表示从起始节点到目的节点的路由条件,支持表达式计算,其中使用的字段均来自于图计算过程中使用的上下文
- name: END # END为结束节点的保留字,用户不可用END命名自己的节点
condition: default # 选填,default表示没有其他其他节点可路由时的默认路由节点
tools:
- name: llm #表示tools到llm有一条固定边
```
`src.node.react.llm.py`
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
from openai import OpenAI
import time
import copy
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
print(self.conf)
print(f"{self.node_config.name}初始化成功")
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
print(f"{self.node_config.name}开始执行")
query = state['messages'][-1]['content']
print(f"query:{query}")
# 构造llm request with tools
# 模拟大模型调用
time.sleep(3)
# 中间的tool call过程不放到主message列表中,所以单独copy了一份出来放在了node_context里
if "messages" not in state["node_context"]:
state["node_context"]["messages"] = copy.deepcopy(state['messages'])
# 下面为模拟了一次工具调用过程
if len(state["node_context"]["messages"]) == 1:
state["node_context"]["messages"].append(
{
"role":"assistant",
"content":'',
"tool_calls":[
{
"id": "call_6596dafa2a6a46f7a217da",
"function": {
"arguments": "{\"query\": \"xxxx\"}",
"name": "search_web"
},
"type": "function",
"index": 0
}
]
}
)
else:
# 已经执行了一次工具调用,返回的最终结果放到主messages中
state['messages'].append(
{
"role":"assistant",
"content":'这是大模型的最终回答',
}
)
return state
```
`src.node.react.tools.py`
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
from openai import OpenAI
import time
import os
import copy
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
print(self.conf)
print(f"{self.node_config.name}初始化成功")
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
print(f"{self.node_config.name}开始执行")
tool_call = state["node_context"]["messages"][-1]['tool_calls'][0]
function_name = tool_call['function']['name']
args = tool_call['function']['arguments']
print(f"执行{function_name}工具调用,args:{args}")
#模拟大模型调用过程
time.sleep(1)
state["node_context"]["messages"].append(
{
"role":"tool",
"content":'{\"res\":\"call res\"}',
"tool_call_id":tool_call['id']
}
)
return state
```
app构建:
```bash
cookiecutter https://gitee.com/liuxiang13/csagent-framework.git -f -o apps
[1/2] project_name (your app name): react
[2/2] agent_yaml (./conf/agent/hello_world.yaml): ./conf/agent/react.yaml
```
request:
```bash
curl --location '127.0.0.1:8901/api/react/v1/chat' \
--header 'Content-Type: application/json' \
--data '{
"session_id": "1",
"log_id":"vcbfbg349984590hohn",
"user_id": "7c096c176e614ebaaa00457d86c26379",
"messages": [
{
"role": "user",
"content": "你好"
}
],
"params": {
}
}'
```
response:
```json
{
"code": 0,
"err_msg": "",
"user_id": "7c096c176e614ebaaa00457d86c26379",
"session_id": "1",
"log_id": "vcbfbg349984590hohn",
"is_delta": false,
"messages": [
{
"role": "user",
"content": "你好"
},
{
"role": "assistant",
"content": "这是大模型的最终回答"
}
],
"extra": {}
}
```
### 并行健康推荐
agent配置:
```yaml
name: health_recommend # 必填,agent名字
desc: 健康推荐agent,基于用户输入和用户画像推荐配餐和药物 # 必填,agent详细描述
version: "1.0" # 选填,agent版本
graph: # agent的执行图
node: # 图节点
- name: user_feature # 必填,节点名称
desc: 获取并计算用户画像特征 # 必填, 节点描述
instance: "src.node.health_recommend.user_feature.Node" # 必填,节点实现逻辑,Node实现必须遵循标准
- name: drug_recommend
desc: 药物推荐
instance: "src.node.health_recommend.drug_recommend.Node" # 必填,节点实现逻辑,Node实现必须遵循标准
- name: food_recommend
desc: 食物推荐
instance: "src.node.health_recommend.food_recommend.Node" # 必填,节点实现逻辑,Node实现必须遵循标准
- name: merge
desc: 多路推荐融合
instance: "src.node.health_recommend.merge.Node" # 必填,节点实现逻辑,Node实现必须遵循标准
edge: # fanout
START: # 必填,START为保留字,表示图起点
- name: user_feature # 必填,目的节点,表示START到user_feature存在一条边,即user_feature是一个起始节点
user_feature: # 必填,起始节点名称
- name: drug_recommend # 必填,目的节点
- name: food_recommend # 必填,目的节点
drug_recommend:
- name: merge
food_recommend:
- name: merge
```
`src.node.health_recommend.user_feature.py`
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
import time
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
print(self.conf)
print('user feature Node初始化成功')
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
print(f"[用户画像]user feature node开始执行")
query = state['messages'][-1]['content']
print(f"[用户画像]query:{query}")
print("[用户画像]画像生成中")
time.sleep(1)
state['node_context']['user_feature'] = "用户画像"
print(f"[用户画像]user feature node执行完成")
return state
```
`src.node.health_recommend.drug_recommend.py`
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
import time
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
print(self.conf)
print('[药品推荐]drug recommend Node初始化成功')
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
print(f"[药品推荐]drug recommend node开始执行")
query = state['messages'][-1]['content']
print(f"[药品推荐]query:{query}")
print(f"[药品推荐]用户画像:{state['node_context']['user_feature']}")
print("[药品推荐]药物推荐中......")
time.sleep(1)
state['node_context']['drug_recommend'] = "阿莫西林"
print(f"[药品推荐]drug recommend node执行完成")
return state
```
`src.node.health_recommend.food_recommend.py`
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
import time
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
print(self.conf)
print('food recommend Node初始化成功')
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
print(f"[食品推荐]food recommend node开始执行")
query = state['messages'][-1]['content']
print(f"[食品推荐]query:{query}")
print(f"[食品推荐]用户画像:{state['node_context']['user_feature']}")
print("[食品推荐]食物推荐中......")
time.sleep(1)
state['node_context']['food_recommend'] = "宫保鸡丁"
print(f"[食品推荐]food recommend node执行完成")
return state
```
`src.node.health_recommend.merge.py`
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
import time
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
print(self.conf)
print('[推荐融合]merge Node初始化成功')
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
print(f"[推荐融合]merge node开始执行")
query = state['messages'][-1]['content']
print("[推荐融合]推荐结果聚合中......")
time.sleep(1)
state['extra']['recommend'] = {}
state['extra']['recommend']['drug'] = state['node_context']['drug_recommend']
state['extra']['recommend']['food'] = state['node_context']['food_recommend']
print(f"[推荐融合]merge node执行完成")
return state
```
app构建:
```bash
cookiecutter https://gitee.com/liuxiang13/csagent-framework.git -f -o apps
[1/2] project_name (your app name): health_recommend
[2/2] agent_yaml (./conf/agent/hello_world.yaml): ./conf/agent/health_recommend.yaml
```
request:
```bash
curl --location '127.0.0.1:8901/api/health_recommend/v1/chat' \
--header 'Content-Type: application/json' \
--data '{
"session_id": "1",
"log_id":"vcbfbg349984590hohn",
"user_id": "7c096c176e614ebaaa00457d86c26379",
"messages": [
{
"role": "user",
"content": "我感冒了"
}
],
"params": {
# 也可将参数放在这里,根据需要自行定义
}
}'
```
response:
```json
{
"code": 0,
"err_msg": "",
"user_id": "7c096c176e614ebaaa00457d86c26379",
"session_id": "1",
"log_id": "vcbfbg349984590hohn",
"is_delta": false,
"messages": [
{
"role": "user",
"content": "我感冒了"
}
],
"extra": { # 这里将推荐结果放在了extra, 也可根据实际需求放在message列表
"recommend": {
"drug": "阿莫西林",
"food": "宫保鸡丁"
}
}
}
```
### map-reduce(切分报告解读)
agent配置:
```yaml
name: report_analysis # 必填,agent名字
desc: 体检报告解读,指标抽取,指标分级 # 必填,agent详细描述
version: "1.0" # 选填,agent版本
graph: # agent的执行图
node: # 图节点
- name: doc_trunk
desc: 基于某些策略对文档进行分片
instance: "src.node.report_analysis.doc_trunk.Node"
- name: doc_analysis
desc: 解析
instance: "src.node.report_analysis.doc_analysis.Node"
- name: aggregation
desc: 聚合
instance: "src.node.report_analysis.aggregation.Node"
edge: # fanout
START: # 必填,START为保留字,表示图起点
- name: doc_trunk # 必填,目的节点
doc_trunk: # 必填,起始节点名称
- name: doc_analysis # 必填,目的节点
parallel: # 并行处理相关配置
enable: True # 开关
batch: 3 # 每个并行执行流的batch
doc_analysis:
- name: aggregation
```
`src.node.report_analysis.doc_trunk.py`
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
import time
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
print(self.conf)
print(f"{self.node_config.name} Node初始化成功")
def _parse_doc(self, url):
time.sleep(1)
markdown = '章节内容1\n章节内容2\n章节内容3\n章节内容4\n章节内容5\n章节内容6\n章节内容7\n章节内容8'
return markdown
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
print(f"{self.node_config.name} node开始执行")
doc_url = state['params']['url']
markdown = self._parse_doc(doc_url)
print(f"markdown:{markdown}")
trunk_list = markdown.split('\n')
for index, item in enumerate(trunk_list):
# 需要存在特定的parallel_info字段中,kv格式
state['parallel_info'][str(index)] = {'doc_trunk':item}
print(f"{self.node_config.name} node执行完成")
return state
```
`src.node.report_analysis.doc_analysis.py`
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
import time
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
print(self.conf)
print(f"{self.node_config.name} Node初始化成功")
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
print(f"{self.node_config.name} node开始执行")
# parallel_info中存在batch_size个trunk
parallel_info = state['parallel_info']
print(f"parallel_info:{parallel_info}")
print("文档解读中")
time.sleep(1)
for id, item in parallel_info.items():
item['result'] = "解读结果"
print(f"{self.node_config.name} node执行完成")
return state
```
`src.node.report_analysis.aggregation.py`
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
import copy
import time
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
print(self.conf)
print(f"{self.node_config.name} Node初始化成功")
def execute(self, state: AgentState, stream_writer) -> AgentState:
print(f"{self.node_config.name} node开始执行")
parallel_info = state['parallel_info']
print(f"parallel_info:{parallel_info}")
print("文档解读结果聚合中")
time.sleep(1)
state['extra']['doc_analysis_res'] = []
for id, item in parallel_info.items():
state['extra']['doc_analysis_res'].append(item)
return state
```
app构建:
```bash
cookiecutter https://gitee.com/liuxiang13/csagent-framework.git -f -o apps
[1/2] project_name (your app name): report_analysis
[2/2] agent_yaml (./conf/agent/hello_world.yaml): ./conf/agent/report_analysis.yaml
```
request:
```bash
curl --location '127.0.0.1:8901/api/report_analysis/v1/chat' \
--header 'Content-Type: application/json' \
--data '{
"session_id": "1",
"log_id":"vcbfbg349984590hohn",
"user_id": "7c096c176e614ebaaa00457d86c26379",
"messages": [
],
"params": {
"url":"Xxx"
}
}'
```
response:
```json
{
"code": 0,
"err_msg": "",
"user_id": "7c096c176e614ebaaa00457d86c26379",
"session_id": "1",
"log_id": "vcbfbg349984590hohn",
"is_delta": false,
"messages": [],
"extra": {
"doc_analysis_res": [
{
"doc_trunk": "章节内容1",
"result": "解读结果"
},
{
"doc_trunk": "章节内容2",
"result": "解读结果"
},
{
"doc_trunk": "章节内容3",
"result": "解读结果"
},
{
"doc_trunk": "章节内容4",
"result": "解读结果"
},
{
"doc_trunk": "章节内容5",
"result": "解读结果"
},
{
"doc_trunk": "章节内容6",
"result": "解读结果"
},
{
"doc_trunk": "章节内容7",
"result": "解读结果"
},
{
"doc_trunk": "章节内容8",
"result": "解读结果"
}
]
}
}
```
### 流式返回
agent配置:
```yaml
name: demo_stream # 必填,agent名字
desc: 健康问答agent,回答一些健康科普问题 # 必填,agent详细描述
version: "1.0" # 选填,agent版本
graph: # agent的执行图
node: # 图节点
- name: rag # 必填,节点名称
desc: 知识库搜索 # 必填, 节点描述
instance: "src.node.demo_stream.rag.Node" # 必填,节点实现逻辑,Node实现必须遵循标准
- name: llm
desc: 大模型调用
instance: "src.node.demo_stream.llm.Node" # 必填,节点实现逻辑,Node实现必须遵循标准
edge: # fanout
START: # 必填,START为保留字,表示图起点
- name: rag # 必填,目的节点,表示START到rag存在一条边,即rag是一个起始节点
rag: # 必填,起始节点名称
- name: llm # 必填,目的节点
```
`src.node.demo_stream.rag.py`
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
import time
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
print(self.conf)
print('rag Node初始化成功')
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
print(f"rag node开始执行")
query = state['messages'][-1]['content']
print(f"query:{query}")
print("检索中")
time.sleep(1)
state['node_context']['evi'] = "检索到的证据"
print(f"rag node执行完成")
return state
```
`src.node.demo_stream.llm.py`
```python
from csagent.core.node.base_node import BaseNode
from csagent.core.context import AgentState
from openai import OpenAI
import time
import os
import copy
class Node(BaseNode):
def initialize(self):
"""初始化节点"""
print(self.conf)
print('llm Node初始化成功')
def execute(self, state: AgentState, stream_writer=None) -> AgentState:
print(f"llm node开始执行")
query = state['messages'][-1]['content']
client = OpenAI(
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx",
api_key=os.getenv("CS_API_KEY"),
base_url=os.getenv("CS_BASE_URL"),
)
start = int(time.time() * 1000)
response = client.chat.completions.create(
model="qwen-turbo",
messages=[
{"role": "user", "content": query}
],
stream=True # 关键参数,启用流式响应
)
all_content = "" #流式处理时记录的完整content
for chunk in response:
if chunk.choices[0].delta.content is not None:
# 增量返回时必须copy一份数据
delta = copy.deepcopy(state)
# 设置增量结果标记
delta['is_delta'] = True
delta['messages'].append({"role":"assistant", "content":chunk.choices[0].delta.content})
# 流式返回增量结果
stream_writer(delta)
all_content +=chunk.choices[0].delta.content
end = int(time.time() * 1000)
print(f"llm node cost:%d" % (end-start))
# 完整结果记录
state['messages'].append({"role":"assistant", "content":all_content})
return state
app构建:
```bash
cookiecutter https://gitee.com/liuxiang13/csagent-framework.git -f -o apps
[1/2] project_name (your app name): demo_stream
[2/2] agent_yaml (./conf/agent/hello_world.yaml): ./conf/agent/demo_stream.yaml
```
```
request:
```bash
curl --location '127.0.0.1:8901/api/demo_stream/v1/chat' \
--header 'Content-Type: application/json' \
--data '{
"session_id": "1",
"log_id":"ytu6u56j6j76jj76j5g6",
"user_id": "7c096c176e614ebaaa00457d86c26379",
"stream":true, # stream设置为true
"messages": [
{
"role": "user",
"content": "高血压的症状"
}
],
"params": {
}
}'
```
response:
请注意!!
流式返回结果中最后一个is_delta==false的片段代表完整的全量结果。
需要在http层根据实际业务场景考虑要不要过滤(代码位置在http服务层service.agent.py:run_stream函数中,见代码注释)或者调用方决定是否过滤。

## 🤔 常见问题
### Q: 启动时报"模块找不到"错误?
A: 确保在项目根目录运行,Python路径会自动设置。
### Q: 记忆功能连接失败?
A: 记忆功能是可选的,可以在配置中禁用,或确保对应环境变量&Redis/ES服务正常运行。
### Q: 如何添加新的业务逻辑?
A: 创建新的节点类,在Agent配置中定义执行图即可。
## 📄 许可证
MIT License - 详见 [LICENSE](LICENSE) 文件。
## 🤝 贡献
欢迎提交 Issue 和 Pull Request!
## 📞 联系
如有问题,请在项目中提交 Issue。
---
⭐ 如果这个项目对你有帮助,请给个 Star!⭐