您当前的位置: 首页 >  nio

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty源码解析-NioServerSocketChannel

恐龙弟旺仔 发布时间:2021-12-27 19:07:34 ,浏览量:0

前言:

通过之前那么多的分拆知识点讲解,终于到了最重要的阶段了。无论之前的EventLoop还是ByteBufAllocate,终归都是为Channel服务的。Channel的实现类才是最终连接、读写事件的执行者。

本文主要聚焦于NioServerSocketChannel,作为Server端,主要用于接收客户端连接,并为该连接注册事件,我们通过源码的角度来分析下连接的整个过程。

1.NioServerSocketChannel构造图

分析类的时候,尤其Netty框架这种结构层特别多的关键类时,要学会通过Diagrams视角来分析,NioServerSocketChannel的类继承图如下:

 

    相关接口,之前我们都有过分析,ChannelOutboundInvoker、Channel、ServerChannel、ServerSocketChannel,本文不再赘述。具体可参考...

    而关于其的一些抽象实现类,我们之前倒没有了解过,本文就从上到下的来分析这些抽象类。

2.AbstractChannel解析
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    /** 基本属性 */
    private final Channel parent;
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;
    
    // 这些都是在在关联到某个具体地址时,提供的地址
    private volatile SocketAddress localAddress;
    private volatile SocketAddress remoteAddress;
    // Channel被关联到的某个具体EventLoop
    private volatile EventLoop eventLoop;
    private volatile boolean registered;
    
    // 构造方法
    protected AbstractChannel(Channel parent) {
        // 通过构造方法,将Channel所关联的Unsafe、pipeline创建起来
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    
    // 每一个Channel都有一个特定的ChannelPipeline
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    
    /** 可以看到,AbstractChannel实现这些方法的方式就是托管给ChannelPipeline,
     *  而ChannelPipeline的执行,也就是顺序执行其中的ChannelHandler
     */
    @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress) {
        return pipeline.connect(remoteAddress);
    }
    
    public Channel read() {
        pipeline.read();
        return this;
    }

    @Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }
    
    // 每一个具体的Channel实现类都有一个具体的Unsafe实现类,交由子类实现
    protected abstract AbstractUnsafe newUnsafe();
    
    // 每一个AbstractChannel都有一个AbstractUnsafe
    // 之前一篇Channel的介绍中,我们知道Channel的方法实现都是交由其Unsafe来实现的
    protected abstract class AbstractUnsafe implements Unsafe {
        
        // 1.将channel注册到EventLoop
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            // 已注册的则不再重复注册
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
			...
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                // 调用register0进行注册
                register0(promise);
            } else {
                ...
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 具体的注册方法交由子类实现
                doRegister();
                neverRegistered = false;
                registered = true;

                safeSetSuccess(promise);
                // 触发所有关联的ChannelHandler.channelRegistered()方法
                pipeline.fireChannelRegistered();
                // 如果客户端连接成功,则触发ChannelHandler.channelActive()方法
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // 则支持自动读取,则开始执行读取方法
                        beginRead();
                    }
                }
            } catch (Throwable t) {
               ...
            }
        }
            
        // 2.绑定方法
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();

            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
			...
            boolean wasActive = isActive();
            try {
                // 执行绑定方法,交由具体子类执行
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }
            ...
            safeSetSuccess(promise);
        }    
            
        // 3.读取数据
        @Override
        public final void beginRead() {
            ...
            try {
                // 交由子类实现	 
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }
            
        // 写出数据
        // write和flush是一家    
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            ...

            int size;
            try {
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }

            // 添加到outboundBuffer
            outboundBuffer.addMessage(msg, size, promise);
        }

        @SuppressWarnings("deprecation")
        protected void flush0() {
			...
            try {
                // 将上述添加到outboundBuffer中的消息发送出去
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                ...
            } finally {
                inFlush0 = false;
            }
        }
    }
}

总结:针对AbstractChannel,主要工作都交由ChannelPipeline来实现了。其中AbstractUnsafe,作为一个抽象实现,对基本的bind、read、write方法进行基本的验证,具体实现则交由子类实现了 

