zookeeper source code analysis

Zookeeper日志文件

本章将对Zookeeper中的日志文件相关内容进行分析,注意这里所指的日志不是log.info输出的内容而是指事务日志和快照日志。

FileTxnSnapLog 分析

本节将对FileTxnSnapLog类进行分析,该对象的核心是用于存储事务日志和快照日志,首先关注FileTxnSnapLog累的构造函数,具体代码如下。

public FileTxnSnapLog(File dataDir, File snapDir) throws IOException {
  LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir);

  this.dataDir = new File(dataDir, version + VERSION);
  this.snapDir = new File(snapDir, version + VERSION);

  boolean enableAutocreate = Boolean.valueOf(
    System.getProperty(ZOOKEEPER_DATADIR_AUTOCREATE,
                       ZOOKEEPER_DATADIR_AUTOCREATE_DEFAULT));

  trustEmptySnapshot = Boolean.getBoolean(ZOOKEEPER_SNAPSHOT_TRUST_EMPTY);
  LOG.info(ZOOKEEPER_SNAPSHOT_TRUST_EMPTY + " : " + trustEmptySnapshot);

  if (!this.dataDir.exists()) {
    if (!enableAutocreate) {
      throw new DatadirException("Missing data directory "
                                 + this.dataDir
                                 + ", automatic data directory creation is disabled ("
                                 + ZOOKEEPER_DATADIR_AUTOCREATE
                                 + " is false). Please create this directory manually.");
    }

    if (!this.dataDir.mkdirs() && !this.dataDir.exists()) {
      throw new DatadirException("Unable to create data directory "
                                 + this.dataDir);
    }
  }
  if (!this.dataDir.canWrite()) {
    throw new DatadirException("Cannot write to data directory " + this.dataDir);
  }

  if (!this.snapDir.exists()) {
    if (!enableAutocreate) {
      throw new DatadirException("Missing snap directory "
                                 + this.snapDir
                                 + ", automatic data directory creation is disabled ("
                                 + ZOOKEEPER_DATADIR_AUTOCREATE
                                 + " is false). Please create this directory manually.");
    }

    if (!this.snapDir.mkdirs() && !this.snapDir.exists()) {
      throw new DatadirException("Unable to create snap directory "
                                 + this.snapDir);
    }
  }
  if (!this.snapDir.canWrite()) {
    throw new DatadirException("Cannot write to snap directory " + this.snapDir);
  }

  if (!this.dataDir.getPath().equals(this.snapDir.getPath())) {
    checkLogDir();
    checkSnapDir();
  }


  txnLog = new FileTxnLog(this.dataDir);
  snapLog = new FileSnap(this.snapDir);
}

在上述代码中首先需要关注构造函数的参数信息。

  1. 参数dataDir表示数据文件夹。
  2. 参数snapDir表示快照文件夹。

在构造函数中需要注意上述两个参数只是作为顶层文件夹存在,Zookeeper不会在这层文件下直接存储,存储文件在version-2目录下。此外在构造函数中会判断是否需要进行自动创建,如果文件不存在同时不允许自动创建将会抛出异常。除了是否可以进行自动创建以外还会判断是否存在以及是否创建成功进行判断,如果文件不存在并且创建文件失败需要抛出异常。最后在文件都存在以后需要判断是否可以进行写入操作,如果不允许写入需要抛出异常。在所有验证通过后会创建成员变量txnLog和snapLog。

经过上述分析对FileTxnSnapLog类中的构造函数已经有所认知,接下来将对restore方法进行分析,该方法的目的是读取快照和事务日志恢复数据库相关内容,具体处理代码如下。

public long restore(DataTree dt, Map<Long, Integer> sessions,
                    PlayBackListener listener) throws IOException {
  // 最后一个zxid
  long deserializeResult = snapLog.deserialize(dt, sessions);
  // 创建事务日志
  FileTxnLog txnLog = new FileTxnLog(dataDir);

  RestoreFinalizer finalizer = () -> {
    long highestZxid = fastForwardFromEdits(dt, sessions, listener);
    return highestZxid;
  };

  // 如果最后一个zxid为-1
  if (-1L == deserializeResult) {
    // 如果事务日志中的最后一个zxid不为-1
    if (txnLog.getLastLoggedZxid() != -1) {
      // 确认是否信任空的快照
      if (!trustEmptySnapshot) {
        // 不信任的情况下抛出异常
        throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
      } else {
        LOG.warn("{}This should only be allowed during upgrading.",
                 EMPTY_SNAPSHOT_WARNING);
        // 执行数据恢复操作
        return finalizer.run();
      }
    }
    // 保存快照
    save(dt, (ConcurrentHashMap<Long, Integer>) sessions);
    return 0;
  }

  // 执行数据恢复操作
  return finalizer.run();
}

在上述代码中核心处理流程如下。

  1. 从快照日志(SnapShot)中获取最后一个zxid。
  2. 创建事务日志对象(FileTxnLog)。
  3. 创建数据恢复接口(RestoreFinalizer)的实现。
  4. 判断最后一个zxid是否为-1,如果是则会进行如下操作。
    1. 在事务日志中最后一个zxid不为-1并且是否信任空的快照为真则进行数据恢复接口的调用,如果不信任空的快照会抛出异常。
    2. 保存快照。
  5. 执行数据恢复接口的调用。

通过对restore方法的分析可以发现核心对数据恢复操作是需要依赖RestoreFinalizer接口的实现,在restore方法中核心处理交给fastForwardFromEdits方法进行,具体处理代码如下。

