首页 技术 正文
技术 2022年11月20日
0 收藏 734 点赞 4,936 浏览 12186 个字

一、前言

  前一篇已经分析了序列化,这篇接着分析Zookeeper的持久化过程源码,持久化对于数据的存储至关重要,下面进行详细分析。

二、持久化总体框架

  持久化的类主要在包org.apache.zookeeper.server.persistence下,此次也主要是对其下的类进行分析,其包下总体的类结构如下图所示。

  【Zookeeper】源码分析之持久化(一)之FileTxnLog

  · TxnLog,接口类型,读取事务性日志的接口。

  · FileTxnLog,实现TxnLog接口,添加了访问该事务性日志的API。

  · Snapshot,接口类型,持久层快照接口。

  · FileSnap,实现Snapshot接口,负责存储、序列化、反序列化、访问快照。

  · FileTxnSnapLog,封装了TxnLog和SnapShot。

  · Util,工具类,提供持久化所需的API。

  下面先来分析TxnLog和FileTxnLog的源码。

三、TxnLog源码分析

  TxnLog是接口,规定了对日志的响应操作。  

public interface TxnLog {    /**
* roll the current
* log being appended to
* @throws IOException
*/
// 回滚日志
void rollLog() throws IOException;
/**
* Append a request to the transaction log
* @param hdr the transaction header
* @param r the transaction itself
* returns true iff something appended, otw false
* @throws IOException
*/
// 添加一个请求至事务性日志
boolean append(TxnHeader hdr, Record r) throws IOException; /**
* Start reading the transaction logs
* from a given zxid
* @param zxid
* @return returns an iterator to read the
* next transaction in the logs.
* @throws IOException
*/
// 读取事务性日志
TxnIterator read(long zxid) throws IOException; /**
* the last zxid of the logged transactions.
* @return the last zxid of the logged transactions.
* @throws IOException
*/
// 事务性操作的最新zxid
long getLastLoggedZxid() throws IOException; /**
* truncate the log to get in sync with the
* leader.
* @param zxid the zxid to truncate at.
* @throws IOException
*/
// 清空日志,与Leader保持同步
boolean truncate(long zxid) throws IOException; /**
* the dbid for this transaction log.
* @return the dbid for this transaction log.
* @throws IOException
*/
// 获取数据库的id
long getDbId() throws IOException; /**
* commmit the trasaction and make sure
* they are persisted
* @throws IOException
*/
// 提交事务并进行确认
void commit() throws IOException; /**
* close the transactions logs
*/
// 关闭事务性日志
void close() throws IOException;
/**
* an iterating interface for reading
* transaction logs.
*/
// 读取事务日志的迭代器接口
public interface TxnIterator {
/**
* return the transaction header.
* @return return the transaction header.
*/
// 获取事务头部
TxnHeader getHeader(); /**
* return the transaction record.
* @return return the transaction record.
*/
// 获取事务
Record getTxn(); /**
* go to the next transaction record.
* @throws IOException
*/
// 下个事务
boolean next() throws IOException; /**
* close files and release the
* resources
* @throws IOException
*/
// 关闭文件释放资源
void close() throws IOException;
}
}

  其中,TxnLog除了提供读写事务日志的API外,还提供了一个用于读取日志的迭代器接口TxnIterator。

四、FileTxnLog源码分析

  对于LogFile而言,其格式可分为如下三部分

  LogFile:

    FileHeader TxnList ZeroPad

  FileHeader格式如下  

  FileHeader: {
    magic 4bytes (ZKLG)
    version 4bytes
    dbid 8bytes
  }

  TxnList格式如下

  TxnList:

    Txn || Txn TxnList

  Txn格式如下

  Txn:

    checksum Txnlen TxnHeader Record 0x42

  Txnlen格式如下

  Txnlen:
    len 4bytes

  TxnHeader格式如下

  TxnHeader: {

    sessionid 8bytes
    cxid 4bytes
      zxid 8bytes
    time 8bytes
    type 4bytes
  }

  ZeroPad格式如下

  ZeroPad:
    0 padded to EOF (filled during preallocation stage)

  了解LogFile的格式对于理解源码会有很大的帮助。

  4.1 属性  

