zookeeper source code analysis

Zookeeper 状态同步

本章将对Zookeeper项目中状态同步相关内容进行分析,数据同步发生的时机是在选举结束以后。在Zookeeper中关于数据涉及Leader、Follower和Observer三个类,本章的分析核心也将围绕这三个类进行展开。

Leader

本节将对Leader类进行相关分析,首先从它的构造函数作为切入点,构造函数的使用在QuorumPeer#makeLeader方法中有提及,具体处理代码如下。

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
  return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}

通过上述方法进入到Leader构造函数,具体构造函数代码如下。

Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
  this.self = self;
  this.proposalStats = new BufferStats();
  try {
    if (self.shouldUsePortUnification() || self.isSslQuorum()) {
      boolean allowInsecureConnection = self.shouldUsePortUnification();
      if (self.getQuorumListenOnAllIPs()) {
        ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection,
                                     self.getQuorumAddress().getPort());
      } else {
        ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);
      }
    } else {
      if (self.getQuorumListenOnAllIPs()) {
        ss = new ServerSocket(self.getQuorumAddress().getPort());
      } else {
        ss = new ServerSocket();
      }
    }
    ss.setReuseAddress(true);
    if (!self.getQuorumListenOnAllIPs()) {
      ss.bind(self.getQuorumAddress());
    }
  } catch (BindException e) {
    if (self.getQuorumListenOnAllIPs()) {
      LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
    } else {
      LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
    }
    throw e;
  }
  this.zk = zk;
  this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
    maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
}

在上述代码中核心是完成如下几个成员变量的初始化。

  1. 成员变量self,其核心是参选节点QuorumPeer。
  2. 成员变量proposalStats,它是用于进行数据统计的变量。
  3. 成员变量ss,它代表了socket连接。
  4. 成员变量zk,它表示zk服务,这里特指LeaderZooKeeperServer。
  5. 成员变量learnerSnapshotThrottler,用于限制从Leader往Follower或者Observer发送快照的数量。

接下来对核心方法lead进行分析,具体处理代码如下。

void lead() throws IOException, InterruptedException {
  // 获取当前时间
  self.end_fle = Time.currentElapsedTime();
  // 计算选举消耗的时间:当前时间-选举开始时间
  long electionTimeTaken = self.end_fle - self.start_fle;
  // 设置选举消耗的时间
  self.setElectionTimeTaken(electionTimeTaken);
  LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
           QuorumPeer.FLE_TIME_UNIT);
  // 时间计数器归零
  self.start_fle = 0;
  self.end_fle = 0;

  // 注册JMX
  zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

  try {
    self.tick.set(0);
    // 执行数据加载操作
    zk.loadData();

    // 创建状态信息,主要存储参选周期和最后处理的zxid
    leaderStateSummary =
      new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

    // 创建LearnerCnxAcceptor类用于等待Follower发送同步请求
    cnxAcceptor = new LearnerCnxAcceptor();
    cnxAcceptor.start();

    // 计算最新选举周期
    long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

    // 将任选周期转换为zxid并将其设置到zk中
    zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

    synchronized (this) {
      lastProposed = zk.getZxid();
    }

    // 创建NEWLEADER数据包
    newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                                                null, null);


    if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
      LOG.info("NEWLEADER proposal has Zxid of "
               + Long.toHexString(newLeaderProposal.packet.getZxid()));
    }

    QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
    QuorumVerifier curQV = self.getQuorumVerifier();
    if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
      try {
        LOG.debug(String.format(
          "set lastSeenQuorumVerifier to currentQuorumVerifier (%s)",
          curQV.toString()));
        QuorumVerifier newQV = self.configFromString(curQV.toString());
        newQV.setVersion(zk.getZxid());
        self.setLastSeenQuorumVerifier(newQV, true);
      } catch (Exception e) {
        throw new IOException(e);
      }
    }

    newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier()
        .getVersion()) {
      newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    // 等待 epoch 的ACK数据包
    waitForEpochAck(self.getId(), leaderStateSummary);
    // 设置当前选举周期
    self.setCurrentEpoch(epoch);

    try {
      // 等待NEWLEADER的ACK数据包
      waitForNewLeaderAck(self.getId(), zk.getZxid());
    } catch (InterruptedException e) {
      shutdown("Waiting for a quorum of followers, only synced with sids: [ "
               + newLeaderProposal.ackSetsToString() + " ]");
      HashSet<Long> followerSet = new HashSet<Long>();


      for (LearnerHandler f : getLearners()) {
        if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
          followerSet.add(f.getSid());
        }
      }
      boolean initTicksShouldBeIncreased = true;
      for (Proposal.QuorumVerifierAcksetPair qvAckset : newLeaderProposal.qvAcksetPairs) {
        if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
          initTicksShouldBeIncreased = false;
          break;
        }
      }
      if (initTicksShouldBeIncreased) {
        LOG.warn("Enough followers present. " +
                 "Perhaps the initTicks need to be increased.");
      }
      return;
    }

    // 启动zk服务,注意不是启动zk进程
    startZkServer();

    String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
    if (initialZxid != null) {
      long zxid = Long.parseLong(initialZxid);
      zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
    }

    if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
      self.setZooKeeperServer(zk);
    }

    self.adminServer.setZooKeeperServer(zk);

    boolean tickSkip = true;
    String shutdownMessage = null;

    while (true) {
      synchronized (this) {
        long start = Time.currentElapsedTime();
        long cur = start;
        long end = start + self.tickTime / 2;
        while (cur < end) {
          wait(end - cur);
          cur = Time.currentElapsedTime();
        }

        if (!tickSkip) {
          self.tick.incrementAndGet();
        }

        // 创建SyncedLearnerTracker对象,该对象用于确认是否有大多数同意
        SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
        syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
            .getQuorumVerifier().getVersion()) {
          syncedAckSet.addQuorumVerifier(self
                                         .getLastSeenQuorumVerifier());
        }

        syncedAckSet.addAck(self.getId());

        for (LearnerHandler f : getLearners()) {
          if (f.synced()) {
            syncedAckSet.addAck(f.getSid());
          }
        }

        if (!this.isRunning()) {
          shutdownMessage = "Unexpected internal error";
          break;
        }

        // 没有大多数认可,结束处理
        if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
          shutdownMessage =
            "Not sufficient followers synced, only synced with sids: [ "
            + syncedAckSet.ackSetsToString() + " ]";
          break;
        }
        tickSkip = !tickSkip;
      }
      // 获取LearnerHandler然后发送ping
      for (LearnerHandler f : getLearners()) {
        f.ping();
      }
    }
    if (shutdownMessage != null) {
      shutdown(shutdownMessage);

    }
  } finally {
    zk.unregisterJMX(this);
  }
}

