您当前的位置: 首页 > 

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty源码解析- 粘包拆包的原理及处理方案

恐龙弟旺仔 发布时间:2021-12-23 12:36:31 ,浏览量:0

前言:

    前一篇中我们主要讲解了ChannelHandler的几个抽象实现类,大致了解了下各个抽象实现类在不同场景中的运用。

    本文就要一起来看下另一个比较好玩的问题,也是一个比较难点的问题,就是Netty如何解决TCP字节流的粘包、拆包问题。

1.什么是粘包、拆包?

    如果不涉及底层处理的话,我们很少会重视这个问题。TCP难道不是直接将消息发送到对端嘛?为什么会有粘包、拆包的问题呢?

    先用几个图来展示下正常发包和粘包、拆包下的不同之处。

    正常包传递:

 package1和package2顺序发送,server也顺序接收到,并且,可以完整的分辨出package1和package2的内容边界。

    当发生粘包时,包传递:

 由于某种原因,package1和package2合并到一起发送给server,server无特殊处理情况下无法分辨完整的package1和package2边界。

    当发生拆包时,包传递:

 当client发送消息体过大时,TCP会将消息体拆成多个包发送到server端,若server没有进行包合并时,则无法分辨完整的消息体。

2.为什么会有粘包拆包

    通过上面的图示,我们大概了解了何为拆包、粘包?那么什么情况下会有拆包、粘包的问题呢?我们来总结下

2.1 粘包

    1)当多次发送的数据较小时,若开启了Nagle算法时,则这些数据会被合并发送;若要发送的数据小于发送缓冲区大小,TCP有可能将多次写入的数据一次性发送到server;

    2)server端没有及时处理接收缓冲区中的数据,导致多次发送的数据堆积

2.2 拆包

    拆包的问题就比较简单了,若client端一次发送的数据过大(超过MSS=MTU-20[tcp header]-20[ip header]),则tcp会将该次数据,拆成多次进行发送。

    Q:如何确认MTU值呢?

    A:笔者的windows机器可以通过以下命令来确认

 可以看到,对外连接的MTU都是1500byte

3.如何解决粘包、拆包问题?

    针对粘包问题,我们使用合理的边界符进行拆开就可以了;

    针对拆包问题,我们只要知道了当前这次client发送的数据量大小(size)即可,如果server端接收到的数据长度小于size,则把当前接收到的数据先保存下来,并且合并到下一次接收到的数据中,直到server端接收到的数据总长度大于等于size,这时,解析size大小的数据即可。

    那么,我们提出一个可行性方案的描述:

    1)如果我们规定了\n换行符为两次消息的边界,那么理论上可以解决粘包问题。但是如果消息体本身就包含换行符,那么可能会误将一条消息拆分成多条消息展示。

    2)如果我们在发送消息体之前,在消息体头部,将消息体的size也传递过去,server端在解析的时候,先解析头部的size,然后根据size来获取到足够大小的数据量再开始解析的话,应该也是可行的。

    发送包的结构如下所示:

 4.自定义方案解决拆包、粘包问题

    依据我们在3中提出的第二个方案,我们将消息体的长度与消息体本身一起发送到服务端,服务端先解析消息体长度字段size,然后向后读size个字节,就能获取全部消息体内容

    本例代码来自于网络,地址 https://zhuanlan.zhihu.com/p/77275039 

