您当前的位置: 首页 >  pip

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty源码解析-ChannelPipeline inbound、outbound事件的传播

恐龙弟旺仔 发布时间:2021-12-20 19:39:00 ,浏览量:0

前言:

    接上一篇文章,我们介绍了ChannelPipeline、ChannelHandlerContext、ChannelHandler之间的关联关系。通过类比LinkedList的方式,我们可以将ChannelHandlerContext当做集合中的每一个Node,Node有prev和next属性,指向前和后一个节点;而ChannelPipeline就是集合本身,提供对ChannelHandlerContext的操作。

1.ChannelPipeline中的节点操作相关方法
// 添加类方法
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

// 删除类方法
ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
 T remove(Class handlerType);
ChannelHandler removeFirst();
ChannelHandler removeLast();

    在前一篇中,我们已经对addLast()方法有过分析,比较简单,其他方法与其也基本类似,都是对Node本身的关联操作,笔者不再赘述。

2.ChannelPipeline的触发方法
// inbound类方法 ,从ChannelInboundInvoker接口中可以获取
ChannelPipeline fireChannelRegistered();
ChannelPipeline fireChannelUnregistered();
ChannelPipeline fireChannelActive();
ChannelPipeline fireChannelInactive);
ChannelPipeline fireExceptionCaught(Throwable cause);
ChannelPipeline fireUserEventTriggered(Object event);
ChannelPipeline fireChannelRead(Object msg);
ChannelPipeline fireChannelReadComplete();
ChannelPipeline fireChannelWritabilityChanged();

// outbound类方法 ,从ChannelOutboundInvoker可以获取
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
ChannelFuture disconnect(ChannelPromise promise);
ChannelFuture close(ChannelPromise promise);
ChannelFuture deregister(ChannelPromise promise);
ChannelOutboundInvoker read();
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);

触发类的方法我们从两个维度来分析,inbound(入站事件)和outbound(出站事件)。通过方法名我们也能比较明确的区分这两类事件的区别:

入站事件:客户端channel的注册(register)、接收到客户端数据(read)等;

出站事件:当前客户端连接到远程服务(bind、connect)、向远程服务端写数据(write)等;  

3.inbound入站事件分析

    前文中,我们介绍过真正做事的就是ChannelHandler,ChannelHandler的方法执行就是通过上述fire开头的方法来触发的。那么ChannelPipeline中的ChannelHandler类的相关方法究竟是如何执行的呢,下面我们就来分析下一些比较典型的入站事件。

    代码示例如下(HelloServer中的代码):

ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frame", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new HelloClientHandler());

    我们来分析下,当有一个client连接到server时,server完成握手连接后,会调用ChannelPipeline.fireChannelActive()方法时的入站调用过程。

    整个ChannelPipeline的节点如下所示:

 

3.1 ChannelPipeline.fireChannelActive()
public class DefaultChannelPipeline implements ChannelPipeline {
	@Override
    public final ChannelPipeline fireChannelActive() {
        // 直接调用AbstractChannelHandlerContext触发
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
}

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 直接调用AbstractChannelHandlerContext.invokeChannelActive执行
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }
    // 
    private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                // 调用对应ChannelHandler.channelActive()执行
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelActive();
        }
    }
    
}

总结上述调用过程就是:

DefaultChannelPipeline.fireChannelActive()

 -> ChannelHandlerContext.invokeChannelActive()

 -> ChannelHandler.channelActive()

ChannelPipeline的触发方法最终调用到ChannelHandler的相关执行方法

3.2 step1:HeadContext.channelActive()
final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
	    @Override
        public void channelActive(ChannelHandlerContext ctx) {
        	// 调用ChannelHandlerContext.fireChannelActive方法
            ctx.fireChannelActive();
			// 如果启动auto_read,则读取client数据
            readIfIsAutoRead();
        }
}

// ChannelHandlerContext.fireChannelActive
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    @Override
    public ChannelHandlerContext fireChannelActive() {
    	// findContextInbound方法获取下一个合适的节点
        invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
        return this;
    }
    
    // 获取当前ChannelHandlerContext的下一个inbound节点
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
            // 对于不属于inbound类型事件的节点,则直接跳过,判断下一个节点
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }
    
    private static boolean skipContext(
            AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
        // 这个判断条件我们需要特别关注下,如果当前节点事件不属于inbound类型事件,
        // 且当前ChannelHandler不关注该事件(方法上带有@Skip)则返回true
        // 意味着跳过该ChannelHandler的方法执行,直接执行下一个
        return (ctx.executionMask & (onlyMask | mask)) == 0 ||
                (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
    }
}

HeadContext作为首节点,其channelActive()方法没有特别的,就直接获取下一个节点(属于inbound事件节点),执行下一个可用节点的channelActive方法。

** 那么哪些事件属于inbound类型事件呢?

ChannelHandlerMask中已经有了说明。

final class ChannelHandlerMask {
	static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |
            MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
            MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
}

** ChannelHandler在封装成ChannelHandlerContext时,具体的executionMask怎么计算出来的呢?

// ChannelHandlerMask.mask()
final class ChannelHandlerMask {
    static int mask(Class            
关注
打赏
1655041699
查看更多评论
0.0641s