public class FileTxnLog implements TxnLog {
private static final Logger LOG; // 预分配大小 64M
static long preAllocSize = 65536 * 1024; // 魔术数字,默认为1514884167
public final static int TXNLOG_MAGIC =
ByteBuffer.wrap("ZKLG".getBytes()).getInt(); // 版本号
public final static int VERSION = 2; /** Maximum time we allow for elapsed fsync before WARNing */
// 进行同步时,发出warn之前所能等待的最长时间
private final static long fsyncWarningThresholdMS; // 静态属性,确定Logger、预分配空间大小和最长时间
static {
LOG = LoggerFactory.getLogger(FileTxnLog.class); String size = System.getProperty("zookeeper.preAllocSize");
if (size != null) {
try {
preAllocSize = Long.parseLong(size) * 1024;
} catch (NumberFormatException e) {
LOG.warn(size + " is not a valid value for preAllocSize");
}
}
fsyncWarningThresholdMS = Long.getLong("fsync.warningthresholdms", 1000);
} // 最大(新)的zxid
long lastZxidSeen;
// 存储数据相关的流
volatile BufferedOutputStream logStream = null;
volatile OutputArchive oa;
volatile FileOutputStream fos = null; // log目录文件
File logDir; // 是否强制同步
private final boolean forceSync = !System.getProperty("zookeeper.forceSync", "yes").equals("no");; // 数据库id
long dbId; // 流列表
private LinkedList<FileOutputStream> streamsToFlush =
new LinkedList<FileOutputStream>(); // 当前大小
long currentSize;
// 写日志文件
File logFileWrite = null;
}

  4.2. 核心函数 

  1. append函数

    public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr != null) { // 事务头部不为空
if (hdr.getZxid() <= lastZxidSeen) { // 事务的zxid小于等于最后的zxid
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
}
if (logStream==null) { // 日志流为空
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: log." +
Long.toHexString(hdr.getZxid()));
} //
logFileWrite = new File(logDir, ("log." +
Long.toHexString(hdr.getZxid())));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
//
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
// 序列化
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
// 刷新到磁盘
logStream.flush(); // 当前通道的大小
currentSize = fos.getChannel().position();
// 添加fos
streamsToFlush.add(fos);
} // 填充文件
padFile(fos); // Serializes transaction header and transaction data into a byte buffer.
// 将事务头和事务数据序列化成Byte Buffer
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) { // 为空,抛出异常
throw new IOException("Faulty serialization for header " +
"and txn");
}
// 生成一个验证算法
Checksum crc = makeChecksumAlgorithm();
// Updates the current checksum with the specified array of bytes
// 使用Byte数组来更新当前的Checksum
crc.update(buf, 0, buf.length);
// 写long类型数据
oa.writeLong(crc.getValue(), "txnEntryCRC");
// Write the serialized transaction record to the output archive.
// 将序列化的事务记录写入OutputArchive
Util.writeTxnBytes(oa, buf); return true;
}
return false;
}

  说明:append函数主要用做向事务日志中添加一个条目,其大体步骤如下

  ① 检查TxnHeader是否为空,若不为空,则进入②,否则,直接返回false

  ② 检查logStream是否为空(初始化为空),若不为空,则进入③,否则,进入⑤

  ③ 初始化写数据相关的流和FileHeader,并序列化FileHeader至指定文件,进入④

  ④ 强制刷新(保证数据存到磁盘),并获取当前写入数据的大小。进入⑤

  ⑤ 填充数据,填充0,进入⑥

  ⑥ 将事务头和事务序列化成ByteBuffer(使用Util.marshallTxnEntry函数),进入⑦

  ⑦ 使用Checksum算法更新步骤⑥的ByteBuffer。进入⑧

  ⑧ 将更新的ByteBuffer写入磁盘文件,返回true

  append间接调用了padLog函数,其源码如下 

    public static long padLogFile(FileOutputStream f,long currentSize,
long preAllocSize) throws IOException{
// 获取位置
long position = f.getChannel().position();
if (position + 4096 >= currentSize) { // 计算后是否大于当前大小
// 重新设置当前大小,剩余部分填充0
currentSize = currentSize + preAllocSize;
fill.position(0);
f.getChannel().write(fill, currentSize-fill.remaining());
}
return currentSize;
}

  说明:其主要作用是当文件大小不满64MB时,向文件填充0以达到64MB大小。

  2. getLogFiles函数  

    public static File[] getLogFiles(File[] logDirList,long snapshotZxid) {
// 按照zxid对文件进行排序
List<File> files = Util.sortDataDir(logDirList, "log", true);
long logZxid = 0;
// Find the log file that starts before or at the same time as the
// zxid of the snapshot
for (File f : files) { // 遍历文件
// 从文件中获取zxid
long fzxid = Util.getZxidFromName(f.getName(), "log");
if (fzxid > snapshotZxid) { // 跳过大于snapshotZxid的文件
continue;
}
// the files
// are sorted with zxid's
if (fzxid > logZxid) { // 找出文件中最大的zxid(同时还需要小于等于snapshotZxid)
logZxid = fzxid;
}
}
// 文件列表
List<File> v=new ArrayList<File>(5);
for (File f : files) { // 再次遍历文件
// 从文件中获取zxid
long fzxid = Util.getZxidFromName(f.getName(), "log");
if (fzxid < logZxid) { // 跳过小于logZxid的文件
continue;
}
// 添加
v.add(f);
}
// 转化成File[] 类型后返回
return v.toArray(new File[0]); }

  说明:该函数的作用是找出刚刚小于或者等于snapshot的所有log文件。其步骤大致如下。

  ① 对所有log文件按照zxid进行升序排序,进入②

  ② 遍历所有log文件并记录刚刚小于或等于给定snapshotZxid的log文件的logZxid,进入③

   ③ 再次遍历log文件,添加zxid大于等于步骤②中的logZxid的所有log文件,进入④

  ④ 转化后返回

  getLogFiles函数调用了sortDataDir,其源码如下: 