在上述代码中核心处理流程如下。

  1. 获取当前时间,计算选举所消耗的时间并将这个数据赋值到QuorumPeer对象中,赋值完成后需要将时间计数器(开始时间start_fle和结束时间end_fle)清零。
  2. 调用ZooKeeperServer#loadData方法加载数据。在这个过程中会完成ZK数据库的加载以及快照的生产。
  3. 创建StateSummary对象将其赋值到成员变量leaderStateSummary中,在leaderStateSummary变量中记录了当前选举周期和最后处理的zxid。
  4. 创建LearnerCnxAcceptor类,该类是一个线程类,创建目的是为了等待Follower发送状态同步信息。
  5. 通过getEpochToPropose方法计算新的选举周期,注意该数据会赋值到成员变量epoch中。
  6. 将第(5)步中获得的选举中转换为zxid放入到zk对象中。
  7. 创建NEWLEADER类型的数据包,将其赋值到成员变量newLeaderProposal中。
  8. 对比QuorumPeer中的lastSeenQuorumVerifier和quorumVerifier版本号是否相同,如果相同则需要新建QuorumVerifier对象并且修正QuorumPeer中的lastSeenQuorumVerifier数据。
  9. 向提案(newLeaderProposal)中添加quorumVerifier对象,或者在lastSeenQuorumVerifier版本号大于quorumVerifier版本号的时候将lastSeenQuorumVerifier也加入到提案中。
  10. 执行waitForEpochAck方法等待大多数的选举参与者确认接收新的选举周期(变量epoch)。当大多数参选者认可选举周期后需要将选举周期设置到QuorumPeer对象中。
  11. 执行waitForNewLeaderAck方法等待大多数的选举参与者确认收到NEWLEADER数据包。
  12. 调用startZkServer方法将ZookeeperServer类启动。
  13. 执行死循环,在该死循环中主要与Follower进行心跳保持(向Follower发送PING数据包)。

接下来对第(4)步中所使用的LearnerCnxAcceptor类进行分析,该类是一个线程类,因此主要分析run方法,具体代码如下。

@Override
public void run() {
  try {
    while (!stop) {
      Socket s = null;
      boolean error = false;
      try {
        // 接收socket连接
        s = ss.accept();
        s.setSoTimeout(self.tickTime * self.initLimit);
        s.setTcpNoDelay(nodelay);

        BufferedInputStream is = new BufferedInputStream(
          s.getInputStream());
        // 创建LearnerHandler对象并启动
        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
        fh.start();
      } catch (SocketException e) {
        error = true;
        if (stop) {
          LOG.info("exception while shutting down acceptor: "
                   + e);
          stop = true;
        }
        else {
          throw e;
        }
      } catch (SaslException e) {
        LOG.error("Exception while connecting to quorum learner", e);
        error = true;
      } catch (Exception e) {
        error = true;
        throw e;
      } finally {
        // Don't leak sockets on errors
        if (error && s != null && !s.isClosed()) {
          try {
            s.close();
          } catch (IOException e) {
            LOG.warn("Error closing socket", e);
          }
        }
      }
    }
  } catch (Exception e) {
    LOG.warn("Exception while accepting follower", e.getMessage());
    handleException(this.getName(), e);
  }
}

在上述代码中核心处理流程。

  1. 接受socket连接。
  2. 将socket连接中的输入流提取,配合参数创建LearnerHandler类。
  3. 启动LearnerHandler类。

接下来将对getDesignatedLeader方法进行分析,该方法用于确认领导者对应的服务id(myid),具体使用场景有两个。

  1. 在startZkServer方法执行期间如果开启了重载配置则需要通过该方法确认领导者id然后重新加载配置。
  2. 在tryToCommit方法中对于请求类型是reconfig的情况下需要确认领导者。

下面是getDesignatedLeader方法的具体处理代码。

private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
  // 获取最后一个QuorumVerifierAcksetPair对象

  Proposal.QuorumVerifierAcksetPair newQVAcksetPair =
    reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size() - 1);


  // 判断自己是否在newQVAcksetPair中,如果在我自己就是leader
  if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getId()) &&
      newQVAcksetPair.getQuorumVerifier().getVotingMembers()
      .get(self.getId()).addr.equals(self.getQuorumAddress())) {
    return self.getId();
  }
  // 候选人集合
  HashSet<Long> candidates = new HashSet<Long>(newQVAcksetPair.getAckset());
  // 从候选人集合中移除自己
  candidates.remove(self.getId()); 
  // 当前需要处理的候选人
  long curCandidate = candidates.iterator().next();

  long curZxid = zxid + 1;
  Proposal p = outstandingProposals.get(curZxid);

  // 循环处理,处理目标是获取最多投票数量的候选人
  while (p != null && !candidates.isEmpty()) {
    for (Proposal.QuorumVerifierAcksetPair qvAckset : p.qvAcksetPairs) {
      candidates.retainAll(qvAckset.getAckset());
      if (candidates.isEmpty()) {
        return curCandidate;
      }
      curCandidate = candidates.iterator().next();
      if (candidates.size() == 1) {
        return curCandidate;
      }
    }
    curZxid++;
    p = outstandingProposals.get(curZxid);
  }

  return curCandidate;
}

在上述代码中核心处理流程如下。

  1. 从参数提案中获取最后一个QuorumVerifierAcksetPair对象。
  2. 判断自身服务id是否在QuorumVerifierAcksetPair对象中,如果在则说明自己就是leader。
  3. 从QuorumVerifierAcksetPair对象中获取候选人集合(收到ack的服务id)。
  4. 由于在第(2)步中并未成为leader因此需要在候选人集合中移除。
  5. 通过循环确认领导者id,循环中确认领导者的条件有两种。
    1. 提案中的a ck服务id和变量candidates进行交集运算交集为空时可以确认领导者。
    2. 提案中的ack服务id和变量candidates进行交集运算交集数量为1时可以确认领导者。

LearnerHandler 分析

本节将对LearnerHandler类进行分析,对于这个类可以将其理解为学习处理程序或者是数据同步器,该类是一个线程类,因此主要分析run方法,run方法是一个相对内容较多的方法对于它的分析可以按照五步分进行。

第一部分:确认基础数据操作,基础数据包含如下内容。

  1. 变量tickOfNextAckDeadline,该变量表示下一个ack收到的截止时间。
  2. 变量ia,该变量表示输入档案。
  3. 变量bufferedOutput,该变量用于承载需要写出的字节流。
  4. 变量oa,该变量表示输出档案
  5. 变量sid,该变量表示服务id(myid)。
  6. 变量version,该变量表示协议版本。
  7. 变量learnerType,该变量表示学习者类型,类型分为两种,一种是具备投票能力的学习者(PARTICIPANT),另一种是不具备投票能力的学习者(OBSERVER)。

第一部分相关操作代码如下。

// 第一部分确认基础数据
// 向leader对象中添加学习处理程序
leader.addLearnerHandler(this);
// 计算截止时间
tickOfNextAckDeadline = leader.self.tick.get()
  + leader.self.initLimit + leader.self.syncLimit;

// 将bufferedInput对象做出类型转换
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);

// 创建数据包
QuorumPacket qp = new QuorumPacket();
// 从变量ia中将数据包相关信息读取,反序列化到qp变量
ia.readRecord(qp, "packet");
// 如果数据包类型不是FOLLOWERINFO也不是OBSERVERINFO结束处理
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
  LOG.error("First packet " + qp.toString()
            + " is not FOLLOWERINFO or OBSERVERINFO!");
  return;
}