3.AbstractNIOChannel解析
public abstract class AbstractNioChannel extends AbstractChannel {
 
    // NIO中的相关类
    private final SelectableChannel ch;
    protected final int readInterestOp;
    volatile SelectionKey selectionKey;
    
    // 基本构造方法
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        // 关注的事件,这里要注意下,NioServerSocketChannel和NioSocketChannel关注的事件不同
        this.readInterestOp = readInterestOp;
        try {
            // 设置为不阻塞
            ch.configureBlocking(false);
        } catch (IOException e) {
            ...
        }
    }
        
    // 将channel注册到EventLoop
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 本质上还是调用SelectableChannel.register(Selector)方法来实现的,还是JDK_NIO相关方法
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
        }
    }
            
    // 开始读
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // 主要是将读事件关注起来
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
            
    // AbstractNioUnsafe
    protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
     
        // 创建连接
        public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            try {
                boolean wasActive = isActive();
                // 创建对远端连接
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    // 连接失败
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }
					...
                }
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }
    }
}

没有什么特殊的方法,主要是对JDK_NIO的相关SelectableChannel、Selector的封装

4.AbstractNioMessageChannel
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {

    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }
    
    private final class NioMessageUnsafe extends AbstractNioUnsafe {
        private final List readBuf = new ArrayList();
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        // 针对连接的客户端,也被作为一种请求被读出来,
                        // 具体可见NioServerSocketChannel.doReadMessages方法,下面有
                        int localRead = doReadMessages(readBuf);
                        // 无连接,则直接break掉
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // 将获取到的连接,交由ChannelPipeline执行
                    // NioServerSocketChannel的ChannelPipeline有哪些ChannelHandler?
                    // 在之前ServerBootstrap的介绍中,我们有提到过,就是ServerBootstrapAcceptor这个Handler
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // 删除读事件关注
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }

    // 写事件
	protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        final SelectionKey key = selectionKey();
        final int interestOps = key.interestOps();

        for (;;) {
            Object msg = in.current();
            if (msg == null) {
                // Wrote all messages.
                if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                    key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                }
                break;
            }
            try {
                boolean done = false;
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
                    // 具体的写操作也交由子类NioServerSocketChannel实现
                    if (doWriteMessage(msg, in)) {
                        done = true;
                        break;
                    }
                }

                if (done) {
                    in.remove();
                } else {
                    // Did not write all messages.
                    if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                        key.interestOps(interestOps | SelectionKey.OP_WRITE);
                    }
                    break;
                }
            } catch (Exception e) {
                if (continueOnWriteError()) {
                    in.remove(e);
                } else {
                    throw e;
                }
            }
        }
    }

}

总结:AbstractNioMessageChannel作为一个关键类,提供了读和写的模板方法。而doReadMessages()和doWriteMessage()两个实质性的读写方法,则交由子类实现。

下面我们来看下子类是如何实现的。

