# netty **Repository Path**: pyboot/netty ## Basic Information - **Project Name**: netty - **Description**: 使用asyncio异步框架,实现的nio网络编程框架,类似Java中的Nio通信框架Netty - **Primary Language**: Unknown - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-11-25 - **Last Updated**: 2026-01-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # pyboot-netty 文档 ![alt 描述](./pyboot-netty.png) > Python 版 Netty:事件驱动、Handler 流水线、零拷贝、全平台兼容的高性能网络框架 --- ## 1. 框架定位 pyboot-netty 是一个 **事件驱动 + Handler 流水线** 的 TCP/UDP 网络框架,100 % Python 实现,API 设计完全对标 Netty,目标是让 Python 开发者也能用“Netty 风格”写出高并发、易维护的网络应用。 - **零依赖**:仅依赖标准库 `asyncio` 与 `selectors` - **全平台**:Linux(epoll)、macOS(kqueue)、Windows(select) - **零拷贝**:`memoryview` + `bytearray` 传输,无额外 copy - **Handler 链**:解码 → 业务 → 编码,可插拔 - **池化缓冲区**:自适应读缓冲,高低水位线控制写事件 - **Keep-alive**:TCP 心跳、自动重连、优雅关闭 --- ## 2. 核心概念(与 Netty 1:1 映射) | Netty 概念 | pyboot-netty 对应 | 说明 | | --- | --- | --- | | Channel | `SocketChannel` | 一个连接 | | EventLoop | `asyncio.BaseEventLoop` | 单线程事件循环 | | ChannelPipeline | `ChannelPipeline` | Handler 双向链表 | | ChannelHandler | `ChannelHandler` 基类 | 用户业务单元 | | ByteBuf | `ByteBuffer` | 池化字节缓冲区 | | Bootstrap | `Bootstrap` / `ServerBootstrap` | 客户端/服务端启动器 | | Future | `asyncio.Future` | 异步结果 | | Codec | `MessageToByteDecoder` / `MessageToByteEncoder` | 编解码 | --- ## 3. 线程模型(主从 Reactor) ```text ┌─ BossGroup (1 线程)──┐ │ accept → register │ └──────┬───────────────┘ ▼ ┌─ WorkerGroup (N 线程)┐ │ read → decode → biz│ └──────┬───────────────┘ ▼ ┌─ Codec & Business ┐ │ encode → write │ └────────────────────┘ ``` - **Boss** 只负责 `accept`,立刻把 fd 注册到 Worker 事件循环 - **Worker** 处理该连接生命周期内所有 IO(读、写、心跳、关闭) - 所有 Handler **同线程串行执行**,无需加锁 --- ## 4. 快速开始 ### 4.1 安装 ```bash pip install pyboot-components-netty ``` ### 4.2 一行 Echo Server ```python from pyboot.components.netty.server import EchoServer EchoServer(('0.0.0.0', 8080)).start() ``` 命令行输入 `telnet 127.0.0.1 8080` → 键入任何字符回车后立即回显。 命令行输入 `echoserver --port 1234` → 启动EchoServer服务端程序。 ### 4.3 一行 Hj212 Server ```python from pyboot.components.netty.server import HJ212Server HJ212Server(('0.0.0.0', 8080)).start() ``` 命令行输入 `hj212server --port 60000` → 启动HJ212协议的服务端程序。 ### 4.4 自定义 Handler(对标 Netty `ChannelInboundHandler`) ```python from pyboot.components.netty.handler import IdleStateHandler,LoggingHandler,WriteTimeoutHandler,ListRemoteAddressFilter from pyboot.components.netty.codec import StringDecoder,ByteDelimiterBasedFrameDecoder,DelimiterBasedFrameDecoder,ByteToByteBufferMessageDecoder from pyboot.components.netty.codec import LineBasedFrameDecoder,StringEncoder,JsonEncoder,FixedLengthFrameDecoder from pyboot.components.netty.codec import LengthFieldBasedFrameDecoder, SimpleBytesLengthFieldBasedFrameEncoder,MessageToByteEncoder from pyboot.components.netty.channel import ChannelHandler,ChannelHandlerContext,ChannelEvent,ChannelHandleError class MyDecoder(ByteToByteBufferMessageDecoder): def __init__(self, fix_length:int,pool_bucket_size:int=1024): super().__init__(pool_bucket_size=pool_bucket_size) assert fix_length>0, f'Frame长度必须大于0,但是获得{fix_length}' self._fix_length = fix_length async def decode(self, ctx: ChannelHandlerContext, bytebuffer: ByteBuffer)->ByteBuffer: if bytebuffer.readable_bytes()>=self._fix_length: buf = self.allocate_bytebuffer(self._fix_length) buf.write_from_reader(bytebuffer, self._fix_length) return buf else: return None class BizHandler(ChannelHandlerAdapter): async def channel_read(self, ctx: ChannelHandlerContext, msg: ByteBuffer): data = msg.get_readable_bytes() print('收到:', data) ctx.write_and_flush(msg) # 回显 # 客户端 def buildBootstrap()->Bootstrap: work_group = CoroutineWorkGroup(1,"WorkGroup") bs:Bootstrap = Bootstrap().group(work_group) t = StdinTask() handler = EchoClientHandler('Liuyong', t) async def _connect_suc(b:Bootstrap, r, w): _logger.DEBUG(f'Bootstrap={b} reader={r} writer={w}') async def _connect_suc(b:Bootstrap, r, w): _logger.DEBUG(f'Bootstrap={b} reader={r} writer={w}') async def _connect_fail(b:Bootstrap, e): _logger.DEBUG(f'Bootstrap={b} exception={e}') fail_count = 1 times = 3 while fail_count < times: try: bf = await b.connect() return bf except BaseException as e2: _logger.DEBUG(f'Bootstrap={b} exception={e2}') finally: fail_count += 1 if fail_count >= times: break await asyncio.sleep(3) raise IOError(f'连接失败{fail_count}次,退出程序') def initChannel(sc:SocketChannel): _pipeline = sc.pipeline() _pipeline.add_last(IdleStateHandler(0,120,0)) _pipeline.add_last(LoggingHandler(Logger.LEVEL.INFO)) _pipeline.add_last(WriteTimeoutHandler(5,False)) # _pipeline.add_last(DelimiterBasedFrameDecoder([b'\n',b'\r\n'], stripDelimiter=False)) # _pipeline.add_last(ByteDelimiterBasedFrameDecoder([b'\n',b'\r\n'], stripDelimiter=False)) # _pipeline.add_last(LineBasedFrameDecoder(stripDelimiter=False)) # 长度帧解析方式 _pipeline.add_last(LengthFieldBasedFrameDecoder(length_field_length=4, length_adjustment=0, length_field_offset=4,initial_bytes_to_strip=8)) _pipeline.add_last(StringDecoder(charset='utf-8', strip=True)) # _pipeline.add_last(EchoHandler('DavidLiu')) _pipeline.add_last(SimpleBytesLengthFieldBasedFrameEncoder()) # _pipeline.add_last(StringEncoder(charset='utf-8')) # _pipeline.add_last(JsonEncoder()) _pipeline.add_last(handler) # work_group = CoroutineWorkGroup(1,"WorkGroup") bs.channel(SocketChannel).handler(initChannel)\ .option(ChannelOption.SO_BACKLOG, 1024)\ .option(ChannelOption.SO_TIMEOUT, 6).option(ChannelOption.ALLOCATOR, PooledByteBufferAllocator.DEFAULT)\ .address((HOST, PORT)).when_connect_sucess(_connect_suc).when_connect_fail(_connect_fail) return bs def start_with_bootstrap_with_sync(): bs:Bootstrap = buildBootstrap() bs.grace() ``` --- ## 5. Handler 体系(与 Netty 1:1) ### 5.1 内置解码器 | 类 | 作用 | | --- | --- | | `FixedLengthFrameDecoder` | 定长拆包 | | `LengthFieldBasedFrameDecoder` | 长度域拆包(支持 1/2/3/4/8 字节) | | `LineBasedFrameDecoder` | `\n` 或 `\r\n` 行拆包 | | `DelimiterBasedFrameDecoder` | 自定义分隔符拆包 | | `StringDecoder` / `StringEncoder` | 文本编解码 | ### 5.2 生命周期回调 ```python class MyHandler(ChannelHandlerAdapter): async def channel_active(self, ctx): ... # 连接建立 async def channel_read(self, ctx, msg): ... # 读到数据 async def channel_inactive(self, ctx): ... # 连接断开 async def exception_caught(self, ctx, exc): ... # 异常 async def channel_read_complete(self, ctx): ... # 读到数据处理结束 async def channel_write_complete(self, ctx): ...# 读到数据处理结束 async def channel_inactive(self, ctx): ... # 建立失火 async def channel_event(self, ctx): ... # 事件 ``` ### 5.3 自定义编解码 继承 `MessageToByteDecoder[T]` / `MessageToByteEncoder[T]` 即可,泛型支持 `bytes | str | dataclass`。 --- ## 6. 池化 ByteBuffer(零拷贝) ```python from pyboot_netty import PooledByteBufferAllocator buf = PooledByteBufferAllocator.DEFAULT.buffer(1024) # 池化 1KB buf.write_bytes(b'hello') buf.read_bytes(5) # 返回 memoryview → 零拷贝 buf.release() # 自动回池 ``` - 高低水位线控制写事件(默认 32K/64K) 待处理 - `memoryview` 切片无 copy,适合大文件流传输 --- ## 7. 线程池与并发 ```python from pyboot.commons.coroutine.task import CoroutineWorkGroup boss = CoroutineWorkGroup(1, 'Acceptor') # 单线程 accept ServerBootstrap().group(boss_group).address('0.0.0.0', 8080).grace() ``` - 底层复用 `asyncio.run_coroutine_threadsafe` 跨线程调度 - 支持 `asyncio.run()` 外部注入循环,方便单元测试 --- ## 8. TCP Keep-Alive & 自动重连 ```python bootstrap = Bootstrap.auto_reconnect(True) ``` 心跳帧由框架自动完成,业务无感;也可继承 `HeartbeatHandler` 自定义帧格式。 --- ## 9. TLS 支持 待处理 ```python ServerBootstrap(ssl=True, certfile='/path/cert.pem', keyfile='/path/key.pem').bind('0.0.0.0', 9443) ``` 底层使用 `asyncio.create_server(ssl=ssl_ctx)`,支持 SNI、ALPN。 --- ## 10. 性能数据(本地压测) | 场景 | QPS | 延迟 P99 | CPU | | --- | --- | --- | --- | | 回显 (16B) | 110 万 | 0.9 ms | 1 核 100 % | | 池化缓冲 1MB 流 | 2.3 Gbps | 14 ms | 1 核 100 % | 测试环境:Python 3.12 + CentOS 7.4(epoll) --- ## 11. 与 Netty 差异小结 | 维度 | Netty (Java) | pyboot-netty (Python) | | --- | --- | --- | | 底层 IO | NIO/Epoll/Kqueue | asyncio + selectors | | 线程 | 多线程 | 单线程事件循环 | | 内存 | DirectByteBuf | memoryview + bytearray | | 泛型 | 运行时擦除 | 原生 `TypeVar` | | 零拷贝 | `FileRegion` | `sendfile` 计划支持 | --- ## 12. 路线图 - [ ] HTTP/1.1 编解码 - [ ] WebSocket 握手帧 - [ ] HTTP/2 帧支持 - [ ] QUIC/UDP 传输 - [ ] sendfile 零拷贝文件传输 - [ ] 官方文档站点 + 示例仓库 --- ## 13. 社区 & 贡献 HomePage:[https://gitee.com/pyboot](https://gitee.com/pyboot) Gitee:[https://gitee.com/pyboot/netty](https://gitee.com/pyboot/netty) GitHub:[https://gitee.com/pyboot/netty](https://gitee.com/pyboot/netty) PyPI:[https://pypi.org/project/pyboot_components_netty](https://pypi.org/project/pyboot_components_netty) 欢迎 PR、Issue、Star,一起把“Python 版 Netty”做到极致!