zookeeper source code analysis

ZooKeeper Session Management

This chapter analyzes the session management aspects of the ZooKeeper project.

Server-side Processing of Session Requests

This section analyzes the server-side processing flow for session requests in ZooKeeper. The method responsible for handling is NIOServerCnxn#readConnectRequest, with the specific code as follows:

private void readConnectRequest() throws IOException, InterruptedException {
    if (!isZKServerRunning()) {
        throw new IOException("ZooKeeperServer not running");
    }
    zkServer.processConnectRequest(this, incomingBuffer);
    initialized = true;
}

In the above code, it first checks if the ZK service is running. If not, an exception is thrown to end processing. Otherwise, it's handed over to the processConnectRequest method for processing, with the following code:

public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
  throws IOException {
  // Data reading related object construction
  BinaryInputArchive bia =
    BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
  ConnectRequest connReq = new ConnectRequest();
  // Deserialize connect data
  connReq.deserialize(bia, "connect");
  if (LOG.isDebugEnabled()) {
    LOG.debug("Session establishment request from client "
              + cnxn.getRemoteSocketAddress()
              + " client's lastZxid is 0x"
              + Long.toHexString(connReq.getLastZxidSeen()));
  }
  // Check if read-only
  boolean readOnly = false;
  try {
    readOnly = bia.readBool("readOnly");
    cnxn.isOldClient = false;
  } catch (IOException e) {
    LOG.warn("Connection request from old client "
             + cnxn.getRemoteSocketAddress()
             + "; will be dropped if server is in r-o mode");
  }
  // Throw exception if not read-only and current ZK service is read-only
  if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
    String msg = "Refusing session request for not-read-only client "
      + cnxn.getRemoteSocketAddress();
    LOG.info(msg);
    throw new CloseRequestException(msg);
  }
  // Check if the zxid in the connection request is greater than the maximum zxid in the data tree
  if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
    String msg = "Refusing session request for client "
      + cnxn.getRemoteSocketAddress()
      + " as it has seen zxid 0x"
      + Long.toHexString(connReq.getLastZxidSeen())
      + " our last zxid is 0x"
      + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
      + " client must try another server";

    LOG.info(msg);
    throw new CloseRequestException(msg);
  }
  // Read session timeout
  int sessionTimeout = connReq.getTimeOut();
  // Read password
  byte passwd[] = connReq.getPasswd();
  // Session timeout related processing
  int minSessionTimeout = getMinSessionTimeout();
  if (sessionTimeout < minSessionTimeout) {
    sessionTimeout = minSessionTimeout;
  }
  int maxSessionTimeout = getMaxSessionTimeout();
  if (sessionTimeout > maxSessionTimeout) {
    sessionTimeout = maxSessionTimeout;
  }
  // Set timeout
  cnxn.setSessionTimeout(sessionTimeout);
  // Disable data reception before session establishment is complete
  cnxn.disableRecv();
  // Get session id
  long sessionId = connReq.getSessionId();
  // If session id equals 0
  if (sessionId == 0) {
    // Recalculate session id
    long id = createSession(cnxn, passwd, sessionTimeout);
    LOG.debug("Client attempting to establish new session:" +
              " session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
              Long.toHexString(id),
              Long.toHexString(connReq.getLastZxidSeen()),
              connReq.getTimeOut(),
              cnxn.getRemoteSocketAddress());
  }
  // If session id is non-zero
  else {
    // Get session id from connection request
    long clientSessionId = connReq.getSessionId();
    LOG.debug("Client attempting to renew session:" +
              " session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
              Long.toHexString(clientSessionId),
              Long.toHexString(connReq.getLastZxidSeen()),
              connReq.getTimeOut(),
              cnxn.getRemoteSocketAddress());
    // Close session if serverCnxnFactory is not null
    if (serverCnxnFactory != null) {
      serverCnxnFactory.closeSession(sessionId);
    }
    // Close session if secureServerCnxnFactory is not null
    if (secureServerCnxnFactory != null) {
      secureServerCnxnFactory.closeSession(sessionId);
    }
    // Set session id
    cnxn.setSessionId(sessionId);
    // Reopen session
    reopenSession(cnxn, sessionId, passwd, sessionTimeout);
  }
}

