Netty
约 3449 字大约 12 分钟
2025-04-10
Netty
文字内容较长,请耐心阅读。
一、什么是Netty?
Netty是由JBOSS提供的一个异步的、基于事件驱动的网络应用java开源框架,用于快速开发高性能、高可用的网络IO程序。本质上是一个NIO(同步非阻塞I/O模型)框架,适用于面向clients端的高并发应用、peer-to-peer场景大量数据持续传输的应用或者服务器通讯相关的多种应用场景。
在分布式系统各个节点之间需要远程服务调用,Netty往往作为基础的通讯组件被这些框架使用。如常见的阿里分布式Dubbo、RockerMQ、ElasticSearch、gRpc、Spark等都用到了Netty来进行通讯。
二、BIO、NIO、AIO区别是什么?
(摘自Guide哥)
- BIO (Blocking I/O): 同步阻塞 I/O 模式,数据的读取写入必须阻塞在一个线程内等待其完成。在客户端连接数量不高的情况下,是没问题的。但是,当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量。
- NIO (Non-blocking/New I/O): NIO 是一种同步非阻塞的 I/O 模型,于 Java 1.4 中引入,对应 java.nio 包,提供了 Channel , Selector,Buffer 等抽象。NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。 NIO 提供了与传统 BIO 模型中的
Socket
和ServerSocket
相对应的SocketChannel
和ServerSocketChannel
两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。 - AIO (Asynchronous I/O): AIO 也就是 NIO 2。在 Java 7 中引入了 NIO 的改进版 NIO 2,它是异步非阻塞的 IO 模型。异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。AIO 是异步 IO 的缩写,虽然 NIO 在网络操作中,提供了非阻塞的方法,但是 NIO 的 IO 行为还是同步的。对于 NIO 来说,我们的业务线程是在 IO 操作准备好时,得到通知,接着就由这个线程自行进行 IO 操作,IO 操作本身是同步的。查阅网上相关资料,我发现就目前来说 AIO 的应用还不是很广泛,Netty 之前也尝试使用过 AIO,不过又放弃了。
三、Netty由哪几部分构成?
(摘自W3Cschool)
Channel
Channel是 NIO 基本的结构。它代表了一个用于连接到实体如硬件设备、文件、网络套接字或程序组件,能够执行一个或多个不同的 I/O 操作(例如读或写)的开放连接。
现在,把 Channel 想象成一个可以“打开”或“关闭”,“连接”或“断开”和作为传入和传出数据的运输工具。
Callback (回调)
callback (回调)是一个简单的方法,提供给另一种方法作为引用,这样后者就可以在某个合适的时间调用前者。这种技术被广泛使用在各种编程的情况下,最常见的方法之一通知给其他人操作已完成。
Netty 内部使用回调处理事件时。一旦这样的回调被触发,事件可以由接口
ChannelHandler
的实现来处理。如下面的代码,一旦一个新的连接建立了,调用channelActive()
,并将打印一条消息。public class ConnectHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client " + ctx.channel().remoteAddress() + " connected"); } }
Future
Future 提供了另外一种通知应用操作已经完成的方式。这个对象作为一个异步操作结果的占位符,它将在将来的某个时候完成并提供结果。
JDK 附带接口 java.util.concurrent.Future ,但所提供的实现只允许您手动检查操作是否完成或阻塞。这是很麻烦的,所以 Netty 提供自己了的实现
ChannelFuture
,用于在执行异步操作时使用。ChannelFuture
提供多个附件方法来允许一个或者多个ChannelFutureListener
实例。这个回调方法 operationComplete() 会在操作完成时调用。事件监听者能够确认这个操作是否成功或者是错误。如果是后者,我们可以检索到产生的 Throwable。简而言之,ChannelFutureListener
提供的通知机制不需要手动检查操作是否完成的。每个 Netty 的 outbound I/O 操作都会返回一个
ChannelFuture
,这样就不会阻塞。这就是 Netty 所谓的“自底向上的异步和事件驱动”。下面例子简单的演示了作为 I/O 操作的一部分
ChannelFuture
的返回。当调用 connect() 将会直接是非阻塞的,并且调用在背后完成。由于线程是非阻塞的,所以无需等待操作完成,而可以去干其他事,因此这令资源利用更高效。Channel channel = ...; //不会阻塞 ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25));
下面代码描述了如何利用
ChannelFutureListener
。首先,连接到远程地址。接着,通过ChannelFuture
调用 connect() 来 注册一个新ChannelFutureListener
。当监听器被通知连接完成,我们检查状态。如果是成功,就写数据到 Channel,否则我们检索ChannelFuture
中的Throwable。注意,错误的处理取决于你的项目。当然,特定的错误是需要加以约束的。例如,在连接失败的情况下你可以尝试连接到另一个。
Channel channel = ...; //不会阻塞 ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { if (future.isSuccess()) { ByteBuf buffer = Unpooled.copiedBuffer("Hello", Charset.defaultCharset()); ChannelFuture wf = future.channel().writeAndFlush(buffer); // ... } else { Throwable cause = future.cause(); cause.printStackTrace(); } } });
异步连接到远程对等节点。调用立即返回并提供 ChannelFuture。
操作完成后通知注册一个 ChannelFutureListener 。
当 operationComplete() 调用时检查操作的状态。
如果成功就创建一个 ByteBuf 来保存数据。
异步发送数据到远程。再次返回ChannelFuture。
如果有一个错误则抛出 Throwable,描述错误原因。
Event 和 Handler
Netty 使用不同的事件来通知我们更改的状态或操作的状态。这使我们能够根据发生的事件触发适当的行为。
这些行为可能包括:
- 日志/数据转换
- 流控制
- 应用程序逻辑
由于 Netty 是一个网络框架,事件很清晰的跟入站或出站数据流相关。因为一些事件可能触发传入的数据或状态的变化包括:
- 活动或非活动连接
- 数据的读取
- 用户事件
- 错误
出站事件是由于在未来操作将触发一个动作。这些包括:
- 打开或关闭一个连接到远程
- 写或冲刷数据到 socket
每个事件都可以分配给用户实现处理程序类的方法。这说明了事件驱动的范例可直接转换为应用程序构建块。
下图显示了一个事件可以由一连串的事件处理器来处理:
Netty 的
ChannelHandler
是各种处理程序的基本抽象。想象下,每个处理器实例就是一个回调,用于执行对各种事件的响应。在此基础之上,Netty 也提供了一组丰富的预定义的处理程序,您可以开箱即用。比如,各种协议的编解码器包括 HTTP 和 SSL/TLS。在内部,
ChannelHandler
使用事件和future
本身,创建具有 Netty 特性抽象的消费者。EventLoop和EventLoopGroup
**EventLoop(事件循环)**定义了Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。io.netty.util.concurrent包构建在JDK 的java.util.concurrent包上。一个EventLoop将由一个永远都不会改变的Thread驱动,同时任务(Runnable或者Callable)可以直接提交给
EventLoop
实现,以立即执行或者调度执行。它运行在一个循环中,直到它停止。网络框架需要需要在一个循环中为一个特定的连接运行事件:
- 阻塞直到事件可以运行
- 循环所有事件,并运行他们
while (!terminated) { List<Runnable> readyEvents = blockUntilEventsReady(); for (Runnable ev: readyEvents) { ev.run(); } }
根据配置和可用核心的不同,可能会创建多个
EventLoop
实例用以优化资源的使用,并且单个EventLoop
可能会被指派用于服务多个Channel。**那到底 EventLoop、EventLoopGroup和Channel是什么关系呢?**在Guide哥的 小专栏 我找到了这张图:
Netty的
EventLoop
在继承了 ScheduledExecutorService的同时,只定义了一个方法,parent()。在Netty 4 中,所有的I/O操作和事件都由已经被分配给了EventLoop
的那个Thread来处理。任务调度:
偶尔,你将需要调度一个任务以便稍后(延迟)执行或者周期性地执行。例如,你可能想要注册一个在客户端已经连接了5 分钟之后触发的任务。一个常见的用例是,发送心跳消息到远程节点,以检查连接是否仍然还活着。如果没有响应,你便知道可以关闭该Channel 了。 在内部,当提交任务到如果(当前)调用线程正是支撑
EventLoop
的线程,那么所提交的代码块将会被(直接)执行。否则,EventLoop
将调度该任务以便稍后执行,并将它放入到内部队列中。当EventLoop
下次处理它的事件时,它会执行队列中的那些任务/事件。Netty 线程模型的内部
Netty 的内部实现使其线程模型表现优异,它会检查正在执行的 Thread 是否是已分配给实际 Channel (和
EventLoop
),在 Channel 的生命周期内,EventLoop
负责处理所有的事件。如果 Thread 是相同的
EventLoop
中的一个,讨论的代码块被执行;如果线程不同,它安排一个任务并在一个内部队列后执行。通常是通过EventLoop
的 Channel 只执行一次下一个事件,这允许直接从任何线程与通道交互,同时还确保所有的 ChannelHandler 是线程安全,不需要担心并发访问问题。下图显示在 EventLoop 中调度任务执行逻辑,这适合 Netty 的线程模型:
- 应在 EventLoop 中执行的任务
- 任务传递到执行方法后,执行检查来检测调用线程是否是与分配给 EventLoop 是一样的
- 线程是一样的,说明你在 EventLoop 里,这意味着可以直接执行的任务
- 线程与 EventLoop 分配的不一样。当 EventLoop 事件执行时,队列的任务再次执行一次
设计是非常重要的,以确保不要把任何长时间运行的任务放在执行队列中,因为长时间运行的任务会阻止其他在相同线程上执行的任务。这多少会影响整个系统依赖于 EventLoop 实现用于特殊传输的实现。
传输之间的切换在你的代码库中可能没有任何改变,重要的是:切勿阻塞 I/O 线程。如果你必须做阻塞调用(或执行需要长时间才能完成的任务),请使用 EventExecutor。
利用EventLoop实现调度任务执行
使用
ScheduledExecutorService
工作的很好,但是有局限性,比如在一个额外的线程中执行任务。如果需要执行很多任务,资源使用就会很严重;对于像 Netty 这样的高性能的网络框架来说,严重的资源使用是不能接受的。Netty 对这个问题提供了很好的方法。Netty 允许使用
EventLoop
调度任务分配到通道,如下面代码:Channel ch = null; // Get reference to channel ScheduledFuture<?> future = ch.eventLoop().schedule( new Runnable() { @Override public void run() { System.out.println("Now its 60 seconds later"); } }, 60, TimeUnit.SECONDS);//调度任务60秒后运行
如果想任务每隔多少秒执行一次,看下面代码:
Channel ch = null; // Get reference to channel ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate( new Runnable() { @Override public void run() { System.out.println("Run every 60 seconds"); } }, 60, 60, TimeUnit.SECONDS);//调度任务60秒运行一次
取消操作,可以使用 ScheduledFuture 返回每个异步操作。 ScheduledFuture 提供一个方法用于取消一个调度了的任务或者检查它的状态。一个简单的取消操作如下:
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(..); // Some other code that runs... future.cancel(false);
Bootstrap 和 ServerBootstrap(启动引导类)
Bootstrap
是客户端的启动引导类/辅助类,Bootstrap 类负责创建管道给客户或应用程序,利用无连接协议和在调用 bind() 或 connect() 之后。当 bind() 调用时,Bootstrap 将创建一个新的管道, 当 connect() 调用在 Channel 来建立连接
Bootstrap 将创建一个新的管道, 当 connect() 调用时
新的 Channel
EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); //1 bootstrap.group(group) //2 .channel(NioSocketChannel.class) //3 .handler(new SimpleChannelInboundHandler<ByteBuf>() { //4 @Override protected void channeRead0( ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { System.out.println("Received data"); byteBuf.clear(); } }); ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));//5 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("Connection established"); } else { System.err.println("Connection attempt failed"); channelFuture.cause().printStackTrace(); } } });
- 创建一个新的 Bootstrap 来创建和连接到新的客户端管道
- 指定 EventLoopGroup
- 指定 Channel 实现来使用
- 设置处理器给 Channel 的事件和数据
- 连接到远端主机