public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions,
                                 PlayBackListener listener) throws IOException {
  // 读取事务日志
  TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
  // 从数据树中获取最大zxid
  long highestZxid = dt.lastProcessedZxid;
  // 事务头信息
  TxnHeader hdr;
  try {
    while (true) {
      // 获取事务头信息
      hdr = itr.getHeader();
      // 如果头信息为空则返回数据树中的最大zxid
      if (hdr == null) {
        //empty logs
        return dt.lastProcessedZxid;
      }
      // 事务头信息中存在zxid并且小于最大的zxid并且最大zxid不为0
      if (hdr.getZxid() < highestZxid && highestZxid != 0) {
        // 异常日志记录
        LOG.error("{}(highestZxid) > {}(next log) for type {}",
                  highestZxid, hdr.getZxid(), hdr.getType());
      } else {
        // 覆盖最大zxid
        highestZxid = hdr.getZxid();
      }
      try {
        // 处理事务
        processTransaction(hdr, dt, sessions, itr.getTxn());
      } catch (KeeperException.NoNodeException e) {
        throw new IOException("Failed to process transaction type: " +
                              hdr.getType() + " error: " + e.getMessage(), e);
      }
      // 事务处理完成后进行后续处理
      listener.onTxnLoaded(hdr, itr.getTxn());
      if (!itr.next())
        break;
    }
  } finally {
    if (itr != null) {
      itr.close();
    }
  }
  return highestZxid;
}

在上述代码中核心处理流程如下。

  1. 读取事务日志。
  2. 从数据树中获取最大zxid。
  3. 循环处理事务日志,单个的处理流程如下。
    1. 获取事务头信息。
    2. 如果头信息为空则返回数据树中的最大zxid。
    3. 判断事务头信息中存在zxid并且小于最大的zxid并且最大zxid不为0,通过这个判断则会抛出异常,反之则会将事务头信息中的zxid覆盖到最大zxid上。
    4. 通过processTransaction方法处理事务。
    5. 进行事务处理完成后的后续处理。

接下来对一些细节方法进行分析,首先对processTransaction方法进行分析,具体处理代码如下。

public void processTransaction(TxnHeader hdr, DataTree dt,
                               Map<Long, Integer> sessions, Record txn)
  throws KeeperException.NoNodeException {
  // 事务处理结果存储对象
  ProcessTxnResult rc;
  // 根据事务头类型做不同处理
  switch (hdr.getType()) {
      // 创建session
    case OpCode.createSession:
      // 向session容器中设置信息
      sessions.put(hdr.getClientId(),
                   ((CreateSessionTxn) txn).getTimeOut());
      if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                                 "playLog --- create session in log: 0x"
                                 + Long.toHexString(hdr.getClientId())
                                 + " with timeout: "
                                 + ((CreateSessionTxn) txn).getTimeOut());
      }
      // 处理事务
      rc = dt.processTxn(hdr, txn);
      break;
      // 关闭session
    case OpCode.closeSession:
      // 从session容器中移除数据
      sessions.remove(hdr.getClientId());
      if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                                 "playLog --- close session in log: 0x"
                                 + Long.toHexString(hdr.getClientId()));
      }
      // 处理事务
      rc = dt.processTxn(hdr, txn);
      break;
      // 默认处理
    default:
      // 处理事务
      rc = dt.processTxn(hdr, txn);
  }

  if (rc.err != Code.OK.intValue()) {
    LOG.debug(
      "Ignoring processTxn failure hdr: {}, error: {}, path: {}",
      hdr.getType(), rc.err, rc.path);
  }
}

在上述代码中会对不同的事务头类型做出不同的操作,总共分为两类。

  1. 事务头类型为createSession会向session容器中设置数据信息。
  2. 事务头类型为closeSession会从session容器中移除数据信息。

在processTransaction方法中核心的事务处理会交给数据树(DataTree)进行处理,在fastForwardFromEdits方法处理中还需要对onTxnLoaded方法进行分析,具体处理代码如下。

private final PlayBackListener commitProposalPlaybackListener = new PlayBackListener() {
    public void onTxnLoaded(TxnHeader hdr, Record txn) {
        addCommittedProposal(hdr, txn);
    }
};

上述代码核心需要依赖addCommittedProposal方法进行处理,具体代码如下。

public void addCommittedProposal(Request request) {
  // 写锁
  WriteLock wl = logLock.writeLock();
  try {
    // 上锁
    wl.lock();
    // 提交日志总量大于默认最大提交数量
    if (committedLog.size() > commitLogCount) {
      // 移除第一个元素
      committedLog.removeFirst();
      // 最小提交日志的zxid设置为事务日志中的第一个
      minCommittedLog = committedLog.getFirst().packet.getZxid();
    }
    // 如果提交日志为空
    if (committedLog.isEmpty()) {
      // 最小提交日志的zxid和最大提交日志的zxid都设置为请求中的zxid
      minCommittedLog = request.zxid;
      maxCommittedLog = request.zxid;
    }

    // 解析请求中的数据
    byte[] data = SerializeUtils.serializeRequest(request);
    // 构造数据信息
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
    // 构造提案
    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;
    // 向提交日志中加入提案信息
    committedLog.add(p);
    // 将最大提交日志的zxid修改为提案中的zxid
    maxCommittedLog = p.packet.getZxid();
  } finally {
    // 解锁
    wl.unlock();
  }
}

在上述代码中主要处理流程如下。

  1. 判断提交日志总量是否大于默认最大提交数量(500个),如果是则会移除提交日志中的第一个元素,同时最小提交日志的zxid会采用移除后的第一个数据节点的zxid。
  2. 如果提交日志为则会将最小提交日志的zxid和最大提交日志的zxid都设置为请求中的zxid。
  3. 对请求中 的数据进行解析将其构造为数据信息(数据信息存储使用QuorumPacket类)。
  4. 构造提案对象(Proposal),将请求信息和数据信息放入提案中。
  5. 向提交日志中加入提案信息。
  6. 将最大提交日志的zxid修改为提案中的zxid。

在addCommittedProposal方法的处理中核心目标是完成请求到提案的转换,转换需要分为两步。

  1. 提取请求中的数据信息将其转换为QuorumPacket类并组合到提案对象中。
  2. 将提案对象放入到提交日志中。

至此对于fastForwardFromEdits方法的处理细节都分析结束,接下来将回到FileTxnSnapLog#restore方法对其中的save方法进行分析,具体代码如下。

