ZooKeeper Source Code Analysis

ZooKeeper Leader Election Implementation

This chapter provides a comprehensive introduction to the implementation of the leader election algorithm in the ZooKeeper project.

Election Entry Point

This section analyzes the entry point for the leader election algorithm in the ZooKeeper project. In ZooKeeper, the interface defining the election behavior is called Election. The specific code is as follows:

public interface Election {
    Vote lookForLeader() throws InterruptedException;

    void shutdown();
}

The Election interface defines two methods:

  1. The lookForLeader method is used to find a Leader. It returns a Vote object, which can be understood as an election ballot.
  2. The shutdown method is used to close the election process.

The project contains three implementations of the Election interface: FastLeaderElection, LeaderElection, and AuthFastLeaderElection. The latter two are deprecated, so FastLeaderElection is the implementation actually in use.

With the Election interface and its implementations introduced, let's look at how an instance is created. The factory method is QuorumPeer#createElectionAlgorithm, implemented as follows:

@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            // FastLeaderElection communicates through a QuorumCnxManager
            QuorumCnxManager qcm = createCnxnManager();
            // Replace any connection manager left over from a previous election
            QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
            if (oldQcm != null) {
                LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
                oldQcm.halt();
            }

            QuorumCnxManager.Listener listener = qcm.listener;
            if (listener != null) {
                // Start accepting election connections from other peers
                listener.start();
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
    }
    return le;
}

Depending on the electionAlgorithm parameter, the method creates one of the following implementations:

  1. When electionAlgorithm is 0, LeaderElection is chosen.
  2. When electionAlgorithm is 1, AuthFastLeaderElection is chosen.
  3. When electionAlgorithm is 2, AuthFastLeaderElection is chosen, constructed with the extra argument true (authentication enabled).
  4. When electionAlgorithm is 3, FastLeaderElection is chosen.
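To connect this mapping with configuration, here is a minimal sketch that reads an electionAlg value from zoo.cfg-style text and reports which class createElectionAlgorithm would instantiate. It assumes the zoo.cfg key is also named electionAlg (as parsed by QuorumPeerConfig in the version analyzed here) and uses the default of 3 shown later in this section; the class ElectionAlgSketch and its methods are hypothetical and only return class names instead of building real Election objects.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: mirrors the switch in QuorumPeer#createElectionAlgorithm
// but returns the implementation class name instead of an Election instance.
class ElectionAlgSketch {
    // Default taken from QuorumPeerConfig: protected int electionAlg = 3;
    static final int DEFAULT_ELECTION_ALG = 3;

    public static String implementationFor(int electionAlg) {
        switch (electionAlg) {
            case 0:
                return "LeaderElection";
            case 1:
                return "AuthFastLeaderElection";
            case 2:
                return "AuthFastLeaderElection(auth=true)";
            case 3:
                return "FastLeaderElection";
            default:
                // The real method hits "assert false" and returns null.
                throw new IllegalArgumentException("Unknown electionAlg: " + electionAlg);
        }
    }

    // Reads electionAlg from zoo.cfg-style text, falling back to the default.
    public static int electionAlgFrom(String cfgText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("electionAlg");
        return value == null ? DEFAULT_ELECTION_ALG : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        // No electionAlg entry, so the default of 3 applies.
        String cfg = "tickTime=2000\ndataDir=/tmp/zookeeper\n";
        System.out.println(implementationFor(electionAlgFrom(cfg))); // FastLeaderElection
    }
}
```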

Next, let's find where createElectionAlgorithm is called:

this.electionAlg=createElectionAlgorithm(electionType);

In the code above, the member variable electionType determines which Election implementation is created, so we need to find where its value comes from. A member variable can be assigned either in a constructor or through a setter; in ZooKeeper, electionType is assigned through a setter called from the QuorumPeerMain class:

quorumPeer.setElectionType(config.getElectionAlg());

Here the election algorithm type is taken from the electionAlg member variable of the configuration class QuorumPeerConfig, which is declared as follows:

protected int electionAlg=3;

In summary, unless electionAlg is overridden in the configuration, ZooKeeper uses FastLeaderElection. Next, we need to find when an election actually happens, which means finding where Election#lookForLeader is invoked. The call is in the run method of the QuorumPeer class; the relevant part is as follows:

// Some code omitted
switch(getPeerState()){
        case LOOKING:
        LOG.info("LOOKING");
        setCurrentVote(makeLEStrategy().lookForLeader());
        break;

From the above code, we can see that the election operation is performed when the state is LOOKING.

QuorumPeer Analysis

This section analyzes the QuorumPeer class, a subclass of Thread that manages the quorum protocol. Its class Javadoc describes three roles a server can be in:

  1. Leader election: each server elects a leader, initially proposing itself.
  2. Follower: the server synchronizes with the leader and replicates any transactions.
  3. Leader: the server processes requests and forwards them to followers; a majority of followers must log a request before it can be accepted.

The Javadoc also mentions a datagram socket that always responds with the server's current view of who the leader is; this corresponds to the udpSocket and ResponderThread that startLeaderElection sets up only for the legacy election type 0.

The election itself is driven by the run method, which is covered in detail later. First, we introduce some of the member variables, methods, and inner classes of QuorumPeer, starting with the inner enumeration ServerState, which has four values:

  1. LOOKING: The phase of searching for a Leader, requiring an election to be initiated.
  2. FOLLOWING: The follower state, indicating that the node is a follower and there's already a Leader in the cluster.
  3. LEADING: The leader state, indicating that the node itself is the leader.
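  4. OBSERVING: The observer state, indicating that the node is an observer and there's already a Leader in the cluster. An observer syncs with the Leader but does not vote in elections or count toward the quorum.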

Note that QuorumPeer has a member variable state that holds the current server state. It defaults to ServerState.LOOKING, so a QuorumPeer starts an election as soon as it runs. The next member variable is myid, which holds this server's id. In the ZooKeeper project, myid is assigned by the following code:

quorumPeer.setMyid(config.getServerId());

This code assigns the serverId member variable of QuorumPeerConfig to myid. serverId is populated by QuorumPeerConfig#setupMyId:

private void setupMyId()throws IOException{
  File myIdFile=new File(dataDir,"myid");
  if(!myIdFile.isFile()){
      return;
  }
  BufferedReader br=new BufferedReader(new FileReader(myIdFile));
  String myIdString;
  try{
      myIdString=br.readLine();
  }finally{
      br.close();
  }
  try{
      serverId=Long.parseLong(myIdString);
      MDC.put("myid",myIdString);
  }catch(NumberFormatException e){
      throw new IllegalArgumentException("serverid "+myIdString
      +" is not a number");
  }
}

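From this code we can conclude that serverId, and therefore QuorumPeer's myid, is read from the first line of a file named myid in the data directory (dataDir). It is not a property in zoo.cfg itself; zoo.cfg only tells ZooKeeper where dataDir is. If the myid file does not exist, setupMyId returns without setting serverId.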

This concludes the analysis of the important member variables state and myid. Next, we move on to the methods, using start and run as entry points, and explain other member variables as they come up.

Analysis of the start Method

This section analyzes the 'start' method in the QuorumPeer class. The specific handling code is as follows:

@Override
public synchronized void start() {
    // Check that the view contains myid; if not, throw an exception
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // Load the database and the epoch files from disk
    loadDataBase();
    // Start the client connection factories
    startServerCnxnFactory();
    try {
        // Start AdminServer
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // Prepare the initial vote and the election algorithm
    startLeaderElection();
    // Start the QuorumPeer thread, which executes run()
    super.start();
}

The processing flow in the above code is as follows:

  1. Check that the view contains myid; if it does not, throw an exception.
  2. Load data with the loadDataBase method.
  3. Start the client connection factories held in the member variables cnxnFactory and secureCnxnFactory (each only if it has been configured).
  4. Start the AdminServer; a failure here is only logged.
  5. Set up election-related state with the startLeaderElection method.
  6. Call super.start() to start the QuorumPeer thread itself, which then executes the run method.

In this flow we need to explain the concept of the "view". The view describes all servers in the ensemble configuration, including observers. getView() returns a Map<Long, QuorumServer>, in which:

  1. The key is the server id (myid).
  2. The value is the QuorumServer object describing that server.

Next, we'll analyze the loadDataBase method. The specific handling code is as follows:

private void loadDataBase() {
    try {
        // Load data from ZooKeeper database
        zkDb.loadDataBase();

        // Load the epochs
        // Load election cycle related data

        // Get the last processed zxid from the ZooKeeper database
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        // Get the epoch from the last zxid
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            // Get the current epoch from the currentEpoch file
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // Assign the epoch from the last processed zxid to the currentEpoch member variable and write it to the currentEpoch file
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME
                     + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                     currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        // If the epoch from zxid is greater than the epoch in the currentEpoch file, throw an exception
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
                                  + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            // Read the accepted election epoch from the acceptedEpoch file
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // Assign the epoch from the last processed zxid to the acceptedEpoch member variable and write it to the acceptedEpoch file
            acceptedEpoch = epochOfZxid;
            LOG.info(ACCEPTED_EPOCH_FILENAME
                     + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                     acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        // If the election epoch in the acceptedEpoch file is less than the current election epoch, throw an exception
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch)
                                  + " is less than the current epoch, " + ZxidUtils.zxidToString(
                                      currentEpoch));
        }
    } catch (IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}

The code above loads the ZooKeeper database and initializes the member variables currentEpoch and acceptedEpoch. In ZAB terms, acceptedEpoch is the most recent epoch this node has accepted from a prospective leader (via a NEWEPOCH proposal), while currentEpoch is the epoch this node has actually joined (via the leader's NEWLEADER message). A node accepts an epoch before it joins it, so acceptedEpoch must never be smaller than currentEpoch, and currentEpoch must never be older than the epoch encoded in the last processed zxid. Both values are persisted, in files named currentEpoch and acceptedEpoch respectively. With these details in mind, the processing flow is:

  1. Load data into the ZooKeeper database: ZKDatabase delegates to FileTxnSnapLog, which restores the most recent snapshot and replays the transaction log into the data tree (DataTree).
  2. Read the last processed zxid from the data tree and extract its epoch. A zxid stores the epoch in its high 32 bits, so the conversion is zxid >> 32.
  3. Read currentEpoch from the currentEpoch file. If the file does not exist, use the epoch from step (2) and write it to the file. In either case, if the epoch from step (2) is greater than currentEpoch, throw an IOException, which loadDataBase rethrows as a RuntimeException.
  4. Read acceptedEpoch from the acceptedEpoch file. If the file does not exist, use the epoch from step (2) and write it to the file. In either case, if acceptedEpoch is less than currentEpoch, throw an exception in the same way.
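To make the zxid arithmetic and the two consistency checks concrete, here is a minimal sketch. The bit operations follow the zxid layout described in step (2): the epoch in the high 32 bits and a per-epoch transaction counter in the low 32 bits. EpochSketch and its methods are hypothetical names, and the checks throw IllegalStateException rather than the IOException used by loadDataBase.

```java
// Hypothetical sketch of zxid/epoch arithmetic and the invariants checked
// by QuorumPeer#loadDataBase: epochOfZxid <= currentEpoch <= acceptedEpoch.
class EpochSketch {
    // High 32 bits: the epoch (the zxid >> 32 conversion from step 2).
    public static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    // Low 32 bits: the transaction counter within that epoch.
    public static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Builds a zxid from an epoch and a counter.
    public static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    // Same two checks as loadDataBase, with a simpler exception type.
    public static void validate(long lastProcessedZxid, long currentEpoch, long acceptedEpoch) {
        long epochOfZxid = epochOf(lastProcessedZxid);
        if (epochOfZxid > currentEpoch) {
            throw new IllegalStateException("currentEpoch " + currentEpoch
                    + " is older than the last zxid 0x" + Long.toHexString(lastProcessedZxid));
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IllegalStateException("acceptedEpoch " + acceptedEpoch
                    + " is less than currentEpoch " + currentEpoch);
        }
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 42);                               // epoch 5, 42nd transaction
        System.out.println(Long.toHexString(zxid));                // 50000002a
        System.out.println(epochOf(zxid) + " " + counterOf(zxid)); // 5 42
        validate(zxid, 5, 6);                                      // passes: 5 <= 5 <= 6
    }
}
```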

Next, let's explain the startLeaderElection method. The specific handling code is as follows:

synchronized public void startLeaderElection() {
    try {
        // Initialize the vote if the server state is LOOKING
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // Election algorithm type is 0
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(getQuorumAddress().getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    // Construct the election interface implementation class
    this.electionAlg = createElectionAlgorithm(electionType);
}

The core processing flow in the above code is as follows:

  1. If the server state is LOOKING, initialize the vote with parameters: myid, the last processed zxid, and the current epoch.
  2. If the election algorithm type is 0 (the legacy UDP-based LeaderElection), initialize the member variables udpSocket and responder.
  3. Construct the election interface implementation class using the createElectionAlgorithm method.

The startLeaderElection method does two things: it initializes the vote and it creates the election algorithm instance. The latter was covered in previous sections, so let's look at the vote, which is represented by the Vote class. Its member variables are listed in the table below.

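Variable Name    Variable Type    Variable Description
version          int              Version of the vote (notification) format
id               long             Server id (myid) of the proposed leader
zxid             long             Last processed zxid (transaction id) of the proposed leader
electionEpoch    long             Logical clock: the election round in which the vote was cast
peerEpoch        long             Epoch of the proposed leader
state            ServerState      State of the server that cast the vote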

Analysis of the run method

This section will analyze the run method in the QuorumPeer class. The run method can be divided into three core operational processes:

  1. Update the thread name.
  2. Register MBean-related information.
  3. Core infinite loop, performing different operations based on various server states.

Since the first part involves simple string-related operations, we won't elaborate on it. The code for the second part is as follows:

try {
    // Create the jmxQuorumBean member variable and register it so this quorum peer can be monitored through JMX
    jmxQuorumBean = new QuorumBean(this);
    MBeanRegistry.getInstance().register(jmxQuorumBean, null);
    // For every server in the view, register a LocalPeerBean (this server) or a RemotePeerBean (any other server) under the quorum bean
    for (QuorumServer s : getView().values()) {
        ZKMBeanInfo p;
        // If the current ID matches the ID of the participating service
        if (getId() == s.id) {
            p = jmxLocalPeerBean = new LocalPeerBean(this);
            try {
                MBeanRegistry.getInstance().register(p, jmxQuorumBean);
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
                jmxLocalPeerBean = null;
            }
        }
        // If not matching
        else {
            RemotePeerBean rBean = new RemotePeerBean(this, s);
            try {
                MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                jmxRemotePeerBean.put(s.id, rBean);
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
            }
        }
    }
} catch (Exception e) {
    LOG.warn("Failed to register with JMX", e);
    jmxQuorumBean = null;
}

The processing flow in the above code is as follows:

  1. Create a QuorumBean using itself as a parameter and register it with MBean.
  2. Loop through all services participating in the election, create LocalPeerBean or RemotePeerBean based on whether the service ID matches the current ID, and register them with MBean.

Next comes the third and most important step: the main loop, which performs different operations depending on the server state. The explanation below is organized by state. First, when the server state is LOOKING, the following code runs:

case LOOKING:
// Election trigger
LOG.info("LOOKING");

// Check whether the readonlymode.enabled system property is set
if (Boolean.getBoolean("readonlymode.enabled")) {
    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

    // Create the read-only ZooKeeper server but don't start it immediately
    final ReadOnlyZooKeeperServer roZk =
        new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

    // Instead of starting roZk immediately, wait some grace
    // period before we decide we're partitioned.
    //
    // Thread is used here because otherwise it would require
    // changes in each of election strategy classes which is
    // unnecessary code coupling.
    // Create waiting thread
    Thread roZkMgr = new Thread() {
        public void run() {
            try {
                // lower-bound grace period to 2 secs
                sleep(Math.max(2000, tickTime));
                if (ServerState.LOOKING.equals(getPeerState())) {
                    roZk.startup();
                }
            } catch (InterruptedException e) {
                LOG.info(
                    "Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
            } catch (Exception e) {
                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
            }
        }
    };
    try {
        // Start the waiting thread
        roZkMgr.start();
        // Set reconfigFlag member variable to false
        reconfigFlagClear();
        // If shuttingDownLE is true, reinitialize the election implementation class
        if (shuttingDownLE) {
            shuttingDownLE = false;
            startLeaderElection();
        }
        // Set the vote
        setCurrentVote(makeLEStrategy().lookForLeader());
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        setPeerState(ServerState.LOOKING);
    } finally {
        // If the thread is still in the grace period, interrupt
        // to come out of waiting.
        roZkMgr.interrupt();
        roZk.shutdown();
    }
}
else {
    try {

        reconfigFlagClear();
        if (shuttingDownLE) {
            shuttingDownLE = false;
            // Start the election
            startLeaderElection();
        }
        // Set the vote
        setCurrentVote(makeLEStrategy().lookForLeader());
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        setPeerState(ServerState.LOOKING);
    }
}
break;

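The code above branches on the readonlymode.enabled system property. When it is set, the server creates a ReadOnlyZooKeeperServer and starts a helper thread that waits for a grace period of max(2000 ms, tickTime); if the peer is still LOOKING when the wait ends, the read-only server is started so that clients can keep reading from this server while it is partitioned from the quorum. Once the election finishes or fails, the finally block interrupts the helper thread and shuts the read-only server down. Apart from this, both branches confirm the vote in the same way: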

  1. If the member variable shuttingDownLE is true, reset it to false and call startLeaderElection again, which recreates the election algorithm instance used in step (2).
  2. Call makeLEStrategy to obtain the election algorithm instance, then call its lookForLeader method, which blocks until a leader is decided and returns the final vote; setCurrentVote stores that vote. If an exception occurs, the state is set back to LOOKING so the loop retries the election.
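The read-only branch uses a "start a fallback after a grace period unless the election finishes first" pattern. The sketch below reproduces that pattern in isolation; GracePeriodSketch is a hypothetical class, a boolean flag stands in for roZk.startup(), Thread.sleep stands in for the blocking lookForLeader call, and the timings are illustrative rather than ZooKeeper's max(2000, tickTime).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the grace-period pattern from the LOOKING branch.
class GracePeriodSketch {
    enum State { LOOKING, FOLLOWING }

    private final AtomicReference<State> state = new AtomicReference<>(State.LOOKING);
    private final AtomicBoolean fallbackStarted = new AtomicBoolean(false);

    // electionMillis simulates how long lookForLeader() blocks;
    // graceMillis plays the role of max(2000, tickTime).
    public boolean runElection(long graceMillis, long electionMillis) {
        Thread manager = new Thread(() -> {
            try {
                Thread.sleep(graceMillis);
                // Start the fallback only if no leader has been found yet.
                if (state.get() == State.LOOKING) {
                    fallbackStarted.set(true); // real code: roZk.startup()
                }
            } catch (InterruptedException e) {
                // Interrupted during the grace period: the election finished first.
            }
        });
        manager.start();
        try {
            Thread.sleep(electionMillis); // stand-in for lookForLeader()
            state.set(State.FOLLOWING);   // the election produced a leader
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            manager.interrupt(); // leave the grace period early if still waiting
            try {
                manager.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // The real code also calls roZk.shutdown() here unconditionally.
        }
        return fallbackStarted.get();
    }

    public static void main(String[] args) {
        // Election finishes immediately: the fallback never starts.
        System.out.println(new GracePeriodSketch().runElection(500, 0));  // false
        // Election outlasts the grace period: the fallback starts.
        System.out.println(new GracePeriodSketch().runElection(50, 500)); // true
    }
}
```

Using a separate thread keeps the waiting logic out of the election classes, which matches the motivation given in the original source comment.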

Next, we'll explain the code for the server states OBSERVING, FOLLOWING, and LEADING. The specific processing code is as follows:

case OBSERVING:
try {
    LOG.info("OBSERVING");
    // Set the observer
    setObserver(makeObserver(logFactory));
    // Observer starts observing the leader
    observer.observeLeader();
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
} finally {
    observer.shutdown();
    setObserver(null);
    updateServerState();
}
break;
case FOLLOWING:
try {
    LOG.info("FOLLOWING");
    // Set the follower
    setFollower(makeFollower(logFactory));
    // Follower executes follow operation to communicate with the leader
    follower.followLeader();
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
} finally {
    follower.shutdown();
    setFollower(null);
    // Update server state
    updateServerState();
}
break;
case LEADING:
LOG.info("LEADING");
try {
    // Set the leader
    setLeader(makeLeader(logFactory));
    // Leader begins leading operation
    leader.lead();
    setLeader(null);
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
} finally {
    if (leader != null) {
        leader.shutdown("Forcing shutdown");
        setLeader(null);
    }
    // Update server state
    updateServerState();
}
break;

The code above mainly involves the following operations:

  1. Depending on the state, create the leader, follower, or observer member variable and call its main method: lead for a leader, followLeader for a follower, and observeLeader for an observer. These methods block for as long as the server stays in that role.
  2. If lead, followLeader, or observeLeader throws an exception, log it.
  3. In the finally block, call shutdown on the leader, follower, or observer, set the member variable to null, and update the server state with the updateServerState method.

Next, we'll analyze the updateServerState method. The specific processing code is as follows:

private synchronized void updateServerState() {
    if (!reconfigFlag) {
        setPeerState(ServerState.LOOKING);
        LOG.warn("PeerState set to LOOKING");
        return;
    }

    if (getId() == getCurrentVote().getId()) {
        setPeerState(ServerState.LEADING);
        LOG.debug("PeerState set to LEADING");
    } else if (getLearnerType() == LearnerType.PARTICIPANT) {
        setPeerState(ServerState.FOLLOWING);
        LOG.debug("PeerState set to FOLLOWING");
    } else if (getLearnerType() == LearnerType.OBSERVER) {
        setPeerState(ServerState.OBSERVING);
        LOG.debug("PeerState set to OBSERVER");
    } else { // currently shouldn't happen since there are only 2 learner types
        setPeerState(ServerState.LOOKING);
        LOG.debug("Shouldn't be here");
    }
    reconfigFlag = false;
}

In the code above, the server state is controlled by the following conditions:

  1. If the member variable reconfigFlag is false, set the server state to LOOKING and return immediately.
  2. Otherwise, if the current server id equals the id in the current vote, set the server state to LEADING.
  3. Otherwise, if the learnerType of the current server is PARTICIPANT, set the server state to FOLLOWING.
  4. Otherwise, if the learnerType of the current server is OBSERVER, set the server state to OBSERVING.
  5. If none of conditions 2-4 holds (which should not happen, since there are only two learner types), set the server state to LOOKING.

After conditions 2-5 are evaluated, reconfigFlag is reset to false.
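The decision order can be written as a pure function. In this hypothetical sketch (ServerStateSketch and nextState are illustrative names), the parameters stand for reconfigFlag, getId(), getCurrentVote().getId(), and getLearnerType(); the real method sets the state and resets reconfigFlag instead of returning a value. Because reconfigFlag is cleared at the start of every LOOKING round (reconfigFlagClear() in the code above), the usual outcome is that a server whose lead, followLeader, or observeLeader call returns goes straight back to LOOKING and starts a new election.

```java
// Hypothetical, side-effect-free version of QuorumPeer#updateServerState.
class ServerStateSketch {
    enum ServerState { LOOKING, FOLLOWING, LEADING, OBSERVING }
    enum LearnerType { PARTICIPANT, OBSERVER }

    public static ServerState nextState(boolean reconfigFlag, long myId,
                                        long votedLeaderId, LearnerType learnerType) {
        if (!reconfigFlag) {
            return ServerState.LOOKING;    // usual case: start a new election
        }
        if (myId == votedLeaderId) {
            return ServerState.LEADING;    // the current vote names this server
        }
        if (learnerType == LearnerType.PARTICIPANT) {
            return ServerState.FOLLOWING;
        }
        if (learnerType == LearnerType.OBSERVER) {
            return ServerState.OBSERVING;
        }
        return ServerState.LOOKING;        // unreachable with two learner types
    }

    public static void main(String[] args) {
        System.out.println(nextState(false, 1, 1, LearnerType.PARTICIPANT)); // LOOKING
        System.out.println(nextState(true, 1, 1, LearnerType.PARTICIPANT));  // LEADING
        System.out.println(nextState(true, 1, 3, LearnerType.OBSERVER));     // OBSERVING
    }
}
```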

FastLeaderElection Analysis

This section analyzes the FastLeaderElection class, focusing on the lookForLeader method. lookForLeader uses two ballot boxes:

  1. recvset, which collects votes from servers in the LOOKING, FOLLOWING, and LEADING states.
  2. outofelection, which collects votes from servers in the FOLLOWING and LEADING states. It is mainly used when a Leader already exists.

In addition to the ballot boxes, a few helper methods need explaining. First is updateProposal, which updates this server's own proposal, that is, the vote it currently advertises to the other servers:

synchronized void updateProposal(long leader,long zxid,long epoch){
  proposedLeader=leader;
  proposedZxid=zxid;
  proposedEpoch=epoch;
}

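The method takes three parameters and sets three member variables:

  1. leader: the server id (myid) of the proposed leader
  2. zxid: the last zxid (transaction id) of the proposed leader
  3. epoch: the epoch of the proposed leader (its peer epoch)
  4. proposedLeader: the proposed leader id in this server's vote
  5. proposedZxid: the zxid in this server's vote
  6. proposedEpoch: the epoch of the proposed leader in this server's vote

The election round itself is tracked separately by logicalclock, which sendNotifications sends as the round field.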

Next, we'll analyze the sendNotifications method, which is used to broadcast votes (notifications). The specific processing code is as follows:

private void sendNotifications() {
  // Put vote information into sendqueue, WorkerSender will read data from it and send
  for (long sid : self.getCurrentAndNextConfigVoters()) {
    QuorumVerifier qv = self.getQuorumVerifier();
    // Construct send message
    ToSend notmsg = new ToSend(ToSend.mType.notification,
                               proposedLeader,
                               proposedZxid,
                               logicalclock.get(),
                               QuorumPeer.ServerState.LOOKING,
                               sid,
                               proposedEpoch, qv.toString().getBytes());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
                Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(
                  logicalclock.get()) +
                " (n.round), " + sid + " (recipient), " + self.getId() +
                " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
    }
    // Add to send queue
    sendqueue.offer(notmsg);
  }
}

The method iterates over the ids returned by getCurrentAndNextConfigVoters (the voting members of the current configuration and, during a reconfiguration, of the next one, this server included), builds a notification message for each, and puts it on sendqueue. The WorkerSender thread then takes the messages off the queue and sends them.
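The hand-off between sendNotifications and WorkerSender is a producer/consumer queue. The sketch below models it with a LinkedBlockingQueue; NotificationQueueSketch and its Notification class are hypothetical, simplified stand-ins for FastLeaderElection and ToSend, and the consumer only records recipients instead of passing messages to QuorumCnxManager.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the sendNotifications -> sendqueue -> WorkerSender flow.
class NotificationQueueSketch {
    // Simplified stand-in for ToSend: only the fields discussed above.
    static final class Notification {
        final long leader;    // proposedLeader
        final long zxid;      // proposedZxid
        final long round;     // logicalclock
        final long recipient; // sid
        final long peerEpoch; // proposedEpoch

        Notification(long leader, long zxid, long round, long recipient, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.round = round;
            this.recipient = recipient;
            this.peerEpoch = peerEpoch;
        }
    }

    private final BlockingQueue<Notification> sendqueue = new LinkedBlockingQueue<>();

    // Producer side, like sendNotifications(): one message per voter id.
    public void sendNotifications(List<Long> voters, long leader, long zxid,
                                  long round, long peerEpoch) {
        for (long sid : voters) {
            sendqueue.offer(new Notification(leader, zxid, round, sid, peerEpoch));
        }
    }

    // Consumer side, like WorkerSender: a separate thread polls the queue with a
    // timeout and "sends" each message (here it just records the recipient).
    public List<Long> runSender(int expected) {
        List<Long> sentTo = new ArrayList<>();
        Thread workerSender = new Thread(() -> {
            try {
                while (sentTo.size() < expected) {
                    Notification n = sendqueue.poll(1000, TimeUnit.MILLISECONDS);
                    if (n == null) {
                        break; // the sketch stops when idle; a real sender keeps running
                    }
                    sentTo.add(n.recipient);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        workerSender.start();
        try {
            workerSender.join(); // join() makes sentTo's contents visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sentTo;
    }

    public static void main(String[] args) {
        NotificationQueueSketch s = new NotificationQueueSketch();
        // Server 3 proposes itself to voters 1, 2 and 3 (including itself).
        s.sendNotifications(Arrays.asList(1L, 2L, 3L), 3L, 0x100000005L, 1L, 1L);
        System.out.println(s.runSender(3)); // [1, 2, 3]
    }
}
```

Decoupling the election thread from the network through a queue means lookForLeader never blocks on a slow connection while broadcasting.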

Next, we'll analyze the totalOrderPredicate method, which is used to determine whether a vote change is necessary. The specific processing code is as follows:

protected boolean totalOrderPredicate(long newId,
                                      long newZxid,
                                      long newEpoch,
                                      long curId,
                                      long curZxid,
                                      long curEpoch) {
  if (self.getQuorumVerifier().getWeight(newId) == 0) {
    return false;
  }
  // Epoch > Zxid > myid
  return ((newEpoch > curEpoch) ||
          ((newEpoch == curEpoch) &&
           ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

The method first looks up the weight of the proposed leader (newId) in the current quorum configuration. A weight of 0 means the server does not count toward a quorum and should not become leader, so the method returns false, meaning no vote change. Zero weights come from weighted (hierarchical) quorum configurations, so this case is rare in practice. The decisive logic is the comparison of epoch, zxid, and server id, in that order. The method returns true, meaning the vote should change, if any of the following conditions holds:

  1. The new epoch is greater than the current epoch.
  2. Given that the new epoch is equal to the current epoch, the new zxid is greater than the current zxid.
  3. Given that the new epoch is equal to the current epoch and the new zxid is equal to the current zxid, the new service ID is greater than the current service ID.
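The ordering can be tried out in isolation. The sketch below copies the comparison from totalOrderPredicate into a standalone method; the class name VoteOrderSketch is hypothetical, the weight check is omitted, the epochs are peer epochs, and the zxids are arbitrary example values.

```java
// Hypothetical standalone copy of the comparison in totalOrderPredicate
// (the quorum-weight check is left out).
class VoteOrderSketch {
    // True if (newEpoch, newZxid, newId) beats (curEpoch, curZxid, curId):
    // compare epoch first, then zxid, then server id.
    public static boolean beats(long newId, long newZxid, long newEpoch,
                                long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        // Same epoch and zxid: the larger server id wins.
        System.out.println(beats(3, 0x100000005L, 1, 2, 0x100000005L, 1)); // true
        // Same epoch: a larger zxid wins even with a smaller id.
        System.out.println(beats(1, 0x100000006L, 1, 3, 0x100000005L, 1)); // true
        // A higher epoch wins regardless of zxid.
        System.out.println(beats(1, 0x200000000L, 2, 3, 0x100000009L, 1)); // true
        // An identical vote never beats itself.
        System.out.println(beats(2, 0x100000005L, 1, 2, 0x100000005L, 1)); // false
    }
}
```

For a fresh three-server ensemble in which every server has zxid 0 and epoch 0, each server starts by voting for itself and switches whenever a received vote beats its own, so all three converge on server 3, the largest id.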

Next, we'll analyze the termPredicate method, which is used to determine whether the voting has concluded. The specific processing code is as follows:

protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
  // Create SyncedLearnerTracker class
  SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
  // Add QuorumVerifier interface implementation to SyncedLearnerTracker class
  voteSet.addQuorumVerifier(self.getQuorumVerifier());
  // Check if lastSeenQuorumVerifier needs to be added
  if (self.getLastSeenQuorumVerifier() != null
      && self.getLastSeenQuorumVerifier().getVersion() > self
      .getQuorumVerifier().getVersion()) {
    voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
  }
  // Loop through the vote collection, check if it matches the current parameter vote, if so, add it
  for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
    if (vote.equals(entry.getValue())) {
      voteSet.addAck(entry.getKey());
    }
  }

  // Confirm if the majority approves
  return voteSet.hasAllQuorums();
}

In the code above, the core processing flow is as follows:

  1. Create a SyncedLearnerTracker class and add the quorumVerifier member variable to it. If the lastSeenQuorumVerifier member variable is not null and its version is greater than the quorumVerifier's version, then lastSeenQuorumVerifier also needs to be added to the SyncedLearnerTracker class.
  2. Iterate over the vote collection; for every entry whose vote equals the vote parameter, call addAck to record that server id in the SyncedLearnerTracker.
  3. Call hasAllQuorums, which returns true only if the recorded acks form a quorum (a majority, under the default majority configuration) in every configuration that was added, and return the result.
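A simplified version of termPredicate shows what "majority" means here. The sketch below is hypothetical: it assumes simple majority quorums (strictly more than half of the voters), compares votes on id, zxid, electionEpoch, and peerEpoch only (the real Vote.equals has additional rules for votes from servers that are not LOOKING), and models each configuration as a set of voter ids so that, like hasAllQuorums, every configuration must reach a quorum.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch of termPredicate with majority quorums.
class TermPredicateSketch {
    // Simplified vote compared on the four fields used while LOOKING.
    static final class Vote {
        final long id, zxid, electionEpoch, peerEpoch;

        Vote(long id, long zxid, long electionEpoch, long peerEpoch) {
            this.id = id;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.peerEpoch = peerEpoch;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Vote)) {
                return false;
            }
            Vote v = (Vote) o;
            return id == v.id && zxid == v.zxid
                    && electionEpoch == v.electionEpoch && peerEpoch == v.peerEpoch;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, zxid, electionEpoch, peerEpoch);
        }
    }

    // For each configuration, count voters whose recorded vote equals the
    // candidate vote; every configuration needs a strict majority.
    public static boolean termPredicate(Map<Long, Vote> votes, Vote vote, List<Set<Long>> configs) {
        for (Set<Long> voters : configs) {
            Set<Long> acks = new HashSet<>();
            for (Map.Entry<Long, Vote> e : votes.entrySet()) {
                if (voters.contains(e.getKey()) && vote.equals(e.getValue())) {
                    acks.add(e.getKey()); // like SyncedLearnerTracker.addAck
                }
            }
            if (acks.size() <= voters.size() / 2) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Set<Long> voters = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        List<Set<Long>> configs = Collections.singletonList(voters);
        Vote forThree = new Vote(3, 0x100000005L, 1, 1);
        Map<Long, Vote> recvset = new HashMap<>();
        recvset.put(3L, forThree);
        System.out.println(termPredicate(recvset, forThree, configs)); // false: 1 of 3
        recvset.put(2L, new Vote(3, 0x100000005L, 1, 1));
        System.out.println(termPredicate(recvset, forThree, configs)); // true: 2 of 3
    }
}
```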

Finally, let's analyze the checkLeader method, which checks whether the proposed leader (the leader parameter) can be accepted as the actual leader. The code is as follows:

protected boolean checkLeader(
  Map<Long, Vote> votes,
  long leader,
  long electionEpoch) {

  boolean predicate = true;

  // Parameter leader (leader in the vote) is not equal to the local id
  if (leader != self.getId()) {
    // If the leader doesn't exist in the vote collection, set the flag to false
    if (votes.get(leader) == null) {
      predicate = false;
    }
    // If the service state corresponding to the leader in the vote collection is not LEADING, set the flag to false
    else if (votes.get(leader).getState() != ServerState.LEADING) {
      predicate = false;
    }
  }
  // If the local election epoch is not equal to the parameter election epoch, set the flag to false
  else if (logicalclock.get() != electionEpoch) {
    predicate = false;
  }

  return predicate;
}

In the code above, checkLeader returns false (the parameter leader is rejected) in the following cases:

  1. The parameter leader is not equal to the local id, and either:
    1. the vote collection contains no vote from the parameter leader, or
    2. the vote from the parameter leader in the vote collection does not have the service state LEADING.
  2. The parameter leader is equal to the local id, but the local election epoch (logicalclock) differs from the parameter election epoch.

With the analysis of these previous methods, the analysis of the lookForLeader method becomes relatively easier. The specific processing code is as follows:

public Vote lookForLeader() throws InterruptedException {
  try {
    self.jmxLeaderElectionBean = new LeaderElectionBean();
    MBeanRegistry.getInstance().register(
      self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
  } catch (Exception e) {
    LOG.warn("Failed to register with JMX", e);
    self.jmxLeaderElectionBean = null;
  }
  if (self.start_fle == 0) {
    self.start_fle = Time.currentElapsedTime();
  }
  try {
    // Ballot box
    HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

    // Out-of-election ballot box, commonly used in existing cluster environments
    HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

    // Maximum election wait time, default 200 ms
    int notTimeout = finalizeWait;

    synchronized (this) {
      // Increment election epoch
      logicalclock.incrementAndGet();
      // Update ballot (proposal) information
      updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }

    LOG.info("New election. My id =  " + self.getId() +
             ", proposed zxid=0x" + Long.toHexString(proposedZxid));
    // Broadcast ballot
    sendNotifications();

  
    // Loop to confirm Leader
    while ((self.getPeerState() == ServerState.LOOKING) &&
           (!stop)) {
      // Get a notification object from the pending message queue for processing
      Notification n = recvqueue.poll(notTimeout,
                                      TimeUnit.MILLISECONDS);

      if (n == null) {
        // If queued messages have been delivered, the peers may just not have replied: rebroadcast
        if (manager.haveDelivered()) {
          // Broadcast ballot
          sendNotifications();
        }
        else {
          // Messages are still queued: (re)connect to all candidate servers
          manager.connectAll();
        }

        // Double the timeout, capped at maxNotificationInterval
        int tmpTimeOut = notTimeout * 2;
        notTimeout = (tmpTimeOut < maxNotificationInterval ?
                      tmpTimeOut : maxNotificationInterval);
        LOG.info("Notification time out: " + notTimeout);
      }
      // 1. If the sid of the notification object is in the ballot service set
      // 2. If the leader chosen by the notification object is in the ballot service set
      else if (validVoter(n.sid) && validVoter(n.leader)) {
        // Handle different service states in the notification object
        switch (n.state) {
          case LOOKING:
            // If notification > current, replace and send messages out
            // If the election epoch in the notification is greater than the local election epoch
            if (n.electionEpoch > logicalclock.get()) {
              // Set local election epoch to the one in the notification
              logicalclock.set(n.electionEpoch);
              // Clear local ballot box
              recvset.clear();
              // Confirm if ballot update is needed
              if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                      getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                // Update ballot
                updateProposal(n.leader, n.zxid, n.peerEpoch);
              }
              // Otherwise (the notification's vote does not beat the initial vote), fall back to the initial vote
              else {
                // Update ballot
                updateProposal(getInitId(),
                               getInitLastLoggedZxid(),
                               getPeerEpoch());
              }
              // Broadcast ballot
              sendNotifications();
            }
            // If the election epoch in the notification is less than the local election epoch, skip processing
            else if (n.electionEpoch < logicalclock.get()) {
              if (LOG.isDebugEnabled()) {
                LOG.debug(
                  "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                  + Long.toHexString(n.electionEpoch)
                  + ", logicalclock=0x" + Long.toHexString(
                    logicalclock.get()));
              }
              break;
            }
            // If epochs are equal, confirm if ballot update is needed
            else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                         proposedLeader, proposedZxid, proposedEpoch)) {
              // Update ballot
              updateProposal(n.leader, n.zxid, n.peerEpoch);
              // Broadcast ballot
              sendNotifications();
            }

            if (LOG.isDebugEnabled()) {
              LOG.debug("Adding vote: from=" + n.sid +
                        ", proposed leader=" + n.leader +
                        ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                        ", proposed election epoch=0x" + Long.toHexString(
                          n.electionEpoch));
            }
            // Collect ballot in local ballot box
            recvset.put(n.sid,
                        new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

            // Tally votes
            // Determine if voting has ended
            if (termPredicate(recvset,
                              new Vote(proposedLeader, proposedZxid,
                                       logicalclock.get(), proposedEpoch))) {

              // Verify if there is any change in the proposed leader
              // Check for new notifications, if present, further determine if ballot repository update is needed
              while ((n = recvqueue.poll(finalizeWait,
                                         TimeUnit.MILLISECONDS)) != null) {
                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)) {

                  recvqueue.put(n);
                  // Re-elect
                  break;
                }
              }
              // No subsequent notifications
              if (n == null) {
                // If the leader id in the ballot matches the local id, set the current node as leader
                self.setPeerState((proposedLeader == self.getId()) ?
                                  ServerState.LEADING : learningState());
                // Create return value ballot, Leader ballot
                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, logicalclock.get(),
                                        proposedEpoch);
                // Record Leader ballot and clear recvqueue set
                leaveInstance(endVote);
                return endVote;
              }
            }
            break;
          case OBSERVING:
            LOG.debug("Notification from observer: " + n.sid);
            break;
          case FOLLOWING:
          case LEADING:
            // If the election epoch in the notification equals the local election epoch
            if (n.electionEpoch == logicalclock.get()) {
              // Record the vote in the local ballot box
              recvset.put(n.sid,
                          new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
              // Determine if voting has ended
              if (termPredicate(recvset, new Vote(n.version, n.leader,
                                                  n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                  && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                // Set service state
                self.setPeerState((n.leader == self.getId()) ?
                                  ServerState.LEADING : learningState());
                // Create Leader ballot
                Vote endVote = new Vote(n.leader,
                                        n.zxid, n.electionEpoch, n.peerEpoch);
                // Record Leader ballot and clear recvqueue set
                leaveInstance(endVote);
                return endVote;
              }
            }
            // If FOLLOWING, need to confirm the acknowledged Leader before joining the cluster
            // If LEADING, need to let all nodes know I am the Leader
            outofelection.put(n.sid, new Vote(n.version, n.leader,
                                              n.zxid, n.electionEpoch, n.peerEpoch, n.state));
            // Determine if voting has ended, and check if a Leader exists
            if (termPredicate(outofelection, new Vote(n.version, n.leader,
                                                      n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
              synchronized (this) {
                // Set new election epoch
                logicalclock.set(n.electionEpoch);
                // Set service state
                self.setPeerState((n.leader == self.getId()) ?
                                  ServerState.LEADING : learningState());
              }
              // Create Leader ballot
              Vote endVote = new Vote(n.leader, n.zxid,
                                      n.electionEpoch, n.peerEpoch);
              // Record Leader ballot and clear recvqueue set
              leaveInstance(endVote);
              return endVote;
            }
            break;
          default:
            LOG.warn("Notification state unrecognized: " + n.state
                     + " (n.state), " + n.sid + " (n.sid)");
            break;
        }
      }
      // Other cases
      else {
        // If the leader chosen by the notification object is not in the ballot service set
        if (!validVoter(n.leader)) {
          LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}",
                   n.leader, n.sid);
        }
        // If the sid of the notification object is not in the ballot service set
        if (!validVoter(n.sid)) {
          LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}",
                   n.leader, n.sid);
        }
      }
    }
    return null;
  } finally {
    try {
      // Unregister MBean
      if (self.jmxLeaderElectionBean != null) {
        MBeanRegistry.getInstance().unregister(
          self.jmxLeaderElectionBean);
      }
    } catch (Exception e) {
      LOG.warn("Failed to unregister with JMX", e);
    }
    self.jmxLeaderElectionBean = null;
    LOG.debug("Number of connection processing threads: {}",
              manager.getConnectionThreadCount());
  }
}

The core process is as follows:

  1. Register the LeaderElectionBean with JMX. If start_fle has not been set yet, record the election start time in it.
  2. Create the recvset voting box and the outofelection voting box.
  3. Increment the election epoch (the member variable logicalclock increases by 1).
  4. Update the ballot with the following information:
    1. leader: if the current service id (myid) is among the voting members, use it; otherwise, use Long.MIN_VALUE.
    2. zxid: if the learnerType property of the service is PARTICIPANT, use the last logged zxid; otherwise, use Long.MIN_VALUE.
    3. epoch: if the learnerType property of the service is PARTICIPANT, use the current peer epoch (currentEpoch); otherwise, use Long.MIN_VALUE.
  5. Broadcast the ballot using the sendNotifications method.
  6. Use a while loop to obtain the final ballot. The loop runs while the service state is LOOKING and the election has not been stopped.
  7. In the finally block, unregister the MBean.
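The rules in step 4 can be illustrated with a small sketch. InitialBallot and the LearnerType enum below are hypothetical stand-ins for the FastLeaderElection helpers getInitId, getInitLastLoggedZxid, and getPeerEpoch. The voting-member set, last logged zxid, and current epoch are passed in as plain values instead of being read from QuorumPeer.

```java
import java.util.Set;

// Hypothetical holder for the initial proposal (leader, zxid, peerEpoch)
final class InitialBallot {
    final long leader;
    final long zxid;
    final long peerEpoch;

    private InitialBallot(long leader, long zxid, long peerEpoch) {
        this.leader = leader;
        this.zxid = zxid;
        this.peerEpoch = peerEpoch;
    }

    // Mirrors the rules described for getInitId, getInitLastLoggedZxid and getPeerEpoch
    static InitialBallot of(long myId, Set<Long> votingMembers, LearnerType type,
                            long lastLoggedZxid, long currentEpoch) {
        long leader = votingMembers.contains(myId) ? myId : Long.MIN_VALUE;
        boolean participant = type == LearnerType.PARTICIPANT;
        long zxid = participant ? lastLoggedZxid : Long.MIN_VALUE;
        long epoch = participant ? currentEpoch : Long.MIN_VALUE;
        return new InitialBallot(leader, zxid, epoch);
    }

    public static void main(String[] args) {
        InitialBallot voter = of(1, Set.of(1L, 2L, 3L), LearnerType.PARTICIPANT, 0x200000005L, 2);
        System.out.println(voter.leader + " 0x" + Long.toHexString(voter.zxid) + " " + voter.peerEpoch);
        InitialBallot observer = of(4, Set.of(1L, 2L, 3L), LearnerType.OBSERVER, 0x200000005L, 2);
        System.out.println(observer.leader == Long.MIN_VALUE); // true
    }
}

// Simplified stand-in for QuorumPeer.LearnerType
enum LearnerType { PARTICIPANT, OBSERVER }
```

Because Long.MIN_VALUE loses every comparison, an observer's initial ballot is replaced as soon as a real vote arrives.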

In step (6), each iteration first retrieves a pending notification from the recvqueue collection, waiting up to the current timeout (notTimeout). A Notification object carries the ballot information sent by another server. Depending on the notification, one of three different operations is performed.

The first operation is executed when no notification arrives before the timeout (the notification object is null). The specific details are as follows:

  1. Ask the communication manager (QuorumCnxManager) whether queued messages have been delivered (haveDelivered). If they have, the peers may simply not have replied yet, so the ballot is broadcast again. Otherwise, connectAll is called to connect to all candidate servers.
  2. Double the timeout, capping it at maxNotificationInterval. The timeout starts at finalizeWait (200 milliseconds).
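The timeout adjustment in the first operation can be expressed as a small function. This sketch assumes that finalizeWait is 200 ms, as stated in the code comment, and that maxNotificationInterval is 60000 ms. NotificationBackoff is a hypothetical name.

```java
// Hypothetical sketch of the notification timeout backoff in lookForLeader
final class NotificationBackoff {
    static final int FINALIZE_WAIT = 200;                // initial timeout in ms
    static final int MAX_NOTIFICATION_INTERVAL = 60000;  // upper bound in ms (assumed value)

    // Same computation as the loop: double the timeout, but never exceed the cap
    static int next(int notTimeout) {
        int tmpTimeOut = notTimeout * 2;
        return tmpTimeOut < MAX_NOTIFICATION_INTERVAL ? tmpTimeOut : MAX_NOTIFICATION_INTERVAL;
    }

    public static void main(String[] args) {
        int notTimeout = FINALIZE_WAIT;
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            notTimeout = next(notTimeout);
            sb.append(notTimeout).append(' ');
        }
        // prints: 400 800 1600 3200 6400 12800 25600 51200 60000 60000
        System.out.println(sb.toString().trim());
    }
}
```

Doubling keeps a server that hears nothing from flooding its peers with rebroadcasts. The cap bounds how long it waits between attempts.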

The second operation is executed when both the sid of the notification and the leader it proposes belong to the set of voting members (checked with validVoter). The notification is then handled according to the sender's service state:

  1. If the service state is LOOKING, perform the following operations:
    1. If the election epoch in the notification is greater than the local election epoch, set the local election epoch to the notification's value and clear the recvset voting box. Then use the totalOrderPredicate method to compare the notification's vote with this node's initial ballot. If the notification's vote wins, adopt it; otherwise, fall back to the initial ballot (getInitId, getInitLastLoggedZxid, getPeerEpoch). Finally, broadcast the new ballot.
    2. If the election epoch in the notification is less than the local election epoch, discard the notification.
    3. If the election epochs are equal and totalOrderPredicate shows that the notification's vote beats the current proposal, adopt the notification's vote and broadcast it.
    4. Unless the notification was discarded in step 2, record its vote in the recvset voting box under the sender's sid.
    5. Use the termPredicate method to check whether the current proposal has majority approval. If it does, keep polling recvqueue, waiting up to finalizeWait for each further notification. If a notification arrives whose vote beats the current proposal according to totalOrderPredicate, put it back into recvqueue and leave this inner loop. The outer loop then processes it in its next iteration, so the vote can still change.
    6. If no further notification arrives within finalizeWait (the notification object is null), the election is decided. If the leader id in the current ballot equals the local id (service id, myid), set the service state to LEADING; otherwise, use the learningState method to choose FOLLOWING or OBSERVING. Then construct the return ballot (Leader ballot), clear the recvqueue collection via leaveInstance, and return the ballot.
  2. If the service state is OBSERVING, the notification is only logged and ignored, because observers do not vote. The loop continues.
  3. If the service state is FOLLOWING or LEADING, perform the following operations:
    1. If the election epoch in the notification object equals the local election epoch, put its vote into the recvset voting box and use termPredicate to check whether voting has ended. If it has and the checkLeader method (applied to the outofelection voting box) confirms the leader, set the service state. It becomes LEADING if the leader id in the notification equals the local id; otherwise, learningState chooses FOLLOWING or OBSERVING. Then construct the return ballot (Leader ballot), clear the recvqueue collection, and return the ballot.
    2. If the previous step does not return (the epochs differ or the checks fail), put the vote into the outofelection voting box and run termPredicate and checkLeader against outofelection. If both pass, set the local election epoch to the notification's electionEpoch and set the service state in the same way as above. Then construct the return ballot (Leader ballot), clear the recvqueue collection, and return the ballot.
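Both the LOOKING branch and the finalize step rely on totalOrderPredicate to decide whether one vote beats another. The following is a minimal sketch of that ordering. It assumes the comparison order is epoch, then zxid, then server id, and that a candidate with weight 0 never wins. In this sketch the weight is passed in as a parameter (newWeight, a hypothetical addition), whereas the real method looks it up through the QuorumVerifier. VoteOrder is a hypothetical class name.

```java
// Hypothetical sketch of the vote ordering used by totalOrderPredicate
final class VoteOrder {
    // True if the candidate vote (newId, newZxid, newEpoch) beats the current one
    static boolean totalOrderPredicate(long newWeight, long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        if (newWeight == 0) {
            return false; // a server without voting weight can never be proposed
        }
        return newEpoch > curEpoch
                || (newEpoch == curEpoch
                    && (newZxid > curZxid || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        System.out.println(totalOrderPredicate(1, 3, 0x100, 1, 2, 0x100, 1)); // true: tie broken by higher id
        System.out.println(totalOrderPredicate(1, 1, 0x200, 1, 3, 0x100, 1)); // true: higher zxid beats higher id
        System.out.println(totalOrderPredicate(1, 3, 0x900, 1, 1, 0x100, 2)); // false: lower epoch loses
        System.out.println(totalOrderPredicate(0, 3, 0x900, 2, 1, 0x100, 1)); // false: weight 0
    }
}
```

Because the server id breaks ties, any two votes can be ordered. This is why all LOOKING servers that see the same votes converge on the same proposal.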

The third operation is executed when either of the checks above fails. The notification is ignored, and:

  1. If the leader proposed by the notification object is not a voting member, a warning is logged.
  2. If the sid of the notification object is not a voting member, a warning is logged.

Analysis of WorkerSender

In this section, we will analyze the WorkerSender class, which is responsible for sending messages. Since WorkerSender extends the ZooKeeperThread class, it is a thread object. The core method to analyze is run, and the specific processing code is as follows:

public void run() {
  while (!stop) {
    try {
      ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
      if (m == null) {
        continue;
      }
      process(m);
    } catch (InterruptedException e) {
      break;
    }
  }
  LOG.info("WorkerSender is down");
}

The core of the above code is a loop that runs until stop is set. Each iteration waits up to 3000 milliseconds for a message to send (of type ToSend) from the sendqueue collection. If none arrives, the iteration ends and the next one begins; otherwise, the process method sends the message. The process method serializes the ToSend object into a ByteBuffer with buildMsg and hands it to QuorumCnxManager#toSend. An InterruptedException ends the loop.
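The serialization step performed by WorkerSender#process can be illustrated with a sketch. It assumes the layout used by buildMsg in ZooKeeper 3.5: a 44-byte header (state, leader, zxid, electionEpoch, peerEpoch, version, configuration length) followed by the configuration bytes, with a format version of 0x2. NotificationEncoder is a hypothetical name. The final flip() is a choice made in this sketch, not something taken from the source.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the election notification wire format
final class NotificationEncoder {
    static final int CURRENT_VERSION = 0x2;                    // assumed format version
    static final int HEADER_BYTES = 4 + 8 + 8 + 8 + 8 + 4 + 4; // 44 bytes

    static ByteBuffer encode(int state, long leader, long zxid, long electionEpoch,
                             long peerEpoch, byte[] configData) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + configData.length);
        buf.putInt(state);              // ServerState ordinal, e.g. 0 = LOOKING
        buf.putLong(leader);            // proposed leader sid
        buf.putLong(zxid);              // proposed leader's zxid
        buf.putLong(electionEpoch);     // sender's logicalclock
        buf.putLong(peerEpoch);         // proposed leader's epoch
        buf.putInt(CURRENT_VERSION);    // format version
        buf.putInt(configData.length);  // length of the configuration that follows
        buf.put(configData);            // serialized configuration (QuorumVerifier)
        buf.flip();                     // make the written bytes readable
        return buf;
    }

    public static void main(String[] args) {
        byte[] config = "version=100000000".getBytes(StandardCharsets.UTF_8); // arbitrary placeholder
        ByteBuffer msg = encode(0, 3, 0x100000002L, 1, 1, config);
        System.out.println(msg.remaining()); // 61 (44-byte header + 17 config bytes)
    }
}
```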

Analysis of WorkerReceiver

In this section, we will analyze the WorkerReceiver class, which is responsible for processing messages. Since the WorkerReceiver class extends the ZooKeeperThread class, it is a thread object. The core method to analyze is the run method. The specific processing code is as follows:

public void run() {

  // Response
  Message response;
  while (!stop) {
    try {
      // Get a message to process from the manager
      response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
      // If the message to be processed is empty, enter the next loop
      if (response == null) {
        continue;
      }

      // Get the message byte length
      final int capacity = response.buffer.capacity();

      // If message byte length is less than 28, enter the next loop
      if (capacity < 28) {
        LOG.error("Got a short response from server {}: {}", response.sid,
                  capacity);
        continue;
      }
      // Check if message byte length is 28
      boolean backCompatibility28 = (capacity == 28);

      // Check if message byte length is 40
      boolean backCompatibility40 = (capacity == 40);

      // Reset position to 0 and limit to capacity (clear() does not erase the bytes)
      response.buffer.clear();

      // Create notification object
      Notification n = new Notification();

      // Read state
      int rstate = response.buffer.getInt();
      // Read leader
      long rleader = response.buffer.getLong();
      // Read zxid
      long rzxid = response.buffer.getLong();
      // Read electionEpoch
      long relectionEpoch = response.buffer.getLong();
      // peerEpoch of the proposed leader
      long rpeerepoch;

      // Version number
      int version = 0x0;
      QuorumVerifier rqv = null;

      try {

        // If message byte length is not 28
        if (!backCompatibility28) {
          // Read peerEpoch
          rpeerepoch = response.buffer.getLong();
          // If message length is not 40
          if (!backCompatibility40) {
            version = response.buffer.getInt();
          } else {
            LOG.info("Backward compatibility mode (36 bits), server id: {}",
                     response.sid);
          }
        } else {
          LOG.info("Backward compatibility mode (28 bits), server id: {}",
                   response.sid);
          // Derive peerEpoch from the epoch part (high 32 bits) of the zxid
          rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
        }


        // If version is greater than 0x1
        if (version > 0x1) {
          // Read configuration length
          int configLength = response.buffer.getInt();
          // Configuration length is less than 0
          // Configuration length is greater than message byte length
          if (configLength < 0 || configLength > capacity) {
            // Throw exception
            throw new IOException(String.format(
              "Invalid configLength in notification message! sid=%d, capacity=%d, version=%d, configLength=%d",
              response.sid, capacity, version, configLength));
          }

          // Read the configuration
          byte b[] = new byte[configLength];

          response.buffer.get(b);

          synchronized (self) {
            try {
              // Convert configuration to QuorumVerifier class
              rqv = self.configFromString(new String(b));
              // Get the current node's QuorumVerifier object
              QuorumVerifier curQV = self.getQuorumVerifier();
              // New QuorumVerifier version number is greater than the current QuorumVerifier version number
              if (rqv.getVersion() > curQV.getVersion()) {
                LOG.info("{} Received version: {} my version: {}",
                         self.getId(),
                         Long.toHexString(rqv.getVersion()),
                         Long.toHexString(
                           self.getQuorumVerifier().getVersion()));
                // If the current node's service state is LOOKING
                if (self.getPeerState() == ServerState.LOOKING) {
                  LOG.debug("Invoking processReconfig(), state: {}",
                            self.getServerState());
                  // Reprocess configuration
                  self.processReconfig(rqv, null, null, false);
                  // If the new QuorumVerifier object is different from the current QuorumVerifier, close the election interface
                  if (!rqv.equals(curQV)) {
                    LOG.info("restarting leader election");
                    self.shuttingDownLE = true;
                    self.getElectionAlg().shutdown();

                    break;
                  }
                } else {
                  LOG.debug("Skip processReconfig(), state: {}",
                            self.getServerState());
                }
              }
            } catch (IOException | ConfigException e) {
              LOG.error(
                "Something went wrong while processing config received from {}. "
                +
                "Continue to process the notification message without handling the configuration.",
                response.sid);
            }
          }
        } else {
          LOG.info(
            "Backward compatibility mode (before reconfig), server id: {}",
            response.sid);
        }
      } catch (BufferUnderflowException | IOException e) {
        LOG.warn(
          "Skipping the processing of a partial / malformed response message sent by sid={} (message length: {})",
          response.sid, capacity, e);
        continue;
      }

      // Check whether the sender sid is a voting member
      if (!validVoter(response.sid)) {
        // Get vote
        Vote current = self.getCurrentVote();
        // Get QuorumVerifier
        QuorumVerifier qv = self.getQuorumVerifier();
        // Create send object
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                   current.getId(),
                                   current.getZxid(),
                                   logicalclock.get(),
                                   self.getPeerState(),
                                   response.sid,
                                   current.getPeerEpoch(),
                                   qv.toString().getBytes());
        // Add to the pending message collection
        sendqueue.offer(notmsg);
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Receive new notification message. My id = "
                    + self.getId());
        }
        // Convert the service state in the pending message from number to enumeration
        QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
        switch (rstate) {
          case 0:
            ackstate = QuorumPeer.ServerState.LOOKING;
            break;
          case 1:
            ackstate = QuorumPeer.ServerState.FOLLOWING;
            break;
          case 2:
            ackstate = QuorumPeer.ServerState.LEADING;
            break;
          case 3:
            ackstate = QuorumPeer.ServerState.OBSERVING;
            break;
          default:
            continue;
        }

        // Fill notification object data
        n.leader = rleader;
        n.zxid = rzxid;
        n.electionEpoch = relectionEpoch;
        n.state = ackstate;
        n.sid = response.sid;
        n.peerEpoch = rpeerepoch;
        n.version = version;
        n.qv = rqv;

        // Output notification
        if (LOG.isInfoEnabled()) {
          printNotification(n);
        }

        // If the current node's service state is LOOKING
        if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
          // Add notification to recvqueue collection
          recvqueue.offer(n);

          // If the service state in the pending message is LOOKING, and the election epoch in the notification is less than the local election epoch
          if ((ackstate == QuorumPeer.ServerState.LOOKING)
              && (n.electionEpoch < logicalclock.get())) {
            // Construct vote from member variables proposedLeader, proposedZxid, and proposedEpoch
            Vote v = getVote();
            // Convert vote to ToSend object and add to sendqueue collection
            QuorumVerifier qv = self.getQuorumVerifier();
            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                       v.getId(),
                                       v.getZxid(),
                                       logicalclock.get(),
                                       self.getPeerState(),
                                       response.sid,
                                       v.getPeerEpoch(),
                                       qv.toString().getBytes());
            sendqueue.offer(notmsg);
          }
        }
        else {

          // Get vote
          Vote current = self.getCurrentVote();
          // If the service state in the pending message is LOOKING
          if (ackstate == QuorumPeer.ServerState.LOOKING) {
            if (LOG.isDebugEnabled()) {
              LOG.debug(
                "Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
                self.getId(),
                response.sid,
                Long.toHexString(current.getZxid()),
                current.getId(),
                Long.toHexString(
                  self.getQuorumVerifier().getVersion()));
            }

            // Convert vote to ToSend object and add to sendqueue collection
            QuorumVerifier qv = self.getQuorumVerifier();
            ToSend notmsg = new ToSend(
              ToSend.mType.notification,
              current.getId(),
              current.getZxid(),
              current.getElectionEpoch(),
              self.getPeerState(),
              response.sid,
              current.getPeerEpoch(),
              qv.toString().getBytes());
            sendqueue.offer(notmsg);
          }
        }
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted Exception while waiting for new message" +
               e.toString());
    }
  }
  LOG.info("WorkerReceiver is down");
}

The code above implements a loop that processes messages taken from QuorumCnxManager#recvQueue. Each iteration proceeds as follows:

  1. Retrieve a message object to be processed from QuorumCnxManager#recvQueue.
  2. If the message object is empty, proceed to the next iteration.
  3. If the message byte length is less than 28, proceed to the next iteration.
  4. Read rstate, rleader, rzxid, and relectionEpoch from the message object. If the message is not exactly 28 bytes long, also read rpeerepoch. Unless it is exactly 40 bytes long, read version as well. A 28-byte message carries no peerEpoch, so peerEpoch is derived from the zxid with ZxidUtils.getEpochFromZxid. Note that the version attribute was added in version 3.4.6, with a default value of 0x0.
  5. If the version data value is greater than 0x1, perform the following operations:
    1. Read the configuration length from the message object. If it is less than 0 or greater than the message byte length, throw an IOException. This exception, like a BufferUnderflowException from a truncated message, causes the message to be skipped.
    2. Read the configuration from the message object and convert it to a QuorumVerifier object. If an IOException or ConfigException occurs in this step or the next, it is logged and the notification is processed without the configuration.
    3. Compare the version number of the received QuorumVerifier with that of the currently held QuorumVerifier. If the received version is not greater, nothing is done; otherwise, perform the following:
      1. Log both version numbers.
      2. If the current node's service state is not LOOKING, skip the reconfiguration.
      3. Otherwise, apply the new configuration using the processReconfig method. If the received QuorumVerifier differs from the held one, set shuttingDownLE, shut down the election algorithm, and exit the receive loop (note that the election is restarted in the QuorumPeer#run method).
  6. Confirm whether the sid in the response object belongs to the voting members. If not, reply with the current vote: convert it to a ToSend object and place it in the sendqueue collection. Otherwise, perform the following operations:
    1. Convert rstate to a ServerState value (messages with an unknown state are skipped). Then set it, together with the data extracted in step (4), on the notification object (Notification).
    2. If the current node's service state is LOOKING, perform the following:
      1. Place the notification in the recvqueue collection.
      2. If the service state in the pending message is LOOKING, and the election epoch in the notification is less than the local election epoch, construct a vote from the member variables proposedLeader, proposedZxid, and proposedEpoch, convert it to a ToSend object, and place it in the sendqueue collection.
    3. If the current node's service state is not LOOKING, perform the following:
      1. Get the local vote.
      2. If the service state in the pending message is LOOKING, convert the local vote to a ToSend object and place it in the sendqueue collection.
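The length-based branching in steps 3 to 5 can be sketched as a standalone decoder. NotificationDecoder is a hypothetical name. It throws IllegalArgumentException where WorkerReceiver logs and skips the message, and it leaves out the QuorumVerifier parsing and the reply logic. It only shows how 28-byte, 40-byte, and versioned messages are read, assuming the field order shown in the run method above.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of how WorkerReceiver reads the different message layouts
final class NotificationDecoder {
    // Hypothetical container for the decoded fields
    static final class Decoded {
        int state;
        long leader;
        long zxid;
        long electionEpoch;
        long peerEpoch;
        int version;                 // stays 0x0 for 28- and 40-byte messages
        byte[] config = new byte[0]; // only filled when version > 0x1
    }

    static Decoded decode(ByteBuffer buf) {
        int capacity = buf.capacity();
        if (capacity < 28) {
            throw new IllegalArgumentException("short message: " + capacity + " bytes");
        }
        buf.clear(); // position = 0, limit = capacity
        Decoded n = new Decoded();
        n.state = buf.getInt();
        n.leader = buf.getLong();
        n.zxid = buf.getLong();
        n.electionEpoch = buf.getLong();
        if (capacity == 28) {
            // Oldest layout: no peerEpoch, so take the epoch from the high 32 bits of the zxid
            n.peerEpoch = n.zxid >> 32;
        } else {
            n.peerEpoch = buf.getLong();
            if (capacity != 40) {
                n.version = buf.getInt(); // 40-byte messages are read without a version
            }
        }
        if (n.version > 0x1) {
            int configLength = buf.getInt();
            if (configLength < 0 || configLength > capacity) {
                throw new IllegalArgumentException("invalid configLength: " + configLength);
            }
            n.config = new byte[configLength];
            buf.get(n.config); // BufferUnderflowException if the message is truncated
        }
        return n;
    }

    public static void main(String[] args) {
        ByteBuffer legacy = ByteBuffer.allocate(28);
        legacy.putInt(0).putLong(2L).putLong((5L << 32) | 7L).putLong(1L);
        System.out.println(decode(legacy).peerEpoch); // 5
    }
}
```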

Analysis of QuorumCnxManager

The analysis of the FastLeaderElection, WorkerSender, and WorkerReceiver classes shows that they all rely on QuorumCnxManager, whose job is to manage communication between servers. WorkerSender#process sends messages with manager.toSend, WorkerReceiver#run receives messages with manager.pollRecvQueue, and FastLeaderElection#lookForLeader establishes connections with manager.connectAll. Next, we will analyze the QuorumCnxManager class, starting with its member variables, which are listed in the table below.

| Variable Name | Variable Type | Variable Description |
|---|---|---|
| recvQueue | ArrayBlockingQueue<Message> | Received messages |
| listener | Listener | Thread that listens for incoming connections from other servers |
| self | QuorumPeer | The local node (this server's QuorumPeer) |
| mySid | long | Service ID, the value in this server's myid file |
| socketTimeout | int | Socket timeout |
| view | Map<Long, QuorumPeer.QuorumServer> | Mapping of service ID to server object |
| listenOnAllIPs | boolean | Whether to listen on all IPs |
| senderWorkerMap | ConcurrentHashMap<Long, SendWorker> | Mapping of service ID to send thread |
| queueSendMap | ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> | Mapping of service ID to send queue |
| lastMessageSent | ConcurrentHashMap<Long, ByteBuffer> | Mapping of service ID to last message sent |
| inprogressConnections | Set<Long> | Set of service IDs currently being connected |
| recvQLock | Object | Lock held while operating on recvQueue |
| tcpKeepAlive | boolean | Whether to enable TCP keep-alive, read from the Java system property zookeeper.tcpKeepAlive |
| shutdown | boolean | Whether in shutdown state |

The operations in QuorumCnxManager fall into two broad categories: socket (connection) management and message processing. The rest of this chapter analyzes the key methods in each category.

Analysis of the toSend Method

In this section, we analyze the toSend method. We saw earlier that WorkerSender#process calls it whenever a message needs to be sent to a server. The code is as follows:

public void toSend(Long sid, ByteBuffer b) {
  if (this.mySid == sid) {
    b.position(0);
    addToRecvQueue(new Message(b.duplicate(), sid));
  } else {
    ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
      SEND_CAPACITY);
    ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
    if (oldq != null) {
      addToSendQueue(oldq, b);
    } else {
      addToSendQueue(bq, b);
    }
    connectOne(sid);

  }
}

The core flow is as follows. If the target sid equals the local mySid, the message is addressed to this server itself, so no network communication is needed: the buffer is rewound with position(0) and added directly to the receive queue through addToRecvQueue. Otherwise, toSend looks up the send queue for sid in queueSendMap, creating one with putIfAbsent if none exists yet. It then appends the message with addToSendQueue and calls connectOne to make sure a connection to that server exists.
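To make the routing concrete, here is a minimal, self-contained sketch of the same decision. It is not ZooKeeper's code: the MiniCnxManager class, the use of String instead of ByteBuffer, the queue capacity of 16, and the connectOne stub (which only records the request) are assumptions for illustration. Overflow handling for full queues is omitted.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified model of the routing in QuorumCnxManager#toSend.
final class MiniCnxManager {
    static final int QUEUE_CAPACITY = 16; // assumed value, not ZooKeeper's SEND_CAPACITY

    final long mySid;
    final ArrayBlockingQueue<String> recvQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
    final ConcurrentHashMap<Long, ArrayBlockingQueue<String>> queueSendMap = new ConcurrentHashMap<>();
    final List<Long> connectRequests = new CopyOnWriteArrayList<>();

    MiniCnxManager(long mySid) {
        this.mySid = mySid;
    }

    void toSend(long sid, String msg) {
        if (sid == mySid) {
            // Addressed to ourselves: skip the network and deliver locally.
            recvQueue.offer(msg);
            return;
        }
        // putIfAbsent returns the existing queue if another thread created one first,
        // so each peer ends up with exactly one send queue.
        ArrayBlockingQueue<String> fresh = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        ArrayBlockingQueue<String> existing = queueSendMap.putIfAbsent(sid, fresh);
        ArrayBlockingQueue<String> queue = (existing != null) ? existing : fresh;
        queue.offer(msg); // overflow handling omitted in this sketch
        connectOne(sid);
    }

    // Stub: the real method starts a connection only if none exists yet.
    void connectOne(long sid) {
        connectRequests.add(sid);
    }

    public static void main(String[] args) {
        MiniCnxManager m = new MiniCnxManager(1L);
        m.toSend(1L, "self-vote");
        m.toSend(2L, "vote-a");
        m.toSend(2L, "vote-b");
        System.out.println(m.recvQueue.peek());            // self-vote
        System.out.println(m.queueSendMap.get(2L).size()); // 2
        System.out.println(m.connectRequests);             // [2, 2]
    }
}
```

In the real code, connectOne returns early when senderWorkerMap already holds a SendWorker for the peer, so calling it on every send is cheap.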

Next, we'll analyze the addToRecvQueue method, which is mainly used to put messages into the received message collection (member variable recvQueue). The specific processing code is as follows:

public void addToRecvQueue(Message msg) {
  synchronized (recvQLock) {
    if (recvQueue.remainingCapacity() == 0) {
      try {
        recvQueue.remove();
      } catch (NoSuchElementException ne) {
        LOG.debug("Trying to remove from an empty " +
                  "recvQueue. Ignoring exception " + ne);
      }
    }
    try {
      recvQueue.add(msg);
    } catch (IllegalStateException ie) {
      LOG.error("Unable to insert element in the recvQueue " + ie);
    }
  }
}

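The code first checks whether recvQueue is full, meaning remainingCapacity() returns 0. If it is, the oldest message at the head of the queue is removed to make room. The new message is then appended with add. Because the whole sequence runs inside synchronized (recvQLock), the capacity check and the insertion cannot interleave with another call to addToRecvQueue.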
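The bounded, drop-oldest behavior can be isolated in a small generic class. This is a hypothetical sketch, not ZooKeeper code: the DropOldestQueue name, the capacity used in main, and the use of poll() instead of remove() for eviction are assumptions. The timed poll(timeout, unit) models how a consumer reads from such a queue without blocking forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a bounded queue that evicts the oldest element when full,
// in the style of QuorumCnxManager#addToRecvQueue.
final class DropOldestQueue<T> {
    private final ArrayBlockingQueue<T> queue;
    private final Object lock = new Object(); // plays the role of recvQLock

    DropOldestQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    void add(T msg) {
        // Every insert holds the lock, and consumers only remove elements,
        // so after evicting one element the add below always has room.
        synchronized (lock) {
            if (queue.remainingCapacity() == 0) {
                queue.poll(); // discard the oldest message
            }
            queue.add(msg);
        }
    }

    // Waits up to the timeout for a message; returns null if none arrives.
    T poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        DropOldestQueue<String> q = new DropOldestQueue<>(2);
        q.add("m1");
        q.add("m2");
        q.add("m3"); // queue is full, so m1 is evicted
        System.out.println(q.poll(10, TimeUnit.MILLISECONDS)); // m2
        System.out.println(q.poll(10, TimeUnit.MILLISECONDS)); // m3
        System.out.println(q.poll(10, TimeUnit.MILLISECONDS)); // null
    }
}
```

One plausible reason for evicting instead of blocking the producer is that a receiver thread should never stall on a slow consumer; treat this as an interpretation, not a documented rationale.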

Next, we analyze the connectOne method, which has several overloads. toSend calls the single-argument connectOne(sid), which looks up the server's election address and then delegates to the two-argument overload shown below:

synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr) {
    if (senderWorkerMap.get(sid) != null) {
        LOG.debug("There is a connection already for server " + sid);
        return true;
    }
    return initiateConnectionAsync(electionAddr, sid);
}

The method first checks whether senderWorkerMap already contains a SendWorker for sid. If it does, a connection is already established, so the method returns true. Otherwise, it calls initiateConnectionAsync to establish the connection asynchronously. The code is as follows:

public boolean initiateConnectionAsync(final InetSocketAddress electionAddr, final Long sid) {
  // Add sid to inprogressConnections; if it is already present, a connection attempt is already in progress
  if (!inprogressConnections.add(sid)) {
    LOG.debug(
      "Connection request to server id: {} is already in progress, so skipping this request",
      sid);
    return true;
  }
  try {
    // Submit a task that establishes the connection to the connection executor
    connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
    connectionThreadCnt.incrementAndGet();
  } catch (Throwable e) {
    // If an exception occurs, remove sid from the inprogressConnections set
    inprogressConnections.remove(sid);
    LOG.error("Exception while submitting quorum connection request", e);
    return false;
  }
  return true;
}

The core processing flow in the code above is as follows:

  1. Add sid to the inprogressConnections set. If it is already present, a connection attempt to that server is already in progress, so the method returns true without starting another one.
  2. Create a QuorumConnectionReqThread task and submit it to connectionExecutor. If the submission throws, remove sid from inprogressConnections and return false.
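The de-duplication pattern in these two steps can be sketched without any sockets. In this hypothetical example, the AsyncConnector class, the attempts counter (a stand-in for the blocking connect), and the manual executor in main are assumptions. The sketch catches only RejectedExecutionException, which is narrower than the original's catch (Throwable). The finally block inside the task mirrors how the real connection task clears its marker.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the in-progress de-duplication used by initiateConnectionAsync.
final class AsyncConnector {
    private final Set<Long> inProgress = ConcurrentHashMap.newKeySet();
    private final Executor executor;
    final AtomicInteger attempts = new AtomicInteger();

    AsyncConnector(Executor executor) {
        this.executor = executor;
    }

    boolean connectAsync(long sid) {
        // Set.add returns false when sid is already present: an attempt is in flight.
        if (!inProgress.add(sid)) {
            return true;
        }
        try {
            executor.execute(() -> {
                try {
                    attempts.incrementAndGet(); // stand-in for the blocking connect
                } finally {
                    inProgress.remove(sid); // always clear, on success or failure
                }
            });
        } catch (RejectedExecutionException e) {
            inProgress.remove(sid); // the task never ran, so undo the marker here
            return false;
        }
        return true;
    }

    boolean isInProgress(long sid) {
        return inProgress.contains(sid);
    }

    public static void main(String[] args) {
        // A manual executor: tasks run only when we take them from the deque.
        Deque<Runnable> pending = new ArrayDeque<>();
        AsyncConnector c = new AsyncConnector(pending::add);
        c.connectAsync(2L);
        c.connectAsync(2L);                     // skipped: attempt already in progress
        System.out.println(pending.size());     // 1
        pending.poll().run();
        System.out.println(c.isInProgress(2L)); // false
        c.connectAsync(2L);                     // allowed again after the first attempt ends
        System.out.println(pending.size());     // 1
    }
}
```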

The inprogressConnections set holds the IDs of servers to which a connection attempt is currently in progress. The run method of QuorumConnectionReqThread is as follows:

@Override
public void run() {
  try {
    initiateConnection(electionAddr, sid);
  } finally {
    inprogressConnections.remove(sid);
  }
}

The run method calls initiateConnection to establish the connection and, in a finally block, removes sid from inprogressConnections whether or not the attempt succeeded. The initiateConnection method mainly does the following:

  1. Opens a socket to the target server's election address.
  2. Creates a SendWorker object and adds it to the senderWorkerMap container.
  3. Creates a RecvWorker object.
  4. Starts the SendWorker and RecvWorker threads.

Steps 2 to 4 happen only if the local sid is greater than the remote sid; otherwise the socket is closed and the connection is left to the peer. This is the same rule that handleConnection applies on the receiving side.

Messages reach recvQueue in two ways. toSend adds messages addressed to the local server, and RecvWorker (analyzed below) adds messages received from other servers. Consumers read from recvQueue through the pollRecvQueue method, which waits up to the given timeout and returns null if no message arrives. The code is as follows:

public Message pollRecvQueue(long timeout, TimeUnit unit)
  throws InterruptedException {
  return recvQueue.poll(timeout, unit);
}

SendWorker Analysis

In this section, we'll analyze the SendWorker class, which extends the ZooKeeperThread class. As it's a thread object, we'll focus on analyzing the run method. The specific processing code is as follows:

@Override
public void run() {
  threadCnt.incrementAndGet();
  try {
    // Get the message collection for sid from the collection of messages to be sent
    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
    // If there is no queue for sid, or the queue is empty
    if (bq == null || isSendQueueEmpty(bq)) {
      // Retrieve the message from lastMessageSent and send it if it's not null
      ByteBuffer b = lastMessageSent.get(sid);
      if (b != null) {
        LOG.debug("Attempting to send lastMessage to sid=" + sid);
        send(b);
      }
    }
  } catch (IOException e) {
    LOG.error("Failed to send last message. Shutting down thread.", e);
    // End processing
    this.finish();
  }
  LOG.debug("SendWorker thread started towards {}. myId: {}", sid,
            QuorumCnxManager.this.mySid);
  try {
    // Loop
    while (running && !shutdown && sock != null) {

      ByteBuffer b = null;
      try {
        // Get the message collection from the queueSendMap container
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
          .get(sid);
        if (bq != null) {
          // Get a message from the message collection
          b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
        }
        else {
          LOG.error("No queue of incoming messages for " +
                    "server " + sid);
          break;
        }

        // If the message is not null
        if (b != null) {
          // Put it into the lastMessageSent collection
          lastMessageSent.put(sid, b);
          // Send
          send(b);
        }
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting for message on queue",
                 e);
      }
    }
  } catch (Exception e) {
    LOG.warn("Exception when using channel: for id " + sid
             + " my id = " + QuorumCnxManager.this.mySid
             + " error = " + e);
  }
  // End processing
  this.finish();
  LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
}

The core processing flow in the above code is as follows:

  1. Get the send queue for sid from queueSendMap. If the queue does not exist or is empty, look up the last message sent to sid in lastMessageSent and, if there is one, send it again with send. This covers the case where the previous connection dropped before the peer read that message; the peer handles duplicate messages correctly.
  2. Loop while the worker is running, the manager is not shut down, and the socket is open. Each iteration does the following:
    1. Get the send queue for sid from queueSendMap. If it exists, poll a message with a 1,000 ms timeout. If it does not exist, log an error and leave the loop.
    2. If a message was obtained, record it in lastMessageSent and send it with send.
  3. Execute the finish method to end processing.
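The two phases above, resend-on-start followed by a timed polling loop, can be modeled without a socket. In this hypothetical sketch, the MiniSendWorker class is an assumption, a list named wire stands in for the socket, and the poll timeout is 100 ms instead of 1,000 ms so the example stops quickly.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

// Hypothetical model of SendWorker#run: a list stands in for the socket.
final class MiniSendWorker implements Runnable {
    private final long sid;
    private final ConcurrentHashMap<Long, ArrayBlockingQueue<String>> queueSendMap;
    private final ConcurrentHashMap<Long, String> lastMessageSent;
    final List<String> wire = new CopyOnWriteArrayList<>();
    volatile boolean running = true;

    MiniSendWorker(long sid,
                   ConcurrentHashMap<Long, ArrayBlockingQueue<String>> queueSendMap,
                   ConcurrentHashMap<Long, String> lastMessageSent) {
        this.sid = sid;
        this.queueSendMap = queueSendMap;
        this.lastMessageSent = lastMessageSent;
    }

    @Override
    public void run() {
        // Phase 1: with nothing queued, resend the last message in case the
        // previous connection dropped before the peer processed it.
        ArrayBlockingQueue<String> bq = queueSendMap.get(sid);
        if (bq == null || bq.isEmpty()) {
            String last = lastMessageSent.get(sid);
            if (last != null) {
                wire.add(last);
            }
        }
        // Phase 2: drain the queue; the timed poll lets a cleared flag be noticed.
        while (running) {
            bq = queueSendMap.get(sid);
            if (bq == null) {
                break; // no queue for this peer, so stop
            }
            try {
                String msg = bq.poll(100, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    lastMessageSent.put(sid, msg); // remember it before sending
                    wire.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<Long, ArrayBlockingQueue<String>> queues = new ConcurrentHashMap<>();
        ConcurrentHashMap<Long, String> last = new ConcurrentHashMap<>();
        queues.put(2L, new ArrayBlockingQueue<>(1));
        last.put(2L, "vote-v1"); // sent over a connection that has since dropped

        MiniSendWorker w = new MiniSendWorker(2L, queues, last);
        Thread t = new Thread(w);
        t.start();
        while (w.wire.isEmpty()) {
            Thread.sleep(1); // wait for the resend
        }
        queues.get(2L).put("vote-v2");
        while (w.wire.size() < 2) {
            Thread.sleep(1); // wait for the new vote to go out
        }
        w.running = false;
        t.join();
        System.out.println(w.wire);       // [vote-v1, vote-v2]
        System.out.println(last.get(2L)); // vote-v2
    }
}
```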

RecvWorker Analysis

In this section, we'll analyze the RecvWorker class, which extends the ZooKeeperThread class. As it's a thread object, we'll focus on analyzing the run method. The specific processing code is as follows:

@Override
public void run() {
  threadCnt.incrementAndGet();
  try {
    LOG.debug("RecvWorker thread towards {} started. myId: {}", sid,
              QuorumCnxManager.this.mySid);
    while (running && !shutdown && sock != null) {
      // Get data length from the data input stream
      int length = din.readInt();
      // If the length is not positive or exceeds the maximum packet size
      if (length <= 0 || length > PACKETMAXSIZE) {
        throw new IOException(
          "Received packet with invalid packet: "
          + length);
      }
      // Convert data input stream to message object
      byte[] msgArray = new byte[length];
      din.readFully(msgArray, 0, length);
      ByteBuffer message = ByteBuffer.wrap(msgArray);
      // Add the message to the recvQueue collection
      addToRecvQueue(new Message(message.duplicate(), sid));
    }
  } catch (Exception e) {
    LOG.warn("Connection broken for id " + sid + ", my id = "
             + QuorumCnxManager.this.mySid + ", error = ", e);
  } finally {
    LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid,
             QuorumCnxManager.this.mySid);
    sw.finish();
    closeSocket(sock);
  }
}

The core processing flow in the above code is as follows:

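  1. Read the message length (a 4-byte int) from the DataInputStream. If the length is not positive or exceeds PACKETMAXSIZE, throw an IOException.
  2. Read exactly that many bytes, wrap them in a ByteBuffer, and add the resulting Message to the recvQueue collection.
  3. When the loop ends for any reason, including an exception, the finally block stops the paired SendWorker and closes the socket.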
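RecvWorker relies on simple length-prefixed framing: each message is a 4-byte length followed by that many bytes. The following hypothetical sketch shows both sides of this framing with in-memory streams instead of a socket. The Framing class, the frame helper for the writing side, and the 512 KiB limit standing in for PACKETMAXSIZE are assumptions. The length check is what prevents a corrupted length from triggering a huge allocation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the length-prefixed framing read by RecvWorker.
final class Framing {
    static final int MAX_PACKET = 512 * 1024; // assumed limit for this sketch

    // Writer side: a 4-byte big-endian length followed by the payload.
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
        return bytes.toByteArray();
    }

    // Reader side: validate the length, then read exactly that many bytes.
    static ByteBuffer readFrame(DataInputStream din) throws IOException {
        int length = din.readInt();
        if (length <= 0 || length > MAX_PACKET) {
            throw new IOException("Received packet with invalid length: " + length);
        }
        byte[] msg = new byte[length];
        din.readFully(msg, 0, length); // throws EOFException if the stream ends early
        return ByteBuffer.wrap(msg);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        stream.write(frame("hello".getBytes(StandardCharsets.UTF_8)));
        stream.write(frame("vote".getBytes(StandardCharsets.UTF_8)));
        DataInputStream din = new DataInputStream(new ByteArrayInputStream(stream.toByteArray()));

        // Back-to-back frames are separated purely by their length prefixes.
        System.out.println(new String(readFrame(din).array(), StandardCharsets.UTF_8)); // hello
        System.out.println(new String(readFrame(din).array(), StandardCharsets.UTF_8)); // vote

        // A zero length prefix is rejected.
        DataInputStream bad = new DataInputStream(new ByteArrayInputStream(new byte[] {0, 0, 0, 0}));
        try {
            readFrame(bad);
        } catch (IOException e) {
            System.out.println(e.getMessage()); // Received packet with invalid length: 0
        }
    }
}
```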

Listener Analysis

This section analyzes the Listener class, which also extends ZooKeeperThread. As it is a thread object, we focus on the core parts of its run method, which handles three things:

  1. Creating a server socket bound to the local election address (or to all interfaces when listenOnAllIPs is true) and accepting connections from other servers.
  2. Handing each accepted connection off for processing, either asynchronously or synchronously.
  3. Closing the server socket when the shutdown flag is set.

We focus on the second. The path taken depends on whether quorum SASL authentication is enabled (quorumSaslAuthEnabled). The entry point code is as follows:

if (quorumSaslAuthEnabled) {
  // Asynchronous
  receiveConnectionAsync(client);
} else {
  // Synchronous
  receiveConnection(client);
}

In asynchronous mode, receiveConnectionAsync submits a QuorumConnectionReceiverThread task to the connection executor, and that task's run method simply calls receiveConnection. Both paths therefore end up in receiveConnection, whose code is as follows:

public void receiveConnection(final Socket sock) {
  DataInputStream din = null;
  try {
    // Convert socket data to data input stream
    din = new DataInputStream(
      new BufferedInputStream(sock.getInputStream()));

    LOG.debug("Sync handling of connection request received from: {}",
              sock.getRemoteSocketAddress());
    // Handle connection
    handleConnection(sock, din);
  } catch (IOException e) {
    LOG.error("Exception handling connection, addr: {}, closing server connection",
              sock.getRemoteSocketAddress());
    LOG.debug("Exception details: ", e);
    closeSocket(sock);
  }
}

In the above method, the input stream from the socket is first converted to a DataInputStream object, then passed to the handleConnection method for processing. The code for the handleConnection method is as follows:

private void handleConnection(Socket sock, DataInputStream din)
  throws IOException {

  Long sid = null, protocolVersion = null;
  InetSocketAddress electionAddr = null;

  try {
    // Read protocol version from data input stream
    protocolVersion = din.readLong();
    // A non-negative value is not a protocol version but the sid itself (older protocol)
    if (protocolVersion >= 0) { 
      sid = protocolVersion;
    } else {
      try {
        // Convert data input stream to InitialMessage object
        InitialMessage init = InitialMessage.parse(protocolVersion, din);
        // sid
        sid = init.sid;
        // Election address
        electionAddr = init.electionAddr;
      } catch (InitialMessage.InitialMessageException ex) {
        LOG.error("Initial message parsing error!", ex);
        closeSocket(sock);
        return;
      }
    }

    if (sid == QuorumPeer.OBSERVER_ID) {
      sid = observerCounter.getAndDecrement();
      LOG.info("Setting arbitrary identifier to observer: " + sid);
    }
  } catch (IOException e) {
    LOG.warn("Exception reading or writing challenge: {}", e);
    closeSocket(sock);
    return;
  }

  authServer.authenticate(sock, din);
  // sid from data input stream is less than local sid
  if (sid < self.getId()) {
    SendWorker sw = senderWorkerMap.get(sid);
    if (sw != null) {
      sw.finish();
    }
    LOG.debug("Create new connection to server: {}", sid);
    closeSocket(sock);

    if (electionAddr != null) {
      connectOne(sid, electionAddr);
    } else {
      connectOne(sid);
    }
  }
  // sid from data input stream equals local sid
  else if (sid == self.getId()) {
    LOG.warn("We got a connection request from a server with our own ID. "
             + "This should be either a configuration error, or a bug.");
  }
  // sid from data input stream is greater than local sid
  else {
    SendWorker sw = new SendWorker(sock, sid);
    RecvWorker rw = new RecvWorker(sock, din, sid, sw);
    sw.setRecv(rw);

    // Get SendWorker from senderWorkerMap container and finish it
    SendWorker vsw = senderWorkerMap.get(sid);

    if (vsw != null) {
      vsw.finish();
    }

    // Register the new SendWorker
    senderWorkerMap.put(sid, sw);

    queueSendMap.putIfAbsent(sid,new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

    // Start the send and receive threads
    sw.start();
    rw.start();
  }
}

The core processing flow in the above code is as follows:

  1. Read the first long from the input stream. A non-negative value is the remote sid. A negative value is a protocol version, in which case an InitialMessage containing the sid and electionAddr is parsed from the stream. An observer that identifies itself with OBSERVER_ID is assigned an arbitrary identifier from observerCounter. The connection is then authenticated with authServer.authenticate.
  2. If the remote sid is less than the local sid, the local server is the one that should initiate this connection, so it:
    1. Gets the SendWorker for sid from senderWorkerMap and, if it is not null, stops it with finish.
    2. Closes the incoming socket.
    3. Connects back to the remote server, using connectOne(sid, electionAddr) if electionAddr is known and connectOne(sid) otherwise.
  3. If the remote sid equals the local sid, only a warning is logged, since this indicates either a configuration error or a bug.
  4. If the remote sid is greater than the local sid, the connection is kept:
    1. Create SendWorker and RecvWorker objects for the socket.
    2. Get the existing SendWorker for sid from senderWorkerMap and, if it is not null, stop it with finish.
    3. Add the new SendWorker to the senderWorkerMap container.
    4. Create a send queue for sid in queueSendMap if one does not already exist.
    5. Start the SendWorker and RecvWorker objects.
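The two decisions in handleConnection, decoding the header and choosing which side owns the connection, can be expressed as pure functions. In this hypothetical sketch, the Handshake class, the Action enum, and the header layout (a non-negative sid, or any negative version followed by the sid) are simplifications. The real InitialMessage also carries the election address, and observer ID remapping and authentication are left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of the header decoding and sid tie-break in handleConnection.
final class Handshake {
    enum Action { DROP_AND_CONNECT_BACK, WARN_SAME_ID, ACCEPT }

    // Simplified header: either [sid] with sid >= 0, or [negative version][sid].
    static long readSid(DataInputStream din) throws IOException {
        long first = din.readLong();
        return first >= 0 ? first : din.readLong();
    }

    // Only a connection initiated by the server with the larger sid is kept.
    static Action decide(long remoteSid, long mySid) {
        if (remoteSid < mySid) {
            return Action.DROP_AND_CONNECT_BACK;
        }
        if (remoteSid == mySid) {
            return Action.WARN_SAME_ID;
        }
        return Action.ACCEPT;
    }

    // Helper that builds an in-memory header from a sequence of longs.
    static DataInputStream header(long... values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (long v : values) {
            out.writeLong(v);
        }
        out.flush();
        return new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    }

    public static void main(String[] args) throws IOException {
        long mySid = 2L;
        long bareSid = readSid(header(3L));           // header is just the sid
        long versionedSid = readSid(header(-1L, 1L)); // version, then sid
        System.out.println(bareSid + " " + decide(bareSid, mySid));           // 3 ACCEPT
        System.out.println(versionedSid + " " + decide(versionedSid, mySid)); // 1 DROP_AND_CONNECT_BACK
        System.out.println(decide(mySid, mySid));                             // WARN_SAME_ID
    }
}
```

Applied on both ends, this rule means that when servers 1 and 2 try to connect to each other, only the connection initiated by server 2 survives, so each pair of servers ends up with a single connection.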

Summary

This chapter analyzed how leader election is implemented in the ZooKeeper project, focusing on two aspects: the entry point of the election algorithm and the implementation of the algorithm itself. The implementation involves multiple classes, chiefly QuorumPeer, FastLeaderElection, and QuorumCnxManager. Putting this analysis together gives the complete execution chain of the election, as shown in the diagram below: