# Netty
**Repository Path**: aumu/netty
## Basic Information
- **Project Name**: Netty
- **Description**: No description available
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2021-04-26
- **Last Updated**: 2021-05-22
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Netty 核心组件
- Channel
- Callback
- Future
- Event & ChannelHandler
## Channel
> Channel 是 Java NIO 的一个基础构造
>
> Channel 是`入站`或`出站`数据的载体。因此它可以被打开、关闭或者连接、断开连接
>
## Callback
> Netty 内部使用回调来处理事件;当一个回调被触发时,相关的事件可用被一个 `ChannelHandler` 接口的实现来处理
>
```java
public class Callback extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("建立新连接");
}
}
```
## Future
> Future 提供了某种操作完成时通知应用程序的方式
>
### ChannelFuture
> ChannelFuture 用于执行异步操作的时候使用
>
> 它提供了额外的方法,能够帮助我们注册一个或者多个 `ChannelFutureListener` 实例
>
> 监听器的回调方法`operationComplete()`将会在对应的操作完成时调用
>
> 然后监听器可以判断该操作是否成功
>
> 如果失败可以检索产生的 `Throwable`
>
> 每一个Netty出战的 I/O 操作都会返回一个ChannelFuture
>
```java
Bootstrap bootstrap = new Bootstrap();
// 连接远程节点
ChannelFuture future = bootstrap.connect("127.0.0.1", 8880).sync();
future.addListener(new ChannelFutureListener() { // 注册一个 ChannelFutureListener,在完成操作时获得通知
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// 如果操作成功则做一些操作
System.out.println("操作成功");
} else {
// 失败,打印堆栈信息,或者其他操作
Throwable cause = future.cause();
cause.printStackTrace();
}
}
});
```
## Event & ChannelHandler
> Netty 使用不同的事件来通知我们状态的改变或者是操作的改变,这样可以让我们基于已经发生的事件来触发适当的操作
>
> 每一个事件都可以被分发到 ChannelHandler 类中的某个用户实现的方法
>
> 例如:记录日志、数据转换、流控制、应用程序逻辑
>
### Netty 入站相关事件
- 连接已被记过或者连接失活
- 数据读取
- 用户事件
- 错误事件
### Netty 出站相关事件
- 打开或者关闭到远程节点的连接
- 将数据写道或者冲刷到套接字
# Netty 快速开始
## 引入依赖
### pom.xml
```xml
io.netty
netty-all
4.1.48.Final
ch.qos.logback
logback-classic
1.2.3
compile
org.apache.logging.log4j
log4j-to-slf4j
2.13.3
compile
org.slf4j
jul-to-slf4j
1.7.30
compile
```
## 服务端
### NettyServer
```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NettyServer {
private final static Logger logger = LoggerFactory.getLogger(NettyServer.class);
public static void main(String[] args) throws InterruptedException {
/**
* bossGroup:只负责处理连接请求
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
// 创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
// 设置两个线程组
.group(bossGroup, workGroup)
// 设置服务端的通道实现
.channel(NioServerSocketChannel.class)
// 设置线程队列得到连接个数
.option(ChannelOption.SO_BACKLOG, 128)
// 设置保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 设置 workGroup 的 EventLoop 对应的通道设置处理器
.childHandler(new ChannelInitializer() { // 创建一个通道初始化对象
// 给 pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
logger.info("服务端准备完成");
// 绑定端口,设置为同步,并且启动服务器
ChannelFuture cf = bootstrap.bind(6666).sync();
// 监听关闭通道
cf.channel().closeFuture().sync();
} finally {
// 优雅的关闭
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
```
### NettyServerHandler
```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final static Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
/**
* 对于每个传入的消息都会调用
* @param ctx 上下文对象,包含 pipeline(管道)、channel(通道)、地址
* @param msg 客户端传递来的消息
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 将接收到的消息写给客户端,而不冲刷出站消息
ctx.write(buf);
}
/**
* 通知 ChannelInboundHandler 最后一次对 channelRead() 的调用是当前批量读取中的最后一条消息
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将暂存于 ChannelOutboundBuffer 中的消息冲刷到远程节点,
// 并在下一次调用flush()或者writeAndFlush()方法时将会尝试写出到套接字
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
// 监听消息冲刷完毕后关闭该 Channel
.addListener(ChannelFutureListener.CLOSE);
}
/**
* 的读取操作期间,有异常抛出时调用
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 打印堆栈异常
cause.printStackTrace();
// 关闭 Channel
ctx.close();
}
}
```
## 客户端
### NettyClient
```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NettyClient {
private final static Logger logger = LoggerFactory.getLogger(NettyClient.class);
public static void main(String[] args) throws InterruptedException {
// 客户端 需要一个循环组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
// 客户端启动对象
// 服务端使用 ServerBootstrap 而客户端是使用 Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap
// 设置线程组
.group(eventExecutors)
// 设置客户端通道的实现
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
logger.info("客户端准备完成");
// 启动客户端并连接服务端
ChannelFuture future = bootstrap.connect("127.0.0.1", 6666).sync();
// 监听关闭通道
future.channel().closeFuture().sync();
} finally {
eventExecutors.shutdownGracefully();
}
}
}
```
### NettyClientHandler
```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final static Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
/**
* 当通道就绪时触发
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("client : {}", ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Client", CharsetUtil.UTF_8));
}
/**
* 读取服务端发送的消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("服务端回复的消息 : {}", buf.toString(CharsetUtil.UTF_8));
logger.info("服务器的地址 : {}", ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
```
# 任务调度
## TaskQueue 任务队列
```java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("server context : {}", ctx);
// 定义 taskQueue 实现异步任务。解决 : 业务时间过长导致长时间阻塞
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty2~", CharsetUtil.UTF_8));
}
});
ByteBuf buf = (ByteBuf) msg;
logger.info("客户端发送的消息是 : {}", buf.toString(CharsetUtil.UTF_8));
logger.info("客户端的地址是 : {}", ctx.channel().remoteAddress());
}
```
## scheduledTaskQueue 定时任务队列
```java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.info("server context : {}", ctx);
// 参数一:业务实现、参数二:延迟时间、参数三:时间单位
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty2~", CharsetUtil.UTF_8));
}
}, 5, TimeUnit.SECONDS);
ByteBuf buf = (ByteBuf) msg;
logger.info("客户端发送的消息是 : {}", buf.toString(CharsetUtil.UTF_8));
logger.info("客户端的地址是 : {}", ctx.channel().remoteAddress());
}
```
# FutureListener 监听器
> Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 `ChannelFuture` 来获取操作执行的状态,注册监听函数来执行完成后的操作
```java
// 绑定端口,设置为同步,并且启动服务器
ChannelFuture cf = bootstrap.bind(6666).sync();
// 添加监听器
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
logger.info("监听端口 6666 成功");
} else {
logger.error("监听端口 6666 失败");
}
}
});
```
# BootStrap、ServerBootStrap
> BootStrap: 只要用于配置整个 Netty 程序、串联各个组件
> Netty 中的 BootStrap 类是客户端程序的启动引导类、ServerBootStrap 是服务端启动引导类
## 常用方法
```java
// 在服务端设置两个EventLoop
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {}
// 在客户端设置一个EventLoop
public B group(EventLoopGroup group) {}
// 设置一个服务器端的通道实现
public B channel(Class extends C> channelClass) {}
// 用来给 ServerChannel 添加配置
public B option(ChannelOption option, T value) {}
// 用来给接收到的通道添加配置
public ServerBootstrap childOption(ChannelOption childOption, T value) {}
// 设置服务端的端口
public ChannelFuture bind(int inetPort) {}
// 提供给客户端连接服务端
public ChannelFuture connect(String inetHost, int inetPort) {}
```
# Future、ChannelFuture
> Netty 中所有的 IO 操作都是异步的、不能立刻得知消息被正确处理
>
> 但是可以等待执行完成或者注册监听器,具体通过 Future、ChanelFutures 来注册监听器
>
> 当操作成功或者失败的时候会去执行注册的监听器
>
## 常用方法
```java
// 返回当前正在执行IO的操作的通道
Channel channel();
// 等待异步执行完毕
ChannelFuture sync() throws InterruptedException;
```
# Channel
1. Netty 网络通信的组件,能够用于执行网络 I/O 操作
2. 通过 Channel 可获得当前网络连接通道的状态、配置参数(例如接收缓冲区大小)
3. Channel 提供异步的网络 I/O 操作(如建立连接、读写、绑定端口)
4. 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以在 I/O 操作成功、失败或取消时回调通知调用方
5. 支持关联 I/O 操作与对应的处理程序
6. 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应
## 常用的 Channel 类型
- `NioSOcketChannel` : 异步的`客户端` `TCP Socket` 连接
- `NioServerSocketChannel` : 异步的`服务器端` `TOC Socket` 连接
- `NioDatagramChannel` : 异步的 `UDP` 连接
- `NioSctpChannel` : 异步的 `客户端` `Sctp` 连接
- `NioSctpServerChannel` : 异步的 `Sctp` `服务器端` 连接
# Selector
> Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件
>
> 当向一个 Selector 中注册 Channel 后 Selector 内部的机制就可以自动不断的查询这些注册的 Channel 是否以有就绪的I/O事件
>
> 例如可读、可写、网络连接完成等
>
> 这样程序就可以很简单的使用一个下称高效的管理多个 Channel
>
# ChannelHandler
> ChannelHandler 是一个借口,用来处理 I/O 事件或拦截 I/O 操作
>
> 并将其转发到其他 ChannelPipeline(业务处理链)中的下一个处理程序
## ChannelHandler 实现类
- `ChannelInboundHandler` : 用于处理入站 I/O 事件
- `ChannelOutboundHandler` : 用于处理出栈 I/O 操作
- `ChannelInboundHandlerAdapter` : 用于处理入站 I/O 事件
- `ChannelOutboundHandlerAdapter` : 用于处理出栈 I/O 操作
- `ChannelDuplexHandler` : 用于处理入站和出战事件
## ChannelInboundHandlerAdapter 常用方法
```java
// 通道被注册事件
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {}
// 通道被注销事件
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {}
// 通道处理就绪事件
public void channelActive(ChannelHandlerContext ctx) throws Exception{}
// 通道读取数据事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {}
// 数据读取完毕事件
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}
// 通道发生异常事件
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}
// 当前 Handler 被 pipeline 注册时触发
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {}
// 当前 Handler 被 pipeline 移除时触发
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}
```
## @ChannelHandler.Sharable
> 表示一个 ChannelHandler 可以被多个 Channel 安全的共享
>
> Netty 会在每个 Channel 中都添加处理器,如果每个通道都创建一次对象,对内存的消耗无疑是非常大的
>
> 所以一般在线程安全的处理器中,都会标注此注解,表示可以只创建一次对象
>
> 一般在没有共享变量的处理器都可以加入此注解
# Pipeline & ChannelPipeline
> ChannelPipeline 是一个 Handler 的集合,负责处理和拦截 inbound 或者 outbound 的事件和操作
>
> 相当于一个贯穿Netty的链
>
> 简单说:ChannelPipeline 是一个用来保存 ChannelHandler 的集合,用来处理或拦截 Channel 的入站事件和出战操作
>
> ChannelPipeline 实现了一种高级形式的拦截过滤器模式,让用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 的相互交互
>
> 在 Netty 中,每个 Channel 都仅有一个 ChannelPipeline,

