您当前的位置: 首页 > 

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty源码解析-ChannelOutboundBuffer详解

恐龙弟旺仔 发布时间:2021-12-28 19:41:52 ,浏览量:0

前言:

    在上篇文章介绍了NioSocketChannel的写操作之后,我们知道其真正动作可以分为两步:分别是write和flush。write操作将msg信息放入ChannelOutboundBuffer容器之后就结束了,而flush操作才真正将存在在容器中的数据发送到对端。

    本文就来介绍下ChannelOutboundBuffer容器的基本结构。

1.ChannelOutboundBuffer基本属性
// an internal data structure used by {@link AbstractChannel} to store its pending outbound write requests.
public final class ChannelOutboundBuffer {
    // 每一个channel有一个对应的ChannelOutboundBuffer
    private final Channel channel;

    // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry) flushedEntry、unflushedEntry、tailEntry之间的位置关系
    // 通过注释可知,Entry是一个链表结构
    
    // flushedEntry是第一个被发送到对端的Entry对象
    private Entry flushedEntry;
    // unflushedEntry是第一个不被发送到对端的Entry对象
    // 那么再通过flushedEntry、unflushedEntry之间的位置关系可知,flushedEntry、unflushedEntry之间的Entry都是要被发送到对端的
    private Entry unflushedEntry;
    // 链表尾端的Entry
    private Entry tailEntry;
    // 需要被发送到对端的Entry对象数量
    private int flushed;

    private int nioBufferCount;
    private long nioBufferSize;

    // 是否刷新失败
    private boolean inFail;
	// 待发送的缓冲区的字节总数
    private volatile long totalPendingSize;

    private volatile int unwritable;

    private volatile Runnable fireChannelWritabilityChangedTask;
}

根据其类注释我们可以了解到其作用:作为一个数据存储的中间层,用来存储AbstractChannel发送过来的数据

而待发送的数据则被包装成Entry对象,我们来看下其结构

1.1 Entry
static final class Entry {
    // 对象池,用来循环利用Entry
    private static final ObjectPool RECYCLER = ObjectPool.newPool(new ObjectCreator() {
        @Override
        public Entry newObject(Handle handle) {
            return new Entry(handle);
        }
    });

    // 池化操作处理器
    private final Handle handle;
    // 下一个Entry,单向链表结构
    Entry next;
    // 真正所包含的信息
    Object msg;
    
    ByteBuffer[] bufs;
    ByteBuffer buf;
    // 回调类
    ChannelPromise promise;
    // 已发送数据进度
    long progress;
    // 总数据量
    long total;
    // 待发送数据量
    int pendingSize;
    int count = -1;
    // 是否已被取消
    boolean cancelled;
}

待发送消息msg被包装成Entry对象,且Entry对象是一个单向链表的结构。

而Entry对象的生产则是通过RECYCLER这个对象池来产生的。

当然,还有些属性不是那么明朗,我们继续看。

2.ChannelOutboundBuffer.addMessage()添加消息

    在前文中,AbstractUnsafe.write()发送消息方法中,最后会调用ChannelOutboundBuffer.addMessage()将消息发送到ChannelOutboundBuffer容器中

