FileRegion/CompositeByteBuf一行搞定
BossGroup (1线程)
EventLoop 轮询 Accept 事件
↓
新连接注册到 WorkerGroup
↓
WorkerGroup (N线程 = CPU核数*2)
EventLoop-1 EventLoop-2 ... EventLoop-N
(读→解码→业务→编码→写 全在一个线程)
关键点:一个Channel从创建到销毁,始终绑定在同一个EventLoop线程——无锁化设计。// 1. 固定长度
ch.pipeline().addLast(new FixedLengthFrameDecoder(100));
// 2. 分隔符(最常用——Redis RESP协议就用\r\n)
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192,
Delimiters.lineDelimiter()));
// 3. 长度域(自定义协议:前4字节表长度)
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(
65535, 0, 4, 0, 4));
// 4. 行分隔
ch.pipeline().addLast(new LineBasedFrameDecoder(8192));
// 5. 变长自定义
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024*1024,
0, 2, -2, 0));
CompositeByteBuf:多个ByteBuf合并成一个逻辑Buf,不实际拷贝数据Slice:共享同一个byte[],不同视图FileRegion:调用transferTo直接文件→Socket,绕过用户态sendfile(Linux):DMA直接磁盘→内核缓冲区→Socket缓冲区(不经过用户态)mmap:内核和用户空间共享内存splice:两个fd之间管道传输ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT; ByteBuf buf = allocator.buffer(1024); // 从池中取,用完release归还
I/O Request
↓
┌─── ChannelPipeline ───────────┐
│ HeadContext (双向) │
│ ↓ │
│ [Encoder] =====出站====> │ // 出站Handler
│ [Codec] ==出/入==> │ // 编解码器合并
│ [Business] <====入站==== │ // 业务Handler
│ ↓ │
│ TailContext (双向) │
└────────────────────────────────┘
↓
I/O Response
入站事件(Inbound):ChannelActive→ChannelRead→ChannelReadComplete 从头到尾顺序执行ctx.fireChannelRead()继续传播;不调用则中断。next()轮询从EventLoopGroup中选一个。// 默认轮询算法(DefaultEventExecutorChooserFactory)
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1]; // 2的幂次取模
}
关键保证:一个Channel创建后绑定到某个EventLoop → 永久绑定,直到Channel关闭。这意味着:DefaultEventExecutorGroup(业务线程池):
pipeline.addLast(new DefaultEventExecutorGroup(16),
new BusinessHandler()); // BusinessHandler在独立线程池执行
ch.pipeline().addLast(new IdleStateHandler(
60, // readerIdleTime: 60秒没读到数据 → 触发 READER_IDLE
30, // writerIdleTime: 30秒没写出数据 → 触发 WRITER_IDLE
0 // allIdleTime: 0=禁用
));
// 自定义心跳处理
class HeartbeatHandler extends ChannelInboundHandlerAdapter {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
ctx.close(); // 超过60秒没收到→踢下线
} else if (state == IdleState.WRITER_IDLE) {
ctx.writeAndFlush(new PingMessage()); // 30秒→发心跳
}
}
}
}
生产实践:服务端设readerIdle=2*客户端心跳间隔——留一倍容错空间。
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new HttpServerCodec()) // HTTP编解码
.addLast(new HttpObjectAggregator(65536)) // 聚合完整请求
.addLast(new SimpleChannelInboundHandler() {
protected void channelRead0(ChannelHandlerContext ctx,
FullHttpRequest req) {
String body = "Hello Netty!";
FullHttpResponse resp = new DefaultFullHttpResponse(
HTTP_1_1, OK, Unpooled.copiedBuffer(body, UTF_8));
resp.headers().set(CONTENT_TYPE, "application/json");
resp.headers().set(CONTENT_LENGTH, body.length());
ctx.writeAndFlush(resp).addListener(CLOSE);
}
});
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally { boss.shutdownGracefully(); worker.shutdownGracefully(); }
追问多说一句:实际项目用Spring WebFlux + Netty,Spring Boot 3默认底层就是Netty。
☕ 这篇文章帮到你了?
请作者喝杯咖啡,支持持续更新更多面试干货
微信扫码赞赏