The core processing flow in the above code is as follows:

  1. Create objects related to reading data.

  2. Read readOnly data. If readOnly is false and the current ZK service is read-only, an exception is thrown.

  3. Get the zxid from the connection request (this zxid is the client's maximum zxid), get the maximum processed zxid from the data tree. If the former is greater than the latter, an exception is thrown.

  4. Calculate the session timeout and assign it to cnxn (server connection object).

  5. Execute ServerCnxn#disableRecv method to prevent data reception before the session is established.

  6. Get the session id from the connection request. If the session id is 0, calculate the sessionId through createSession. Note that this id will not overwrite the session id in the connection request, it's only output in debug-level logs. For non-zero session ids, the process is as follows:

    1. Get the session id from the connection request object.
    2. If variables serverCnxnFactory and secureServerCnxnFactory are not null, perform session closing operations. The ServerCnxn object is closed.
  7. Set the session id for the connection object.

  8. Reopen the session through the reopenSession method.

The most important method in processConnectRequest is reopenSession. Let's analyze the reopenSession method, with the specific processing code as follows:

public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
                          int sessionTimeout) throws IOException {
  // Check password
  if (checkPasswd(sessionId, passwd)) {
    // Validate session
    revalidateSession(cnxn, sessionId, sessionTimeout);
  }
  else {
    LOG.warn("Incorrect password from " + cnxn.getRemoteSocketAddress()
             + " for session 0x" + Long.toHexString(sessionId));
    // Complete session creation
    finishSessionInit(cnxn, false);
  }
}

The processing flow in the above code: password verification is performed through the checkPasswd method. If the password verification passes, the session is revalidated through the revalidateSession method. Otherwise, session creation is completed through the finishSessionInit method. In the revalidateSession method, finishSessionInit is still called at the end, with the difference being that the second parameter is calculated through the SessionTracker#touchSession method.

Next, let's analyze the finishSessionInit method, with the specific processing code as follows:

public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
  try {
    // If validation passes
    if (valid) {
      // Register connection object
      if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
        serverCnxnFactory.registerConnection(cnxn);
      } else if (secureServerCnxnFactory != null
                 && secureServerCnxnFactory.cnxns.contains(cnxn)) {
        secureServerCnxnFactory.registerConnection(cnxn);
      }
    }
  } catch (Exception e) {
    LOG.warn("Failed to register with JMX", e);
  }

  try {
    // Create connection response object
    ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
                                              : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                                              // longer valid
                                              valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
    bos.writeInt(-1, "len");
    // Write connection object to bos
    rsp.serialize(bos, "connect");
    // Check if it's an old client version
    if (!cnxn.isOldClient) {
      // Set read-only status
      bos.writeBool(
        this instanceof ReadOnlyZooKeeperServer, "readOnly");
    }
    // Close baos
    baos.close();
    // Convert baos to byte stream and send through connection object
    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
    bb.putInt(bb.remaining() - 4).rewind();
    cnxn.sendBuffer(bb);

    // Validation passed
    if (valid) {
      LOG.debug("Established session 0x"
                + Long.toHexString(cnxn.getSessionId())
                + " with negotiated timeout " + cnxn.getSessionTimeout()
                + " for client "
                + cnxn.getRemoteSocketAddress());
      // Resume data reception
      cnxn.enableRecv();
    } else {

      LOG.info("Invalid session 0x"
               + Long.toHexString(cnxn.getSessionId())
               + " for client "
               + cnxn.getRemoteSocketAddress()
               + ", probably expired");
      // Send close connection message
      cnxn.sendBuffer(ServerCnxnFactory.closeConn);
    }

  } catch (Exception e) {
    LOG.warn("Exception while establishing session, closing", e);
    // Close connection object
    cnxn.close();
  }
}

The core processing flow in the above code is as follows:

  1. Check if validation passed. If passed, register the connection object with serverCnxnFactory or secureServerCnxnFactory.
  2. Create a connection response object.
  3. Create a BinaryOutputArchive object and write the following data to it:
    1. tag: len, value: -1
    2. tag: connect, value: response object
    3. If the client used is not an old version, tag: readOnly, value: whether the current service is of ReadOnlyZooKeeperServer type
  4. The connection object sends the data, which is the data from step 3.
  5. If validation passed, restore the connection object's ability to receive data; otherwise, send a message to close the connection.

SessionTracker Overview

This section provides a brief overview of the SessionTracker interface, which is used for session management in the ZooKeeper project. It can also be understood as session tracking. This interface defines thirteen methods and two interfaces. Information about the methods is shown in the table below:

