通过之前那么多的分拆知识点讲解,终于到了最重要的阶段了。无论之前的EventLoop还是ByteBufAllocate,终归都是为Channel服务的。Channel的实现类才是最终连接、读写事件的执行者。
本文主要聚焦于NioServerSocketChannel,作为Server端,主要用于接收客户端连接,并为该连接注册事件,我们通过源码的角度来分析下连接的整个过程。
1.NioServerSocketChannel构造图分析类的时候,尤其Netty框架这种结构层特别多的关键类时,要学会通过Diagrams视角来分析,NioServerSocketChannel的类继承图如下:
相关接口,之前我们都有过分析,ChannelOutboundInvoker、Channel、ServerChannel、ServerSocketChannel,本文不再赘述。具体可参考...
而关于其的一些抽象实现类,我们之前倒没有了解过,本文就从上到下的来分析这些抽象类。
2.AbstractChannel解析public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
/** 基本属性 */
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
// 这些都是在在关联到某个具体地址时,提供的地址
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
// Channel被关联到的某个具体EventLoop
private volatile EventLoop eventLoop;
private volatile boolean registered;
// 构造方法
protected AbstractChannel(Channel parent) {
// 通过构造方法,将Channel所关联的Unsafe、pipeline创建起来
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
// 每一个Channel都有一个特定的ChannelPipeline
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
/** 可以看到,AbstractChannel实现这些方法的方式就是托管给ChannelPipeline,
* 而ChannelPipeline的执行,也就是顺序执行其中的ChannelHandler
*/
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
public Channel read() {
pipeline.read();
return this;
}
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
// 每一个具体的Channel实现类都有一个具体的Unsafe实现类,交由子类实现
protected abstract AbstractUnsafe newUnsafe();
// 每一个AbstractChannel都有一个AbstractUnsafe
// 之前一篇Channel的介绍中,我们知道Channel的方法实现都是交由其Unsafe来实现的
protected abstract class AbstractUnsafe implements Unsafe {
// 1.将channel注册到EventLoop
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 已注册的则不再重复注册
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
// 调用register0进行注册
register0(promise);
} else {
...
}
}
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 具体的注册方法交由子类实现
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
// 触发所有关联的ChannelHandler.channelRegistered()方法
pipeline.fireChannelRegistered();
// 如果客户端连接成功,则触发ChannelHandler.channelActive()方法
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 则支持自动读取,则开始执行读取方法
beginRead();
}
}
} catch (Throwable t) {
...
}
}
// 2.绑定方法
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
...
boolean wasActive = isActive();
try {
// 执行绑定方法,交由具体子类执行
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
...
safeSetSuccess(promise);
}
// 3.读取数据
@Override
public final void beginRead() {
...
try {
// 交由子类实现
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
// 写出数据
// write和flush是一家
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
...
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
// 添加到outboundBuffer
outboundBuffer.addMessage(msg, size, promise);
}
@SuppressWarnings("deprecation")
protected void flush0() {
...
try {
// 将上述添加到outboundBuffer中的消息发送出去
doWrite(outboundBuffer);
} catch (Throwable t) {
...
} finally {
inFlush0 = false;
}
}
}
}
总结:针对AbstractChannel,主要工作都交由ChannelPipeline来实现了。其中AbstractUnsafe,作为一个抽象实现,对基本的bind、read、write方法进行基本的验证,具体实现则交由子类实现了
3.AbstractNIOChannel解析public abstract class AbstractNioChannel extends AbstractChannel {
// NIO中的相关类
private final SelectableChannel ch;
protected final int readInterestOp;
volatile SelectionKey selectionKey;
// 基本构造方法
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
// 关注的事件,这里要注意下,NioServerSocketChannel和NioSocketChannel关注的事件不同
this.readInterestOp = readInterestOp;
try {
// 设置为不阻塞
ch.configureBlocking(false);
} catch (IOException e) {
...
}
}
// 将channel注册到EventLoop
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 本质上还是调用SelectableChannel.register(Selector)方法来实现的,还是JDK_NIO相关方法
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
// 开始读
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 主要是将读事件关注起来
selectionKey.interestOps(interestOps | readInterestOp);
}
}
// AbstractNioUnsafe
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
// 创建连接
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
boolean wasActive = isActive();
// 创建对远端连接
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
// 连接失败
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
...
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
}
}
没有什么特殊的方法,主要是对JDK_NIO的相关SelectableChannel、Selector的封装
4.AbstractNioMessageChannelpublic abstract class AbstractNioMessageChannel extends AbstractNioChannel {
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List readBuf = new ArrayList();
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 针对连接的客户端,也被作为一种请求被读出来,
// 具体可见NioServerSocketChannel.doReadMessages方法,下面有
int localRead = doReadMessages(readBuf);
// 无连接,则直接break掉
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 将获取到的连接,交由ChannelPipeline执行
// NioServerSocketChannel的ChannelPipeline有哪些ChannelHandler?
// 在之前ServerBootstrap的介绍中,我们有提到过,就是ServerBootstrapAcceptor这个Handler
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// 删除读事件关注
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
// 写事件
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
try {
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
// 具体的写操作也交由子类NioServerSocketChannel实现
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
break;
}
} catch (Exception e) {
if (continueOnWriteError()) {
in.remove(e);
} else {
throw e;
}
}
}
}
}
总结:AbstractNioMessageChannel作为一个关键类,提供了读和写的模板方法。而doReadMessages()和doWriteMessage()两个实质性的读写方法,则交由子类实现。
下面我们来看下子类是如何实现的。
5.NioServerSocketChannelpublic class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
// 默认构造方法
public NioServerSocketChannel() {
// newSocket方法返回一个具体的JDK中的ServerSocketChannel
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
// 在这里可以看到NioServerSocketChannel关注的事件时OP_ACCEPT事件
// 后续在将channel注册到EventLoop后,会注册OP_ACCEPT事件
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
// NioServerSocketChannel是否可用,
public boolean isActive() {
// 主要看其socket是否成功的绑定到具体的address(一般是localhost:port)
return isOpen() && javaChannel().socket().isBound();
}
// 绑定到具体local地址
protected void doBind(SocketAddress localAddress) throws Exception {
// 不同JDK版本有不同的实现,本质上还是NIO那一套
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
// NioServerSocketChannel竟然不支持写方法,仔细一想是可以理解的
// 服务端不需要写出什么数据,写出应该交由NioSocketChannel来做,这代表一个对客户端的连接
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
// connect事件,应该是客户端的事件,只有客户端才需要连接到远端
protected boolean doConnect(
SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
throw new UnsupportedOperationException();
}
// 针对服务端而言,可读的事件,就是有客户端连接上来
protected int doReadMessages(List buf) throws Exception {
// 实现为serverSocketChannel.accept(),还是NIO那一套,获取一个客户端连接
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
...
}
return 0;
}
}
总结:NioServerSocketChannel实现的主要方法就是
doBind:连接到本地某port上,作为一个endpoint
doReadMessages:获取客户端的连接,并交由后续的ChannelPipeline处理
6.ServerBootstrapAcceptor刚才也说了,获取到的客户端连接交由ChannelPipeline来处理,ChannelPipeline包含了一个ServerBootstrapAcceptor的Handler,该Handler是ServerBootStrap.init()方法时添加到NioServerSocketChannel.ChannelPipeline中的。
那NioServerSocketChannel获取到的客户端连接NioSocketChannel,究竟经过了哪些处理呢?我们来看下ServerBootstrapAcceptor有哪些处理
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
// 是不是很眼熟,就是ServerBootStrap在创建时候,指定的一系列属性
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
// 最关键的方法,获取到的客户端连接在这里被操作
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 在ServerBootStrap中设置的childHandler被添加到NioSocketChannel.ChannelPipeline中
child.pipeline().addLast(childHandler);
// 设置NioSocketChannel的基本属性
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// 将NioSocketChannel注册到childGroup中
// 具体实现可参见上面AbstractChannel.register方法实现
// 主要就是将获取到的客户端连接注册到某个EventLoop.Selector上
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}
总结:
经历了这么多,我们来用两张时序图表示下NioServerSocketChannel执行绑定到localhost和监听客户端连接的总过程。
1.NioServerSocketChannel绑定过程
NioServerSocketChannel绑定到本地某port上,在之前NIO时,是由以下代码完成
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(address, port));
那么NioServerSocketChannel是如何完成这个操作的呢,看下面的时序图
一系列的兜兜转转,最终还是调用到NioServerSocketChannel.doBind()方法,终于还是调用NIO方法实现绑定
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
protected ServerSocketChannel javaChannel() {
return (ServerSocketChannel) super.javaChannel();
}
}
2.NioServerSocketChannel监听客户端连接
在了解监听全过程时候,有一个很关键的类,ServerBootStrapAcceptor,该类是一个ChannelInboundHandler,是用于接收连接的主要操作类。那么这个类是在什么时候被添加进来的呢?就是在上面ServerBootStrap.init()方法中(AbstractBootstrap.initAndRegister()中被调用的)
public class ServerBootstrap extends AbstractBootstrap {
void init(Channel channel) {
...
// ServerBootstrapAcceptor被添加到NioServerSocketChannel的ChannelPipeline中
p.addLast(new ChannelInitializer() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
}
下面用一张序列图来展现下NioServerSocketChannel接收客户端连接的总过程
注意:什么类用来监听连接呢?就是我们的入口类应该是谁呢?之前在EventLoop和EventLoopGroup中,我们说过,EventLoop.run方法用来不断轮询来监听注册到当前Selector上的Channel所关注的事件。我们当前NioServerSocketChannel的OP_ACCEPT事件就注册在EventLoop.Selector上,所以EventLoop就作为我们的入口类
监听客户端的连接的工作最终还是交由NioServerSocketChannel来做
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
protected int doReadMessages(List buf) throws Exception {
// 获取客户端连接
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
try {
ch.close();
} catch (Throwable t2) {
}
}
return 0;
}
}