// 读取数据,数据对象是LearnerInfo,数据是在org.apache.zookeeper.server.quorum.Learner.registerWithLeader中创建的
byte learnerInfoData[] = qp.getData();
// 从learnerInfoData中读取sid、version和configVersion
if (learnerInfoData != null) {
  ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
  if (learnerInfoData.length >= 8) {
    this.sid = bbsid.getLong();
  }
  if (learnerInfoData.length >= 12) {
    this.version = bbsid.getInt(); // 协议版本号
  }
  if (learnerInfoData.length >= 20) {
    long configVersion = bbsid.getLong();
    if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
      throw new IOException(
        "Follower is ahead of the leader (has a later activated configuration)");
    }
  }
} else {
  // sid等于Follower类型的节点数量减一
  this.sid = leader.followerCounter.getAndDecrement();
}

// 日志记录
if (leader.self.getView().containsKey(this.sid)) {
  LOG.info("Follower sid: " + this.sid + " : info : "
           + leader.self.getView().get(this.sid).toString());
} else {
  LOG.info("Follower sid: " + this.sid + " not in the current config "
           + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
}

// 如果数据包类型是OBSERVERINFO,修改成员变量learnerType
if (qp.getType() == Leader.OBSERVERINFO) {
  learnerType = LearnerType.OBSERVER;
}

在上述代码需要注意qp.getData代码所得到的数据类型是LearnerInfo,以及这条数据的产生来源是Learner#registerWithLeader方法。

第二部分:不同协议版本号之间的操作,具体处理代码如下。

// 第二部分不同协议版本号之间的操作
// 从数据包中根据zxid获取最后接受的epoch
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

long peerLastZxid;
StateSummary ss = null;
// 从数据包中获取zxid
long zxid = qp.getZxid();
// 构建新的选举周期
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
// 构建新的zxid
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);


if (this.getVersion() < 0x10000) {
  // 从zxid
  long epoch = ZxidUtils.getEpochFromZxid(zxid);
  ss = new StateSummary(epoch, zxid);
  // 等待epoch数据包的ack数量过半,ACK数据包从org.apache.zookeeper.server.quorum.Learner#registerWithLeader方法中来
  leader.waitForEpochAck(this.getSid(), ss);
}
// 主流版本都将执行这个代码
else {
  byte ver[] = new byte[4];
  ByteBuffer.wrap(ver).putInt(0x10000);
  // 构建 LEADERINFO 数据包并将其发送给其他节点
  QuorumPacket newEpochPacket =
    new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
  oa.writeRecord(newEpochPacket, "packet");
  bufferedOutput.flush();
  QuorumPacket ackEpochPacket = new QuorumPacket();
  ia.readRecord(ackEpochPacket, "packet");
  if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
    LOG.error(ackEpochPacket.toString()
              + " is not ACKEPOCH");
    return;
  }
  ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
  ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
  leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();

在上述代码中核心处理流程如下。

  1. 从数据包中获取zxid,然后将这个zxid转换为选举周期,该选举周期称之为最后接受的选举周期。
  2. 从数据包中获取zxid。
  3. 根据当前服务id(myId)和最后接受的选举周期计算新的选举周期。
  4. 将第(3)步中的选举周期转换为zxid。
  5. 根据版做出不同操作,如果版本小于0x10000则需要等待构建StateSummary对象然后开始等待epoch数据包被大多数接收,反之则需要构建LEADERINFO数据包将其发送出去并等待半数ACK

第三部分:是否需要进行全量同步操作,具体操作代码如下。

boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
// 需要全量同步的操作
if (needSnap) {
  boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
  LearnerSnapshot snapshot =
    leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
  try {
    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
    // 发送全量同步数据包
    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
    bufferedOutput.flush();

    LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
             + "send zxid of db as 0x{}, {} concurrent snapshots, "
             + "snapshot was {} from throttle",
             Long.toHexString(peerLastZxid),
             Long.toHexString(leaderLastZxid),
             Long.toHexString(zxidToSend),
             snapshot.getConcurrentSnapshotNumber(),
             snapshot.isEssential() ? "exempt" : "not exempt");
    // Dump data to peer
    // 将zk数据库的数据序列化到oa变量写出
    leader.zk.getZKDatabase().serializeSnapshot(oa);
    oa.writeString("BenWasHere", "signature");
    bufferedOutput.flush();
  } finally {
    snapshot.close();
  }
}

在上述代码首先会通过syncFollower方法判断是否需要进行全量同步,如果不需要则进行后续操作,如果需要则从zk数据库中获取最后处理的zxid,并将这个zxid组装为SNAP类型的数据表放入到变量oa中,同时还需要将zk序列化到oa变量,此时oa就具备整个zk数据库。

在第三部分操作中最关键的方法是syncFollower,它用于判断是否需要进行全量同步,具体处理代码如下。

public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
  boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;

  long currentZxid = peerLastZxid;
  boolean needSnap = true;
  // 是否开启事务同步日志
  boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
  ReentrantReadWriteLock lock = db.getLogLock();
  ReadLock rl = lock.readLock();
  try {
    rl.lock();
    // 最大事务id
    long maxCommittedLog = db.getmaxCommittedLog();
    // 最小事务id
    long minCommittedLog = db.getminCommittedLog();
    // 选举周期 << 32
    long lastProcessedZxid = db.getDataTreeLastProcessedZxid();

    LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
             + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
             + " peerLastZxid=0x{}", getSid(),
             Long.toHexString(maxCommittedLog),
             Long.toHexString(minCommittedLog),
             Long.toHexString(lastProcessedZxid),
             Long.toHexString(peerLastZxid));

    // 数据库提交日志为空
    if (db.getCommittedLog().isEmpty()) {

      minCommittedLog = lastProcessedZxid;
      maxCommittedLog = lastProcessedZxid;
    }


    if (forceSnapSync) {
      LOG.warn("Forcing snapshot sync - should not see this in production");
    }
    // 相等表示Follower没有差异量发送空的DIFF数据
    else if (lastProcessedZxid == peerLastZxid) {
      LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +
               " for peer sid: " + getSid());
      queueOpPacket(Leader.DIFF, peerLastZxid);
      needOpPacket = false;
      // 不需要全量同步
      needSnap = false;
    }
    // Follower 数据比Leader新发送TRUNC
    else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
      LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
                Long.toHexString(maxCommittedLog) +
                " for peer sid:" + getSid());
      queueOpPacket(Leader.TRUNC, maxCommittedLog);
      currentZxid = maxCommittedLog;
      needOpPacket = false;
      needSnap = false;
    }
    // Follower 数据在Leader数据范围内
    else if ((maxCommittedLog >= peerLastZxid)
             && (minCommittedLog <= peerLastZxid)) {
      LOG.info("Using committedLog for peer sid: " + getSid());
      Iterator<Proposal> itr = db.getCommittedLog().iterator();
      currentZxid = queueCommittedProposals(itr, peerLastZxid,
                                            null, maxCommittedLog);
      needSnap = false;
    }
    //
    else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
      // 计算事务日志的大小限制
      long sizeLimit = db.calculateTxnLogSizeLimit();
      // 读取一定量的数据
      Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
        peerLastZxid, sizeLimit);
      //
      if (txnLogItr.hasNext()) {
        LOG.info("Use txnlog and committedLog for peer sid: " + getSid());
        // 将数据放入queuedPackets队列
        currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                              minCommittedLog, maxCommittedLog);

        LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
        Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
        currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                              null, maxCommittedLog);
        needSnap = false;
      }
      // 关闭 txnLogItr 资源
      if (txnLogItr instanceof TxnLogProposalIterator) {
        TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
        txnProposalItr.close();
      }
    } else {
      LOG.warn("Unhandled scenario for peer sid: " + getSid());
    }
    LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
              " for peer sid: " + getSid());
    // 通知领导者同步完成
    leaderLastZxid = leader.startForwarding(this, currentZxid);
  } finally {
    rl.unlock();
  }

  if (needOpPacket && !needSnap) {
    LOG.error("Unhandled scenario for peer sid: " + getSid() +
              " fall back to use snapshot");
    needSnap = true;
  }

  return needSnap;
}