Method NameMethod PurposeMethod Parameters
createSessionCreate session id1. sessionTimeout: session expiration time
addGlobalSessionAdd global session1. id: session id<br />2. to: session expiration time
addSessionAdd session1. id: session id<br />2. to: session expiration time
touchSessionCheck if session is active1. sessionId: session id<br />2. sessionTimeout: session timeout
setSessionClosingSet session to closing state1. sessionId: session id
shutdownShutdown SessionTracker1. sessionId: session id
removeSessionRemove session1. sessionId: session id
isTrackingSessionCheck if tracking session id1. sessionId: session id
checkSessionCheck session1. sessionId: session id<br />2. owner: session owner
checkGlobalSessionCheck global session1. sessionId: session id<br />2. owner: session owner
setOwnerSet session owner1. id: session id<br />2. owner: session owner
dumpSessionsDump sessions1. pwriter: output writer
getSessionExpiryMapGet expired sessions

The implementation class diagram of the SessionTracker interface in ZooKeeper is shown in the figure below.

SessionTracker.png

Next, let's explain the internal Session interface, defined as follows:

interface Session {
  long getSessionId();

  int getTimeout();

  boolean isClosing();
}

The Session interface defines three methods:

  1. The getSessionId method is used to get the sessionId.
  2. The getTimeout method is used to get the expiration time.
  3. The isClosing method is used to determine if it's closed.

In ZooKeeper, the implementation class of the Session interface is SessionImpl, which has four member variables, detailed in the table below:

Variable NameVariable TypeVariable Description
sessionIdlongSession id
timeoutintSession timeout
isClosingbooleanWhether in closing state
ownerObjectOwner

Finally, let's explain the internal SessionExpirer interface, defined as follows:

interface SessionExpirer {
  void expire(Session session);

  long getServerId();
}

Analysis of getSessionExpiryMap Method

This section analyzes the getSessionExpiryMap method, which primarily involves converting one data structure to another. The detailed code is as follows:

synchronized public Map<Long, Set<Long>> getSessionExpiryMap() {
    Map<Long, Set<SessionImpl>> expiryMap = sessionExpiryQueue.getExpiryMap();
    Map<Long, Set<Long>> sessionExpiryMap = new TreeMap<Long, Set<Long>>();
    for (Entry<Long, Set<SessionImpl>> e : expiryMap.entrySet()) {
        Set<Long> ids = new HashSet<Long>();
        sessionExpiryMap.put(e.getKey(), ids);
        for (SessionImpl s : e.getValue()) {
            ids.add(s.sessionId);
        }
    }
    return sessionExpiryMap;
}

The core processing flow in the above code is as follows:

  1. Retrieve expiration information from the sessionExpiryQueue variable, where the key represents the expiration time and the value represents the set of expired sessions.
  2. Convert the expiration information from step (1) into another map structure, where the key represents the expiration time and the value represents the set of session IDs.

Analysis of touchSession Method

This section analyzes the touchSession method, which primarily aims to determine if a session is in an active (valid) state. The specific processing code is as follows:

synchronized public boolean touchSession(long sessionId, int timeout) {
  // Get session by session ID
  SessionImpl s = sessionsById.get(sessionId);

  // Return false if session is null
  if (s == null) {
    logTraceTouchInvalidSession(sessionId, timeout);
    return false;
  }

  // Return false if session is in closing state
  if (s.isClosing()) {
    logTraceTouchClosingSession(sessionId, timeout);
    return false;
  }

  // Update expiration time
  updateSessionExpiry(s, timeout);
  return true;
}

private void updateSessionExpiry(SessionImpl s, int timeout) {
  logTraceTouchSession(s.sessionId, timeout, "");
  sessionExpiryQueue.update(s, timeout);
}

The processing flow in the above code is as follows:

  1. Retrieve the session from the sessionsById container using the session ID.
  2. Check if the session is null. If it is, log the information and return false.
  3. Check if the session is in a closing state. If it is, log the information and return false.
  4. Update the session expiration time using the updateSessionExpiry method and return true.

Analysis of setSessionClosing Method

This section analyzes the setSessionClosing method, which is responsible for modifying the closing flag. The specific processing code is as follows:

synchronized public void setSessionClosing(long sessionId) {
    if (LOG.isTraceEnabled()) {
        LOG.trace("Session closing: 0x" + Long.toHexString(sessionId));
    }
    SessionImpl s = sessionsById.get(sessionId);
    if (s == null) {
        return;
    }
    s.isClosing = true;
}

The core processing flow in the above code is as follows:

  1. Retrieve the session from the sessionsById container using the session ID.
  2. If the session is null, end processing. If the session is not null, set the isClosing property to true.