5.NioServerSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {

    // 默认构造方法
    public NioServerSocketChannel() {
        // newSocket方法返回一个具体的JDK中的ServerSocketChannel
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    
    public NioServerSocketChannel(ServerSocketChannel channel) {
        // 在这里可以看到NioServerSocketChannel关注的事件时OP_ACCEPT事件
        // 后续在将channel注册到EventLoop后,会注册OP_ACCEPT事件
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    
    // NioServerSocketChannel是否可用,
    public boolean isActive() {
        // 主要看其socket是否成功的绑定到具体的address(一般是localhost:port)
        return isOpen() && javaChannel().socket().isBound();
    }
    
    // 绑定到具体local地址
    protected void doBind(SocketAddress localAddress) throws Exception {
        // 不同JDK版本有不同的实现,本质上还是NIO那一套
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

	// NioServerSocketChannel竟然不支持写方法,仔细一想是可以理解的
	// 服务端不需要写出什么数据,写出应该交由NioSocketChannel来做,这代表一个对客户端的连接
	protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
        throw new UnsupportedOperationException();
    }

	// connect事件,应该是客户端的事件,只有客户端才需要连接到远端
    protected boolean doConnect(
            SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        throw new UnsupportedOperationException();
    }

	// 针对服务端而言,可读的事件,就是有客户端连接上来
	protected int doReadMessages(List buf) throws Exception {
        // 实现为serverSocketChannel.accept(),还是NIO那一套,获取一个客户端连接
        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            ...
        }
        return 0;
    }
}

总结:NioServerSocketChannel实现的主要方法就是

doBind:连接到本地某port上,作为一个endpoint

doReadMessages:获取客户端的连接,并交由后续的ChannelPipeline处理

6.ServerBootstrapAcceptor

    刚才也说了,获取到的客户端连接交由ChannelPipeline来处理,ChannelPipeline包含了一个ServerBootstrapAcceptor的Handler,该Handler是ServerBootStrap.init()方法时添加到NioServerSocketChannel.ChannelPipeline中的。

    那NioServerSocketChannel获取到的客户端连接NioSocketChannel,究竟经过了哪些处理呢?我们来看下ServerBootstrapAcceptor有哪些处理

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    
    // 是不是很眼熟,就是ServerBootStrap在创建时候,指定的一系列属性
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry, Object>[] childAttrs;
	private final Runnable enableAutoReadTask;

	// 最关键的方法,获取到的客户端连接在这里被操作
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

        	// 在ServerBootStrap中设置的childHandler被添加到NioSocketChannel.ChannelPipeline中
            child.pipeline().addLast(childHandler);

        	// 设置NioSocketChannel的基本属性
            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
                // 将NioSocketChannel注册到childGroup中
                // 具体实现可参见上面AbstractChannel.register方法实现
                // 主要就是将获取到的客户端连接注册到某个EventLoop.Selector上
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
}
总结:

    经历了这么多,我们来用两张时序图表示下NioServerSocketChannel执行绑定到localhost和监听客户端连接的总过程。

1.NioServerSocketChannel绑定过程

    NioServerSocketChannel绑定到本地某port上,在之前NIO时,是由以下代码完成

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(address, port));

    那么NioServerSocketChannel是如何完成这个操作的呢,看下面的时序图

 

    一系列的兜兜转转,最终还是调用到NioServerSocketChannel.doBind()方法,终于还是调用NIO方法实现绑定

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
	protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

    protected ServerSocketChannel javaChannel() {
        return (ServerSocketChannel) super.javaChannel();
    }
}

2.NioServerSocketChannel监听客户端连接

    在了解监听全过程时候,有一个很关键的类,ServerBootStrapAcceptor,该类是一个ChannelInboundHandler,是用于接收连接的主要操作类。那么这个类是在什么时候被添加进来的呢?就是在上面ServerBootStrap.init()方法中(AbstractBootstrap.initAndRegister()中被调用的)

public class ServerBootstrap extends AbstractBootstrap {
	void init(Channel channel) {
        ...
        // ServerBootstrapAcceptor被添加到NioServerSocketChannel的ChannelPipeline中
        p.addLast(new ChannelInitializer() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
}

下面用一张序列图来展现下NioServerSocketChannel接收客户端连接的总过程

注意:什么类用来监听连接呢?就是我们的入口类应该是谁呢?之前在EventLoop和EventLoopGroup中,我们说过,EventLoop.run方法用来不断轮询来监听注册到当前Selector上的Channel所关注的事件。我们当前NioServerSocketChannel的OP_ACCEPT事件就注册在EventLoop.Selector上,所以EventLoop就作为我们的入口类

 

监听客户端的连接的工作最终还是交由NioServerSocketChannel来做

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {
	protected int doReadMessages(List buf) throws Exception {
        // 获取客户端连接
        SocketChannel ch = SocketUtils.accept(javaChannel());
        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            try {
                ch.close();
            } catch (Throwable t2) {
            }
        }
        return 0;
    }
}

关注
打赏
1655041699
查看更多评论
立即登录/注册

微信扫码登录

0.0871s