public void save(DataTree dataTree,
                 ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)
  throws IOException {
  // 从数据树中获取最大zxid
  long lastZxid = dataTree.lastProcessedZxid;
  // 创建快照文件
  File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
  LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
           snapshotFile);
  // 序列化快照
  snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile);

}

在上述代码中处理流程如下。

  1. 从数据树中获取最大zxid。
  2. 创建快照文件。
  3. 通过成员变量snapLog进行序列化写出快照内容。

TxnLog 分析

通过对FileTxnSnapLog类的分析可以知道在FileTxnSnapLog类中核心数据有TxnLog类和SnapShot类,本节将对TxnLog类进行相关分析。TxnLog的实际类型是一个接口,具体定义代码如下。

public interface TxnLog {

    void setServerStats(ServerStats serverStats);

    void rollLog() throws IOException;

    boolean append(TxnHeader hdr, Record r) throws IOException;

    TxnIterator read(long zxid) throws IOException;

    long getLastLoggedZxid() throws IOException;

    boolean truncate(long zxid) throws IOException;

    long getDbId() throws IOException;

    void commit() throws IOException;

    long getTxnLogSyncElapsedTime();

    void close() throws IOException;

    interface TxnIterator {

        TxnHeader getHeader();

        Record getTxn();

        boolean next() throws IOException;

        void close() throws IOException;

        long getStorageSize() throws IOException;
    }
}

在TxnLog接口内部存在一个接口先对内部接口进行说明,TxnIterator接口具备五个方法分别是。

  1. 方法getHeader用于获取交易头信息(TxnHeader对象)。
  2. 方法getTxn用于获交易信息(Record对象)。
  3. 方法next用于判断是否存在下一个数据。
  4. 方法close用于关闭资源。
  5. 方法getStorageSize用于获取存储空间大小。

回到TxnLog接口中对其拥有的几个方法进行介绍。

  1. 方法setServerStats用于设置服务状态。
  2. 方法rollLog用于滚动日志。
  3. 方法append用于追加日志,追加内容是交易信息(Record对象)。
  4. 方法read根据zxid读取交易日志,读取的核心对象是TxnIterator。
  5. 方法getLastLoggedZxid用于获取最后一个zxid。
  6. 方法truncate根据zxid截断日志。
  7. 方法getDbId用于获取当前事务日志所在的数据库id。
  8. 方法commit用于提交事务日志。
  9. 方法getTxnLogSyncElapsedTime用于获取同步事务日志所消费的时间,时间单位毫秒。
  10. 方法close用于关闭事务日志。

<details> <summary>日志结构不确定是否需要编写</summary> TxnLog接口在Zookeeper项目中具体实现类是FileTxnLog,对于FileTxnLog类重点需要关注日志格式,FileTxnLog的格式由三部分组成,

  1. FileHeader,结构信息如下
FileHeader: {
    magic 4bytes (ZKLG)
    version 4bytes
    dbid 8bytes
}
  1. TxnList,结构信息如下。
Txn || Txn TxnList

在TxnList结构中重点结构是Txn,结构信息如下。

checksum Txnlen TxnHeader Record 0x42

在Txn结构信息中还涉及三个结构分别是。

  1. 结构Txnlen,具体信息如下。
len 4bytes
  1. 结构TxnHeader,具体信息如下。
TxnHeader: {
    sessionid 8bytes
    cxid 4bytes
    zxid 8bytes
    time 8bytes
    type 4bytes
}
  1. 结构Record,具体信息如下。
Record:
See Jute definition file for details on the various record types
  1. ZeroPad,结构信息如下。
0 padded to EOF (filled during preallocation stage)

</details>

append 方法分析

本节将对对TxnLog接口的实现类FileTxnLog中的append方法进行分析,具体处理代码如下。

public synchronized boolean append(TxnHeader hdr, Record txn)
  throws IOException {

  if (hdr == null) {
    return false;
  }

  if (hdr.getZxid() <= lastZxidSeen) {
    LOG.warn("Current zxid " + hdr.getZxid()
             + " is <= " + lastZxidSeen + " for "
             + hdr.getType());
  } else {
    lastZxidSeen = hdr.getZxid();
  }
  // 日志流为空
  if (logStream == null) {
    if (LOG.isInfoEnabled()) {
      LOG.info("Creating new log file: " + Util.makeLogName(hdr.getZxid()));
    }

    // 创建写出的文件
    logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
    // 创建文件输出流
    fos = new FileOutputStream(logFileWrite);
    // 初始化日志流对象
    logStream = new BufferedOutputStream(fos);
    // 初始化输出档案对象
    oa = BinaryOutputArchive.getArchive(logStream);
    // 构造事务头信息
    FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
    // 序列化头信息
    fhdr.serialize(oa, "fileheader");
    // 写出到文件
    logStream.flush();
    // 计算当前通道的大小并设置到filePadding对象中
    filePadding.setCurrentSize(fos.getChannel().position());
    // 将fos放入到集合中
    streamsToFlush.add(fos);
  }
  // 填充文件
  filePadding.padFile(fos.getChannel());
  // 对事务头信息和事务对象进行序列化
  byte[] buf = Util.marshallTxnEntry(hdr, txn);
  // 序列化结果为空或者长度为0抛出异常
  if (buf == null || buf.length == 0) {
    throw new IOException("Faulty serialization for header " +
                          "and txn");
  }
  // 获取校验和计算接口
  Checksum crc = makeChecksumAlgorithm();
  // 更新校验和
  crc.update(buf, 0, buf.length);
  // 将校验和信息写入到输出档案
  oa.writeLong(crc.getValue(), "txnEntryCRC");
  // 写入到输出档案中
  Util.writeTxnBytes(oa, buf);

  return true;
}

