EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。
EventLoopGroup 是一组 EventLoop 的抽象,一个 EventLoopGroup 当中会包含一个或多个 EventLoop。EventLoopGroup 提供 next 接口,可以从一组 EventLoop 里面按照一定规则获取其中一个 EventLoop 来处理任务。 常用的实现类:
- NioEventLoopGroup:异步非阻塞
- OioEventLoopGroup:同步阻塞
在 NIO 中,通常我们都是在一个 while 循环中,从 Selector选择器中 select 出事件,然后依次处理每一种事件。而 Netty提供了一个 EventLoop接口。
io.netty.channel. EventLoop 接口定义了 Netty 的核心抽象,用于处理网络连接的生命周期中所发生的事件。
- 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
- 所有有 EnventLoop 处理的 I/O 事件都将在它专有的
- Thread 上被处理; 每一个 EventLoop 负责处理一个或多个 Channel;
- 一个 Channel 在它的生命周期内只注册于一个 EventLoop;
Channel 和 EventLoop 关系:
- Channel 需要被注册到某个 EventLoop 上,在 Channel 整个生命周期内都由这个 EventLoop处理IO事件。即:
一个 Channel在它的生命周期内只与一个 EventLoop进行绑定。
- 一个 EventLoop可以同时被多个 Channel绑定。
在基于 Java 的网络编程中,其基本的 I/O 操作(bind()、connect()、read()和 write())使用的是 Socket。
Netty 提供了 Channel 接口,提供了所有的 I/O 操作的 API。大大地降低了直接使用 Socket 类的复杂性。此外,Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根。
由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为 java.lang.Comparable 的一个子接口。
eventLoop
: 返回分配给 Channel 的 EventLooppipeline
: 返回 Channel 的 ChannelPipeline,也就是说每个 Channel 都有自己的ChannelPipeline。- isActive: 如果 Channel 是活动的,则返回 true。活动的意义可能依赖于底层的传输。 例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被打开便是活动的。
- localAddress: 返回本地的 SokcetAddress
- remoteAddress: 返回远程的 SocketAddress
- write: 将数据写到远程节点,注意,这个写只是写往 Netty 内部的缓存,还没有真正写往 socket。
- flush: 将之前已写的数据冲刷到底层 socket 进行传输。
writeAndFlush
: 一个简便的方法,等同于调用 write()并接着调用 flush()
当 Channel 被创建时,它将会被自动地分配一个新的 ChannelPipeline,每个 Channel 都有自己的 ChannelPipeline。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。
ChannelPipeline 提供了 ChannelHandler 链的容器,并定义了用于在该链上传播入站(从网络到业务处理)和 出站(从业务处理到网络)
,各种事件流的 API,我们代码中的 ChannelHandler 都是放在 ChannelPipeline 中的。
使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些 ChannelHandler 对象接收事件、执行它们所实现的处理逻辑,并将数据传递给链中的下一个 ChannelHandler,而且 ChannelHandler 对象也完全可以拦截事件不让事件继续传递。它们的执行顺序是由它们被添加的顺序所决定的。
2、ChannelHandler 的生命周期在Netty中,网络连接的不同生命周期都可以通过回调的方式来绑定相应的逻辑,这个回调接口就是ChannelHandler。
这里主要看一下 ChannelInboundHandlerAdapter类:
- channelUnregistered :channel取消注册事件,channel 已经被创建,但还未注册到 EventLoop
channelRegistered
:channel注册事件,channel 已经被注册到了 EventLoop- channelActive :channel是否是活跃事件,channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
- channelInactive :channel不活跃事件,channel 没有连接到远程节点
- channelReadComplete:channel读取完毕事件
- userEventTriggered:channel 用户事件触发事件
- channelWritabilityChanged:channel 可写更改事件
- exceptionCaught:channel 捕获到异常事件
- handlerAdded:channel 助手类(拦截器)的添加事件
- handlerRemoved:channel 助手类(拦截器)移除事件
channelRead/channelRead0
:channel读取数据事件
这些方法都接受一个 ChannelHandlerContext 参数
。
当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。
在我们的开发编程中,关注 ChannelActive 和 ChannelInactive 会更多一些。比如:
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyServerHandler 连接已建立...");
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取客户端发送过来的消息
ByteBuf in = (ByteBuf) msg;
System.out.println("Server Accept Client Context ("+ ctx.channel().remoteAddress() +")消息 ->" + in.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(in);
// ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//发送消息给客户端
ByteBuf byteBuf = Unpooled.copiedBuffer("Server Receive Client msg", CharsetUtil.UTF_8);
ctx.writeAndFlush(byteBuf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//发生异常,关闭通道
//cause.printStackTrace();
ctx.close();
}
}
3、ChannelPipeline 上的方法
ChannelPipeline 以双向链表的形式进行维护管理 Handler,提供了对应的方法在 ChannelPipeline 中增加或者删除、替换 Handler。
- addFirst、addBefore、addAfter、addLast:将一个 ChannelHandler 添加到 ChannelPipeline 中
- remove:将一个 ChannelHandler 从 ChannelPipeline 中移除
- replace:将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler
- get:通过类型或者名称返回 ChannelHandler
- context: 返回和 ChannelHandler 绑定的 ChannelHandlerContext
- names:返回 ChannelPipeline 中所有 ChannelHandler 的名称
- ChannelPipeline 的 API 公开了用于调用入站和出站操作的附加方法。
ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext。
上面说过 ChannelPipeline 以双向链表的形式进行维护管理 Handler,毫无疑问,Handler 在放入 ChannelPipeline 的时候必须要有两个指针pre 和 next 来说明它的前一个元素和后一个元素。
不过 ChannelHandlerContext 不仅仅只是个包装类,它还提供了很多的方法,比如:让事件从当前 ChannelHandler 传递给链中的下一个 ChannelHandler,还可以被用于获取底层的 Channel,还可以用于写出站数据。
2、Channel、ChannelPipeline 和 ChannelHandlerContext 上的事件传播ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和Channel-Pipeline 本身上。 但是有一点重要的不同。
- 如果调用 Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。
- 如果调用位于 ChannelHandlerContext上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个(入站下一个,出站上一个)能够处理该事件的 ChannelHandler。
- alloc:返回和这个实例相关联的 Channel 所配置的 ByteBufAllocator
- bind:绑定到给定的 SocketAddress,并返回 ChannelFuture
- channel:返回绑定到这个实例的 Channel
- close:关闭 Channel,并返回 ChannelFuture
- connect:连接给定的 SocketAddress,并返回 ChannelFuture
- deregister:从之前分配的 EventExecutor 注销,并返回 ChannelFuture
- disconnect:从远程节点断开,并返回 ChannelFuture
- executor:返回调度事件的 EventExecutor
- fireChannelActive:触发对下一个 ChannelInboundHandler 上的 channelActive()方法(已连接)的调用
- fireChannelInactive:触发对下一个 ChannelInboundHandler 上的 channelInactive()方法(已关闭)的调用
- fireChannelRead:触发对下一个 ChannelInboundHandler 上的 channelRead()方法(已接收的消息)的调用
- fireChannelReadComplete:触发对下一个 ChannelInboundHandler 上的channelReadComplete()方法的调用
- fireChannelRegistered:触发对下一个 ChannelInboundHandler 上的fireChannelRegistered()方法的调用
- fireChannelUnregistered:触发对下一个 ChannelInboundHandler 上的fireChannelUnregistered()方法的调用
- fireChannelWritabilityChanged:触发对下一个 ChannelInboundHandler 上的fireChannelWritabilityChanged()方法的调用
- fireExceptionCaught:触发对下一个 ChannelInboundHandler 上的fireExceptionCaught(Throwable)方法的调用
- fireUserEventTriggered:触发对下一个 ChannelInboundHandler 上的fireUserEventTriggered(Object evt)方法的调用
- handler:返回绑定到这个实例的 ChannelHandler
- isRemoved:如果所关联的 ChannelHandler 已经被从 ChannelPipeline 中移除则返回 true
- name:返回这个实例的唯一名称
- pipeline:返回这个实例所关联的 ChannelPipeline
- read:将数据从 Channel 读取到第一个入站缓冲区;如果读取成功则触发一个channelRead 事件,并(在最后一个消息被读取完成后)通知 ChannelInboundHandler 的channelReadComplete(ctx)方法
- write:通过这个实例写入消息并经过 ChannelPipeline
- writeAndFlush:通过这个实例写入并冲刷消息并经过 ChannelPipeline当使用 ChannelHandlerContext 的 API 的时候,有以下两点:
- ChannelHandlerContext 和ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;
- 相对于其他类的同名方法,ChannelHandlerContext 的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。
ChannelOption类的各种属性在套接字选项中都有对应。
下面看几个常用的:
- ChannelOption.SO_BACKLOG SO_BACKLOG (Socket参数),服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128
- ChannelOption.SO_REUSEADDR ChanneOption.SO_REUSEADDR 对应于套接字选项中的 SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口。
- ChannelOption.SO_KEEPALIVE SO_KEEPALIVE (Socket参数),连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。
- ChannelOption.SO_RCVBUF SO_RCVBUF(Socket参数),TCP数据接收缓冲区大小。
- ChannelOption.SO_LINGER ChannelOption.SO_LINGER 参数对应于套接字选项中的 SO_LINGER,Linux 内核默认的处理方式是当用户调用 close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用 SO_LINGER 可以阻塞 close()的调用时间,直到数据完全发送
- ChannelOption.TCP_NODELAY TCP_NODELAY (TCP参数),立即发送数据,默认值为Ture。
//创建服务端的启动对象,设置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组boosGroup和workerGroup
bootstrap.group(bossGroup, workerGroup)
//设置服务端通道实现类型
.channel(NioServerSocketChannel.class)
//设置线程队列得到连接个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//使用匿名内部类的形式初始化通道对象
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//给pipeline管道设置处理器
socketChannel.pipeline().addLast(new MyServerHandler());
}
});
六、ChannelFuture接口
一般在Socket编程中,等待响应结果都是同步阻塞的,而Netty 中所有的 I/O 操作都是异步的,不会造成阻塞,为此,Netty 提供了 ChannelFuture 接口, 其 addListener()方法,,表示注册了一个 ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。
ChannelFuture提供操作完成时一种异步通知的方式。它究竟什么时候被执行则可能取决于若干的因素,因此不可能准确地预测,但是可以肯定的是它将会被执行。
比如:客户端连接服务端时,我们添加一个监听器:
//连接服务端
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//客户端连接服务端时,我们添加一个监听器
channelFuture.addListener(new ChannelFutureListener() {
//使用匿名内部类,ChannelFutureListener接口
//重写operationComplete方法
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
System.out.println("客户端连接服务器成功");
}else{
System.out.println("客户端连接服务器失败 ->" + channelFuture.toString());
}
}
});
参考文章:
- netty组件—ChannelPipeline 和 ChannelHandlerContext:https://blog.csdn.net/qq_31391225/article/details/109512957
– 求知若饥,虚心若愚。