您当前的位置: 首页 > 

Charge8

暂无认证

  • 3浏览

    0关注

    447博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty常用组件一

Charge8 发布时间:2022-04-28 14:58:32 ,浏览量:3

一、EventLoopGroup和EventLoop

在这里插入图片描述

1、EventLoopGroup接口

EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。

EventLoopGroup 是一组 EventLoop 的抽象,一个 EventLoopGroup 当中会包含一个或多个 EventLoop。EventLoopGroup 提供 next 接口,可以从一组 EventLoop 里面按照一定规则获取其中一个 EventLoop 来处理任务。 在这里插入图片描述 常用的实现类:

  • NioEventLoopGroup:异步非阻塞
  • OioEventLoopGroup:同步阻塞
2、EventLoop接口

在 NIO 中,通常我们都是在一个 while 循环中,从 Selector选择器中 select 出事件,然后依次处理每一种事件。而 Netty提供了一个 EventLoop接口。

io.netty.channel. EventLoop 接口定义了 Netty 的核心抽象,用于处理网络连接的生命周期中所发生的事件。

  • 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
  • 所有有 EnventLoop 处理的 I/O 事件都将在它专有的
  • Thread 上被处理; 每一个 EventLoop 负责处理一个或多个 Channel;
  • 一个 Channel 在它的生命周期内只注册于一个 EventLoop;

常用的实现类:

二、Channel

Channel 和 EventLoop 关系:

  • Channel 需要被注册到某个 EventLoop 上,在 Channel 整个生命周期内都由这个 EventLoop处理IO事件。即:一个 Channel在它的生命周期内只与一个 EventLoop进行绑定。
  • 一个 EventLoop可以同时被多个 Channel绑定。
1、Channel 接口

在基于 Java 的网络编程中,其基本的 I/O 操作(bind()、connect()、read()和 write())使用的是 Socket。

Netty 提供了 Channel 接口,提供了所有的 I/O 操作的 API。大大地降低了直接使用 Socket 类的复杂性。此外,Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根。

由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为 java.lang.Comparable 的一个子接口。

在这里插入图片描述

2、Channel 常见方法:
  • eventLoop: 返回分配给 Channel 的 EventLoop
  • pipeline: 返回 Channel 的 ChannelPipeline,也就是说每个 Channel 都有自己的ChannelPipeline。
  • isActive: 如果 Channel 是活动的,则返回 true。活动的意义可能依赖于底层的传输。 例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被打开便是活动的。
  • localAddress: 返回本地的 SokcetAddress
  • remoteAddress: 返回远程的 SocketAddress
  • write: 将数据写到远程节点,注意,这个写只是写往 Netty 内部的缓存,还没有真正写往 socket。
  • flush: 将之前已写的数据冲刷到底层 socket 进行传输。
  • writeAndFlush: 一个简便的方法,等同于调用 write()并接着调用 flush()
三、ChannelPipeline

在这里插入图片描述

1、ChannelPipeline 接口

当 Channel 被创建时,它将会被自动地分配一个新的 ChannelPipeline,每个 Channel 都有自己的 ChannelPipeline。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

ChannelPipeline 提供了 ChannelHandler 链的容器,并定义了用于在该链上传播入站(从网络到业务处理)和 出站(从业务处理到网络),各种事件流的 API,我们代码中的 ChannelHandler 都是放在 ChannelPipeline 中的。

使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些 ChannelHandler 对象接收事件、执行它们所实现的处理逻辑,并将数据传递给链中的下一个 ChannelHandler,而且 ChannelHandler 对象也完全可以拦截事件不让事件继续传递。它们的执行顺序是由它们被添加的顺序所决定的。

2、ChannelHandler 的生命周期

在Netty中,网络连接的不同生命周期都可以通过回调的方式来绑定相应的逻辑,这个回调接口就是ChannelHandler。

这里主要看一下 ChannelInboundHandlerAdapter类: 在这里插入图片描述

  • channelUnregistered :channel取消注册事件,channel 已经被创建,但还未注册到 EventLoop
  • channelRegistered:channel注册事件,channel 已经被注册到了 EventLoop
  • channelActive :channel是否是活跃事件,channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
  • channelInactive :channel不活跃事件,channel 没有连接到远程节点
  • channelReadComplete:channel读取完毕事件
  • userEventTriggered:channel 用户事件触发事件
  • channelWritabilityChanged:channel 可写更改事件
  • exceptionCaught:channel 捕获到异常事件
  • handlerAdded:channel 助手类(拦截器)的添加事件
  • handlerRemoved:channel 助手类(拦截器)移除事件
  • channelRead/channelRead0:channel读取数据事件

