在上篇文章介绍了NioSocketChannel的写操作之后,我们知道其真正动作可以分为两步:分别是write和flush。write操作将msg信息放入ChannelOutboundBuffer容器之后就结束了,而flush操作才真正将存在在容器中的数据发送到对端。
本文就来介绍下ChannelOutboundBuffer容器的基本结构。
1.ChannelOutboundBuffer基本属性// an internal data structure used by {@link AbstractChannel} to store its pending outbound write requests.
public final class ChannelOutboundBuffer {
// 每一个channel有一个对应的ChannelOutboundBuffer
private final Channel channel;
// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry) flushedEntry、unflushedEntry、tailEntry之间的位置关系
// 通过注释可知,Entry是一个链表结构
// flushedEntry是第一个被发送到对端的Entry对象
private Entry flushedEntry;
// unflushedEntry是第一个不被发送到对端的Entry对象
// 那么再通过flushedEntry、unflushedEntry之间的位置关系可知,flushedEntry、unflushedEntry之间的Entry都是要被发送到对端的
private Entry unflushedEntry;
// 链表尾端的Entry
private Entry tailEntry;
// 需要被发送到对端的Entry对象数量
private int flushed;
private int nioBufferCount;
private long nioBufferSize;
// 是否刷新失败
private boolean inFail;
// 待发送的缓冲区的字节总数
private volatile long totalPendingSize;
private volatile int unwritable;
private volatile Runnable fireChannelWritabilityChangedTask;
}
根据其类注释我们可以了解到其作用:作为一个数据存储的中间层,用来存储AbstractChannel发送过来的数据
而待发送的数据则被包装成Entry对象,我们来看下其结构
1.1 Entrystatic final class Entry {
// 对象池,用来循环利用Entry
private static final ObjectPool RECYCLER = ObjectPool.newPool(new ObjectCreator() {
@Override
public Entry newObject(Handle handle) {
return new Entry(handle);
}
});
// 池化操作处理器
private final Handle handle;
// 下一个Entry,单向链表结构
Entry next;
// 真正所包含的信息
Object msg;
ByteBuffer[] bufs;
ByteBuffer buf;
// 回调类
ChannelPromise promise;
// 已发送数据进度
long progress;
// 总数据量
long total;
// 待发送数据量
int pendingSize;
int count = -1;
// 是否已被取消
boolean cancelled;
}
待发送消息msg被包装成Entry对象,且Entry对象是一个单向链表的结构。
而Entry对象的生产则是通过RECYCLER这个对象池来产生的。
当然,还有些属性不是那么明朗,我们继续看。
2.ChannelOutboundBuffer.addMessage()添加消息在前文中,AbstractUnsafe.write()发送消息方法中,最后会调用ChannelOutboundBuffer.addMessage()将消息发送到ChannelOutboundBuffer容器中
public final class ChannelOutboundBuffer {
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 创建一个Entry对象,具体见2.1
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
// 对链表上下节点的设置
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
// 当前entry添加到最后
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
}
我们通过两张图来表示下添加首个对象和添加N个对象之后的示意图:
* 添加首个Entry时:
添加首个Entry后,unflushedEntry和tailEntry均指向该新增的Entry
* 添加N个Entry对象时:
添加N个Entry对象后,第一个添加的Entry_1则作为unflushedEntry,其next指向Entry_2,最终添加Entry_n,则Entry_n为tailEntry。
2.1 Entry.newInstance() 根据msg创建一个Entry对象static final class Entry {
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
// 通过对象池获取一个Entry对象
Entry entry = RECYCLER.get();
// 设置基本属性
entry.msg = msg;
entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
entry.total = total;
entry.promise = promise;
return entry;
}
}
3.ChannelOutboundBuffer.addFlush() 添加刷新
public final class ChannelOutboundBuffer {
public void addFlush() {
// 获取第一个未flushed的Entry对象
Entry entry = unflushedEntry;
if (entry != null) {
// 如果flushedEntry为空,说明没有设置过这个对象
if (flushedEntry == null) {
// 则将unflushedEntry设置为flushedEntry
flushedEntry = entry;
}
do {
flushed ++;
// 若entry所关联的ChannelPromise未取消,则继续刷新
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
}
代码不算复杂,之前write()方法则是添加Entry对象,一直从unflushedEntry-->TailEntry,但是flushedEntry一直为null
执行了addFlush()方法之后,则是为了设置flushEntry对象,并且遍历所有的Entry对象,记录下需要被发送的Entry对象数量 (为flushed)
接着上面的添加N个Entry对象的图来解释,执行完addFlush()方法之后,图变成如下:
4.AbstractUnsafe.flush0()
真正发送数据到对端的方法
protected abstract class AbstractUnsafe implements Unsafe {
protected void flush0() {
// 是否正在刷新中,如果是,则直接返回,防止重复刷新
if (inFlush0) {
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
// 设置inFlush0标志位为true
inFlush0 = true;
// 如果Channel非active,则说明channel已经断开,
if (!isActive()) {
try {
if (!outboundBuffer.isEmpty()) {
// 如果Channel已经断开,但是outboundBuffer还有很多未发送信息,则需要将这些Entry消息释放掉,具体见4.1
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
}
}
} finally {
inFlush0 = false;
}
return;
}
try {
// 发送消息到对端
// 调用NioSocketChannel.doWrite()方法,具体见4.2
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
initialCloseCause = t;
close(voidPromise(), t, newClosedChannelException(t, "flush0()"), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newClosedChannelException(t, "flush0()"), false);
}
}
} finally {
inFlush0 = false;
}
}
}
4.1 ChannelOutboundBuffer.failFlushed() 非active的channel释放所有未发送信息
public final class ChannelOutboundBuffer {
void failFlushed(Throwable cause, boolean notify) {
// Make sure that this method does not reenter.主要是防重入
if (inFail) {
return;
}
try {
// 防重入,主要是通过inFail对象的设置来执行的
inFail = true;
for (;;) {
// 调用remove0方法删除,具体见4.1.1
if (!remove0(cause, notify)) {
break;
}
}
} finally {
inFail = false;
}
}
}
4.1.1 ChannelOutboundBuffer.remove0()
public final class ChannelOutboundBuffer {
private boolean remove0(Throwable cause, boolean notifyWritability) {
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
// 释放消息msg
if (!e.cancelled) {
ReferenceCountUtil.safeRelease(msg);
safeFail(promise, cause);
decrementPendingOutboundBytes(size, false, notifyWritability);
}
// 将Entry对象回收到对象池中
e.recycle();
return true;
}
private void removeEntry(Entry e) {
if (-- flushed == 0) {
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
flushedEntry = e.next;
}
}
}
Entry对象的删除并不复杂,主要就是将msg消息释放,然后将Entry对象回收到对象池中
4.2 NioSocketChannel.doWrite() 发送消息到对端public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
// 自旋写入次数,默认为16,可配置
int writeSpinCount = config().getWriteSpinCount();
do {
// 如果队列已经空了,则取消OP_WRITE事件关注
if (in.isEmpty()) {
// All written so clear OP_WRITE
clearOpWrite();
return;
}
// 获取每次写入的最大字节数
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
// 从队列中获取ByteBuffer数组,将ByteBuf转换为ByteBuffer
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
// 根据不同的场景调用不同的write()方法
switch (nioBufferCnt) {
case 0:
// TODO
writeSpinCount -= doWrite0(in);
break;
case 1: {
// 数组中只有一个ByteBuffer对象,直接调用SocketChannel.write()方法来发送ByteBuffer数据
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
// 在这里调用
final int localWrittenBytes = ch.write(buffer);
// 若localWrittenBytes
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?