在上述方法中首先需要对参数peerLastZxid进行说明,该参数可以理解为在领导者产生是的zxid,数据来源是StateSummary对象,数据包是ackEpochPacket。方法syncFollower处理流程如下。

  1. 从zk数据库中读取最大事务id,最小事务id以及最后处理的zxid(最后处理的zxid在选举后计算规则是选举周期<<32)。
  2. 如果lastProcessedZxid和peerLastZxid相等则表示没有差异数据,因此不需要进行全量同步。
  3. 如果peerLastZxid大于maxCommittedLog则表示Follower数据比Leader新因此需要截断数据而不是全量同步。
  4. 如果maxCommittedLog大于等于peerLastZxid并且minCommittedLog小于等于peerLastZxid表示Follower数据在Leader数据范围内,此时需要将提案数据从zk数据库中获取然后放入发送队列进行发送。此时也不需要进行全量同步,只需要部分同步即可。
  5. 如果peerLastZxid小于minCommittedLog此时需要从zk数据库中从peerLastZxid开始向后读取sizeLimit数量的数据,如果读取的数据为空则表示需要进行全量同步,反之则不需要。
  6. 通知领导者同步完成。
  7. 返回是否需要全量同步的结果。

第四部分:NEWLEADER数据包和UPTODATE数据包传输操作,具体处理代码如下。

// 创建NEWLEADER数据包,根据不同版本按照不同方式写出
if (getVersion() < 0x10000) {
  QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                                              newLeaderZxid, null, null);
  oa.writeRecord(newLeaderQP, "packet");
}
else {
  QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid,leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
  queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();

// 启动数据包发送线程
startSendingPackets();


// 创建一个空的数据包
qp = new QuorumPacket();
// 从变量ia中将数据包相关信息读取,反序列化到qp变量
ia.readRecord(qp, "packet");
// 数据包类型不是ACK结束处理
if (qp.getType() != Leader.ACK) {
  LOG.error("Next packet was supposed to be an ACK,"
            + " but received packet: {}", packetToString(qp));
  return;
}

if (LOG.isDebugEnabled()) {
  LOG.debug("Received NEWLEADER-ACK message from " + sid);
}
leader.waitForNewLeaderAck(getSid(), qp.getZxid());

syncLimitCheck.start();

sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

synchronized (leader.zk) {
  while (!leader.zk.isRunning() && !this.isInterrupted()) {
    leader.zk.wait(20);
  }
}
LOG.debug("Sending UPTODATE message to " + sid);
// 添加UPTODATE数据包
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

在上述代码中核心操作流程如下。

  1. 根据版本是否小于0x10000,如果是则创建NEWLEADER数据包,将其放入到oa变量中;反之则创建NEWLEADER数据包,将其放入到queuedPackets队列中。两个NEWLEADER数据包的差异是数据信息。
  2. 通过startSendingPackets方法启动发送线程,发送线程的核心是调用sendPackets方法,从queuedPackets队列中获取数据包将其放入变量oa中。
  3. 创建一个空的数据包,从变量ia中将数据包信息读取到这个空的数据包中,如果数据库表类型不是ACK则结束处理。
  4. 通过waitForNewLeaderAck方法等待半数参选人员发送ACK。
  5. 创建UPTODATE数据包放入到queuedPackets队列中等待发送线程发送。

第五部分:根据不同类型的数据包进行不同操作,具体操作代码如下。

while (true) {
  qp = new QuorumPacket();
  ia.readRecord(qp, "packet");

  long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
  if (qp.getType() == Leader.PING) {
    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
  }
  if (LOG.isTraceEnabled()) {
    ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
  }
  tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;


  ByteBuffer bb;
  long sessionId;
  int cxid;
  int type;

  // 不同的数据包类型做出不同的行为操作
  switch (qp.getType()) {
      // 类型是ACK
    case Leader.ACK:
      if (this.learnerType == LearnerType.OBSERVER) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Received ACK from Observer  " + this.sid);
        }
      }
      syncLimitCheck.updateAck(qp.getZxid());
      // 处理ack数据包
      leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
      break;
      // 类型是PING
    case Leader.PING:
      ByteArrayInputStream bis = new ByteArrayInputStream(qp
                                                          .getData());
      DataInputStream dis = new DataInputStream(bis);
      while (dis.available() > 0) {
        long sess = dis.readLong();
        int to = dis.readInt();
        leader.zk.touch(sess, to);
      }
      break;
      // 类型是REVALIDATE
    case Leader.REVALIDATE:
      bis = new ByteArrayInputStream(qp.getData());
      dis = new DataInputStream(bis);
      long id = dis.readLong();
      int to = dis.readInt();
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      DataOutputStream dos = new DataOutputStream(bos);
      dos.writeLong(id);
      boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
      if (valid) {
        try {
          // 设置所有者,标记leader是谁
          leader.zk.setOwner(id, this);
        } catch (SessionExpiredException e) {
          LOG.error("Somehow session " + Long.toHexString(id) +
                    " expired right after being renewed! (impossible)", e);
        }
      }
      if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                                 ZooTrace.SESSION_TRACE_MASK,
                                 "Session 0x" + Long.toHexString(id)
                                 + " is valid: " + valid);
      }
      dos.writeBoolean(valid);
      qp.setData(bos.toByteArray());
      // 加入到queuedPackets集合
      queuedPackets.add(qp);
      break;
      // 类型是REQUEST
    case Leader.REQUEST:
      bb = ByteBuffer.wrap(qp.getData());
      sessionId = bb.getLong();
      cxid = bb.getInt();
      type = bb.getInt();
      bb = bb.slice();
      Request si;
      if (type == OpCode.sync) {
        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb,
                                    qp.getAuthinfo());
      } else {
        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
      }
      si.setOwner(this);
      // 提交请求
      leader.zk.submitLearnerRequest(si);
      break;
    default:
      LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
      break;
  }
}

在上述代码中前置操作是从变量ia中读取数据包,在读取完成数据包后根据不同的数据包类型做出不同操作,具体操作如下。

  1. 如果是ACK类型,需要更新syncLimitCheck中zxid和事件相关数据,并且通过leader对象的processAck方法处理ACK信息。处理的本质是创建一个数据包将数据包发送给socket对应的服务(Follower节点)。
  2. 如果类型是PING,需要将数据包中的数据信息转换为数据输入流对象(DataInputStream),然后将数据输入流对象中sessionid和超时时间交给会话跟踪器(SessionTracker)进行处理。处理本质是更新session对应的超时时间。
  3. 如果类型是REVALIDATE,从数据包中提取sessionid和超时时间通过会话跟踪器确认是否有效,如果有效则将会话id和当前学习处理器(LearnerHandler)绑定,并发送数据包此时数据包中会携带sessionId。
  4. 如果类型是REQUEST,从数据包中获取sessionid、cxid、type和bb数据将其转换为请求对象(Request)然后通过submitLearnerRequest方法提交请求,提交请求后将会进入请求处理责任链。