Analysis of removeSession Method

This section analyzes the removeSession method, which mainly operates on three variables to remove the corresponding session-related data. The specific code is as follows:

synchronized public void removeSession(long sessionId) {
    LOG.debug("Removing session 0x" + Long.toHexString(sessionId));
    SessionImpl s = sessionsById.remove(sessionId);
    sessionsWithTimeout.remove(sessionId);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                "SessionTrackerImpl --- Removing session 0x"
                        + Long.toHexString(sessionId));
    }
    if (s != null) {
        sessionExpiryQueue.remove(s);
    }
}

The core processing flow in the above code is as follows:

  1. Remove a session from the sessionsById container using the session ID and temporarily store it in memory.
  2. Remove the session from the sessionsWithTimeout container using the session ID.
  3. If the session temporarily stored in memory from step (1) exists, remove it from the sessionExpiryQueue variable.

Analysis of checkSession Method

This section analyzes the checkSession method, which mainly checks the following three items:

  1. Whether the session corresponding to the session ID exists.
  2. Whether the session has expired.
  3. Whether the session owner matches the parameter owner.

The code for checking these three items is as follows:

public synchronized void checkSession(long sessionId, Object owner)
        throws KeeperException.SessionExpiredException,
        KeeperException.SessionMovedException,
        KeeperException.UnknownSessionException {
    LOG.debug("Checking session 0x" + Long.toHexString(sessionId));
    // Get session
    SessionImpl session = sessionsById.get(sessionId);

    // Throw exception if session is null
    if (session == null) {
        throw new KeeperException.UnknownSessionException();
    }

    // Throw exception if session is closed
    if (session.isClosing()) {
        throw new KeeperException.SessionExpiredException();
    }

    // Assign owner if session owner is null
    if (session.owner == null) {
        session.owner = owner;
    } 
    // Throw exception if session owner doesn't match parameter owner
    else if (session.owner != owner) {
        throw new KeeperException.SessionMovedException();
    }
}

Analysis of run Method

Since the SessionTrackerImpl class implements ZooKeeperCriticalThread, it is a thread object, so the analysis of the run method is particularly important. The specific processing code is as follows:

@Override
public void run() {
  try {
    while (running) {
      // Get wait time
      long waitTime = sessionExpiryQueue.getWaitTime();
      if (waitTime > 0) {
        Thread.sleep(waitTime);
        continue;
      }

      // Get session from session expiry queue
      for (SessionImpl s : sessionExpiryQueue.poll()) {
        // Set closing state
        setSessionClosing(s.sessionId);
        // Perform expiration interface operation
        expirer.expire(s);
      }
    }
  } catch (InterruptedException e) {
    handleException(this.getName(), e);
  }
  LOG.info("SessionTrackerImpl exited loop!");
}