public static List<File> sortDataDir(File[] files, String prefix, boolean ascending)
{
if(files==null)
return new ArrayList<File>(0);
// 转化为列表
List<File> filelist = Arrays.asList(files);
// 进行排序,Comparator是关键,根据zxid进行排序
Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
return filelist;
}

  说明:其用于排序log文件,可以选择根据zxid进行升序或降序。

  getLogFiles函数间接调用了getZxidFromName,其源码如下: 

    // 从文件名中解析出zxid
public static long getZxidFromName(String name, String prefix) {
long zxid = -1;
// 对文件名进行分割
String nameParts[] = name.split("\\.");
if (nameParts.length == 2 && nameParts[0].equals(prefix)) { // 前缀相同
try {
// 转化成长整形
zxid = Long.parseLong(nameParts[1], 16);
} catch (NumberFormatException e) {
}
}
return zxid;
}

  说明:getZxidFromName主要用作从文件名中解析zxid,并且需要从指定的前缀开始。

  3. getLastLoggedZxid函数 

    public long getLastLoggedZxid() {
// 获取已排好序的所有的log文件
File[] files = getLogFiles(logDir.listFiles(), 0);
// 获取最大的zxid(最后一个log文件对应的zxid)
long maxLog=files.length>0?
Util.getZxidFromName(files[files.length-1].getName(),"log"):-1; // if a log file is more recent we must scan it to find
// the highest zxid
//
long zxid = maxLog;
// 迭代器
TxnIterator itr = null;
try {
// 新生FileTxnLog
FileTxnLog txn = new FileTxnLog(logDir);
// 开始读取从给定zxid之后的所有事务
itr = txn.read(maxLog);
while (true) { // 遍历
if(!itr.next()) // 是否存在下一项
break;
// 获取事务头
TxnHeader hdr = itr.getHeader();
// 获取zxid
zxid = hdr.getZxid();
}
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
} finally {
// 关闭迭代器
close(itr);
}
return zxid;
}

  说明:该函数主要用于获取记录在log中的最后一个zxid。其步骤大致如下

  ① 获取已排好序的所有log文件,并从最后一个文件中取出zxid作为候选的最大zxid,进入②

  ② 新生成FileTxnLog并读取步骤①中zxid之后的所有事务,进入③

  ③ 遍历所有事务并提取出相应的zxid,最后返回。

  其中getLastLoggedZxid调用了read函数,其源码如下 