Follower 分析

本节将对Follower类进行相关分析,首先从它的构造函数作为切入点,构造函数的使用在QuorumPeer#makeFollower方法中有提及,具体处理代码如下。

protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
  return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}

通过上述方法进入到Follower构造函数,具体构造函数代码如下。

Follower(QuorumPeer self, FollowerZooKeeperServer zk) {
  this.self = self;
  this.zk = zk;
  this.fzk = zk;
}

在上述代码中初始化成员变量

  1. 成员变量self,其核心是参选节点QuorumPeer。
  2. 成员变量zk,它表示zk服务,在这里特指FollowerZooKeeperServer。
  3. 成员变量fzk,它表示zk服务,在这里特指FollowerZooKeeperServer。

接下来对核心方法followLeader进行分析,具体处理代码如下。

void followLeader() throws InterruptedException {
  // 获取当前时间
  self.end_fle = Time.currentElapsedTime();
  // 计算选举耗时
  long electionTimeTaken = self.end_fle - self.start_fle;
  self.setElectionTimeTaken(electionTimeTaken);
  LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
           QuorumPeer.FLE_TIME_UNIT);
  self.start_fle = 0;
  self.end_fle = 0;
  fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
  try {
    // 寻找leader服务
    QuorumServer leaderServer = findLeader();
    try {
      // 与leader服务建立连接
      connectToLeader(leaderServer.addr, leaderServer.hostname);
      // 将当前节点注册到leader,得到新选举周期对应的zxid
      long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
      // 如果当前节点发生了状态改变抛出异常
      if (self.isReconfigStateChange()) {
        throw new Exception("learned about role change");
      }
      // 将zxid转换为选举周期
      long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
      // 如果选举周期小于当前节点认可的周期抛出异常
      if (newEpoch < self.getAcceptedEpoch()) {
        LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                  + " is less than our accepted epoch " + ZxidUtils.zxidToString(
                    self.getAcceptedEpoch()));
        throw new IOException("Error: Epoch of leader is lower");
      }
      // 从leader同步
      syncWithLeader(newEpochZxid);
      // 创建数据包
      QuorumPacket qp = new QuorumPacket();
      while (this.isRunning()) {
        // 读取数据包
        readPacket(qp);
        // 处理数据包
        processPacket(qp);
      }
    } catch (Exception e) {
      LOG.warn("Exception when following the leader", e);
      try {
        sock.close();
      } catch (IOException e1) {
        e1.printStackTrace();
      }
      pendingRevalidations.clear();
    }
  } finally {
    zk.unregisterJMX((Learner) this);
  }
}

在上述代码中核心处理流程如下。

  1. 通过findLeader方法寻找leader服务。
  2. 通过connectToLeader方法与leader服务建立连接。
  3. 通过registerWithLeader方法将当前节点注册到leader服务,方法registerWithLeader会返回一个zxid,在这个zxid中会包含最新的选举周期。
  4. 判断zxid中的选举周期是否小于当前节点认可的选举周期,如果小于则需要抛出异常。
  5. 通过syncWithLeader方法与leader进行同步。
  6. 进入死循环阶段,死循环内重复执行如下操作。
    1. 读取数据包。
    2. 处理数据包。

接下来需要对这六个核心步骤中所涉及的方法进行分析,首先从findLeader方法按开始,它用于寻找leader服务,具体处理代码如下。

protected QuorumServer findLeader() {
  QuorumServer leaderServer = null;
  // Find the leader by id
  // 获取选票
  Vote current = self.getCurrentVote();
  // 循环参选节点
  for (QuorumServer s : self.getView().values()) {
    // 如果参选节点的id和选票中的id对应,那么他就是leader
    if (s.id == current.getId()) {
      s.recreateSocketAddresses();
      leaderServer = s;
      break;
    }
  }
  if (leaderServer == null) {
    LOG.warn("Couldn't find the leader with id = "
             + current.getId());
  }
  return leaderServer;
}

在上述代码中核心处理流程如下。

  1. 从前节点获取选票(该选票中包含了leader节点对应的id)。
  2. 循环对比参选节点的id是否和第(1)步中选票包含的leaderid对应,如果是这个节点就是leader节点。

方法connectToLeader中主要涉及的是socket操作不做相关分析,接下来对registerWithLeader方法进行分析,具体处理代码如下。

protected long registerWithLeader(int pktType) throws IOException {
  // 获取最后操作的zxid
  long lastLoggedZxid = self.getLastLoggedZxid();
  // 创建数据包
  QuorumPacket qp = new QuorumPacket();
  // 设置数据包类型
  qp.setType(pktType);
  // 设置zxid
  qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
  // 创建LearnerInfo对象
  LearnerInfo li =
    new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
  ByteArrayOutputStream bsid = new ByteArrayOutputStream();
  BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
  boa.writeRecord(li, "LearnerInfo");
  qp.setData(bsid.toByteArray());

  // 写出数据包,FOLLOWERINFO或者OBSERVERINFO
  writePacket(qp, true);
  // 读取数据包
  readPacket(qp);
  // 从数据包中获取新的选举周期
  final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
  // 如果数据包类型是LEADERINFO
  if (qp.getType() == Leader.LEADERINFO) {
    // 读取leader中的协议版本
    leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
    byte epochBytes[] = new byte[4];
    final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
    // 如果leader中的周期大于当前节点认可的周期
    if (newEpoch > self.getAcceptedEpoch()) {
      wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
      // 设置当前节点认可的周期为leader中的中期
      self.setAcceptedEpoch(newEpoch);
    }
    // 如果相同
    else if (newEpoch == self.getAcceptedEpoch()) {
      wrappedEpochBytes.putInt(-1);
    }
    // 如果大于抛出异常
    else {
      throw new IOException(
        "Leaders epoch, " + newEpoch + " is less than accepted epoch, "
        + self.getAcceptedEpoch());
    }
    // 发送ACKEPOCH数据包
    QuorumPacket ackNewEpoch =
      new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
    // 写出数据包
    writePacket(ackNewEpoch, true);
    // 将leader中的周期计算为zxid
    return ZxidUtils.makeZxid(newEpoch, 0);
  }
  // 其他类型的数据包
  else {
    // leader中的周期大于当前节点认可的周期覆盖当前节点的任选周期
    if (newEpoch > self.getAcceptedEpoch()) {
      self.setAcceptedEpoch(newEpoch);
    }
    // 如果数据包类型不是NEWLEADER抛出异常
    if (qp.getType() != Leader.NEWLEADER) {
      LOG.error("First packet should have been NEWLEADER");
      throw new IOException("First packet should have been NEWLEADER");
    }
    // 返回数据包中的zxid
    return qp.getZxid();
  }
}

