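ZooKeeper serializes its data with a scheme called Jute. Jute is fairly old by today's standards and looks dated next to popular formats such as protobuf. Its binary format is essentially a thin layer over the JDK's DataInput/DataOutput streams, which makes it comparatively clunky. Even so, serialization is not ZooKeeper's performance bottleneck, and keeping the format preserves compatibility, so ZooKeeper has never replaced it.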
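1. JDK DataInputStream and DataOutputStream
Before introducing Jute, let's first look at the JDK's DataInputStream and DataOutputStream.
These classes let an application read primitive Java types from (or write them to) an underlying stream in a machine-independent way. Here is how they are used: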
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.junit.Test;

@Test
public void test() throws IOException {
    String file = "D:\\test.txt";
    // try-with-resources closes the stream even if a write fails
    try (DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(file))) {
        // Write an int, a string (as raw bytes) and a boolean, in that order
        dataOutputStream.writeInt(121);
        dataOutputStream.write("jack".getBytes(StandardCharsets.UTF_8));
        dataOutputStream.writeBoolean(true);
    }
    try (DataInputStream dataInputStream = new DataInputStream(new FileInputStream(file))) {
        // Read the values back in the order they were written
        int i = dataInputStream.readInt();
        System.out.println(i);
        // Note: the string was written as 4 bytes with no length prefix,
        // so we read exactly 4 bytes here
        byte[] bytes = new byte[4];
        dataInputStream.readFully(bytes);
        String name = new String(bytes, StandardCharsets.UTF_8);
        System.out.println(name);
        boolean b = dataInputStream.readBoolean();
        System.out.println(b);
    }
}
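To make "machine-independent" concrete, the sketch below writes the same three values to an in-memory buffer instead of a file and prints the resulting bytes in hex. The class name DataStreamLayoutDemo and the helper toHex are made up for illustration. The only JDK behaviour it relies on is that DataOutputStream writes multi-byte values high byte first (big-endian), regardless of the platform.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class DataStreamLayoutDemo {

    // Writes an int, four raw ASCII bytes and a boolean, then returns the encoded bytes.
    static byte[] encode() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(121);                                      // 4 bytes, big-endian
            out.write("jack".getBytes(StandardCharsets.US_ASCII));  // 4 bytes, no length prefix
            out.writeBoolean(true);                                 // 1 byte
        } catch (IOException e) {
            // Not expected for an in-memory buffer; rethrow unchecked to keep the sketch short.
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Formats bytes as space-separated two-digit hex values.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toHex(encode()));
        // prints: 00 00 00 79 6a 61 63 6b 01
    }
}
```

Nothing in the byte stream says where the string ends, which is why the reader above has to know in advance that it is 4 bytes long.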
2. Using Jute serialization
2.1 Create an entity class that implements the Record interface
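import java.io.IOException;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;

public class Person implements Record {
    private int age;
    private String name;

    // No-arg constructor: lets callers create an empty instance and fill it via deserialize()
    public Person() {
    }

    public Person(int age, String name) {
        this.age = age;
        this.name = name;
    }

    public void serialize(OutputArchive archive, String tag) throws IOException {
        // Every record starts with startRecord and ends with endRecord
        archive.startRecord(this, tag);
        archive.writeInt(age, "age");
        archive.writeString(name, "name");
        archive.endRecord(this, tag);
    }

    public void deserialize(InputArchive archive, String tag) throws IOException {
        // Fields must be read in the same order in which serialize() wrote them
        archive.startRecord(tag);
        age = archive.readInt("age");
        name = archive.readString("name");
        archive.endRecord(tag);
    }

    @Override
    public String toString() {
        return "Person{" +
                "age=" + age +
                ", name='" + name + '\'' +
                '}';
    }
}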
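Record is the core interface of Jute serialization. Every class whose instances need to be serialized must implement it. The interface declares the serialization and deserialization methods: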
public interface Record {
public void serialize(OutputArchive archive, String tag)
throws IOException;
public void deserialize(InputArchive archive, String tag)
throws IOException;
}
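2.2 OutputArchive and InputArchive
OutputArchive is the interface that every serializer implementation must implement.
InputArchive is the interface that every deserializer implementation must implement.
The interface methods are as follows: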
public interface InputArchive {
public byte readByte(String tag) throws IOException;
public boolean readBool(String tag) throws IOException;
public int readInt(String tag) throws IOException;
public long readLong(String tag) throws IOException;
public float readFloat(String tag) throws IOException;
public double readDouble(String tag) throws IOException;
public String readString(String tag) throws IOException;
public byte[] readBuffer(String tag) throws IOException;
public void readRecord(Record r, String tag) throws IOException;
public void startRecord(String tag) throws IOException;
public void endRecord(String tag) throws IOException;
public Index startVector(String tag) throws IOException;
public void endVector(String tag) throws IOException;
public Index startMap(String tag) throws IOException;
public void endMap(String tag) throws IOException;
}
public interface OutputArchive {
public void writeByte(byte b, String tag) throws IOException;
public void writeBool(boolean b, String tag) throws IOException;
public void writeInt(int i, String tag) throws IOException;
public void writeLong(long l, String tag) throws IOException;
public void writeFloat(float f, String tag) throws IOException;
public void writeDouble(double d, String tag) throws IOException;
public void writeString(String s, String tag) throws IOException;
public void writeBuffer(byte buf[], String tag)
throws IOException;
public void writeRecord(Record r, String tag) throws IOException;
public void startRecord(Record r, String tag) throws IOException;
public void endRecord(Record r, String tag) throws IOException;
public void startVector(List v, String tag) throws IOException;
public void endVector(List v, String tag) throws IOException;
public void startMap(TreeMap v, String tag) throws IOException;
public void endMap(TreeMap v, String tag) throws IOException;
}
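There are three families of implementations by default: binary, CSV and XML.
The ones used most often are BinaryInputArchive and BinaryOutputArchive. The CSV implementations mainly make the data easier to inspect visually, and the XML implementations save and restore data in XML format.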
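The point of separating Record from the archive is that a single serialize() method can drive several output formats. The following JDK-only sketch imitates that split with made-up types (MiniOutputArchive, MiniRecord, BinaryMini, TextMini). They are simplified stand-ins rather than Jute classes, and the text format shown is invented for the example; it is not Jute's actual CSV or XML layout.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class ArchiveFormatsDemo {

    // Hypothetical, cut-down counterpart of an output archive.
    interface MiniOutputArchive {
        void writeInt(int i, String tag) throws IOException;
        void writeString(String s, String tag) throws IOException;
    }

    // Hypothetical counterpart of a record: it knows its fields, not the format.
    interface MiniRecord {
        void serialize(MiniOutputArchive archive) throws IOException;
    }

    // Binary flavour: ignores the tags and writes raw values to a DataOutputStream.
    static class BinaryMini implements MiniOutputArchive {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        private final DataOutputStream out = new DataOutputStream(bytes);

        public void writeInt(int i, String tag) throws IOException {
            out.writeInt(i);
        }

        public void writeString(String s, String tag) throws IOException {
            byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
            out.writeInt(utf8.length); // length prefix, then the bytes
            out.write(utf8);
        }
    }

    // Text flavour: keeps the tags so the output is human-readable.
    static class TextMini implements MiniOutputArchive {
        final StringBuilder text = new StringBuilder();

        public void writeInt(int i, String tag) {
            append(tag, String.valueOf(i));
        }

        public void writeString(String s, String tag) {
            append(tag, s);
        }

        private void append(String tag, String value) {
            if (text.length() > 0) {
                text.append(',');
            }
            text.append(tag).append('=').append(value);
        }
    }

    static class Person implements MiniRecord {
        private final int age;
        private final String name;

        Person(int age, String name) {
            this.age = age;
            this.name = name;
        }

        public void serialize(MiniOutputArchive archive) throws IOException {
            archive.writeInt(age, "age");
            archive.writeString(name, "name");
        }
    }

    // Serializes a record with the text archive and returns the result.
    static String asText(MiniRecord record) {
        TextMini archive = new TextMini();
        try {
            record.serialize(archive);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return archive.text.toString();
    }

    // Serializes a record with the binary archive and returns the number of bytes written.
    static int binarySize(MiniRecord record) {
        BinaryMini archive = new BinaryMini();
        try {
            record.serialize(archive);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return archive.bytes.size();
    }

    public static void main(String[] args) {
        Person person = new Person(18, "jack");
        System.out.println(asText(person));     // prints: age=18,name=jack
        System.out.println(binarySize(person)); // prints: 12
    }
}
```

The Person class is written once and never mentions a concrete format. This is the same reason the real Record methods take an archive and a tag as parameters.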
2.3 Using BinaryInputArchive and BinaryOutputArchive
String path = "D:\\test1.txt";
// Write a Person to the file
try (OutputStream outputStream = new FileOutputStream(path)) {
    // Create the output archive
    BinaryOutputArchive binaryOutputArchive = BinaryOutputArchive.getArchive(outputStream);
    Person person = new Person(18, "jack");
    binaryOutputArchive.writeRecord(person, "person");
}
// Read the Person back from the file
try (InputStream inputStream = new FileInputStream(path)) {
    BinaryInputArchive binaryInputArchive = BinaryInputArchive.getArchive(inputStream);
    // Start from an empty instance (Person needs a no-arg constructor); deserialize() fills in the fields
    Person person2 = new Person();
    binaryInputArchive.readRecord(person2, "person");
    System.out.println(person2);
}
// Output:
// Person{age=18, name='jack'}
BinaryOutputArchive and BinaryInputArchive are not complicated to use. Under the hood, they simply call the Person class's own serialize() and deserialize() methods.
2.4 How ZooKeeper uses it
Let's look at a simple example of how Jute is used inside ZooKeeper, taking Stat.java as the example:
public class Stat implements Record {
private long czxid;
private long mzxid;
private long ctime;
private long mtime;
private int version;
private int cversion;
private int aversion;
private long ephemeralOwner;
private int dataLength;
private int numChildren;
private long pzxid;
public Stat() {
}
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeLong(czxid,"czxid");
a_.writeLong(mzxid,"mzxid");
a_.writeLong(ctime,"ctime");
a_.writeLong(mtime,"mtime");
a_.writeInt(version,"version");
a_.writeInt(cversion,"cversion");
a_.writeInt(aversion,"aversion");
a_.writeLong(ephemeralOwner,"ephemeralOwner");
a_.writeInt(dataLength,"dataLength");
a_.writeInt(numChildren,"numChildren");
a_.writeLong(pzxid,"pzxid");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
czxid=a_.readLong("czxid");
mzxid=a_.readLong("mzxid");
ctime=a_.readLong("ctime");
mtime=a_.readLong("mtime");
version=a_.readInt("version");
cversion=a_.readInt("cversion");
aversion=a_.readInt("aversion");
ephemeralOwner=a_.readLong("ephemeralOwner");
dataLength=a_.readInt("dataLength");
numChildren=a_.readInt("numChildren");
pzxid=a_.readLong("pzxid");
a_.endRecord(tag);
}
// The write method is simply:
public void write(java.io.DataOutput out) throws java.io.IOException {
BinaryOutputArchive archive = new BinaryOutputArchive(out);
// Call serialize() directly to serialize this object
serialize(archive, "");
}
}
As Stat shows, ZooKeeper uses Jute in much the same way as our own example.
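One consequence of this layout is easy to check by hand. Stat has six long fields and five int fields, and in the binary archive startRecord/endRecord write nothing, so a binary-serialized Stat should occupy 6 × 8 + 5 × 4 = 68 bytes. The sketch below verifies that arithmetic with a plain DataOutputStream. StatSizeDemo is a hypothetical stand-in that writes zeros in the same field order; it does not use the real Stat class.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class StatSizeDemo {

    // Writes placeholder values with the same types and order as Stat.serialize()
    // and returns how many bytes were produced.
    static int encodedSize() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeLong(0L); // czxid
            out.writeLong(0L); // mzxid
            out.writeLong(0L); // ctime
            out.writeLong(0L); // mtime
            out.writeInt(0);   // version
            out.writeInt(0);   // cversion
            out.writeInt(0);   // aversion
            out.writeLong(0L); // ephemeralOwner
            out.writeInt(0);   // dataLength
            out.writeInt(0);   // numChildren
            out.writeLong(0L); // pzxid
        } catch (IOException e) {
            // Not expected for an in-memory buffer.
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println(encodedSize()); // prints: 68
    }
}
```

Because the format stores no field names or type markers, the size is fixed and the reader must know the exact field order, which is what the hand-written deserialize() method encodes.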
2.5 BinaryInputArchive and BinaryOutputArchive source code analysis
The usage is this simple, so does the source code hide any secrets? Let's look at it directly:
public class BinaryOutputArchive implements OutputArchive {
private ByteBuffer bb = ByteBuffer.allocate(1024);
// The most important field: essentially every operation is delegated to this DataOutput
private DataOutput out;
public static BinaryOutputArchive getArchive(OutputStream strm) {
// If a plain OutputStream is passed in, wrap it in a DataOutputStream
return new BinaryOutputArchive(new DataOutputStream(strm));
}
// As you can see, everything is delegated to the DataOutputStream
public void writeByte(byte b, String tag) throws IOException {
out.writeByte(b);
}
public void writeInt(int i, String tag) throws IOException {
out.writeInt(i);
}
...
// writeRecord simply calls Record.serialize()
public void writeRecord(Record r, String tag) throws IOException {
r.serialize(this, tag);
}
// These two methods are empty
public void startRecord(Record r, String tag) throws IOException {}
public void endRecord(Record r, String tag) throws IOException {}
}
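The code is simple enough that we won't walk through it in detail. The key writeRecord() method essentially just calls Record.serialize().
That is why our own entity classes must implement the Record interface and provide its serialize() and deserialize() methods.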
public class BinaryInputArchive implements InputArchive {
static public final String UNREASONBLE_LENGTH= "Unreasonable length = ";
private DataInput in;
static public BinaryInputArchive getArchive(InputStream strm) {
return new BinaryInputArchive(new DataInputStream(strm));
}
public byte readByte(String tag) throws IOException {
return in.readByte();
}
// Likewise delegated to the Record itself
public void readRecord(Record r, String tag) throws IOException {
r.deserialize(this, tag);
}
...
}
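After the analysis of BinaryOutputArchive above, BinaryInputArchive needs little further explanation. It likewise hands the stream operations to DataInputStream, and reading a Record is again left to the Record itself.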
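The listings above elide how variable-length values such as strings are framed. As far as I can tell from the ZooKeeper source, the binary archives write a 4-byte length followed by the UTF-8 bytes, use a length of -1 for null, and reject implausible lengths on read with the "Unreasonable length" message seen in BinaryInputArchive. The sketch below reproduces that idea with JDK classes only. LengthPrefixedStringDemo, MAX_LENGTH and its value are assumptions for illustration, not Jute's actual code or limit.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class LengthPrefixedStringDemo {

    // Assumed upper bound for this sketch only.
    static final int MAX_LENGTH = 1024 * 1024;

    // Writes a 4-byte length followed by the UTF-8 bytes; null is encoded as length -1.
    static void writeString(DataOutput out, String s) throws IOException {
        if (s == null) {
            out.writeInt(-1);
            return;
        }
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
    }

    // Reads the length first, validates it, then reads exactly that many bytes.
    static String readString(DataInput in) throws IOException {
        int len = in.readInt();
        if (len == -1) {
            return null;
        }
        if (len < 0 || len > MAX_LENGTH) {
            throw new IOException("Unreasonable length = " + len);
        }
        byte[] utf8 = new byte[len];
        in.readFully(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // Encodes a string into memory and decodes it again.
    static String roundTrip(String s) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            writeString(new DataOutputStream(buffer), s);
            byte[] encoded = buffer.toByteArray();
            return readString(new DataInputStream(new ByteArrayInputStream(encoded)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("jack")); // prints: jack
        System.out.println(roundTrip(null));   // prints: null
    }
}
```

The length check matters because the length comes straight from the input: without it, a corrupted or malicious 4-byte prefix could make the reader allocate a huge array.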