您当前的位置: 首页 > 

庄小焱

暂无认证

  • 2浏览

    0关注

    805博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty——Read过程源码分析

庄小焱 发布时间:2021-11-13 11:00:27 ,浏览量:2

摘要

将分析一下work线程中的read过程,其中涉及到零拷贝技术的及其源码分析。

Read code
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    //检查该SelectionKey是否有效,如果无效,则关闭channel
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // 如果准备好READ或ACCEPT则触发unsafe.read() ,检查是否为0,如上面的源码英文注释所说:解决JDK可能会产生死循环的一个bug。
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                //如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件
                // Connection already closed - no need to handle write.
                return;
            }
        }
        // 如果准备好了WRITE则将缓冲区中的数据发送出去,如果缓冲区中数据都发送完成,则清除之前关注的OP_WRITE标记
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // 如果是OP_CONNECT,则需要移除OP_CONNECT否则Selector.select(timeout)将立即返回不会有任何阻塞,这样可能会出现cpu 100%
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

该方法主要是对SelectionKey k进行了检查,有如下几种不同的情况:

  • 1)OP_ACCEPT,接受客户端连接
  • 2)OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取。
  • 3)OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据。
  • 4)OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态。

主要来看下当work 线程 selector检测到OP_READ事件时,内部干了些什么?

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
    if (!ch.isOpen()) {
        //如果已经关闭,则直接返回即可,不需要再处理该channel的其他事件
        // Connection already closed - no need to handle write.
        return;
    }
}

从代码中可以看到,当selectionKey发生的事件是SelectionKey.OP_READ,执行unsafe的read方法。注意这里的unsafe是NioByteUnsafe的实例。为什么说这里的unsafe是NioByteUnsafe的实例呢?在上篇博文Netty源码分析:accept中我们知道Boss NioEventLoopGroup中的NioEventLoop只负责accpt客户端连接,然后将该客户端注册到Work NioEventLoopGroup中的NioEventLoop中,即最终是由work线程对应的selector来进行read等时间的监听,即work线程中的channel为SocketChannel,SocketChannel的unsafe就是NioByteUnsafe的实例。

Unsafe中的read()
@Override
    public void read() {
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // ChannelConfig.setAutoRead(false) was called in the meantime
            removeReadOp();
            return;
        }

        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
        if (allocHandle == null) {
            this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
        }

        ByteBuf byteBuf = null;
        int messages = 0;
        boolean close = false;
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                //1、分配缓存
                byteBuf = allocHandle.allocate(allocator);
                int writable = byteBuf.writableBytes();//可写的字节容量
                //2、将socketChannel数据写入缓存
                int localReadAmount = doReadBytes(byteBuf);
                if (localReadAmount = Integer.MAX_VALUE - localReadAmount) {
                    // Avoid overflow.
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }

                totalReadAmount += localReadAmount;

                // stop reading
                if (!config.isAutoRead()) {
                    break;
                }

                if (localReadAmount < writable) {
                    // Read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
}
allocHandler的实例化过程

allocHandle负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少,先看allocHandler的实例化过程。

RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
    this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}

其中, config.getRecvByteBufAllocator()得到的是一个 AdaptiveRecvByteBufAllocator实例DEFAULT。

public static final AdaptiveRecvByteBufAllocator DEFAULT = new AdaptiveRecvByteBufAllocator();

而AdaptiveRecvByteBufAllocator中的newHandler()方法的代码如下:

@Override
public Handle newHandle() {
    return new HandleImpl(minIndex, maxIndex, initial);
}

HandleImpl(int minIndex, int maxIndex, int initial) {
    this.minIndex = minIndex;
    this.maxIndex = maxIndex;

    index = getSizeTableIndex(initial);
    nextReceiveBufferSize = SIZE_TABLE[index];
}

其中,上面方法中所用到参数:minIndex maxIndex initial是什么意思呢?含义如下:minIndex是最小缓存在SIZE_TABLE中对应的下标。maxIndex是最大缓存在SIZE_TABLE中对应的下标,initial为初始化缓存大小。

AdaptiveRecvByteBufAllocator的相关常量字段

public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {

        static final int DEFAULT_MINIMUM = 64;
        static final int DEFAULT_INITIAL = 1024;
        static final int DEFAULT_MAXIMUM = 65536;

        private static final int INDEX_INCREMENT = 4;
        private static final int INDEX_DECREMENT = 1;

        private static final int[] SIZE_TABLE;

1)、SIZE_TABLE:按照从小到大的顺序预先存储可以分配的缓存大小。 从16开始,每次累加16,直到496,接着从512开始,每次增大一倍,直到溢出。SIZE_TABLE初始化过程如下。

static {
    List sizeTable = new ArrayList();
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }

    for (int i = 512; i > 0; i > 1;
        int a = SIZE_TABLE[mid];
        int b = SIZE_TABLE[mid + 1];
        if (size > b) {
            low = mid + 1;
        } else if (size < a) {
            high = mid - 1;
        } else if (size == a) {
            return mid;
        } else { //这里的情况就是 a < size  0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}

这里的setBytes方法有不同的实现,这里看下UnpooledUnsafeDirectByteBuf的setBytes的实现。

UnpooledUnsafeDirectByteBuf.java

@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf = internalNioBuffer();
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1;//当Channel 已经关闭,则返回-1.    
    }
} 

private ByteBuffer internalNioBuffer() {
    ByteBuffer tmpNioBuf = this.tmpNioBuf;
    if (tmpNioBuf == null) {
        this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
    }
    return tmpNioBuf;
}

最终底层采用ByteBuffer实现read操作,无论是PooledByteBuf、还是UnpooledXXXBuf,里面都将底层数据结构BufBuffer/array转换为ByteBuffer 来实现read操作。即无论是UnPooledXXXBuf还是PooledXXXBuf里面都有一个ByteBuffer tmpNioBuf,这个tmpNioBuf才是真正用来存储从管道Channel中读取出的内容的。到这里就完成了将channel的数据读入到了缓存Buf中。

我们具体来看看 in.read(tmpBuf); FileChannel和SocketChannel的read最后是依赖IOUtil来实现。

public int read(ByteBuffer dst) throws IOException {
    ensureOpen();
    if (!readable)
        throw new NonReadableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                n = IOUtil.read(fd, dst, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}

最后目的就是将SocketChannel中的数据读出存放到ByteBuffer dst中,我们看看 IOUtil.read(fd, dst, -1, nd)

static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {
    if (var1.isReadOnly()) {
        throw new IllegalArgumentException("Read-only buffer");
    //如果最终承载数据的buffer是DirectBuffer,则直接将数据读入到堆外内存中
    } else if (var1 instanceof DirectBuffer) {
        return readIntoNativeBuffer(var0, var1, var2, var4);
    } else {
        // 分配临时的堆外内存
        ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());

        int var7;
        try {
            // Socket I/O 操作会将数据读入到堆外内存中
            int var6 = readIntoNativeBuffer(var0, var5, var2, var4);
            var5.flip();
            if (var6 > 0) {
                // 将堆外内存的数据拷贝到堆内存中(用户定义的缓存,在jvm中分配内存)
                var1.put(var5);
            }

            var7 = var6;
        } finally {
            // 里面会调用DirectBuffer.cleaner().clean()来释放临时的堆外内存
            Util.offerFirstTemporaryDirectBuffer(var5);
        }

        return var7;
    }
}

通过上述实现可以看出,基于channel的数据读取步骤如下:

  • 1、如果缓存内存是DirectBuffer,就直接将Channel中的数据读取到堆外内存
  • 2、如果缓存内存是堆内存,则先申请一块和缓存同大小的临时 DirectByteBuffer var5。
  • 3、将内核缓存中的数据读到堆外缓存var5,底层由NativeDispatcher的read实现。
  • 4、把堆外缓存var5的数据拷贝到堆内存var1(用户定义的缓存,在jvm中分配内存)。
  • 5、会调用DirectBuffer.cleaner().clean()来释放创建的临时的堆外内存

如果AbstractNioByteChannel.read中第一步创建的是堆外内存,则会直接将数据读入到堆外内存,并不会先创建临时堆外内存,再将数据读入到堆外内存,最后将堆外内存拷贝到堆内存,简单的说,如果使用堆外内存,则只会复制一次数据,如果使用堆内存,则会复制两次数据

readIntoNativeBuffer.java

这个函数就是将内核缓冲区中的数据读取到堆外缓存DirectBuffer

private static int readIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj)  throws IOException  {  
    int i = bytebuffer.position();  
    int j = bytebuffer.limit();  
    //如果断言开启,buffer的position大于limit,则抛出断言错误  
    if(!$assertionsDisabled && i > j)  
        throw new AssertionError();  
    //获取需要读的字节数  
    int k = i > j ? 0 : j - i;  
    if(k == 0)  
        return 0;  
    int i1 = 0;  
    //从输入流读取k个字节到buffer  
    if(l != -1L)  
        i1 = nativedispatcher.pread(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj);  
    else  
        i1 = nativedispatcher.read(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k);  
    //重新定位buffer的position  
    if(i1 > 0)  
        bytebuffer.position(i + i1);  
    return i1;  
}

AbstractNioByteChannel.read()

@Override
public void read() {
        //...
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
                byteBuf = allocHandle.allocate(allocator);
                int writable = byteBuf.writableBytes();
                int localReadAmount = doReadBytes(byteBuf);
                if (localReadAmount = Integer.MAX_VALUE - localReadAmount) {
                    // Avoid overflow.
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }

                totalReadAmount += localReadAmount;

                // stop reading
                if (!config.isAutoRead()) {
                    break;
                }

                if (localReadAmount < writable) {
                    // Read less than what the buffer can hold,
                    // which might mean we drained the recv buffer completely.
                    break;
                }
            } while (++ messages < maxMessagesPerRead);

            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);

            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
}

int localReadAmount = doReadBytes(byteBuf);

  • 1、如果返回0,则表示没有读取到数据,则退出循环。
  • 2、如果返回-1,表示对端已经关闭连接,则退出循环。
  • 3、否则,表示读取到了数据,数据读入缓存后,触发pipeline的ChannelRead事件,byteBuf作为参数进行后续处理,这时自定义Inbound类型的handler就可以进行业务处理了。Pipeline的事件处理在我之前的博文中有详细的介绍。处理完成之后,再一次从Channel读取数据,直至退出循环。
  • 4、循环次数超过maxMessagesPerRead时,即只能在管道中读取maxMessagesPerRead次数据,既是还没有读完也要退出。在上篇博文中,Boss线程接受客户端连接也用到了此变量,即当boss线程 selector检测到OP_ACCEPT事件后一次只能接受maxMessagesPerRead个客户端连接。
关注
打赏
1657692713
查看更多评论
立即登录/注册

微信扫码登录

0.0384s