在上述代码中核心处理流程如下。

  1. 从当前节点获取最后操作的zxid。
  2. 创建数据包。
  3. 为数据包设置数据包类型,在Follower中设置的数据包类型是FOLLOWERINFO,在Observer中设置的数据包类型是OBSERVERINFO。
  4. 为数据包设置zxid,zxid数据是由当前节点认可的任选周期转换而来。
  5. 创建LearnerInfo对象,将其数据内容放入到数据保证然后通过writePacket方法写出。
  6. 通过readPacket方法读取数据包,此时读取到的数据包类型可能是LeaderInfo,如果是LEADERINFO类型那么该数据包中含有新的任选周期。
  7. 从数据包中读取新的选举周期(这个数据包是第(6)步中得到的数据包)。
  8. 如果数据包类型是LEADERINFO,则进行如下操作。
    1. 如果新的选举周期大于当前节点认可的周期,将当前节点中认可的周期设置为新的选举周期。
    2. 如果新的选举周期等于当前节点认可的周期,不做操作。
    3. 如果新的选举周期小于当前节点认可的周期,抛出异常。
  9. 如果数据包类型不是LEADERINFO,则进行如下操作。
    1. 新的周期大于当前节点认可的周期覆盖当前节点的任选周期。、
    2. 如果数据包类型不是NEWLEADER抛出异常。
    3. 返回数据包中的zxid。

接下来对syncWithLeader方法进行分析,该方法用于从Leader中同步数据,具体处理代码如下。

protected void syncWithLeader(long newLeaderZxid) throws Exception {
  // 创建ACK数据包
  QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
  // 创建数据包
  QuorumPacket qp = new QuorumPacket();
  // 从参数zxid中获取新的选举周期
  long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

  QuorumVerifier newLeaderQV = null;
  // 是否全量同步标志
  boolean snapshotNeeded = true;
  // 读取数据包
  readPacket(qp);
  // 已经提交的数据包,存储的是zxid
  LinkedList<Long> packetsCommitted = new LinkedList<Long>();
  // 未提交的数据包
  LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
  synchronized (zk) {
    // 数据包中表示差异化同步
    if (qp.getType() == Leader.DIFF) {
      LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
      snapshotNeeded = false;
    }
    // 数据包中表示全量同步
    else if (qp.getType() == Leader.SNAP) {
      LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
      // 从输入流将档案反序列化到zk数据库
      zk.getZKDatabase().deserializeSnapshot(leaderIs);
      // 重载配置未启用的情况下需要初始化配置节点
      if (!self.isReconfigEnabled()) {
        LOG.debug(
          "Reset config node content from local config after deserialization of snapshot.");
        zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
      }
      // 读取signature信息
      String signature = leaderIs.readString("signature");
      // 如果signature信息不是BenWasHere抛出异常
      if (!signature.equals("BenWasHere")) {
        LOG.error("Missing signature. Got " + signature);
        throw new IOException("Missing signature");
      }
      // 设置最后处理的zxid
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    }
    // 数据包中表示回滚同步
    else if (qp.getType() == Leader.TRUNC) {
      LOG.warn("Truncating log to get in sync with the leader 0x"
               + Long.toHexString(qp.getZxid()));
      // 截断日志
      boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
      if (!truncated) {
        // not able to truncate the log
        LOG.error("Not able to truncate the log "
                  + Long.toHexString(qp.getZxid()));
        System.exit(13);
      }
      // 设置最后处理的zxid
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

    }
    // 其他情况进行退出
    else {
      LOG.error("Got unexpected packet from leader: {}, exiting ... ",
                LearnerHandler.packetToString(qp));
      System.exit(13);

    }
    // 初始化配置节点
    zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
    // 创建会话跟踪器
    zk.createSessionTracker();

    long lastQueued = 0;
    // 是否是zab1.0之前的标记
    boolean isPreZAB1_0 = true;
    // 是否写入事务日志
    boolean writeToTxnLog = !snapshotNeeded;
    outerLoop:
    // 循环
    while (self.isRunning()) {
      // 读取数据包
      readPacket(qp);
      // 根据数据包做不同操作
      switch (qp.getType()) {

        case Leader.PROPOSAL:
          // 创建飞行数据包
          // 将数据包转换为PacketInFlight对象加入到packetsNotCommitted集合
          PacketInFlight pif = new PacketInFlight();
          pif.hdr = new TxnHeader();
          pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
          if (pif.hdr.getZxid() != lastQueued + 1) {
            LOG.warn("Got zxid 0x"
                     + Long.toHexString(pif.hdr.getZxid())
                     + " expected 0x"
                     + Long.toHexString(lastQueued + 1));
          }
          lastQueued = pif.hdr.getZxid();

          if (pif.hdr.getType() == OpCode.reconfig) {
            SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
            QuorumVerifier qv =
              self.configFromString(new String(setDataTxn.getData()));
            self.setLastSeenQuorumVerifier(qv, true);
          }

          packetsNotCommitted.add(pif);
          break;
        case Leader.COMMIT:
        case Leader.COMMITANDACTIVATE:
          // 从packetsNotCommitted中获取第一个数据
          pif = packetsNotCommitted.peekFirst();
          if (pif.hdr.getZxid() == qp.getZxid()
              && qp.getType() == Leader.COMMITANDACTIVATE) {
            QuorumVerifier qv = self.configFromString(
              new String(((SetDataTxn) pif.rec).getData()));
            //重载配置,如果重载成功抛出异常
            boolean majorChange = self.processReconfig(qv,
                                                       ByteBuffer.wrap(qp.getData()).getLong(),
                                                       qp.getZxid(), true);
            if (majorChange) {
              throw new Exception("changes proposed in reconfig");
            }
          }
          // 如果不需要写入事务日志
          if (!writeToTxnLog) {
            // 飞行数据包中的zxid和数据包中的zxid不相同记录日志,反之则需要进行事务处理,并将飞行数据包从packetsNotCommitted集合中移除
            if (pif.hdr.getZxid() != qp.getZxid()) {
              LOG.warn("Committing " + qp.getZxid() + ", but next proposal is "
                       + pif.hdr.getZxid());
            }
            else {
              zk.processTxn(pif.hdr, pif.rec);
              packetsNotCommitted.remove();
            }
          }
          else {
            // 将数据包中的zxid添加到packetsCommitted集合
            packetsCommitted.add(qp.getZxid());
          }
          break;
        case Leader.INFORM:
        case Leader.INFORMANDACTIVATE:
          // 创建飞行数据包
          PacketInFlight packet = new PacketInFlight();
          // 赋值头信息
          packet.hdr = new TxnHeader();

          // 数据包类型是INFORMANDACTIVATE
          if (qp.getType() == Leader.INFORMANDACTIVATE) {
            ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
            long suggestedLeaderId = buffer.getLong();
            byte[] remainingdata = new byte[buffer.remaining()];
            buffer.get(remainingdata);
            packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr);
            QuorumVerifier qv = self.configFromString(
              new String(((SetDataTxn) packet.rec).getData()));
            boolean majorChange =
              self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
            //重载配置,如果重载成功抛出异常
            if (majorChange) {
              throw new Exception("changes proposed in reconfig");
            }
          }
          else {
            packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
            if (packet.hdr.getZxid() != lastQueued + 1) {
              LOG.warn("Got zxid 0x"
                       + Long.toHexString(packet.hdr.getZxid())
                       + " expected 0x"
                       + Long.toHexString(lastQueued + 1));
            }
            lastQueued = packet.hdr.getZxid();
          }

          if (!writeToTxnLog) {
            zk.processTxn(packet.hdr, packet.rec);
          }
          else {
            packetsNotCommitted.add(packet);
            packetsCommitted.add(qp.getZxid());
          }

          break;
        case Leader.UPTODATE:
          LOG.info("Learner received UPTODATE message");
          if (newLeaderQV != null) {
            //重载配置,如果重载成功抛出异常
            boolean majorChange =
              self.processReconfig(newLeaderQV, null, null, true);
            if (majorChange) {
              throw new Exception("changes proposed in reconfig");
            }
          }
          // 如果是zab1.0之前的协议
          if (isPreZAB1_0) {
            // 创建快照
            zk.takeSnapshot();
            // 设置当前选举周期为新的选举周期
            self.setCurrentEpoch(newEpoch);
          }
          self.setZooKeeperServer(zk);
          self.adminServer.setZooKeeperServer(zk);
          // 回到循环开始
          break outerLoop;
        case Leader.NEWLEADER:
          LOG.info("Learner received NEWLEADER message");
          // 数据包中的数据不为空并且数据长度大于0 重新设置QuorumVerifier对象
          if (qp.getData() != null && qp.getData().length > 1) {
            try {
              QuorumVerifier qv = self.configFromString(new String(qp.getData()));
              self.setLastSeenQuorumVerifier(qv, true);
              newLeaderQV = qv;
            } catch (Exception e) {
              e.printStackTrace();
            }
          }

          // 如果需要全量同步
          if (snapshotNeeded) {
            // 创建快照
            zk.takeSnapshot();
          }

          // 设置当前选举周期为新的选举周期
          self.setCurrentEpoch(newEpoch);
          // 设置是否需要写入事务日志为真
          writeToTxnLog = true; 
          // 设置是否为ZAB1.0之前的版本为假
          isPreZAB1_0 = false;
          // 写出ACK数据包
          writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
          break;
      }
    }
  }
  // 在ACK数据包中设置zxid
  ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
  // 写出ACK数据包
  writePacket(ack, true);
  // 设置超时时间
  sock.setSoTimeout(self.tickTime * self.syncLimit);
  // zk服务启动
  zk.startup();
  self.updateElectionVote(newEpoch);

  if (zk instanceof FollowerZooKeeperServer) {
    FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
    // 未提交的数据包进行处理
    for (PacketInFlight p : packetsNotCommitted) {
      fzk.logRequest(p.hdr, p.rec);
    }
    // 对已提交的数据包集合进行提交操作
    for (Long zxid : packetsCommitted) {
      fzk.commit(zxid);
    }
  }
  else if (zk instanceof ObserverZooKeeperServer) {
    ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
    // 对未提交的数据包进行处理
    for (PacketInFlight p : packetsNotCommitted) {
      // 获取已提交数据包中的第一个zxid
      Long zxid = packetsCommitted.peekFirst();
      // 未处理数据包中的zxid不等于第一个zxid则跳过处理
      if (p.hdr.getZxid() != zxid) {
        LOG.warn("Committing " + Long.toHexString(zxid)
                 + ", but next proposal is "
                 + Long.toHexString(p.hdr.getZxid()));
        continue;
      }
      // 从packetsCommitted中移除第一个数据
      packetsCommitted.remove();
      // 创建请求对象
      Request request = new Request(null, p.hdr.getClientId(),
                                    p.hdr.getCxid(), p.hdr.getType(), null, null);
      request.setTxn(p.rec);
      request.setHdr(p.hdr);
      // 对请求对象进行处理
      ozk.commitRequest(request);
    }
  }
  else {
    throw new UnsupportedOperationException("Unknown server type");
  }
}