public TxnIterator read(long zxid) throws IOException {
// 返回事务文件访问迭代器
return new FileTxnIterator(logDir, zxid);
}

  说明:read函数会生成一个FileTxnIterator,其是TxnLog.TxnIterator的子类,之后在FileTxnIterator构造函数中会调用init函数,其源码如下 

    void init() throws IOException {
// 新生成文件列表
storedFiles = new ArrayList<File>();
// 进行排序
List<File> files = Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), "log", false);
for (File f: files) { // 遍历文件
if (Util.getZxidFromName(f.getName(), "log") >= zxid) { // 添加zxid大于等于指定zxid的文件
storedFiles.add(f);
}
// add the last logfile that is less than the zxid
else if (Util.getZxidFromName(f.getName(), "log") < zxid) { // 只添加一个zxid小于指定zxid的文件,然后退出
storedFiles.add(f);
break;
}
}
// go to the next logfile
// 进入下一个log文件
goToNextLog();
if (!next()) // 不存在下一项,返回
return;
while (hdr.getZxid() < zxid) { // 从事务头中获取zxid小于给定zxid,直到不存在下一项或者大于给定zxid时退出
if (!next())
return;
}
}

  说明:init函数用于进行初始化操作,会根据zxid的不同进行不同的初始化操作,在init函数中会调用goToNextLog函数,其源码如下  

    private boolean goToNextLog() throws IOException {
if (storedFiles.size() > 0) { // 存储的文件列表大于0
// 取最后一个log文件
this.logFile = storedFiles.remove(storedFiles.size()-1);
// 针对该文件,创建InputArchive
ia = createInputArchive(this.logFile);
// 返回true
return true;
}
return false;
}

  说明:goToNextLog表示选取下一个log文件,在init函数中还调用了next函数,其源码如下  

    public boolean next() throws IOException {
if (ia == null) { // 为空,返回false
return false;
}
try {
// 读取长整形crcValue
long crcValue = ia.readLong("crcvalue");
// 通过input archive读取一个事务条目
byte[] bytes = Util.readTxnBytes(ia);
// Since we preallocate, we define EOF to be an
if (bytes == null || bytes.length==0) { // 对bytes进行判断
throw new EOFException("Failed to read " + logFile);
}
// EOF or corrupted record
// validate CRC
// 验证CRC
Checksum crc = makeChecksumAlgorithm();
// 更新
crc.update(bytes, 0, bytes.length);
if (crcValue != crc.getValue()) // 验证不相等,抛出异常
throw new IOException(CRC_ERROR);
if (bytes == null || bytes.length == 0) // bytes为空,返回false
return false;
// 新生成TxnHeader
hdr = new TxnHeader();
// 将Txn反序列化,并且将对应的TxnHeader反序列化至hdr,整个Record反序列化至record
record = SerializeUtils.deserializeTxn(bytes, hdr);
} catch (EOFException e) { // 抛出异常
LOG.debug("EOF excepton " + e);
// 关闭输入流
inputStream.close();
// 赋值为null
inputStream = null;
ia = null;
hdr = null;
// this means that the file has ended
// we should go to the next file
if (!goToNextLog()) { // 没有log文件,则返回false
return false;
}
// if we went to the next log file, we should call next() again
// 继续调用next
return next();
} catch (IOException e) {
inputStream.close();
throw e;
}
// 返回true
return true;
}

  说明:next表示将迭代器移动至下一个事务,方便读取,next函数的步骤如下。

  ① 读取事务的crcValue值,用于后续的验证,进入②

  ② 读取事务,使用CRC32进行更新并与①中的结果进行比对,若不相同,则抛出异常,否则,进入③

  ③ 将事务进行反序列化并保存至相应的属性中(如事务头和事务体),会确定具体的事务操作类型。

  ④ 在读取过程抛出异常时,会首先关闭流,然后再尝试调用next函数(即进入下一个事务进行读取)。

  4. commit函数  

    public synchronized void commit() throws IOException {
if (logStream != null) {
// 强制刷到磁盘
logStream.flush();
}
for (FileOutputStream log : streamsToFlush) { // 遍历流
// 强制刷到磁盘
log.flush();
if (forceSync) { // 是否强制同步
long startSyncNS = System.nanoTime(); log.getChannel().force(false);
// 计算流式的时间
long syncElapsedMS =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
if (syncElapsedMS > fsyncWarningThresholdMS) { // 大于阈值时则会警告
LOG.warn("fsync-ing the write ahead log in "
+ Thread.currentThread().getName()
+ " took " + syncElapsedMS
+ "ms which will adversely effect operation latency. "
+ "See the ZooKeeper troubleshooting guide");
}
}
}
while (streamsToFlush.size() > 1) { // 移除流并关闭
streamsToFlush.removeFirst().close();
}
}

  说明:该函数主要用于提交事务日志至磁盘,其大致步骤如下

  ① 若日志流logStream不为空,则强制刷新至磁盘,进入②

  ② 遍历需要刷新至磁盘的所有流streamsToFlush并进行刷新,进入③

  ③ 判断是否需要强制性同步,如是,则计算每个流的流式时间并在控制台给出警告,进入④

  ④ 移除所有流并关闭。

  5. truncate函数 

    public boolean truncate(long zxid) throws IOException {
FileTxnIterator itr = null;
try {
// 获取迭代器
itr = new FileTxnIterator(this.logDir, zxid);
PositionInputStream input = itr.inputStream;
long pos = input.getPosition();
// now, truncate at the current position
// 从当前位置开始清空
RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
raf.setLength(pos);
raf.close();
while (itr.goToNextLog()) { // 存在下一个log文件
if (!itr.logFile.delete()) { // 删除
LOG.warn("Unable to truncate {}", itr.logFile);
}
}
} finally {
// 关闭迭代器
close(itr);
}
return true;
}

  说明:该函数用于清空大于给定zxid的所有事务日志。

五、总结

  对于持久化中的TxnLog和FileTxnLog的源码分析就已经完成了,其源码还是相对简单,也谢谢各位园友的观看~ 

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:8,992
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,506
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,349
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,134
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,766
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,844