您当前的位置: 首页 > 

恐龙弟旺仔

暂无认证

  • 3浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty源码解析-IdleStateHandler

恐龙弟旺仔 发布时间:2021-12-30 19:45:48 ,浏览量:3

前言:

心跳机制广泛运用在我们的应用平台中。对于连接到应用服务的客户端,服务端有必要对长时间没有请求的客户端连接进行清理,以避免连接过多。这就需要服务端有空闲连接检测机制。

而针对客户端而言,如果长时间未请求数据,为避免被服务端清理连接,就需要间歇性的发送心跳请求。

在Netty中,针对以上需求,已经有现成的Handler可供使用,这就是本文要介绍的IdleStateHandler。

1.IdleStateHandler的使用 1.1 服务端检测长时间未请求的客户端

我们在服务端可以使用IdleStateHandler来检测长时间未发送请求的客户端,对其进行清理操作,简单示例如下:

// 还是使用HelloServer的示例,我们在ChannelInitializer中添加IdleStateHandler
.childHandler(new ChannelInitializer() {
  protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    // 在这里添加IdleStateHandler,设置空闲检测时间为10秒
    pipeline.addLast("idle", new IdleStateHandler(10, 10, 10));
    // 针对空闲事件的处理(自定义),具体内容如下
    pipeline.addLast("idledeal", new IdleEventHandler());
    pipeline.addLast("decoder", new StringDecoder());
    pipeline.addLast("encoder", new StringEncoder());
    pipeline.addLast("handler", new HelloServerHandler());
}
});

// IdleEventHandler
public class IdleEventHandler extends ChannelDuplexHandler {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            evt = (IdleStateEvent) evt;

            // 若检测到长时间未读到请求事件,则清理客户端连接
            if (evt.equals(IdleStateEvent.READER_IDLE_STATE_EVENT)) {
                System.out.println("idle...");
                ctx.channel().close();
            }else {
                // 其他事件
                // TODO
            }
        }
    }
}

在本例中,我们设置了IdleStateHandler的读空闲检测时间为10s,则客户端连接10s没有发送任何请求过来时,则发送一个IdleStateEvent.READER_IDLE_STATE_EVENT事件到下游,IdleEventHandler处理该事件,直接关闭客户端连接。

1.2 客户端发送心跳请求

若客户端本身检测到长时间未发送请求,为避免被服务端清理,则可以主动发送一个心跳请求。简单示例如下

// 同样的使用HelloClient的代码,我们改造下ChannelInitializer
.handler(new ChannelInitializer() {
  protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    ...
    // 在这里添加IdleStateHandler检测
    pipeline.addLast("idle", new IdleStateHandler(5, 5, 5));
    // 空闲事件处理Handler(自定义),具体内容如下
    pipeline.addLast("idledeal", new ClientIdleEventHandler());
    pipeline.addLast("handler", new HelloClientHandler());
}
});

// ClientIdleEventHandler
public class ClientIdleEventHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            evt = (IdleStateEvent) evt;

            // 若检测到长时间未发送请求事件,则主动发送心跳信息
            if (evt.equals(IdleStateEvent.WRITER_IDLE_STATE_EVENT)) {
                ctx.writeAndFlush("ping");
            }else {
                // 其他事件
                // TODO
            }
        }
    }
}

这样,当客户端发现已经5s没有发送过请求时,则主动发送一个ping心跳信息到服务端,避免被清理

2.IdleStateHandler的构造

我们首先来看下IdleStateHandler的相关构造方法和基本属性

public class IdleStateHandler extends ChannelDuplexHandler {
    
    // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
    // write监听器
    private final ChannelFutureListener writeListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            lastWriteTime = ticksInNanos();
            firstWriterIdleEvent = firstAllIdleEvent = true;
        }
    };

    private final boolean observeOutput;
	// 三种类型的空闲时间设置
    private final long readerIdleTimeNanos;
    private final long writerIdleTimeNanos;
    private final long allIdleTimeNanos;

	// 读空闲检测定时任务
    private ScheduledFuture readerIdleTimeout;
	// 最近一次读事件
    private long lastReadTime;
	// 是否第一次读idleEvent触发
    private boolean firstReaderIdleEvent = true;

	// 以下与读设置类似
    private ScheduledFuture writerIdleTimeout;
    private long lastWriteTime;
    private boolean firstWriterIdleEvent = true;

    private ScheduledFuture allIdleTimeout;
    private boolean firstAllIdleEvent = true;

	// IdleStateHandler的状态,避免多次初始化
    private byte state; // 0 - none, 1 - initialized, 2 - destroyed
    private boolean reading;

    private long lastChangeCheckTimeStamp;
    private int lastMessageHashCode;
    private long lastPendingWriteBytes;
    private long lastFlushProgress;

	// 默认使用的构造器
	public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds) {

        // 默认单位为秒
        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
             TimeUnit.SECONDS);
    }

    public IdleStateHandler(
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
    }

	public IdleStateHandler(boolean observeOutput,
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        ObjectUtil.checkNotNull(unit, "unit");

        this.observeOutput = observeOutput;

        // 以纳秒为单位重新设置超时时间
        if (readerIdleTime  0) {
            // 创建一个读、写空闲检测定时任务,延后readerIdleTimeNanos执行
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                                      allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }
    
    //  创建一个定时任务
    ScheduledFuture schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
        return ctx.executor().schedule(task, delay, unit);
    }
}