在上述代码中核心处理流程如下。

  1. 创建ACK数据包,该数据包需要在一定情况下发送。
  2. 创建数据包,该数据包用于接收数据。
  3. 从参数zxid中获取新的任选周期,记做变量newEpoch。
  4. 创建是否全量同步标志,记做变量snapshotNeeded,默认值为真,表示需要全量同步。
  5. 读取数据包。
  6. 创建已经提交的数据包集合,存储变量是packetsCommitted,其中存放的是zxid。
  7. 创建未提交的数据包,存储变量是packetsNotCommitted,其中存储的是飞行数据包。
  8. 根据第(5)步中读取的数据包类型做出不同操作。
    1. 如果数据包类型是DIFF,则将变量snapshotNeeded设置为false。
    2. 如果数据包类型是SNAP,将会进行如下操作。
      1. 将输入流中的数据反序列化到当前节点的zk数据库中。
      2. 判断当前节点的reconfigEnabled属性是否为假,如果是则需要进行zk数据库中配置节点相关信息的重载。
      3. 从输入流中获取signature数据,如果该数据不是BenWasHere将抛出异常。
      4. 设置zk数据库中最后操作的zxid为数据包中的zxid。
    3. 如果数据包类型是TRUNC,则需要进行如下操作。
      1. 将zk数据库进行截断操作,截断参数是数据包中的zxid。
      2. 如果截断失败则需要退出程序,退出码是13。
      3. 如果截断成功则设置zk数据库中最后操作的zxid为数据包中的zxid。
    4. 其他情况进行退出,退出码是13。
  9. 初始化zk数据库中的配置节点。
  10. 创建会话跟踪器。
  11. 创建是否是ZAB1.0之前的版本标记,默认值为true。
  12. 创建是否写入事务日志标记,初始值为取反是否需要全量同步标记。
  13. 在循环中处理数据包,处理细节是根据不同的数据包类型进行分类处理。
    1. 如果数据包类型是PROPOSAL,需要创建飞行数据包,飞行数据包中的数据从数据包中获取,创建完成后需要将飞行数据包放入到packetsNotCommitted集合中
    2. 如果数据包类型是COMMIT或者COMMITANDACTIVATE,从packetsNotCommitted集合获取第一个飞行数据包,判断这个飞行数据包中的zxid和数据包中的zxid是否相同,如果是并且数据包类型是COMMITANDACTIVATE则需要进重载配置,如果重载成功则抛出异常。判断是否需要写入事务日志,如果不需要,则将飞行数据包中的zxid和数据包中的zxid进行比较,如果不相同记录日志,反之则需要进行事务处理,并将飞行数据包从packetsNotCommitted集合中移除;如果需要则需要将数据包中的zxid放入到packetsCommitted集合中。
    3. 如果数据包类型是INFORM或者INFORMANDACTIVATE,需要创建飞行数据包,确认是否需要写入事务日志,如果不需要则交给zk服务进行事务处理,反之则需要将数据放入到packetsNotCommitted和packetsCommitted中。
    4. 如果数据包类型是UPTODATE,判断是否是ZAB1.0之前的协议,如果是则需要创建快照并且将新的选举周期(newEpoch)设置到当前节点。
    5. 如果数据包类型是NEWLEAD,判断数据包中的数据是否存在,如果存在则需要重新设置当前节点的lastSeenQuorumVerifier属性。判断是否需要进行全量同步,如果需要则需要创建快照。执行完成上述操作后需要将新的任选周期设置到当前节点,设置是否需要写入事务日志标记为需要,设置是否是ZAB1.0之前的版本标记为假,写出ACK数据包。
  14. 设置第(1)步中ACK数据包的zxid,在这个zxid中包含新的选举周期(newEpoch)。
  15. 写出ACK数据包。
  16. zk服务启动。
  17. 更新当前节点的选举周期。
  18. 根据不同的zk类型做出不同操作。
    1. 如果zk类型是FollowerZooKeeperServer,循环packetsNotCommitted集合进行数据处理,数据处理的本质是请求处理,第一个请求处理器是SyncRequestProcessor。循环packetsCommitted集合进行zxid的提交操作,提交操作的本质是请求处理,第一个请求处理器是CommitProcessor。
    2. 如果zk类型是ObserverZooKeeperServer,循环packetsNotCommitted集合,判断单个飞行数据包的zxid和packetsCommitted集合中第一个zxid是否相同,如果不相同则跳过处理,反之则需要将packetsCommitted集合中的第一个数据先进行移除,然后创建请求对象交个ObserverZooKeeperServer进行数据处理,数据处理的本质是请求处理,第一个请求处理器是CommitProcessor。