这些方法都接受一个 ChannelHandlerContext 参数

当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。

在我们的开发编程中,关注 ChannelActive 和 ChannelInactive 会更多一些。比如:

public class MyServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("MyServerHandler 连接已建立...");
		super.channelActive(ctx);
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//获取客户端发送过来的消息
		ByteBuf in = (ByteBuf) msg;
		System.out.println("Server Accept Client Context ("+ ctx.channel().remoteAddress() +")消息 ->" + in.toString(CharsetUtil.UTF_8));
		ctx.writeAndFlush(in);

		// ctx.close();
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		//发送消息给客户端
		ByteBuf byteBuf = Unpooled.copiedBuffer("Server Receive Client msg", CharsetUtil.UTF_8);
		ctx.writeAndFlush(byteBuf);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		//发生异常,关闭通道
		//cause.printStackTrace();
		ctx.close();
	}
}
3、ChannelPipeline 上的方法

ChannelPipeline 以双向链表的形式进行维护管理 Handler,提供了对应的方法在 ChannelPipeline 中增加或者删除、替换 Handler。 在这里插入图片描述

  • addFirst、addBefore、addAfter、addLast:将一个 ChannelHandler 添加到 ChannelPipeline 中
  • remove:将一个 ChannelHandler 从 ChannelPipeline 中移除
  • replace:将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler
  • get:通过类型或者名称返回 ChannelHandler
  • context: 返回和 ChannelHandler 绑定的 ChannelHandlerContext
  • names:返回 ChannelPipeline 中所有 ChannelHandler 的名称
  • ChannelPipeline 的 API 公开了用于调用入站和出站操作的附加方法。
四、ChannelHandlerContext 1、ChannelHandlerContext 接口

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext。

上面说过 ChannelPipeline 以双向链表的形式进行维护管理 Handler,毫无疑问,Handler 在放入 ChannelPipeline 的时候必须要有两个指针pre 和 next 来说明它的前一个元素和后一个元素。

不过 ChannelHandlerContext 不仅仅只是个包装类,它还提供了很多的方法,比如:让事件从当前 ChannelHandler 传递给链中的下一个 ChannelHandler,还可以被用于获取底层的 Channel,还可以用于写出站数据。

2、Channel、ChannelPipeline 和 ChannelHandlerContext 上的事件传播

ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和Channel-Pipeline 本身上。 但是有一点重要的不同。

  • 如果调用 Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。
  • 如果调用位于 ChannelHandlerContext上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个(入站下一个,出站上一个)能够处理该事件的 ChannelHandler。
3、ChannelHandlerContext 的方法
  • alloc:返回和这个实例相关联的 Channel 所配置的 ByteBufAllocator
  • bind:绑定到给定的 SocketAddress,并返回 ChannelFuture
  • channel:返回绑定到这个实例的 Channel
  • close:关闭 Channel,并返回 ChannelFuture
  • connect:连接给定的 SocketAddress,并返回 ChannelFuture
  • deregister:从之前分配的 EventExecutor 注销,并返回 ChannelFuture
  • disconnect:从远程节点断开,并返回 ChannelFuture
  • executor:返回调度事件的 EventExecutor
  • fireChannelActive:触发对下一个 ChannelInboundHandler 上的 channelActive()方法(已连接)的调用
  • fireChannelInactive:触发对下一个 ChannelInboundHandler 上的 channelInactive()方法(已关闭)的调用
  • fireChannelRead:触发对下一个 ChannelInboundHandler 上的 channelRead()方法(已接收的消息)的调用
  • fireChannelReadComplete:触发对下一个 ChannelInboundHandler 上的channelReadComplete()方法的调用
  • fireChannelRegistered:触发对下一个 ChannelInboundHandler 上的fireChannelRegistered()方法的调用
  • fireChannelUnregistered:触发对下一个 ChannelInboundHandler 上的fireChannelUnregistered()方法的调用
  • fireChannelWritabilityChanged:触发对下一个 ChannelInboundHandler 上的fireChannelWritabilityChanged()方法的调用
  • fireExceptionCaught:触发对下一个 ChannelInboundHandler 上的fireExceptionCaught(Throwable)方法的调用
  • fireUserEventTriggered:触发对下一个 ChannelInboundHandler 上的fireUserEventTriggered(Object evt)方法的调用
  • handler:返回绑定到这个实例的 ChannelHandler
  • isRemoved:如果所关联的 ChannelHandler 已经被从 ChannelPipeline 中移除则返回 true
  • name:返回这个实例的唯一名称
  • pipeline:返回这个实例所关联的 ChannelPipeline
  • read:将数据从 Channel 读取到第一个入站缓冲区;如果读取成功则触发一个channelRead 事件,并(在最后一个消息被读取完成后)通知 ChannelInboundHandler 的channelReadComplete(ctx)方法
  • write:通过这个实例写入消息并经过 ChannelPipeline
  • writeAndFlush:通过这个实例写入并冲刷消息并经过 ChannelPipeline当使用 ChannelHandlerContext 的 API 的时候,有以下两点:
    • ChannelHandlerContext 和ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;
    • 相对于其他类的同名方法,ChannelHandlerContext 的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。
五、ChannelOption类

ChannelOption类的各种属性在套接字选项中都有对应。

下面看几个常用的:

  • ChannelOption.SO_BACKLOG SO_BACKLOG (Socket参数),服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128
  • ChannelOption.SO_REUSEADDR ChanneOption.SO_REUSEADDR 对应于套接字选项中的 SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口。
  • ChannelOption.SO_KEEPALIVE SO_KEEPALIVE (Socket参数),连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。
  • ChannelOption.SO_RCVBUF SO_RCVBUF(Socket参数),TCP数据接收缓冲区大小。
  • ChannelOption.SO_LINGER ChannelOption.SO_LINGER 参数对应于套接字选项中的 SO_LINGER,Linux 内核默认的处理方式是当用户调用 close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用 SO_LINGER 可以阻塞 close()的调用时间,直到数据完全发送
  • ChannelOption.TCP_NODELAY TCP_NODELAY (TCP参数),立即发送数据,默认值为Ture。
        //创建服务端的启动对象,设置参数
        ServerBootstrap bootstrap = new ServerBootstrap();
        //设置两个线程组boosGroup和workerGroup
        bootstrap.group(bossGroup, workerGroup)
                //设置服务端通道实现类型
                .channel(NioServerSocketChannel.class)
                //设置线程队列得到连接个数
                .option(ChannelOption.SO_BACKLOG, 128)
                //设置保持活动连接状态
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //使用匿名内部类的形式初始化通道对象
                .childHandler(new ChannelInitializer() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //给pipeline管道设置处理器
                        socketChannel.pipeline().addLast(new MyServerHandler());
                    }
                });
六、ChannelFuture接口

一般在Socket编程中,等待响应结果都是同步阻塞的,而Netty 中所有的 I/O 操作都是异步的,不会造成阻塞,为此,Netty 提供了 ChannelFuture 接口, 其 addListener()方法,,表示注册了一个 ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。

ChannelFuture提供操作完成时一种异步通知的方式。它究竟什么时候被执行则可能取决于若干的因素,因此不可能准确地预测,但是可以肯定的是它将会被执行。

比如:客户端连接服务端时,我们添加一个监听器:

    //连接服务端
    ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
    //客户端连接服务端时,我们添加一个监听器
    channelFuture.addListener(new ChannelFutureListener() {
        //使用匿名内部类,ChannelFutureListener接口
        //重写operationComplete方法
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if(channelFuture.isSuccess()){
                System.out.println("客户端连接服务器成功");
            }else{
                System.out.println("客户端连接服务器失败 ->" + channelFuture.toString());
            }
        }
    });    

参考文章:

  • netty组件—ChannelPipeline 和 ChannelHandlerContext:https://blog.csdn.net/qq_31391225/article/details/109512957

– 求知若饥,虚心若愚。

关注
打赏
1664721914
查看更多评论
立即登录/注册

微信扫码登录

0.0390s