基础知识
Netty核心组件介绍
Channel
一个连接分配一个channel,如:
- 终端 A 连接 → 创建 Channel-1
- 终端 B 连接 → 创建 Channel-2
- 终端 C 连接 → 创建 Channel-3
线程模型
单线程 QPS<=5k
所有I/O操作和业务逻辑由单个线程处理
EventLoopGroup singleGroup = new NioEventLoopGroup(1); // 关键:线程数=1
ServerBootstrap b = new ServerBootstrap();
b.group(singleGroup) // 单线程同时处理连接和I/O
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new SimpleHandler());
}
});
多线程 1w<=QPS<=3w
一组线程共同处理所有Channel的I/O事件
EventLoopGroup workerGroup = new NioEventLoopGroup(16); // 关键:多线程
ServerBootstrap b = new ServerBootstrap();
b.group(workerGroup) // 所有线程平等处理连接和I/O
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new SimpleHandler());
}
});
主从多线程 QPS>5w
分离连接接收(主线程组)和I/O处理(工作线程组)
EventLoopGroup bossGroup = new NioEventLoopGroup(2); // 主线程组:专责接收连接
EventLoopGroup workerGroup = new NioEventLoopGroup(16); // 工作线程组:处理I/O
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // 关键:双线程组
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ComplexHandler()); // 复杂业务处理
}
});
AttributeKey
ChannelHandler
ChannelDuplexHandler
应用场景:
- 出站入站流量统计监控
ChannelHandlerContext
生命周期
ChannelTrafficShapingHandler
ChannelHandler#channelWritabilityChanged
ByteBuf
注意事项:
- ByteBuf使用完以后,一定要调用#release方法释放,避免内存泄漏。
开发中涉及到类:
- Unpooled 创建非池化ByteBuf
- PooledByteBufAllocator 创建池化ByteBuf,减少频繁分配和释放内存的开销
- CompositeByteBuf 用于创建复合缓冲区,将多个ByteBuf 合并为一个逻辑缓冲区
- ByteBufAllocator 缓冲区分配器接口,用于分配、回收缓冲区。
- ReferenceCounted 用于引用计数管理
- ByteBufUtil 用于转换、比较、计算缓冲区哈希值
PooledByteBufAllocator
ChannelPipeline
注意:
- addLast添加的处理器需要考虑顺序问题,不能够随意颠倒,一般是入站Decode->出站Encode。
IdleStateHandler 心跳处理器
该处理器的几种空闲状态
public enum IdleState {
READER_IDLE, //读空闲状态
WRITER_IDLE, //写空闲状态
ALL_IDLE; //读写都是空想状态
}
案例:心跳处理器new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS)
10秒内没有收到客户端任何数据,则关闭连接。
ChannelHandler#userEventTriggered
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
//如果是读空闲状态
if (event.state() == IdleState.READER_IDLE) {
System.out.println("Client timeout, closing connection");
ctx.close(); //会断开与客户端的链接
}
}
}
@Sharable注解
介绍:用于在多个channel中共享handler,默认情况下每个channel(一个session连接)会创建一个handler实例保证线程安全。 应用场景:无状态、线程安全,对于有状态和非线程安全的handler共享会导致数据错误。
内置编码器和解码器总结
编解码器名称 | 介绍 | 应用场景 |
---|---|---|
LineBasedFrameDecoder | 基于行分隔符(\n 或 \r\n )分割字节流为消息,支持最大帧长限制。 | 文本协议(如 SMTP、IRC),日志传输,简单命令行协议。示例:处理 "Hello\nWorld\n" 为两条消息。 |
DelimiterBasedFrameDecoder | 基于自定义分隔符(如 , 、$ )分割字节流为消息。 | 自定义文本协议,CSV 数据流传输。示例:以 $ 分隔,处理 data1$data2$ 。 |
LengthFieldBasedFrameDecoder | 基于长度字段解析自定义协议,支持配置长度字段偏移和调整。 | 自定义二进制协议,工业设备通信,游戏服务器。示例:解析 [4字节长度][数据] 格式。 |
LengthFieldPrepender | 在消息前添加长度字段,配合 LengthFieldBasedFrameDecoder 使用。 | 自定义二进制协议,解决粘包/拆包问题。示例:为消息添加 4 字节长度前缀。 |
FixedLengthFrameDecoder | 按固定长度分割字节流为消息。 | 固定长度协议,嵌入式设备通信,遗留系统。示例:每 16 字节分割为一条消息。 |
StringEncoder | 将字符串编码为指定字符集(如 UTF-8)的字节流。 | 文本协议,聊天服务器,HTTP 请求处理。示例:编码字符串消息到字节流。 |
StringDecoder | 将字节流解码为字符串,支持指定字符集。 | 配合 LineBasedFrameDecoder 等,处理文本消息。示例:解码字节为字符串。 |
ObjectEncoder | 将 Java 对象序列化为字节流(基于 Java 序列化)。 | 快速原型开发,Java 内部对象传输。示例:传输 User 对象。 |
ObjectDecoder | 将字节流反序列化为 Java 对象。 | 配合 ObjectEncoder ,处理 Java 对象。示例:反序列化接收到的对象。 |
ProtobufEncoder | 将 Google Protocol Buffers 消息编码为字节流。 | 高性能跨语言协议,gRPC,微服务通信。示例:编码 Protobuf Person 消息。 |
ProtobufDecoder | 将字节流解码为 Protobuf 消息对象。 | 配合 Protobuf 协议,处理跨平台消息。示例:解码 Protobuf 消息。 |
HttpRequestEncoder | 将 HTTP 请求编码为字节流。 | HTTP 服务器/客户端,REST API,代理服务器。示例:编码 HTTP GET 请求。 |
HttpResponseEncoder | 将 HTTP 响应编码为字节流。 | HTTP 服务器,Web 应用。示例:编码 HTTP 响应。 |
HttpRequestDecoder | 将字节流解码为 HTTP 请求对象。 | 解析 HTTP 请求,Web 服务器开发。示例:解码 POST 请求的表单数据。 |
HttpResponseDecoder | 将字节流解码为 HTTP 响应对象。 | 解析 HTTP 响应,客户端开发。示例:解码服务端响应。 |
HttpObjectAggregator | 将分段 HTTP 消息聚合成完整对象(如 FullHttpRequest )。 | 处理 HTTP 分块传输(chunked),简化 Web 开发。示例:聚合分段请求。 |
WebSocketFrameEncoder | 编码 WebSocket 帧(如文本帧、二进制帧)。 | WebSocket 服务器/客户端,实时通信。示例:编码聊天消息帧。 |
WebSocketFrameDecoder | 解码 WebSocket 帧为对象。 | 实时应用,如聊天、在线游戏。示例:解码接收到的文本帧。 |
ByteToMessageDecoder | 抽象基类,用于自定义字节到消息的解码逻辑。 | 自定义协议开发,复杂协议处理。示例:实现私有协议解析。 |
MessageToByteEncoder | 抽象基类,用于自定义消息到字节的编码逻辑。 | 自定义协议,复杂消息编码。示例:编码私有协议消息。 |
Base64Encoder | 将字节流编码为 Base64 格式。 | 传输 Base64 编码数据,如 JSON 中的二进制数据。示例:编码图片数据。 |
Base64Decoder | 将 Base64 格式解码为字节流。 | 解码 Base64 数据,如嵌入的图片。示例:解码 Base64 字符串。 |
KryoEncoder(需额外依赖) | 使用 Kryo 序列化库将对象编码为字节流。 | 高性能对象传输,替代 Java 序列化。示例:编码复杂 Java 对象。 |
KryoDecoder(需额外依赖) | 使用 Kryo 反序列化字节流为对象。 | 配合 KryoEncoder ,高性能场景。示例:解码 Kryo 序列化对象。 |
黑白名单功能
1、动态校验拦截 继承RuleBasedIpFilter实现一个过滤器
public class DynamicIpFilter extends RuleBasedIpFilter {
@Override
protected boolean accept(ChannelHandlerContext ctx, InetSocketAddress remoteAddress) {
String ip = remoteAddress.getAddress().getHostAddress();
return !RedisClient.isInBlacklist(ip); // 查询Redis黑名单或者其他存储
}
}
pipeline.addLast(new DynamicIpFilter()); // 动态过滤器
2、静态黑白名单
IpSubnetFilterRule blacklistRule = new IpSubnetFilterRule("192.168.1.1", 32, IpFilterRuleType.REJECT);
RuleBasedIpFilter ipFilter = new RuleBasedIpFilter(blacklistRule);
pipeline.addLast(ipFilter);
3、自定义handler实现黑白名单
public class DynamicIpFilter extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
String ip = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
if (RedisClient.isBlacklisted(ip)) { // 查询Redis黑名单
ctx.close(); // 拦截连接
} else {
ctx.fireChannelActive(); // 放行
}
}
}
4、CIDR规则匹配(高效IP段控制)
List<IpFilterRule> rules = new ArrayList<>();
rules.add(new IpSubnetFilterRule("10.0.0.0/8", IpFilterRuleType.ACCEPT)); // 动态添加规则
pipeline.addLast(new RuleBasedIpFilter(rules.toArray(new IpFilterRule[0])));
内存泄漏探测
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
支持的检测级别:
public enum Level {
DISABLED, //完全禁用内存泄漏检测
SIMPLE, //默认级别 简单采样 开销较小
ADVANCED,//高级采样资源泄漏检测,高开销为代价报告最近访问泄漏对象的位置
PARANOID;//和ADVANCED一样 仅用于测试环境
}