最后对processPacket方法进行分析,该方法用于进行数据包处理,具体处理代码如下。

protected void processPacket(QuorumPacket qp) throws Exception {
  switch (qp.getType()) {
    case Leader.PING:
      // 发送ping数据包
      ping(qp);
      break;
    case Leader.PROPOSAL:
      // 创建事务头和事务对象
      TxnHeader hdr = new TxnHeader();
      Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
      if (hdr.getZxid() != lastQueued + 1) {
        LOG.warn("Got zxid 0x"
                 + Long.toHexString(hdr.getZxid())
                 + " expected 0x"
                 + Long.toHexString(lastQueued + 1));
      }
      lastQueued = hdr.getZxid();

      if (hdr.getType() == OpCode.reconfig) {
        SetDataTxn setDataTxn = (SetDataTxn) txn;
        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
        self.setLastSeenQuorumVerifier(qv, true);
      }

      // 处理请求
      fzk.logRequest(hdr, txn);
      break;
    case Leader.COMMIT:
      // 提交zxid
      fzk.commit(qp.getZxid());
      break;

    case Leader.COMMITANDACTIVATE:
      // 获取请求
      Request request = fzk.pendingTxns.element();
      SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
      QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
      ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
      long suggestedLeaderId = buffer.getLong();
      // 更新配置节点
      boolean majorChange =
        self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
      // 提交zxid
      fzk.commit(qp.getZxid());
      // 如果更新成功抛出异常
      if (majorChange) {
        throw new Exception("changes proposed in reconfig");
      }
      break;
    case Leader.UPTODATE:
      // 不做操作
      LOG.error("Received an UPTODATE message after Follower started");
      break;
    case Leader.REVALIDATE:
      // 重新验证
      revalidate(qp);
      break;
    case Leader.SYNC:
      // 执行同步
      fzk.sync();
      break;
    default:
      LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
      break;
  }
}

在上述代码中核心处理流程如下。

  1. 如果数据类型是PING,通过ping方法发送数据包。
  2. 如果数据类型是PROPOSAL,从数据包中提取数据转换为事务头和事务对象交给FollowerZooKeeperServer进行请求处理。
  3. 如果数据类型是COMMIT或者COMMITANDACTIVATE,则将数据包中的zxid提取出来交给FollowerZooKeeperServer进行处理。
  4. 如果数据类型是UPTODATE,则不做操作。
  5. 如果数据类型是REVALIDATE,则通过revalidate这个方法进行重新验证。
  6. 如果数据类型是SYNC,则通过sync方法进行同步。

Observer 分析

本节将对Observer类进行相关分析,首先从它的构造函数作为切入点,构造函数的使用在QuorumPeer#makeObserver方法中有提及,具体处理代码如下。

protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
  return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}

通过上述方法进入到Observer构造函数,具体构造函数代码如下。

Observer(QuorumPeer self, ObserverZooKeeperServer observerZooKeeperServer) {
  this.self = self;
  this.zk = observerZooKeeperServer;
}

在上述代码中初始化成员变量

  1. 成员变量self,其核心是参选节点QuorumPeer。
  2. 成员变量zk,它表示zk服务,在这里特指ObserverZooKeeperServer。

接下来对核心方法observeLeader进行分析,具体处理代码如下。

void observeLeader() throws Exception {
  zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);

  try {
    // 寻找leader服务
    QuorumServer leaderServer = findLeader();
    LOG.info("Observing " + leaderServer.addr);
    try {
      // 与leader服务建立连接
      connectToLeader(leaderServer.addr, leaderServer.hostname);
      // 将当前节点注册到leader,得到新选举周期对应的zxid
      long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
      // 如果当前节点发生了状态改变抛出异常
      if (self.isReconfigStateChange()) {
        throw new Exception("learned about role change");
      }
      // 从leader同步
      syncWithLeader(newLeaderZxid);
      // 创建数据包
      QuorumPacket qp = new QuorumPacket();
      while (this.isRunning()) {
        // 读取数据包
        readPacket(qp);
        // 处理数据包
        processPacket(qp);
      }
    } catch (Exception e) {
      LOG.warn("Exception when observing the leader", e);
      try {
        sock.close();
      } catch (IOException e1) {
        e1.printStackTrace();
      }
      pendingRevalidations.clear();
    }
  } finally {
    zk.unregisterJMX(this);
  }
}

在上述代码中可以发现此时的操作与Follower中的followLeader方法操作类似,相关方法在分析followLeader时已经有提及因此做单个方法的分析,方法observeLeader处理流程如下。

  1. 通过findLeader方法寻找leader服务。
  2. 通过connectToLeader方法与leader服务建立连接。
  3. 通过registerWithLeader方法将当前节点注册到leader服务,方法registerWithLeader会返回一个zxid,在这个zxid中会包含最新的选举周期。
  4. 判断zxid中的选举周期是否小于当前节点认可的选举周期,如果小于则需要抛出异常。
  5. 通过syncWithLeader方法与leader进行同步。
  6. 进入死循环阶段,死循环内重复执行如下操作。
    1. 读取数据包。
    2. 处理数据包。

总结

本章对Zookeeper项目中状态同步相关内容进行分析,着重对Leader中的lead方法、Follower中的followLeader以及Observer中的observeLeader方法进行详细分析。通过本章的分析可以发现Follower和Observer的处理操作相似,因此在接下来的总结内容中只以Follower和Leader的方式进行同步流程说明。

Leader执行lead方法创建LearnerCnxAcceptor,在LearnerCnxAcceptor中会等待socket连接中的数据(这个连接用与接收Follower发起状态同步申请),一旦等到数据后将创建LearnerHandler类进行处理,此时的处理会与Follower有相关联动操作具体如图所示。

sequenceDiagram
participant l as leader
participant f as follower

f ->> f: 寻找leader寻找成功后建立连接
f->>l:通过registerWithLeader发送注册信息
note right of f:  此时发送的数据包类是FOLLOWERINFO
l->>l:接收到Follower数据信息后计算新的选举周期
l->> f:将新的选举周期信息发给Follower
note right of f: 此时发送额数据包类型是LEADERINFO
f->>l:接收数据处理完成后发送ack数据包
note right of f: 此时发送额数据包类型是ACKEPOCH


在上图中leader接收到ack数据包后会确认是否需要进行全量数据同步,即调用syncFollower方法与Follower进行数据同步相关操作,Follower与Observer相关的同步逻辑在org.apache.zookeeper.server.quorum.Learner#syncWithLeader方法中。