您当前的位置: 首页 > 

庄小焱

暂无认证

  • 3浏览

    0关注

    805博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty——心跳服务IdleStateHandler源码分析

庄小焱 发布时间:2021-11-13 16:17:19 ,浏览量:3

摘要

心跳说的是在客户端和服务端在互相建立ESTABLISH状态的时候,如何通过发送一个最简单的包来保持连接的存活,还有监控另一边服务的可用性等。本文主要是研究netty的心跳机制的原理。

这里有两个问题:

 在源码中的没有IdleStateHandler的原因是因为什么?为什么会被删除这个类??

心跳服务的作用
  • 保活 Q:为什么说心跳机制能保持连接的存活,它是集群中或长连接中最为有效避免网络中断的一个重要的保障措施? A:之所以说是“避免网络中断的一个重要保障措施”,原因是:我们得知公网IP是一个宝贵的资源,一旦某一连接长时间的占用并且不发数据,这怎能对得起网络给此连接分配公网IP,这简直是对网络资源最大的浪费,所以基本上所有的NAT路由器都会定时的清除那些长时间没有数据传输的映射表项。一是回收IP资源,二是释放NAT路由器本身内存的资源,这样问题就来了,连接被从中间断开了,双发还都不晓得对方已经连通不了了,还会继续发数据,这样会有两个结果:a) 发方会收到NAT路由器的RST包,导致发方知道连接已中断;b) 发方没有收到任何NAT的回执,NAT只是简单的drop相应的数据包 通常我们测试得出的是第二种情况会多些,就是客户端是不知道自己应经连接断开了,所以这时候心跳就可以和NAT建立关联了,只要我们在NAT认为合理连接的时间内发送心跳数据包,这样NAT会继续keep连接的IP映射表项不被移除,达到了连接不会被中断的目的。

  • 检测另一端服务是否可用 TCP的断开可能有时候是不能瞬时探知的,甚至是不能探知的,也可能有很长时间的延迟,如果前端没有正常的断开TCP连接,四次握手没有发起,服务端无从得知客户端的掉线,这个时候我们就需要心跳包来检测另一端服务是否还存活可用。

基于TCP的keepalive机制实现

基于TCP的keepalive机制,由具体的TCP协议栈来实现长连接的维持。如在netty中可以在创建channel的时候,指定SO_KEEPALIVE参数来实现:

存在的问题:Netty只能控制SO_KEEPALIVE这个参数,其他参数,则需要从系统的sysctl中读取,其中比较关键的是tcp_keepalive_time,发送心跳包检测的时间间隔,默认为7200s,即空闲后,每2小时检测一次。如果客户端在这2小时内断开了,那么服务端也要维护这个连接2小时,浪费服务端资源;另外就是对于需要实时传输数据的场景,客户端断开了,服务端也要2小时后才能发现。服务端发送心跳检测,具体可能出现的情况如下:

  • 1、连接正常:客户端仍然存在,网络连接状况良好。此时客户端会返回一个 ACK 。 服务端接收到ACK后重置计时器,在2小时后再发送探测。如果2小时内连接上有数据传输,那么在该时间基础上向后推延2个小时;
  • 2、连接断开:客户端异常关闭,或是网络断开。在这两种情况下,客户端都不会响应。服务器没有收到对其发出探测的响应,并且在一定时间(系统默认为 1000 ms )后重复发送 keep-alive packet ,并且重复发送一定次数。
  • 3、客户端曾经崩溃,但已经重启:这种情况下,服务器将会收到对其存活探测的响应,但该响应是一个复位,从而引起服务器对连接的终止。
基于Netty的IdleStateHandler实现 什么是 IdleStateHandler

当连接的空闲时间(读或者写)太长时,将会触发一个 IdleStateEvent 事件。然后,你可以通过你的 ChannelInboundHandler 中重写 userEventTrigged 方法来处理该事件。

怎么使用IdleStateHandler

IdleStateHandler 既是出站处理器也是入站处理器,继承了 ChannelDuplexHandler 。通常在 initChannel 方法中将 IdleStateHandler 添加到 pipeline 中。然后在自己的 handler 中重写 userEventTriggered 方法,当发生空闲事件(读或者写),就会触发这个方法,并传入具体事件。这时,你可以通过 Context 对象尝试向目标 Socekt 写入数据,并设置一个 监听器,如果发送失败就关闭 Socket (Netty 准备了一个 ChannelFutureListener.CLOSE_ON_FAILURE 监听器用来实现关闭 Socket 逻辑)。这样,就实现了一个简单的心跳服务。

IdleStateHandler源码
private final boolean observeOutput;// 是否考虑出站时较慢的情况。默认值是false(不考虑)。
private final long readerIdleTimeNanos; // 读事件空闲时间,0 则禁用事件
private final long writerIdleTimeNanos;// 写事件空闲时间,0 则禁用事件
private final long allIdleTimeNanos; //读或写空闲时间,0 则禁用事件

构造方法:该类有 3 个构造方法,主要对一下 4 个属性赋值。可以分别控制读,写,读写超时的时间,单位为秒,如果是0表示不检测,所以如果全是0,则相当于没添加这个IdleStateHandler,连接是个普通的短连接。

handlerAdded 方法

IdleStateHandler是在创建IdleStateHandler实例并添加到ChannelPipeline时添加定时任务来进行定时检测的,具体在initialize(ctx)方法实现;同时在从ChannelPipeline移除或Channel关闭时,移除这个定时检测,具体在destroy()实现

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
        this.initialize(ctx);
    }

}

public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    this.destroy();
}
initialize 方法
private void initialize(ChannelHandlerContext ctx) {
    switch (state) {
    case 1:
    case 2:
        return;
    }
    state = 1;
    initOutputChanged(ctx);

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
      // 这里的 schedule 方法会调用 eventLoop 的 schedule 方法,将定时任务添加进队列中
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

只要给定的参数大于0,就创建一个定时任务,每个事件都创建。同时,将 state 状态设置为 1,防止重复初始化。调用。

initOutputChanged 方法

初始化 “监控出站数据属性”,代码如下:

private void initOutputChanged(ChannelHandlerContext ctx) {
    if (observeOutput) {
        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        // 记录了出站缓冲区相关的数据,buf 对象的 hash 码,和 buf 的剩余缓冲字节数
        if (buf != null) {
            lastMessageHashCode = System.identityHashCode(buf.current());
            lastPendingWriteBytes = buf.totalPendingWriteBytes();
        }
    }
}
读事件的 run 方法
protected void run(ChannelHandlerContext ctx) {
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - lastReadTime;
    }

    if (nextDelay             
关注
打赏
1657692713
查看更多评论
0.0385s