4.1 客户端代码
public class SocketClient {
    public static void main(String[] args) throws Exception {
        // 要连接的服务端IP地址和端口
        String host = "127.0.0.1";
        int port = 55533;
        // 与服务端建立连接
        Socket socket = new Socket(host, port);
        // 建立连接后获得输出流
        OutputStream outputStream = socket.getOutputStream();
        String message = "这是一个整包!!!";
        byte[] contentBytes = message.getBytes("UTF-8");
        System.out.println("contentBytes.length = " + contentBytes.length);
        // 消息体长度为length,int类型,占4字节
        int length = contentBytes.length;
        byte[] lengthBytes = Utils.int2Bytes(length);
        // 最终发送到server端的信息体为4+length字节
        byte[] resultBytes = new byte[4 + length];
        // 将length的4个字节复制到resultBytes中
        System.arraycopy(lengthBytes, 0, resultBytes, 0, lengthBytes.length);
        // 将contentBytes复制到resultBytes中
        System.arraycopy(contentBytes, 0, resultBytes, 4, contentBytes.length);

        for (int i = 0; i < 10; i++) {
            // 一次性将包长度字段和包内容字段全部发送出去
            outputStream.write(resultBytes);
        }
        Thread.sleep(20000);
        outputStream.close();
        socket.close();
    }
}
4.2 服务端代码
public class SocketServer {
    public static void main(String[] args) throws Exception {
        // 监听指定的端口
        int port = 55533;
        ServerSocket server = new ServerSocket(port);
        // server将一直等待连接的到来
        System.out.println("server将一直等待连接的到来");
        Socket socket = server.accept();
        // 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取
        InputStream inputStream = socket.getInputStream();
        byte[] bytes = new byte[1024 * 128];
        int len;
        byte[] totalBytes = new byte[]{};
        int totalLength = 0;
        while ((len = inputStream.read(bytes)) != -1) {
            //1. 将读取的数据和上一次遗留的数据拼起来
            int tempLength = totalLength;
            totalLength = len + totalLength;
            byte[] tempBytes = totalBytes;
            totalBytes = new byte[totalLength];
            System.arraycopy(tempBytes, 0, totalBytes, 0, tempLength);
            System.arraycopy(bytes, 0, totalBytes, tempLength, len);
            // 这里的条件判断很重要,如果totalLength小于4,说明客户端发送的数据过少,甚至都没有将消息体长度发送过来
            while (totalLength > 4) {
                byte[] lengthBytes = new byte[4];
                System.arraycopy(totalBytes, 0, lengthBytes, 0, lengthBytes.length);
                int contentLength = Utils.bytes2Int(lengthBytes);
                //2. 如果剩下数据小于数据头标的长度,则出现拆包,则需要将本次数据保存下来,继续获取后续数据,并合并到本次数据中
                if (totalLength < contentLength + 4) {
                    break;
                }
                //3. 将数据头标的指定长度的数据取出则为应用数据
                byte[] contentBytes = new byte[contentLength];
                System.arraycopy(totalBytes, 4, contentBytes, 0, contentLength);
                //注意指定编码格式,发送方和接收方一定要统一,建议使用UTF-8
                String content = new String(contentBytes, "UTF-8");
                System.out.println("contentLength = " + contentLength + ", content: " + content);
                //4. 去掉已读取的数据
                totalLength -= (4 + contentLength);
                byte[] leftBytes = new byte[totalLength];
                System.arraycopy(totalBytes, 4 + contentLength, leftBytes, 0, totalLength);
                totalBytes = leftBytes;
            }
        }
        inputStream.close();
        socket.close();
        server.close();
    }
}

总结:通过这种size+body的方式,我们首先来解析size字段,获取之后,再获取size个字节作为body;如果暂时无法获取size个字节,则暂存已经获取的字节,跳过本次处理,继续读取数据,直到获取size个字节为止。

5.Netty提供的解决方案

    关于粘包、拆包的问题,Netty提供了多种场景下的解决方案。不可谓不强大。具体有哪些呢?就是图中ByteToMessageDecoder的实现类和MessageToMessageDecoder的实现类。本文主要关注下ByteToMessageDecoder和其相关实现类。这个才是核心。

本文主要关注的实现类有FixedLengthFrameDecoder、LineBasedFrameDecoder、DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder

FixedLengthFrameDecoder:基于固定长度消息进行处理;

LengthFieldBasedFrameDecoder:基于消息头指定长度进行处理;

LineBasedFrameDecoder:基于换行符来进行处理;

DelimiterBasedFrameDecoder:基于指定的消息边界符来进行处理;

其中FixedLengthFrameDecoder是LengthFieldBasedFrameDecoder的一种特殊情况;

LineBasedFrameDecoder是DelimiterBasedFrameDecoder的一种特殊情况;

所以,本文中进行源码解析的主要就是FixedLengthFrameDecoder和LineBasedFrameDecoder。

后续有时间再对另外两种进行解析;

