跳到主要内容

基础知识

Netty核心组件介绍

Channel

一个连接分配一个channel,如:

  • 终端 A 连接 → 创建 Channel-1
  • 终端 B 连接 → 创建 Channel-2
  • 终端 C 连接 → 创建 Channel-3

线程模型

单线程 QPS<=5k

所有I/O操作和业务逻辑由单个线程处理

EventLoopGroup singleGroup = new NioEventLoopGroup(1); // 关键:线程数=1
ServerBootstrap b = new ServerBootstrap();
b.group(singleGroup) // 单线程同时处理连接和I/O
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new SimpleHandler());
}
});

多线程 1w<=QPS<=3w

一组线程共同处理所有Channel的I/O事件

EventLoopGroup workerGroup = new NioEventLoopGroup(16); // 关键:多线程

ServerBootstrap b = new ServerBootstrap();
b.group(workerGroup) // 所有线程平等处理连接和I/O
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new SimpleHandler());
}
});

主从多线程 QPS>5w

分离连接接收(主线程组)和I/O处理(工作线程组)

EventLoopGroup bossGroup = new NioEventLoopGroup(2);  // 主线程组:专责接收连接
EventLoopGroup workerGroup = new NioEventLoopGroup(16); // 工作线程组:处理I/O

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup) // 关键:双线程组
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ComplexHandler()); // 复杂业务处理
}
});

AttributeKey

ChannelHandler

ChannelDuplexHandler

应用场景:

  • 出站入站流量统计监控

ChannelHandlerContext

生命周期

ChannelTrafficShapingHandler

ChannelHandler#channelWritabilityChanged

ByteBuf

注意事项:

  • ByteBuf使用完以后,一定要调用#release方法释放,避免内存泄漏。

开发中涉及到类:

  • Unpooled 创建非池化ByteBuf
  • PooledByteBufAllocator 创建池化ByteBuf,减少频繁分配和释放内存的开销
  • CompositeByteBuf 用于创建复合缓冲区,将多个ByteBuf 合并为一个逻辑缓冲区
  • ByteBufAllocator 缓冲区分配器接口,用于分配、回收缓冲区。
  • ReferenceCounted 用于引用计数管理
  • ByteBufUtil 用于转换、比较、计算缓冲区哈希值

PooledByteBufAllocator

ChannelPipeline

注意:

  • addLast添加的处理器需要考虑顺序问题,不能够随意颠倒,一般是入站Decode->出站Encode。

IdleStateHandler 心跳处理器

该处理器的几种空闲状态

public enum IdleState {
READER_IDLE, //读空闲状态
WRITER_IDLE, //写空闲状态
ALL_IDLE; //读写都是空想状态
}

案例:心跳处理器new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS)10秒内没有收到客户端任何数据,则关闭连接。

ChannelHandler#userEventTriggered
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
//如果是读空闲状态
if (event.state() == IdleState.READER_IDLE) {
System.out.println("Client timeout, closing connection");
ctx.close(); //会断开与客户端的链接
}
}
}

@Sharable注解

介绍:用于在多个channel中共享handler,默认情况下每个channel(一个session连接)会创建一个handler实例保证线程安全。 应用场景:无状态、线程安全,对于有状态和非线程安全的handler共享会导致数据错误。

内置编码器和解码器总结

