接上一篇文章,我们介绍了ChannelPipeline、ChannelHandlerContext、ChannelHandler之间的关联关系。通过类比LinkedList的方式,我们可以将ChannelHandlerContext当做集合中的每一个Node,Node有prev和next属性,指向前和后一个节点;而ChannelPipeline就是集合本身,提供对ChannelHandlerContext的操作。
1.ChannelPipeline中的节点操作相关方法// 添加类方法
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
// 删除类方法
ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
T remove(Class handlerType);
ChannelHandler removeFirst();
ChannelHandler removeLast();
在前一篇中,我们已经对addLast()方法有过分析,比较简单,其他方法与其也基本类似,都是对Node本身的关联操作,笔者不再赘述。
2.ChannelPipeline的触发方法// inbound类方法 ,从ChannelInboundInvoker接口中可以获取
ChannelPipeline fireChannelRegistered();
ChannelPipeline fireChannelUnregistered();
ChannelPipeline fireChannelActive();
ChannelPipeline fireChannelInactive);
ChannelPipeline fireExceptionCaught(Throwable cause);
ChannelPipeline fireUserEventTriggered(Object event);
ChannelPipeline fireChannelRead(Object msg);
ChannelPipeline fireChannelReadComplete();
ChannelPipeline fireChannelWritabilityChanged();
// outbound类方法 ,从ChannelOutboundInvoker可以获取
ChannelFuture bind(SocketAddress localAddress);
ChannelFuture connect(SocketAddress remoteAddress);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ChannelFuture disconnect();
ChannelFuture close();
ChannelFuture deregister();
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
ChannelFuture disconnect(ChannelPromise promise);
ChannelFuture close(ChannelPromise promise);
ChannelFuture deregister(ChannelPromise promise);
ChannelOutboundInvoker read();
ChannelFuture write(Object msg);
ChannelFuture write(Object msg, ChannelPromise promise);
ChannelOutboundInvoker flush();
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelFuture writeAndFlush(Object msg);
触发类的方法我们从两个维度来分析,inbound(入站事件)和outbound(出站事件)。通过方法名我们也能比较明确的区分这两类事件的区别:
入站事件:客户端channel的注册(register)、接收到客户端数据(read)等;
出站事件:当前客户端连接到远程服务(bind、connect)、向远程服务端写数据(write)等;
3.inbound入站事件分析前文中,我们介绍过真正做事的就是ChannelHandler,ChannelHandler的方法执行就是通过上述fire开头的方法来触发的。那么ChannelPipeline中的ChannelHandler类的相关方法究竟是如何执行的呢,下面我们就来分析下一些比较典型的入站事件。
代码示例如下(HelloServer中的代码):
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frame", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new HelloClientHandler());
我们来分析下,当有一个client连接到server时,server完成握手连接后,会调用ChannelPipeline.fireChannelActive()方法时的入站调用过程。
整个ChannelPipeline的节点如下所示:
3.1 ChannelPipeline.fireChannelActive()
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline fireChannelActive() {
// 直接调用AbstractChannelHandlerContext触发
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
}
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 直接调用AbstractChannelHandlerContext.invokeChannelActive执行
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
//
private void invokeChannelActive() {
if (invokeHandler()) {
try {
// 调用对应ChannelHandler.channelActive()执行
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelActive();
}
}
}
总结上述调用过程就是:
DefaultChannelPipeline.fireChannelActive()
-> ChannelHandlerContext.invokeChannelActive()
-> ChannelHandler.channelActive()
ChannelPipeline的触发方法最终调用到ChannelHandler的相关执行方法
3.2 step1:HeadContext.channelActive()final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 调用ChannelHandlerContext.fireChannelActive方法
ctx.fireChannelActive();
// 如果启动auto_read,则读取client数据
readIfIsAutoRead();
}
}
// ChannelHandlerContext.fireChannelActive
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelHandlerContext fireChannelActive() {
// findContextInbound方法获取下一个合适的节点
invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
return this;
}
// 获取当前ChannelHandlerContext的下一个inbound节点
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
// 对于不属于inbound类型事件的节点,则直接跳过,判断下一个节点
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
// 这个判断条件我们需要特别关注下,如果当前节点事件不属于inbound类型事件,
// 且当前ChannelHandler不关注该事件(方法上带有@Skip)则返回true
// 意味着跳过该ChannelHandler的方法执行,直接执行下一个
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
}
HeadContext作为首节点,其channelActive()方法没有特别的,就直接获取下一个节点(属于inbound事件节点),执行下一个可用节点的channelActive方法。
** 那么哪些事件属于inbound类型事件呢?
ChannelHandlerMask中已经有了说明。
final class ChannelHandlerMask {
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
}
** ChannelHandler在封装成ChannelHandlerContext时,具体的executionMask怎么计算出来的呢?
// ChannelHandlerMask.mask()
final class ChannelHandlerMask {
static int mask(Class
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?