在append方法中核心处理流程如下。

  1. 判断事务头是否为空,如果为空则返回false结束处理。
  2. 判断事务头信息中的zxid是否小于等于最大的zxid,如果是则输出warn日志,反之则会将最大zxid设置数据进行重设,重设内容是事务头信息中的zxid。
  3. 在日志流为空的情况下进行如下操作。
    1. 创建需要写出的文件对象。
    2. 创建文件输出流。
    3. 初始化日志流对象。
    4. 初始化输出档案对象。
    5. 构造事务头信息。
    6. 序列化头信息。
    7. 写出文件。
    8. 计算当前通道的大小并设置到filePadding对象中,通道大小从文件输出流中获取。
    9. 将文件输出流放入到streamsToFlush集合中。
  4. 填充文件。
  5. 对事务头信息和事务对象进行序列化得到byte数组,如果byte数组为空或者byte数组长度为0将抛出异常。
  6. 获取校验和计算接口。
  7. 更新校验和信息,更新内容依靠byte数组。
  8. 将校验和信息写入到输出档案。
  9. 写入到输出档案中。

在append方法中重点需要关注填充文件的padFile方法,具体处理代码如下。

long padFile(FileChannel fileChannel) throws IOException {
  // 计算新的文件大小
  long newFileSize =
    calculateFileSizeWithPadding(fileChannel.position(), currentSize, preAllocSize);
  // 当前文件大小不等于新的文件大小需要填充0
  if (currentSize != newFileSize) {
    fileChannel.write((ByteBuffer) fill.position(0), newFileSize - fill.remaining());
    // 将当前文件大小设置为新的文件大小
    currentSize = newFileSize;
  }
  return currentSize;
}

在上述代码中核心处理流程如下。

  1. 计算新的文件大小。
  2. 如果当前文件大小不等于新的文件大小则会向文件中填充0。
  3. 将当前文件大小设置为新的文件大小。

在padFile方法中核心是calculateFileSizeWithPadding方法,具体处理代码如下。


public static long calculateFileSizeWithPadding(long position,
                                                long fileSize,
                                                long preAllocSize) {
  // 1. 需要填充的数据大小大于0
  // 2. 当前位置+4096后大于等于当前文件大小
  if (preAllocSize > 0 && position + 4096 >= fileSize) {
    // 如果当前位置大于文件大小
    if (position > fileSize) {
      // 文件大小重新计算:当前位置+需要填充数据的大小
      fileSize = position + preAllocSize;
      // 文件大小重新计算: 文件大小-(文件大小取模需要填充的大小)
      fileSize = fileSize - (fileSize % preAllocSize);
    } else {
      // 文件大小重新计算: 文件大小+需要填充的大小
      fileSize = fileSize + preAllocSize;
    }
  }

  return fileSize;
}

在calculateFileSizeWithPadding方法中涉及三个参数,参数含义如下。

  1. 参数position表示当前文件已经写入到的位置。
  2. 参数fileSize表示当前文件大小。
  3. 参数preAllocSize表示需要填充数据的大小。

了解参数含义后来对calculateFileSizeWithPadding方法的处理流程进行说明。首先需要做两个条件判断,在同时满足时会进行文件大小的重新计算,条件如下。

  1. 需要填充的大小大于0。
  2. 当前位置+4096后大于等于当前文件大小。

关于文件大小的计算会判断当前位置是否大于文件大小,如果大于则会进行两步运算得到结果。

  1. 文件大小=前文件已经写入到的位置加需要填充数据的大小。
  2. 文件大小=第(1)步运算结果减去第(1)步运算结果取模需要填充的文件大小。

如果当前位置是否小于等于文件大小文件计算规则:文件大小加需要填充数据的大小。

read 方法分析

本节将对对TxnLog接口的实现类FileTxnLog中的read方法进行分析,具体处理代码如下。

public TxnIterator read(long zxid, boolean fastForward) throws IOException {
    return new FileTxnIterator(logDir, zxid, fastForward);
}

从上述代码中可以发现在read方法中核心是构造FileTxnIterator类,具体构造代码如下。

public FileTxnIterator(File logDir, long zxid, boolean fastForward)
  throws IOException {
  this.logDir = logDir;
  this.zxid = zxid;
  init();

  if (fastForward && hdr != null) {
    while (hdr.getZxid() < zxid) {
      if (!next())
        break;
    }
  }
}

在FileTxnIterator的构造方法中存在三个参数,其含义如下。

  1. 参数logDir表示日志目录。
  2. 参数zxid表示zxid。
  3. 参数fastForward表示是否快速定位到zxid所代表的的txn,

在上述方法中主要是进行复制操作,需要将重点放在init方法,具体处理代码如下。

void init() throws IOException {
  storedFiles = new ArrayList<File>();
  // 读取日志目录下的所有日志文件
  List<File> files =
    Util.sortDataDir(FileTxnLog.getLogFiles(logDir.listFiles(), 0), LOG_FILE_PREFIX,
                     false);
  // 循环日志文件
  for (File f : files) {
    // 获取日志文件名称中的zxid,如果大于等于则加入到storedFiles集合中
    if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) >= zxid) {
      storedFiles.add(f);
    }
    // 如果日志文件名称中的zxid小于zxid则加入到storedFiles集合中,并且结束循环
    else if (Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX) < zxid) {
      storedFiles.add(f);
      break;
    }
  }

  // 将storedFiles集合中的最后一个文件转换为InputArchive类型,并赋值给成员变量ia
  goToNextLog();
  // 移动到下一个FileTxnIterator
  next();
}

在init方法中处理流程如下。

  1. 读取日志目录下的所有日志文件,注意此时读取的日志是排序后的,排序规则是比较zxid小的在前大的在后。
  2. 循环日志文件,执行如下两个操作。
    1. 判断当前日志文件名称中的zxid是否大于等于成员变量zxid,如果是则将其放入到storedFiles集合中。
    2. 判断当前日志文件名称中的zxid是否小于成员变量zxid,如果是则将其放入到storedFiles集合中,并且跳出循环。
  3. 执行goToNextLog方法和next方法。

下面对goToNextLog方法和next方法进行展开分析,首先对goToNextLog方法进行分析,具体处理代码如下。

