前一篇中我们主要讲解了ChannelHandler的几个抽象实现类,大致了解了下各个抽象实现类在不同场景中的运用。
本文就要一起来看下另一个比较好玩的问题,也是一个比较难点的问题,就是Netty如何解决TCP字节流的粘包、拆包问题。
1.什么是粘包、拆包?如果不涉及底层处理的话,我们很少会重视这个问题。TCP难道不是直接将消息发送到对端嘛?为什么会有粘包、拆包的问题呢?
先用几个图来展示下正常发包和粘包、拆包下的不同之处。
正常包传递:
package1和package2顺序发送,server也顺序接收到,并且,可以完整的分辨出package1和package2的内容边界。
当发生粘包时,包传递:
由于某种原因,package1和package2合并到一起发送给server,server无特殊处理情况下无法分辨完整的package1和package2边界。
当发生拆包时,包传递:
当client发送消息体过大时,TCP会将消息体拆成多个包发送到server端,若server没有进行包合并时,则无法分辨完整的消息体。
2.为什么会有粘包拆包通过上面的图示,我们大概了解了何为拆包、粘包?那么什么情况下会有拆包、粘包的问题呢?我们来总结下
2.1 粘包1)当多次发送的数据较小时,若开启了Nagle算法时,则这些数据会被合并发送;若要发送的数据小于发送缓冲区大小,TCP有可能将多次写入的数据一次性发送到server;
2)server端没有及时处理接收缓冲区中的数据,导致多次发送的数据堆积
2.2 拆包拆包的问题就比较简单了,若client端一次发送的数据过大(超过MSS=MTU-20[tcp header]-20[ip header]),则tcp会将该次数据,拆成多次进行发送。
Q:如何确认MTU值呢?
A:笔者的windows机器可以通过以下命令来确认
可以看到,对外连接的MTU都是1500byte
3.如何解决粘包、拆包问题?针对粘包问题,我们使用合理的边界符进行拆开就可以了;
针对拆包问题,我们只要知道了当前这次client发送的数据量大小(size)即可,如果server端接收到的数据长度小于size,则把当前接收到的数据先保存下来,并且合并到下一次接收到的数据中,直到server端接收到的数据总长度大于等于size,这时,解析size大小的数据即可。
那么,我们提出一个可行性方案的描述:
1)如果我们规定了\n换行符为两次消息的边界,那么理论上可以解决粘包问题。但是如果消息体本身就包含换行符,那么可能会误将一条消息拆分成多条消息展示。
2)如果我们在发送消息体之前,在消息体头部,将消息体的size也传递过去,server端在解析的时候,先解析头部的size,然后根据size来获取到足够大小的数据量再开始解析的话,应该也是可行的。
发送包的结构如下所示:
依据我们在3中提出的第二个方案,我们将消息体的长度与消息体本身一起发送到服务端,服务端先解析消息体长度字段size,然后向后读size个字节,就能获取全部消息体内容
本例代码来自于网络,地址 https://zhuanlan.zhihu.com/p/77275039
4.1 客户端代码public class SocketClient {
public static void main(String[] args) throws Exception {
// 要连接的服务端IP地址和端口
String host = "127.0.0.1";
int port = 55533;
// 与服务端建立连接
Socket socket = new Socket(host, port);
// 建立连接后获得输出流
OutputStream outputStream = socket.getOutputStream();
String message = "这是一个整包!!!";
byte[] contentBytes = message.getBytes("UTF-8");
System.out.println("contentBytes.length = " + contentBytes.length);
// 消息体长度为length,int类型,占4字节
int length = contentBytes.length;
byte[] lengthBytes = Utils.int2Bytes(length);
// 最终发送到server端的信息体为4+length字节
byte[] resultBytes = new byte[4 + length];
// 将length的4个字节复制到resultBytes中
System.arraycopy(lengthBytes, 0, resultBytes, 0, lengthBytes.length);
// 将contentBytes复制到resultBytes中
System.arraycopy(contentBytes, 0, resultBytes, 4, contentBytes.length);
for (int i = 0; i < 10; i++) {
// 一次性将包长度字段和包内容字段全部发送出去
outputStream.write(resultBytes);
}
Thread.sleep(20000);
outputStream.close();
socket.close();
}
}
4.2 服务端代码
public class SocketServer {
public static void main(String[] args) throws Exception {
// 监听指定的端口
int port = 55533;
ServerSocket server = new ServerSocket(port);
// server将一直等待连接的到来
System.out.println("server将一直等待连接的到来");
Socket socket = server.accept();
// 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[1024 * 128];
int len;
byte[] totalBytes = new byte[]{};
int totalLength = 0;
while ((len = inputStream.read(bytes)) != -1) {
//1. 将读取的数据和上一次遗留的数据拼起来
int tempLength = totalLength;
totalLength = len + totalLength;
byte[] tempBytes = totalBytes;
totalBytes = new byte[totalLength];
System.arraycopy(tempBytes, 0, totalBytes, 0, tempLength);
System.arraycopy(bytes, 0, totalBytes, tempLength, len);
// 这里的条件判断很重要,如果totalLength小于4,说明客户端发送的数据过少,甚至都没有将消息体长度发送过来
while (totalLength > 4) {
byte[] lengthBytes = new byte[4];
System.arraycopy(totalBytes, 0, lengthBytes, 0, lengthBytes.length);
int contentLength = Utils.bytes2Int(lengthBytes);
//2. 如果剩下数据小于数据头标的长度,则出现拆包,则需要将本次数据保存下来,继续获取后续数据,并合并到本次数据中
if (totalLength < contentLength + 4) {
break;
}
//3. 将数据头标的指定长度的数据取出则为应用数据
byte[] contentBytes = new byte[contentLength];
System.arraycopy(totalBytes, 4, contentBytes, 0, contentLength);
//注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8
String content = new String(contentBytes, "UTF-8");
System.out.println("contentLength = " + contentLength + ", content: " + content);
//4. 去掉已读取的数据
totalLength -= (4 + contentLength);
byte[] leftBytes = new byte[totalLength];
System.arraycopy(totalBytes, 4 + contentLength, leftBytes, 0, totalLength);
totalBytes = leftBytes;
}
}
inputStream.close();
socket.close();
server.close();
}
}
总结:通过这种size+body的方式,我们首先来解析size字段,获取之后,再获取size个字节作为body;如果暂时无法获取size个字节,则暂存已经获取的字节,跳过本次处理,继续读取数据,直到获取size个字节为止。
5.Netty提供的解决方案
关于粘包、拆包的问题,Netty提供了多种场景下的解决方案。不可谓不强大。具体有哪些呢?就是图中ByteToMessageDecoder的实现类和MessageToMessageDecoder的实现类。本文主要关注下ByteToMessageDecoder和其相关实现类。这个才是核心。
本文主要关注的实现类有FixedLengthFrameDecoder、LineBasedFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder
FixedLengthFrameDecoder:基于固定长度消息进行处理;
LengthFieldBasedFrameDecoder:基于消息头指定长度进行处理;
LineBasedFrameDecoder:基于换行符来进行处理;
DelimiterBasedFrameDecoder:基于指定的消息边界符来进行处理;
其中FixedLengthFrameDecoder是LengthFieldBasedFrameDecoder的一种特殊情况;
LineBasedFrameDecoder是DelimiterBasedFrameDecoder的一种特殊情况;
所以,本文中进行源码解析的主要就是FixedLengthFrameDecoder和LineBasedFrameDecoder。
后续有时间再对另外两种进行解析;
5.1 ByteToMessageDecoder// {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an other Message type.
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
// 这个类比较关键,是一个累加器,上面示例中如果消息体没有达到size大小,在这里会将消息暂存在Cumulator上
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
// cumulate的入参和出参分别代表:
// alloc:ByteBuf分配器
// cumulation:当前累计结果,都存在到这个ByteBuf中,因为ByteToMessageDecoder对象在创建的时候已经确定了,所以是唯一的
// in:当前读取的ByteBuf内容
// 返回值:新的累计结果(将原有的cumulation加上新读入的in,返回两个的合集 )
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable() && in.isContiguous()) {
cumulation.release();
return in;
}
try {
final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes() ||
(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
cumulation.isReadOnly()) {
// 容量不够,则需要先扩容
return expandCumulation(alloc, cumulation, in);
}
// 将新读入的in,写入到cumulation中
cumulation.writeBytes(in, in.readerIndex(), required);
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
// 释放新写入的in
in.release();
}
}
};
...
// 重点的方法,读
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
// 如果first=true,则说明是第一次读入数据,则设置cumulation为null
// 否则就将本次读入的数据msg合并到之前的cumulation中
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
// 执行读入解码操作
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++numReads >= discardAfterReads) {
// 在读入一定的次数后,释放一部分已读取的数据
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
} finally {
out.recycle();
}
}
} else {
ctx.fireChannelRead(msg);
}
}
// 在这里
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) {
try {
while (in.isReadable()) {
int outSize = out.size();
// 如果已经有合适的写入数据则outSize会大于0,这样就可以继续将数据传递到下一个ChannelHandler
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
// 记录可读字节数,执行解码
int oldInputLength = in.readableBytes();
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
...
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// 将解码消息的工作交由子类完成
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
fireChannelRead(ctx, out, out.size());
out.clear();
handlerRemoved(ctx);
}
}
}
}
总结看来,ByteToMessageDecoder的工作比较明确,就是不断读取流入的字节数据,
交由子类来完成真正的解码工作,如果数据量不足,则子类直接放弃本次解码,等数据读取足够
时再解码。
下面来看一下它的真正实现类的操作。
5.2 FixedLengthFrameDecoder(固定长度字段的解码器)public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
// 指定获取多长字节后进行解码
private final int frameLength;
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 很简单,如果可读的字段小于指定长度,则直接返回null
// 结合父类来看,则其会跳过本次执行,直接进行下一次读取
if (in.readableBytes() < frameLength) {
return null;
} else {
// 有足够数据,则切割指定长度ByteBuf,传递到下一个ChannelHandler来执行
return in.readRetainedSlice(frameLength);
}
}
}
5.3 LineBasedFrameDecoder(基于换行符的解码器)
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 寻找结束标志位\n等
final int eol = findEndOfLine(buffer);
if (!discarding) {
if (eol >= 0) {
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 这个最大长度设置还是蛮坑爹的,
// 如果maxLength设置的过小,当读入数据已经超过maxLength限制,则直接报错,并忽略这些数据
if (length > maxLength) {
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
// 正常情况下,读到换行符时,则直接切割当前index到换行符为止的字节数组
if (stripDelimiter) {
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
frame = buffer.readRetainedSlice(length + delimLength);
}
// 返回frame ByteBuf,并添加到out这个自定义List中,供后续的ChannelHandler使用
return frame;
} else {
final int length = buffer.readableBytes();
if (length > maxLength) {
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
offset = 0;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
// 如果当前读入的数据没有读取到换行符,则直接进行下一次读取
return null;
}
} else {
if (eol >= 0) {
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
} else {
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
// We skip everything in the buffer, we need to set the offset to 0 again.
offset = 0;
}
return null;
}
}
}
情况比较复杂,主要是加入了maxLength的限制,这个比较闹心,如果看着实在烦的话,我们可以只
关注主要分支,对其拆包有个了解即可。
总结: