# ezflow

**Repository Path**: Ky1eYang/ezflow

## Basic Information

- **Project Name**: ezflow
- **Description**: A Python-based stream module manager for building pipelines in any scenario that needs stream processing
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-04-19
- **Last Updated**: 2025-09-25

## README

# EzFlow - A Simple Streaming Architecture

This package is a modern, Python-based asynchronous stream-processing framework. It wraps asynchronous execution units in the `TraceStream` class, builds data-processing pipelines through chained links, and supports data tracing and session management.

## Quick Start

During development, first run the test suite with `pytest --forked` to check for failing tests:

```shell
uv sync --extra dev
pytest -v --forked
```

## Core Concepts

### Execution Model

EzFlow uses **coroutine-based asynchronous execution** as its primary mode:

- **Coroutine mode**: built on asyncio; low resource consumption and concurrent processing, suited to both IO-bound and compute-bound tasks
- **Thread-pool support**: an optional executor can be configured to process CPU-bound tasks in parallel

### Core Components

#### TraceStream - the asynchronous stream-processing unit

Each `TraceStream` is an asynchronous execution unit built from the following parts:

```
 ______________________________
| Input Queue (asyncio.Queue)  |
| Processor (__call__)         |
| TraceWrapper & Future        |
| Buffer (deque)               | -> export to downstream
| Data Tracing & Weak Refs     |
|______________________________|
```

#### Component descriptions

- **Input Queue**: asynchronous queue that receives data from upstream
- **Processor**: the core processing logic (implements `__call__`; async is supported)
- **TraceWrapper**: data wrapper carrying the payload, its ID, a Future, and processing-chain information
- **Buffer**: output buffer
- **Data Tracing**: weak-reference-based data-tracing system

## Core Types

### TraceWrapper - the data wrapper

```python
@dataclass
class TraceWrapper(Generic[T_in, T_out]):
    data: T_in                                    # the actual payload
    future: asyncio.Future[T_out]                 # Future holding the async result
    id: Optional[int]                             # unique data identifier
    chain: Dict[ReferenceType[TraceStream], int]  # processing-chain trace
```

### TraceStream - the asynchronous stream processor

`TraceStream` is the base class of every stream module and defines the standard interface for asynchronous stream processing:

| Attribute | Type | Description |
|------|------|------|
| type_in | TypeVar | input data type |
| type_out | TypeVar | output data type |
| executor | Executor | optional thread-pool executor |
| max_queue_size | int | maximum length of the input queue |
| timeout_perflow | float | timeout for a single processing step |
| is_broadcast | bool | whether to broadcast to multiple downstreams |
| filter_input | bool | whether to type-filter the input |

### BasePipeline - the pipeline manager

`BasePipeline` composes and manages multiple stream processors:

- **Stream composition**: combines multiple TraceStreams into a processing chain
- **Lifecycle management**: starts and stops the whole chain as one unit
- **Weak-reference management**: prevents memory leaks
- **Config serialization**: supports creation from a dataclass config, and serialization

### BaseContext - the bidirectional-stream context

`BaseContext` handles scenarios that need bidirectional data flow (such as WebSocket or TCP connections):

- **rx_loop**: receive loop that handles the inbound data stream
- **tx_loop**: transmit loop that handles the outbound response stream
- **Future management**: correlates requests with responses through the TraceWrapper's Future

## Processing Flow

### The asynchronous processing loop

The core asynchronous entry point is `TraceStream.run_main()`:

```python
async def run_main(self, loop: asyncio.AbstractEventLoop):
    self._running_count += 1
    self.loop_main = loop
    self._start_event.set()

    while True:
        # Pull the next item from the input queue
        wrapper_in = await self._input_queue.get()

        # Process the data
        if self.is_coroutine:
            output = await self.__call__(wrapper_in.data, wrapper_in.future)
        else:
            # Run synchronous processors in the thread pool
            output = await loop.run_in_executor(
                self._executor, self.__call__,
                wrapper_in.data, wrapper_in.future
            )

        # Wrap the result and export it downstream
        output_wrapper = TraceWrapper(output)
        await self.export(output_wrapper)
```
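The sync/async branch in `run_main` follows a standard asyncio dispatch pattern: coroutine functions are awaited on the event loop, while plain callables are offloaded to an executor. Here is a minimal, self-contained sketch of that pattern using plain asyncio (the `process_sync`, `process_async`, and `dispatch` names are illustrative, not part of ezflow):

```python
import asyncio
import inspect

def process_sync(data: str) -> str:
    """Blocking, CPU-style work (would run in a thread pool)."""
    return data.upper()

async def process_async(data: str) -> str:
    """Native coroutine work (runs directly on the event loop)."""
    await asyncio.sleep(0)
    return data.lower()

async def dispatch(processor, data):
    if inspect.iscoroutinefunction(processor):
        return await processor(data)  # await coroutines in place
    loop = asyncio.get_running_loop()
    # Offload blocking callables to the default executor
    return await loop.run_in_executor(None, processor, data)

async def main():
    print(await dispatch(process_async, "Hello"))  # -> hello
    print(await dispatch(process_sync, "Hello"))   # -> HELLO

asyncio.run(main())
```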
### Data-flow architecture

```
[Input] -> [TraceWrapper] -> [AsyncQueue] -> [Processor] -> [Buffer] -> [Export]
   |              |               |               |             |           |
[external      [data          [async          [user         [output    [downstream
 input]        wrapping]      queue]          logic]        buffer]    delivery]
```

### Pipeline execution flow

```python
async def __call__(self, up_module=None, dn_module=None):
    # 1. Start the background Porter loop (responsible for data transfer)
    bg_loop, bg_thread = create_loop_thread(run_bg_loop)

    # 2. Start every stream's main processing loop on the current event loop
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(module.run_main(current_loop))
                 for module in module_instance]

    # 3. Stop safely and clean up resources
    stop_loop_thread(bg_loop, bg_thread)
    return result_buffer
```

## Basic Usage

### 1. Subclass to implement processing logic

```python
from ezflow import TraceStream
import asyncio

class MyProcessor(TraceStream):
    def __init__(self, *args, **kwargs):
        super().__init__(str, str, **kwargs)  # declare the input/output types

    async def __call__(self, data: str, future: asyncio.Future = None) -> str:
        if data is None:
            return "hello"
        # asynchronous processing logic
        result = data.upper()
        return result

# Or process synchronously (runs in the thread pool)
class SyncProcessor(TraceStream):
    def __call__(self, data: str, future: asyncio.Future = None) -> str:
        return data.lower()
```

### 2. Chained linking and pipeline construction

```python
# Create the stream instances
stream1 = MyProcessor()
stream2 = SyncProcessor()
stream3 = MyProcessor()

# Option 1: link directly
stream1.link_to(stream2)
stream2.link_to(stream3)

# Option 2: build a Pipeline with chained calls
pipeline = stream1.link(stream2).link(stream3)
```

### 3. Feeding data and running

```python
import asyncio

async def main():
    # Feed the input data
    stream1.input("hello")
    stream1.input("world")
    stream1.input(None)  # end-of-stream signal

    # Run the pipeline
    result = await pipeline()
    print(f"Results: {result}")

asyncio.run(main())
```

### 4. Handling bidirectional streams with a Context

```python
from ezflow import BaseContext, TraceWrapper

class WebSocketContext(BaseContext):
    def __init__(self, websocket, **kwargs):
        self.websocket = websocket
        super().__init__(None, **kwargs)  # a Context needs no generator

    async def rx_loop(self):
        """Receive-message loop."""
        async for message in self.websocket:
            wrapper = TraceWrapper(message)
            await self.export(wrapper)

    async def tx_loop(self):
        """Send-response loop."""
        while self._is_working:
            # Collect data awaiting a response from traced_data
            for wrapper_id, wrapper in self.traced_data.items():
                if wrapper.future.done():
                    result = wrapper.future.result()
                    await self.websocket.send(result)
```

## Advanced Features

### 1. Data tracing and session management

EzFlow implements end-to-end data-flow tracing through `TraceWrapper` and a weak-reference system:

```python
from ezflow import TraceWrapper
import dataclasses

@dataclasses.dataclass(frozen=True)
class SessionData:
    user_id: str
    content: str

# TraceWrapper automatically assigns a unique ID to hashable objects
wrapper = TraceWrapper(SessionData("user123", "hello"))
print(f"Data ID: {wrapper.id:x}")
print(f"Processing chain: {wrapper.chain}")
```

### 2. Broadcast mode and data fan-out

```python
# Enable broadcast mode
broadcaster = TraceStream(str, str, is_broadcast=True)
processor1 = TraceStream(str, str)
processor2 = TraceStream(str, str)

# Attach multiple downstreams
broadcaster.link_to(processor1)
broadcaster.link_to(processor2)

# Data is delivered to every downstream at once
broadcaster.input("broadcast message")
```

### 3. Thread pools for CPU-bound tasks

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=4) as executor:
    cpu_intensive_stream = TraceStream(
        bytes, str,
        executor=executor  # CPU-bound work runs in the thread pool
    )
```

### 4. Pipeline configuration and serialization

```python
from ezflow import BasePipeline
import dataclasses

@dataclasses.dataclass
class PipelineConfig:
    input_type: str = "text"
    model_path: str = "/path/to/model"
    batch_size: int = 32

class MyPipeline(BasePipeline):
    @classmethod
    def from_config(cls, cfg: PipelineConfig):
        # Build the pipeline from the config
        stream1 = TextProcessor(cfg.input_type)
        stream2 = ModelProcessor(cfg.model_path, cfg.batch_size)
        return cls(stream1, stream2)

# Create from a config
config = PipelineConfig()
pipeline = MyPipeline.from_config(config)

# Pickle serialization is supported
import pickle
serialized = pickle.dumps(pipeline)
restored = pickle.loads(serialized)
```

### 5. Extending through the plugin system

```python
from ezflow import plugins

# Apply plugin decorators
@plugins.stream.custom_decorator
class EnhancedStream(TraceStream):
    pass

@plugins.logging.advanced_logger
class LoggedStream(TraceStream):
    pass
```
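Note that `custom_decorator` and `advanced_logger` above are placeholder names rather than decorators shipped with ezflow. As a rough illustration of what a class-decorator plugin of this kind could look like, here is a minimal sketch that wraps a synchronous `__call__` with logging (all names are hypothetical; this is not ezflow's actual plugin mechanism):

```python
import functools
import logging

def logged_stream(cls):
    """Hypothetical plugin decorator: log every call to a stream's __call__."""
    original_call = cls.__call__

    @functools.wraps(original_call)
    def wrapped(self, data, future=None):
        # Log the payload before delegating to the original processor
        logging.getLogger(cls.__name__).info("processing %r", data)
        return original_call(self, data, future)

    cls.__call__ = wrapped
    return cls

@logged_stream
class EchoStream:
    """Stand-in for a TraceStream subclass with a synchronous __call__."""
    def __call__(self, data, future=None):
        return data
```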
### 6. Error handling and timeout control

```python
class RobustStream(TraceStream):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, timeout_perflow=5.0, **kwargs)

    async def __call__(self, data, future):
        try:
            # Processing logic; the timeout is applied automatically
            result = await some_async_operation(data)
            return result
        except asyncio.TimeoutError:
            self.logger.warning("Processing timed out, falling back to the default value")
            return "timeout_default"
        except Exception as e:
            self.logger.error(f"Processing error: {e}")
            raise
```

## Complete Examples

### A simple text-processing pipeline

```python
import asyncio
from ezflow import TraceStream

class Tokenizer(TraceStream):
    def __init__(self):
        super().__init__(str, list)

    async def __call__(self, text: str, future=None):
        if text is None:
            return None
        return text.split()

class WordCounter(TraceStream):
    def __init__(self):
        super().__init__(list, dict)

    def __call__(self, tokens: list, future=None):
        if tokens is None:
            return None
        return {"word_count": len(tokens), "words": tokens}

async def main():
    # Build the processing chain
    tokenizer = Tokenizer()
    counter = WordCounter()
    pipeline = tokenizer.link(counter)

    # Feed the input data
    texts = ["hello world", "python is great", "async processing"]
    for text in texts:
        tokenizer.input(text)
    tokenizer.input(None)  # end-of-stream signal

    # Run and collect the results
    results = await pipeline()
    for result in results:
        print(f"Result: {result.data}")

asyncio.run(main())
```

### A WebSocket bidirectional-communication Context

```python
import asyncio
import websockets
from ezflow import BaseContext, TraceWrapper, TraceStream

class EchoProcessor(TraceStream):
    def __init__(self):
        super().__init__(str, str)

    async def __call__(self, message: str, future=None):
        # Process the message and set the Future's result
        response = f"Echo: {message.upper()}"
        if future:
            future.set_result(response)
        return response

class WebSocketContext(BaseContext):
    def __init__(self, websocket, **kwargs):
        self.websocket = websocket
        self.pending_futures = set()
        super().__init__(None, **kwargs)

    async def rx_loop(self):
        """Receive WebSocket messages."""
        try:
            async for message in self.websocket:
                wrapper = TraceWrapper(message)
                self.pending_futures.add(wrapper.future)
                await self.export(wrapper)
        except websockets.exceptions.ConnectionClosed:
            await self.export(TraceWrapper(None))  # send the end-of-stream signal

    async def tx_loop(self):
        """Send responses."""
        while self._is_working:
            if not self.pending_futures:
                await asyncio.sleep(0.01)
                continue

            done, pending = await asyncio.wait(
                self.pending_futures,
                return_when=asyncio.FIRST_COMPLETED,
                timeout=0.1
            )

            for future in done:
                try:
                    response = future.result()
                    await self.websocket.send(response)
                except Exception as e:
                    print(f"Send error: {e}")

            self.pending_futures = pending

async def websocket_handler(websocket, path):
    # Build the processing chain
    context = WebSocketContext(websocket)
    processor = EchoProcessor()
    pipeline = context.link(processor)

    # Run it
    await pipeline()

# Start the WebSocket server
start_server = websockets.serve(websocket_handler, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
```

## Notes

1. **Coroutines first**: the current version is built on asyncio; prefer implementing `__call__` as `async def`
2. **Type hints**: types are not enforced at runtime, but specifying the generic types correctly gives much better IDE support
3. **Resource management**: a Pipeline manages its own lifecycle, but external resources (files, network connections) still need manual cleanup
4. **Weak-reference limits**: the data-tracing system is built on weak references; note that primitive types such as int and str cannot be weakly referenced (see the sketch after this list)
5. **Thread safety**: TraceStream is coroutine-safe, but when an executor is used you must take care of thread safety yourself
6. **Plugin loading**: plugins are loaded automatically at import time; make sure the project layout and entry_points configuration are correct
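Point 4 is easy to trip over: Python's built-in immutable primitives do not support weak references, so wrapping raw `str` or `int` payloads limits what the tracing system can track, while user-defined classes (like the `SessionData` dataclass above) work fine. A quick standard-library demonstration, independent of ezflow:

```python
import weakref

class Payload:
    """User-defined classes support weak references."""
    def __init__(self, value):
        self.value = value

p = Payload("hello")
ref = weakref.ref(p)
print(ref().value)        # -> hello; the referent is still alive

try:
    weakref.ref("hello")  # plain str instances are not weakly referenceable
except TypeError as e:
    print(f"TypeError: {e}")
```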
## Upgrade Guide

### From 0.3.x to 0.4.x

- `BaseStream` → `TraceStream`
- Polling-mode parameters (`interval_ms`, `order_type`, etc.) have been removed
- `__call__` now receives `(data, future)`
- Use `input()` instead of `send()`
- Run pipelines with `await pipeline()` instead of `start()`

## Performance Tips

1. **Size the thread pool sensibly**: use a ThreadPoolExecutor for CPU-bound tasks
2. **Bound the queues**: set a suitable max_queue_size to avoid unbounded memory growth (see the sketch below)
3. **Set timeouts**: choose a reasonable timeout_perflow to avoid blocking
4. **Use broadcast mode sparingly**: enable it only when multi-way fan-out is genuinely needed
5. **Release data promptly**: avoid holding references to large amounts of data inside TraceWrapper
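On tip 2: a bounded `asyncio.Queue` provides natural backpressure, because `await queue.put(...)` suspends the producer whenever the queue is full. A minimal, ezflow-independent demonstration of the effect:

```python
import asyncio

async def producer(queue: asyncio.Queue):
    for i in range(5):
        await queue.put(i)  # suspends here whenever the queue is full
        print(f"produced {i}")

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        await asyncio.sleep(0.1)  # simulate slow downstream processing
        print(f"consumed {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=2)  # analogous to max_queue_size
    task = asyncio.create_task(consumer(queue))
    await producer(queue)
    await queue.join()  # wait until every item has been processed
    task.cancel()

asyncio.run(main())
```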