private boolean goToNextLog() throws IOException {
  if (storedFiles.size() > 0) {
    this.logFile = storedFiles.remove(storedFiles.size() - 1);
    ia = createInputArchive(this.logFile);
    return true;
  }
  return false;
}

在上述代码中核心目标是对成员变量ia进行赋值操作首先判断集合storedFiles中的文件数量是否大于0,如果不是则直接结束处理返回false,如果是则会进行如下操作。

  1. 将storedFiles集合中的最后一个移除并将其设置到成员变量logFile。
  2. 通过createInputArchive方法创建InputArchive,将结果赋值给成员变量ia,在createInputArchive方法中创建的InputArchive实际类型是BinaryInputArchive,此外在createInputArchive方法中还会对文件头进行反序列化,具体内容在inStreamCreated方法中。

最后对next方法进行分析,

public boolean next() throws IOException {
  if (ia == null) {
    return false;
  }
  try {
    // 从ia(档案)中读取crcvalue数据
    long crcValue = ia.readLong("crcvalue");
    // 读取ia中的txn数据
    byte[] bytes = Util.readTxnBytes(ia);
    if (bytes == null || bytes.length == 0) {
      throw new EOFException("Failed to read " + logFile);
    }
    // 生成校验和接口实现类
    Checksum crc = makeChecksumAlgorithm();
    // 更新校验和数据
    crc.update(bytes, 0, bytes.length);
    // 如果发现前后两个校验和数据不同抛出异常
    if (crcValue != crc.getValue()) {
      throw new IOException(CRC_ERROR);
    }
    // 创建事务头
    hdr = new TxnHeader();
    // 将txn反序列化到record
    record = SerializeUtils.deserializeTxn(bytes, hdr);
  } catch (EOFException e) {
    LOG.debug("EOF exception " + e);
    inputStream.close();
    inputStream = null;
    ia = null;
    hdr = null;
    if (!goToNextLog()) {
      return false;
    }
    return next();
  } catch (IOException e) {
    inputStream.close();
    throw e;
  }
  return true;
}

在上述代码中首先对处理正常的情况进行分析,具体流程如下。

  1. 如果成员变量ia为空将结束处理返回false表示没有下一个。
  2. 从成员变量ia中读取crcvalue数据,crcvalue数据表示校验和数据值。
  3. 从成员变量ia中读取txn数据(事务数据),读取结果是byte数组,如果txn数据为null或者txn数据长度为0将抛出异常,具体异常类型为EOFException。
  4. 生成校验和接口实现类并对txn数据进行校验和计算,在校验和计算后需要比较当前校验和结果是否和第(2)步中的校验和相同,如果不相同将抛出异常,具体异常类型为IOException。
  5. 创建事务头对象,将事务头对象和第(3)步中的txn数据进行反序列化,结果放入到成员变量record中。

上述处理流程为正常情况下的处理,如果出现EOFException类型的异常将会进行goToNextLog方法的执行,通过goToNextLog方法的执行来修改成员变量ia,注意只有在goToNextLog方法处理结果为真的情况下才会执行next方法的流程。

getLastLoggedZxid 方法分析

本节将对对TxnLog接口的实现类FileTxnLog中的getLastLoggedZxid方法进行分析,具体处理代码如下。

public long getLastLoggedZxid() {
    // 获取日志目录下的所有日志文件
    File[] files = getLogFiles(logDir.listFiles(), 0);
    // 确定最大日志编号
    // 日志文件数量大于零的情况下通过getZxidFromName方法获取,反之则直接取值-1
    long maxLog = files.length > 0 ?
            Util.getZxidFromName(files[files.length - 1].getName(), LOG_FILE_PREFIX) : -1;

    // 设置zxid,先将最大日志编号设置给zxid
    long zxid = maxLog;
    TxnIterator itr = null;
    try {
        // 将日志目录转换为事务日志
        FileTxnLog txn = new FileTxnLog(logDir);
        // 从事务日志中读取TxnIterator
        itr = txn.read(maxLog);
        // 循环处理TxnIterator中的数据,跳出条件是没有下一个TxnIterator对象
        while (true) {
            if (!itr.next()) {
                break;
            }
            // 获取头信息,将
            TxnHeader hdr = itr.getHeader();
            zxid = hdr.getZxid();
        }
    } catch (IOException e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        close(itr);
    }
    return zxid;
}

在上述代码中核心处理流程如下。

  1. 通过getLogFiles方法获取日志文件下的所有日志文件。
  2. 确定最大日志编号,确定规则:如果日志文件数量大于0则通过Util#getZxidFromName方法获取最大编号,反之则取值-1。
  3. 设置zxid,先将最大日志编号设置给zxid,然后通过遍历日志目录中的事务日志确认最终的zxid。

在getLastLoggedZxid方法中着重要关注getLogFiles方法和Util#getZxidFromName方法,下面先对getLogFiles方法进行分析,具体处理代码如下。

public static File[] getLogFiles(File[] logDirList, long snapshotZxid) {
  // 通过Util.sortDataDir对日志文件排序
  List<File> files = Util.sortDataDir(logDirList, LOG_FILE_PREFIX, true);
  // 日志zxid标记
  long logZxid = 0;

  // 循环日志文件集合
  for (File f : files) {
    // 从文件名称中获取zxid
    long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
    // 如果文件名的zxid大于快照zxid跳过处理
    if (fzxid > snapshotZxid) {
      continue;
    }
    // 如果文件名的zxid大于日志zxid将文件名的zxid赋值给日志zxid
    if (fzxid > logZxid) {
      logZxid = fzxid;
    }
  }
  // 创建结果集
  List<File> v = new ArrayList<File>(5);
  // 循环日志文件集合
  for (File f : files) {
    // 从文件名称中获取zxid
    long fzxid = Util.getZxidFromName(f.getName(), LOG_FILE_PREFIX);
    // 如果文件名称的zxid小于日志zxid不将其加入到结果集中
    if (fzxid < logZxid) {
      continue;
    }
    v.add(f);
  }
  return v.toArray(new File[0]);

}

