ZooKeeper Source Code Analysis

Analysis of the ZooKeeperServer Class

This chapter analyzes the ZooKeeperServer class, including its member variables, methods, and subclasses.

ZooKeeperServer Analysis

This section analyzes the ZooKeeperServer class. In ZooKeeper, this class has multiple implementations, as shown in the class diagram below.

ZooKeeperServer Class Hierarchy

In the ZooKeeper project, the following four subclasses are the ones actually used:

  1. LeaderZooKeeperServer - leader service
  2. FollowerZooKeeperServer - follower service
  3. ObserverZooKeeperServer - observer service
  4. ReadOnlyZooKeeperServer - read-only service

Let's start with the member variables of the ZooKeeperServer class itself. Understanding these variables will greatly help in the subsequent analysis of the subclasses. The member variables are listed in the table below.

| Variable Name | Variable Type | Description |
| --- | --- | --- |
| outstandingChanges | Deque<ChangeRecord> | Collection of incomplete change records |
| outstandingChangesForPath | HashMap<String, ChangeRecord> | Incomplete change records, key: path, value: change record |
| hzxid | AtomicLong | Most recent zxid |
| requestsInProcess | AtomicInteger | Number of requests in process |
| serverStats | ServerStats | Server statistics |
| listener | ZooKeeperServerListener | ZooKeeper service listener |
| jmxServerBean | ZooKeeperServerBean | JMX bean for the ZooKeeper service object |
| jmxDataTreeBean | DataTreeBean | JMX bean for the data tree object |
| minSessionTimeout | int | Minimum session timeout |
| maxSessionTimeout | int | Maximum session timeout |
| sessionTracker | SessionTracker | Session tracker |
| firstProcessor | RequestProcessor | First request processor in the processing chain |
| state | State | Service state |
| reconfigEnabled | boolean | Whether reconfiguration is enabled |
| serverCnxnFactory | ServerCnxnFactory | Server connection factory |
| secureServerCnxnFactory | ServerCnxnFactory | Secure server connection factory |
| txnLogFactory | FileTxnSnapLog | Transaction log and snapshot log |
| zkDb | ZKDatabase | ZooKeeper database |
| zkShutdownHandler | ZooKeeperServerShutdownHandler | ZooKeeper server shutdown handler |
| createSessionTrackerServerId | int | Server ID used when creating the session tracker |

Next, we'll provide a basic overview of the methods' purposes. The methods are as follows:

  1. getSnapCount: Gets the snapshot count from the system property zookeeper.snapCount. This method is used in the SyncRequestProcessor.
  2. shouldAllowSaslFailedClientsConnect: Determines whether clients that failed SASL authentication are allowed to connect, based on the system property zookeeper.allowSaslFailedClients.
  3. shouldRequireClientSaslAuth: Checks whether client SASL authentication is required, based on the system property zookeeper.sessionRequireClientSASLAuth.
  4. loadData: Loads the Zookeeper database.
  5. takeSnapshot: Creates a snapshot.
  6. touch: Refreshes the session of a connection; fails if the session has already expired.
  7. registerJMX: Registers JMX related information.
  8. unregisterJMX: Unregisters JMX related information.
  9. getNumAliveConnections: Gets the number of active connections, combining counts from serverCnxnFactory and secureServerCnxnFactory.
  10. startdata: Starts the database by initializing the zk database object and loading data (executes loadData method).
  11. startup: Starts all required classes.
  12. setupRequestProcessors: Creates and starts request processors. Subclass implementations may change the request processing flow.
  13. createSessionTracker: Creates a session tracker.
  14. startSessionTracker: Starts the session tracker.
  15. canShutdown: Checks if shutdown is possible.
  16. isRunning: Checks if the server is running.
  17. shutdown: Shuts down the Zookeeper service.
  18. createSession: Creates a session ID.
  19. setOwner: Sets the owner of a session ID.
  20. revalidateSession: Validates a session.
  21. reopenSession: Reopens a session.
  22. finishSessionInit: Completes session creation.
  23. closeSession: Closes a session.
  24. getServerId: Gets the server ID.
  25. setLocalSessionFlag: Sets the local session flag.
  26. submitRequest: Submits a request.
  27. getGlobalOutstandingLimit: Gets the maximum request limit from the system property zookeeper.globalOutstandingLimit.
  28. getState: Gets the service type (standalone, follower, leader, observer, or read-only).
  29. processConnectRequest: Processes connection requests.
  30. shouldThrottle: Determines if throttling is needed.
  31. processPacket: Processes data packets.
  32. hasCnxSASLAuthenticated: Checks whether the connection has been authenticated through SASL.
  33. processSasl: Processes SASL requests.
  34. processTxn: Processes transaction requests.
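
Several of the methods above (getSnapCount, shouldAllowSaslFailedClientsConnect, getGlobalOutstandingLimit) read their settings from JVM system properties. The following is a minimal sketch of that pattern, not the actual ZooKeeper code: the class name SnapCountSketch, the helper names, and the default and lower-bound values are assumptions made for illustration.

```java
class SnapCountSketch {
    // Illustrative values, assumed for this sketch only.
    static final int DEFAULT_SNAP_COUNT = 100_000;
    static final int MIN_SNAP_COUNT = 2;

    /** Reads zookeeper.snapCount from the JVM system properties, with a fallback. */
    static int readSnapCount() {
        String raw = System.getProperty("zookeeper.snapCount");
        try {
            // Integer.parseInt(null) also throws NumberFormatException,
            // so a missing property takes the fallback path below.
            return Math.max(Integer.parseInt(raw), MIN_SNAP_COUNT);
        } catch (NumberFormatException e) {
            return DEFAULT_SNAP_COUNT;
        }
    }

    /** Reads a boolean flag such as zookeeper.allowSaslFailedClients. */
    static boolean readFlag(String name) {
        // True only if the property exists and equals "true" (case-insensitive).
        return Boolean.getBoolean(name);
    }

    public static void main(String[] args) {
        System.setProperty("zookeeper.snapCount", "5000");
        System.out.println(readSnapCount());
        System.out.println(readFlag("zookeeper.allowSaslFailedClients"));
    }
}
```

Because these are system properties rather than operating-system environment variables, they are normally supplied with -D options on the java command line.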

Some of these methods have been analyzed in previous chapters, so we won't cover them again. Additionally, some simple methods with uncomplicated logic won't be discussed. After excluding these, we'll focus on analyzing the processTxn method. Here's the specific processing code:

private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
                                    Record txn) {
  // Transaction processing result
  ProcessTxnResult rc;
  // Get operation code
  int opCode = request != null ? request.type : hdr.getType();
  // Get session ID
  long sessionId = request != null ? request.sessionId : hdr.getClientId();
  // If transaction header is not null, let the database process the transaction
  if (hdr != null) {
    rc = getZKDatabase().processTxn(hdr, txn);
  } else {
    // Create transaction processing result object
    rc = new ProcessTxnResult();
  }
  // If operation type is create session
  if (opCode == OpCode.createSession) {
    // If transaction header is not null
    // and txn parameter is a CreateSessionTxn
    if (hdr != null && txn instanceof CreateSessionTxn) {
      // Add global session
      CreateSessionTxn cst = (CreateSessionTxn) txn;
      sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
    }
    // If request object is not null
    // and it belongs to a local session
    else if (request != null && request.isLocalSession()) {
      // Add session
      request.request.rewind();
      int timeout = request.request.getInt();
      request.request.rewind();
      sessionTracker.addSession(request.sessionId, timeout);
    }
    // Warning log
    else {
      LOG.warn("*****>>>>> Got "
               + txn.getClass() + " "
               + txn.toString());
    }
  }
  // If operation type is close session
  else if (opCode == OpCode.closeSession) {
    // Remove session ID
    sessionTracker.removeSession(sessionId);
  }
  return rc;
}

The core processing flow in the above code is as follows:

  1. Get the operation code from the request if one is present, otherwise from the transaction header.
  2. Get the session ID in the same way.
  3. If the transaction header is not null, let the database process the transaction; otherwise, create an empty transaction processing result object. Note that no data has been set on this object at this point.
  4. If the operation type is create session, one of three branches runs:
    1. If the transaction header is not null and the txn parameter is of type CreateSessionTxn, add the session to the global session collection.
    2. Otherwise, if the request object is not null and the request's isLocalSession returns true (indicating a local session), read the timeout from the request buffer and add the session to the regular session collection.
    3. Otherwise, log a warning.
  5. If the operation type is close session, remove the data related to the session ID.
  6. Return the transaction processing result object.
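
To make the branching easier to follow, here is a small self-contained model of the session-related decisions only. It is a sketch under stated assumptions: the class ProcessTxnSketch, the SessionAction enum, and the boolean parameters are hypothetical stand-ins for the real Request, TxnHeader, and Record objects, and the two integer constants merely stand in for OpCode.createSession and OpCode.closeSession.

```java
class ProcessTxnSketch {
    /** What the session tracker would be asked to do (names are hypothetical). */
    enum SessionAction { ADD_GLOBAL, ADD_LOCAL, WARN, REMOVE, NONE }

    // Stand-ins for OpCode.createSession and OpCode.closeSession.
    static final int CREATE_SESSION = -10;
    static final int CLOSE_SESSION = -11;

    /**
     * @param hasHeader           models "hdr != null"
     * @param txnIsCreateSession  models "txn instanceof CreateSessionTxn"
     * @param localSessionRequest models "request != null && request.isLocalSession()"
     */
    static SessionAction decide(int opCode, boolean hasHeader,
                                boolean txnIsCreateSession,
                                boolean localSessionRequest) {
        if (opCode == CREATE_SESSION) {
            if (hasHeader && txnIsCreateSession) {
                return SessionAction.ADD_GLOBAL;   // addGlobalSession branch
            } else if (localSessionRequest) {
                return SessionAction.ADD_LOCAL;    // addSession branch
            }
            return SessionAction.WARN;             // warning-log branch
        } else if (opCode == CLOSE_SESSION) {
            return SessionAction.REMOVE;           // removeSession branch
        }
        return SessionAction.NONE;                 // no session bookkeeping
    }

    public static void main(String[] args) {
        System.out.println(decide(CREATE_SESSION, true, true, false));
        System.out.println(decide(CREATE_SESSION, false, false, true));
        System.out.println(decide(CLOSE_SESSION, true, false, false));
    }
}
```

The model leaves out the database call entirely; it only shows that the session bookkeeping depends on the operation code first and on the header and request second.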

Next, let's analyze the startup method. Here's the specific processing code:

public synchronized void startup() {
    // Create session tracker if it's null
    if (sessionTracker == null) {
        createSessionTracker();
    }
    // Start the session tracker
    startSessionTracker();
    // Initialize and start request processors
    setupRequestProcessors();

    // Register JMX
    registerJMX();

    // Set server state
    setState(State.RUNNING);
    // Notify all waiting threads
    notifyAll();
}

The process flow in the above code is as follows:

  1. Check whether the sessionTracker member variable is null; if so, create it.
  2. Start the sessionTracker.
  3. Initialize and start the request processors. In the base ZooKeeperServer class, firstProcessor is a PrepRequestProcessor, and the processor chain is PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
  4. Register JMX data.
  5. Set the service state to running.
  6. Call notifyAll to wake any threads waiting on the server object.
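
The startup method is synchronized and ends with notifyAll, which is the usual Java monitor pattern for releasing threads that wait until a state change has happened. The sketch below illustrates that pattern in isolation. The class StartupSketch and the method awaitRunning are hypothetical; which ZooKeeper code paths actually wait on the server object is not covered here.

```java
class StartupSketch {
    enum State { INITIAL, RUNNING }

    private State state = State.INITIAL;

    /** Models the tail of startup(): change the state, then wake all waiters. */
    synchronized void startup() {
        state = State.RUNNING;
        notifyAll();
    }

    /** Waits until the state is RUNNING or the timeout elapses; returns whether it is running. */
    synchronized boolean awaitRunning(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            // Loop to guard against spurious wakeups.
            while (state == State.INITIAL) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                wait(remaining);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
        return state == State.RUNNING;
    }

    public static void main(String[] args) throws InterruptedException {
        StartupSketch server = new StartupSketch();
        Thread starter = new Thread(server::startup);
        starter.start();
        System.out.println(server.awaitRunning(1000));
        starter.join();
    }
}
```

Both methods synchronize on the same object, so a waiter can never miss the notification between checking the state and calling wait.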

Next, we'll analyze the subclasses. For ZooKeeperServer implementations in the ZooKeeper project, the most important method is setupRequestProcessors, which determines the request processing chain. Therefore, the analysis of subclasses will focus on the setupRequestProcessors method, emphasizing the request processor chain.

LeaderZooKeeperServer Analysis

This section analyzes the LeaderZooKeeperServer class, focusing on the setupRequestProcessors method. The specific implementation code is as follows:

@Override
protected void setupRequestProcessors() {
  // Final processor
  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  // To be applied processor
  RequestProcessor toBeAppliedProcessor =
    new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
  // Commit processor
  commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                                        Long.toString(getServerId()), false,
                                        getZooKeeperServerListener());
  commitProcessor.start();
  // Proposal processor
  ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                                                                            commitProcessor);
  // Initialize proposal processor
  proposalProcessor.initialize();
  // Prep processor
  prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
  // Start prep processor
  prepRequestProcessor.start();
  // Leader processor
  firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
  // Setup container manager
  setupContainerManager();
}

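The core of this code is initializing the firstProcessor member variable. The concrete type of firstProcessor is LeaderRequestProcessor, and the processor chain is LeaderRequestProcessor -> PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor. Note that the code constructs the chain in reverse, starting from the final processor, because each processor receives its successor as a constructor argument.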
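
The wiring order can be reproduced with a much simpler model. The following sketch assumes a hypothetical Processor interface that is called synchronously and only records stage names; the real request processors are more involved (several of them are started with start() in the code above), so this shows the construction order and nothing else.

```java
import java.util.ArrayList;
import java.util.List;

class ChainSketch {
    /** Hypothetical, simplified stand-in for a request processor. */
    interface Processor {
        void process(String request, List<String> trace);
    }

    /** A stage that records its name and forwards to the next stage, if any. */
    static Processor stage(String name, Processor next) {
        return (request, trace) -> {
            trace.add(name);
            if (next != null) {
                next.process(request, trace);
            }
        };
    }

    /** Builds the leader chain from the last stage to the first. */
    static Processor leaderChain() {
        Processor fin = stage("FinalRequestProcessor", null);
        Processor toBeApplied = stage("ToBeAppliedRequestProcessor", fin);
        Processor commit = stage("CommitProcessor", toBeApplied);
        Processor proposal = stage("ProposalRequestProcessor", commit);
        Processor prep = stage("PrepRequestProcessor", proposal);
        return stage("LeaderRequestProcessor", prep);
    }

    /** Sends one request through the chain and returns the stages it visited. */
    static List<String> trace(Processor first) {
        List<String> visited = new ArrayList<>();
        first.process("request", visited);
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", trace(leaderChain())));
    }
}
```

The last object created is the entry point of the chain, which is why firstProcessor is assigned at the end of setupRequestProcessors.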

FollowerZooKeeperServer Analysis

This section analyzes the FollowerZooKeeperServer class, focusing on the setupRequestProcessors method. The specific implementation code is as follows:

@Override
protected void setupRequestProcessors() {
  // Final processor
  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  // Commit processor
  commitProcessor = new CommitProcessor(finalProcessor,
                                        Long.toString(getServerId()), true, getZooKeeperServerListener());
  commitProcessor.start();
  // Follower processor
  firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
  ((FollowerRequestProcessor) firstProcessor).start();
  // Sync request processor
  syncProcessor = new SyncRequestProcessor(this,
                                           new SendAckRequestProcessor((Learner) getFollower()));
  syncProcessor.start();
}

The core of this code is initializing the firstProcessor member variable. The concrete type of firstProcessor is FollowerRequestProcessor, and the processor chain is FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor. Additionally, the method creates and starts a sync request processor (SyncRequestProcessor) that is not part of this chain: it forms a second chain, SyncRequestProcessor -> SendAckRequestProcessor.

ObserverZooKeeperServer Analysis

This section analyzes the ObserverZooKeeperServer class, focusing on the setupRequestProcessors method. The specific implementation code is as follows:

@Override
protected void setupRequestProcessors() {
  // Final processor
  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  // Commit processor
  commitProcessor = new CommitProcessor(finalProcessor,
                                        Long.toString(getServerId()), true,
                                        getZooKeeperServerListener());
  commitProcessor.start();
  // Observer processor
  firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
  ((ObserverRequestProcessor) firstProcessor).start();

  // Optional sync request processor with no downstream processor
  if (syncRequestProcessorEnabled) {
    syncProcessor = new SyncRequestProcessor(this, null);
    syncProcessor.start();
  }
}

The core of this code is initializing the firstProcessor member variable. The concrete type of firstProcessor is ObserverRequestProcessor, and the processor chain is ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor. Additionally, if syncRequestProcessorEnabled is true, the method creates and starts a SyncRequestProcessor with no downstream processor (its next-processor argument is null).

ReadOnlyZooKeeperServer Analysis

This section analyzes the ReadOnlyZooKeeperServer class, focusing on the setupRequestProcessors method. The specific implementation code is as follows:

@Override
protected void setupRequestProcessors() {
  // Final processor
  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  // Prep processor
  RequestProcessor prepProcessor = new PrepRequestProcessor(this, finalProcessor);
  ((PrepRequestProcessor) prepProcessor).start();
  // Read-only processor
  firstProcessor = new ReadOnlyRequestProcessor(this, prepProcessor);
  ((ReadOnlyRequestProcessor) firstProcessor).start();
}

The core of this code is initializing the firstProcessor member variable. The specific type of firstProcessor is ReadOnlyRequestProcessor, with a processor chain of ReadOnlyRequestProcessor -> PrepRequestProcessor -> FinalRequestProcessor.
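
Placing a dedicated processor at the front of the chain gives a read-only server a single point at which write requests can be turned away before they reach the rest of the chain. The sketch below models that idea only. It is not the ReadOnlyRequestProcessor implementation: the Processor interface, the operation names, and the returned strings are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class ReadOnlyGateSketch {
    /** Hypothetical operation names; these are not ZooKeeper's OpCode constants. */
    static final Set<String> WRITE_OPS =
        new HashSet<>(Arrays.asList("create", "delete", "setData", "setACL"));

    /** Hypothetical, simplified stand-in for a request processor. */
    interface Processor {
        String process(String op);
    }

    /** Front stage: rejects writes and forwards everything else to the next stage. */
    static Processor readOnlyGate(Processor next) {
        return op -> WRITE_OPS.contains(op) ? "rejected: " + op : next.process(op);
    }

    public static void main(String[] args) {
        // The lambda plays the role of the rest of the chain.
        Processor chain = readOnlyGate(op -> "handled: " + op);
        System.out.println(chain.process("getData"));
        System.out.println(chain.process("create"));
    }
}
```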

Summary

This chapter analyzed the ZooKeeperServer class in the ZooKeeper project: its member variables, an overview of its methods, and the processTxn and startup methods in detail. The analysis of the subclasses focused on the setupRequestProcessors method and the request processor chain that each subclass builds.