初始化方法,主要用于初始化三个定时任务,那么这三个定时任务ReaderIdleTimeoutTask、WriterIdleTimeoutTask、AllIdleTimeoutTask具体是怎么用的呢?我们先来看下读空闲检测是如何做的。

3.2 读空闲检测
public class IdleStateHandler extends ChannelDuplexHandler {
	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 当设置的readerIdleTimeNanos或allIdleTimeNanos大于0时,说明需要进行读空闲检测
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            // 设置reading 正在读数据的状态为true
            reading = true;
            // 设置两个状态为为true
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg);
    }

    // 重点在这来,如果本次读已经结束,则需要重置时间和状态位
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 如果数据正在读状态
        if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
            // 则重新设置lastReadTime为当前时间
            lastReadTime = ticksInNanos();
            // 将正在读标志设置为false
            reading = false;
        }
        ctx.fireChannelReadComplete();
    }
}

通过上述两个方法可以看出,当发生读事件时,设置reading=true,当本次读结束时,则设置reading=false,lastReadTime(最近一次读时间)为当前时间。

那么这个是如何被检测到读超时的呢?我们可以回到initialize()方法,其中有一个ReaderIdleTimeoutTask的定时任务,延迟readerIdleTimeNanos执行,一起来看下这个task的具体内容

3.2.1 ReaderIdleTimeoutTask 读空闲检测定时任务

private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

    // 将ChannelHandlerContext传入当前task
    ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    protected void run(ChannelHandlerContext ctx) {
        long nextDelay = readerIdleTimeNanos;
        // 如果当前没有发生读事件,则reading为false
        if (!reading) {
            nextDelay -= ticksInNanos() - lastReadTime;
        }

        // 读空闲超时,需要发送READER_IDLE事件
        if (nextDelay  readerIdleTimeNanos(读空闲检测时间),说明读空闲超时,往下游发送一个READER_IDLE Event

reading=false的情况有两种:没有发生过读、读已经结束;当读数据正在进行时,则reading=true

总结:通过这种对lastReadTime的定时任务检测,就可以发现是否已经长时间未读,若是,则发送下游READER_IDLE事件,下游检测到该事件进行相应处理即可。

3.3 写空闲检测

分析过程与3.2 读空闲检测类似,我们先来看下重写后的write方法

public class IdleStateHandler extends ChannelDuplexHandler {
	@Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // 如果writerIdleTimeNanos或allIdleTimeNanos大于0,说明需要进行写空闲检测
        if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            // 对write方法执行后的ChannelFuture添加监听器,writeListener内容如下
            ctx.write(msg, promise.unvoid()).addListener(writeListener);
        } else {
            ctx.write(msg, promise);
        }
    }
    
    private final ChannelFutureListener writeListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            // 当写方法完成时,设置最新一次写时间为当前时间
            lastWriteTime = ticksInNanos();
            firstWriterIdleEvent = firstAllIdleEvent = true;
        }
    };
}

方法并不复杂,主要就是对write方法添加一个监听器,用于监听wirte方法完成,完成后重置下lastWriteTime。下面来看下WriteTask所做的事情

3.3.1 WriterIdleTimeoutTask 写空闲检测

private final class WriterIdleTimeoutTask extends AbstractIdleTask {

    // 将ChannelHandlerContext传入当前task
    WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    protected void run(ChannelHandlerContext ctx) {
        long lastWriteTime = IdleStateHandler.this.lastWriteTime;
        // 同样的方式来计算nextDelay
        long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
        // 已超时
        if (nextDelay             
关注
打赏
1655041699
查看更多评论
0.0421s