您当前的位置: 首页 > 

庄小焱

暂无认证

  • 3浏览

    0关注

    805博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty——拆包LineBasedFrameDecoder源码分析

庄小焱 发布时间:2021-11-13 17:13:09 ,浏览量:3

摘要

Netty 自带多个粘包拆包解码器。今天介绍 LineBasedFrameDecoder,换行符解码器。

行拆包器

这个类叫做 LineBasedFrameDecoder,基于行分隔符的拆包器,TA可以同时处理 \n以及\r\n两种类型的行分隔符,核心方法都在继承的 decode 方法中。

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

netty 中自带的拆包器都是如上这种模板,我们来看看decode(ctx, in);

protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
    int eol = findEndOfLine(buffer);
    int length;
    int length;
    if (!this.discarding) {
        if (eol >= 0) {
            length = eol - buffer.readerIndex();
            int delimLength = buffer.getByte(eol) == '\r' ? 2 : 1;
            if (length > this.maxLength) {
                buffer.readerIndex(eol + delimLength);
                this.fail(ctx, length);
                return null;
            } else {
                ByteBuf frame;
                if (this.stripDelimiter) {
                    frame = buffer.readRetainedSlice(length);
                    buffer.skipBytes(delimLength);
                } else {
                    frame = buffer.readRetainedSlice(length + delimLength);
                }

                return frame;
            }
        } else {
            length = buffer.readableBytes();
            if (length > this.maxLength) {
                this.discardedBytes = length;
                buffer.readerIndex(buffer.writerIndex());
                this.discarding = true;
                if (this.failFast) {
                    this.fail(ctx, "over " + this.discardedBytes);
                }
            }

            return null;
        }
    } else {
        if (eol >= 0) {
            length = this.discardedBytes + eol - buffer.readerIndex();
            length = buffer.getByte(eol) == '\r' ? 2 : 1;
            buffer.readerIndex(eol + length);
            this.discardedBytes = 0;
            this.discarding = false;
            if (!this.failFast) {
                this.fail(ctx, length);
            }
        } else {
            this.discardedBytes += buffer.readableBytes();
            buffer.readerIndex(buffer.writerIndex());
        }

        return null;
    }
}

ByteProcessor FIND_LF = new IndexOfProcessor((byte) '\n');

private static int findEndOfLine(ByteBuf buffer) {
    int i = buffer.forEachByte(ByteProcessor.FIND_LF);
    if (i > 0 && buffer.getByte(i - 1) == '\r') {
        --i;
    }

    return i;
}
找到换行符位置
final int eol = findEndOfLine(buffer);

private static int findEndOfLine(final ByteBuf buffer) {
    int i = buffer.forEachByte(ByteProcessor.FIND_LF);
    if (i > 0 && buffer.getByte(i - 1) == '\r') {
        i--;
    }
    return i;
}

ByteProcessor FIND_LF = new IndexOfProcessor((byte) '\n');

for循环遍历,找到第一个 \n 的位置,如果\n前面的字符为\r,那就返回\r的位置

非discarding模式的处理

接下来,netty会判断,当前拆包是否属于丢弃模式,用一个成员变量来标识

private boolean discarding;

第一次拆包不在discarding模式
非discarding模式下找到行分隔符的处理
// 1.计算分隔符和包长度
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;

// 丢弃异常数据
if (length > maxLength) {
    buffer.readerIndex(eol + delimLength);
    fail(ctx, length);
    return null;
}

