ZooKeeper Source Code Analysis

ZooKeeper State Synchronization

This chapter analyzes the state synchronization process in the ZooKeeper project. Data synchronization occurs after the leader election is completed. In ZooKeeper, data handling involves three main classes: Leader, Follower, and Observer. Our analysis will focus on these three classes.

Leader

We'll begin our analysis of the Leader class by examining its constructor. The constructor is used in the QuorumPeer#makeLeader method, as shown in the following code:

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
  return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}

This method leads us to the Leader constructor. Here's the specific constructor code:

Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
  this.self = self;
  this.proposalStats = new BufferStats();
  try {
    if (self.shouldUsePortUnification() || self.isSslQuorum()) {
      boolean allowInsecureConnection = self.shouldUsePortUnification();
      if (self.getQuorumListenOnAllIPs()) {
        ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection,
                                     self.getQuorumAddress().getPort());
      } else {
        ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);
      }
    } else {
      if (self.getQuorumListenOnAllIPs()) {
        ss = new ServerSocket(self.getQuorumAddress().getPort());
      } else {
        ss = new ServerSocket();
      }
    }
    ss.setReuseAddress(true);
    if (!self.getQuorumListenOnAllIPs()) {
      ss.bind(self.getQuorumAddress());
    }
  } catch (BindException e) {
    if (self.getQuorumListenOnAllIPs()) {
      LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
    } else {
      LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
    }
    throw e;
  }
  this.zk = zk;
  this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
    maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
}

The core functionality of this code is to initialize several key member variables:

  1. The 'self' member variable, which represents the QuorumPeer node participating in the election.
  2. The 'proposalStats' member variable, used for data statistics.
  3. The 'ss' member variable, the server socket on which the Leader listens for connections from learners.
  4. The 'zk' member variable, representing the ZooKeeper service, specifically the LeaderZooKeeperServer in this context.
  5. The 'learnerSnapshotThrottler' member variable, used to limit the number of snapshots sent concurrently from the Leader to Followers or Observers.

Next, we'll analyze the core 'lead' method. The code is as follows:

void lead() throws IOException, InterruptedException {
  // Get current time
  self.end_fle = Time.currentElapsedTime();
  // Calculate time taken for election: current time - election start time
  long electionTimeTaken = self.end_fle - self.start_fle;
  // Set the time taken for election
  self.setElectionTimeTaken(electionTimeTaken);
  LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
           QuorumPeer.FLE_TIME_UNIT);
  // Reset time counters
  self.start_fle = 0;
  self.end_fle = 0;

  // Register JMX
  zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

  try {
    self.tick.set(0);
    // Execute data loading operation
    zk.loadData();

    // Create state information, mainly storing election epoch and last processed zxid
    leaderStateSummary =
      new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

    // Create LearnerCnxAcceptor class to wait for Follower sync requests
    cnxAcceptor = new LearnerCnxAcceptor();
    cnxAcceptor.start();

    // Calculate the latest election epoch
    long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

    // Convert election epoch to zxid and set it in zk
    zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

    synchronized (this) {
      lastProposed = zk.getZxid();
    }

    // Create NEWLEADER packet
    newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                                                null, null);


    if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
      LOG.info("NEWLEADER proposal has Zxid of "
               + Long.toHexString(newLeaderProposal.packet.getZxid()));
    }

    QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
    QuorumVerifier curQV = self.getQuorumVerifier();
    if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
      try {
        LOG.debug(String.format(
          "set lastSeenQuorumVerifier to currentQuorumVerifier (%s)",
          curQV.toString()));
        QuorumVerifier newQV = self.configFromString(curQV.toString());
        newQV.setVersion(zk.getZxid());
        self.setLastSeenQuorumVerifier(newQV, true);
      } catch (Exception e) {
        throw new IOException(e);
      }
    }

    newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier()
        .getVersion()) {
      newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    // Wait for epoch ACK packets
    waitForEpochAck(self.getId(), leaderStateSummary);
    // Set current election epoch
    self.setCurrentEpoch(epoch);

    try {
      // Wait for NEWLEADER ACK packets
      waitForNewLeaderAck(self.getId(), zk.getZxid());
    } catch (InterruptedException e) {
      shutdown("Waiting for a quorum of followers, only synced with sids: [ "
               + newLeaderProposal.ackSetsToString() + " ]");
      HashSet<Long> followerSet = new HashSet<Long>();


      for (LearnerHandler f : getLearners()) {
        if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
          followerSet.add(f.getSid());
        }
      }
      boolean initTicksShouldBeIncreased = true;
      for (Proposal.QuorumVerifierAcksetPair qvAckset : newLeaderProposal.qvAcksetPairs) {
        if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
          initTicksShouldBeIncreased = false;
          break;
        }
      }
      if (initTicksShouldBeIncreased) {
        LOG.warn("Enough followers present. " +
                 "Perhaps the initTicks need to be increased.");
      }
      return;
    }

    // Start ZooKeeper service (not the ZK process)
    startZkServer();

    String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
    if (initialZxid != null) {
      long zxid = Long.parseLong(initialZxid);
      zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
    }

    if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
      self.setZooKeeperServer(zk);
    }

    self.adminServer.setZooKeeperServer(zk);

    boolean tickSkip = true;
    String shutdownMessage = null;

    while (true) {
      synchronized (this) {
        long start = Time.currentElapsedTime();
        long cur = start;
        long end = start + self.tickTime / 2;
        while (cur < end) {
          wait(end - cur);
          cur = Time.currentElapsedTime();
        }

        if (!tickSkip) {
          self.tick.incrementAndGet();
        }

        // Create SyncedLearnerTracker object to confirm majority agreement
        SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
        syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
            .getQuorumVerifier().getVersion()) {
          syncedAckSet.addQuorumVerifier(self
                                         .getLastSeenQuorumVerifier());
        }

        syncedAckSet.addAck(self.getId());

        for (LearnerHandler f : getLearners()) {
          if (f.synced()) {
            syncedAckSet.addAck(f.getSid());
          }
        }

        if (!this.isRunning()) {
          shutdownMessage = "Unexpected internal error";
          break;
        }

        // End processing if no majority approval
        if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
          shutdownMessage =
            "Not sufficient followers synced, only synced with sids: [ "
            + syncedAckSet.ackSetsToString() + " ]";
          break;
        }
        tickSkip = !tickSkip;
      }
      // Get LearnerHandler and send ping
      for (LearnerHandler f : getLearners()) {
        f.ping();
      }
    }
    if (shutdownMessage != null) {
      shutdown(shutdownMessage);

    }
  } finally {
    zk.unregisterJMX(this);
  }
}

The core processing flow in the above code is as follows:

  1. Get the current time, calculate the time consumed by the election, and assign this data to the QuorumPeer object. After assignment, the time counters (start time start_fle and end time end_fle) need to be reset to zero.
  2. Call the ZooKeeperServer#loadData method to load data. During this process, the ZK database will be loaded and snapshots will be generated.
  3. Create a StateSummary object and assign it to the member variable leaderStateSummary, which records the current election epoch and the last processed zxid.
  4. Create and start a LearnerCnxAcceptor, a thread that accepts connections from learners (Followers and Observers) that want to synchronize state.
  5. Calculate the new election epoch using the getEpochToPropose method. In the lead method the result is stored in the local variable epoch.
  6. Convert the election epoch obtained in step (5) to zxid and put it into the zk object.
  7. Create a NEWLEADER type packet and assign it to the member variable newLeaderProposal.
  8. Compare the version numbers of lastSeenQuorumVerifier and quorumVerifier in QuorumPeer. If both are 0, a new QuorumVerifier object is built from the current one, its version is set to the new zxid, and it is stored as lastSeenQuorumVerifier in QuorumPeer.
  9. Add the quorumVerifier object to the proposal (newLeaderProposal). If the lastSeenQuorumVerifier version number is greater than the quorumVerifier version number, add lastSeenQuorumVerifier to the proposal as well.
  10. Execute the waitForEpochAck method to wait for the majority of election participants to confirm receipt of the new election epoch (variable epoch). After the majority of candidates approve the election epoch, it needs to be set in the QuorumPeer object.
  11. Execute the waitForNewLeaderAck method to wait for the majority of election participants to confirm receipt of the NEWLEADER packet.
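  12. Call the startZkServer method to start the ZooKeeperServer (here, the LeaderZooKeeperServer).
  13. Execute an infinite loop that wakes up every half tick. On every second pass it checks that a quorum of learners is still synced and shuts the leader down if not. On every pass it sends PING packets to the learners to maintain heartbeats.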
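
Steps (5) to (7) rely on how a zxid packs an epoch and a counter into one 64-bit value. The following is a minimal sketch of that layout, assuming the epoch occupies the high 32 bits and the counter the low 32 bits, which is what the mask 0xffffffffL in the lead method suggests. The class ZxidSketch and its method names are hypothetical stand-ins, not the project's ZxidUtils.

```java
// Illustrative sketch only: ZxidSketch is a hypothetical stand-in for ZxidUtils.
final class ZxidSketch {
    // Pack an epoch (high 32 bits) and a counter (low 32 bits) into one zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    // Extract the epoch from the high 32 bits.
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    // Extract the counter from the low 32 bits.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        // A new leader starts its epoch with counter 0, as in makeZxid(epoch, 0).
        long zxid = makeZxid(5, 0);
        System.out.println(Long.toHexString(zxid)); // prints 500000000
        // Same test the lead method applies to the NEWLEADER zxid.
        System.out.println((zxid & 0xffffffffL) != 0); // prints false
    }
}
```

With this layout, comparing two zxids as plain longs orders them first by epoch and then by counter, which is why the synchronization code can compare zxids directly.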

Next, we'll analyze the LearnerCnxAcceptor class used in step (4). This is a thread class, so we'll mainly analyze the run method. The specific code is as follows:

@Override
public void run() {
  try {
    while (!stop) {
      Socket s = null;
      boolean error = false;
      try {
        // Accept socket connection
        s = ss.accept();
        s.setSoTimeout(self.tickTime * self.initLimit);
        s.setTcpNoDelay(nodelay);

        BufferedInputStream is = new BufferedInputStream(
          s.getInputStream());
        // Create and start LearnerHandler object
        LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
        fh.start();
      } catch (SocketException e) {
        error = true;
        if (stop) {
          LOG.info("exception while shutting down acceptor: "
                   + e);
          stop = true;
        }
        else {
          throw e;
        }
      } catch (SaslException e) {
        LOG.error("Exception while connecting to quorum learner", e);
        error = true;
      } catch (Exception e) {
        error = true;
        throw e;
      } finally {
        // Don't leak sockets on errors
        if (error && s != null && !s.isClosed()) {
          try {
            s.close();
          } catch (IOException e) {
            LOG.warn("Error closing socket", e);
          }
        }
      }
    }
  } catch (Exception e) {
    LOG.warn("Exception while accepting follower", e.getMessage());
    handleException(this.getName(), e);
  }
}

The core processing flow in the above code is as follows:

  1. Accept socket connection.
  2. Extract the input stream from the socket connection and create a LearnerHandler class with the parameters.
  3. Start the LearnerHandler class.
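
The accept-then-hand-off pattern in these three steps can be sketched in isolation. This is a simplified illustration, not the LearnerCnxAcceptor implementation: the class AcceptorSketch, the handle method, and the single-byte "protocol" are assumptions made for the example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: all names here are hypothetical.
final class AcceptorSketch {
    static final AtomicInteger lastByteSeen = new AtomicInteger(-1);

    // Accept one connection and hand it to a dedicated handler thread.
    static Thread acceptOne(ServerSocket ss, int soTimeoutMs) throws IOException {
        Socket s = ss.accept();
        boolean started = false;
        try {
            s.setSoTimeout(soTimeoutMs);
            s.setTcpNoDelay(true);
            Thread handler = new Thread(() -> handle(s), "handler-sketch");
            handler.start();
            started = true;
            return handler;
        } finally {
            // Don't leak the socket if setup failed before the hand-off.
            if (!started) {
                s.close();
            }
        }
    }

    // Stand-in for the handler's run method: read one byte, then close.
    static void handle(Socket s) {
        try (Socket sock = s; InputStream in = sock.getInputStream()) {
            lastByteSeen.set(in.read());
        } catch (IOException e) {
            lastByteSeen.set(-2);
        }
    }

    // Connects a client over loopback and returns the byte the handler read.
    static int demo() {
        try (ServerSocket ss = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
             Socket client = new Socket(ss.getInetAddress(), ss.getLocalPort())) {
            client.getOutputStream().write(42);
            client.getOutputStream().flush();
            Thread handler = acceptOne(ss, 5000);
            handler.join();
            return lastByteSeen.get();
        } catch (IOException | InterruptedException e) {
            return -3;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

As in the original run method, the acceptor thread only accepts and configures the socket. All protocol work happens on the handler thread, so one slow learner cannot block new connections.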

Next, we'll analyze the getDesignatedLeader method, which is used to confirm the service ID (myid) corresponding to the leader. There are two specific use cases:

  1. During the execution of the startZkServer method, if configuration reloading is enabled, this method is needed to confirm the leader ID and then reload the configuration.
  2. In the tryToCommit method, for requests of type reconfig, the leader needs to be confirmed.

Below is the specific processing code for the getDesignatedLeader method:

private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
  // Get the last QuorumVerifierAcksetPair object

  Proposal.QuorumVerifierAcksetPair newQVAcksetPair =
    reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size() - 1);


  // Check if I am in newQVAcksetPair, if so, I am the leader
  if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getId()) &&
      newQVAcksetPair.getQuorumVerifier().getVotingMembers()
      .get(self.getId()).addr.equals(self.getQuorumAddress())) {
    return self.getId();
  }
  // Candidate set
  HashSet<Long> candidates = new HashSet<Long>(newQVAcksetPair.getAckset());
  // Remove myself from the candidate set
  candidates.remove(self.getId()); 
  // Current candidate to be processed
  long curCandidate = candidates.iterator().next();

  long curZxid = zxid + 1;
  Proposal p = outstandingProposals.get(curZxid);

  // Walk the later outstanding proposals, keeping only candidates that acked each of them
  while (p != null && !candidates.isEmpty()) {
    for (Proposal.QuorumVerifierAcksetPair qvAckset : p.qvAcksetPairs) {
      candidates.retainAll(qvAckset.getAckset());
      if (candidates.isEmpty()) {
        return curCandidate;
      }
      curCandidate = candidates.iterator().next();
      if (candidates.size() == 1) {
        return curCandidate;
      }
    }
    curZxid++;
    p = outstandingProposals.get(curZxid);
  }

  return curCandidate;
}

The core processing flow in the above code is as follows:

  1. Get the last QuorumVerifierAcksetPair object from the parameter proposal.
  2. Check whether this server is a voting member of that QuorumVerifier with the same quorum address. If so, this server stays the leader and its own ID is returned.
  3. Get the candidate set (IDs of the servers that acknowledged the proposal) from the QuorumVerifierAcksetPair object.
  4. Since this server did not qualify as the leader in step (2), it needs to be removed from the candidate set.
  5. Confirm the leader ID through a loop over the outstanding proposals that follow the given zxid. The ack set of each proposal is intersected with the candidates variable. If the proposals run out, the current candidate is returned. Inside the loop there are two conditions for confirming the leader:
    1. When the intersection is empty, the candidate chosen in the previous round is returned.
    2. When the intersection has a size of 1, the single remaining candidate is returned.
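
The narrowing loop can be tried out on plain sets. The sketch below mirrors the control flow of getDesignatedLeader under simplifying assumptions: proposals are reduced to their ack sets, and the class DesignatedLeaderSketch and the method pick are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: proposals are modeled as bare ack sets.
final class DesignatedLeaderSketch {
    // firstAcks: sids that acked the reconfig proposal.
    // laterAcks: ack sets of the outstanding proposals that follow it, in zxid order.
    static long pick(long selfId, Set<Long> firstAcks, List<Set<Long>> laterAcks) {
        Set<Long> candidates = new HashSet<>(firstAcks);
        candidates.remove(selfId);
        // Like the original, this assumes at least one other server acked.
        long cur = candidates.iterator().next();
        for (Set<Long> acks : laterAcks) {
            candidates.retainAll(acks);
            if (candidates.isEmpty()) {
                return cur; // nobody acked this proposal: keep the previous choice
            }
            cur = candidates.iterator().next();
            if (candidates.size() == 1) {
                return cur; // a single most up-to-date candidate remains
            }
        }
        return cur;
    }

    public static void main(String[] args) {
        Set<Long> first = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        List<Set<Long>> later = Arrays.asList(
                new HashSet<>(Arrays.asList(2L, 3L)),
                new HashSet<>(Arrays.asList(3L)));
        System.out.println(pick(1L, first, later)); // prints 3
    }
}
```

In the example, server 3 is the only one that acknowledged every later proposal, so it is the most up-to-date candidate and is chosen.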

Analysis of LearnerHandler

This section analyzes the LearnerHandler class, which can be understood as a learner handler or data synchronizer. The Leader creates one LearnerHandler per connected learner. As this class is a thread class, we will mainly focus on analyzing the run method. The run method is long, so we'll break down its analysis into five parts.

Part One: Confirming basic data operations. The basic data includes the following:

  1. Variable tickOfNextAckDeadline, representing the deadline for receiving the next ack.
  2. Variable ia, representing the input archive.
  3. Variable bufferedOutput, used to hold the byte stream that needs to be written out.
  4. Variable oa, representing the output archive.
  5. Variable sid, representing the service id (myid).
  6. Variable version, representing the protocol version.
  7. Variable learnerType, indicating the learner type. There are two types: learners with voting ability (PARTICIPANT) and learners without voting ability (OBSERVER).

The code for the first part of operations is as follows:

// Part One: Confirm basic data
// Add learning handler to the leader object
leader.addLearnerHandler(this);
// Calculate deadline
tickOfNextAckDeadline = leader.self.tick.get()
  + leader.self.initLimit + leader.self.syncLimit;

// Convert bufferedInput object
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);

// Create data packet
QuorumPacket qp = new QuorumPacket();
// Read packet information from ia variable, deserialize to qp variable
ia.readRecord(qp, "packet");
// End processing if packet type is neither FOLLOWERINFO nor OBSERVERINFO
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
  LOG.error("First packet " + qp.toString()
            + " is not FOLLOWERINFO or OBSERVERINFO!");
  return;
}

// Read data, data object is LearnerInfo, created in org.apache.zookeeper.server.quorum.Learner.registerWithLeader
byte learnerInfoData[] = qp.getData();
// Read sid, version, and configVersion from learnerInfoData
if (learnerInfoData != null) {
  ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
  if (learnerInfoData.length >= 8) {
    this.sid = bbsid.getLong();
  }
  if (learnerInfoData.length >= 12) {
    this.version = bbsid.getInt(); // Protocol version number
  }
  if (learnerInfoData.length >= 20) {
    long configVersion = bbsid.getLong();
    if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
      throw new IOException(
        "Follower is ahead of the leader (has a later activated configuration)");
    }
  }
} else {
  // No LearnerInfo was sent: use the current value of the leader's followerCounter as sid, then decrement the counter
  this.sid = leader.followerCounter.getAndDecrement();
}

// Log recording
if (leader.self.getView().containsKey(this.sid)) {
  LOG.info("Follower sid: " + this.sid + " : info : "
           + leader.self.getView().get(this.sid).toString());
} else {
  LOG.info("Follower sid: " + this.sid + " not in the current config "
           + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
}

// If packet type is OBSERVERINFO, modify member variable learnerType
if (qp.getType() == Leader.OBSERVERINFO) {
  learnerType = LearnerType.OBSERVER;
}

It's important to note that qp.getData() returns a byte array holding the serialized LearnerInfo, and that this data originates from the Learner#registerWithLeader method.
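
The length checks in Part One imply a fixed layout for that byte array: an 8-byte sid, then a 4-byte protocol version, then an 8-byte config version. Here is a minimal sketch of decoding such a payload. The class LearnerInfoSketch, its encode helper, and the default field values are assumptions for illustration.

```java
import java.nio.ByteBuffer;

// Illustrative sketch only: field defaults are placeholders, not the real ones.
final class LearnerInfoSketch {
    long sid = -1;
    int version = 1;
    long configVersion = -1;

    // Decode as many fields as the payload length allows, in order.
    static LearnerInfoSketch parse(byte[] data) {
        LearnerInfoSketch info = new LearnerInfoSketch();
        ByteBuffer bb = ByteBuffer.wrap(data);
        if (data.length >= 8) {
            info.sid = bb.getLong();
        }
        if (data.length >= 12) {
            info.version = bb.getInt();
        }
        if (data.length >= 20) {
            info.configVersion = bb.getLong();
        }
        return info;
    }

    // Hypothetical helper that builds a full 20-byte payload for the demo.
    static byte[] encode(long sid, int version, long configVersion) {
        return ByteBuffer.allocate(20)
                .putLong(sid).putInt(version).putLong(configVersion).array();
    }

    public static void main(String[] args) {
        LearnerInfoSketch info = parse(encode(3L, 0x10000, 7L));
        System.out.println(info.sid + " " + Integer.toHexString(info.version)
                + " " + info.configVersion); // prints 3 10000 7
    }
}
```

Because each field is read only when the payload is long enough, an older learner that sends a shorter payload can still be decoded, with the remaining fields left at their defaults.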

Part Two: Operations between different protocol versions. The specific processing code is as follows:

// Part Two: Operations between different protocol versions
// Get the last accepted epoch from the packet based on zxid
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

long peerLastZxid;
StateSummary ss = null;
// Get zxid from the packet
long zxid = qp.getZxid();
// Compute the new epoch
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
// Construct new zxid
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);


if (this.getVersion() < 0x10000) {
  // Derive the epoch from the zxid
  long epoch = ZxidUtils.getEpochFromZxid(zxid);
  ss = new StateSummary(epoch, zxid);
  // Wait for a quorum of epoch ACKs; the learner side of this exchange is in org.apache.zookeeper.server.quorum.Learner#registerWithLeader
  leader.waitForEpochAck(this.getSid(), ss);
}
// Mainstream versions will execute this code
else {
  byte ver[] = new byte[4];
  ByteBuffer.wrap(ver).putInt(0x10000);
  // Construct LEADERINFO packet and send it to this learner
  QuorumPacket newEpochPacket =
    new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
  oa.writeRecord(newEpochPacket, "packet");
  bufferedOutput.flush();
  QuorumPacket ackEpochPacket = new QuorumPacket();
  ia.readRecord(ackEpochPacket, "packet");
  if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
    LOG.error(ackEpochPacket.toString()
              + " is not ACKEPOCH");
    return;
  }
  ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
  ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
  leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();

The core processing flow in the above code is as follows:

  1. Get the zxid from the packet and extract its epoch, which is called the last accepted epoch.
  2. Get the zxid from the packet.
  3. Calculate a new epoch based on the learner's service id (sid) and the last accepted epoch.
  4. Convert the epoch from step (3) to a zxid.
  5. Perform different operations based on the protocol version. If the version is less than 0x10000, construct a StateSummary object from the zxid and wait for a quorum of epoch ACKs. Otherwise, construct a LEADERINFO packet, send it to the learner, read back the ACKEPOCH packet, build a StateSummary object from it, and then wait for a quorum of epoch ACKs.
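
The byte-level handling in step (5) can be illustrated on its own. The sketch below shows how the 4-byte version payload of LEADERINFO is produced and how a 4-byte epoch is read back from the ACKEPOCH data, both through ByteBuffer in its default big-endian order. The class EpochHandshakeSketch and its method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative sketch only: models just the payload bytes, not the packets.
final class EpochHandshakeSketch {
    static final int PROTOCOL_VERSION = 0x10000;

    // LEADERINFO data: the protocol version as a 4-byte big-endian int.
    static byte[] leaderInfoData() {
        byte[] ver = new byte[4];
        ByteBuffer.wrap(ver).putInt(PROTOCOL_VERSION);
        return ver;
    }

    // ACKEPOCH data: the leader reads a 4-byte epoch from the payload.
    static int epochFromAckEpochData(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    // Learners below 0x10000 skip the LEADERINFO/ACKEPOCH exchange.
    static boolean usesLegacyHandshake(int learnerVersion) {
        return learnerVersion < PROTOCOL_VERSION;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(leaderInfoData())); // prints [0, 1, 0, 0]
        byte[] ack = ByteBuffer.allocate(4).putInt(9).array();
        System.out.println(epochFromAckEpochData(ack)); // prints 9
    }
}
```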

Part Three: Determining the need for full synchronization. The specific operation code is as follows:

boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
// Operations for full synchronization
if (needSnap) {
  boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
  LearnerSnapshot snapshot =
    leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
  try {
    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
    // Send full synchronization data packet
    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
    bufferedOutput.flush();

    LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
             + "send zxid of db as 0x{}, {} concurrent snapshots, "
             + "snapshot was {} from throttle",
             Long.toHexString(peerLastZxid),
             Long.toHexString(leaderLastZxid),
             Long.toHexString(zxidToSend),
             snapshot.getConcurrentSnapshotNumber(),
             snapshot.isEssential() ? "exempt" : "not exempt");
    // Dump data to peer
    // Serialize ZooKeeper database data to the 'oa' variable
    leader.zk.getZKDatabase().serializeSnapshot(oa);
    oa.writeString("BenWasHere", "signature");
    bufferedOutput.flush();
  } finally {
    snapshot.close();
  }
}

In the code above, the syncFollower method is first used to determine whether full synchronization is necessary. If it is not, this block is skipped and processing continues with the next part. If it is, the last processed zxid is retrieved from the ZooKeeper database, wrapped in a SNAP type data packet, and written to the output archive 'oa'. The entire ZooKeeper database is then serialized to 'oa', followed by the signature string "BenWasHere", and the buffered output is flushed to the learner.

In the third part of the operation, the most crucial method is syncFollower, which is used to determine whether full synchronization is needed. The specific handling code is as follows:

public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
  boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;

  long currentZxid = peerLastZxid;
  boolean needSnap = true;
  // Check if transaction log synchronization is enabled
  boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
  ReentrantReadWriteLock lock = db.getLogLock();
  ReadLock rl = lock.readLock();
  try {
    rl.lock();
    // Maximum transaction ID
    long maxCommittedLog = db.getmaxCommittedLog();
    // Minimum transaction ID
    long minCommittedLog = db.getminCommittedLog();
    // Last zxid processed by the data tree
    long lastProcessedZxid = db.getDataTreeLastProcessedZxid();

    LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
             + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
             + " peerLastZxid=0x{}", getSid(),
             Long.toHexString(maxCommittedLog),
             Long.toHexString(minCommittedLog),
             Long.toHexString(lastProcessedZxid),
             Long.toHexString(peerLastZxid));

    // Database committed log is empty
    if (db.getCommittedLog().isEmpty()) {

      minCommittedLog = lastProcessedZxid;
      maxCommittedLog = lastProcessedZxid;
    }


    if (forceSnapSync) {
      LOG.warn("Forcing snapshot sync - should not see this in production");
    }
    // If equal, Follower has no difference, send empty DIFF data
    else if (lastProcessedZxid == peerLastZxid) {
      LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +
               " for peer sid: " + getSid());
      queueOpPacket(Leader.DIFF, peerLastZxid);
      needOpPacket = false;
      // Full synchronization not needed
      needSnap = false;
    }
    // If Follower data is newer than Leader, send TRUNC
    else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
      LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
                Long.toHexString(maxCommittedLog) +
                " for peer sid:" + getSid());
      queueOpPacket(Leader.TRUNC, maxCommittedLog);
      currentZxid = maxCommittedLog;
      needOpPacket = false;
      needSnap = false;
    }
    // Follower data is within Leader's data range
    else if ((maxCommittedLog >= peerLastZxid)
             && (minCommittedLog <= peerLastZxid)) {
      LOG.info("Using committedLog for peer sid: " + getSid());
      Iterator<Proposal> itr = db.getCommittedLog().iterator();
      currentZxid = queueCommittedProposals(itr, peerLastZxid,
                                            null, maxCommittedLog);
      needSnap = false;
    }
    // Follower is behind the committed log: try the transaction log if that is enabled
    else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
      // Calculate transaction log size limit
      long sizeLimit = db.calculateTxnLogSizeLimit();
      // Read a certain amount of data
      Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
        peerLastZxid, sizeLimit);
      // The transaction log returned proposals that can be sent
      if (txnLogItr.hasNext()) {
        LOG.info("Use txnlog and committedLog for peer sid: " + getSid());
        // Put data into queuedPackets queue
        currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                              minCommittedLog, maxCommittedLog);

        LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
        Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
        currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                              null, maxCommittedLog);
        needSnap = false;
      }
      // Close txnLogItr resource
      if (txnLogItr instanceof TxnLogProposalIterator) {
        TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
        txnProposalItr.close();
      }
    } else {
      LOG.warn("Unhandled scenario for peer sid: " + getSid());
    }
    LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
              " for peer sid: " + getSid());
    // Ask the leader to start forwarding proposals newer than currentZxid to this learner
    leaderLastZxid = leader.startForwarding(this, currentZxid);
  } finally {
    rl.unlock();
  }

  if (needOpPacket && !needSnap) {
    LOG.error("Unhandled scenario for peer sid: " + getSid() +
              " fall back to use snapshot");
    needSnap = true;
  }

  return needSnap;
}

In the method above, it's important to first explain the peerLastZxid parameter. This is the last zxid reported by the learner. It is taken from the StateSummary object, which for current protocol versions is built from the ACKEPOCH packet (ackEpochPacket). The syncFollower method process flow is as follows:

  1. Read the maximum committed zxid (maxCommittedLog), the minimum committed zxid (minCommittedLog), and the last processed zxid from the ZooKeeper database. If the committed log is empty, both bounds are set to the last processed zxid.
  2. If forceSnapSync is set, none of the incremental paths below is taken and full synchronization is used.
  3. If lastProcessedZxid equals peerLastZxid, it indicates no difference in data, so an empty DIFF packet is queued and full synchronization is not needed.
  4. If peerLastZxid is greater than maxCommittedLog and its low 32 bits are not zero, it means the Follower's data is newer than the Leader's, so a TRUNC packet is queued to truncate the Follower to maxCommittedLog rather than performing full synchronization.
  5. If maxCommittedLog is greater than or equal to peerLastZxid and minCommittedLog is less than or equal to peerLastZxid, it indicates the Follower's data is within the Leader's committed log range. In this case, proposals are retrieved from the committed log and placed in the sending queue. Full synchronization is not needed here, only partial synchronization is required.
  6. If peerLastZxid is less than minCommittedLog and transaction log synchronization is enabled, up to sizeLimit of data is read from the transaction log starting from peerLastZxid. If nothing is read, full synchronization is needed. Otherwise, the proposals from the transaction log and then from the committed log are queued, and full synchronization is not needed.
  7. Call leader.startForwarding so that the leader forwards proposals newer than currentZxid to this learner.
  8. If an operation packet is still required (needOpPacket) although needSnap is false, fall back to full synchronization. Return the result of whether full synchronization is needed.
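
The order of the branches matters, so it can help to see the decision on its own, without locks, queues, or logging. The sketch below condenses the checks into one pure function. The class SyncDecisionSketch, the Mode enum, and the txnLogHasProposals parameter are assumptions for illustration; the real method also adjusts the bounds when the committed log is empty and can still fall back to a snapshot while queueing proposals.

```java
// Illustrative sketch only: a pure-function view of the branch order.
final class SyncDecisionSketch {
    enum Mode { SNAP, EMPTY_DIFF, TRUNC, DIFF_FROM_COMMITTED_LOG, DIFF_FROM_TXNLOG }

    static Mode decide(long peerLastZxid, long minCommittedLog, long maxCommittedLog,
                       long lastProcessedZxid, boolean txnLogSyncEnabled,
                       boolean txnLogHasProposals, boolean forceSnapSync) {
        // A zxid whose low 32 bits are zero marks the start of an epoch.
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        if (forceSnapSync) {
            return Mode.SNAP;
        }
        if (lastProcessedZxid == peerLastZxid) {
            return Mode.EMPTY_DIFF; // learner is already up to date
        }
        if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            return Mode.TRUNC; // learner is ahead of the leader
        }
        if (minCommittedLog <= peerLastZxid && peerLastZxid <= maxCommittedLog) {
            return Mode.DIFF_FROM_COMMITTED_LOG;
        }
        if (peerLastZxid < minCommittedLog && txnLogSyncEnabled && txnLogHasProposals) {
            return Mode.DIFF_FROM_TXNLOG;
        }
        return Mode.SNAP; // nothing incremental applies
    }

    public static void main(String[] args) {
        long min = 0x100000001L, max = 0x100000005L, last = 0x100000005L;
        System.out.println(decide(0x100000003L, min, max, last, false, false, false));
        System.out.println(decide(0x100000009L, min, max, last, false, false, false));
    }
}
```

The main method prints DIFF_FROM_COMMITTED_LOG for a learner inside the committed range and TRUNC for a learner that is ahead of the leader within the same epoch.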

Part Four: NEWLEADER packet and UPTODATE packet transmission operations. The specific handling code is as follows:

// Create NEWLEADER packet, write it out differently according to different versions
if (getVersion() < 0x10000) {
  QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                                              newLeaderZxid, null, null);
  oa.writeRecord(newLeaderQP, "packet");
}
else {
  QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid,leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
  queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();

// Start the packet sending thread
startSendingPackets();


// Create an empty packet
qp = new QuorumPacket();
// Read packet information from variable ia, deserialize to qp variable
ia.readRecord(qp, "packet");
// End processing if packet type is not ACK
if (qp.getType() != Leader.ACK) {
  LOG.error("Next packet was supposed to be an ACK,"
            + " but received packet: {}", packetToString(qp));
  return;
}

if (LOG.isDebugEnabled()) {
  LOG.debug("Received NEWLEADER-ACK message from " + sid);
}
leader.waitForNewLeaderAck(getSid(), qp.getZxid());

syncLimitCheck.start();

sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

synchronized (leader.zk) {
  while (!leader.zk.isRunning() && !this.isInterrupted()) {
    leader.zk.wait(20);
  }
}
LOG.debug("Sending UPTODATE message to " + sid);
// Add UPTODATE packet
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

The core operation flow in the above code is as follows:

  1. If the version is less than 0x10000, create a NEWLEADER packet without data and write it directly to the oa variable. Otherwise, create a NEWLEADER packet that carries the serialized lastSeenQuorumVerifier and put it in the queuedPackets queue.
  2. Start the sending thread through the startSendingPackets method. The core of the sending thread is to call the sendPackets method, which takes packets from the queuedPackets queue and writes them to the oa variable.
  3. Create an empty packet, read the packet information from the ia variable into this empty packet, and end processing if the packet type is not ACK.
  4. Wait for a quorum of servers to acknowledge NEWLEADER through the waitForNewLeaderAck method.
  5. Start syncLimitCheck, set the socket timeout to tickTime * syncLimit, and wait until the leader's ZooKeeperServer is running.
  6. Create an UPTODATE packet and put it into the queuedPackets queue, where it waits for the sending thread to send it.
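
The hand-off between the handler thread and the sending thread is a producer/consumer queue. A minimal sketch of that pattern follows, with packets reduced to strings. The class PacketSenderSketch, the STOP sentinel, and the written list (standing in for the output archive) are assumptions made for the example, not the LearnerHandler implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only: packets are strings, the output archive is a list.
final class PacketSenderSketch {
    static final String STOP = "STOP"; // hypothetical shutdown sentinel

    final LinkedBlockingQueue<String> queuedPackets = new LinkedBlockingQueue<>();
    final List<String> written = new ArrayList<>();

    // Start a thread that drains the queue in order until it sees STOP.
    Thread startSendingPackets() {
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    String packet = queuedPackets.take(); // blocks when empty
                    if (STOP.equals(packet)) {
                        break;
                    }
                    synchronized (written) {
                        written.add(packet);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sender-sketch");
        sender.start();
        return sender;
    }

    // Queue NEWLEADER before the sender starts and UPTODATE after, as in Part Four.
    static List<String> demo() {
        PacketSenderSketch handler = new PacketSenderSketch();
        handler.queuedPackets.add("NEWLEADER");
        Thread sender = handler.startSendingPackets();
        handler.queuedPackets.add("UPTODATE");
        handler.queuedPackets.add(STOP);
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (handler.written) {
            return new ArrayList<>(handler.written);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [NEWLEADER, UPTODATE]
    }
}
```

Because a single thread drains a FIFO queue, packets reach the learner in the order they were queued, regardless of when the sender thread was started.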

Part Five: Perform different operations according to different types of packets. The specific operation code is as follows:

while (true) {
  qp = new QuorumPacket();
  ia.readRecord(qp, "packet");

  long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
  if (qp.getType() == Leader.PING) {
    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
  }
  if (LOG.isTraceEnabled()) {
    ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
  }
  tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;


  ByteBuffer bb;
  long sessionId;
  int cxid;
  int type;

  // Perform different behavior operations for different packet types
  switch (qp.getType()) {
      // Type is ACK
    case Leader.ACK:
      if (this.learnerType == LearnerType.OBSERVER) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Received ACK from Observer  " + this.sid);
        }
      }
      syncLimitCheck.updateAck(qp.getZxid());
      // Process ack packet
      leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
      break;
      // Type is PING
    case Leader.PING:
      ByteArrayInputStream bis = new ByteArrayInputStream(qp
                                                          .getData());
      DataInputStream dis = new DataInputStream(bis);
      while (dis.available() > 0) {
        long sess = dis.readLong();
        int to = dis.readInt();
        leader.zk.touch(sess, to);
      }
      break;
      // Type is REVALIDATE
    case Leader.REVALIDATE:
      bis = new ByteArrayInputStream(qp.getData());
      dis = new DataInputStream(bis);
      long id = dis.readLong();
      int to = dis.readInt();
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      DataOutputStream dos = new DataOutputStream(bos);
      dos.writeLong(id);
      boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
      if (valid) {
        try {
          // Bind the session to this LearnerHandler as its owner
          leader.zk.setOwner(id, this);
        } catch (SessionExpiredException e) {
          LOG.error("Somehow session " + Long.toHexString(id) +
                    " expired right after being renewed! (impossible)", e);
        }
      }
      if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                                 ZooTrace.SESSION_TRACE_MASK,
                                 "Session 0x" + Long.toHexString(id)
                                 + " is valid: " + valid);
      }
      dos.writeBoolean(valid);
      qp.setData(bos.toByteArray());
      // Add to queuedPackets collection
      queuedPackets.add(qp);
      break;
      // Type is REQUEST
    case Leader.REQUEST:
      bb = ByteBuffer.wrap(qp.getData());
      sessionId = bb.getLong();
      cxid = bb.getInt();
      type = bb.getInt();
      bb = bb.slice();
      Request si;
      if (type == OpCode.sync) {
        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb,
                                    qp.getAuthinfo());
      } else {
        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
      }
      si.setOwner(this);
      // Submit request
      leader.zk.submitLearnerRequest(si);
      break;
    default:
      LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
      break;
  }
}

In the above code, each iteration first reads a packet from the variable ia and refreshes tickOfNextAckDeadline. The packet is then handled according to its type:

  1. If the type is ACK, update the zxid and timing data tracked by syncLimitCheck, then hand the ACK to the leader's processAck method. processAck records the acknowledgement against the matching outstanding proposal; once a quorum has acknowledged it, the leader commits the proposal and notifies the learners.
  2. If the type is PING, wrap the packet data in a DataInputStream and read (session id, timeout) pairs from it. Each pair is passed to leader.zk.touch, which hands it to the SessionTracker to refresh the expiration time of that session.
  3. If the type is REVALIDATE, read the session id and timeout from the packet and check them with checkIfValidGlobalSession. If the session is valid, bind it to the current LearnerHandler with setOwner. In either case, write the session id and the validity flag back into the packet and add it to queuedPackets as the reply.
  4. If the type is REQUEST, read the session id, cxid, and type from the packet data, with bb holding the remaining bytes, and build a Request object (a LearnerSyncRequest when the type is OpCode.sync). Then submit it through the submitLearnerRequest method, after which it enters the request processing responsibility chain.
  5. Any other type is logged as an unexpected packet.
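
The PING payload handled above is a flat sequence of (session id, timeout) pairs, each an 8-byte long followed by a 4-byte int. The following sketch shows that layout with the standard java.io streams. The class PingPayloadSketch and its methods are hypothetical and exist only for illustration; the decoding loop mirrors the one in the listing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not a ZooKeeper class.
class PingPayloadSketch {
    // Encode each session as a long (session id) followed by an int (timeout).
    static byte[] encode(Map<Long, Integer> sessions) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            for (Map.Entry<Long, Integer> e : sessions.entrySet()) {
                dos.writeLong(e.getKey());
                dos.writeInt(e.getValue());
            }
            dos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decode pairs until the buffer is exhausted, as the PING branch does.
    static Map<Long, Integer> decode(byte[] data) {
        try {
            Map<Long, Integer> out = new LinkedHashMap<>();
            DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data));
            while (dis.available() > 0) {
                long sess = dis.readLong();
                int to = dis.readInt();
                out.put(sess, to);
            }
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<Long, Integer> sessions = new LinkedHashMap<>();
        sessions.put(0x10L, 30000);
        sessions.put(0x11L, 15000);
        byte[] payload = encode(sessions);
        System.out.println(payload.length);  // 24: two pairs of 12 bytes each
        System.out.println(decode(payload).equals(sessions));
    }
}
```

In the real handler, each decoded pair is passed to leader.zk.touch instead of being collected into a map.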
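
Follower Analysis

This section analyzes the Follower class. We'll start by examining its constructor, which is used in the QuorumPeer#makeFollower method. The specific processing code is as follows:

protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {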
  return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}

Through this method, we enter the Follower constructor. The specific constructor code is as follows:

Follower(QuorumPeer self, FollowerZooKeeperServer zk) {
  this.self = self;
  this.zk = zk;
  this.fzk = zk;
}

In the above code, member variables are initialized:

  1. Member variable self, which is essentially the election node QuorumPeer.
  2. Member variable zk, representing the ZK service, specifically FollowerZooKeeperServer in this case.
  3. Member variable fzk, also representing the ZK service, specifically FollowerZooKeeperServer.

Next, we'll analyze the core method followLeader. The specific processing code is as follows:

void followLeader() throws InterruptedException {
  // Get current time
  self.end_fle = Time.currentElapsedTime();
  // Calculate election time
  long electionTimeTaken = self.end_fle - self.start_fle;
  self.setElectionTimeTaken(electionTimeTaken);
  LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
           QuorumPeer.FLE_TIME_UNIT);
  self.start_fle = 0;
  self.end_fle = 0;
  fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
  try {
    // Find leader service
    QuorumServer leaderServer = findLeader();
    try {
      // Establish connection with leader service
      connectToLeader(leaderServer.addr, leaderServer.hostname);
      // Register current node with leader, get zxid corresponding to new election epoch
      long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
      // Throw exception if current node state changes
      if (self.isReconfigStateChange()) {
        throw new Exception("learned about role change");
      }
      // Convert zxid to election epoch
      long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
      // Throw exception if election epoch is less than current node's accepted epoch
      if (newEpoch < self.getAcceptedEpoch()) {
        LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                  + " is less than our accepted epoch " + ZxidUtils.zxidToString(
                    self.getAcceptedEpoch()));
        throw new IOException("Error: Epoch of leader is lower");
      }
      // Synchronize with leader
      syncWithLeader(newEpochZxid);
      // Create data packet
      QuorumPacket qp = new QuorumPacket();
      while (this.isRunning()) {
        // Read packet
        readPacket(qp);
        // Process packet
        processPacket(qp);
      }
    } catch (Exception e) {
      LOG.warn("Exception when following the leader", e);
      try {
        sock.close();
      } catch (IOException e1) {
        e1.printStackTrace();
      }
      pendingRevalidations.clear();
    }
  } finally {
    zk.unregisterJMX((Learner) this);
  }
}

The core processing flow in the above code is as follows:

  1. Use the findLeader method to find the leader service.
  2. Use the connectToLeader method to establish a connection with the leader service.
  3. Use the registerWithLeader method to register the current node with the leader service. The registerWithLeader method returns a zxid that includes the latest election epoch.
  4. Check whether the election epoch in the zxid is less than the current node's accepted epoch. If it is, throw an IOException.
  5. Use the syncWithLeader method to synchronize with the leader.
  6. Enter a loop stage, repeating the following operations:
    1. Read data packet.
    2. Process data packet.
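
The epoch comparison in step 4 relies on the zxid layout: the high 32 bits hold the election epoch and the low 32 bits hold a counter. The sketch below re-implements that arithmetic in a standalone class so it can be run in isolation. ZxidSketch and leaderEpochAcceptable are hypothetical names; in ZooKeeper the conversions are provided by ZxidUtils.

```java
// Hypothetical standalone sketch of the zxid layout; not the ZooKeeper class.
class ZxidSketch {
    // Pack an epoch (high 32 bits) and a counter (low 32 bits) into one zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    // Extract the epoch from the high 32 bits.
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    // Extract the counter from the low 32 bits.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // The follower rejects a leader whose epoch is lower than its accepted epoch.
    static boolean leaderEpochAcceptable(long newEpochZxid, long acceptedEpoch) {
        return epochOf(newEpochZxid) >= acceptedEpoch;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 7);
        System.out.println(Long.toHexString(zxid));                     // 500000007
        System.out.println(epochOf(zxid) + " " + counterOf(zxid));      // 5 7
        System.out.println(leaderEpochAcceptable(makeZxid(4, 0), 5));   // false
    }
}
```

Because registerWithLeader returns a zxid with a counter of 0, only the epoch part matters for this check.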

Next, we need to analyze the methods involved in these six core steps. Let's start with the findLeader method, which is used to find the leader service. The specific processing code is as follows:

protected QuorumServer findLeader() {
  QuorumServer leaderServer = null;
  // Find the leader by id
  // Get the vote
  Vote current = self.getCurrentVote();
  // Loop through candidate nodes
  for (QuorumServer s : self.getView().values()) {
    // If the candidate node's id matches the id in the vote, it's the leader
    if (s.id == current.getId()) {
      s.recreateSocketAddresses();
      leaderServer = s;
      break;
    }
  }
  if (leaderServer == null) {
    LOG.warn("Couldn't find the leader with id = "
             + current.getId());
  }
  return leaderServer;
}

The core processing flow in the above code is as follows:

  1. Retrieve the current vote of this node (the vote contains the id of the elected leader).
  2. Loop through the servers in the current view (self.getView()) and compare each server's id with the leader id from step (1). The server whose id matches is the leader; its socket addresses are recreated before it is returned. If no server matches, a warning is logged and null is returned.

The connectToLeader method mainly involves socket operations, so we won't analyze it in detail. Next, let's analyze the registerWithLeader method. The specific processing code is as follows:

protected long registerWithLeader(int pktType) throws IOException {
  // Get the last logged zxid
  long lastLoggedZxid = self.getLastLoggedZxid();
  // Create data packet
  QuorumPacket qp = new QuorumPacket();
  // Set packet type
  qp.setType(pktType);
  // Set zxid
  qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
  // Create LearnerInfo object
  LearnerInfo li =
    new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
  ByteArrayOutputStream bsid = new ByteArrayOutputStream();
  BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
  boa.writeRecord(li, "LearnerInfo");
  qp.setData(bsid.toByteArray());

  // Write out packet, FOLLOWERINFO or OBSERVERINFO
  writePacket(qp, true);
  // Read packet
  readPacket(qp);
  // Get new election epoch from packet
  final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
  // If packet type is LEADERINFO
  if (qp.getType() == Leader.LEADERINFO) {
    // Read leader's protocol version
    leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
    byte epochBytes[] = new byte[4];
    final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
    // If leader's epoch is greater than current node's accepted epoch
    if (newEpoch > self.getAcceptedEpoch()) {
      wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
      // Set current node's accepted epoch to leader's epoch
      self.setAcceptedEpoch(newEpoch);
    }
    // If equal
    else if (newEpoch == self.getAcceptedEpoch()) {
      wrappedEpochBytes.putInt(-1);
    }
    // If less, throw exception
    else {
      throw new IOException(
        "Leaders epoch, " + newEpoch + " is less than accepted epoch, "
        + self.getAcceptedEpoch());
    }
    // Send ACKEPOCH packet
    QuorumPacket ackNewEpoch =
      new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
    // Write out packet
    writePacket(ackNewEpoch, true);
    // Convert leader's epoch to zxid
    return ZxidUtils.makeZxid(newEpoch, 0);
  }
  // Other packet types
  else {
    // If leader's epoch is greater than current node's accepted epoch, update the accepted epoch
    if (newEpoch > self.getAcceptedEpoch()) {
      self.setAcceptedEpoch(newEpoch);
    }
    // If packet type is not NEWLEADER, throw exception
    if (qp.getType() != Leader.NEWLEADER) {
      LOG.error("First packet should have been NEWLEADER");
      throw new IOException("First packet should have been NEWLEADER");
    }
    // Return zxid from packet
    return qp.getZxid();
  }
}

The core processing flow in the above code is as follows:

  1. Get the last logged zxid of the current node.
  2. Create a data packet.
  3. Set the packet type. A Follower passes FOLLOWERINFO, while an Observer passes OBSERVERINFO.
  4. Set the zxid of the data packet. The zxid is built from the accepted epoch of the current node with a counter of 0.
  5. Create a LearnerInfo object, serialize it into the packet's data, and write the packet out using the writePacket method.
  6. Read the reply using the readPacket method. The reply is normally of type LEADERINFO, in which case its zxid carries the new election epoch.
  7. Extract the new election epoch from the zxid of the packet read in step (6).
  8. If the packet type is LEADERINFO, perform the following operations:
    1. Read the leader's protocol version from the packet data.
    2. If the new election epoch is greater than the epoch accepted by the current node, write the node's current epoch into the ACKEPOCH payload and set the accepted epoch to the new election epoch.
    3. If the new election epoch equals the epoch accepted by the current node, write -1 into the ACKEPOCH payload and leave the accepted epoch unchanged.
    4. If the new election epoch is less than the epoch accepted by the current node, throw an exception.
    5. Send an ACKEPOCH packet carrying the last logged zxid and the payload, then return a zxid built from the new election epoch.
  9. If the packet type is not LEADERINFO, perform the following operations:
    1. If the new epoch is greater than the epoch accepted by the current node, update the accepted epoch.
    2. If the packet type is not NEWLEADER, throw an exception.
    3. Return the zxid from the data packet.
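
The three-way epoch comparison in the LEADERINFO branch decides what goes into the 4-byte ACKEPOCH payload. The sketch below isolates that decision as a pure function. AckEpochSketch and ackEpochPayload are hypothetical names, and the sketch throws IllegalStateException where the original method throws IOException.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not a ZooKeeper class.
class AckEpochSketch {
    // Build the 4-byte ACKEPOCH payload from the three epochs involved.
    static byte[] ackEpochPayload(long newEpoch, long acceptedEpoch, long currentEpoch) {
        byte[] epochBytes = new byte[4];
        ByteBuffer wrapped = ByteBuffer.wrap(epochBytes);
        if (newEpoch > acceptedEpoch) {
            // Leader is ahead: report the learner's current epoch.
            wrapped.putInt((int) currentEpoch);
        } else if (newEpoch == acceptedEpoch) {
            // Same epoch: -1 marks this case in the payload.
            wrapped.putInt(-1);
        } else {
            // Leader is behind the accepted epoch: refuse to follow it.
            throw new IllegalStateException("Leader's epoch " + newEpoch
                    + " is less than accepted epoch " + acceptedEpoch);
        }
        return epochBytes;
    }

    public static void main(String[] args) {
        System.out.println(ByteBuffer.wrap(ackEpochPayload(6, 5, 5)).getInt()); // 5
        System.out.println(ByteBuffer.wrap(ackEpochPayload(5, 5, 5)).getInt()); // -1
    }
}
```

In the real method, the first branch also calls self.setAcceptedEpoch(newEpoch); that side effect is left out here to keep the function pure.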

Next, we'll analyze the syncWithLeader method, which is used to synchronize data from the Leader. The specific processing code is as follows:

protected void syncWithLeader(long newLeaderZxid) throws Exception {
  // Create ACK packet
  QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
  // Create packet
  QuorumPacket qp = new QuorumPacket();
  // Get new election epoch from zxid parameter
  long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

  QuorumVerifier newLeaderQV = null;
  // Flag for full synchronization
  boolean snapshotNeeded = true;
  // Read packet
  readPacket(qp);
  // Committed packets, storing zxid
  LinkedList<Long> packetsCommitted = new LinkedList<Long>();
  // Uncommitted packets
  LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
  synchronized (zk) {
    // Packet indicates differential synchronization
    if (qp.getType() == Leader.DIFF) {
      LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
      snapshotNeeded = false;
    }
    // Packet indicates full synchronization
    else if (qp.getType() == Leader.SNAP) {
      LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
      // Deserialize snapshot from input stream to zk database
      zk.getZKDatabase().deserializeSnapshot(leaderIs);
      // Initialize config node if reconfig is not enabled
      if (!self.isReconfigEnabled()) {
        LOG.debug(
          "Reset config node content from local config after deserialization of snapshot.");
        zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
      }
      // Read signature information
      String signature = leaderIs.readString("signature");
      // Throw exception if signature is not "BenWasHere"
      if (!signature.equals("BenWasHere")) {
        LOG.error("Missing signature. Got " + signature);
        throw new IOException("Missing signature");
      }
      // Set last processed zxid
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    }
    // Packet indicates rollback synchronization
    else if (qp.getType() == Leader.TRUNC) {
      LOG.warn("Truncating log to get in sync with the leader 0x"
               + Long.toHexString(qp.getZxid()));
      // Truncate log
      boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
      if (!truncated) {
        // not able to truncate the log
        LOG.error("Not able to truncate the log "
                  + Long.toHexString(qp.getZxid()));
        System.exit(13);
      }
      // Set last processed zxid
      zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

    }
    // Exit in other cases
    else {
      LOG.error("Got unexpected packet from leader: {}, exiting ... ",
                LearnerHandler.packetToString(qp));
      System.exit(13);

    }
    // Initialize config node
    zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
    // Create session tracker
    zk.createSessionTracker();

    long lastQueued = 0;
    // Flag for pre-ZAB 1.0
    boolean isPreZAB1_0 = true;
    // Flag for writing to transaction log
    boolean writeToTxnLog = !snapshotNeeded;
    outerLoop:
    // Loop
    while (self.isRunning()) {
      // Read packet
      readPacket(qp);
      // Perform different operations based on packet type
      switch (qp.getType()) {

        case Leader.PROPOSAL:
          // Create in-flight packet
          // Convert packet to PacketInFlight object and add to packetsNotCommitted collection
          PacketInFlight pif = new PacketInFlight();
          pif.hdr = new TxnHeader();
          pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
          if (pif.hdr.getZxid() != lastQueued + 1) {
            LOG.warn("Got zxid 0x"
                     + Long.toHexString(pif.hdr.getZxid())
                     + " expected 0x"
                     + Long.toHexString(lastQueued + 1));
          }
          lastQueued = pif.hdr.getZxid();

          if (pif.hdr.getType() == OpCode.reconfig) {
            SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
            QuorumVerifier qv =
              self.configFromString(new String(setDataTxn.getData()));
            self.setLastSeenQuorumVerifier(qv, true);
          }

          packetsNotCommitted.add(pif);
          break;
        case Leader.COMMIT:
        case Leader.COMMITANDACTIVATE:
          // Get first packet from packetsNotCommitted
          pif = packetsNotCommitted.peekFirst();
          if (pif.hdr.getZxid() == qp.getZxid()
              && qp.getType() == Leader.COMMITANDACTIVATE) {
            QuorumVerifier qv = self.configFromString(
              new String(((SetDataTxn) pif.rec).getData()));
            // Apply the new configuration; throw an exception if processReconfig reports a major change
            boolean majorChange = self.processReconfig(qv,
                                                       ByteBuffer.wrap(qp.getData()).getLong(),
                                                       qp.getZxid(), true);
            if (majorChange) {
              throw new Exception("changes proposed in reconfig");
            }
          }
          // If writing to transaction log is not needed
          if (!writeToTxnLog) {
            // Log if zxid in flight packet doesn't match packet zxid, otherwise process transaction and remove flight packet from packetsNotCommitted
            if (pif.hdr.getZxid() != qp.getZxid()) {
              LOG.warn("Committing " + qp.getZxid() + ", but next proposal is "
                       + pif.hdr.getZxid());
            }
            else {
              zk.processTxn(pif.hdr, pif.rec);
              packetsNotCommitted.remove();
            }
          }
          else {
            // Add zxid from packet to packetsCommitted collection
            packetsCommitted.add(qp.getZxid());
          }
          break;
        case Leader.INFORM:
        case Leader.INFORMANDACTIVATE:
          // Create in-flight packet
          PacketInFlight packet = new PacketInFlight();
          // Assign header information
          packet.hdr = new TxnHeader();

          // Packet type is INFORMANDACTIVATE
          if (qp.getType() == Leader.INFORMANDACTIVATE) {
            ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
            long suggestedLeaderId = buffer.getLong();
            byte[] remainingdata = new byte[buffer.remaining()];
            buffer.get(remainingdata);
            packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr);
            QuorumVerifier qv = self.configFromString(
              new String(((SetDataTxn) packet.rec).getData()));
            boolean majorChange =
              self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
            // Throw an exception if processReconfig reported a major change
            if (majorChange) {
              throw new Exception("changes proposed in reconfig");
            }
          }
          else {
            packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
            if (packet.hdr.getZxid() != lastQueued + 1) {
              LOG.warn("Got zxid 0x"
                       + Long.toHexString(packet.hdr.getZxid())
                       + " expected 0x"
                       + Long.toHexString(lastQueued + 1));
            }
            lastQueued = packet.hdr.getZxid();
          }

          if (!writeToTxnLog) {
            zk.processTxn(packet.hdr, packet.rec);
          }
          else {
            packetsNotCommitted.add(packet);
            packetsCommitted.add(qp.getZxid());
          }

          break;
        case Leader.UPTODATE:
          LOG.info("Learner received UPTODATE message");
          if (newLeaderQV != null) {
            // Apply the new configuration; throw an exception if processReconfig reports a major change
            boolean majorChange =
              self.processReconfig(newLeaderQV, null, null, true);
            if (majorChange) {
              throw new Exception("changes proposed in reconfig");
            }
          }
          // If using pre-ZAB 1.0 protocol
          if (isPreZAB1_0) {
            // Take snapshot
            zk.takeSnapshot();
            // Set current epoch to new epoch
            self.setCurrentEpoch(newEpoch);
          }
          self.setZooKeeperServer(zk);
          self.adminServer.setZooKeeperServer(zk);
          // Exit the outer loop; synchronization is complete
          break outerLoop;
        case Leader.NEWLEADER:
          LOG.info("Learner received NEWLEADER message");
          // Reset QuorumVerifier object if packet data is not null and length is greater than 1
          if (qp.getData() != null && qp.getData().length > 1) {
            try {
              QuorumVerifier qv = self.configFromString(new String(qp.getData()));
              self.setLastSeenQuorumVerifier(qv, true);
              newLeaderQV = qv;
            } catch (Exception e) {
              e.printStackTrace();
            }
          }

          // If full synchronization is needed
          if (snapshotNeeded) {
            // Take snapshot
            zk.takeSnapshot();
          }

          // Set current epoch to new epoch
          self.setCurrentEpoch(newEpoch);
          // Set flag for writing to transaction log to true
          writeToTxnLog = true;
          // Set flag for pre-ZAB 1.0 to false
          isPreZAB1_0 = false;
          // Write ACK packet
          writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
          break;
      }
    }
  }
  // Set zxid in ACK packet
  ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
  // Write ACK packet
  writePacket(ack, true);
  // Set timeout
  sock.setSoTimeout(self.tickTime * self.syncLimit);
  // Start zk service
  zk.startup();
  self.updateElectionVote(newEpoch);

  if (zk instanceof FollowerZooKeeperServer) {
    FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
    // Process uncommitted packets
    for (PacketInFlight p : packetsNotCommitted) {
      fzk.logRequest(p.hdr, p.rec);
    }
    // Commit packets in packetsCommitted collection
    for (Long zxid : packetsCommitted) {
      fzk.commit(zxid);
    }
  }
  else if (zk instanceof ObserverZooKeeperServer) {
    ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
    // Process uncommitted packets
    for (PacketInFlight p : packetsNotCommitted) {
      // Get first zxid from committed packets
      Long zxid = packetsCommitted.peekFirst();
      // Skip processing if zxid in unprocessed packet doesn't match first zxid
      if (p.hdr.getZxid() != zxid) {
        LOG.warn("Committing " + Long.toHexString(zxid)
                 + ", but next proposal is "
                 + Long.toHexString(p.hdr.getZxid()));
        continue;
      }
      // Remove first data from packetsCommitted
      packetsCommitted.remove();
      // Create request object
      Request request = new Request(null, p.hdr.getClientId(),
                                    p.hdr.getCxid(), p.hdr.getType(), null, null);
      request.setTxn(p.rec);
      request.setHdr(p.hdr);
      // Process request object
      ozk.commitRequest(request);
    }
  }
  else {
    throw new UnsupportedOperationException("Unknown server type");
  }
}

The core processing flow in the above code is as follows:

  1. Create an ACK packet, which needs to be sent under certain conditions.
  2. Create a packet for receiving data.
  3. Obtain the new election epoch from the zxid parameter, denoted as the variable newEpoch.
  4. Create a flag for full synchronization, denoted as the variable snapshotNeeded, with a default value of true, indicating the need for full synchronization.
  5. Read the packet.
  6. Create a collection of committed packets, stored in the variable packetsCommitted, which contains zxids.
  7. Create a collection of uncommitted packets, stored in the variable packetsNotCommitted, which contains packets in flight.
  8. Perform different operations based on the packet type read in step (5).
    1. If the packet type is DIFF, set the variable snapshotNeeded to false.
    2. If the packet type is SNAP, perform the following operations:
      1. Deserialize the data from the input stream into the current node's ZooKeeper database.
      2. Check if the current node's reconfigEnabled property is false; if so, reload the configuration node-related information in the ZooKeeper database.
      3. Obtain the signature data from the input stream; if it's not "BenWasHere", throw an exception.
      4. Set the last processed zxid in the ZooKeeper database to the zxid from the packet.
    3. If the packet type is TRUNC, perform the following operations:
      1. Truncate the ZooKeeper database using the zxid from the packet as the parameter.
      2. If truncation fails, exit the program with code 13.
      3. If truncation succeeds, set the last processed zxid in the ZooKeeper database to the zxid from the packet.
    4. In other cases, exit with code 13.
  9. Initialize the configuration node in the ZooKeeper database.
  10. Create a session tracker.
  11. Create a flag indicating whether it's a pre-ZAB 1.0 version, with a default value of true.
  12. Create a flag indicating whether to write to the transaction log, with an initial value of the negation of the full synchronization flag.
  13. Process packets in a loop, handling them based on different packet types:
    1. If the packet type is PROPOSAL, create a packet in flight with data from the packet, and add it to the packetsNotCommitted collection.
    2. If the packet type is COMMIT or COMMITANDACTIVATE, get the first packet in flight from packetsNotCommitted. If its zxid matches the packet's zxid and the packet type is COMMITANDACTIVATE, apply the new configuration through processReconfig and throw an exception if it reports a major change. Then check the transaction-log flag. If writing to the transaction log is not needed, compare the zxid of the packet in flight with the packet's zxid: log a warning if they differ, otherwise process the transaction and remove the packet in flight from packetsNotCommitted. If writing is needed, add the packet's zxid to packetsCommitted.
    3. If the packet type is INFORM or INFORMANDACTIVATE, create a packet in flight and deserialize the transaction into it. For INFORMANDACTIVATE, the data begins with the suggested leader id, and the new configuration is applied through processReconfig, with an exception thrown on a major change. Then check the transaction-log flag: if writing is not needed, let the ZooKeeper service process the transaction directly; otherwise, add the packet in flight to packetsNotCommitted and its zxid to packetsCommitted.
    4. If the packet type is UPTODATE, apply the configuration received with NEWLEADER (newLeaderQV) if there is one, throwing an exception on a major change. If the pre-ZAB 1.0 flag is still true, take a snapshot and set the current node's current epoch to newEpoch. Finally, register the ZooKeeper service with the current node and its admin server, and exit the loop.
    5. If the packet type is NEWLEADER, check whether the packet carries data (more than one byte). If so, parse it into a QuorumVerifier and set it as the current node's lastSeenQuorumVerifier. If full synchronization is needed, take a snapshot. Then set the current node's current epoch to newEpoch, set the flag for writing to the transaction log to true, set the pre-ZAB 1.0 flag to false, and write out an ACK packet carrying newLeaderZxid.
  14. Set the zxid of the ACK packet created in step (1), which includes the new election epoch (newEpoch).
  15. Write out the ACK packet.
  16. Start the ZooKeeper service.
  17. Update the current node's election vote with the new epoch through the updateElectionVote method.
  18. Perform different operations based on the ZooKeeper type:
    1. If the ZooKeeper type is FollowerZooKeeperServer, process data by iterating through the packetsNotCommitted collection. The essence of data processing is request handling, with SyncRequestProcessor as the first request processor. Commit zxids by iterating through the packetsCommitted collection. The essence of committing is request handling, with CommitProcessor as the first request processor.
    2. If the ZooKeeper type is ObserverZooKeeperServer, iterate through the packetsNotCommitted collection, compare the zxid of each packet in flight with the first zxid in the packetsCommitted collection. If they don't match, skip processing; otherwise, remove the first data from packetsCommitted, create a request object, and pass it to ObserverZooKeeperServer for data processing. The essence of data processing is request handling, with CommitProcessor as the first request processor.
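
The interplay between packetsNotCommitted, packetsCommitted, and the writeToTxnLog flag is easier to follow in a reduced model. The sketch below tracks only zxids and ignores transaction payloads, reconfig handling, and networking. SyncBufferSketch and its method names are hypothetical, and the null check on an empty queue is an assumption of the sketch, not something the listing performs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical reduced model of the buffering done during syncWithLeader.
class SyncBufferSketch {
    final Deque<Long> notCommitted = new ArrayDeque<>(); // buffered proposals
    final List<Long> committed = new ArrayList<>();      // commits deferred until startup
    final List<Long> applied = new ArrayList<>();        // applied directly to the database
    boolean writeToTxnLog;

    SyncBufferSketch(boolean snapshotNeeded) {
        // Same initial value as in the listing: the negation of snapshotNeeded.
        this.writeToTxnLog = !snapshotNeeded;
    }

    // PROPOSAL: always buffer.
    void onProposal(long zxid) {
        notCommitted.addLast(zxid);
    }

    // COMMIT: apply right away, or remember the zxid for later.
    void onCommit(long zxid) {
        if (!writeToTxnLog) {
            Long head = notCommitted.peekFirst();
            if (head != null && head.longValue() == zxid) {
                applied.add(notCommitted.removeFirst());
            }
            // On a mismatch the real code only logs a warning.
        } else {
            committed.add(zxid);
        }
    }

    // NEWLEADER: from here on, commits are deferred and logged after startup.
    void onNewLeader() {
        writeToTxnLog = true;
    }

    public static void main(String[] args) {
        SyncBufferSketch sync = new SyncBufferSketch(true); // SNAP or TRUNC case
        sync.onProposal(1L);
        sync.onProposal(2L);
        sync.onCommit(1L);   // applied directly
        sync.onNewLeader();
        sync.onCommit(2L);   // deferred
        System.out.println(sync.applied + " " + sync.notCommitted + " " + sync.committed);
    }
}
```

After the loop, whatever remains in notCommitted corresponds to the proposals passed to logRequest, and the entries in committed correspond to the zxids passed to commit.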

Finally, let's analyze the processPacket method, which handles the packets a follower receives from the leader after synchronization. The specific processing code is as follows:

protected void processPacket(QuorumPacket qp) throws Exception {
  switch (qp.getType()) {
    case Leader.PING:
      // Send ping data packet
      ping(qp);
      break;
    case Leader.PROPOSAL:
      // Create transaction header and transaction object
      TxnHeader hdr = new TxnHeader();
      Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
      if (hdr.getZxid() != lastQueued + 1) {
        LOG.warn("Got zxid 0x"
                 + Long.toHexString(hdr.getZxid())
                 + " expected 0x"
                 + Long.toHexString(lastQueued + 1));
      }
      lastQueued = hdr.getZxid();

      if (hdr.getType() == OpCode.reconfig) {
        SetDataTxn setDataTxn = (SetDataTxn) txn;
        QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
        self.setLastSeenQuorumVerifier(qv, true);
      }

      // Process request
      fzk.logRequest(hdr, txn);
      break;
    case Leader.COMMIT:
      // Commit zxid
      fzk.commit(qp.getZxid());
      break;

    case Leader.COMMITANDACTIVATE:
      // Get request
      Request request = fzk.pendingTxns.element();
      SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
      QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
      ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
      long suggestedLeaderId = buffer.getLong();
      // Update configuration node
      boolean majorChange =
        self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
      // Commit zxid
      fzk.commit(qp.getZxid());
      // Throw an exception if processReconfig reported a major change
      if (majorChange) {
        throw new Exception("changes proposed in reconfig");
      }
      break;
    case Leader.UPTODATE:
      // Unexpected at this stage; only log an error
      LOG.error("Received an UPTODATE message after Follower started");
      break;
    case Leader.REVALIDATE:
      // Revalidate
      revalidate(qp);
      break;
    case Leader.SYNC:
      // Execute synchronization
      fzk.sync();
      break;
    default:
      LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
      break;
  }
}

The core processing flow in the above code is as follows:

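  1. If the packet type is PING, reply to the leader using the ping method.
  2. If the packet type is PROPOSAL, deserialize the packet data into a transaction header and a transaction object, record the header's zxid in lastQueued, and pass both objects to FollowerZooKeeperServer#logRequest. If the transaction is a reconfig, the QuorumVerifier parsed from it is first stored as the last seen configuration.
  3. If the packet type is COMMIT, pass the packet's zxid to FollowerZooKeeperServer#commit. COMMITANDACTIVATE does the same, but before committing it rebuilds the QuorumVerifier from the first pending transaction and applies it with processReconfig; if that call reports a major change, an exception is thrown after the commit.
  4. If the packet type is UPTODATE, no processing is performed; an error is logged because this packet is not expected after the Follower has started.
  5. If the packet type is REVALIDATE, perform revalidation using the revalidate method.
  6. If the packet type is SYNC, perform synchronization using the FollowerZooKeeperServer#sync method.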
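
The dispatch described above can be pictured with a small stand-alone sketch. This is not ZooKeeper code: the class FollowerDispatchSketch, the PacketType enum (standing in for the int constants on Leader), and the recorded action strings are all hypothetical, and the real method operates on QuorumPacket objects and a FollowerZooKeeperServer. The sketch only records which action each packet type would trigger.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for Follower#processPacket; all names here are hypothetical.
final class FollowerDispatchSketch {
    // Hypothetical enum replacing the int packet-type constants defined on Leader.
    enum PacketType { PING, PROPOSAL, COMMIT, COMMITANDACTIVATE, UPTODATE, REVALIDATE, SYNC, OTHER }

    // Records the action each packet would trigger instead of calling a real server.
    private final List<String> actions = new ArrayList<>();
    private long lastQueued;

    void processPacket(PacketType type, long zxid) {
        switch (type) {
            case PING:
                actions.add("ping");
                break;
            case PROPOSAL:
                // Remember the last queued zxid, then log the request.
                lastQueued = zxid;
                actions.add("logRequest:" + zxid);
                break;
            case COMMIT:
                actions.add("commit:" + zxid);
                break;
            case COMMITANDACTIVATE:
                // The reconfig is applied first, then the zxid is committed.
                actions.add("processReconfig:" + zxid);
                actions.add("commit:" + zxid);
                break;
            case UPTODATE:
                // Nothing is processed; the real code only logs an error.
                actions.add("error:unexpected UPTODATE");
                break;
            case REVALIDATE:
                actions.add("revalidate");
                break;
            case SYNC:
                actions.add("sync");
                break;
            default:
                actions.add("warn:unknown");
                break;
        }
    }

    List<String> actions() { return actions; }

    long lastQueued() { return lastQueued; }

    public static void main(String[] args) {
        FollowerDispatchSketch follower = new FollowerDispatchSketch();
        follower.processPacket(PacketType.PROPOSAL, 5L);
        follower.processPacket(PacketType.COMMIT, 5L);
        System.out.println(follower.actions());
    }
}
```

Recording actions in a list keeps the sketch testable without sockets; in the real Follower each branch delegates to fzk or to a Learner helper method.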

Observer Analysis

This section analyzes the Observer class. We'll start by examining its constructor, which is used in the QuorumPeer#makeObserver method. The specific processing code is as follows:

protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
  return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}

Through this method, we enter the Observer constructor. The specific constructor code is as follows:

Observer(QuorumPeer self, ObserverZooKeeperServer observerZooKeeperServer) {
  this.self = self;
  this.zk = observerZooKeeperServer;
}

In the above code, member variables are initialized:

  1. Member variable self, which is essentially the election node QuorumPeer.
  2. Member variable zk, representing the ZK service, specifically ObserverZooKeeperServer in this case.

Next, we'll analyze the core method observeLeader. The specific processing code is as follows:

void observeLeader() throws Exception {
  zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);

  try {
    // Find leader service
    QuorumServer leaderServer = findLeader();
    LOG.info("Observing " + leaderServer.addr);
    try {
      // Establish connection with leader service
      connectToLeader(leaderServer.addr, leaderServer.hostname);
      // Register current node with leader, get zxid corresponding to new election epoch
      long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
      // Throw exception if current node state changes
      if (self.isReconfigStateChange()) {
        throw new Exception("learned about role change");
      }
      // Synchronize with leader
      syncWithLeader(newLeaderZxid);
      // Create data packet
      QuorumPacket qp = new QuorumPacket();
      while (this.isRunning()) {
        // Read packet
        readPacket(qp);
        // Process packet
        processPacket(qp);
      }
    } catch (Exception e) {
      LOG.warn("Exception when observing the leader", e);
      try {
        sock.close();
      } catch (IOException e1) {
        e1.printStackTrace();
      }
      pendingRevalidations.clear();
    }
  } finally {
    zk.unregisterJMX(this);
  }
}

In the above code, we can see that the operations are similar to the followLeader method in Follower. The relevant methods have been mentioned when analyzing followLeader, so we won't analyze individual methods. The processing flow of the observeLeader method is as follows:

  1. Use the findLeader method to find the leader service.
  2. Use the connectToLeader method to establish a connection with the leader service.
  3. Use the registerWithLeader method to register the current node with the leader service. The registerWithLeader method will return a zxid that includes the latest election epoch.
  4. Check with isReconfigStateChange whether a reconfiguration has changed the current node's role. If it has, throw an exception.
  5. Use the syncWithLeader method to synchronize with the leader.
  6. Enter a loop that continues for as long as isRunning returns true, repeatedly performing the following operations:
    1. Read data packets.
    2. Process data packets.
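
Step 3 above mentions a zxid that carries the election epoch. As a minimal sketch, assuming the usual ZooKeeper layout in which the high 32 bits of the 64-bit zxid hold the epoch and the low 32 bits hold a per-epoch counter, the packing and unpacking can be written as follows. The class ZxidSketch and its method names are hypothetical stand-ins, not the project's own utility class.

```java
// Illustrative helper; assumes epoch in the high 32 bits, counter in the low 32 bits.
final class ZxidSketch {
    private ZxidSketch() { }

    // Combine an epoch and a counter into one 64-bit zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    // Extract the epoch from the high 32 bits.
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    // Extract the counter from the low 32 bits.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 7);
        System.out.println("zxid=0x" + Long.toHexString(zxid)
            + " epoch=" + epochOf(zxid) + " counter=" + counterOf(zxid));
    }
}
```

Under this layout, a zxid built for a new epoch with counter 0 compares greater than every zxid from an earlier epoch, which is why a single long comparison is enough to order transactions across epochs.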

Summary

This chapter analyzed the state synchronization process in the ZooKeeper project, focusing on the lead method in Leader, the followLeader method in Follower, and the observeLeader method in Observer. The analysis shows that Follower and Observer process packets in similar ways. The following summary therefore explains the synchronization process only in terms of the interaction between Follower and Leader.

The Leader executes the lead method to create a LearnerCnxAcceptor. The LearnerCnxAcceptor waits for incoming socket connections (these connections carry the state synchronization requests initiated by Followers). For each accepted connection, it creates a LearnerHandler to process it. From this point on, the processing consists of interactions with the Follower, as shown in the diagram below.

In the diagram above, after the leader receives the ack packet, it will determine whether a full data synchronization is needed. This involves calling the syncFollower method to perform data synchronization operations with the Follower. The synchronization logic for Follower and Observer is implemented in the org.apache.zookeeper.server.quorum.Learner#syncWithLeader method.
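
The decision about whether a full synchronization is needed can be illustrated with a deliberately simplified sketch. It assumes the leader compares the learner's last zxid with its own last processed zxid and with the range of its in-memory committed log, and chooses between an incremental DIFF, a TRUNC, and a full SNAP. The class SyncModeSketch, the SyncMode enum, and the parameter names are hypothetical; the actual syncFollower method handles more cases (for example, reading older proposals from the on-disk transaction log) that are omitted here.

```java
// Simplified illustration of choosing a synchronization mode; not the real syncFollower.
final class SyncModeSketch {
    // Hypothetical enum naming the three kinds of synchronization.
    enum SyncMode { DIFF, TRUNC, SNAP }

    private SyncModeSketch() { }

    static SyncMode chooseSyncMode(long peerLastZxid,
                                   long minCommittedLog,
                                   long maxCommittedLog,
                                   long lastProcessedZxid) {
        if (peerLastZxid == lastProcessedZxid) {
            // Learner is already up to date: an empty DIFF is enough.
            return SyncMode.DIFF;
        }
        if (peerLastZxid > maxCommittedLog) {
            // Learner has transactions the leader does not know: roll them back.
            return SyncMode.TRUNC;
        }
        if (peerLastZxid >= minCommittedLog) {
            // Learner is within the committed log range: send the missing proposals.
            return SyncMode.DIFF;
        }
        // Learner is too far behind the committed log: send a full snapshot.
        return SyncMode.SNAP;
    }

    public static void main(String[] args) {
        System.out.println(chooseSyncMode(5, 1, 10, 10));
        System.out.println(chooseSyncMode(12, 1, 10, 10));
        System.out.println(chooseSyncMode(0, 3, 10, 10));
    }
}
```

Only the last branch corresponds to a full data synchronization; the other branches let the learner catch up without transferring the whole database.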