Netty 自带多个粘包拆包解码器。今天介绍 LineBasedFrameDecoder,换行符解码器。
行拆包器这个类叫做 LineBasedFrameDecoder
,基于行分隔符的拆包器,TA可以同时处理 \n
以及\r\n
两种类型的行分隔符,核心方法都在继承的 decode
方法中。
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
netty 中自带的拆包器都是如上这种模板,我们来看看decode(ctx, in);
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
int eol = findEndOfLine(buffer);
int length;
int length;
if (!this.discarding) {
if (eol >= 0) {
length = eol - buffer.readerIndex();
int delimLength = buffer.getByte(eol) == '\r' ? 2 : 1;
if (length > this.maxLength) {
buffer.readerIndex(eol + delimLength);
this.fail(ctx, length);
return null;
} else {
ByteBuf frame;
if (this.stripDelimiter) {
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
}
} else {
length = buffer.readableBytes();
if (length > this.maxLength) {
this.discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
this.discarding = true;
if (this.failFast) {
this.fail(ctx, "over " + this.discardedBytes);
}
}
return null;
}
} else {
if (eol >= 0) {
length = this.discardedBytes + eol - buffer.readerIndex();
length = buffer.getByte(eol) == '\r' ? 2 : 1;
buffer.readerIndex(eol + length);
this.discardedBytes = 0;
this.discarding = false;
if (!this.failFast) {
this.fail(ctx, length);
}
} else {
this.discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
}
return null;
}
}
ByteProcessor FIND_LF = new IndexOfProcessor((byte) '\n');
private static int findEndOfLine(ByteBuf buffer) {
int i = buffer.forEachByte(ByteProcessor.FIND_LF);
if (i > 0 && buffer.getByte(i - 1) == '\r') {
--i;
}
return i;
}
找到换行符位置
final int eol = findEndOfLine(buffer);
private static int findEndOfLine(final ByteBuf buffer) {
int i = buffer.forEachByte(ByteProcessor.FIND_LF);
if (i > 0 && buffer.getByte(i - 1) == '\r') {
i--;
}
return i;
}
ByteProcessor FIND_LF = new IndexOfProcessor((byte) '\n');
for循环遍历,找到第一个 \n
的位置,如果\n
前面的字符为\r
,那就返回\r
的位置
接下来,netty会判断,当前拆包是否属于丢弃模式,用一个成员变量来标识
private boolean discarding;
第一次拆包不在discarding模式
非discarding模式下找到行分隔符的处理
// 1.计算分隔符和包长度
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 丢弃异常数据
if (length > maxLength) {
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
// 取包的时候是否包括分隔符
if (stripDelimiter) {
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
- 1.首先,新建一个帧,计算一下当前包的长度和分隔符的长度(因为有两种分隔符)
- 2.然后判断一下需要拆包的长度是否大于该拆包器允许的最大长度(
maxLength
),这个参数在构造函数中被传递进来,如超出允许的最大长度,就将这段数据抛弃,返回null - 3.最后,将一个完整的数据包取出,如果构造本解包器的时候指定
stripDelimiter
为false,即解析出来的包包含分隔符,默认为不包含分隔符。
没有找到对应的行分隔符,说明字节容器没有足够的数据拼接成一个完整的业务数据包,进入如下流程处理。
final int length = buffer.readableBytes();
if (length > maxLength) {
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
return null;
首先取得当前字节容器的可读字节个数,接着,判断一下是否已经超过可允许的最大长度,如果没有超过,直接返回null,字节容器中的数据没有任何改变,否则,就需要进入丢弃模式。
使用一个成员变量 discardedBytes
来表示已经丢弃了多少数据,然后将字节容器的读指针移到写指针,意味着丢弃这一部分数据,设置成员变量discarding
为true表示当前处于丢弃模式。如果设置了failFast
,那么直接抛出异常,默认情况下failFast
为false,即安静得丢弃数据。
如果解包的时候处在discarding模式,也会有两种情况发生
discarding模式下找到行分隔符在discarding模式下,如果找到分隔符,那可以将分隔符之前的都丢弃掉
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
计算出分隔符的长度之后,直接把分隔符之前的数据全部丢弃,当然丢弃的字符也包括分隔符,经过这么一次丢弃,后面就有可能是正常的数据包,下一次解包的时候就会进入正常的解包流程。
discarding模式下未找到行分隔符这种情况比较简单,因为当前还在丢弃模式,没有找到行分隔符意味着当前一个完整的数据包还没丢弃完,当前读取的数据是丢弃的一部分,所以直接丢弃。
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
特定分隔符拆包
这个类叫做 DelimiterBasedFrameDecoder
,可以传递给TA一个分隔符列表,数据包会按照分隔符列表进行拆分,读者可以完全根据行拆包器的思路去分析这个DelimiterBasedFrameDecoder
。
LengthFieldBasedFrameDecoder解码器自定义协议。通常,协议的格式如下:
LengthFieldBasedFrameDecoder是netty解决拆包粘包问题的一个重要的类,主要结构就是header+body结构。我们只需要传入正确的参数就可以发送和接收正确的数据,那么重点就在于这几个参数的意义。下面我们就具体了解一下这几个参数的意义。先来看一下LengthFieldBasedFrameDecoder主要的构造方法:
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip)
那么这几个重要的参数如下:
- maxFrameLength:最大帧长度。也就是可以接收的数据的最大长度。如果超过,此次数据会被丢弃。
- lengthFieldOffset:长度域偏移。就是说数据开始的几个字节可能不是表示数据长度,需要后移几个字节才是长度域。
- lengthFieldLength:长度域字节数。用几个字节来表示数据长度。
- lengthAdjustment:数据长度修正。因为长度域指定的长度可以使header+body的整个长度,也可以只是body的长度。如果表示header+body的整个长度,那么我们需要修正数据长度。
- initialBytesToStrip:跳过的字节数。如果你需要接收header+body的所有数据,此值就是0,如果你只想接收body数据,那么需要跳过header所占用的字节数。
需求1:长度域为2个字节,我们要求发送和接收的数据如下所示:
发送的数据 (14 bytes) 接收到数据 (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 12 | "HELLO, WORLD" | | 12 | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
留心的你肯定发现了,长度域只是实际内容的长度,不包括长度域的长度。下面是参数的值:
- lengthFieldOffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
- lengthFieldLength=2:长度域2个字节。
- lengthAdjustment=0:数据长度修正为0,因为长度域只包含数据的长度,所以不需要修正。
- initialBytesToStrip=0:发送和接收的数据完全一致,所以不需要跳过任何字节。
需求2:长度域为2个字节,我们要求发送和接收的数据如下所示:
发送的数据 (14 bytes) 接收到数据 (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 12 | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
参数值如下:
- lengthFieldOffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
- lengthFieldLength=2:长度域2个字节。
- lengthAdjustment=0:数据长度修正为0,因为长度域只包含数据的长度,所以不需要修正。
- initialBytesToStrip=2:我们发现接收的数据没有长度域的数据,所以要跳过长度域的2个字节
需求3:长度域为2个字节,我们要求发送和接收的数据如下所示:
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 14 | "HELLO, WORLD" | | 14 | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
留心的你肯定又发现了,长度域表示的长度是总长度 也就是header+body的总长度。参数如下:
- lengthFieldOffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
- lengthFieldLength=2:长度域2个字节。
- lengthAdjustment=-2:因为长度域为总长度,所以我们需要修正数据长度,也就是减去2。
- initialBytesToStrip=0:我们发现接收的数据没有长度域的数据,所以要跳过长度域的2个字节。
需求4:长度域为2个字节,我们要求发送和接收的数据如下所示:
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| meta | Length | Actual Content |----->| meta | Length | Actual Content |
| 0xCAFE | 12 | "HELLO, WORLD" | | 0xCAFE | 12 | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
我们发现,数据的结构有点变化,变成了 meta+header+body的结构。meta一般表示元数据,魔数等。我们定义这里meta有三个字节。参数如下:
- lengthFieldOffset=3:开始的3个字节是meta,然后才是长度域,所以长度域偏移为3。
- lengthFieldLength=2:长度域2个字节。
- lengthAdjustment=0:长度域指定的长度位数据长度,所以数据长度不需要修正。
- initialBytesToStrip=0:发送和接收数据相同,不需要跳过数据。
需求5:长度域为2个字节,我们要求发送和接收的数据如下所示:
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Length | meta | Actual Content |----->| Length | meta | Actual Content |
| 12 | 0xCAFE | "HELLO, WORLD" | | 12 | 0xCAFE | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
我们发现,数据的结构有点变化,变成了 header+meta+body的结构。meta一般表示元数据,魔数等。我们定义这里meta有三个字节。参数如下:
- lengthFieldOffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
- lengthFieldLength=2:长度域2个字节。
- lengthAdjustment=3:我们需要把meta+body当做body处理,所以数据长度需要加3。
- initialBytesToStrip=0:发送和接收数据相同,不需要跳过数据。
需求6:长度域为2个字节,我们要求发送和接收的数据如下所示:
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
我们发现,数据的结构有点变化,变成了 hdr1+header+hdr2+body的结构。我们定义这里hdr1和hdr2都只有1个字节。参数如下:
- lengthFieldOffset=1:开始的1个字节是长度域,所以需要设置长度域偏移为1。
- lengthFieldLength=2:长度域2个字节。
- lengthAdjustment=1:我们需要把hdr2+body当做body处理,所以数据长度需要加1。
- initialBytesToStrip=3:接收数据不包括hdr1和长度域相同,所以需要跳过3个字节。
具体的拆包协议只需要实现
void decode(ChannelHandlerContext ctx, ByteBuf in, List out)
其中 in 表示目前为止还未拆的数据,拆完之后的包添加到 out这个list中即可实现包向下传递,第一层实现比较简单。
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
重载的protected函数decode做真正的拆包动作。
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (this.discardingTooLongFrame) {
long bytesToDiscard = this.bytesToDiscard;
int localBytesToDiscard = (int)Math.min(bytesToDiscard, (long)in.readableBytes());
in.skipBytes(localBytesToDiscard);
bytesToDiscard -= (long)localBytesToDiscard;
this.bytesToDiscard = bytesToDiscard;
this.failIfNecessary(false);
}
// 如果当前可读字节还未达到长度长度域的偏移,那说明肯定是读不到长度域的,直接不读
if (in.readableBytes() < this.lengthFieldEndOffset) {
return null;
} else {
// 拿到长度域的实际字节偏移,就是长度域的开始下标
// 这里就是需求4,开始的几个字节并不是长度域
int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;
// 拿到实际的未调整过的包长度
// 就是读取长度域的十进制值,最原始传过来的包的长度
long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder);
// 如果拿到的长度为负数,直接跳过长度域并抛出异常
if (frameLength < 0L) {
in.skipBytes(this.lengthFieldEndOffset);
throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);
} else {
// 调整包的长度
frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);
// 整个数据包的长度还没有长度域长,直接抛出异常
if (frameLength < (long)this.lengthFieldEndOffset) {
in.skipBytes(this.lengthFieldEndOffset);
throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + this.lengthFieldEndOffset);
// 数据包长度超出最大包长度,进入丢弃模式
} else if (frameLength > (long)this.maxFrameLength) {
long discard = frameLength - (long)in.readableBytes();
this.tooLongFrameLength = frameLength;
if (discard < 0L) {
in.skipBytes((int)frameLength);
} else {
this.discardingTooLongFrame = true;
this.bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
this.failIfNecessary(true);
return null;
} else {
int frameLengthInt = (int)frameLength;
//当前可读的字节数小于包中的length,什么都不做,等待下一次解码
if (in.readableBytes() < frameLengthInt) {
return null;
//跳过的字节不能大于数据包的长度,否则就抛出 CorruptedFrameException 的异常
} else if (this.initialBytesToStrip > frameLengthInt) {
in.skipBytes(frameLengthInt);
throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip);
} else {
//根据initialBytesToStrip的设置来跳过某些字节
in.skipBytes(this.initialBytesToStrip);
//拿到当前累积数据的读指针
int readerIndex = in.readerIndex();
//拿到待抽取数据包的实际长度
int actualFrameLength = frameLengthInt - this.initialBytesToStrip;
//进行抽取
ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength);
//移动读指针
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
}
}
}
}
获取frame长度:获取需要待拆包的包大小
// 拿到长度域的实际字节偏移,就是长度域的开始下标
// 这里就是需求4,开始的几个字节并不是长度域
int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;
// 拿到实际的未调整过的包长度
// 就是读取长度域的十进制值,最原始传过来的包的长度
long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder);
// 调整包的长度
frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);
上面这一段内容有个扩展点 getUnadjustedFrameLength,如果你的长度域代表的值表达的含义不是正常的int,short等基本类型,你可以重写这个函数。
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
buf = buf.order(order);
long frameLength;
switch (length) {
case 1:
frameLength = buf.getUnsignedByte(offset);
break;
case 2:
frameLength = buf.getUnsignedShort(offset);
break;
case 3:
frameLength = buf.getUnsignedMedium(offset);
break;
case 4:
frameLength = buf.getUnsignedInt(offset);
break;
case 8:
frameLength = buf.getLong(offset);
break;
default:
throw new DecoderException(
"unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
}
return frameLength;
}
跳过指定字节长度
int frameLengthInt = (int)frameLength;
//当前可读的字节数小于包中的length,什么都不做,等待下一次解码
if (in.readableBytes() < frameLengthInt) {
return null;
//跳过的字节不能大于数据包的长度,否则就抛出 CorruptedFrameException 的异常
} else if (this.initialBytesToStrip > frameLengthInt) {
in.skipBytes(frameLengthInt);
throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip);
}
//根据initialBytesToStrip的设置来跳过某些字节
in.skipBytes(this.initialBytesToStrip);
先验证当前是否已经读到足够的字节,如果读到了,在下一步抽取一个完整的数据包之前,需要根据initialBytesToStrip的设置来跳过某些字节(见文章开篇),当然,跳过的字节不能大于数据包的长度,否则就抛出 CorruptedFrameException 的异常。
抽取frame
//根据initialBytesToStrip的设置来跳过某些字节
in.skipBytes(this.initialBytesToStrip);
//拿到当前累积数据的读指针
int readerIndex = in.readerIndex();
//拿到待抽取数据包的实际长度
int actualFrameLength = frameLengthInt - this.initialBytesToStrip;
//进行抽取
ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength);
//移动读指针
in.readerIndex(readerIndex + actualFrameLength);
return frame;
到了最后抽取数据包其实就很简单了,拿到当前累积数据的读指针,然后拿到待抽取数据包的实际长度进行抽取,抽取之后,移动读指针。
protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
return buffer.retainedSlice(index, length);
}
抽取的过程是简单的调用了一下 ByteBuf 的retainedSliceapi,该api无内存copy开销。
自定义解码器实战public class MyProtocolBean {
//类型 系统编号 0xA 表示A系统,0xB 表示B系统
private byte type;
//信息标志 0xA 表示心跳包 0xB 表示超时包 0xC 业务信息包
private byte flag;
//内容长度
private int length;
//内容
private String content;
//省略get/set
}
服务端的实现
public class Server {
private static final int MAX_FRAME_LENGTH = 1024 * 1024; //最大长度
private static final int LENGTH_FIELD_LENGTH = 4; //长度字段所占的字节数
private static final int LENGTH_FIELD_OFFSET = 2; //长度偏移
private static final int LENGTH_ADJUSTMENT = 0;
private static final int INITIAL_BYTES_TO_STRIP = 0;
private int port;
public Server(int port) {
this.port = port;
}
public void start(){
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
ch.pipeline().addLast(new ServerHandler());
};
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接收进来的连接
ChannelFuture future = sbs.bind(port).sync();
System.out.println("Server start listen at " + port );
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new Server(port).start();
}
}
自定义解码器MyProtocolDecoder
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder {
private static final int HEADER_SIZE = 6;
/**
* @param maxFrameLength 帧的最大长度
* @param lengthFieldOffset length字段偏移的地址
* @param lengthFieldLength length字段所占的字节长
* @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段
* @param initialBytesToStrip 解析时候跳过多少个长度
* @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异
*/
public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分
in = (ByteBuf) super.decode(ctx,in);
if(in == null){
return null;
}
if(in.readableBytes()
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?