// 取包的时候是否包括分隔符
if (stripDelimiter) {
    frame = buffer.readRetainedSlice(length);
    buffer.skipBytes(delimLength);
} else {
    frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
  • 1.首先,新建一个帧,计算一下当前包的长度和分隔符的长度(因为有两种分隔符)
  • 2.然后判断一下需要拆包的长度是否大于该拆包器允许的最大长度(maxLength),这个参数在构造函数中被传递进来,如超出允许的最大长度,就将这段数据抛弃,返回null
  • 3.最后,将一个完整的数据包取出,如果构造本解包器的时候指定 stripDelimiter为false,即解析出来的包包含分隔符,默认为不包含分隔符。
非discarding模式下未找到分隔符的处理

没有找到对应的行分隔符,说明字节容器没有足够的数据拼接成一个完整的业务数据包,进入如下流程处理。

final int length = buffer.readableBytes();
if (length > maxLength) {
    discardedBytes = length;
    buffer.readerIndex(buffer.writerIndex());
    discarding = true;
    if (failFast) {
        fail(ctx, "over " + discardedBytes);
    }
}
return null;

首先取得当前字节容器的可读字节个数,接着,判断一下是否已经超过可允许的最大长度,如果没有超过,直接返回null,字节容器中的数据没有任何改变,否则,就需要进入丢弃模式。

使用一个成员变量 discardedBytes 来表示已经丢弃了多少数据,然后将字节容器的读指针移到写指针,意味着丢弃这一部分数据,设置成员变量discarding为true表示当前处于丢弃模式。如果设置了failFast,那么直接抛出异常,默认情况下failFast为false,即安静得丢弃数据。

discarding模式

如果解包的时候处在discarding模式,也会有两种情况发生

discarding模式下找到行分隔符

在discarding模式下,如果找到分隔符,那可以将分隔符之前的都丢弃掉

final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
    fail(ctx, length);
}

计算出分隔符的长度之后,直接把分隔符之前的数据全部丢弃,当然丢弃的字符也包括分隔符,经过这么一次丢弃,后面就有可能是正常的数据包,下一次解包的时候就会进入正常的解包流程。

discarding模式下未找到行分隔符

这种情况比较简单,因为当前还在丢弃模式,没有找到行分隔符意味着当前一个完整的数据包还没丢弃完,当前读取的数据是丢弃的一部分,所以直接丢弃。

discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
特定分隔符拆包

这个类叫做 DelimiterBasedFrameDecoder,可以传递给TA一个分隔符列表,数据包会按照分隔符列表进行拆分,读者可以完全根据行拆包器的思路去分析这个DelimiterBasedFrameDecoder

自定义协议解码器

LengthFieldBasedFrameDecoder解码器自定义协议。通常,协议的格式如下:

LengthFieldBasedFrameDecoder是netty解决拆包粘包问题的一个重要的类,主要结构就是header+body结构。我们只需要传入正确的参数就可以发送和接收正确的数据,那么重点就在于这几个参数的意义。下面我们就具体了解一下这几个参数的意义。先来看一下LengthFieldBasedFrameDecoder主要的构造方法:

public LengthFieldBasedFrameDecoder(
            int maxFrameLength,
            int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip)

那么这几个重要的参数如下:

  • maxFrameLength:最大帧长度。也就是可以接收的数据的最大长度。如果超过,此次数据会被丢弃。
  • lengthFieldOffset:长度域偏移。就是说数据开始的几个字节可能不是表示数据长度,需要后移几个字节才是长度域。
  • lengthFieldLength:长度域字节数。用几个字节来表示数据长度。
  • lengthAdjustment:数据长度修正。因为长度域指定的长度可以使header+body的整个长度,也可以只是body的长度。如果表示header+body的整个长度,那么我们需要修正数据长度。
  •  initialBytesToStrip:跳过的字节数。如果你需要接收header+body的所有数据,此值就是0,如果你只想接收body数据,那么需要跳过header所占用的字节数。
LengthFieldBasedFrameDecoder 的用法

需求1:长度域为2个字节,我们要求发送和接收的数据如下所示:

   发送的数据 (14 bytes)               接收到数据 (14 bytes)
+--------+----------------+      +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
|  12    | "HELLO, WORLD" |      |   12   | "HELLO, WORLD" |
+--------+----------------+      +--------+----------------+

留心的你肯定发现了,长度域只是实际内容的长度,不包括长度域的长度。下面是参数的值:

  • lengthFieldOffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
  • lengthFieldLength=2:长度域2个字节。
  • lengthAdjustment=0:数据长度修正为0,因为长度域只包含数据的长度,所以不需要修正。
  • initialBytesToStrip=0:发送和接收的数据完全一致,所以不需要跳过任何字节。

需求2:长度域为2个字节,我们要求发送和接收的数据如下所示:

     发送的数据 (14 bytes)        接收到数据 (12 bytes)
+--------+----------------+      +----------------+
| Length | Actual Content |----->| Actual Content |
|  12    | "HELLO, WORLD" |      | "HELLO, WORLD" |
+--------+----------------+      +----------------+

参数值如下:

  • lengthFieldOffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
  • lengthFieldLength=2:长度域2个字节。
  • lengthAdjustment=0:数据长度修正为0,因为长度域只包含数据的长度,所以不需要修正。
  • initialBytesToStrip=2:我们发现接收的数据没有长度域的数据,所以要跳过长度域的2个字节

需求3:长度域为2个字节,我们要求发送和接收的数据如下所示:

  BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
+--------+----------------+      +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 14     | "HELLO, WORLD" |      |  14    | "HELLO, WORLD" |
+--------+----------------+      +--------+----------------+

留心的你肯定又发现了,长度域表示的长度是总长度 也就是header+body的总长度。参数如下:

  • lengthFieldOffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
  • lengthFieldLength=2:长度域2个字节。
  • lengthAdjustment=-2:因为长度域为总长度,所以我们需要修正数据长度,也就是减去2。
  • initialBytesToStrip=0:我们发现接收的数据没有长度域的数据,所以要跳过长度域的2个字节。

需求4:长度域为2个字节,我们要求发送和接收的数据如下所示:

      BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+      +----------+----------+----------------+
| meta     |  Length  | Actual Content |----->| meta | Length | Actual Content |
|  0xCAFE  | 12       | "HELLO, WORLD" |      |  0xCAFE  | 12       | "HELLO, WORLD" |
+----------+----------+----------------+      +----------+----------+----------------+

我们发现,数据的结构有点变化,变成了 meta+header+body的结构。meta一般表示元数据,魔数等。我们定义这里meta有三个字节。参数如下:

  • lengthFieldOffset=3:开始的3个字节是meta,然后才是长度域,所以长度域偏移为3。
  • lengthFieldLength=2:长度域2个字节。
  • lengthAdjustment=0:长度域指定的长度位数据长度,所以数据长度不需要修正。
  • initialBytesToStrip=0:发送和接收数据相同,不需要跳过数据。

需求5:长度域为2个字节,我们要求发送和接收的数据如下所示:

       BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+      +----------+----------+----------------+
|  Length  | meta     | Actual Content |----->| Length | meta | Actual Content |
|   12     |  0xCAFE  | "HELLO, WORLD" |      |    12    |  0xCAFE  | "HELLO, WORLD" |
+----------+----------+----------------+      +----------+----------+----------------+

我们发现,数据的结构有点变化,变成了 header+meta+body的结构。meta一般表示元数据,魔数等。我们定义这里meta有三个字节。参数如下:

  • lengthFieldOffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
  • lengthFieldLength=2:长度域2个字节。
  • lengthAdjustment=3:我们需要把meta+body当做body处理,所以数据长度需要加3。
  • initialBytesToStrip=0:发送和接收数据相同,不需要跳过数据。

需求6:长度域为2个字节,我们要求发送和接收的数据如下所示:

      BEFORE DECODE (16 bytes)                    AFTER DECODE (13 bytes)
+------+--------+------+----------------+      +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+      +------+----------------+

我们发现,数据的结构有点变化,变成了 hdr1+header+hdr2+body的结构。我们定义这里hdr1和hdr2都只有1个字节。参数如下:

  • lengthFieldOffset=1:开始的1个字节是长度域,所以需要设置长度域偏移为1。
  • lengthFieldLength=2:长度域2个字节。
  • lengthAdjustment=1:我们需要把hdr2+body当做body处理,所以数据长度需要加1。
  • initialBytesToStrip=3:接收数据不包括hdr1和长度域相同,所以需要跳过3个字节。