编解码器名称介绍应用场景
LineBasedFrameDecoder基于行分隔符(\n\r\n)分割字节流为消息,支持最大帧长限制。文本协议(如 SMTP、IRC),日志传输,简单命令行协议。示例:处理 "Hello\nWorld\n" 为两条消息。
DelimiterBasedFrameDecoder基于自定义分隔符(如 ,$)分割字节流为消息。自定义文本协议,CSV 数据流传输。示例:以 $ 分隔,处理 data1$data2$
LengthFieldBasedFrameDecoder基于长度字段解析自定义协议,支持配置长度字段偏移和调整。自定义二进制协议,工业设备通信,游戏服务器。示例:解析 [4字节长度][数据] 格式。
LengthFieldPrepender在消息前添加长度字段,配合 LengthFieldBasedFrameDecoder 使用。自定义二进制协议,解决粘包/拆包问题。示例:为消息添加 4 字节长度前缀。
FixedLengthFrameDecoder按固定长度分割字节流为消息。固定长度协议,嵌入式设备通信,遗留系统。示例:每 16 字节分割为一条消息。
StringEncoder将字符串编码为指定字符集(如 UTF-8)的字节流。文本协议,聊天服务器,HTTP 请求处理。示例:编码字符串消息到字节流。
StringDecoder将字节流解码为字符串,支持指定字符集。配合 LineBasedFrameDecoder 等,处理文本消息。示例:解码字节为字符串。
ObjectEncoder将 Java 对象序列化为字节流(基于 Java 序列化)。快速原型开发,Java 内部对象传输。示例:传输 User 对象。
ObjectDecoder将字节流反序列化为 Java 对象。配合 ObjectEncoder,处理 Java 对象。示例:反序列化接收到的对象。
ProtobufEncoder将 Google Protocol Buffers 消息编码为字节流。高性能跨语言协议,gRPC,微服务通信。示例:编码 Protobuf Person 消息。
ProtobufDecoder将字节流解码为 Protobuf 消息对象。配合 Protobuf 协议,处理跨平台消息。示例:解码 Protobuf 消息。
HttpRequestEncoder将 HTTP 请求编码为字节流。HTTP 服务器/客户端,REST API,代理服务器。示例:编码 HTTP GET 请求。
HttpResponseEncoder将 HTTP 响应编码为字节流。HTTP 服务器,Web 应用。示例:编码 HTTP 响应。
HttpRequestDecoder将字节流解码为 HTTP 请求对象。解析 HTTP 请求,Web 服务器开发。示例:解码 POST 请求的表单数据。
HttpResponseDecoder将字节流解码为 HTTP 响应对象。解析 HTTP 响应,客户端开发。示例:解码服务端响应。
HttpObjectAggregator将分段 HTTP 消息聚合成完整对象(如 FullHttpRequest)。处理 HTTP 分块传输(chunked),简化 Web 开发。示例:聚合分段请求。
WebSocketFrameEncoder编码 WebSocket 帧(如文本帧、二进制帧)。WebSocket 服务器/客户端,实时通信。示例:编码聊天消息帧。
WebSocketFrameDecoder解码 WebSocket 帧为对象。实时应用,如聊天、在线游戏。示例:解码接收到的文本帧。
ByteToMessageDecoder抽象基类,用于自定义字节到消息的解码逻辑。自定义协议开发,复杂协议处理。示例:实现私有协议解析。
MessageToByteEncoder抽象基类,用于自定义消息到字节的编码逻辑。自定义协议,复杂消息编码。示例:编码私有协议消息。
Base64Encoder将字节流编码为 Base64 格式。传输 Base64 编码数据,如 JSON 中的二进制数据。示例:编码图片数据。
Base64Decoder将 Base64 格式解码为字节流。解码 Base64 数据,如嵌入的图片。示例:解码 Base64 字符串。
KryoEncoder(需额外依赖)使用 Kryo 序列化库将对象编码为字节流。高性能对象传输,替代 Java 序列化。示例:编码复杂 Java 对象。
KryoDecoder(需额外依赖)使用 Kryo 反序列化字节流为对象。配合 KryoEncoder,高性能场景。示例:解码 Kryo 序列化对象。

黑白名单功能

1、动态校验拦截 继承RuleBasedIpFilter实现一个过滤器

public class DynamicIpFilter extends RuleBasedIpFilter {
@Override
protected boolean accept(ChannelHandlerContext ctx, InetSocketAddress remoteAddress) {
String ip = remoteAddress.getAddress().getHostAddress();
return !RedisClient.isInBlacklist(ip); // 查询Redis黑名单或者其他存储
}
}
pipeline.addLast(new DynamicIpFilter()); // 动态过滤器

2、静态黑白名单

IpSubnetFilterRule blacklistRule = new IpSubnetFilterRule("192.168.1.1", 32, IpFilterRuleType.REJECT);
RuleBasedIpFilter ipFilter = new RuleBasedIpFilter(blacklistRule);
pipeline.addLast(ipFilter);

3、自定义handler实现黑白名单

public class DynamicIpFilter extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
String ip = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
if (RedisClient.isBlacklisted(ip)) { // 查询Redis黑名单
ctx.close(); // 拦截连接
} else {
ctx.fireChannelActive(); // 放行
}
}
}

4、CIDR规则匹配(高效IP段控制)

List<IpFilterRule> rules = new ArrayList<>();
rules.add(new IpSubnetFilterRule("10.0.0.0/8", IpFilterRuleType.ACCEPT)); // 动态添加规则
pipeline.addLast(new RuleBasedIpFilter(rules.toArray(new IpFilterRule[0])));

内存泄漏探测

 ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

支持的检测级别:

public enum Level {
DISABLED, //完全禁用内存泄漏检测
SIMPLE, //默认级别 简单采样 开销较小
ADVANCED,//高级采样资源泄漏检测,高开销为代价报告最近访问泄漏对象的位置
PARANOID;//和ADVANCED一样 仅用于测试环境
}