在上述代码中核心处理流程如下。

  1. 通过Util#sortDataDir对日志文件排序。
  2. 创建日志zxid标记,初始值为0。
  3. 循环日志文件集合,对单个日志文件做如下操作。
    1. 通过Util#getZxidFromName方法从文件名称中获取zxid。
    2. 如果文件名的zxid大于快照zxid则跳过处理。
    3. 如果文件名的zxid大于日志zxid将文件名的zxid赋值给日志zxid。
  4. 创建结果集合。
  5. 循环日志文件集合,如果从文件名称中的zxid大于日志zxid则加入到结果集和中。
  6. 将结果集合转换为数组返回。

在getLogFiles方法的处理过程中第(1)步会对日志文件进行排序,具体处理代码如下。

public static List<File> sortDataDir(File[] files, String prefix, boolean ascending) {
  if (files == null)
    return new ArrayList<File>(0);
  List<File> filelist = Arrays.asList(files);
  Collections.sort(filelist, new DataDirFileComparator(prefix, ascending));
  return filelist;
}

在上述代码中可以发现核心排序需要依靠DataDirFileComparator类进行,具体排序实现如下。

public int compare(File o1, File o2) {
  long z1 = Util.getZxidFromName(o1.getName(), prefix);
  long z2 = Util.getZxidFromName(o2.getName(), prefix);
  int result = z1 < z2 ? -1 : (z1 > z2 ? 1 : 0);
  return ascending ? result : -result;
}

通过阅读上述代码可以发现核心是依靠zxid来完成排序,重点方法是Util#getZxidFromName,该方法在getLogFiles方法中也多次出现,下面对其进行分析,具体是现代码如下。

public static long getZxidFromName(String name, String prefix) {
  // 创建zxid标记
  long zxid = -1;
  // 将文件名按照小数点进行拆分
  String nameParts[] = name.split("\\.");
  // 如果拆分后数组长度为2并且第一个元素与前缀相同
  if (nameParts.length == 2 && nameParts[0].equals(prefix)) {
    try {
      // zxid的取值为拆分后的第二个元素
      zxid = Long.parseLong(nameParts[1], 16);
    } catch (NumberFormatException e) {
    }
  }
  return zxid;
}

在上述代码中定义了从文件名称获取zxid的流程:首先需要将文件名称按照小数点进行拆分,如果拆分后的数组长度为2并且第一个元素和前缀相同zxid的取值将会是拆分后数组的第二个元素。

truncate 方法分析

本节将对对TxnLog接口的实现类FileTxnLog中的truncate方法进行分析,具体处理代码如下。

public boolean truncate(long zxid) throws IOException {
  FileTxnIterator itr = null;
  try {
    // 创建FileTxnIterator对象
    itr = new FileTxnIterator(this.logDir, zxid);
    // 从FileTxnIterator对象中获取输入流
    PositionInputStream input = itr.inputStream;
    // 如果输入流为空抛出异常
    if (input == null) {
      throw new IOException("No log files found to truncate! This could " +
                            "happen if you still have snapshots from an old setup or " +
                            "log files were deleted accidentally or dataLogDir was changed in zoo.cfg.");
    }
    // 从输入流中获取位置索引
    long pos = input.getPosition();
    // 截断日志文件内容
    RandomAccessFile raf = new RandomAccessFile(itr.logFile, "rw");
    raf.setLength(pos);
    raf.close();
    // 如果存在下一个日志文件
    while (itr.goToNextLog()) {
      // 删除失败记录warn级别日志
      if (!itr.logFile.delete()) {
        LOG.warn("Unable to truncate {}", itr.logFile);
      }
    }
  } finally {
    close(itr);
  }
  return true;
}

在上述代码中核心处理流程如下。

  1. 创建FileTxnIterator对象。
  2. 从FileTxnIterator对象中获取输入流,如果输入流为空抛出异常。
  3. 从输入流中获取索引。
  4. 创建RandomAccessFile对象,该对象用于进行截断,从第一个数据节点开始截断,截断到索引位置。举例,原本文件内容为123,截断索引是2,经过截断后文件内容会变成12。
  5. 循环操作,判断是否存在下一个日志文件,如果存在则会进行删除操作,在删除失败后记录warn级别的日志。

在truncate方法中主要使用到的类是RandomAccessFile,依靠RandomAccessFile类提供的setLength方法实现截断日志文件。

commit 方法分析

本节将对对TxnLog接口的实现类FileTxnLog中的commit方法进行分析,具体处理代码如下。

public synchronized void commit() throws IOException {
  // 日志流不为空的情况下进行输出
  if (logStream != null) {
    logStream.flush();
  }
  // 循环文件输出流集合
  for (FileOutputStream log : streamsToFlush) {
    // 输出流
    log.flush();
    // 如果需要强制同步
    if (forceSync) {
      // 获取当前纳秒时间
      long startSyncNS = System.nanoTime();
      // 获取通道
      FileChannel channel = log.getChannel();
      // 强制将通道中的数据写出
      channel.force(false);
      // 计算时间差
      syncElapsedMS = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
      // 如果时间差大于最大允许同步时间
      if (syncElapsedMS > fsyncWarningThresholdMS) {
        // 服务状态不为空
        if (serverStats != null) {
          // 同步超时次数累加1
          serverStats.incrementFsyncThresholdExceedCount();
        }
        LOG.warn("fsync-ing the write ahead log in "
                 + Thread.currentThread().getName()
                 + " took " + syncElapsedMS
                 + "ms which will adversely effect operation latency. "
                 + "File size is " + channel.size() + " bytes. "
                 + "See the ZooKeeper troubleshooting guide");
      }
    }
  }
  // 如果文件输出流集合数量大于1则会将第一个移除并且关闭
  while (streamsToFlush.size() > 1) {
    streamsToFlush.removeFirst().close();
  }
}