> 一个 Channel 包含了一个 ChannelPipeline, 而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表
>
> 并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler
>
> 入站事件和出战事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler
>
> 出战事件会从链表 tail 往前传递到最前一个出站的 handler, 两种类型的 handler 互不干扰
>
## 常用方法
```java
// 把 handler 添加到链表的最后一个位置
ChannelPipeline addLast(ChannelHandler... handlers);
// 把 handler 添加到链表的第一个位置
ChannelPipeline addFirst(ChannelHandler... handlers);
```
# ChannelHandlerContext
> 保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象
>
> ChannelHandlerContext 中包含一个具体的事件处理器 ChannelHandler
>
> 同时 ChannelHandlerContext 中也绑定了对应的pipeline 和 Channel 的信息,方便对 ChannelHandler 进行调用
>
## 常用方法
```java
// 关闭通道
ChannelFuture close();
// 刷新
ChannelHandlerContext flush();
// 将数据写到 ChannelPipeline 中,当前 CHannelHandler 的下一个 ChannelHandler 开始处理(出站)
ChannelFuture writeAndFlush(Object msg);
```
# ChannelOption
## ChannelOption.SO_BACKLOG
> 对应 TCP/IP 协议 Listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小
>
> 服务端处理客户端链接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端请求时,服务端不能处理的客户端连接请求放在队列中等待处理
>
## ChannelOption.SO_KEEPALIVE
> 是否保持连接活动状态
# EventLoopGroup & NioEventLoopGroup
> EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了跟他好的利用多核 CPU 资源,一半会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例
>
> EventLoopGroup 提供 next 接口,可以重里面按照一定规则获取其中一个 EventLoop 来处理任务
>
> 在 Netty 服务器端编程中,我们一半都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup
>
> 通常一个服务端口即一个 ServerSocketChannel 对应一个 Selector 和一个 EventLoop 线程。
>
> BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理
>
> > BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop维护者一个注册了 ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来
> >
> > 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup
> >
> > WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护 Selector 并对其后续的 IO 事件进行处理
# Unpooled
> Unpooled 类是 Netty 用来操作 缓冲区的工具类
## 常用方法
```java
public static ByteBuf copiedBuffer(CharSequence string, Charset charset)
```
# 心跳机制
```java
package io.mvvm.netty.heart;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* @program: Netty
* @description: 心跳机制-服务端
* @author: 潘
* @create: 2021-04-29 00:04
**/
public class HeartServer {
private final static Logger logger = LoggerFactory.getLogger(HeartServer.class);
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
// handler 对应的是 bossGroup
// childHandler 对应的是 workGroup
// 日志处理器,提供给BossGroup打印日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 加入空闲状态处理器
// 参数一:多少时间没有读取消息,就会发送一个心跳,检测是否是连接状态
// 参数二: 写
// 参数三: 读写
// 参数四:时间单位
// 当 IdleStateHandler 触发后,会传递给通道的下一个 handler 处理
// 通过调用下一个 handler 的 userEventTriggered 方法
ch.pipeline().addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
// 加入一个对空闲检测处理的handler
ch.pipeline().addLast(new HeartHandler());
}
});
logger.info("服务端准备完成");
ChannelFuture cf = bootstrap.bind(6668).sync();
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
logger.info("监听端口 6666 成功");
} else {
logger.error("监听端口 6666 失败");
}
}
});
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
```
```java
package io.mvvm.netty.heart;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
/**
* @program: Netty
* @description: 空闲状态处理器
* @author: 潘
* @create: 2021-04-29 00:18
**/
public class HeartHandler extends ChannelInboundHandlerAdapter {
/**
*
* @param ctx
* @param evt 事件
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent ise = (IdleStateEvent) evt;
switch (ise.state()) {
case READER_IDLE:
System.out.println("读空闲");
break;
case WRITER_IDLE:
System.out.println("写空闲");
break;
case ALL_IDLE:
System.out.println("读写空闲");
break;
}
}
}
}
```
# WebSocket
```java
package io.mvvm.netty.ws;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @program: Netty
* @description: WebSocketServer
* @author: 潘
* @create: 2021-04-29 09:24
**/
public class WebSocketServer {
private final static Logger logger = LoggerFactory.getLogger(WebSocketServer.class);
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer() { // 创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// Http协议编解码器
ch.pipeline().addLast(new HttpServerCodec());
// 以块方式写,对大数据流的支持,例如文件传输
ch.pipeline().addLast(new ChunkedWriteHandler());
// 将多个数据段聚合起来,如果发送大文本数据避免请求多次
ch.pipeline().addLast(new HttpObjectAggregator(8192));
// 处理WebSocket握手以及处理控制帧
ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));
// 自定义Handler
ch.pipeline().addLast(new WebSocketMessageHandler());
}
});
ChannelFuture cf = bootstrap.bind(5566).sync();
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
```
```java
package io.mvvm.netty.ws;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* @program: Netty
* @description: 消息处理器, 表示一个文本帧
* @author: 潘
* @create: 2021-04-29 09:41
**/
@ChannelHandler.Sharable
public class WebSocketMessageHandler extends SimpleChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端已连接");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 收到客户端发送的消息
System.out.println(msg.text());
// 向客户端回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("Hello WebSocket : " + msg.text()));
// 动态删除当前处理器
ctx.pipeline().remove(this);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端断开连接");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("处理器已添加");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("处理器被移除");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("发生异常" + cause.getMessage());
ctx.close();
}
}
```
# Encoder & Decoder
> 网络应用程序在数据传输时采用二进制字节码数据,所以在发送数据时需要编码,接收数据时需要解码
>
> codec (编解码器)分别表示:Decoder(解码器)、Encoder(编码器)
>
## Netty 的编解码器
- StringEncoder & StringDecoder > String 类型数据的编解码器
- ObjectEncoder & ObjectDecoder > Java 对象的编解码器
> Netty 提供的 Object 编解码器采用的是Java的序列化方式,因此就存在效率低、无法跨语言、体积大等问题
>
## Google Protobuf
> Protobuf 是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化(序列化)
# 粘包 & 半包
> TCP 是面向流的,提供高可靠性服务。收发两端都要有成对的 Socket
>
> 因此发送端为了将多个发给接收端的包,更有效的发送给对方,使用了 Nagle 算法优化
>
> 将多次间隔较小且数量小的数据,合并为一个大的数据块,然后进行封包,
>
> 这样虽然提高了效率,但是接收端就难以分辨出完整的数据包了,因为面向流的通信是无消息保护边界的
## 粘包
### 现象
发送 `abc` `def`,接收为 `abcdef`
### 原因
- 应用层:接收方 ByteBuf 设置太大(Netty默认1024)
- 滑动窗口:假设发送方256bytes表示一个完整的报文,但由于接收方处理不及时且窗口大小足够大,这256bytes字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会出现粘包
- Nagle 算法
## 半包
### 现象
发送 `abcdef` 接收 `abc` `de` `f`
### 原因
- 应用层:接收方 ByteBuf 小于实际发送数据的数量
- 滑动窗口:加涉接收方的窗口只剩128bytes,发送方的报文大小是256bytes,这时放不下了,只能先发送前128bytes,等待ack后才能发送剩余部分,这就造成半包
- MSS限制:当发送的数据草果 MSS 限制后,会将数据切分发送,就会照成半包
## 粘包半包演示
### 服务端
```java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
logger.info("客户端发送的消息是 : {}", buf.toString(CharsetUtil.UTF_8));
}
```
### 客户端
```java
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ByteBuf buf = Unpooled.copiedBuffer("Hello Client" + i, CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
}
```
### 结果打印
```log
[nioEventLoopGroup-3-1] INFO io.mvvm.netty.decoder.NettyServerHandler - 客户端发送的消息是 : Hello Client0Hello Client1Hello Client2Hello Client3Hello Client4Hello Client5Hello Client6Hello Client7Hello Client8Hello Client9
```
### 结论
> 代码解释
>
> 客户端和服务端建立连接后,循环向服务端发送10条消息
>
> 期望结果
>
> 服务端收到10条消息
>
> 实际结果
>
> 服务端将10条消息放在一起成为了一条消息,这就是粘包现象
## 解决方案
### 短连接
> 建立连接后只发送一次消息,发送消息完毕后断开连接
>
> 缺点:不如用http
### 定长解码器 FixedLengthFrameDecoder
> 定常解码器即固定每条消息的固定长度,如果消息长度小于定长的长度,则进行补位到定长长度
>
> 优点:解决了短连接的问题
>
> 缺点,增加消息体积,逐一匹配字符效率低
#### 服务端
```java
// 定长解码器,参数为:帧的长度
ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
```
#### 客户端
```java
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for (int i = 0; i < 10; i++) {
byte[] bytes = getBytes(c, r.nextInt(9) + 1);
buffer.writeBytes(bytes);
c++;
}
ctx.writeAndFlush(buffer);
}
private static byte[] getBytes(char c, int i) {
StringBuilder sb = new StringBuilder();
for (int i1 = 0; i1 < i; i1++) {
sb.append(c);
}
int length = sb.length();
for (int i1 = 0; i1 < 10 - length; i1++) {
sb.append("-");
}
System.out.println(sb.toString());
return sb.toString().getBytes(StandardCharsets.UTF_8);
}
```
### 行解码器 LineBasedFrameDecoder
> public LineBasedFrameDecoder(final int maxLength)
>
> 参数maxLength表示设置一个最大消息长度,如果消息超过这个长度还未找到分隔符,则会抛出`TooLongFrameException`异常
>
> public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)
>
> 和上面那个的区别就是,LineBasedFrameDecoder采用 `\\r\\n` 实现分割
>
> DelimiterBasedFrameDecoder采用自定义分隔符
>
> 每条消息采用特定的分隔符进行分割,例如 `\\n`
>
> 优点: 解决了定长解码器的补位问题
>
> 缺点:逐一匹配字符,效率低
#### 服务端
```java
// 行解码器
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
```
#### 客户端
```java
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for (int i = 0; i < 10; i++) {
byte[] bytes = getBytes(c, r.nextInt(256) + 1);
buffer.writeBytes(bytes);
c++;
}
ctx.writeAndFlush(buffer);
}
private static byte[] getBytes(char c, int i) {
StringBuilder sb = new StringBuilder();
for (int i1 = 0; i1 < i; i1++) {
sb.append(c);
}
sb.append("\n");
System.out.println(sb.toString());
return sb.toString().getBytes(StandardCharsets.UTF_8);
}
```
### LET 解码器 LengthFieldBasedFrameDecoder
```java
/**
* 基于长度字段帧的解码器
*
* maxFrameLength: 帧的最大长度,如果超出此长度会抛出TooLongFrameException异常
* lengthFieldOffset: 长度字段偏移量,(即偏移多少可以读到长度)
* lengthFieldLength: 长度字段长度,(长度有多少个字节)
* lengthAdjustment: 长度字段为基准,还有多少字节内容
* initialBytesToStrip: 从头去除几个字节,(例如,长度是4个字节,去除长度,则值为4)
*/
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip){}
```
#### 测试
> EmbeddedChannel用来测试处理器
```java
public static void main(String[] args) {
EmbeddedChannel channel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),
new LoggingHandler(LogLevel.INFO)
);
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
send(buf, "Hello");
send(buf, "Hi");
send(buf, "Test");
channel.writeInbound(buf);
}
private static void send(ByteBuf buf, String msg) {
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
int length = bytes.length;
buf.writeInt(length); // 写入消息长度
buf.writeBytes(bytes); // 写入消息内容
}
```
# 协议设计与解析
## 自定义协议要素
- 魔术:用来在第一事件判断是否是无效数据包
- 版本号:可以支持协议的升级
- 序列化算法:消息正文采用哪种序列化方式,例如:json、protobuf、hessian、jdk
- 指令类型:是登陆、注册、单聊、群聊...和业务相关
- 请求序号:为了双工通信,提供异步能力
- 正文长度
- 消息正文
## 自定义协议编解码实现
### MessageCodec
```java
import io.mvvm.netty.codec.entity.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
public class MessageCodec extends ByteToMessageCodec {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 1. 魔数,4字节
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 版本,1字节
out.writeByte(1);
// 3. 序列化算法(0:jdk, 1:json),1字节
out.writeByte(0);
// 4. 指令类型,1字节
out.writeByte(msg.getMessageType());
// 5. 请求序号,4个字节
out.writeByte(msg.getSequenceId());
// 无意义,对齐填充,保持2的倍数
out.writeByte(0xff);
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7. 长度,4字节
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List