zk 有 2 种文件,快照和事务日志,快照是某一时刻的全量数据,事务日志中记录了数据的修改事件。

快照的文件名是 snapshot.zxid,zxid 是当前最大的事务 id

// org.apache.zookeeper.server.persistence.FileTxnSnapLog#save
public void save(DataTree dataTree,
ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
throws IOException {
long lastZxid = dataTree.lastProcessedZxid;
// 快照文件名为 snapshot.lastZxid
File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
snapshotFile);
snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile); }

事务日志的文件名是 log.zxid,zxid 是当前文件第一条日志的事务 id

// org.apache.zookeeper.server.persistence.FileTxnLog#append
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr == null) {
return false;
} if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
} if (logStream==null) {
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
}
// 创建一个新的事务日志文件 log.zxid,这里的 zxid 是第一条事务日志的 zxid
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
logStream.flush();
filePadding.setCurrentSize(fos.getChannel().position());
streamsToFlush.add(fos);
}
filePadding.padFile(fos.getChannel());
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf); return true;
}

zk 加载数据:从 snap 文件和 log 文件解析出全量数据

// org.apache.zookeeper.server.persistence.FileTxnSnapLog#restore
public long restore(DataTree dt, Map<Long, Integer> sessions,
PlayBackListener listener) throws IOException {
snapLog.deserialize(dt, sessions);
return fastForwardFromEdits(dt, sessions, listener);
}

逆序选择 100 个 snap 文件,从最新的文件开始解析,如果有一个文件校验正确,则退出

public long deserialize(DataTree dt, Map<Long, Integer> sessions)
throws IOException {
// we run through 100 snapshots (not all of them)
// if we cannot get it running within 100 snapshots
// we should give up
List<File> snapList = findNValidSnapshots(100);
if (snapList.size() == 0) {
return -1L;
}
File snap = null;
boolean foundValid = false;
for (int i = 0; i < snapList.size(); i++) {
snap = snapList.get(i);
InputStream snapIS = null;
CheckedInputStream crcIn = null;
try {
LOG.info("Reading snapshot " + snap);
snapIS = new BufferedInputStream(new FileInputStream(snap));
crcIn = new CheckedInputStream(snapIS, new Adler32());
InputArchive ia = BinaryInputArchive.getArchive(crcIn);
deserialize(dt, sessions, ia);
long checkSum = crcIn.getChecksum().getValue();
long val = ia.readLong("val");
if (val != checkSum) {
throw new IOException("CRC corruption in snapshot : " + snap);
}
foundValid = true;
break;
} catch(IOException e) {
LOG.warn("problem reading snap file " + snap, e);
} finally {
if (snapIS != null)
snapIS.close();
if (crcIn != null)
crcIn.close();
}
}
if (!foundValid) {
throw new IOException("Not able to find valid snapshots in " + snapDir);
}
dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
return dt.lastProcessedZxid;
}

对比已经解析出的最大 zxid,选择对应的 log 文件

// org.apache.zookeeper.server.persistence.FileTxnLog.FileTxnIterator#init
void init() throws IOException {
storedFiles = new ArrayList<File>();
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX, false);
for (File f: files) {
if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
storedFiles.add(f);
}
// add the last logfile that is less than the zxid
else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
storedFiles.add(f);
break;
}
}
goToNextLog();
if (!next())
return;
while (hdr.getZxid() < zxid) {
if (!next())
return;
}
}

最新文章

  1. C++ - unordered_map 源码解析
  2. Visual Studio2012打开时弹出“遇到异常:这可能是由某个扩展导致的”错误的解决办法
  3. Spring Boot Admin 的使用 2
  4. git初始化
  5. 解决NetworkOnMainThreadException
  6. Linux进程间通信IPC学习笔记之同步一(线程、互斥锁和条件变量)
  7. BZOJ3390: [Usaco2004 Dec]Bad Cowtractors牛的报复
  8. [Linux命令]tar命令
  9. jQuery执行进度提示窗口的实现(progressbar)
  10. laravel 报错 mcrypt_decrypt(): Key of size 11 not supported by this algorithm. Only keys of sizes 16, 24 or 32 supported
  11. CoreAnimation 开篇
  12. iOS开发之URLSession
  13. JavaWeb 后端 &lt;九&gt; 之 JDBC加强
  14. Android ListView 设置单选
  15. C语言_了解下结构体指针
  16. 注解式controller开发,action找不到controller???
  17. holer实现外网访问本地网站
  18. ltp执行过程总结
  19. table中怎么设置两行间距
  20. Linux环境编程之IPC进程间通信(五):Posix消息队列1

热门文章

  1. python、第四篇:记录相关操作
  2. Tensorflowlite移植ARM平台iMX6
  3. 基于 Debian 的 Netrunner 19.08 “Indigo” 发布
  4. 第01章 部署虚拟环境安装Linux系统
  5. 10个不为人知的 Python 冷知识
  6. 多线程(二)Object类方法、线程的操作sleep(),join(),interrupt(),yield()
  7. hdu3586 Information Disturbing[二分答案+树形DP]
  8. 加密web.config数据库连接
  9. 乱花渐入迷人眼------从解决jqueryEasyUI上传插件提交ajax请求谈网页调试
  10. 【Python网络爬虫三】 爬取网页新闻