ZooKeeper Source Code Analysis

ZooKeeper Server-Side Request Processing Analysis

This chapter analyzes the ZooKeeper server-side request processing flow. In the ZooKeeper project, the core interface for request processing is RequestProcessor, and this chapter will focus on analyzing this interface.

Request Processing Entry Point

This section examines the request processing entry point in the ZooKeeper project. In ZooKeeper, the entry point for request processing is the NIOServerCnxn#readPayload method. This method handles two types of requests: connection establishment requests (processed by readConnectRequest) and operation requests (processed by readRequest). This chapter primarily focuses on data operation-related requests and does not analyze connection establishment requests. The detailed code for the readRequest method is as follows:

private void readRequest() throws IOException {
  zkServer.processPacket(this, incomingBuffer);
}

The above code calls the ZooKeeperServer#processPacket method to complete the request reading operation. The ZooKeeperServer#processPacket method can be divided into three main categories of packet processing:

  1. Requests whose header type is the authentication type (OpCode.auth).
  2. Requests whose header type is the SASL authentication type (OpCode.sasl).
  3. Requests with any other header type.
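
The three-way split can be sketched as a small classification function. This is an illustration only, not the ZooKeeperServer code: PacketDispatchSketch, PacketKind, and classify are hypothetical names, and the two integer constants are assumed to mirror ZooDefs.OpCode.auth and ZooDefs.OpCode.sasl.

```java
final class PacketDispatchSketch {
    // Assumed to mirror ZooDefs.OpCode.auth and ZooDefs.OpCode.sasl.
    static final int OP_AUTH = 100;
    static final int OP_SASL = 102;

    enum PacketKind { AUTH, SASL, OTHER }

    // Decide which of the three branches a packet would take, based on its header type.
    static PacketKind classify(int headerType) {
        if (headerType == OP_AUTH) {
            return PacketKind.AUTH;
        }
        if (headerType == OP_SASL) {
            return PacketKind.SASL;
        }
        return PacketKind.OTHER;
    }

    public static void main(String[] args) {
        System.out.println(classify(OP_AUTH));
        System.out.println(classify(OP_SASL));
        System.out.println(classify(1));
    }
}
```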

Let's first analyze the processing of authentication requests. The specific processing code is as follows:

if (h.getType() == OpCode.auth) {
  LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
  // Create authentication packet
  AuthPacket authPacket = new AuthPacket();
  // Convert byte buffer data to authentication packet object
  ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
  // Get authentication scheme
  String scheme = authPacket.getScheme();
  // Get authentication provider based on the scheme
  AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
  // Set initial authentication status code to AUTHFAILED
  Code authReturn = KeeperException.Code.AUTHFAILED;
  if (ap != null) {
    try {
      // Get status code from authentication provider
      authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
    } catch (RuntimeException e) {
      LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme
               + " due to " + e);
      // Set authentication status code to AUTHFAILED
      authReturn = KeeperException.Code.AUTHFAILED;
    }
  }
  // If authentication status code is OK, send response
  if (authReturn == KeeperException.Code.OK) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Authentication succeeded for scheme: " + scheme);
    }
    LOG.info("auth success " + cnxn.getRemoteSocketAddress());
    ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
    cnxn.sendResponse(rh, null, null);
  }
  // If authentication status code is not OK
  else {
    if (ap == null) {
      LOG.warn("No authentication provider for scheme: " + scheme + " has "
               + ProviderRegistry.listProviders());
    } else {
      LOG.warn("Authentication failed for scheme: " + scheme);
    }
    // Assemble and send response information
    ReplyHeader rh =
      new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
    cnxn.sendResponse(rh, null, null);
    // Send close connection
    cnxn.sendBuffer(ServerCnxnFactory.closeConn);
    // Stop receiving data
    cnxn.disableRecv();
  }
  // End processing
  return;
}

The code first checks if the request header type is OpCode.auth. If so, it performs the following steps:

  1. Create an authentication packet (AuthPacket object).
  2. Convert the byte buffer data to an authentication packet object using jute, ZooKeeper's serialization framework.
  3. Get the authentication scheme from the authentication packet.
  4. Get the authentication provider based on the scheme from step 3.
  5. Create an authentication status code, initially set to AUTHFAILED.
  6. If the authentication provider from step 4 is not null, get the authentication status code from the provider. If a runtime exception occurs during verification, set the status code to AUTHFAILED.
  7. If the authentication status code from step 6 is OK, send an authentication success response. Otherwise, send an authentication failure response, close the connection, and stop receiving data.
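
The key property of steps 5 and 6 is that the result is "fail closed": the status starts as AUTHFAILED and only a provider that returns normally can change it. The following is a minimal sketch of that pattern under stated assumptions. AuthDispatchSketch, its PROVIDERS map, and the Code enum are hypothetical stand-ins for ProviderRegistry and KeeperException.Code, not the real classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class AuthDispatchSketch {
    enum Code { OK, AUTHFAILED }

    // Hypothetical stand-in for ProviderRegistry: scheme name -> credential check.
    static final Map<String, Function<byte[], Code>> PROVIDERS = new HashMap<>();

    static Code authenticate(String scheme, byte[] credentials) {
        // Start from failure so that every unexpected path denies access.
        Code result = Code.AUTHFAILED;
        Function<byte[], Code> provider = PROVIDERS.get(scheme);
        if (provider != null) {
            try {
                result = provider.apply(credentials);
            } catch (RuntimeException e) {
                // A misbehaving provider must never grant access.
                result = Code.AUTHFAILED;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        PROVIDERS.put("demo", creds -> creds.length > 0 ? Code.OK : Code.AUTHFAILED);
        PROVIDERS.put("broken", creds -> { throw new IllegalStateException("boom"); });
        System.out.println(authenticate("demo", new byte[] {1}));
        System.out.println(authenticate("broken", new byte[] {1}));
        System.out.println(authenticate("unknown", new byte[] {1}));
    }
}
```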

Next, let's analyze the processing of SASL authentication requests. The specific processing code is as follows:

private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn,
                         RequestHeader requestHeader) throws IOException {
  LOG.debug("Responding to client SASL token.");
  // Create SASL request object
  GetSASLRequest clientTokenRecord = new GetSASLRequest();
  // Convert byte buffer data to SASL request object
  ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord);
  // Get client token from SASL request object
  byte[] clientToken = clientTokenRecord.getToken();
  LOG.debug("Size of client SASL token: " + clientToken.length);
  // Response token
  byte[] responseToken = null;
  try {
    // Get SASL server from service connection object
    ZooKeeperSaslServer saslServer = cnxn.zooKeeperSaslServer;
    try {
      // Create response token through SASL server
      responseToken = saslServer.evaluateResponse(clientToken);
      // Check if SASL server processing is complete
      if (saslServer.isComplete()) {
        // Get authorization ID from SASL server
        String authorizationID = saslServer.getAuthorizationID();
        LOG.info("adding SASL authorization for authorizationID: " + authorizationID);
        // Add security information to service connection object
        cnxn.addAuthInfo(new Id("sasl", authorizationID));
        // If zookeeper.superUser property is not empty and authorization ID matches, add super user security information
        if (System.getProperty("zookeeper.superUser") != null &&
            authorizationID.equals(System.getProperty("zookeeper.superUser"))) {
          cnxn.addAuthInfo(new Id("super", ""));
        }
      }
    }
    // Exception handling
    catch (SaslException e) {
      LOG.warn("Client {} failed to SASL authenticate: {}",
               cnxn.getRemoteSocketAddress(), e);
      // Failed clients are allowed to connect and the server does not require client SASL authentication: only log a warning
      if (shouldAllowSaslFailedClientsConnect() && !shouldRequireClientSaslAuth()) {
        LOG.warn("Maintaining client connection despite SASL authentication failure.");
      } else {
        // Confirm error value
        int error;
        // SASL verification required
        if (shouldRequireClientSaslAuth()) {
          LOG.warn(
            "Closing client connection due to server requires client SASL authentication,"
            +
            "but client SASL authentication has failed, or client is not configured with SASL "
            +
            "authentication.");
          // Session closed, SASL authentication failed
          error = Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue();
        } else {
          LOG.warn("Closing client connection due to SASL authentication failure.");
          // Authentication failed
          error = Code.AUTHFAILED.intValue();
        }

        // Assemble response
        ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, error);
        // Send response
        cnxn.sendResponse(replyHeader, new SetSASLResponse(null), "response");
        // Send close session message
        cnxn.sendCloseSession();
        // Stop receiving data
        cnxn.disableRecv();
        return;
      }
    }
  } catch (NullPointerException e) {
    LOG.error(
      "cnxn.saslServer is null: cnxn object did not initialize its saslServer properly.");
  }
  if (responseToken != null) {
    LOG.debug("Size of server SASL response: " + responseToken.length);
  }

  // SASL verification passed, send success message
  ReplyHeader replyHeader = new ReplyHeader(requestHeader.getXid(), 0, Code.OK.intValue());
  Record record = new SetSASLResponse(responseToken);
  cnxn.sendResponse(replyHeader, record, "response");
}

The core processing flow in the above code is as follows:

  1. Create a SASL request object.
  2. Convert the data in the byte buffer to a SASL request object using jute, ZooKeeper's serialization framework.
  3. Obtain the client token from the SASL request object, which will be used to generate the response token later.
  4. Get the SASL service from the service connection object and create a response token through the SASL service. After creating the response token, it will check if the SASL service has completed processing. If completed, the following operations will be performed:
    1. Get the authorization ID through the SASL service.
    2. Add the authorization ID to the security information. The data is stored using an ID object, the ID scheme is sasl, and the value is the authorization ID.
    3. If the zookeeper.superUser property in the system environment is not empty and the authorization ID obtained in 4.1 is the same as the property value in zookeeper.superUser, a new security information will be added. The data is stored in an ID object, the ID scheme is super, and the value is an empty string.

Note that step (4) can fail: the saslServer.evaluateResponse method may throw a SaslException. If that happens, the following operations are performed:

  1. If clients that fail SASL authentication are allowed to connect (shouldAllowSaslFailedClientsConnect) and the server does not require client SASL authentication (shouldRequireClientSaslAuth returns false), only a warning is logged and the connection is kept.
  2. If the conditions in step (1) are not met, the following operations are performed:
    1. Determine the error code: if the server requires client SASL authentication, SESSIONCLOSEDREQUIRESASLAUTH is used; otherwise, AUTHFAILED is used.
    2. Assemble and send the response, send a close session message, and stop receiving data.

If no exception occurs, or if the failure is tolerated as in step (1), a response with the OK code that carries the response token is sent to the client.
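
The failure handling reduces to a decision over two booleans. The sketch below restates it as a pure function; SaslFailurePolicySketch and Outcome are hypothetical names, and the two parameters stand in for the results of shouldAllowSaslFailedClientsConnect() and shouldRequireClientSaslAuth().

```java
final class SaslFailurePolicySketch {
    enum Outcome { KEEP_CONNECTION, CLOSE_SASL_REQUIRED, CLOSE_AUTH_FAILED }

    // Mirrors the branch structure of the catch (SaslException e) block.
    static Outcome onSaslFailure(boolean allowFailedClients, boolean requireClientSasl) {
        if (allowFailedClients && !requireClientSasl) {
            return Outcome.KEEP_CONNECTION;          // warning only
        }
        return requireClientSasl
                ? Outcome.CLOSE_SASL_REQUIRED        // SESSIONCLOSEDREQUIRESASLAUTH
                : Outcome.CLOSE_AUTH_FAILED;         // AUTHFAILED
    }

    public static void main(String[] args) {
        for (boolean allow : new boolean[] {false, true}) {
            for (boolean require : new boolean[] {false, true}) {
                System.out.println("allow=" + allow + " require=" + require
                        + " -> " + onSaslFailure(allow, require));
            }
        }
    }
}
```

Note that the "require" flag wins over the "allow" flag: when both are true, the connection is still closed.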

Finally, analyze other request header types. The specific processing code is as follows:

else {
  // SASL authentication is required and has not passed SASL authentication
  if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
    // Assemble response information and send it
    ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0,
                                              Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
    cnxn.sendResponse(replyHeader, null, "response");
    // Send session close information
    cnxn.sendCloseSession();
    // No longer receive data
    cnxn.disableRecv();
  }
  // SASL authentication is not required or SASL authentication has passed
  else {
    Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(),
                             incomingBuffer, cnxn.getAuthInfo());
    si.setOwner(ServerCnxn.me);
    // Confirm if it's a local session, if so, set isLocalSession to true
    setLocalSessionFlag(si);
    // Submit request for processing
    submitRequest(si);
  }
}

The core processing flow in the above code is as follows:

  1. If SASL authentication is required and the connection has not passed SASL authentication, a response with the SESSIONCLOSEDREQUIRESASLAUTH error code is sent. A session close message is also sent, and the connection stops receiving data.
  2. If the conditions in step (1) are not met, it will perform the submit request operation.

At this point, we have finally found the core method for request processing: submitRequest. Here's the specific code:

public void submitRequest(Request si) {
  // If the first request processor is null
  if (firstProcessor == null) {
    synchronized (this) {
      try {
        // Since all requests are passed to the request processor, it should wait for the request processor chain to be set up. The state will be updated to RUNNING after setup.
        while (state == State.INITIAL) {
          wait(1000);
        }
      } catch (InterruptedException e) {
        LOG.warn("Unexpected interruption", e);
      }
      if (firstProcessor == null || state != State.RUNNING) {
        throw new RuntimeException("Not started");
      }
    }
  }
  try {
    // Try to process session-related information
    touch(si.cnxn);
    // Validate request type
    boolean validpacket = Request.isValid(si.type);
    // Request type validation passed
    if (validpacket) {
      // Process request
      firstProcessor.processRequest(si);
      // If connection object exists
      if (si.cnxn != null) {
        // Increment the number of requests being processed
        incInProcess();
      }
    }
    // Request type validation failed
    else {
      LOG.warn("Received packet at server of unknown type " + si.type);
      // Request processor for sending error code response
      new UnimplementedRequestProcessor().processRequest(si);
    }
  } catch (MissingSessionException e) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Dropping request: " + e.getMessage());
    }
  } catch (RequestProcessorException e) {
    LOG.error("Unable to process request:" + e.getMessage(), e);
  }
}

In the submitRequest method, the core request processing relies on the second try block. The execution flow is as follows:

  1. Try to process session-related information through the touch method. The processing steps are as follows:
    1. If the service connection object (ServerCnxn) is null, end the processing.
    2. Get the sessionId and session timeout from the service connection object.
    3. Pass both values to the SessionTracker interface. If the tracker returns false (for example, because the sessionId does not exist or the session has expired), a MissingSessionException is thrown, and submitRequest drops the request.
  2. Verify that the request type is valid. If it is invalid, the request is handed over to the UnimplementedRequestProcessor, which sends an error response. If it is valid, the request is handed over to the firstProcessor member variable for processing.
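
The routing performed by the second try block can be sketched as follows. This is a simplified model, not the ZooKeeperServer code: SubmitRequestSketch, Route, LIVE_SESSIONS, and VALID_TYPES are hypothetical names, and the session check is reduced to a set lookup.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class SubmitRequestSketch {
    enum Route { FIRST_PROCESSOR, UNIMPLEMENTED, DROPPED }

    // Hypothetical stand-ins: sessions known to the tracker and request types considered valid.
    static final Set<Long> LIVE_SESSIONS = new HashSet<>(Arrays.asList(1L, 2L));
    static final Set<Integer> VALID_TYPES = new HashSet<>(Arrays.asList(1, 2, 4));

    static final class MissingSessionException extends Exception {
        MissingSessionException(String message) {
            super(message);
        }
    }

    // A null session id models a request without a connection object: nothing to touch.
    static void touch(Long sessionId) throws MissingSessionException {
        if (sessionId == null) {
            return;
        }
        if (!LIVE_SESSIONS.contains(sessionId)) {
            throw new MissingSessionException("No session 0x" + Long.toHexString(sessionId));
        }
    }

    static Route submit(Long sessionId, int type) {
        try {
            touch(sessionId);
            return VALID_TYPES.contains(type) ? Route.FIRST_PROCESSOR : Route.UNIMPLEMENTED;
        } catch (MissingSessionException e) {
            return Route.DROPPED; // the request is dropped, as in the catch block of submitRequest
        }
    }

    public static void main(String[] args) {
        System.out.println(submit(1L, 1));
        System.out.println(submit(1L, 999));
        System.out.println(submit(42L, 1));
    }
}
```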

In the above processing flow, we can see that the core processing of requests is handed over to the firstProcessor member variable. This variable is assigned in the setupRequestProcessors method:

protected void setupRequestProcessors() {
  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                                                            finalProcessor);
  ((SyncRequestProcessor) syncProcessor).start();
  firstProcessor = new PrepRequestProcessor(this, syncProcessor);
  ((PrepRequestProcessor) firstProcessor).start();
}

In the above code, three request processors are constructed back to front and linked into the chain PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor. The three types are:

  1. Final processor: FinalRequestProcessor.
  2. Preparation processor (request preparation processor): PrepRequestProcessor.
  3. Synchronization processor: SyncRequestProcessor.

The setupRequestProcessors method initializes the request processing chain. It is called directly from the ZooKeeperServer#startup method, which in turn runs when the ZooKeeper service starts. Note that this ZooKeeperServer implementation does not distinguish between leader and follower.
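
The wiring in setupRequestProcessors is a chain of responsibility in which each processor holds a reference to the next one. The sketch below shows only that wiring. It is synchronous, whereas the real PrepRequestProcessor and SyncRequestProcessor are started as threads, and ProcessorChainSketch, Processor, and Stage are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class ProcessorChainSketch {
    interface Processor {
        void process(String request, List<String> trace);
    }

    static final class Stage implements Processor {
        private final String name;
        private final Processor next; // null for the last stage

        Stage(String name, Processor next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void process(String request, List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.process(request, trace);
            }
        }
    }

    // Built back to front, like setupRequestProcessors: final, then sync, then prep.
    static Processor buildChain() {
        Processor finalStage = new Stage("final", null);
        Processor syncStage = new Stage("sync", finalStage);
        return new Stage("prep", syncStage);
    }

    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        buildChain().process(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("create /a")); // prints [prep, sync, final]
    }
}
```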

This concludes the analysis of the request processing entry point in ZooKeeper. The following sections of this chapter will analyze the various request processors responsible for handling requests in ZooKeeper.

PrepRequestProcessor Analysis

This section analyzes the request preparation processor, PrepRequestProcessor. The class diagram of this class is shown in the figure.

[Figure: class diagram of PrepRequestProcessor]

From the class diagram, we can see that the class not only implements the request processing interface (RequestProcessor) but is also a thread. Let's first analyze the implementation of the request processing interface. The specific code is as follows:

public void processRequest(Request request) {
    submittedRequests.add(request);
}

In this code, we can see that the logic of request processing is to add the request object to the member variable submittedRequests. The definition code for the member variable submittedRequests is as follows:

LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();

From the above code, we can see that requests are stored in a blocking queue, so processRequest only enqueues the request and returns. The actual processing logic is encapsulated in the thread's run method. The specific processing code is as follows:

@Override
public void run() {
  try {
    // Infinite loop
    while (true) {
      // Get a request from the member variable submittedRequests
      Request request = submittedRequests.take();
      // Set the trace type to CLIENT_REQUEST_TRACE_MASK
      long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
      // If the request type is ping, set the trace type to CLIENT_PING_TRACE_MASK
      if (request.type == OpCode.ping) {
        traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
      }
      if (LOG.isTraceEnabled()) {
        ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
      }
      // If the current request is the same as the death request, end the infinite loop and stop working
      if (Request.requestOfDeath == request) {
        break;
      }
      // Process the request
      pRequest(request);
    }
  } catch (RequestProcessorException e) {
    if (e.getCause() instanceof XidRolloverException) {
      LOG.info(e.getCause().getMessage());
    }
    // Handle exception
    handleException(this.getName(), e);
  } catch (Exception e) {
    handleException(this.getName(), e);
  }
  LOG.info("PrepRequestProcessor exited loop!");
}

The core of the above code is an infinite loop, which processes requests through this loop. The processing logic is as follows:

  1. Get a request from the member variable submittedRequests (from the head of the queue).
  2. Initialize the trace type to CLIENT_REQUEST_TRACE_MASK. If the request type is ping, set it to CLIENT_PING_TRACE_MASK. If trace logging is enabled, log the request.
  3. Check if the current request is the same as the death request. If so, end the infinite loop.
  4. Hand over to the pRequest method for actual request processing.
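
The loop described above is a producer/consumer arrangement with a sentinel ("poison pill") that stops the worker. Here is a minimal, self-contained sketch of that pattern. PoisonPillSketch and its nested Request class are hypothetical; only the use of LinkedBlockingQueue.take and an identity comparison against a sentinel object follows the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

final class PoisonPillSketch {
    static final class Request {
        final String op;

        Request(String op) {
            this.op = op;
        }
    }

    // Sentinel compared by identity, in the spirit of Request.requestOfDeath.
    static final Request REQUEST_OF_DEATH = new Request("death");

    static List<String> processAll(List<String> ops) {
        LinkedBlockingQueue<Request> submitted = new LinkedBlockingQueue<>();
        List<String> handled = new ArrayList<>(); // written only by the worker, read after join()
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Request r = submitted.take();   // blocks until a request is available
                    if (r == REQUEST_OF_DEATH) {
                        break;                      // stop the loop
                    }
                    handled.add(r.op);              // stands in for pRequest(request)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        for (String op : ops) {
            submitted.add(new Request(op));         // stands in for processRequest(request)
        }
        submitted.add(REQUEST_OF_DEATH);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(processAll(java.util.Arrays.asList("create", "setData"))); // prints [create, setData]
    }
}
```

Because the queue is FIFO and the sentinel is enqueued last, every request submitted before it is handled before the worker exits.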

In the above processing flow, attention should be paid to exception-related handling. All exception handling is handed over to the handleException method. Next, let's analyze the pRequest method. The processing logic of pRequest can be divided into three parts:

  1. Execution of the try block, which will perform different operations according to different request types.
  2. Execution of the catch block, which mainly completes the logic processing in case of exceptions in the try block.
  3. Processing by the next request processor.

Let's first outline the processing flow in the try block. The processing flow in the try block can be generalized into the following operation steps:

  1. Create different request objects according to different request types. The mapping between request types and request objects is shown in the table.
  2. Convert the request into a transaction through the pRequest2Txn method.

Request Type | Request Object (Record object) | Description
OpCode.createContainer, OpCode.create, OpCode.create2 | CreateRequest | Create request.
OpCode.createTTL | CreateTTLRequest | Create request with an expiration time (TTL).
OpCode.deleteContainer, OpCode.delete | DeleteRequest | Delete request.
OpCode.setData | SetDataRequest | Set data request.
OpCode.reconfig | ReconfigRequest | Reconfiguration request.
OpCode.setACL | SetACLRequest | Set ACL (permission) request.
OpCode.check | CheckVersionRequest | Check version request.
OpCode.multi | MultiTransactionRecord | Multi-transaction request.
OpCode.createSession, OpCode.closeSession | None | No request object is created; pRequest2Txn is called only for non-local sessions.
OpCode.sync, OpCode.exists, OpCode.getData, OpCode.getACL, OpCode.getChildren, OpCode.getChildren2, OpCode.ping, OpCode.setWatches, OpCode.checkWatches, OpCode.removeWatches | None | No request object is created; the session is handed over to sessionTracker for validation.

After understanding the relationship between request types and request objects, let's analyze the core processing method pRequest2Txn. This method will be analyzed according to different operation types.

Analysis of Common Methods

Before analyzing various operation types, we need to examine some common methods provided in the PrepRequestProcessor class. There are five common methods:

  1. The fixupACL method is used to validate and normalize the ACL list of a request.
  2. The getRecordForPath method is used to obtain change records based on input paths.
  3. The checkACL method is used to verify ACL.
  4. The validatePath method is used to validate if a path is legal.
  5. The addChangeRecord method is used to add change records.

Let's analyze the fixupACL method first. Here's the specific processing code:

private List<ACL> fixupACL(String path, List<Id> authInfo, List<ACL> acls)
  throws KeeperException.InvalidACLException {
  // Filter out duplicate data from the acls parameter
  List<ACL> uniqacls = removeDuplicates(acls);
  // Container for storing return values
  LinkedList<ACL> rv = new LinkedList<ACL>();
  // Throw an exception if the filtered ACL count is empty
  if (uniqacls == null || uniqacls.size() == 0) {
    throw new KeeperException.InvalidACLException(path);
  }
  // Iterate through the filtered acl collection
  for (ACL a : uniqacls) {
    LOG.debug("Processing ACL: {}", a);
    // Throw an exception if the current element is null
    if (a == null) {
      throw new KeeperException.InvalidACLException(path);
    }
    // Extract id
    Id id = a.getId();
    // Throw an exception if id is null or the scheme stored in id is null
    if (id == null || id.getScheme() == null) {
      throw new KeeperException.InvalidACLException(path);
    }
    // Add to the result set if the scheme stored in id is "world" and the id variable in id is "anyone"
    if (id.getScheme().equals("world") && id.getId().equals("anyone")) {
      rv.add(a);
    }
    // Perform authentication logic if the scheme stored in id is "auth"
    else if (id.getScheme().equals("auth")) {
      // Authentication pass flag
      boolean authIdValid = false;
      for (Id cid : authInfo) {
        // Get security authenticator based on security scheme
        AuthenticationProvider ap =
          ProviderRegistry.getProvider(cid.getScheme());

        if (ap == null) {
          LOG.error("Missing AuthenticationProvider for "
                    + cid.getScheme());
        }
        // Authentication passed
        else if (ap.isAuthenticated()) {
          authIdValid = true;
          rv.add(new ACL(a.getPerms(), cid));
        }
      }
      // Throw an exception if the authentication pass flag is false
      if (!authIdValid) {
        throw new KeeperException.InvalidACLException(path);
      }
    }
    else {
      // Get security authenticator based on security scheme
      AuthenticationProvider ap = ProviderRegistry.getProvider(id.getScheme());
      // Throw an exception if the security authenticator is null or id verification fails
      if (ap == null || !ap.isValid(id.getId())) {
        throw new KeeperException.InvalidACLException(path);
      }
      rv.add(a);
    }
  }
  return rv;
}

The core processing flow in the fixupACL method is as follows:

  1. Filter out duplicate data from the acls parameter, keeping only unique data.
  2. Create a container for storing return values.
  3. Throw an exception if the filtered ACL count is empty.
  4. Iterate through the filtered acls data, processing each item as follows:
    1. Throw an exception if the current element is null.
    2. Extract the Id object from the current element. Throw an exception if the Id object is null or the scheme stored in the Id object is null.
    3. If the scheme stored in the Id object is "world" and the id variable in the Id object is "anyone", add the current element to the return value container.
    4. If the scheme stored in the Id object is "auth", perform authentication operations.
    5. In cases not meeting conditions (3) and (4), get the security authenticator based on the security scheme. If the security authenticator is null or id verification fails, throw an exception; otherwise, add it to the return value container.

In step 4.4, the authInfo parameter is used for authentication processing. The processing flow is as follows:

  1. Create an authentication pass flag (authIdValid), initially set to false.
  2. Iterate through the authInfo parameter, performing the following operations on each item:
    1. Get the security authenticator based on the security scheme.
    2. Perform authentication using the security authenticator. If authentication passes, set authIdValid to true and create an ACL object to add to the return value.
  3. Throw an exception if the authIdValid variable is false.
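
The effect of the "auth" branch is that one requested ACL entry is expanded into one entry per identity the client has authenticated with. The sketch below illustrates only that expansion, under simplifying assumptions: every identity in authInfo is treated as already authenticated (the real code asks the AuthenticationProvider), duplicates are not removed, other schemes are not validated, and IllegalArgumentException stands in for InvalidACLException. FixupAclSketch, Id, and Acl are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class FixupAclSketch {
    static final class Id {
        final String scheme;
        final String id;

        Id(String scheme, String id) {
            this.scheme = scheme;
            this.id = id;
        }
    }

    static final class Acl {
        final int perms;
        final Id id;

        Acl(int perms, Id id) {
            this.perms = perms;
            this.id = id;
        }
    }

    static List<Acl> expand(List<Acl> requested, List<Id> authInfo) {
        if (requested == null || requested.isEmpty()) {
            throw new IllegalArgumentException("empty ACL list");
        }
        List<Acl> result = new ArrayList<>();
        for (Acl a : requested) {
            if (a == null || a.id == null || a.id.scheme == null) {
                throw new IllegalArgumentException("malformed ACL entry");
            }
            if (a.id.scheme.equals("auth")) {
                // Assumption: every identity in authInfo counts as authenticated.
                if (authInfo.isEmpty()) {
                    throw new IllegalArgumentException("auth ACL without authenticated identity");
                }
                for (Id cid : authInfo) {
                    result.add(new Acl(a.perms, cid)); // same permissions, concrete identity
                }
            } else {
                result.add(a);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Acl> requested = new ArrayList<>();
        requested.add(new Acl(31, new Id("auth", "")));
        List<Id> authInfo = new ArrayList<>();
        authInfo.add(new Id("digest", "alice:hash"));
        authInfo.add(new Id("ip", "127.0.0.1"));
        for (Acl a : expand(requested, authInfo)) {
            System.out.println(a.perms + " " + a.id.scheme + ":" + a.id.id);
        }
    }
}
```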

Next, let's analyze the getRecordForPath method. The complete code is as follows:

private ChangeRecord getRecordForPath(String path) throws KeeperException.NoNodeException {
  ChangeRecord lastChange = null;
  // Lock the outstanding changes
  synchronized (zks.outstandingChanges) {
    // Get change record from outstandingChangesForPath object
    lastChange = zks.outstandingChangesForPath.get(path);
    if (lastChange == null) {
      // Get the data node corresponding to the path from the zk database
      DataNode n = zks.getZKDatabase().getNode(path);
      // If the data node is not null
      if (n != null) {
        Set<String> children;
        synchronized (n) {
          // Get the child nodes of the current node
          children = n.getChildren();
        }
        // Rewrite change record
        lastChange = new ChangeRecord(-1, path, n.stat, children.size(),
                                      zks.getZKDatabase().aclForNode(n));
      }
    }
  }
  if (lastChange == null || lastChange.stat == null) {
    throw new KeeperException.NoNodeException(path);
  }
  // Return
  return lastChange;
}

In the getRecordForPath method, note that the lookup is performed while holding the lock on zks.outstandingChanges, which makes it safe against concurrent modification. The specific operation logic is as follows:

  1. Get the outstandingChangesForPath variable from the ZooKeeper service object, then retrieve the change record based on the function parameter path.
  2. If the change record is null, get the ZKDatabase from the ZooKeeper service object, then retrieve the data node based on the function parameter path.
  3. If the data node obtained in step (2) is not null, read its child nodes and build a new change record (with zxid -1) from the node's stat, child count, and ACL. If the data node is null, no record is built.
  4. Throw a NoNodeException if the record is null or the stat in the record is null.

The getRecordForPath method uses the outstandingChangesForPath and outstandingChanges variables. The definition code for these two variables is as follows:

final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
final HashMap<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();

The outstandingChanges variable uses ArrayDeque for data storage, storing ChangeRecord objects. The outstandingChangesForPath variable uses HashMap for data storage, where the key is the node path (stored in the change record) and the value is the ChangeRecord. The data for these two variables is set using the addChangeRecord method. The specific processing code is as follows:

private void addChangeRecord(ChangeRecord c) {
  synchronized (zks.outstandingChanges) {
    zks.outstandingChanges.add(c);
    zks.outstandingChangesForPath.put(c.path, c);
  }
}
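
Taken together, getRecordForPath and addChangeRecord treat the outstanding changes as an overlay on top of the committed data: a pending change for a path takes precedence, and the database is consulted only when there is none. The sketch below models this idea. PendingChangesSketch and its members are hypothetical, the committed map stands in for the ZKDatabase, and IllegalStateException stands in for NoNodeException.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class PendingChangesSketch {
    static final class ChangeRecord {
        final long zxid;
        final String path;
        final int childCount;

        ChangeRecord(long zxid, String path, int childCount) {
            this.zxid = zxid;
            this.path = path;
            this.childCount = childCount;
        }
    }

    private final Deque<ChangeRecord> outstanding = new ArrayDeque<>();
    private final Map<String, ChangeRecord> outstandingForPath = new HashMap<>();
    // Hypothetical stand-in for the database: path -> child count of the committed node.
    private final Map<String, Integer> committed = new HashMap<>();

    void commit(String path, int childCount) {
        synchronized (outstanding) {
            committed.put(path, childCount);
        }
    }

    void addChangeRecord(ChangeRecord c) {
        synchronized (outstanding) {
            outstanding.add(c);
            outstandingForPath.put(c.path, c); // the latest pending change per path wins
        }
    }

    ChangeRecord getRecordForPath(String path) {
        synchronized (outstanding) {
            ChangeRecord last = outstandingForPath.get(path);
            if (last == null) {
                Integer children = committed.get(path);
                if (children != null) {
                    last = new ChangeRecord(-1, path, children); // -1: built from committed state
                }
            }
            if (last == null) {
                throw new IllegalStateException("no node: " + path);
            }
            return last;
        }
    }

    public static void main(String[] args) {
        PendingChangesSketch sketch = new PendingChangesSketch();
        sketch.commit("/a", 2);
        System.out.println(sketch.getRecordForPath("/a").zxid);
        sketch.addChangeRecord(new ChangeRecord(7, "/a", 3));
        System.out.println(sketch.getRecordForPath("/a").zxid);
    }
}
```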

Next, we will analyze the checkACL method. The specific processing code is as follows:

static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm,
                     List<Id> ids) throws KeeperException.NoAuthException {
  // Skip ACL authentication
  if (skipACL) {
    return;
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Permission requested: {} ", perm);
    LOG.debug("ACLs for node: {}", acl);
    LOG.debug("Client credentials: {}", ids);
  }
  // ACL list is empty or ACL count is zero
  if (acl == null || acl.size() == 0) {
    return;
  }
  // Process id collection
  for (Id authId : ids) {
    // If one of the elements in the id collection has a super scheme, return
    if (authId.getScheme().equals("super")) {
      return;
    }
  }
  // Process ACL collection
  for (ACL a : acl) {
    Id id = a.getId();
    if ((a.getPerms() & perm) != 0) {
      // Allow if scheme is world and id data is anyone
      if (id.getScheme().equals("world")
          && id.getId().equals("anyone")) {
        return;
      }
      // Get security authenticator based on security scheme
      AuthenticationProvider ap = ProviderRegistry.getProvider(id
                                                               .getScheme());
      // Security authenticator is not null
      if (ap != null) {
        // Process parameter ids
        for (Id authId : ids) {
          // Check if the scheme in the current id is the same as the scheme in the ACL id
          // Security authenticator matches the current id and the id in the ACL
          if (authId.getScheme().equals(id.getScheme())
              && ap.matches(authId.getId(), id.getId())) {
            return;
          }
        }
      }
    }
  }
  // Throw exception
  throw new KeeperException.NoAuthException();
}

The core processing flow in the above code is as follows:

  1. Check if ACL authentication needs to be skipped (skipACL). If so, end processing.
  2. If the acl parameter is null or the number of elements in acl is 0, end processing.
  3. Loop through the ids parameter. If any element has the scheme "super", end processing.
  4. Loop through the acl collection parameter and perform the following operations on each ACL object:
    1. Get the Id object from the current ACL object.
    2. Perform an AND operation between the permission value in the current ACL and the permission value in the method parameter. If the result is 0, the ACL does not grant the requested permission and is skipped.
    3. Check if the scheme in the Id object of the ACL object is "world" and if the id data is "anyone". If both conditions are met, end processing.
    4. Get the security authenticator based on the scheme in the Id object of the ACL object.
    5. If the security authenticator is not null, loop through the ids parameter. End processing if the following two conditions are met:
      1. The scheme in the current id is the same as the scheme in the ACL id.
      2. The security authenticator matches the current id and the id in the ACL.
  5. If none of the above steps ends the method, ACL validation has failed, and a NoAuthException is thrown.
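
The permission test in this flow is a bitmask check: an ACL entry is relevant only if its permission bits overlap the requested permission. The sketch below models the flow with two simplifications that are assumptions of this example: identities are matched by plain string equality instead of AuthenticationProvider.matches, and a boolean is returned instead of throwing NoAuthException. CheckAclSketch, Id, and Acl are hypothetical names, and the bit values are assumed to follow ZooDefs.Perms.

```java
import java.util.ArrayList;
import java.util.List;

final class CheckAclSketch {
    // Bit values assumed to follow ZooDefs.Perms.
    static final int READ = 1 << 0;
    static final int WRITE = 1 << 1;
    static final int CREATE = 1 << 2;
    static final int DELETE = 1 << 3;
    static final int ADMIN = 1 << 4;

    static final class Id {
        final String scheme;
        final String id;

        Id(String scheme, String id) {
            this.scheme = scheme;
            this.id = id;
        }
    }

    static final class Acl {
        final int perms;
        final Id id;

        Acl(int perms, Id id) {
            this.perms = perms;
            this.id = id;
        }
    }

    static boolean allowed(List<Acl> acl, int perm, List<Id> ids) {
        if (acl == null || acl.isEmpty()) {
            return true;                       // no ACL: nothing to enforce
        }
        for (Id authId : ids) {
            if (authId.scheme.equals("super")) {
                return true;                   // super user bypasses the check
            }
        }
        for (Acl a : acl) {
            if ((a.perms & perm) == 0) {
                continue;                      // this entry does not grant the requested bit
            }
            if (a.id.scheme.equals("world") && a.id.id.equals("anyone")) {
                return true;
            }
            for (Id authId : ids) {
                // Simplification: string equality instead of a provider-specific match.
                if (authId.scheme.equals(a.id.scheme) && authId.id.equals(a.id.id)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Acl> acl = new ArrayList<>();
        acl.add(new Acl(READ, new Id("world", "anyone")));
        acl.add(new Acl(READ | WRITE, new Id("digest", "alice:hash")));
        List<Id> alice = new ArrayList<>();
        alice.add(new Id("digest", "alice:hash"));
        System.out.println(allowed(acl, WRITE, alice));
        System.out.println(allowed(acl, WRITE, new ArrayList<Id>()));
    }
}
```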

Finally, we'll analyze the validatePath method. The specific processing code is as follows:

private void validatePath(String path, long sessionId) throws BadArgumentsException {
  try {
    PathUtils.validatePath(path);
  } catch (IllegalArgumentException ie) {
    LOG.info("Invalid path {} with session 0x{}, reason: {}",
             path, Long.toHexString(sessionId), ie.getMessage());
    throw new BadArgumentsException(path);
  }
}

In the above code, validation is delegated to the PathUtils.validatePath method. Its code is lengthy, so it is not reproduced here. The PathUtils.validatePath method first applies the following basic checks:

  1. The path variable cannot be null.
  2. The length of the path variable cannot be 0.
  3. The first character of the path variable must be a forward slash ("/").
  4. If the path variable has a length of 1 and passes the first three checks, it is the root path ("/") and is accepted immediately without further checks.
  5. The last character of the path variable cannot be a forward slash ("/").

If these five basic checks pass, each character after the leading slash is validated. The path is considered invalid if any of the following conditions is met:

  1. The current character is the null character (char c = 0).
  2. The current character is a slash, and the previous character is also a slash (an empty node name).
  3. The current character is a dot, the previous character is a dot, the character before that is a slash, and the current character is either the last character of the path or followed by a slash (a ".." path segment).
  4. The current character is a dot, the previous character is a slash, and the current character is either the last character of the path or followed by a slash (a "." path segment).
  5. The current character falls within the following ranges:
c > '\u0000' && c <= '\u001f'
        || c >= '\u007f' && c <= '\u009F'
        || c >= '\ud800' && c <= '\uf8ff'
        || c >= '\ufff0' && c <= '\uffff'
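
The rules above can be sketched as a small standalone checker. This is an illustrative reimplementation under the rules listed in this section, not the PathUtils source; the class name, method name, and reason strings are hypothetical, and the method returns a reason instead of throwing IllegalArgumentException.

```java
// Standalone sketch of the path rules; returns null for a valid path, otherwise a short reason.
final class PathCheckSketch {
    static String invalidReason(String path) {
        if (path == null) return "path is null";
        if (path.isEmpty()) return "path is empty";
        if (path.charAt(0) != '/') return "path must start with /";
        if (path.length() == 1) return null; // the root path needs no further checks
        if (path.charAt(path.length() - 1) == '/') return "path must not end with /";

        char[] chars = path.toCharArray();
        char prev = '/';
        for (int i = 1; i < chars.length; prev = chars[i], i++) {
            char c = chars[i];
            // True when the current character closes a path segment
            boolean atSegmentEnd = (i + 1 == chars.length) || chars[i + 1] == '/';
            if (c == 0) {
                return "null character at " + i;
            } else if (c == '/' && prev == '/') {
                return "empty node name at " + i;
            } else if (c == '.' && prev == '.') {
                if (chars[i - 2] == '/' && atSegmentEnd) return "relative path at " + i;
            } else if (c == '.') {
                if (prev == '/' && atSegmentEnd) return "relative path at " + i;
            } else if (c > '\u0000' && c <= '\u001f'
                    || c >= '\u007f' && c <= '\u009F'
                    || c >= '\ud800' && c <= '\uf8ff'
                    || c >= '\ufff0' && c <= '\uffff') {
                return "invalid character at " + i;
            }
        }
        return null;
    }
}
```

Note that dots are only rejected when they form a whole segment: "/a/.." and "/a/./b" are invalid, while "/a/..b" and "/a/.hidden" pass.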

Understanding these five common methods will make the analysis of subsequent methods clearer.

Create Operation

This section will analyze the create operation, which is mainly performed by the pRequest2TxnCreate method. The core processing code is as follows:

private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize)
  throws IOException, KeeperException {
  // Check if deserialization is needed, if so, convert the request to record
  if (deserialize) {
    ByteBufferInputStream.byteBuffer2Record(request.request, record);
  }

  // Flag indicating the creation type
  int flags;
  // Node path
  String path;
  // ACL information
  List<ACL> acl;
  // Data information
  byte[] data;
  // Expiration time
  long ttl;
  // Type is creating a node with expiration time
  if (type == OpCode.createTTL) {
    CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
    flags = createTtlRequest.getFlags();
    path = createTtlRequest.getPath();
    acl = createTtlRequest.getAcl();
    data = createTtlRequest.getData();
    ttl = createTtlRequest.getTtl();
  }
  else {
    CreateRequest createRequest = (CreateRequest) record;
    flags = createRequest.getFlags();
    path = createRequest.getPath();
    acl = createRequest.getAcl();
    data = createRequest.getData();
    ttl = -1;
  }
  // Convert flags to creation type
  CreateMode createMode = CreateMode.fromFlag(flags);
  // Validate if the create request is legal
  validateCreateRequest(path, createMode, request, ttl);
  // Validate the creation path
  String parentPath = validatePathForCreate(path, request.sessionId);

  // Process ACL
  List<ACL> listACL = fixupACL(path, request.authInfo, acl);
  // Get the data record of the parent node
  ChangeRecord parentRecord = getRecordForPath(parentPath);

  // Validate ACL
  checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
  // Get the cversion (child version number) of the parent node
  int parentCVersion = parentRecord.stat.getCversion();
  // Check if it's a sequential creation
  if (createMode.isSequential()) {
    path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
  }
  // Validate if the path is legal
  validatePath(path, request.sessionId);
  try {
    // Throw an exception if there's already a record for the current path
    if (getRecordForPath(path) != null) {
      throw new KeeperException.NodeExistsException(path);
    }
  } catch (KeeperException.NoNodeException e) {
    // Expected: no record exists for the path yet, so creation can proceed
  }
  // Check if it's an ephemeral parent
  boolean ephemeralParent =
    EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
  if (ephemeralParent) {
    throw new KeeperException.NoChildrenForEphemeralsException(path);
  }
  // Calculate the parent's new cversion (child version number)
  int newCversion = parentRecord.stat.getCversion() + 1;
  // If the operation type is creating a container, set txn to CreateContainerTxn
  if (type == OpCode.createContainer) {
    request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
  }
  // If the operation type is creating an expiring node, set txn to CreateTTLTxn
  else if (type == OpCode.createTTL) {
    request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
  }
  // For other creation-related operation types, set txn to CreateTxn
  else {
    request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(),
                                 newCversion));
  }
  StatPersisted s = new StatPersisted();
  if (createMode.isEphemeral()) {
    s.setEphemeralOwner(request.sessionId);
  }
  // Copy parent node data
  parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
  // Increment the number of child nodes in the parent node
  parentRecord.childCount++;
  // Set the parent's new cversion (child version number)
  parentRecord.stat.setCversion(newCversion);
  // Add change record
  addChangeRecord(parentRecord);
  addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
}

The core processing flow in the above code is as follows:

  1. Check if deserialization is needed. If so, convert the request to the record parameter.
  2. Get flags, path, acl, data, and ttl data from the record based on different types.
  3. Convert flags to creation type (CreateMode), which includes the following types:
    1. PERSISTENT: Persistent type (persistent node); the node remains until it is explicitly deleted.
    2. PERSISTENT_SEQUENTIAL: Sequential persistent type (sequential persistent node); it behaves like PERSISTENT, and a sequence number is appended to the node name.
    3. EPHEMERAL: Ephemeral type (ephemeral node); the node is deleted when the session that created it ends.
    4. EPHEMERAL_SEQUENTIAL: Sequential ephemeral type (sequential ephemeral node); it behaves like EPHEMERAL, and a sequence number is appended to the node name.
    5. CONTAINER: Container type (container node); once a node of this type no longer has child nodes, it becomes a candidate for cleanup, which the server checks for at 60-second intervals.
    6. PERSISTENT_WITH_TTL: Persistent type with expiration time.
    7. PERSISTENT_SEQUENTIAL_WITH_TTL: Sequential persistent type with expiration time.
  4. Validate if the create request is legal using the validateCreateRequest method.
  5. Validate the path using the validatePathForCreate method. If the path is legal, the method returns the parent path.
  6. Calculate ACL using the fixupACL method.
  7. Get the change record information (ChangeRecord) of the parent path using the getRecordForPath method.
  8. Validate ACL using the checkACL method, which requires the CREATE permission on the parent node.
  9. Extract the cversion of the parent node.
  10. Check the creation type obtained in step (3). If it's a sequential creation, the path needs to be recalculated. The calculation rule is: path + the parent node's cversion formatted as a zero-padded 10-digit decimal number.
  11. Validate if the path is legal using the validatePath method.
  12. Confirm if there's a related record for the path using the getRecordForPath method. If a record exists, an exception will be thrown.
  13. Check if the parent node is an ephemeral node. If so, throw an exception.
  14. Calculate the new version number. The calculation rule is to add one to the cversion data value.
  15. Set txn data information based on different types.
  16. Create a StatPersisted object. If the creation type is ephemeral, set the sessionId for the StatPersisted object.
  17. Copy parent node data, increment the number of child nodes in the parent node by one, set the new version number (cversion).
  18. Add change records using the addChangeRecord method.
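
The sequential naming rule in step 10 can be illustrated in isolation. The sketch below reuses the same String.format call as the code above; the class name, method name, and the example path are hypothetical.

```java
import java.util.Locale;

// Sketch of how a sequential node name is derived from the parent's cversion.
final class SequentialNameSketch {
    static String sequentialPath(String path, int parentCVersion) {
        // Same formatting as in pRequest2TxnCreate: zero-padded to 10 digits
        return path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }
}
```

For example, sequentialPath("/locks/lock-", 7) yields "/locks/lock-0000000007". Because the parent's cversion grows with every change to its children, later sequential nodes under the same parent receive larger suffixes.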

These 18 operations can complete node creation. Next, we'll provide extended explanations for some of the methods in these 18 steps. First is the validateCreateRequest method, which is used to validate if the create request is legal. The specific processing code is as follows:

Delete Container Operation

This section analyzes the delete container operation. The delete container operation is not encapsulated as a separate method but is implemented within a case code block. The specific processing code is as follows:

case OpCode.deleteContainer: {
  // Extract the path from the request
  String path = new String(request.request.array());
  // Extract the parent path
  String parentPath = getParentPathAndValidate(path);
  // Parent node record information
  ChangeRecord parentRecord = getRecordForPath(parentPath);
  // Current node record information
  ChangeRecord nodeRecord = getRecordForPath(path);
  // Throw an exception if the current node has child nodes
  if (nodeRecord.childCount > 0) {
    throw new KeeperException.NotEmptyException(path);
  }
  // Throw an exception if the node is an ordinary ephemeral node rather than a container
  if (EphemeralType.get(nodeRecord.stat.getEphemeralOwner())
      == EphemeralType.NORMAL) {
    throw new KeeperException.BadVersionException(path);
  }
  // Set txn
  request.setTxn(new DeleteTxn(path));
  // Copy parent node data
  parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
  // Decrease the child count of the parent node by one
  parentRecord.childCount--;
  // Add change records
  addChangeRecord(parentRecord);
  addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null));
  break;
}

The main processing flow in the above code is as follows:

  1. Extract the path from the request object.
  2. Validate the path from the request object and extract the parent path.
  3. Use the getRecordForPath method to obtain the records for both the parent path and the current path.
  4. Throw an exception if the current record has a child node count greater than 0.
  5. Get the ephemeral owner data of the current node and convert it to an EphemeralType object. If the conversion result is NORMAL, the node is an ordinary ephemeral node rather than a container, so an exception (BadVersionException) is thrown.
  6. Set the txn data for the request, specifically of type DeleteTxn.
  7. Copy the parent node data and decrease the child node count of the parent node by one.
  8. Use the addChangeRecord method to add the parent node record and the current node's change record to the record container.

Delete Node Operation

This section analyzes the delete node operation. The delete node operation is not encapsulated as a separate method but is implemented within a case code block. The specific processing code is as follows:

case OpCode.delete:
  // Validate session using sessionTracker
  zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
  // Convert to delete request
  DeleteRequest deleteRequest = (DeleteRequest) record;
  // Deserialize information from the request object into the delete request if needed
  if (deserialize) {
    ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
  }
  // Get the path to be deleted
  String path = deleteRequest.getPath();
  // Get the parent path of the delete path
  String parentPath = getParentPathAndValidate(path);
  // Get the record of the parent path
  ChangeRecord parentRecord = getRecordForPath(parentPath);
  // Get the record of the current path
  ChangeRecord nodeRecord = getRecordForPath(path);
  // Verify ACL
  checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo);
  // Verify version number
  checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path);
  // Throw an exception if the current node has child nodes
  if (nodeRecord.childCount > 0) {
    throw new KeeperException.NotEmptyException(path);
  }
  // Set txn
  request.setTxn(new DeleteTxn(path));
  // Copy parent node
  parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
  // Decrease the child count of the parent node by one
  parentRecord.childCount--;
  // Add change records
  addChangeRecord(parentRecord);
  addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null));
  break;

The core processing flow in the above code is as follows:

  1. Validate the session using sessionTracker; if validation fails, an exception will be thrown.
  2. Convert the record variable to a delete request object. If deserialization is needed, deserialize the data from the request object into the delete request object.
  3. Get the path to be deleted from the delete request, obtain the parent path based on the path to be deleted, and get the records corresponding to the parent path and the delete path.
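  4. Verify permissions using the checkACL method. The check is made against the parent node's ACL and requires the DELETE permission.
  5. Verify the version number using the checkAndIncVersion method. The value it returns is not used here.
  6. If the current node has child nodes (count greater than 0), an exception will be thrown. This ensures that a node can only be deleted after all of its child nodes have been deleted.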
  7. Set the txn data for the request, specifically of type DeleteTxn.
  8. Copy the parent node data and decrease the child node count of the parent node by one.
  9. Use the addChangeRecord method to add the parent node record and the current node's change record to the record container.
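
The record handling in the last steps can be sketched with a reduced model. The ChangeRecord class below is a hypothetical simplification (a boolean replaces the stat object, and IllegalStateException replaces NotEmptyException); it only illustrates that the parent record is copied before being modified and that the deleted node is represented by a record without a stat and with a child count of -1.

```java
import java.util.ArrayList;
import java.util.List;

// Reduced model of the pending change records produced by a delete.
final class DeletePrepSketch {
    static final class ChangeRecord {
        final long zxid;
        final String path;
        final boolean hasStat; // false models a null stat, i.e. a deleted node
        int childCount;

        ChangeRecord(long zxid, String path, boolean hasStat, int childCount) {
            this.zxid = zxid;
            this.path = path;
            this.hasStat = hasStat;
            this.childCount = childCount;
        }

        // Copy the record under a new zxid so the original stays untouched
        ChangeRecord duplicate(long newZxid) {
            return new ChangeRecord(newZxid, path, hasStat, childCount);
        }
    }

    static List<ChangeRecord> prepareDelete(ChangeRecord parent, ChangeRecord node, long zxid) {
        if (node.childCount > 0) {
            throw new IllegalStateException("node has children: " + node.path);
        }
        ChangeRecord parentCopy = parent.duplicate(zxid);
        parentCopy.childCount--;
        List<ChangeRecord> pending = new ArrayList<>();
        pending.add(parentCopy);
        pending.add(new ChangeRecord(zxid, node.path, false, -1));
        return pending;
    }
}
```

Working on a duplicate means the earlier record for the parent path is left as it was, and the new state exists only as a pending change tagged with the request's zxid.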

In the above processing flow, special attention should be paid to the checkAndIncVersion method. It not only verifies the version number but also returns the incremented version number. The specific code is as follows:

private static int checkAndIncVersion(int currentVersion, int expectedVersion, String path)
  throws KeeperException.BadVersionException {
  if (expectedVersion != -1 && expectedVersion != currentVersion) {
    throw new KeeperException.BadVersionException(path);
  }
  return currentVersion + 1;
}

In the checkAndIncVersion method, an exception will be thrown only if both of the following conditions are met:

  1. The expected version number (expectedVersion) is not -1. In the delete phase, the actual argument is the version number carried in the delete request; other phases pass the version number from their own request. A value of -1 means the caller accepts any version, so the comparison is skipped.
  2. The expected version number is different from the current version number (currentVersion).

If no exception is thrown, the method returns the current version number plus one. The method itself does not modify any record; the caller decides whether to store the returned value.
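
The behavior can be tried out with a standalone copy of the same logic. In this sketch the path parameter is dropped and IllegalStateException is assumed as a stand-in for KeeperException.BadVersionException, so that the example has no ZooKeeper dependency.

```java
// Standalone sketch of the version check; IllegalStateException replaces BadVersionException.
final class VersionCheckSketch {
    static int checkAndIncVersion(int currentVersion, int expectedVersion) {
        // -1 acts as a wildcard: any current version is accepted
        if (expectedVersion != -1 && expectedVersion != currentVersion) {
            throw new IllegalStateException(
                "bad version: expected " + expectedVersion + ", current " + currentVersion);
        }
        return currentVersion + 1;
    }
}
```

With a current version of 3, an expected version of 3 or -1 returns 4, while an expected version of 2 is rejected.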

Data Setting Operation

This section analyzes the data setting operation. The operation is not encapsulated as a separate method but is implemented within a case block. The specific handling code is as follows:

case OpCode.setData: 
  // Validate session using sessionTracker
  zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
  // Convert to SetDataRequest
  SetDataRequest setDataRequest = (SetDataRequest) record;
  // Deserialize request object information into SetDataRequest if needed
  if (deserialize) {
    ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
  }
  // Get current operation path
  path = setDataRequest.getPath();
  // Validate current operation path
  validatePath(path, request.sessionId);
  // Get record for current operation path
  nodeRecord = getRecordForPath(path);
  // Verify permissions
  checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
  // Calculate new version
  int newVersion = checkAndIncVersion(nodeRecord.stat.getVersion(),
                                      setDataRequest.getVersion(), path);

  // Set transaction
  request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
  // Duplicate current node's record
  nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
  // Set version number
  nodeRecord.stat.setVersion(newVersion);
  // Add change record
  addChangeRecord(nodeRecord);
  break;

The core processing flow in the above code is as follows:

  1. Validate the session using sessionTracker. If validation fails, an exception is thrown.
  2. Convert the record variable to a SetDataRequest object. If deserialization is required, deserialize the data from the request object into the SetDataRequest object.
  3. Extract the path to be operated on from the SetDataRequest object.
  4. Validate the current operation path using the validatePath method. If validation fails, an exception is thrown.
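  5. Get the record for the current operation path using the getRecordForPath method, then verify the WRITE permission against its ACL using the checkACL method.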
  6. Validate the version number and calculate a new version number using the checkAndIncVersion method.
  7. Set the transaction data for the request, specifically of type SetDataTxn.
  8. Duplicate the data of the current operation node and set the new version.
  9. Add the change record of the current operation node to the record container using the addChangeRecord method.

Configuration Reload Operation

This section analyzes the configuration reload operation (OpCode.reconfig), that is, the dynamic reconfiguration of the cluster membership. The operation is not encapsulated as a separate method but is implemented within a case block. The specific handling code is as follows:

case OpCode.reconfig:
  // Throw exception if configuration reload is not enabled in ZK service
  if (!zks.isReconfigEnabled()) {
    LOG.error("Reconfig operation requested but reconfig feature is disabled.");
    throw new KeeperException.ReconfigDisabledException();
  }

  // Output warning log if skipping ACL
  if (skipACL) {
    LOG.warn("skipACL is set, reconfig operation will skip ACL checks!");
  }

  // Validate session using sessionTracker
  zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
  // Convert to ReconfigRequest
  ReconfigRequest reconfigRequest = (ReconfigRequest) record;
  // Confirm leader service
  LeaderZooKeeperServer lzks;
  try {
    // Throw exception if current ZK service cannot be converted to LeaderZooKeeperServer
    lzks = (LeaderZooKeeperServer) zks;
  } catch (ClassCastException e) {
    throw new KeeperException.UnimplementedException();
  }
  // Get last seen quorum verifier
  QuorumVerifier lastSeenQV = lzks.self.getLastSeenQuorumVerifier();
  // Throw exception if last seen verifier version doesn't match leader service verifier version
  if (lastSeenQV.getVersion() != lzks.self.getQuorumVerifier().getVersion()) {
    throw new KeeperException.ReconfigInProgress();
  }
  // Get configuration ID
  long configId = reconfigRequest.getCurConfigId();

  // Throw exception if a configuration ID was supplied (it is not -1)
  // and it doesn't match the last seen verifier version
  if (configId != -1 && configId != lzks.self.getLastSeenQuorumVerifier()
      .getVersion()) {
    String msg = "Reconfiguration from version " + configId
      + " failed -- last seen version is " +
      lzks.self.getLastSeenQuorumVerifier().getVersion();
    throw new KeeperException.BadVersionException(msg);
  }

  // Get new members (configuration data)
  String newMembers = reconfigRequest.getNewMembers();

  // New members supplied: replace the whole membership (non-incremental reconfig)
  if (newMembers != null) {
    LOG.info("Non-incremental reconfig");
    newMembers = newMembers.replaceAll(",", "\n");

    try {
      // Read new member data
      Properties props = new Properties();
      props.load(new StringReader(newMembers));
      // Parse configuration
      request.qv = QuorumPeerConfig.parseDynamicConfig(props,
                                                       lzks.self.getElectionType(), true, false);
      request.qv.setVersion(request.getHdr().getZxid());
    } catch (IOException | ConfigException e) {
      throw new KeeperException.BadArgumentsException(e.getMessage());
    }
  } else {
    LOG.info("Incremental reconfig");

    // Determine servers joining the cluster
    List<String> joiningServers = null;
    String joiningServersString = reconfigRequest.getJoiningServers();
    if (joiningServersString != null) {
      joiningServers = StringUtils.split(joiningServersString, ",");
    }

    // Determine servers leaving the cluster
    List<String> leavingServers = null;
    String leavingServersString = reconfigRequest.getLeavingServers();
    if (leavingServersString != null) {
      leavingServers = StringUtils.split(leavingServersString, ",");
    }

    // Incremental reconfig requires the last seen verifier to be a majority quorum (QuorumMaj)
    if (!(lastSeenQV instanceof QuorumMaj)) {
      String msg =
        "Incremental reconfiguration requested but last configuration seen has a non-majority quorum system";
      LOG.warn(msg);
      throw new KeeperException.BadArgumentsException(msg);
    }
    // Server collection
    Map<Long, QuorumServer> nextServers =
      new HashMap<Long, QuorumServer>(lastSeenQV.getAllMembers());
    try {
      // Remove corresponding data from nextServers if leaving servers exist
      if (leavingServers != null) {
        for (String leaving : leavingServers) {
          long sid = Long.parseLong(leaving);
          nextServers.remove(sid);
        }
      }
      // Add data to nextServers if joining servers exist
      if (joiningServers != null) {
        for (String joiner : joiningServers) {
          String[] parts =
            StringUtils.split(joiner, "=").toArray(new String[0]);
          if (parts.length != 2) {
            throw new KeeperException.BadArgumentsException(
              "Wrong format of server string");
          }
          Long sid = Long.parseLong(
            parts[0].substring(parts[0].lastIndexOf('.') + 1));
          QuorumServer qs = new QuorumServer(sid, parts[1]);
          if (qs.clientAddr == null || qs.electionAddr == null
              || qs.addr == null) {
            throw new KeeperException.BadArgumentsException(
              "Wrong format of server string - each server should have 3 ports specified");
          }
          for (QuorumServer nqs : nextServers.values()) {
            if (qs.id == nqs.id) {
              continue;
            }
            qs.checkAddressDuplicate(nqs);
          }

          nextServers.remove(qs.id);
          nextServers.put(qs.id, qs);
        }
      }
    } catch (ConfigException e) {
      throw new KeeperException.BadArgumentsException("Reconfiguration failed");
    }
    // Set request parameter qv
    request.qv = new QuorumMaj(nextServers);
    request.qv.setVersion(request.getHdr().getZxid());
  }
  // Throw exception if standaloneEnabled is set and voting members are less than 2
  if (QuorumPeerConfig.isStandaloneEnabled()
      && request.qv.getVotingMembers().size() < 2) {
    String msg =
      "Reconfig failed - new configuration must include at least 2 followers";
    LOG.warn(msg);
    throw new KeeperException.BadArgumentsException(msg);
  }
  // Throw exception if voting members are less than 1
  else if (request.qv.getVotingMembers().size() < 1) {
    String msg =
      "Reconfig failed - new configuration must include at least 1 follower";
    LOG.warn(msg);
    throw new KeeperException.BadArgumentsException(msg);
  }

  // Check if quorum data is synced, throw exception if not
  if (!lzks.getLeader().isQuorumSynced(request.qv)) {
    String msg2 =
      "Reconfig failed - there must be a connected and synced quorum in new configuration";
    LOG.warn(msg2);
    throw new KeeperException.NewConfigNoQuorum();
  }

  // Get record for configuration node
  nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE);
  // Verify permissions
  checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
  // Set transaction
  request.setTxn(
    new SetDataTxn(ZooDefs.CONFIG_NODE, request.qv.toString().getBytes(), -1));
  // Duplicate the configuration node's record and set its version to -1
  nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
  nodeRecord.stat.setVersion(-1);
  addChangeRecord(nodeRecord);
  break;

The main processing flow in the above code is as follows:

  1. Check if the configuration reload (reconfig) feature is enabled in the ZK service. If not, throw an exception.
  2. If ACL validation is to be skipped, log a warning message.
  3. Validate the session using sessionTracker. If validation fails, throw an exception.
  4. Convert the record variable to a ReconfigRequest.
  5. Attempt to cast the zks variable to LeaderZooKeeperServer. If casting fails, throw an exception (UnimplementedException). If successful, assign it to the lzks variable.
  6. Get the last seen quorum verifier from the lzks variable.
  7. Get the version information from the last seen verifier. If it doesn't match the version of the current quorum verifier in lzks, throw an exception indicating an ongoing reconfiguration operation.
  8. Get the configuration ID from the reconfiguration request. If the ID is not -1 and doesn't match the version of the last seen verifier, throw an exception.
  9. Get new members (configuration data) from the request.
  10. If new members are not null (non-incremental reconfig), parse the configuration using QuorumPeerConfig.parseDynamicConfig, set the result in the request object (request.qv), and set its version to the zxid of the request.
  11. If new members are null (incremental reconfig), perform the following operations:
    1. Get the lists of servers joining and leaving the cluster from the request.
    2. If the last seen verifier is not a QuorumMaj, throw an exception, because incremental reconfiguration requires a majority quorum system.
    3. Create a server collection container, initializing it with all members of the last seen verifier (using lastSeenQV.getAllMembers).
    4. If there are servers leaving the cluster, remove the corresponding content from the server collection container.
    5. If there are servers joining the cluster, validate the format of each server string and add the corresponding content to the server collection container.
    6. Convert the server collection container to a QuorumMaj object, set it to the qv property of the request parameter, and set its version to the zxid of the request.
  12. If standaloneEnabled is set and the number of voting members in the request object's qv property is less than 2, throw an exception. Otherwise, if the number of voting members is less than 1, throw an exception.
  13. Check if a quorum of the new configuration is connected and synchronized. If not, throw an exception.
  14. Get the record for the configuration node (node address: /zookeeper/config).
  15. Verify the WRITE permission using the checkACL method.
  16. Set the transaction data for the request, specifically of type SetDataTxn.
  17. Duplicate the record of the configuration node and set its version to -1.
  18. Add the change record of the configuration node to the record container using the addChangeRecord method.
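
The membership calculation of the incremental branch can be sketched on plain maps. This is a simplified illustration: server specifications are kept as strings instead of QuorumServer objects, the address duplicate check and port validation are omitted, String.split is assumed in place of StringUtils.split, and IllegalArgumentException replaces BadArgumentsException. The class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of the incremental membership update: remove leaving servers, then add joiners.
final class IncrementalReconfigSketch {
    static Map<Long, String> nextServers(Map<Long, String> current, String leaving, String joining) {
        // Start from a copy of the current members so the input is not modified
        Map<Long, String> next = new TreeMap<>(current);
        if (leaving != null) {
            for (String sid : leaving.split(",")) {
                next.remove(Long.parseLong(sid.trim()));
            }
        }
        if (joining != null) {
            for (String joiner : joining.split(",")) {
                // Each joiner has the form server.<sid>=<server specification>
                String[] parts = joiner.trim().split("=");
                if (parts.length != 2) {
                    throw new IllegalArgumentException("Wrong format of server string: " + joiner);
                }
                long sid = Long.parseLong(parts[0].substring(parts[0].lastIndexOf('.') + 1));
                next.put(sid, parts[1]); // replaces an existing entry with the same sid
            }
        }
        return next;
    }
}
```

Starting from servers 1, 2 and 3, a request with leaving servers "3" and joining servers "server.4=d:2888:3888;2181" produces the member set 1, 2 and 4.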

ACL Setting Operation

This section analyzes the ACL setting operation. The operation is not encapsulated as a separate method but is implemented within a case block. The specific handling code is as follows:

case OpCode.setACL:
  // Validate session using sessionTracker
  zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
  // Type conversion
  SetACLRequest setAclRequest = (SetACLRequest) record;
  // Deserialization
  if (deserialize) {
    ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
  }
  // Get operation path
  path = setAclRequest.getPath();
  // Validate path
  validatePath(path, request.sessionId);
  // Calculate ACL
  List<ACL> listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl());
  // Get record for current path
  nodeRecord = getRecordForPath(path);
  // Verify ACL
  checkACL(zks, nodeRecord.acl, ZooDefs.Perms.ADMIN, request.authInfo);
  // Calculate new version number
  newVersion = checkAndIncVersion(nodeRecord.stat.getAversion(),
                                  setAclRequest.getVersion(), path);
  // Set transaction
  request.setTxn(new SetACLTxn(path, listACL, newVersion));
  // Duplicate record for current path
  nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
  nodeRecord.stat.setAversion(newVersion);
  addChangeRecord(nodeRecord);
  break;

The main processing flow in the above code is as follows:

  1. Validate the session using sessionTracker. If validation fails, throw an exception.
  2. Convert the record variable to a SetACLRequest. If deserialization is needed, deserialize the data from the request object into the SetACLRequest object.
  3. Get the operation path from the SetACLRequest object and validate it. If validation fails, throw an exception.
  4. Calculate the ACL using the fixupACL method.
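  5. Get the record for the current path using the getRecordForPath method and verify the ADMIN permission using the checkACL method.
  6. Validate the ACL version number (aversion) and calculate the new one using the checkAndIncVersion method.
  7. Set the transaction data for the request, specifically of type SetACLTxn.
  8. Duplicate the record of the current node and set the new aversion.
  9. Add the change record of the current node to the record container using the addChangeRecord method.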

Creating a Session Operation

This section analyzes the process of creating a session in ZooKeeper. The session creation operation is not encapsulated as a separate method but is implemented within a case block. The specific handling code is as follows:

case OpCode.createSession:
  request.request.rewind();
  // Timeout
  int to = request.request.getInt();
  request.setTxn(new CreateSessionTxn(to));
  request.request.rewind();
  // Register the session with the session tracker as a local or a global session
  if (request.isLocalSession()) {
    zks.sessionTracker.addSession(request.sessionId, to);
  } else {
    zks.sessionTracker.addGlobalSession(request.sessionId, to);
  }
  zks.setOwner(request.sessionId, request.getOwner());
  break;

In the above code, the session timeout is first read from the request buffer and wrapped in a CreateSessionTxn. The session is then registered in one of two ways:

  1. If the request is for a local session, the sessionId will be added using the addSession method.
  2. If the request is not for a local session, the sessionId will be added using the addGlobalSession method.

After adding the sessionId, the ZooKeeperServer#setOwner method will bind the sessionId and the owner.
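
The rewind, getInt, rewind sequence at the start of the case block can be shown with a plain ByteBuffer. The sketch assumes, as the code above does, that the timeout is the first 4-byte integer in the buffer; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Sketch of reading the session timeout without consuming the request buffer.
final class SessionTimeoutSketch {
    static int readTimeout(ByteBuffer request) {
        request.rewind();               // start reading from the beginning
        int timeout = request.getInt(); // the first 4 bytes hold the timeout
        request.rewind();               // reset the position for later readers
        return timeout;
    }
}
```

The second rewind resets the buffer position to 0, so code that processes the same request later still sees the buffer from its start.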

Closing Session Operation

This section analyzes the operation of closing a session. The session closing operation is not encapsulated as a separate method but is implemented within a case block. The specific handling code is as follows:

case OpCode.closeSession:
  // Get the corresponding ephemeral paths for the sessionId
  Set<String> es = zks.getZKDatabase()
    .getEphemerals(request.sessionId);
  // Lock
  synchronized (zks.outstandingChanges) {
    // Iterate through outstandingChanges object
    for (ChangeRecord c : zks.outstandingChanges) {
      // A null stat marks a pending delete of this path
      if (c.stat == null) {
        // Remove the path from the ephemeral path set
        es.remove(c.path);
      }
      // If the ephemeral owner of the status matches the current sessionId, add it to the ephemeral path set
      else if (c.stat.getEphemeralOwner() == request.sessionId) {
        es.add(c.path);
      }
    }
    // Iterate to mark for deletion
    for (String path2Delete : es) {
      addChangeRecord(
        new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0,
                         null));
    }

    // Mark the session as closing
    zks.sessionTracker.setSessionClosing(request.sessionId);
  }
  break;

The main processing flow in the above code is as follows:

  1. Look up, in the ZooKeeper database, the set of ephemeral paths owned by the sessionId of the current request.
  2. While holding the lock on the outstandingChanges variable in the zks object, iterate through it and perform the following operations on each element:
    1. If the status (stat) of the current element is null, the element represents a pending delete, so its path is removed from the ephemeral path set.
    2. If the ephemeral owner attribute of the status in the current element matches the sessionId in the request, its path is added to the ephemeral path set.
  3. Iterate through the ephemeral path set, construct a delete change record (with a null status) for each path, and add the new records to the record container.
  4. Mark the session as closing using the setSessionClosing method.
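
The way the ephemeral path set is adjusted by the outstanding changes can be sketched as follows. The Change class is a hypothetical reduction of ChangeRecord that keeps only the path, a flag modelling whether the stat is null, and the ephemeral owner; locking is left out.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of computing which ephemeral paths must be deleted when a session closes.
final class CloseSessionSketch {
    static final class Change {
        final String path;
        final boolean hasStat;      // false models a null stat (pending delete)
        final long ephemeralOwner;  // only meaningful when hasStat is true

        Change(String path, boolean hasStat, long ephemeralOwner) {
            this.path = path;
            this.hasStat = hasStat;
            this.ephemeralOwner = ephemeralOwner;
        }
    }

    static Set<String> pathsToDelete(Set<String> committedEphemerals,
                                     List<Change> outstanding, long sessionId) {
        Set<String> es = new HashSet<>(committedEphemerals);
        for (Change c : outstanding) {
            if (!c.hasStat) {
                es.remove(c.path); // already being deleted by a pending change
            } else if (c.ephemeralOwner == sessionId) {
                es.add(c.path);    // created by this session but not yet committed
            }
        }
        return es;
    }
}
```

Taking the outstanding changes into account avoids deleting a path twice and also covers ephemeral nodes whose creation has not yet reached the database.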

Check Operations

This section analyzes check operations. Check operations are not encapsulated as a separate method but are implemented within a case block. The specific handling code is as follows:

case OpCode.check:
  // Validate session using sessionTracker
  zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
  // Type conversion
  CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record;
  // Deserialization
  if (deserialize) {
    ByteBufferInputStream.byteBuffer2Record(request.request, checkVersionRequest);
  }
  // Extract the operation path
  path = checkVersionRequest.getPath();
  // Validate path
  validatePath(path, request.sessionId);
  // Get the record for the operation path
  nodeRecord = getRecordForPath(path);
  // Verify ACL
  checkACL(zks, nodeRecord.acl, ZooDefs.Perms.READ, request.authInfo);
  // Set txn
  request.setTxn(
    new CheckVersionTxn(path, checkAndIncVersion(nodeRecord.stat.getVersion(),
                                                 checkVersionRequest.getVersion(), path)));
  break;

The main processing flow in the above code is as follows:

  1. Validate the session using sessionTracker. If validation fails, an exception will be thrown.
  2. Convert the record variable to a check request. If deserialization is needed, the data from the request object will be deserialized into the check request object.
  3. Extract the operation path from the check request, validate this path. If validation fails, an exception will be thrown.
  4. Get the record information for the current operation path.
  5. Verify permissions (ACL) using the checkACL method.
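  6. Compare the version expected by the request with the node's current version through the checkAndIncVersion method, and set the result as the txn data for the request, specifically of type CheckVersionTxn.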
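The version comparison delegated to checkAndIncVersion can be sketched as follows. This is a minimal, standalone approximation rather than the ZooKeeper source: the class name VersionCheckSketch is hypothetical, IllegalStateException stands in for KeeperException.BadVersionException so the example runs without ZooKeeper on the classpath, and the sketch assumes the usual client convention that an expected version of -1 matches any version.

```java
// Hypothetical standalone sketch; names are illustrative, not ZooKeeper API.
final class VersionCheckSketch {

    /**
     * Returns the next version when the expected version matches the current one.
     * An expected version of -1 is treated as "match any version" (assumption).
     * IllegalStateException stands in for KeeperException.BadVersionException.
     */
    static int checkAndIncVersion(int currentVersion, int expectedVersion, String path) {
        if (expectedVersion != -1 && expectedVersion != currentVersion) {
            throw new IllegalStateException("Bad version for " + path);
        }
        return currentVersion + 1;
    }

    public static void main(String[] args) {
        System.out.println(checkAndIncVersion(3, 3, "/a"));
        System.out.println(checkAndIncVersion(3, -1, "/a"));
    }
}
```

A mismatching expected version (for example 2 against a current version of 3) raises the exception, which is how a failed check aborts the request.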

Multi-transaction Requests

Next, we'll return to the PrepRequestProcessor#pRequest method to analyze the operations of type OpCode.multi. The specific handling code is as follows:

case OpCode.multi:{
  // Create a multi-transaction request object
  MultiTransactionRecord multiRequest = new MultiTransactionRecord();
  try {
    // Convert request object to multi-transaction request object
    ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
  } catch (IOException e) {
    // Set header information for exception
    request.setHdr(
      new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                    Time.currentWallTime(), OpCode.multi));
    throw e;
  }
  List<Txn> txns = new ArrayList<Txn>();
  // Get next zxid
  long zxid = zks.getNextZxid();
  // Exception record variable
  KeeperException ke = null;
  // Convert multi-transaction request object to map structure for subsequent rollback operations
  Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

  // Iterate through multi-transaction request object
  for (Op op : multiRequest) {
    // Get request information from single transaction
    Record subrequest = op.toRequestRecord();
    // Operation type
    int type;
    // Record information
    Record txn;
    // If exception record variable is not null, set exception-related information
    if (ke != null) {
      type = OpCode.error;
      txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
    }
    else {
      try {
        // Execute core request-to-transaction logic
        pRequest2Txn(op.getType(), zxid, request, subrequest, false);
        // Record operation type
        type = request.getHdr().getType();
        // Record txn
        txn = request.getTxn();
      }
      // Record exception-related information
      catch (KeeperException e) {

        ke = e;
        type = OpCode.error;
        txn = new ErrorTxn(e.code().intValue());

        if (e.code().intValue() > Code.APIERROR.intValue()) {
          LOG.info(
            "Got user-level KeeperException when processing {} aborting"
            +
            " remaining multi ops. Error Path:{} Error:{}",
            request.toString(), e.getPath(), e.getMessage());
        }

        request.setException(e);
        // Rollback operation
        rollbackPendingChanges(zxid, pendingChanges);
      }
    }
    // Output record processing
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
    txn.serialize(boa, "request");
    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

    txns.add(new Txn(type, bb.array()));
  }

  request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                               Time.currentWallTime(), request.type));
  request.setTxn(new MultiTxn(txns));

  break;
}

The main processing flow in the above code is as follows:

  1. Create a multi-transaction request object and deserialize the data from the request object into it. If an exception occurs during deserialization, set the header information for the request object and rethrow the exception.
  2. Create a Txn data container (txns).
  3. Get the next zxid from the ZK service object.
  4. Create an exception record variable ke, with a default value of null indicating no exception.
  5. Use the getPendingChanges method to capture the change records affected by the multi-transaction request in a Map structure for subsequent rollback operations.
  6. Iterate through the multi-transaction request object, performing the following operations on each single transaction request object:
    1. Get the request information (Record) from the single transaction request.
    2. If ke is not null, meaning an earlier sub-operation has already failed, set the operation type to error and set the txn object to an ErrorTxn carrying the RUNTIMEINCONSISTENCY code.
    3. If ke is null, complete the transaction operation processing through the pRequest2Txn method and record the operation type and txn after completion. If a KeeperException occurs during pRequest2Txn, assign it to ke, set the operation type to error and the txn to an ErrorTxn carrying the exception code, record the exception on the request, and perform a rollback using the rollbackPendingChanges method.
    4. Serialize the txn and add it, together with the operation type, to the txns container.
  7. After the loop, set the request header using the zxid obtained in step (3) and set the request txn to a MultiTxn wrapping the txns container.
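The abort behavior of the loop can be illustrated with a small model. The sketch below is not ZooKeeper code: the SubOp interface, the class name, and the numeric codes are hypothetical stand-ins chosen for the example. It only shows that once one sub-operation fails, the remaining sub-operations are not executed and are recorded as errors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the multi loop; not the ZooKeeper implementation.
final class MultiAbortSketch {
    // Illustrative result codes standing in for KeeperException.Code values.
    static final int OK = 0;
    static final int RUNTIME_INCONSISTENCY = -2;

    /** A sub-operation that returns OK or a non-zero error code. */
    interface SubOp {
        int apply();
    }

    /** Returns one result code per sub-operation, in order. */
    static List<Integer> process(List<SubOp> ops) {
        List<Integer> results = new ArrayList<>();
        boolean failed = false;
        for (SubOp op : ops) {
            if (failed) {
                // An earlier sub-operation failed: skip execution, record an error.
                results.add(RUNTIME_INCONSISTENCY);
                continue;
            }
            int code = op.apply();
            if (code != OK) {
                failed = true; // the real code also rolls back pending changes here
            }
            results.add(code);
        }
        return results;
    }

    public static void main(String[] args) {
        List<SubOp> ops = new ArrayList<>();
        ops.add(() -> OK);
        ops.add(() -> -101);
        ops.add(() -> OK);
        System.out.println(process(ops));
    }
}
```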

In the above processing flow, the two most critical methods are pRequest2Txn and rollbackPendingChanges. The former method has been analyzed in detail in the previous sections of this chapter. This section mainly analyzes the latter method, with the complete code as follows:

void rollbackPendingChanges(long zxid, Map<String, ChangeRecord> pendingChangeRecords) {
  // Lock
  synchronized (zks.outstandingChanges) {
    // Iterate through outstandingChanges in reverse order (newest first)
    Iterator<ChangeRecord> iter = zks.outstandingChanges.descendingIterator();
    while (iter.hasNext()) {
      ChangeRecord c = iter.next();
      // If the record belongs to the zxid being rolled back, remove it
      if (c.zxid == zxid) {
        iter.remove();
        zks.outstandingChangesForPath.remove(c.path);
      }
      else {
        break;
      }
    }

    // If outstandingChanges variable is empty
    if (zks.outstandingChanges.isEmpty()) {
      return;
    }

    // Get the first zxid
    long firstZxid = zks.outstandingChanges.peek().zxid;

    // Skip records older than the first outstanding zxid; restore the others into outstandingChangesForPath
    for (ChangeRecord c : pendingChangeRecords.values()) {
      if (c.zxid < firstZxid) {
        continue;
      }
      zks.outstandingChangesForPath.put(c.path, c);
    }
  }
}

The core operation flow in the above code is as follows:

  1. Iterate through the outstandingChanges variable in the ZK service object in reverse order, starting from the most recent record. If the zxid of an element matches the zxid being rolled back, remove the element from outstandingChanges and remove the entry for its path from outstandingChangesForPath. Stop at the first element whose zxid does not match.
  2. If the outstandingChanges variable in the ZK service object is empty, end the processing.
  3. Extract the zxid of the first element in the outstandingChanges variable in the ZK service object.
  4. Iterate through the method parameter pendingChangeRecords. If the zxid of an element is less than the zxid extracted in step (3), skip it; otherwise, put it back into the outstandingChangesForPath variable.
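The rollback steps can be reproduced on plain collections. In the sketch below, the Change class is a hypothetical stand-in for ChangeRecord that keeps only zxid and path, and the fields outstanding and byPath play the roles of outstandingChanges and outstandingChangesForPath. It is a simplified model without locking, not the server code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical model of the rollback logic; no synchronization is shown.
final class RollbackSketch {
    /** Minimal stand-in for ChangeRecord: only zxid and path. */
    static final class Change {
        final long zxid;
        final String path;
        Change(long zxid, String path) {
            this.zxid = zxid;
            this.path = path;
        }
    }

    final Deque<Change> outstanding = new ArrayDeque<>();
    final Map<String, Change> byPath = new HashMap<>();

    void add(Change c) {
        outstanding.addLast(c);
        byPath.put(c.path, c);
    }

    void rollback(long zxid, Map<String, Change> pendingBeforeMulti) {
        // Walk from the newest record backwards, dropping records of the failed zxid.
        Iterator<Change> it = outstanding.descendingIterator();
        while (it.hasNext()) {
            Change c = it.next();
            if (c.zxid != zxid) {
                break;
            }
            it.remove();
            byPath.remove(c.path);
        }
        if (outstanding.isEmpty()) {
            return;
        }
        long firstZxid = outstanding.peek().zxid;
        // Restore the per-path entries that are still outstanding.
        for (Change c : pendingBeforeMulti.values()) {
            if (c.zxid >= firstZxid) {
                byPath.put(c.path, c);
            }
        }
    }

    public static void main(String[] args) {
        RollbackSketch s = new RollbackSketch();
        s.add(new Change(10L, "/a"));
        Map<String, Change> before = new HashMap<>(s.byPath);
        s.add(new Change(11L, "/a"));
        s.rollback(11L, before);
        System.out.println(s.byPath.get("/a").zxid);
    }
}
```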

Analysis of LeaderRequestProcessor

This section analyzes the LeaderRequestProcessor, the first processor in the leader's request chain, which checks whether a request requires a session upgrade before passing it on. The LeaderRequestProcessor class has two member variables, detailed in the table below.

Variable Name | Variable Type | Description
lzks | LeaderZooKeeperServer | ZooKeeper's leader service.
nextProcessor | RequestProcessor | The next request processor.

After understanding the member variables, let's examine the constructor usage. The constructor code is as follows:

public LeaderRequestProcessor(LeaderZooKeeperServer zks,
                              RequestProcessor nextProcessor) {
  this.lzks = zks;
  this.nextProcessor = nextProcessor;
}

The constructor requires two variables, which are constructed in the LeaderZooKeeperServer#setupRequestProcessors method. The specific processing code is as follows:

@Override
protected void setupRequestProcessors() {
  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  RequestProcessor toBeAppliedProcessor =
    new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
  commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                                        Long.toString(getServerId()), false,
                                        getZooKeeperServerListener());
  commitProcessor.start();
  ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                                                                            commitProcessor);
  proposalProcessor.initialize();
  prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
  prepRequestProcessor.start();
  firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

  setupContainerManager();
}

In the above code, focus on the LeaderRequestProcessor constructor. It uses 'this' to represent the first parameter and 'prepRequestProcessor' for the second parameter. This clarifies that the actual data type of the nextProcessor variable in the LeaderRequestProcessor class is PrepRequestProcessor.

After understanding the specific data types of member variables, let's analyze the processRequest method in the LeaderRequestProcessor class. The specific processing code is as follows:

@Override
public void processRequest(Request request)
  throws RequestProcessorException {
  // Upgraded request object
  Request upgradeRequest = null;
  try {
    // Validate session to determine if an upgrade is needed, and perform the upgrade if necessary
    upgradeRequest = lzks.checkUpgradeSession(request);
  } catch (KeeperException ke) {
    if (request.getHdr() != null) {
      LOG.debug("Updating header");
      request.getHdr().setType(OpCode.error);
      request.setTxn(new ErrorTxn(ke.code().intValue()));
    }
    request.setException(ke);
    LOG.info("Error creating upgrade request " + ke.getMessage());
  } catch (IOException ie) {
    LOG.error("Unexpected error in upgrade", ie);
  }
  // If the upgraded request object is not null
  if (upgradeRequest != null) {
    nextProcessor.processRequest(upgradeRequest);
  }
  nextProcessor.processRequest(request);
}

The core operation flow in the above code is as follows:

  1. Declare an upgrade request variable, then call the QuorumZooKeeperServer#checkUpgradeSession method, which checks whether the session needs to be upgraded and, if so, returns an upgrade request.
  2. If a KeeperException occurs during step (1), the exception is recorded on the original request; if the request has a header, the header type is set to error and the txn is set to an ErrorTxn. An IOException is only logged.
  3. If the upgrade request object is not null, pass it to the next request processor first. The original request is then always passed to the next request processor.
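The forwarding order at the end of processRequest can be made explicit with a tiny model. The sketch below uses strings in place of Request objects and a list in place of the next processor; both are simplifications introduced for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: strings stand in for Request objects.
final class UpgradeForwardSketch {

    /** Forwards the optional upgrade request first, then always the original request. */
    static List<String> forward(String upgradeRequest, String request) {
        List<String> sentToNext = new ArrayList<>();
        if (upgradeRequest != null) {
            sentToNext.add(upgradeRequest);
        }
        sentToNext.add(request);
        return sentToNext;
    }

    public static void main(String[] args) {
        System.out.println(forward("upgrade", "create"));
        System.out.println(forward(null, "getData"));
    }
}
```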

Analysis of CommitProcessor

This section analyzes the CommitProcessor, which matches incoming committed requests with locally submitted requests. The CommitProcessor class is multi-threaded, and communication between threads uses the LinkedBlockingQueue data structure. Before beginning method analysis, it's necessary to understand the member variables, detailed in the table below.

Variable Name | Variable Type | Description
queuedRequests | LinkedBlockingQueue<Request> | All requests entering the CommitProcessor class for processing are first placed in this container.
committedRequests | LinkedBlockingQueue<Request> | Requests that have already been committed.
nextPending | AtomicReference<Request> | The request currently waiting for a commit.
currentlyCommitting | AtomicReference<Request> | The request currently being committed.
numRequestsProcessing | AtomicInteger | Number of requests currently being processed.
stopped | boolean | Whether the processor has been stopped.
workerPool | WorkerService | Worker service that executes requests.
nextProcessor | RequestProcessor | The next request processor. In the Follower role, the type is FinalRequestProcessor. In the Leader role, the type is ToBeAppliedRequestProcessor. In the Observer role, the type is FinalRequestProcessor.
matchSyncs | boolean | Whether sync requests must wait for a response from the leader.
workerShutdownTimeoutMS | long | Timeout, in milliseconds, for shutting down the worker service.

After understanding the member variables, let's analyze the request processing method in the CommitProcessor class. The specific code is as follows:

@Override
public void processRequest(Request request) {
  if (stopped) {
    return;
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Processing request:: " + request);
  }
  queuedRequests.add(request);
  if (!isWaitingForCommit()) {
    wakeup();
  }
}

The main processing flow in the above code is as follows:

  1. Check if the member variable 'stopped' is true, which indicates stopping processing. If true, end processing.
  2. Add the request object to the member variable queuedRequests.
  3. If the member variable nextPending is empty, perform a notification operation (wake up the thread).

In the processRequest method, we can see that it merely puts the request object into the queuedRequests variable without actually processing the request. The real request processing operation is in the run method, with the specific code as follows:

@Override
public void run() {
  Request request;
  try {
    // Loop while not stopped
    while (!stopped) {
      // Lock
      synchronized (this) {
        // Conditions for waiting:
        // 1. Not stopped
        // 2.1 queuedRequests set is empty
        // 2.2 nextPending contains data
        // 2.3 currentlyCommitting contains data
        // 3.1 committedRequests is empty
        // 3.2 numRequestsProcessing count is not 0
        while (
          !stopped &&
          ((queuedRequests.isEmpty() || isWaitingForCommit()
            || isProcessingCommit()) &&
           (committedRequests.isEmpty()
            || isProcessingRequest()))) {

          wait();
        }
      }
      // Process requests in queuedRequests
      // Loop conditions:
      // 1. Not stopped working
      // 2. nextPending contains no data
      // 3. currentlyCommitting contains no data
      // 4. Get a request object from queuedRequests, and this object is not null
      while (!stopped && !isWaitingForCommit() &&
             !isProcessingCommit() &&
             (request = queuedRequests.poll()) != null) {
        // Determine if commitment is needed
        if (needCommit(request)) {
          nextPending.set(request);
        }
        else {
          // Send to the next request processor for processing
          sendToNextProcessor(request);
        }
      }
      // Process committed requests
      processCommitted();
    }
  } catch (Throwable e) {
    handleException(this.getName(), e);
  }
  LOG.info("CommitProcessor exited loop!");
}

The above code uses a loop to process requests, with each loop stage executing as follows:

  1. The thread waits while all three of the following conditions hold:

    1. Condition 1: The member variable 'stopped' is false.
    2. Condition 2 is satisfied when any of its three parts holds: the queuedRequests set is empty, nextPending contains data, or currentlyCommitting contains data.
    3. Condition 3 is satisfied when either of its two parts holds: the committedRequests set is empty, or the numRequestsProcessing count is not 0.
  2. Loop to process request objects in the queuedRequests variable. The loop conditions have four items that must be simultaneously satisfied:

    1. The member variable 'stopped' is false.
    2. The variable nextPending contains no data.
    3. The variable currentlyCommitting contains no data.
    4. A request object is retrieved from the queuedRequests variable, and this object is not null.

    Under these four loop conditions, the processing logic for a single request is: use the needCommit method to determine if the request needs to be committed. If needed, set the request to the member variable nextPending; otherwise, use the sendToNextProcessor method to pass it to the next request processor for handling.

  3. Use the processCommitted method to process already committed requests.
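The wait condition of the run method is easier to check when written as a pure function over booleans. The sketch below mirrors the expression in the listing; the class and parameter names are illustrative and each parameter corresponds to one of the checks (for example, waitingForCommit corresponds to isWaitingForCommit()).

```java
// Hypothetical helper that restates the wait condition with plain booleans.
final class CommitWaitSketch {

    static boolean shouldWait(boolean stopped,
                              boolean queuedEmpty,
                              boolean waitingForCommit,
                              boolean processingCommit,
                              boolean committedEmpty,
                              boolean processingRequest) {
        // No queued request can be taken right now.
        boolean cannotTakeQueued = queuedEmpty || waitingForCommit || processingCommit;
        // No committed request can be taken right now.
        boolean cannotTakeCommitted = committedEmpty || processingRequest;
        return !stopped && cannotTakeQueued && cannotTakeCommitted;
    }

    public static void main(String[] args) {
        // Idle processor: nothing queued, nothing committed.
        System.out.println(shouldWait(false, true, false, false, true, false));
        // A queued request is available and nothing blocks it.
        System.out.println(shouldWait(false, false, false, false, true, false));
    }
}
```

Read this way, the thread sleeps only when it is still running and there is neither a queued request nor a committed request that it is allowed to pick up.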

Next, let's analyze some methods involved in the above processing flow. First, analyze the needCommit method, which determines the return value based on different operation types. The specific mapping relationship is shown in the table:

Request Type | Return Value
create, create2, createTTL, createContainer, delete, deleteContainer, setData, reconfig, multi, setACL | true
sync | matchSyncs
createSession, closeSession | Negation of whether it's a local session
All other types | false
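The mapping in the table can be written as a switch statement. The sketch below is an approximation: the Op enum is a hypothetical, shortened stand-in for the ZooDefs.OpCode integer constants, and matchSyncs and localSession are passed as parameters instead of being read from the processor and the request.

```java
// Hypothetical sketch of the needCommit decision; Op is not a ZooKeeper type.
final class NeedCommitSketch {
    enum Op {
        CREATE, DELETE, SET_DATA, SET_ACL, MULTI, RECONFIG,
        SYNC, CREATE_SESSION, CLOSE_SESSION, GET_DATA, EXISTS
    }

    static boolean needCommit(Op op, boolean matchSyncs, boolean localSession) {
        switch (op) {
            case CREATE:
            case DELETE:
            case SET_DATA:
            case SET_ACL:
            case MULTI:
            case RECONFIG:
                return true;           // write operations wait for a commit
            case SYNC:
                return matchSyncs;     // depends on how the processor was configured
            case CREATE_SESSION:
            case CLOSE_SESSION:
                return !localSession;  // local sessions skip the commit wait
            default:
                return false;          // everything else passes straight through
        }
    }

    public static void main(String[] args) {
        System.out.println(needCommit(Op.CREATE, false, false));
        System.out.println(needCommit(Op.GET_DATA, true, false));
    }
}
```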

Next, let's analyze the sendToNextProcessor method. The specific processing code is as follows:

private void sendToNextProcessor(Request request) {
  numRequestsProcessing.incrementAndGet();
  workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
}

The main processing flow in the above code is as follows:

  1. Increment the numRequestsProcessing counter.
  2. Encapsulate the request as a CommitWorkRequest type and hand it over to the work pool for processing.

From the sendToNextProcessor method, we can see that the core is handed over to the workerPool variable, which is of type WorkerService. Its schedule method code is as follows:

public void schedule(WorkRequest workRequest, long id) {
  // Check if processing is stopped
  if (stopped) {
    // Clean up the request
    workRequest.cleanup();
    return;
  }

  // Convert the request
  ScheduledWorkRequest scheduledWorkRequest =
    new ScheduledWorkRequest(workRequest);
  // Get the number of task executors
  int size = workers.size();
  // If there are task executors available
  if (size > 0) {
    try {
      int workerNum = ((int) (id % size) + size) % size;
      // Get a task executor and execute
      ExecutorService worker = workers.get(workerNum);
      worker.execute(scheduledWorkRequest);
    } catch (RejectedExecutionException e) {
      LOG.warn("ExecutorService rejected execution", e);
      workRequest.cleanup();
    }
  }
  else {
    scheduledWorkRequest.run();
  }
}

The core processing flow in the above code is as follows:

  1. Check if processing is stopped. If so, clean up the request and end processing.
  2. Encapsulate the WorkRequest type request into a ScheduledWorkRequest.
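  3. Check if the number of task executors is greater than 0. If so, select an executor by taking the id modulo the number of executors (normalized so the index is never negative) and submit the request to it; if the executor rejects the request, clean it up. If there are no executors, run the request directly in the calling thread.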
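The index expression used to pick an executor deserves a closer look, because the id (a session id in the CommitProcessor case) may be negative and Java's % operator keeps the sign of the dividend. The following sketch isolates that expression; the class and method names are made up for the example.

```java
// Hypothetical helper isolating the index computation from schedule().
final class WorkerIndexSketch {

    /** Maps an id, possibly negative, onto an index in [0, size). */
    static int workerIndex(long id, int size) {
        // id % size lies in (-size, size); adding size and reducing again
        // shifts negative remainders into the valid range.
        return ((int) (id % size) + size) % size;
    }

    public static void main(String[] args) {
        System.out.println(workerIndex(5L, 4));
        System.out.println(workerIndex(-7L, 4));
    }
}
```

Because the index depends only on the id, all requests scheduled with the same id land on the same executor and therefore keep their relative order.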

In the WorkerService#schedule method, the core execution object is ScheduledWorkRequest, which implements the Runnable interface. The specific implementation method is as follows:

private class ScheduledWorkRequest implements Runnable {
  private final WorkRequest workRequest;

  ScheduledWorkRequest(WorkRequest workRequest) {
    this.workRequest = workRequest;
  }

  @Override
  public void run() {
    try {
      if (stopped) {
        workRequest.cleanup();
        return;
      }
      workRequest.doWork();
    } catch (Exception e) {
      LOG.warn("Unexpected exception", e);
      workRequest.cleanup();
    }
  }
}

In the above code, we can see that the run method ultimately calls the WorkRequest#doWork method to complete the request operation. Therefore, returning to the sendToNextProcessor method, we find that the actual request type is CommitWorkRequest. After clarifying the type, let's examine the doWork function. The specific code is as follows:

public void doWork() throws RequestProcessorException {
    try {
        nextProcessor.processRequest(request);
    } finally {
        currentlyCommitting.compareAndSet(request, null);
        if (numRequestsProcessing.decrementAndGet() == 0) {
            if (!queuedRequests.isEmpty() ||
                    !committedRequests.isEmpty()) {
                wakeup();
            }
        }
    }
}

In the above code, the request object is first passed to the next request processor for processing. Regardless of whether processing succeeds, the following operations are then performed in the finally block:

  1. Clear the currentlyCommitting variable if it still refers to the current request (compareAndSet(request, null)).
  2. Decrement numRequestsProcessing. If the resulting value is 0, and if either of the following two conditions is met, a notification operation (thread wakeup) will be performed:
    1. The member variable queuedRequests is not empty.
    2. The member variable committedRequests is not empty.
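The compareAndSet(request, null) call in doWork only clears currentlyCommitting when it holds the very request that just finished. A minimal demonstration with java.util.concurrent.atomic.AtomicReference follows; the helper name clearIfCurrent is hypothetical and plain Object instances stand in for requests.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demonstration of the compare-and-set pattern used in doWork.
final class CompareAndSetSketch {

    /** Clears the slot only when it currently holds exactly this object. */
    static boolean clearIfCurrent(AtomicReference<Object> slot, Object finished) {
        return slot.compareAndSet(finished, null);
    }

    public static void main(String[] args) {
        Object committing = new Object();
        Object plainRead = new Object();
        AtomicReference<Object> slot = new AtomicReference<>(committing);
        System.out.println(clearIfCurrent(slot, plainRead));
        System.out.println(clearIfCurrent(slot, committing));
    }
}
```

A request that did not go through the commit path therefore leaves the slot untouched when it completes.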

Finally, let's analyze the processCommitted method. The specific processing code is as follows:

protected void processCommitted() {
  // Request object
  Request request;

  // Execute if conditions are met
  // 1. Work has not stopped
  // 2. numRequestsProcessing count is 0
  // 3. Data retrieved from committedRequests is not null
  if (!stopped && !isProcessingRequest() &&
      (committedRequests.peek() != null)) {
    // Return early so queued requests are handled first when both hold:
    // 1. No data exists in nextPending
    // 2. queuedRequests is not empty
    if (!isWaitingForCommit() && !queuedRequests.isEmpty()) {
      return;
    }
    // Retrieve and process the request
    request = committedRequests.poll();
    // Get the pending request from nextPending
    Request pending = nextPending.get();
    // 1. The pending request is not null
    // 2. The sessionId of the pending request matches the committed request
    // 3. The cxid of the pending request matches the committed request
    if (pending != null &&
        pending.sessionId == request.sessionId &&
        pending.cxid == request.cxid) {
      pending.setHdr(request.getHdr());
      pending.setTxn(request.getTxn());
      pending.zxid = request.zxid;
      currentlyCommitting.set(pending);
      nextPending.set(null);
      sendToNextProcessor(pending);
    }
    else {
      currentlyCommitting.set(request);
      sendToNextProcessor(request);
    }
  }
}

In the above code, the entire processing flow must first meet the following three conditions to proceed:

  1. The member variable stopped is false, indicating work has not stopped.
  2. The member variable numRequestsProcessing has a value of 0.
  3. Data retrieved from committedRequests is not null.

After meeting these three conditions, the specific processing flow is as follows:

  1. If no data exists in nextPending and queuedRequests is not empty, end processing so that the queued requests are handled first.

  2. Retrieve a request from committedRequests.

  3. Get the pending request from nextPending.

  4. Check if the following three conditions are met:

    1. The pending request is not null
    2. The sessionId of the pending request matches the current request
    3. The cxid of the pending request matches the current request

If these three conditions are met, copy the request information obtained in step (2) to the pending request, including hdr, txn, and zxid. After copying, place the pending request in the currentlyCommitting variable, clear the data in nextPending, and pass the pending request to the sendToNextProcessor method for processing. If these three conditions are not met, place the request object obtained in step (2) into the member variable currentlyCommitting and pass it to the sendToNextProcessor method for processing.
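The matching rule between the pending request and the committed request can be sketched in isolation. In the code below, Req is a hypothetical stand-in for Request that carries only sessionId, cxid, and zxid; copying of hdr and txn is omitted for brevity.

```java
// Hypothetical model of the pending/committed matching step.
final class CommitMatchSketch {
    /** Minimal stand-in for Request. */
    static final class Req {
        final long sessionId;
        final int cxid;
        long zxid;
        Req(long sessionId, int cxid, long zxid) {
            this.sessionId = sessionId;
            this.cxid = cxid;
            this.zxid = zxid;
        }
    }

    /**
     * Returns the object that should be forwarded: the locally pending request
     * when it matches the committed one (after copying the committed zxid),
     * otherwise the committed request itself.
     */
    static Req select(Req pending, Req committed) {
        if (pending != null
                && pending.sessionId == committed.sessionId
                && pending.cxid == committed.cxid) {
            pending.zxid = committed.zxid;
            return pending;
        }
        return committed;
    }

    public static void main(String[] args) {
        Req pending = new Req(1L, 7, -1L);
        Req committed = new Req(1L, 7, 100L);
        System.out.println(select(pending, committed) == pending);
    }
}
```

Forwarding the local object when it matches preserves whatever the server attached to the request when the client submitted it, while a non-matching commit (one that originated on another server) is forwarded as received.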

Analysis of ProposalRequestProcessor

In this section, we'll analyze the ProposalRequestProcessor, which is responsible for handling proposal requests in ZooKeeper. The ProposalRequestProcessor class contains three member variables, detailed in the following table:

Variable Name | Variable Type | Description
zks | LeaderZooKeeperServer | The ZooKeeper leader server.
nextProcessor | RequestProcessor | The next request processor, specifically a CommitProcessor.
syncProcessor | SyncRequestProcessor | The processor that persists requests to the transaction log.

After understanding the member variables, let's analyze the processRequest method. The specific processing code is as follows:

public void processRequest(Request request) throws RequestProcessorException {
  if (request instanceof LearnerSyncRequest) {
    zks.getLeader().processSync((LearnerSyncRequest) request);
  } else {
    nextProcessor.processRequest(request);
    if (request.getHdr() != null) {
      try {
        // Send to other services
        zks.getLeader().propose(request);
      } catch (XidRolloverException e) {
        throw new RequestProcessorException(e.getMessage(), e);
      }
      syncProcessor.processRequest(request);
    }
  }
}

The processing flow in the processRequest method is as follows: It checks if the request is of LearnerSyncRequest type. If so, the request is handed to the Leader's processSync method. Otherwise, the request is first passed to the next request processor (CommitProcessor). If the request also carries a transaction header (hdr is not null), the Leader proposes it to the other servers in the ensemble through the propose method; an XidRolloverException thrown there is rethrown as a RequestProcessorException. After the proposal is sent, the SyncRequestProcessor is called to persist the request.

Analysis of SyncRequestProcessor

This section analyzes the SyncRequestProcessor, which persists requests to the transaction log and periodically triggers snapshots. In the SyncRequestProcessor class, the processRequest method only places request objects into the member variable queuedRequests. The actual request processing occurs in the run method, as shown in the following code:

@Override
public void run() {
  try {
    // Log counter
    int logCount = 0;
    // Random offset so that servers in the ensemble do not all take snapshots at the same time
    int randRoll = r.nextInt(snapCount / 2);
    while (true) {
      Request si = null;
      // Retrieve request object from queuedRequests collection
      if (toFlush.isEmpty()) {
        si = queuedRequests.take();
      } else {
        si = queuedRequests.poll();
        if (si == null) {
          flush(toFlush);
          continue;
        }
      }
      // Break the loop if the request object matches the death request
      if (si == requestOfDeath) {
        break;
      }
      if (si != null) {
        // Append current request to ZooKeeper database
        if (zks.getZKDatabase().append(si)) {
          // Increment log counter
          logCount++;
          // Check log counter
          if (logCount > (snapCount / 2 + randRoll)) {
            randRoll = r.nextInt(snapCount / 2);
            // Roll the log
            zks.getZKDatabase().rollLog();
            // Check if snapshot generation is needed
            if (snapInProcess != null && snapInProcess.isAlive()) {
              LOG.warn("Too busy to snap, skipping");
            }
            else {
              snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                public void run() {
                  try {
                    // Generate snapshot
                    zks.takeSnapshot();
                  } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                  }
                }
              };
              // Start the thread
              snapInProcess.start();
            }
            // Reset counter
            logCount = 0;
          }
        }
        // Nothing was appended (append returned false) and toFlush is empty: pass the request straight through
        else if (toFlush.isEmpty()) {
          if (nextProcessor != null) {
            nextProcessor.processRequest(si);
            if (nextProcessor instanceof Flushable) {
              ((Flushable) nextProcessor).flush();
            }
          }
          continue;
        }
        // Add to pending flush requests
        toFlush.add(si);
        // Perform write operation if over 1000 requests
        if (toFlush.size() > 1000) {
          flush(toFlush);
        }
      }
    }
  } catch (Throwable t) {
    handleException(this.getName(), t);
  } finally {
    running = false;
  }
  LOG.info("SyncRequestProcessor exited!");
}

The core processing flow in the above code is as follows:

  1. Set the log counter (logCount) with an initial value of 0.
  2. Generate a random offset randRoll in the range from 0 (inclusive) to snapCount / 2 (exclusive). The offset varies the point at which the log is rolled and a snapshot is taken.
  3. Enter an infinite loop for request output processing, with the following steps:
    1. Declare the current operation request variable si.
    2. If toFlush (the collection of pending write requests) is empty, block on queuedRequests.take() until a request arrives and assign it to si. If toFlush is not empty, poll queuedRequests without blocking and assign the result to si; if si is null, perform a flush operation and continue with the next iteration.
    3. If si matches the death request, break the loop and end processing.
    4. If si is not null, perform the following operations:
      1. Attempt to append the current request data to the ZooKeeper database. If the append returns true, increment the log counter and check if it exceeds snapCount / 2 + randRoll. If it does, draw a new randRoll and perform a log roll operation. After rolling, check whether snapInProcess is not null and still alive. If so, log a warning and skip the snapshot; otherwise, create and start the snapInProcess thread, which creates a snapshot. Finally, reset the log counter to 0.
      2. If the append returns false and toFlush is empty, pass the current request si directly to the next request processor when it is not null. If the next processor implements the Flushable interface, call its flush method after processing. Skip to the next iteration after completing these operations.
      3. Otherwise, add the request to toFlush, and perform a flush operation when toFlush contains more than 1000 elements.
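The roll decision in the run method can be isolated into two small functions. The sketch below is a standalone illustration with hypothetical names; the value 100000 used in main is only an example value for snapCount, and java.util.Random plays the role of the field r.

```java
import java.util.Random;

// Hypothetical helpers restating the log roll threshold from the run loop.
final class SnapshotRollSketch {

    /** True when the number of logged transactions passes the randomized threshold. */
    static boolean shouldRoll(int logCount, int snapCount, int randRoll) {
        return logCount > (snapCount / 2 + randRoll);
    }

    /** Draws a fresh offset in [0, snapCount / 2), as done after each roll. */
    static int nextRandRoll(Random r, int snapCount) {
        return r.nextInt(snapCount / 2);
    }

    public static void main(String[] args) {
        int snapCount = 100000; // example value only
        int randRoll = nextRandRoll(new Random(), snapCount);
        System.out.println(shouldRoll(snapCount, snapCount, randRoll));
    }
}
```

With this formula the threshold always lies between snapCount / 2 and snapCount, so a roll happens at some point in the second half of each snapCount window.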

The flush method is used multiple times in the above process. Here's the detailed code for the flush method:

private void flush(LinkedList<Request> toFlush)
  throws IOException, RequestProcessorException {
  if (toFlush.isEmpty()) {
    return;
  }

  zks.getZKDatabase().commit();
  while (!toFlush.isEmpty()) {
    Request i = toFlush.remove();
    if (nextProcessor != null) {
      nextProcessor.processRequest(i);
    }
  }
  if (nextProcessor != null && nextProcessor instanceof Flushable) {
    ((Flushable) nextProcessor).flush();
  }
}

The core processing flow in the flush method is as follows:

  1. Check if the collection of requests to be processed is empty. If empty, end processing.
  2. Perform a data commit operation, which commits the transaction log and flushes the appended transactions to disk (org.apache.zookeeper.server.persistence.FileTxnLog#commit).
  3. Loop through the collection of requests to be processed until it's empty, with the following steps for each request:
    1. Retrieve a request from the collection.
    2. If the next request processor is not null, pass the request to it for processing.
  4. If the next request processor is not null and implements Flushable, call its flush method.
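The flush method follows a group commit pattern: one commit covers every request in the batch, and only then are the requests forwarded in arrival order. The sketch below models this with a counter in place of the disk commit and a list in place of the next processor; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical model of the group commit performed by flush().
final class GroupCommitSketch {
    int commits = 0;                              // counts simulated commits
    final List<String> forwarded = new ArrayList<>();

    void flush(LinkedList<String> toFlush) {
        if (toFlush.isEmpty()) {
            return;
        }
        commits++;                                // one commit covers the whole batch
        while (!toFlush.isEmpty()) {
            forwarded.add(toFlush.remove());      // forward in arrival order
        }
    }

    public static void main(String[] args) {
        GroupCommitSketch sketch = new GroupCommitSketch();
        LinkedList<String> batch = new LinkedList<>();
        batch.add("create /a");
        batch.add("setData /a");
        sketch.flush(batch);
        System.out.println(sketch.commits + " commit(s), forwarded " + sketch.forwarded);
    }
}
```

Batching in this way amortizes the cost of a commit over many requests, which is why the run loop keeps accumulating requests in toFlush while more are available.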

During the analysis of SyncRequestProcessor, we repeatedly mentioned the next request processor. In ZooKeeper, the next request processor for SyncRequestProcessor can be one of the following:

  1. In ProposalRequestProcessor, the next processor for SyncRequestProcessor is AckRequestProcessor.
  2. In ObserverZooKeeperServer, the next processor for SyncRequestProcessor is null.
  3. In FollowerZooKeeperServer, the next processor for SyncRequestProcessor is SendAckRequestProcessor.
  4. In ZooKeeperServer, the next processor for SyncRequestProcessor is FinalRequestProcessor.

Analysis of ToBeAppliedRequestProcessor

This section analyzes the ToBeAppliedRequestProcessor class, which removes a proposal from the leader's toBeApplied collection once the corresponding request has been applied. It's important to note that the next request processor it carries must be of type FinalRequestProcessor; otherwise, an exception will be thrown (in the constructor). The processRequest method is as follows:

public void processRequest(Request request) throws RequestProcessorException {
  next.processRequest(request);
  // If the hdr (transaction header) attribute in the request is not null
  if (request.getHdr() != null) {
    // Get zxid from the header information
    long zxid = request.getHdr().getZxid();
    // Get the proposal collection from the leader
    Iterator<Proposal> iter = leader.toBeApplied.iterator();
    // Inspect only the first proposal in the collection
    if (iter.hasNext()) {
      Proposal p = iter.next();
      // If the request object in the proposal is not null and its zxid matches the current zxid, remove the current proposal and end processing
      if (p.request != null && p.request.zxid == zxid) {
        iter.remove();
        return;
      }
    }
    LOG.error("Committed request not found on toBeApplied: "
              + request);
  }
}

The core processing flow in the above code is as follows:

  1. Pass the request to the next request processor (FinalRequestProcessor) for processing.
  2. If the hdr (transaction header) attribute of the request object is not null, perform the following operations:
    1. Get the zxid from the header information.
    2. Obtain the proposal collection from the leader variable.
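    3. Inspect only the first element of the proposal collection. If the request object in that proposal is not null and its zxid matches the previously obtained zxid, remove it from the proposal collection and return. Otherwise, log an error stating that the committed request was not found in toBeApplied.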
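The head-only check can be tried out on an ordinary queue. The sketch below assumes a ConcurrentLinkedQueue of zxid values in place of the leader's collection of Proposal objects; this choice and the method name are assumptions made for the example.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model: Long values stand in for Proposal objects.
final class ToBeAppliedSketch {

    /** Removes the head of the queue when it equals zxid; reports whether it matched. */
    static boolean removeHeadIfMatches(ConcurrentLinkedQueue<Long> toBeApplied, long zxid) {
        Iterator<Long> it = toBeApplied.iterator();
        if (it.hasNext()) {
            Long head = it.next();
            if (head.longValue() == zxid) {
                it.remove();
                return true;
            }
        }
        return false; // the real code logs an error in this case
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<>();
        queue.add(5L);
        queue.add(6L);
        System.out.println(removeHeadIfMatches(queue, 6L));
        System.out.println(removeHeadIfMatches(queue, 5L));
    }
}
```

Checking only the head is sufficient as long as requests are applied in the same order in which their proposals were added to the collection.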

Analysis of SendAckRequestProcessor

This section analyzes the SendAckRequestProcessor, which is responsible for sending ACK data packets. It has a member variable called learner, which is of type Learner. In practice, this type usually appears as a Follower or Observer, with Learner serving more as an encapsulation of many common functions. The request processing code in the SendAckRequestProcessor class is as follows:

public void processRequest(Request si) {
  // If the request type is not sync
  if (si.type != OpCode.sync) {
    // Create a data packet
    QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
                                       null);
    try {
      // Write out the data packet
      learner.writePacket(qp, false);
    } catch (IOException e) {
      LOG.warn("Closing connection to leader, exception during packet send", e);
      try {
        if (!learner.sock.isClosed()) {
          learner.sock.close();
        }
      } catch (IOException e1) {
        LOG.debug("Ignoring error closing the connection", e1);
      }
    }
  }
}

The core processing flow in the above code is as follows:

  1. Check if the request type is sync; if so, skip processing.
  2. Assemble the ACK data packet.
  3. Write out the data packet through the learner member variable. Underneath, the write goes through the writeRecord method provided by the OutputArchive interface. If the write fails with an IOException, the socket to the leader is closed.

Analysis of ReadOnlyRequestProcessor

This section analyzes the ReadOnlyRequestProcessor class, which serves the ReadOnlyZooKeeperServer class. It is the first request processor for read-only services, and its next request processor is PrepRequestProcessor. The specific code can be seen in the ReadOnlyZooKeeperServer#setupRequestProcessors method, as follows:

@Override
protected void setupRequestProcessors() {
  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  RequestProcessor prepProcessor = new PrepRequestProcessor(this, finalProcessor);
  ((PrepRequestProcessor) prepProcessor).start();
  firstProcessor = new ReadOnlyRequestProcessor(this, prepProcessor);
  ((ReadOnlyRequestProcessor) firstProcessor).start();
}

Returning to the ReadOnlyRequestProcessor class itself, let's examine the request processing method:

@Override
public void processRequest(Request request) {
  if (!finished) {
    queuedRequests.add(request);
  }
}

The above code checks whether the processor has finished. If not, it adds the request to the queuedRequests container. The actual request handling is done by the processor's own thread, in the run method, as follows:

public void run() {
  try {
    while (!finished) {
      // Get a request from the request list
      Request request = queuedRequests.take();
      // Log recording
      long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
      if (request.type == OpCode.ping) {
        traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
      }
      if (LOG.isTraceEnabled()) {
        ZooTrace.logRequest(LOG, traceMask, 'R', request, "");
      }

      // If the request is the same as the death request, end processing
      if (Request.requestOfDeath == request) {
        break;
      }
      // Filter requests
      switch (request.type) {
        case OpCode.sync:
        case OpCode.create:
        case OpCode.create2:
        case OpCode.createTTL:
        case OpCode.createContainer:
        case OpCode.delete:
        case OpCode.deleteContainer:
        case OpCode.setData:
        case OpCode.reconfig:
        case OpCode.setACL:
        case OpCode.multi:
        case OpCode.check:
          // Construct a new header object
          ReplyHeader hdr = new ReplyHeader(request.cxid, zks.getZKDatabase()
                                            .getDataTreeLastProcessedZxid(), Code.NOTREADONLY.intValue());
          try {
            // Send response
            request.cnxn.sendResponse(hdr, null, null);
          } catch (IOException e) {
            LOG.error("IO exception while sending response", e);
          }
          // Continue to the next processing
          continue;
      }
      // Next request processor handler
      if (nextProcessor != null) {
        nextProcessor.processRequest(request);
      }
    }
  } catch (RequestProcessorException e) {
    if (e.getCause() instanceof XidRolloverException) {
      LOG.info(e.getCause().getMessage());
    }
    handleException(this.getName(), e);
  } catch (Exception e) {
    handleException(this.getName(), e);
  }
  LOG.info("ReadOnlyRequestProcessor exited loop!");
}

The core processing in the above code occurs in a while loop that keeps running as long as the finished variable is false. The flow of a single iteration is as follows:

  1. Get a request from the request list.

  2. Perform log recording.

  3. Check if the request obtained in step (1) is the same as the death request; if so, end the loop.

  4. If the request type is any of sync, create, create2, createTTL, createContainer, delete, deleteContainer, setData, reconfig, setACL, multi, or check, reject it: construct a ReplyHeader carrying the NOTREADONLY error code, send it as the response, and move on to the next request without invoking the next request processor.

  5. If the next request processor is not null, pass the request to the next request processor for processing.
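
The filtering step can be sketched in isolation as follows. This is a simplified model, assuming a hypothetical Op enum and string results in place of ZooKeeper's OpCode constants and ReplyHeader; only the decision logic mirrors the switch statement above.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the read-only filter; enum and method names are illustrative.
class ReadOnlyFilterSketch {
    enum Op { SYNC, CREATE, CREATE2, CREATE_TTL, CREATE_CONTAINER, DELETE, DELETE_CONTAINER,
              SET_DATA, RECONFIG, SET_ACL, MULTI, CHECK, GET_DATA, EXISTS, PING }

    // Operation types rejected by the read-only server (SYNC through CHECK, inclusive).
    static final Set<Op> REJECTED = EnumSet.range(Op.SYNC, Op.CHECK);

    // Returns "NOTREADONLY" for rejected operations, otherwise "FORWARDED".
    static String handle(Op op) {
        return REJECTED.contains(op) ? "NOTREADONLY" : "FORWARDED";
    }
}
```

For example, ReadOnlyFilterSketch.handle(Op.SET_DATA) yields the rejection marker, while a read such as GET_DATA is passed on.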

Analysis of UnimplementedRequestProcessor

This section analyzes the UnimplementedRequestProcessor class, which is used when the operation type of a request is not a valid one. It is invoked from org.apache.zookeeper.server.ZooKeeperServer#submitRequest. The request processing code in the UnimplementedRequestProcessor class is as follows:

public void processRequest(Request request) throws RequestProcessorException {
  // Create exception object
  KeeperException ke = new KeeperException.UnimplementedException();
  // Write exception information to the request
  request.setException(ke);
  // Assemble response
  ReplyHeader rh = new ReplyHeader(request.cxid, request.zxid,
                                   ke.code().intValue());
  try {
    // Send response
    request.cnxn.sendResponse(rh, null, "response");
  } catch (IOException e) {
    throw new RequestProcessorException("Can't send the response", e);
  }
  // Send close session request
  request.cnxn.sendCloseSession();
}

The core processing flow in the above code is as follows:

  1. Create an exception object, specifically of type UnimplementedException.
  2. Write the exception object to the request object.
  3. Assemble the ReplyHeader response object.
  4. Send the response.
  5. Send a close session request.

The core logic in this code is to create an exception object, return the exception information to the client, and send a close session request.

Analysis of ObserverRequestProcessor

This section analyzes the ObserverRequestProcessor class, which is primarily responsible for forwarding requests to the Leader service. In the ObserverRequestProcessor class, the next request processor type is CommitProcessor, as defined in the ObserverZooKeeperServer#setupRequestProcessors method. The detailed code is as follows:

@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true,
            getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();
    if (syncRequestProcessorEnabled) {
        syncProcessor = new SyncRequestProcessor(this, null);
        syncProcessor.start();
    }
}

Returning to the ObserverRequestProcessor class itself, let's examine the request processing method. The specific code is as follows:

public void processRequest(Request request) {
  // Check if processing is already completed
  if (!finished) {
    // Create an upgraded request object
    Request upgradeRequest = null;
    try {
      // Perform request upgrade operation through ObserverZooKeeperServer
      upgradeRequest = zks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
      if (request.getHdr() != null) {
        request.getHdr().setType(OpCode.error);
        request.setTxn(new ErrorTxn(ke.code().intValue()));
      }
      request.setException(ke);
      LOG.info("Error creating upgrade request", ke);
    } catch (IOException ie) {
      LOG.error("Unexpected error in upgrade", ie);
    }
    // If the upgraded request object is not null
    if (upgradeRequest != null) {
      queuedRequests.add(upgradeRequest);
    }
    queuedRequests.add(request);
  }
}

The core processing flow in the above code is as follows:

  1. Check if processing is already completed; if so, no further processing is needed.
  2. Perform request upgrade operation through the zks variable (actual type is ObserverZooKeeperServer).
  3. If the request upgrade operation fails, set txn and exception data.
  4. If the upgraded request object is not null, add it to the request processing container (queuedRequests).
  5. Add the non-upgraded request object to the request processing container.

The processing of requests in the request processing container is executed in the run method. The specific processing code is as follows:

@Override
public void run() {
  try {
    // Check if already processed
    while (!finished) {
      // Get a request from the queue of requests to be processed
      Request request = queuedRequests.take();
      // Logging
      if (LOG.isTraceEnabled()) {
        ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                            'F', request, "");
      }
      // End processing if the current request equals the death request
      if (request == Request.requestOfDeath) {
        break;
      }
      // Pass to the next request processor for processing
      nextProcessor.processRequest(request);

      switch (request.type) {
        case OpCode.sync:
          // Add the current request to pending syncs
          zks.pendingSyncs.add(request);
          // Send data packet
          zks.getObserver().request(request);
          break;
        case OpCode.create:
        case OpCode.create2:
        case OpCode.createTTL:
        case OpCode.createContainer:
        case OpCode.delete:
        case OpCode.deleteContainer:
        case OpCode.setData:
        case OpCode.reconfig:
        case OpCode.setACL:
        case OpCode.multi:
        case OpCode.check:
          zks.getObserver().request(request);
          break;
        case OpCode.createSession:
        case OpCode.closeSession:
          if (!request.isLocalSession()) {
            zks.getObserver().request(request);
          }
          break;
      }
    }
  } catch (Exception e) {
    handleException(this.getName(), e);
  }
  LOG.info("ObserverRequestProcessor exited loop!");
}

The core of the above code is a loop for processing, with the loop exit condition being when the variable finished is true. The processing flow for a single loop is as follows:

  1. Get a request from the queue of requests to be processed.

  2. Check if the request is a death request; if so, end processing.

  3. Pass the request to the next request processor for processing.

  4. Perform different operations based on the request type:

    1. If the request type is sync, add the request to the pendingSyncs collection and send the request to the Leader service through the Observer.

    2. If the request type is one of create, create2, createTTL, createContainer, delete, deleteContainer, setData, reconfig, setACL, multi, or check, send the request to the Leader service through the Observer.

    3. If the request type is createSession or closeSession, check if the request is local; if not, send the request to the Leader service through the Observer.
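
The type-based forwarding decision in step (4) can be summarized by a small sketch. The Op enum below is a hypothetical, abbreviated stand-in for the OpCode constants (only a few write operations are listed), and forwardToLeader is an illustrative helper, not a ZooKeeper method.

```java
// Hypothetical model of the forwarding decision in the run loop; names are illustrative.
class ForwardDecisionSketch {
    enum Op { SYNC, CREATE, DELETE, SET_DATA, SET_ACL, MULTI, CHECK,
              CREATE_SESSION, CLOSE_SESSION, GET_DATA, PING }

    // Returns true when a request of this type would be sent on to the Leader.
    static boolean forwardToLeader(Op op, boolean localSession) {
        switch (op) {
            case SYNC:
            case CREATE:
            case DELETE:
            case SET_DATA:
            case SET_ACL:
            case MULTI:
            case CHECK:
                return true;              // always forwarded
            case CREATE_SESSION:
            case CLOSE_SESSION:
                return !localSession;     // forwarded only for non-local sessions
            default:
                return false;             // reads and pings stay on this server
        }
    }
}
```

Note that in the loop above every request is first handed to the next processor, regardless of whether it is also forwarded; the sketch models only the forwarding part.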

Analysis of AckRequestProcessor

This section analyzes the AckRequestProcessor class, which is used to handle Ack requests. The specific processing code is as follows:

public void processRequest(Request request) {
  QuorumPeer self = leader.self;
  if (self != null) {
    leader.processAck(self.getId(), request.zxid, null);
  } else {
    LOG.error("Null QuorumPeer");
  }
}

In the above code, the request processing is simple: it passes the current node id and the zxid from the request to the leader object for processing. The processing code of the leader object is as follows:

synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
  if (!allowedToCommit){
    return;
  }
  // Log recording
  if (LOG.isTraceEnabled()) {
    LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
    for (Proposal p : outstandingProposals.values()) {
      long packetZxid = p.packet.getZxid();
      LOG.trace("outstanding proposal: 0x{}",
                Long.toHexString(packetZxid));
    }
    LOG.trace("outstanding proposals all");
  }

  // Skip processing when the low 32 bits (the counter part) of the zxid are 0
  if ((zxid & 0xffffffffL) == 0) {
    return;
  }

  // Skip processing if the proposal container count is 0
  if (outstandingProposals.size() == 0) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("outstanding is 0");
    }
    return;
  }
  // Skip processing if the last committed zxid is greater than or equal to the parameter zxid
  if (lastCommitted >= zxid) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                Long.toHexString(lastCommitted), Long.toHexString(zxid));
    }

    return;
  }

  // Get the proposal
  Proposal p = outstandingProposals.get(zxid);
  // Skip processing if the proposal is null
  if (p == null) {
    LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
             Long.toHexString(zxid), followerAddr);
    return;
  }

  // Add the ACKed service id to the proposal
  p.addAck(sid);

  // Try to commit the proposal
  boolean hasCommitted = tryToCommit(p, zxid, followerAddr);

  // 1. Proposal commit succeeded
  // 2. The request object in the proposal is not null and the type is reconfig
  if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
    long curZxid = zxid;
    while (allowedToCommit && hasCommitted && p != null) {
      curZxid++;
      p = outstandingProposals.get(curZxid);
      if (p != null) {
        hasCommitted = tryToCommit(p, curZxid, null);
      }
    }
  }
}

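In the above code, the core operation is looking up the proposal p for the given zxid in outstandingProposals, recording the ACK on it, and attempting to commit it through tryToCommit. The packet types that may be sent when a proposal is committed include: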

  1. COMMITANDACTIVATE
  2. INFORMANDACTIVATE
  3. COMMIT
  4. INFORM

Analysis of FollowerRequestProcessor

This section analyzes the FollowerRequestProcessor class. The main responsibility of FollowerRequestProcessor is to forward requests to the Leader. The code for the request processing method is as follows:

public void processRequest(Request request) {
  if (!finished) {
    Request upgradeRequest = null;
    try {
      upgradeRequest = zks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
      if (request.getHdr() != null) {
        request.getHdr().setType(OpCode.error);
        request.setTxn(new ErrorTxn(ke.code().intValue()));
      }
      request.setException(ke);
      LOG.info("Error creating upgrade request", ke);
    } catch (IOException ie) {
      LOG.error("Unexpected error in upgrade", ie);
    }
    if (upgradeRequest != null) {
      queuedRequests.add(upgradeRequest);
    }
    queuedRequests.add(request);
  }
}

The main processing flow in the above code is as follows:

  1. Check if processing is complete; if so, no further processing is done.
  2. Upgrade the request.
  3. If an exception occurs during the request upgrade, exception-related information is written.
  4. If the upgraded request object is not null, add it to the collection of requests to be processed.
  5. Add the non-upgraded request object to the collection of requests to be processed.

The request processing method puts the request into the queuedRequests collection. The core processing of requests is in the run method, as shown in the following code:

@Override
public void run() {
  try {
    while (!finished) {
      // Get a request from queuedRequests
      Request request = queuedRequests.take();
      if (LOG.isTraceEnabled()) {
        ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                            'F', request, "");
      }
      // Request is the same as the death request
      if (request == Request.requestOfDeath) {
        break;
      }
      // Next request processor executes
      nextProcessor.processRequest(request);
      switch (request.type) {
        case OpCode.sync:
          zks.pendingSyncs.add(request);
          zks.getFollower().request(request);
          break;
        case OpCode.create:
        case OpCode.create2:
        case OpCode.createTTL:
        case OpCode.createContainer:
        case OpCode.delete:
        case OpCode.deleteContainer:
        case OpCode.setData:
        case OpCode.reconfig:
        case OpCode.setACL:
        case OpCode.multi:
        case OpCode.check:
          zks.getFollower().request(request);
          break;
        case OpCode.createSession:
        case OpCode.closeSession:
          if (!request.isLocalSession()) {
            zks.getFollower().request(request);
          }
          break;
      }
    }
  } catch (Exception e) {
    handleException(this.getName(), e);
  }
  LOG.info("FollowerRequestProcessor exited loop!");
}

The core of the above code is a processing loop that exits when the variable finished is true. The processing flow for a single iteration is as follows:

  1. Get a request from the requests to be processed.

  2. Check if the request is a death request; if so, end processing.

  3. Pass the request to the next request processor for processing.

  4. Take different actions based on different request types:

    1. If the request type is sync, put the request into the pendingSyncs collection and send the request to the leader service through the Follower.

    2. If the request type is one of create, create2, createTTL, createContainer, delete, deleteContainer, setData, reconfig, setACL, multi, or check, send the request to the Leader service through the Follower.

    3. If the request type is createSession or closeSession, check if the request is local; if not, send the request to the Leader service through the Follower.
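
The division of labor between processRequest (enqueue) and run (dequeue and handle) can be sketched with a small thread-backed class. All names below are hypothetical, and the shutdown method is a deliberate simplification: it only enqueues the sentinel and waits for the loop to drain, which is a choice made for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical thread-backed processor; it models the pattern only, not ZooKeeper's classes.
class QueuedProcessorSketch extends Thread {
    static final class Request {
        final String name;
        Request(String name) { this.name = name; }
    }

    // Sentinel compared by identity, playing the role of Request.requestOfDeath.
    static final Request REQUEST_OF_DEATH = new Request("death");

    private final LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<>();
    private volatile boolean finished = false;
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    // Called by the previous processor: it only enqueues the request.
    void processRequest(Request request) {
        if (!finished) {
            queuedRequests.add(request);
        }
    }

    @Override
    public void run() {
        try {
            while (!finished) {
                Request request = queuedRequests.take(); // blocks until a request arrives
                if (request == REQUEST_OF_DEATH) {
                    break;
                }
                handled.add(request.name); // stand-in for nextProcessor.processRequest(request)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Simplified shutdown: enqueue the sentinel, wait for the loop to end, refuse new requests.
    void shutdown() {
        queuedRequests.add(REQUEST_OF_DEATH);
        try {
            join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finished = true;
    }
}
```

The sentinel is needed because take() blocks on an empty queue; setting a flag alone would not wake a thread that is waiting for the next request.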

Analysis of FinalRequestProcessor

This section analyzes the FinalRequestProcessor class. FinalRequestProcessor is at the end of the entire request processing chain and is the last request processor. In the FinalRequestProcessor class, there is a lot of logic for request processing, which will be analyzed in sections. First, let's look at the code for handling transaction processing results:

ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {
  // ZooKeeper processes the request
  rc = zks.processTxn(request);
  // Request header is not null
  if (request.getHdr() != null) {
    // Get header information
    TxnHeader hdr = request.getHdr();
    // Get the record from the request
    Record txn = request.getTxn();
    // Get zxid from the header
    long zxid = hdr.getZxid();
    // 1. The collection of incomplete change records is not empty
    // 2. The zxid of the record at the head of the collection is less than or equal to the current zxid
    while (!zks.outstandingChanges.isEmpty()
           && zks.outstandingChanges.peek().zxid <= zxid) {
      // Remove a data from the collection of incomplete change records
      ChangeRecord cr = zks.outstandingChanges.remove();
      // Warning log
      if (cr.zxid < zxid) {
        LOG.warn("Zxid outstanding " + cr.zxid
                 + " is less than current " + zxid);
      }
      // Look up the removed record's path in outstandingChangesForPath; if the entry found is the same object, remove it from that map as well
      if (zks.outstandingChangesForPath.get(cr.path) == cr) {
        zks.outstandingChangesForPath.remove(cr.path);
      }
    }
  }
  // If it's voting data, add it to the proposal data
  if (request.isQuorum()) {
    zks.getZKDatabase().addCommittedProposal(request);
  }
}

The main processing flow in this transaction result handling code is as follows:

  1. Process the request through the ZooKeeper service.

  2. If the request header is null, skip steps (3) and (4) and go directly to step (5).

  3. Get the header information and record information from the request, and get the zxid from the header information.

  4. Loop through the collection of incomplete change records, operating on the variables outstandingChanges and outstandingChangesForPath. The loop continues while both of the following conditions hold:

    1. The collection of incomplete change records (outstandingChanges) is not empty.
    2. The zxid of the record at the head of the collection is less than or equal to the current zxid.

    The specific processing flow for incomplete change records is as follows:

    1. Remove the head record from outstandingChanges and treat it as the record to be processed.
    2. If the zxid of that record is less than the zxid in the header information, output a warning log.
    3. Look up the record's path in outstandingChangesForPath. If the entry found is the same object as the record being processed, remove it from outstandingChangesForPath.
  5. If the request is a quorum request (request.isQuorum() returns true), add it to the committed proposals of the ZooKeeper database.
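
The draining loop over outstandingChanges and outstandingChangesForPath can be reproduced in a small standalone sketch. The ChangeRecord type here is reduced to a zxid and a path, and the add and drainUpTo helpers are hypothetical names; the collection types are assumptions chosen for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the outstanding-change cleanup; not ZooKeeper code.
class OutstandingChangesSketch {
    static final class ChangeRecord {
        final long zxid;
        final String path;
        ChangeRecord(long zxid, String path) { this.zxid = zxid; this.path = path; }
    }

    final Deque<ChangeRecord> outstandingChanges = new ArrayDeque<>();
    final Map<String, ChangeRecord> outstandingChangesForPath = new HashMap<>();

    // Registers a pending change; the map keeps only the latest record per path.
    void add(ChangeRecord cr) {
        outstandingChanges.add(cr);
        outstandingChangesForPath.put(cr.path, cr);
    }

    // Drops every queued record whose zxid is <= the applied zxid; returns how many were removed.
    int drainUpTo(long zxid) {
        int removed = 0;
        while (!outstandingChanges.isEmpty() && outstandingChanges.peek().zxid <= zxid) {
            ChangeRecord cr = outstandingChanges.remove();
            // Remove the per-path entry only if it still refers to this exact record.
            if (outstandingChangesForPath.get(cr.path) == cr) {
                outstandingChangesForPath.remove(cr.path);
            }
            removed++;
        }
        return removed;
    }
}
```

The identity comparison matters when two pending changes touch the same path: draining the older one must not discard the map entry that now points to the newer one.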

After completing the transaction processing result handling code, different actions are taken based on the request type and connection object. The specific handling code is as follows:

// 1. Request type is close session
// 2. Connection object is null
if (request.type == OpCode.closeSession && connClosedByClient(request)) {
    // Execute close session operation
    if (closeSession(zks.serverCnxnFactory, request.sessionId) ||
            closeSession(zks.secureServerCnxnFactory, request.sessionId)) {
        return;
    }
}

// End processing if connection object is null
if (request.cnxn == null) {
    return;
}

The core processing flow in the above code is as follows:

  1. Check whether both of the following conditions are met. If so, perform the session closing operation, and end processing if the session was closed by either connection factory:
    1. The request type is close session
    2. The connection object in the request object is null
  2. End processing if the connection object in the request object is null.

Note that two connection factories are tried when closing the session, both of type ServerCnxnFactory: first the non-secure connection factory, then the secure connection factory.

Next, we will analyze the preparatory work for response processing. The specific processing code is as follows:

// Get connection object
ServerCnxn cnxn = request.cnxn;
String lastOp = "NA";
// Decrement the number of requests being processed
zks.decInProcess();
// Status code
Code err = Code.OK;
// Processing result
Record rsp = null;
try {
    // 1. Header is not null
    // 2. Header type is error
    if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
        // Throw the exception in the request if it's not null, otherwise create a new exception and throw it
        if (request.getException() != null) {
            throw request.getException();
        } else {
            throw KeeperException.create(KeeperException.Code
                    .get(((ErrorTxn) request.getTxn()).getErr()));
        }
    }

    // Get the exception from the request object
    KeeperException ke = request.getException();
    // 1. Exception is not null
    // 2. Request type is not multi
    if (ke != null && request.type != OpCode.multi) {
        throw ke;
    }
  // Some code omitted
}

The main process in the above code is to handle exceptions carried in the request, and the processing flow is as follows:

  1. If the header information of the request object is not null and the type in the header information is error, an exception is thrown. There are two cases:
    1. If the exception object in the request object is not null, throw it.
    2. If the exception object in the request object is null, create a new KeeperException from the error code stored in the ErrorTxn and throw it.
  2. If the conditions of step (1) are not met, the following operations are performed:
    1. Get the exception from the request object.
    2. If the exception object is not null and the request type is not multi, throw the exception in the request object.

After handling the exception information, different response processing operations will be performed for different request types. First, let's analyze the ping operation and createSession operation. The specific processing code is as follows:

case OpCode.ping: {

  // Calculate some variables in service statistics based on request creation time
  zks.serverStats().updateLatency(request.createTime);

  // Set operation mark to PING
  lastOp = "PING";
  // Update response statistics
  cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                              request.createTime, Time.currentElapsedTime());

  // Send response
  cnxn.sendResponse(new ReplyHeader(-2,
                                    zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null,
                    "response");
  return;
}
case OpCode.createSession: {
  // Calculate some variables in service statistics based on request creation time
  zks.serverStats().updateLatency(request.createTime);

  // Set operation mark to SESS
  lastOp = "SESS";
  // Update response statistics
  cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
                              request.createTime, Time.currentElapsedTime());

  // Complete session creation
  zks.finishSessionInit(request.cnxn, true);
  return;
}

The processing flow in the above code is as follows:

  1. Calculate variables in service statistics based on request creation time:

    1. Variable totalLatency: Cumulative delay time for all requests.
    2. Variable count: Cumulative total number of requests.
    3. Variable minLatency: Minimum request delay.
    4. Variable maxLatency: Maximum request delay.
  2. Update response statistics, including:

    1. Variable lastCxid: Last operated cxid
    2. Variable lastZxid: Last operated zxid
    3. Variable lastOp: Last operation type
    4. Variable lastResponseTime: Last response time
    5. Variable lastLatency: Last delay time
    6. Variable minLatency: Minimum delay time
    7. Variable maxLatency: Maximum delay time
    8. Variable count: Cumulative number of request processing
    9. Variable totalLatency: Cumulative delay time
  3. If it's a ping request, a response will be sent. If it's a createSession request, the session creation operation will be completed.
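
The latency bookkeeping listed in step (1) amounts to a few running aggregates. The following sketch uses the same variable names as the list above, but the class itself, the update signature, and the avgLatency helper are hypothetical and only illustrate the arithmetic.

```java
// Hypothetical sketch of latency aggregation; field names follow the list above.
class LatencyStatsSketch {
    long totalLatency = 0;               // cumulative delay of all requests
    long count = 0;                      // cumulative number of requests
    long minLatency = Long.MAX_VALUE;    // smallest delay seen so far
    long maxLatency = 0;                 // largest delay seen so far

    // Records one request, given its creation time and the current time (same clock and unit).
    void update(long createTime, long now) {
        long latency = now - createTime;
        totalLatency += latency;
        count++;
        if (latency < minLatency) {
            minLatency = latency;
        }
        if (latency > maxLatency) {
            maxLatency = latency;
        }
    }

    // Average delay, or 0 when nothing has been recorded yet.
    long avgLatency() {
        return count == 0 ? 0 : totalLatency / count;
    }
}
```

Keeping the total and the count separately lets the average be derived on demand instead of being recomputed on every request.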

Next, let's analyze the getData request. The specific processing code is as follows:

case OpCode.getData: {
  lastOp = "GETD";
  GetDataRequest getDataRequest = new GetDataRequest();
  ByteBufferInputStream.byteBuffer2Record(request.request,
                                          getDataRequest);
  // Get the data node corresponding to the path
  DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
  // Throw an exception if the data node is null
  if (n == null) {
    throw new KeeperException.NoNodeException();
  }
  // Verify ACL
  PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                                ZooDefs.Perms.READ,
                                request.authInfo);
  // Read the node data and fill in its Stat; register a watch if the request asks for one
  Stat stat = new Stat();
  byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                                         getDataRequest.getWatch() ? cnxn : null);
  rsp = new GetDataResponse(b, stat);
  break;
}

The core processing flow in the above code is as follows:

  1. Set operation type.
  2. Create a GetDataRequest object and deserialize data from the request object into the GetDataRequest object.
  3. Get the corresponding data node from the ZooKeeper database based on the path in the request object.
  4. Throw an exception if the data node is null.
  5. Verify ACL.
  6. Read the node data together with its Stat (registering a watch on the connection if the request asks for one) and create a GetDataResponse response object.

When handling different request types, there are other operations, but due to the similarity of many operations, not all of them are analyzed. After the response preparation is complete, the response sending operation is performed. The specific code is as follows:

// Get the last zxid
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
// Create header information
ReplyHeader hdr =
        new ReplyHeader(request.cxid, lastZxid, err.intValue());

// Calculate service statistics variables based on request creation time
zks.serverStats().updateLatency(request.createTime);
// Update statistics information
cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
        request.createTime, Time.currentElapsedTime());

try {
    // Send response
    cnxn.sendResponse(hdr, rsp, "response");
    // If the request type is closeSession, execute the closing operation
    if (request.type == OpCode.closeSession) {
        cnxn.sendCloseSession();
    }
} catch (IOException e) {
    LOG.error("FIXMSG", e);
}

The core processing flow in the above code is as follows:

  1. Retrieve the last zxid from the ZooKeeper database.
  2. Create header information.
  3. Update ZooKeeper service statistics.
  4. Update statistics in the connection object.
  5. Send the response.
  6. If the request type is closeSession, send the session closing information.

Summary

This chapter analyzes the request processors in the ZooKeeper project. In ZooKeeper, request processors are implemented as a chain of responsibility. In the standalone ZooKeeperServer, the first request processor is PrepRequestProcessor, while other server types place their own processor at the head of the chain (for example ReadOnlyRequestProcessor, ObserverRequestProcessor, or FollowerRequestProcessor); the last one is FinalRequestProcessor. Most request processors run as threads. Their processRequest method only enqueues request objects, which are then handled in the thread's run method.

Key concepts:

  • ZooKeeper request processing
  • Chain of responsibility pattern
  • PrepRequestProcessor
  • FinalRequestProcessor
  • Multi-threaded request handling
  • ZooKeeper statistics and performance monitoring