LengthFieldBasedFrameDecoder 源码剖析
具体的拆包协议只需要实现

void decode(ChannelHandlerContext ctx, ByteBuf in, List out) 

其中 in 表示目前为止还未拆的数据,拆完之后的包添加到 out这个list中即可实现包向下传递,第一层实现比较简单。

@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}

重载的protected函数decode做真正的拆包动作。

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    if (this.discardingTooLongFrame) {
        long bytesToDiscard = this.bytesToDiscard;
        int localBytesToDiscard = (int)Math.min(bytesToDiscard, (long)in.readableBytes());
        in.skipBytes(localBytesToDiscard);
        bytesToDiscard -= (long)localBytesToDiscard;
        this.bytesToDiscard = bytesToDiscard;
        this.failIfNecessary(false);
    }

    // 如果当前可读字节还未达到长度长度域的偏移,那说明肯定是读不到长度域的,直接不读
    if (in.readableBytes() < this.lengthFieldEndOffset) {
        return null;
    } else {
        // 拿到长度域的实际字节偏移,就是长度域的开始下标
        // 这里就是需求4,开始的几个字节并不是长度域
        int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;
        // 拿到实际的未调整过的包长度
        // 就是读取长度域的十进制值,最原始传过来的包的长度
        long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder);
        // 如果拿到的长度为负数,直接跳过长度域并抛出异常
        if (frameLength < 0L) {
            in.skipBytes(this.lengthFieldEndOffset);
            throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);
        } else {
            // 调整包的长度
            frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);
            // 整个数据包的长度还没有长度域长,直接抛出异常
            if (frameLength < (long)this.lengthFieldEndOffset) {
                in.skipBytes(this.lengthFieldEndOffset);
                throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + this.lengthFieldEndOffset);
            // 数据包长度超出最大包长度,进入丢弃模式
            } else if (frameLength > (long)this.maxFrameLength) {
                long discard = frameLength - (long)in.readableBytes();
                this.tooLongFrameLength = frameLength;
                if (discard < 0L) {
                    in.skipBytes((int)frameLength);
                } else {
                    this.discardingTooLongFrame = true;
                    this.bytesToDiscard = discard;
                    in.skipBytes(in.readableBytes());
                }

                this.failIfNecessary(true);
                return null;
            } else {
                int frameLengthInt = (int)frameLength;
                //当前可读的字节数小于包中的length,什么都不做,等待下一次解码
                if (in.readableBytes() < frameLengthInt) {
                    return null;
                //跳过的字节不能大于数据包的长度,否则就抛出 CorruptedFrameException 的异常
                } else if (this.initialBytesToStrip > frameLengthInt) {
                    in.skipBytes(frameLengthInt);
                    throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip);
                } else {
                    //根据initialBytesToStrip的设置来跳过某些字节
                    in.skipBytes(this.initialBytesToStrip);
                    //拿到当前累积数据的读指针
                    int readerIndex = in.readerIndex();
                    //拿到待抽取数据包的实际长度
                    int actualFrameLength = frameLengthInt - this.initialBytesToStrip;
                    //进行抽取
                    ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength);
                    //移动读指针
                    in.readerIndex(readerIndex + actualFrameLength);
                    return frame;
                }
            }
        }
    }
}

获取frame长度:获取需要待拆包的包大小

// 拿到长度域的实际字节偏移,就是长度域的开始下标
// 这里就是需求4,开始的几个字节并不是长度域

int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;

// 拿到实际的未调整过的包长度
// 就是读取长度域的十进制值,最原始传过来的包的长度

long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder);

// 调整包的长度

frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);

