总体而言,Zookeeper服务端的日志分为三种:事务日志、快照日志、log4j日志。
log4j日志无需多言,我们在%ZOOKEEPER_DIR%/conf/log4j.properties中配置了日志的详细信息。
本文主要介绍下事务日志的内容和Zookeeper如何生成事务日志以及其作用。快照日志的话,下一篇会着重介绍。
1.什么是事务日志?我们在%ZOOKEEPER_DIR%/conf/zoo.cfg中配置的dataDir参数,是专门用于存储事务日志和快照日志的文件夹路径。当然,我们也可以将两个日志分开(事务读写比较频繁时事务日志会比较大,将两者分开可以提高系统性能),这时可以在zoo.cfg中配置dataLogDir路径。
那么什么是事务日志呢?
就是Zookeeper服务端针对客户端的所有事务请求(create、update、delete)等操作,在返回成功之前,都会将本次操作的内容持久化到磁盘上,完成之后,才返回客户端成功标志。
2.查看事务日志信息在笔者的机器上,我们在%ZOOKEEPER_DIR%/data/version-2目录下,看到以下几个文件
这个就是事务日志,直接打开的话是二进制内容,不利于查看,那么我们可以通过Zookeeper源码中提供的org.apache.zookeeper.server.LogFormatter来查看,
通过在main()方法中指定需要查看的事务日志文件路径即可以查看,笔者在查看log.1文件时,生成以下输出:
...
// 创建节点 /hello20040
21-10-5 下午05时13分46秒 session 0x10000d4a6d50002 cxid 0x29 zxid 0x3ac8 create '/hello20040,#776f726c643230303430,v{s{31,s{'world,'anyone}}},F,15041
// 创建一个Session会话
21-10-7 下午01时21分41秒 session 0x10000d4a6d50005 cxid 0x0 zxid 0x12505 createSession 40000
// 设置/hello20040 值
21-10-7 下午01时21分41秒 session 0x10000d4a6d50005 cxid 0x1 zxid 0x12506 setData '/hello20040,#3137,1
// 删除/hello20040节点
21-10-7 下午01时22分33秒 session 0x10000d4a6d50006 cxid 0x1 zxid 0x1250a delete '/hello20040
// 关闭会话
21-10-7 下午01时22分41秒 session 0x10000d4a6d50007 cxid 0x0 zxid 0x1250b createSession 40000
通过以上日志可以很清楚的看到每一次事务操作时的具体信息,这样方便我们进行问题排查。
当然,不仅可以直接通过debug代码的方式来查看,我们同样可以通过Zookeeper.jar的方式来查看。大家可以参考这篇博文: https://blog.csdn.net/qq_34291777/article/details/86644347
3.事务日志请求执行过程有了前面对Zookeeper server端处理请求的分析,我们知道事务日志的添加调用入口是通过SyncRequestProcessor来完成的。下来就一起来分析下其是如何将事务日志落入磁盘的。
我们就以create()方法为示例,来看下整个过程。前面server处理会话创建请求的文章中,我们知道,最终交由三个requestProcessor来处理,处理顺序为 PrepRequestProcessor --> SyncRequestProcessor --> FinalRequestProcessor
3.1 PrepRequestProcessor.pRequest() 创建事务请求对象public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
protected void pRequest(Request request) throws RequestProcessorException {
// 事务请求request,分为hdr请求头和txn请求体
request.hdr = null;
request.txn = null;
try {
switch (request.type) {
case OpCode.create:
// 这里的CreateRequest就是请求体
CreateRequest createRequest = new CreateRequest();
// 交由pRequest2Txn()方法处理
pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
break;
}
...
}
// 交由下一个processor执行
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
throws KeeperException, IOException, RequestProcessorException
{
// 创建请求头
request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), type);
switch (type) {
case OpCode.create:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
CreateRequest createRequest = (CreateRequest)record;
if(deserialize)
ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
// 检查path合法性及ACL权限控制
String path = createRequest.getPath();
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
LOG.info("Invalid path " + path + " with session 0x" +
Long.toHexString(request.sessionId));
throw new KeeperException.BadArgumentsException(path);
}
List listACL = removeDuplicates(createRequest.getAcl());
if (!fixupACL(request.authInfo, listACL)) {
throw new KeeperException.InvalidACLException(path);
}
// 检查pathACL
String parentPath = path.substring(0, lastSlash);
ChangeRecord parentRecord = getRecordForPath(parentPath);
checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
request.authInfo);
int parentCVersion = parentRecord.stat.getCversion();
// 根据节点是否持久化和顺序化进行不同的验证
CreateMode createMode =
CreateMode.fromFlag(createRequest.getFlags());
if (createMode.isSequential()) {
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
validatePath(path, request.sessionId);
try {
if (getRecordForPath(path) != null) {
throw new KeeperException.NodeExistsException(path);
}
} catch (KeeperException.NoNodeException e) {
// ignore this one
}
boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
if (ephemeralParent) {
throw new KeeperException.NoChildrenForEphemeralsException(path);
}
int newCversion = parentRecord.stat.getCversion()+1;
// 生成事务请求体
request.txn = new CreateTxn(path, createRequest.getData(),
listACL,
createMode.isEphemeral(), newCversion);
StatPersisted s = new StatPersisted();
if (createMode.isEphemeral()) {
s.setEphemeralOwner(request.sessionId);
}
// 将父节点的变更信息和当前节点的变更信息推送到ZooKeeperServer.outstandingChanges中
parentRecord = parentRecord.duplicate(request.hdr.getZxid());
parentRecord.childCount++;
parentRecord.stat.setCversion(newCversion);
addChangeRecord(parentRecord);
addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
0, listACL));
break;
}
...
}
}
总结:事务请求对象Request,包含请求头TxnHeader hdr和请求体Record txn,所以PrepRequestProcessor的主要工作就是堆hdr和txn的封装
3.2 SyncRequestProcessor 事务日志添加public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
// 事务请求Request被添加到queuedRequests中
public void processRequest(Request request) {
queuedRequests.add(request);
}
public void run() {
try {
int logCount = 0;
setRandRoll(r.nextInt(snapCount/2));
while (true) {
Request si = null;
// 不断从queuedRequests获取事务请求信息
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// 在这里将Request添加到事务日志中
if (zks.getZKDatabase().append(si)) {
logCount++;
// 如果需要刷新到磁盘则执行flush操作
if (logCount > (snapCount / 2 + randRoll)) {
setRandRoll(r.nextInt(snapCount/2));
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
// 快照日志单独启动一个线程来执行,避免阻塞主线程执行,后续专门分析
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
// 执行flush操作
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
}
事务日志的磁盘写入,默认分为两步:写入(append)、刷新(rollLog/commit)
写入动作并不是真正的写入磁盘(而是暂时缓存下来),刷新操作才是真正将缓存的内容写入到磁盘中。
有了以上的分析,我们后面直接去分析append方法和flush方法的执行过程
4.事务日志的生成主要就是对ZKDatabase.append()方法和ZKDatabase.rollLog()方法的调用
4.1 ZKDatabase相关方法public class ZKDatabase {
protected FileTxnSnapLog snapLog;
public boolean append(Request si) throws IOException {
return this.snapLog.append(si);
}
public void rollLog() throws IOException {
this.snapLog.rollLog();
}
public void commit() throws IOException {
this.snapLog.commit();
}
}
本质上都交由snapLog来操作
4.2 FileTxnSnapLog相关方法public class FileTxnSnapLog {
// 事务日志操作类
private final File dataDir;
private TxnLog txnLog;
// 快照日志操作类
private final File snapDir;
private SnapShot snapLog;
public boolean append(Request si) throws IOException {
return txnLog.append(si.hdr, si.txn);
}
/**
* commit the transaction of logs
* @throws IOException
*/
public void commit() throws IOException {
txnLog.commit();
}
/**
* roll the transaction logs
* @throws IOException
*/
public void rollLog() throws IOException {
txnLog.rollLog();
}
}
FileTxnSnapLog本质上只是一个包装类,统一提供对事务日志和快照日志的操作API。
4.3 FileTxnLog 事务日志操作在分析代码之前,我们先看下FileTxnLog类的注释,可以帮助我们很好的理解事务日志文件的组成,如下图所示:
事务日志文件主要由三部分组成:文件头(FileHead)、事务内容(Txn组成的list,每一个Txn包含了checksum Txnlen TxnHeader Record 0x42等属性)、填充数字
事务内容的组成,如下图所示:
public class FileTxnLog implements TxnLog {
// 最新的zxid
long lastZxidSeen;
// 事务日志流
volatile BufferedOutputStream logStream = null;
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr == null) {
return false;
}
if (hdr.getZxid()
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?