5.1 ByteToMessageDecoder
// {@link ChannelInboundHandlerAdapter} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an other Message type.
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
    // 这个类比较关键,是一个累加器,上面示例中如果消息体没有达到size大小,在这里会将消息暂存在Cumulator上
    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        // cumulate的入参和出参分别代表:
        // alloc:ByteBuf分配器
        // cumulation:当前累计结果,都存在到这个ByteBuf中,因为ByteToMessageDecoder对象在创建的时候已经确定了,所以是唯一的
        // in:当前读取的ByteBuf内容
        // 返回值:新的累计结果(将原有的cumulation加上新读入的in,返回两个的合集            )
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            if (!cumulation.isReadable() && in.isContiguous()) {
                cumulation.release();
                return in;
            }
            try {
                final int required = in.readableBytes();
                if (required > cumulation.maxWritableBytes() ||
                        (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
                        cumulation.isReadOnly()) {
                    // 容量不够,则需要先扩容
                    return expandCumulation(alloc, cumulation, in);
                }
                // 将新读入的in,写入到cumulation中
                cumulation.writeBytes(in, in.readerIndex(), required);
                in.readerIndex(in.writerIndex());
                return cumulation;
            } finally {
                // 释放新写入的in
                in.release();
            }
        }
    };
    ...
    // 重点的方法,读
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                first = cumulation == null;
                // 如果first=true,则说明是第一次读入数据,则设置cumulation为null
                // 否则就将本次读入的数据msg合并到之前的cumulation中
                cumulation = cumulator.cumulate(ctx.alloc(),
                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
                // 执行读入解码操作
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                try {
                    if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++numReads >= discardAfterReads) {
                        // 在读入一定的次数后,释放一部分已读取的数据
                        numReads = 0;
                        discardSomeReadBytes();
                    }

                    int size = out.size();
                    firedChannelRead |= out.insertSinceRecycled();
                    fireChannelRead(ctx, out, size);
                } finally {
                    out.recycle();
                }
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
    
    // 在这里
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();

                // 如果已经有合适的写入数据则outSize会大于0,这样就可以继续将数据传递到下一个ChannelHandler
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                // 记录可读字节数,执行解码
                int oldInputLength = in.readableBytes();
                decodeRemovalReentryProtection(ctx, in, out);

                if (ctx.isRemoved()) {
                    break;
                }
                ...
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }
    
    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List out)
            throws Exception {
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            // 将解码消息的工作交由子类完成
            decode(ctx, in, out);
        } finally {
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                fireChannelRead(ctx, out, out.size());
                out.clear();
                handlerRemoved(ctx);
            }
        }
    }
}
 

总结看来,ByteToMessageDecoder的工作比较明确,就是不断读取流入的字节数据,

交由子类来完成真正的解码工作,如果数据量不足,则子类直接放弃本次解码,等数据读取足够

时再解码。

下面来看一下它的真正实现类的操作。

5.2 FixedLengthFrameDecoder(固定长度字段的解码器)
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

    // 指定获取多长字节后进行解码
    private final int frameLength;
    
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }
    
    protected Object decode(
            @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // 很简单,如果可读的字段小于指定长度,则直接返回null
        // 结合父类来看,则其会跳过本次执行,直接进行下一次读取
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            // 有足够数据,则切割指定长度ByteBuf,传递到下一个ChannelHandler来执行
            return in.readRetainedSlice(frameLength);
        }
    }
}
5.3 LineBasedFrameDecoder(基于换行符的解码器)
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
    
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }
    
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        // 寻找结束标志位\n等
        final int eol = findEndOfLine(buffer);
        if (!discarding) {
            if (eol >= 0) {
                final ByteBuf frame;
                final int length = eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;

                // 这个最大长度设置还是蛮坑爹的,
                // 如果maxLength设置的过小,当读入数据已经超过maxLength限制,则直接报错,并忽略这些数据
                if (length > maxLength) {
                    buffer.readerIndex(eol + delimLength);
                    fail(ctx, length);
                    return null;
                }

                // 正常情况下,读到换行符时,则直接切割当前index到换行符为止的字节数组
                if (stripDelimiter) {
                    frame = buffer.readRetainedSlice(length);
                    buffer.skipBytes(delimLength);
                } else {
                    frame = buffer.readRetainedSlice(length + delimLength);
                }

                // 返回frame ByteBuf,并添加到out这个自定义List中,供后续的ChannelHandler使用
                return frame;
            } else {
                final int length = buffer.readableBytes();
                if (length > maxLength) {
                    discardedBytes = length;
                    buffer.readerIndex(buffer.writerIndex());
                    discarding = true;
                    offset = 0;
                    if (failFast) {
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                // 如果当前读入的数据没有读取到换行符,则直接进行下一次读取
                return null;
            }
        } else {
            if (eol >= 0) {
                final int length = discardedBytes + eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                buffer.readerIndex(eol + delimLength);
                discardedBytes = 0;
                discarding = false;
                if (!failFast) {
                    fail(ctx, length);
                }
            } else {
                discardedBytes += buffer.readableBytes();
                buffer.readerIndex(buffer.writerIndex());
                // We skip everything in the buffer, we need to set the offset to 0 again.
                offset = 0;
            }
            return null;
        }
    }
}
 

情况比较复杂,主要是加入了maxLength的限制,这个比较闹心,如果看着实在烦的话,我们可以只

关注主要分支,对其拆包有个了解即可。

 
总结: 
    Netty提供了完善的针对TCP粘包、拆包的解决方案,在使用时,我们选择合适的方案即可。
除了这些基础的实现类之外,Netty还提供了针对特定场景,比如http、Json、XML等输入流的解析方案。
我们在实际开发中直接使用即可。

   

关注
打赏
1655041699
查看更多评论
立即登录/注册

微信扫码登录

0.1164s