在上述代码中主要处理流程如下。

  1. 判断日志流是否为空,如果日志流不为空将进行输出操作。
  2. 循环文件输出流集合(streamsToFlush),对单个文件输出流进行如下操作。
    1. 输出流。
    2. 判断是否需要强制同步,如果需要则进行强制同步处理流程。
  3. 循环关闭输出流中的流。

在上述处理流程中强制同步处理流程如下。

  1. 获取当前纳秒时间。
  2. 获取日志输出流的通道,将通道中的数据强制写出。
  3. 计算当前时间和第(1)步中的时间差。
  4. 判断时间差是否大于最大允许同步时间,如果是则会进行两个操作。
    1. 在成员变量serverStats不为空的状态下进行同步超时次数累加一。
    2. 输出warn级别的日志。

SnapShot 分析

通过对FileTxnSnapLog类的分析可以知道在FileTxnSnapLog类中核心数据有TxnLog类和SnapShot类,本节将对SnapShot类进行相关分析。SnapShot的实际类型是一个接口,具体定义代码如下。

public interface SnapShot {

    long deserialize(DataTree dt, Map<Long, Integer> sessions)
            throws IOException;

    void serialize(DataTree dt, Map<Long, Integer> sessions,
                   File name)
            throws IOException;

    File findMostRecentSnapshot() throws IOException;

    void close() throws IOException;
}

在SnapShot接口中定义了四个方法分别是。

  1. 方法deserialize用于从最后一个有效快照中反序列化数据树并将最后一个zxid返回。
  2. 方法serialize用于将数据树和会话信息进行序列化到文件。
  3. 方法findMostRecentSnapshot用于寻找最新的快照文件。
  4. 方法close用于关闭快照文件。

deserialize 方法分析

本节将对对SnapShot接口的实现类FileSnap中的deserialize方法进行分析,具体处理代码如下。

public long deserialize(DataTree dt, Map<Long, Integer> sessions)
  throws IOException {
  // 查询n个快照文件
  List<File> snapList = findNValidSnapshots(100);
  // 如果快照文件数量为0放回-1
  if (snapList.size() == 0) {
    return -1L;
  }
  File snap = null;
  // 是否寻找成功
  boolean foundValid = false;
  // 循环快照文件集合
  for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
    // 获取当前快照
    snap = snapList.get(i);
    LOG.info("Reading snapshot " + snap);
    // 打开快照输入流和校验流
    try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap));
         CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) {
      // 将校验流转换为输入档案
      InputArchive ia = BinaryInputArchive.getArchive(crcIn);
      // 从输入存档反序列化数据树
      deserialize(dt, sessions, ia);
      // 提取校验流中的校验和
      long checkSum = crcIn.getChecksum().getValue();
      // 从输入档案中获取val数据
      long val = ia.readLong("val");
      // 如果val数据值和校验和不相同将会抛出异常
      if (val != checkSum) {
        throw new IOException("CRC corruption in snapshot :  " + snap);
      }
      // 是否寻找成功设置为真
      foundValid = true;
      // 结束循环
      break;
    } catch (IOException e) {
      LOG.warn("problem reading snap file " + snap, e);
    }
  }
  // 如果是否寻找成功标记为假抛出异常
  if (!foundValid) {
    throw new IOException("Not able to find valid snapshots in " + snapDir);
  }
  // 通过快照文件计算zxid将其赋值给数据树的lastProcessedZxid变量
  dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
  // 返回zxid
  return dt.lastProcessedZxid;
}

在上述代码中核心操作流程如下。

  1. 通过findNValidSnapshots方法寻找100个快照文件,如果寻找结果数量为0则返回-1结束处理。

  2. 循环第(1)步中寻找的快照文件集合对单个快照文件做如下操作。

    1. 将快照文件转换为输入流和校验流,将校验流转换为输入档案。
    2. 通过deserialize方法将会话信息和收入档案反序列化到数据树中。
    3. 从校验流中获取校验和将其与输入档案中的val属性比较,如果不相同则抛出异常,如果校验和检查通过则将是否寻找成功标记设置为真并结束循环。
  3. 判断是否寻找标记是否为假,如果为假则抛出异常。

  4. 通过快照文件计算zxid将其赋值给数据树的lastProcessedZxid变量,并将lastProcessedZxid信息返回。

在上述处理流程中需要重点关注寻找n个快照的方法(findNValidSnapshots)以及反序列化的方法(deserialize),下面先对findNValidSnapshots方法进行分析,具体处理代码如下。

private List<File> findNValidSnapshots(int n) throws IOException {
  // 在快照目录下搜索所有的快照文件并将其根据zxid排序
  List<File> files = Util.sortDataDir(snapDir.listFiles(), SNAPSHOT_FILE_PREFIX, false);
  int count = 0;
  List<File> list = new ArrayList<File>();
  for (File f : files) {
    try {
      // 校验是否是有效快照
      if (Util.isValidSnapshot(f)) {
        // 加入到快照文件集合中
        list.add(f);
        // 计数器累加1
        count++;
        // 如果计数器累加后和需要的文件数量相同则跳出循环
        if (count == n) {
          break;
        }
      }
    } catch (IOException e) {
      LOG.info("invalid snapshot " + f, e);
    }
  }
  return list;
}

在上述方法中核心处理流程如下。

  1. 在快照目录下搜索所有的快照文件并将其根据zxid排序。
  2. 遍历快照文件集合对单个文件的操作流程如下。
    1. 判断快照文件是否是有效快照,如果不是将跳过。
    2. 当快文件是有效快照时将其放入到快照文件集合中。
    3. 计数器累加1。
    4. 如果计数器累加后和需要的文件数量相同则跳出循环。
  3. 返回快照文件集合。

在上述处理流程中核心方法是isValidSnapshot,该方法用于验证快照是否有效,具体处理代码如下。