上面这一段内容有个扩展点 getUnadjustedFrameLength,如果你的长度域代表的值表达的含义不是正常的int,short等基本类型,你可以重写这个函数。

protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
    buf = buf.order(order);
    long frameLength;
    switch (length) {
    case 1:
        frameLength = buf.getUnsignedByte(offset);
        break;
    case 2:
        frameLength = buf.getUnsignedShort(offset);
        break;
    case 3:
        frameLength = buf.getUnsignedMedium(offset);
        break;
    case 4:
        frameLength = buf.getUnsignedInt(offset);
        break;
    case 8:
        frameLength = buf.getLong(offset);
        break;
    default:
        throw new DecoderException(
                "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
    }
    return frameLength;
}

跳过指定字节长度

int frameLengthInt = (int)frameLength;
//当前可读的字节数小于包中的length,什么都不做,等待下一次解码
if (in.readableBytes() < frameLengthInt) {
    return null;
//跳过的字节不能大于数据包的长度,否则就抛出 CorruptedFrameException 的异常
} else if (this.initialBytesToStrip > frameLengthInt) {
    in.skipBytes(frameLengthInt);
    throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip);
}
//根据initialBytesToStrip的设置来跳过某些字节
in.skipBytes(this.initialBytesToStrip);

先验证当前是否已经读到足够的字节,如果读到了,在下一步抽取一个完整的数据包之前,需要根据initialBytesToStrip的设置来跳过某些字节(见文章开篇),当然,跳过的字节不能大于数据包的长度,否则就抛出 CorruptedFrameException 的异常。

抽取frame

//根据initialBytesToStrip的设置来跳过某些字节
in.skipBytes(this.initialBytesToStrip);
//拿到当前累积数据的读指针
int readerIndex = in.readerIndex();
//拿到待抽取数据包的实际长度
int actualFrameLength = frameLengthInt - this.initialBytesToStrip;
//进行抽取
ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength);
//移动读指针
in.readerIndex(readerIndex + actualFrameLength);
return frame;

到了最后抽取数据包其实就很简单了,拿到当前累积数据的读指针,然后拿到待抽取数据包的实际长度进行抽取,抽取之后,移动读指针。

protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
    return buffer.retainedSlice(index, length);
}

抽取的过程是简单的调用了一下 ByteBuf 的retainedSliceapi,该api无内存copy开销。

自定义解码器实战
public class MyProtocolBean {
    //类型  系统编号 0xA 表示A系统,0xB 表示B系统
    private byte type;

    //信息标志  0xA 表示心跳包    0xB 表示超时包  0xC 业务信息包
    private byte flag;

    //内容长度
    private int length;

    //内容
    private String content;

    //省略get/set
}
服务端的实现
public class Server {

    private static final int MAX_FRAME_LENGTH = 1024 * 1024;  //最大长度
    private static final int LENGTH_FIELD_LENGTH = 4;  //长度字段所占的字节数
    private static final int LENGTH_FIELD_OFFSET = 2;  //长度偏移
    private static final int LENGTH_ADJUSTMENT = 0;
    private static final int INITIAL_BYTES_TO_STRIP = 0;

    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer() {

                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
                            ch.pipeline().addLast(new ServerHandler());
                        };

                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // 绑定端口,开始接收进来的连接
            ChannelFuture future = sbs.bind(port).sync();

            System.out.println("Server start listen at " + port );
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new Server(port).start();
    }
}
自定义解码器MyProtocolDecoder
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {

    private static final int HEADER_SIZE = 6;

    /**
     * @param maxFrameLength  帧的最大长度
     * @param lengthFieldOffset length字段偏移的地址
     * @param lengthFieldLength length字段所占的字节长
     * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
     * @param initialBytesToStrip 解析时候跳过多少个长度
     * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异
     */

    public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {

        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);

    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分
        in = (ByteBuf) super.decode(ctx,in);  

        if(in == null){
            return null;
        }
        if(in.readableBytes()            
关注
打赏
1657692713
查看更多评论
0.0460s