The core operation in the above code is to periodically set the session to a closing state and call the expiration interface. The overall process is handled in a dead loop, with the condition for exiting the loop being the running flag set to false. To set it to false, the shutdown method needs to be called. In each loop, the following operations are performed:

  1. Get the wait time from the sessionExpiryQueue variable. If the wait time is greater than 0, wait (thread wait).
  2. Get the sessions that need to be closed from the sessionExpiryQueue variable, set the isClosing property to true, and call the expiration interface method (SessionTracker.SessionExpirer#expire).

Next, let's explain the method for getting the wait time. The specific processing code is as follows:

public long getWaitTime() {
  long now = Time.currentElapsedTime();
  long expirationTime = nextExpirationTime.get();
  return now < expirationTime ? (expirationTime - now) : 0L;
}

The core processing flow in the above code is as follows:

  1. Get the current time and expiration time.
  2. Check if the current time is less than the expiration time. If so, return the difference between the expiration time and the current time as the wait time. If not, return 0 to immediately start executing the session closing operation.

Finally, let's analyze the expire method. In ZooKeeper, this method is implemented by the ZooKeeperServer class. The specific code is as follows:

public void expire(Session session) {
  long sessionId = session.getSessionId();
  LOG.info("Expiring session 0x" + Long.toHexString(sessionId) + ", timeout of "
           + session.getTimeout() + "ms exceeded");
  close(sessionId);
}

The processing flow in the above code: Get the session ID from the session object and then call the close method.

Analysis of ExpiryQueue

This section analyzes the ExpiryQueue class, which is mainly used to store expiration information and is commonly used for session expiration handling. First, we need to understand the member variables of this class. The specific information is shown in the table below:

Variable NameVariable TypeVariable Description
elemMapConcurrentHashMap<E, Long>Expiry container, key: expiry object, value: expiration time
expiryMapConcurrentHashMap<Long, Set<E>>Expiry container, key: expiration time, value: set of expiry objects
nextExpirationTimeAtomicLongNext expiration time
expirationIntervalintInterval period

After understanding the member variables, let's analyze the constructor. The specific constructor code is as follows:

public ExpiryQueue(int expirationInterval) {
    this.expirationInterval = expirationInterval;
    nextExpirationTime.set(roundToNextInterval(Time.currentElapsedTime()));
}

In the constructor, the variables nextExpirationTime and expirationInterval are set. The latter is directly assigned from the constructor parameter, while the former needs to be calculated using the roundToNextInterval function. The calculation code is as follows:

private long roundToNextInterval(long time) {
    return (time / expirationInterval + 1) * expirationInterval;
}

The calculation logic in the above code is: (current timestamp (millisecond level) divided by expiration interval + 1) multiplied by expiration interval.

Next, let's analyze some methods in the ExpiryQueue class. First, let's analyze the update method used for adding or updating. The specific processing code is as follows:

public Long update(E elem, int timeout) {
  // Get expiration time from elemMap container based on element, meaning the previous expiration time
  Long prevExpiryTime = elemMap.get(elem);
  // Get current time
  long now = Time.currentElapsedTime();
  // Calculate next expiration time
  Long newExpiryTime = roundToNextInterval(now + timeout);

  // Check if new expiration time is the same as previous expiration time, if so, return null
  if (newExpiryTime.equals(prevExpiryTime)) {
    return null;
  }

  // First add the elem to the new expiry time bucket in expiryMap.
  // Get data corresponding to expiration time
  Set<E> set = expiryMap.get(newExpiryTime);
  if (set == null) {
    // Create set object
    set = Collections.newSetFromMap(
      new ConcurrentHashMap<E, Boolean>());
    // Add expiration data to expiryMap container
    Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
    // If historical expiration data exists, assign it to set variable
    if (existingSet != null) {
      set = existingSet;
    }
  }
  // Add parameter data to set collection
  set.add(elem);

  // Add elem data to elemMap container and get historical expiration time
  prevExpiryTime = elemMap.put(elem, newExpiryTime);
  // Historical expiration time is not null
  // New expiration time is different from historical expiration time
  if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
    // Get set of expired elements based on historical expiration time
    Set<E> prevSet = expiryMap.get(prevExpiryTime);
    // Remove current element if set of expired elements is not null
    if (prevSet != null) {
      prevSet.remove(elem);
    }
  }
  return newExpiryTime;
}

The core processing flow in the above code is as follows:

  1. Get the expiration time (historical expiration time) from the elemMap container based on the parameter element, stored in the variable prevExpiryTime.

  2. Get the current time.

  3. Calculate the next expiration time using the roundToNextInterval method, with calculation parameters: current time + expiration time (expiration time is passed as a parameter).

  4. Check if the next expiration time is the same as the historical expiration time. If so, return null and end processing.

  5. Get the set of data that needs to expire from the expiryMap container based on the expiration time, stored in the variable set.

  6. If the set of data that needs to expire is empty, create a collection object. Try to put the new expiration time and the newly created collection object into the expiryMap container, and get the historical data stored in the container. If historical data exists, set it to the variable set.

  7. Put the current method parameter elem into the variable set.

  8. Add the method parameter elem data to the elemMap container and get the historical expiration time.

  9. If the historical expiration time from step (8) is not null and this time is different from the expiration time in step (3), perform the following operations:

    1. Get the set of expired elements based on the expiration time from step (8). If the set of expired elements exists, remove the method parameter elem from it.
  10. Return the next expiration time from step (3).

Next, let's analyze the poll function, which is used to get expired data. The specific processing code is as follows:

public Set<E> poll() {
  // Get current time
  long now = Time.currentElapsedTime();
  // Get expiration time
  long expirationTime = nextExpirationTime.get();
  // If current time is less than expiration time, return empty set
  if (now < expirationTime) {
    return Collections.emptySet();
  }

  // Create container to store results
  Set<E> set = null;
  // Calculate new expiration time
  long newExpirationTime = expirationTime + expirationInterval;
  // Assign new expiration time to nextExpirationTime variable
  if (nextExpirationTime.compareAndSet(
    expirationTime, newExpirationTime)) {
    // Remove data corresponding to expiration time from expiryMap container and save data to result container
    set = expiryMap.remove(expirationTime);
  }
  // If result storage container is empty, create empty set
  if (set == null) {
    return Collections.emptySet();
  }
  return set;
}