public final class ChannelOutboundBuffer {
	public void addMessage(Object msg, int size, ChannelPromise promise) {
        // 创建一个Entry对象,具体见2.1
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        
        // 对链表上下节点的设置
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        // 当前entry添加到最后
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
}

我们通过两张图来表示下添加首个对象和添加N个对象之后的示意图:

* 添加首个Entry时:

 

添加首个Entry后,unflushedEntry和tailEntry均指向该新增的Entry

* 添加N个Entry对象时:

 

添加N个Entry对象后,第一个添加的Entry_1则作为unflushedEntry,其next指向Entry_2,最终添加Entry_n,则Entry_n为tailEntry。

2.1 Entry.newInstance() 根据msg创建一个Entry对象
static final class Entry {
	static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
        // 通过对象池获取一个Entry对象
        Entry entry = RECYCLER.get();
        // 设置基本属性
        entry.msg = msg;
        entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
        entry.total = total;
        entry.promise = promise;
        return entry;
    }
}
3.ChannelOutboundBuffer.addFlush() 添加刷新
public final class ChannelOutboundBuffer {
    public void addFlush() {
        // 获取第一个未flushed的Entry对象
        Entry entry = unflushedEntry;
        if (entry != null) {
            // 如果flushedEntry为空,说明没有设置过这个对象
            if (flushedEntry == null) {
                // 则将unflushedEntry设置为flushedEntry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                // 若entry所关联的ChannelPromise未取消,则继续刷新
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }
}

代码不算复杂,之前write()方法则是添加Entry对象,一直从unflushedEntry-->TailEntry,但是flushedEntry一直为null

执行了addFlush()方法之后,则是为了设置flushEntry对象,并且遍历所有的Entry对象,记录下需要被发送的Entry对象数量 (为flushed)

接着上面的添加N个Entry对象的图来解释,执行完addFlush()方法之后,图变成如下:

 

4.AbstractUnsafe.flush0() 

    真正发送数据到对端的方法

protected abstract class AbstractUnsafe implements Unsafe {
	protected void flush0() {
        // 是否正在刷新中,如果是,则直接返回,防止重复刷新
        if (inFlush0) {
            return;
        }

        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }

        // 设置inFlush0标志位为true
        inFlush0 = true;

        // 如果Channel非active,则说明channel已经断开,
        if (!isActive()) {
            try {
                if (!outboundBuffer.isEmpty()) {
                    // 如果Channel已经断开,但是outboundBuffer还有很多未发送信息,则需要将这些Entry消息释放掉,具体见4.1
                    if (isOpen()) {
                        outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                    } else {
                        // Do not trigger channelWritabilityChanged because the channel is closed already.
                        outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                    }
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }

        try {
            // 发送消息到对端
            // 调用NioSocketChannel.doWrite()方法,具体见4.2
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                initialCloseCause = t;
                close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
            } else {
                try {
                    shutdownOutput(voidPromise(), t);
                } catch (Throwable t2) {
                    initialCloseCause = t;
                    close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
                }
            }
        } finally {
            inFlush0 = false;
        }
    }
}
4.1 ChannelOutboundBuffer.failFlushed() 非active的channel释放所有未发送信息
public final class ChannelOutboundBuffer {
	void failFlushed(Throwable cause, boolean notify) {
        // Make sure that this method does not reenter.主要是防重入
        if (inFail) {
            return;
        }

        try {
            // 防重入,主要是通过inFail对象的设置来执行的
            inFail = true;
            for (;;) {
                // 调用remove0方法删除,具体见4.1.1
                if (!remove0(cause, notify)) {
                    break;
                }
            }
        } finally {
            inFail = false;
        }
    }
}

4.1.1 ChannelOutboundBuffer.remove0()

public final class ChannelOutboundBuffer {
	private boolean remove0(Throwable cause, boolean notifyWritability) {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        removeEntry(e);

        // 释放消息msg
        if (!e.cancelled) {
            ReferenceCountUtil.safeRelease(msg);

            safeFail(promise, cause);
            decrementPendingOutboundBytes(size, false, notifyWritability);
        }

        // 将Entry对象回收到对象池中
        e.recycle();

        return true;
    }
    
    private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
    }
}

Entry对象的删除并不复杂,主要就是将msg消息释放,然后将Entry对象回收到对象池中

4.2 NioSocketChannel.doWrite() 发送消息到对端
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
	protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        // 自旋写入次数,默认为16,可配置
        int writeSpinCount = config().getWriteSpinCount();
        do {
            // 如果队列已经空了,则取消OP_WRITE事件关注
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite();
                return;
            }

            // 获取每次写入的最大字节数
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
            // 从队列中获取ByteBuffer数组,将ByteBuf转换为ByteBuffer
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();
            // 根据不同的场景调用不同的write()方法
			switch (nioBufferCnt) {
                case 0:
                    // TODO
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
                    // 数组中只有一个ByteBuffer对象,直接调用SocketChannel.write()方法来发送ByteBuffer数据
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    // 在这里调用
                    final int localWrittenBytes = ch.write(buffer);
                    // 若localWrittenBytes            
关注
打赏
1655041699
查看更多评论
0.8681s