public static boolean isValidSnapshot(File f) throws IOException {
  // 1. 文件为空
  // 2. 文件名称中的zxid为-1
  if (f == null || Util.getZxidFromName(f.getName(), FileSnap.SNAPSHOT_FILE_PREFIX) == -1)
    return false;
  // 将文件对象转换为RandomAccessFile对象
  try (RandomAccessFile raf = new RandomAccessFile(f, "r")) {
    // 如果RandomAccessFile对象长度小于10返回false
    if (raf.length() < 10) {
      return false;
    }
    raf.seek(raf.length() - 5);
    byte bytes[] = new byte[5];
    int readlen = 0;
    int l;
    while (readlen < 5 &&
           (l = raf.read(bytes, readlen, bytes.length - readlen)) >= 0) {
      readlen += l;
    }
    // 读取长度和内容字节长度不相同返回false
    if (readlen != bytes.length) {
      LOG.info("Invalid snapshot " + f
               + " too short, len = " + readlen);
      return false;
    }
    // 将字节数组转换为ByteBuffer
    ByteBuffer bb = ByteBuffer.wrap(bytes);
    // 读取字节数组中第四位的int数据
    int len = bb.getInt();
    // 读取字节数组中的第五位的数据
    byte b = bb.get();
    if (len != 1 || b != '/') {
      LOG.info("Invalid snapshot " + f + " len = " + len
               + " byte = " + (b & 0xff));
      return false;
    }
  }

  return true;
}

在上述代码中核心处理流程如下。

  1. 判断文件是否为空,计算文件名称的zxid是否是-1,如果满足这两个条件中的任意一个就返回false表示不是快照文件。
  2. 将文件对象转换为RandomAccessFile对象。
  3. 判断RandomAccessFile对象的长度是否小于10个字符,如果小于则返回false结束处理。这里的10个字符表示表示文件内容长度,在Zookeeper中规定快照文件最少有10个字符长度。
  4. 将读取指针放在文件长度减5的位置,操作命令为raf.seek(raf.length() - 5)。
  5. 从RandomAccessFile对象中读取5个字节的内容,如果读取内容长度不是5个字节将返回false结束处理。
  6. 将读取内容(变量bytes)转换为ByteBuffer类型,读取第四位和第五位的数据,第四位用int表示,第五位用byte表示,如果第四位不为1或者第五位不为'/'将返回false结束处理。

在了解findNValidSnapshots方法后再来对findMostRecentSnapshot方法进行分析,该方法用于寻找查找最新的快照文件,具体处理代码如下。

public File findMostRecentSnapshot() throws IOException {
  List<File> files = findNValidSnapshots(1);
  if (files.size() == 0) {
    return null;
  }
  return files.get(0);
}

在findMostRecentSnapshot方法中核心操作是将findNValidSnapshots方法参数传递1即可完成最新快照文件。至此对findNValidSnapshots方法的分析告一段落,接下来对deserialize方法进行分析,具体处理代码如下。

public void deserialize(DataTree dt, Map<Long, Integer> sessions,
                        InputArchive ia) throws IOException {
  // 创建文件头信息
  FileHeader header = new FileHeader();
  // 将输入档案中的信息反序列化到文件头信息中
  header.deserialize(ia, "fileheader");
  // 如果文件头信息中的magic属性不是SNAP_MAGIC变量将抛出异常
  if (header.getMagic() != SNAP_MAGIC) {
    throw new IOException("mismatching magic headers "
                          + header.getMagic() +
                          " !=  " + FileSnap.SNAP_MAGIC);
  }
  // 将输入档案中的信息反序列化到数据树和会话信息中
  SerializeUtils.deserializeSnapshot(dt, ia, sessions);
}

在上述代码中核心处理流程如下。

  1. 创建文件头信息。

  2. 将输入档案中的信息反序列化到文件头信息中。

  3. 如果文件头信息中的magic属性和静态变量SNAP_MAGIC不相同就抛出异常。静态变量定义如下。

    public final static int SNAP_MAGIC = ByteBuffer.wrap("ZKSN".getBytes()).getInt();
    
  4. 将输入档案中的信息反序列化到数据树和会话信息中。

serialize 方法分析

本节将对对SnapShot接口的实现类FileSnap中的serialize方法进行分析,具体处理代码如下。

public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)
  throws IOException {
  // 判断是否关闭
  if (!close) {
    // 打开输出流和校验输出流
    try (OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));
         CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32())) {
      // 获取输出文档
      OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);
      // 创建文件头
      FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
      // 序列化数据
      serialize(dt, sessions, oa, header);
      // 获取校验和
      long val = crcOut.getChecksum().getValue();
      // 写出
      oa.writeLong(val, "val");
      oa.writeString("/", "path");
      sessOS.flush();
    }
  } else {
    throw new IOException("FileSnap has already been closed");
  }
}

在上述代码中主要处理流程如下。

  1. 判断关闭标记是否为真,如果为真则抛出异常。关闭标记通过close方法进行设置,调用后会将关闭标记设置为真,表示已关闭。
  2. 将快照文件转换为输出流和校验输出流,并将校验输出流转换为输出文档。
  3. 创建文件头,
  4. 通过serialize方法进行序列化。
  5. 从校验输出流中获取校验和,将其写入到输出档案中,同时还需要写入"/"和path信息到输出档案中。
  6. 通过输出流写出文件。

在上述处理流程中主要关注serialize方法,具体处理代码如下。

protected void serialize(DataTree dt, Map<Long, Integer> sessions,
                         OutputArchive oa, FileHeader header) throws IOException {
  if (header == null) {
    throw new IllegalStateException(
      "Snapshot's not open for writing: uninitialized header");
  }
  header.serialize(oa, "fileheader");
  SerializeUtils.serializeSnapshot(dt, oa, sessions);
}

在上述代码中首先会判断文件头是否为空,如果为空则抛出异常,然后将头信息序列化到输出档案中,最后会将数据树和会话信息序列化到输出档案中。

总结

本章对Zookeeper日志文件相关内容进行分析,在Zookeeper中主要包含两种日志文件,第一种是事务日志文件,第二种是快照日志文件。这两种日志文件分别由TxnLog接口和SnapShot接口定义,在本章着重对这两种日志文件进行分析。