The core processing flow in the above code is as follows:

  1. Get the current time.
  2. Get the expiration time from the member variable nextExpirationTime.
  3. If the current time is less than the expiration time, return an empty set.
  4. Calculate the new expiration time. Calculation rule: result from step (2) + expiration time interval.
  5. Assign the new expiration time to the member variable nextExpirationTime. If the assignment is successful, remove the relevant data from the expiryMap container based on the expiration time obtained in step (2), and put this data in the result container set.
  6. If the result container set is empty at this point, create and return an empty set; otherwise, return the result container set.

Analysis of UpgradeableSessionTracker

This section analyzes the UpgradeableSessionTracker class, which is the parent class of LearnerSessionTracker and LeaderSessionTracker. It contains two member variables, detailed in the following table:

Variable NameVariable TypeDescription
localSessionTrackerLocalSessionTrackerLocal session tracker
localSessionsWithTimeoutsConcurrentMap<Long, Integer>Local session expiration map, key: session id, value: expiry time

The LocalSessionTracker class, a subclass of SessionTrackerImpl, is used in the member variables. Its implementation is relatively simple, as shown below:

public class LocalSessionTracker extends SessionTrackerImpl {
  public LocalSessionTracker(SessionExpirer expirer,
                             ConcurrentMap<Long, Integer> sessionsWithTimeouts,
                             int tickTime, long id, ZooKeeperServerListener listener) {
    super(expirer, sessionsWithTimeouts, tickTime, id, listener);
  }

  /**
    * Checks if the session id is in the sessionsById container
    */
  public boolean isLocalSession(long sessionId) {
    return isTrackingSession(sessionId);
  }

  public boolean isGlobalSession(long sessionId) {
    return false;
  }

  /**
    * Always throws an exception
    */
  public boolean addGlobalSession(long sessionId, int sessionTimeout) {
    throw new UnsupportedOperationException();
  }
}

In the above code, the addGlobalSession method directly throws an exception. Other methods are relatively simple and won't be explained in detail. Let's analyze some key methods, starting with isTrackingSession:

public boolean isTrackingSession(long sessionId) {
    return isLocalSession(sessionId) || isGlobalSession(sessionId);
}

This code uses isLocalSession and isGlobalSession methods to determine if the session corresponding to the session id is being tracked. The latter is an abstract method that needs to be implemented by subclasses (LearnerSessionTracker or LeaderSessionTracker). The former is implemented as follows:

public boolean isLocalSession(long sessionId) {
    return localSessionTracker != null &&
            localSessionTracker.isTrackingSession(sessionId);
}

This code relies on the isTrackingSession method provided by the LocalSessionTracker class, which checks if the session id is in the SessionTrackerImpl#sessionsById container. In the UpgradeableSessionTracker class, besides the abstract isGlobalSession method, there's also a checkGlobalSession method that needs to be implemented by subclasses. If not implemented, it throws an exception by default.

Finally, let's analyze the upgradeSession method in the UpgradeableSessionTracker class. This method is used to upgrade a session from a local session to a global session:

public int upgradeSession(long sessionId) {
  if (localSessionsWithTimeouts == null) {
    return -1;
  }
  Integer timeout = localSessionsWithTimeouts.remove(sessionId);
  if (timeout != null) {
    LOG.info("Upgrading session 0x" + Long.toHexString(sessionId));
    addGlobalSession(sessionId, timeout);
    localSessionTracker.removeSession(sessionId);
    return timeout;
  }
  return -1;
}

The process flow in this code is as follows:

  1. Check if the localSessionsWithTimeouts container is empty. If empty, return -1 and end processing.
  2. Remove data from the localSessionsWithTimeouts container based on the session id and store the timeout temporarily.
  3. If the stored timeout is not null, add the session id to the global sessions, remove the local session, and return the stored timeout.
  4. If the stored timeout is null, return -1 and end processing.

In the upgradeSession method, a return value of -1 indicates that there is no session corresponding to the parameter session id.

Summary

This chapter analyzed the SessionTracker interface for session management (session tracking) in the ZooKeeper project. We started by examining the session connection establishment process, which led to the introduction of the session management interface. We then analyzed this interface itself and its implementing classes, providing a comprehensive overview of ZooKeeper's session management system.