ZooKeeper Session Management
This chapter analyzes the session management aspects of the ZooKeeper project.
Server-side Processing of Session Requests
This section analyzes the server-side processing flow for session requests in ZooKeeper. The method responsible for handling is NIOServerCnxn#readConnectRequest, with the specific code as follows:
private void readConnectRequest() throws IOException, InterruptedException {
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
zkServer.processConnectRequest(this, incomingBuffer);
initialized = true;
}
In the above code, it first checks if the ZK service is running. If not, an exception is thrown to end processing. Otherwise, it's handed over to the processConnectRequest method for processing, with the following code:
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
throws IOException {
// Data reading related object construction
BinaryInputArchive bia =
BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
// Deserialize connect data
connReq.deserialize(bia, "connect");
if (LOG.isDebugEnabled()) {
LOG.debug("Session establishment request from client "
+ cnxn.getRemoteSocketAddress()
+ " client's lastZxid is 0x"
+ Long.toHexString(connReq.getLastZxidSeen()));
}
// Check if read-only
boolean readOnly = false;
try {
readOnly = bia.readBool("readOnly");
cnxn.isOldClient = false;
} catch (IOException e) {
LOG.warn("Connection request from old client "
+ cnxn.getRemoteSocketAddress()
+ "; will be dropped if server is in r-o mode");
}
// Throw exception if not read-only and current ZK service is read-only
if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client "
+ cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new CloseRequestException(msg);
}
// Check if the zxid in the connection request is greater than the maximum zxid in the data tree
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(connReq.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
LOG.info(msg);
throw new CloseRequestException(msg);
}
// Read session timeout
int sessionTimeout = connReq.getTimeOut();
// Read password
byte passwd[] = connReq.getPasswd();
// Session timeout related processing
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
// Set timeout
cnxn.setSessionTimeout(sessionTimeout);
// Disable data reception before session establishment is complete
cnxn.disableRecv();
// Get session id
long sessionId = connReq.getSessionId();
// If session id equals 0
if (sessionId == 0) {
// Recalculate session id
long id = createSession(cnxn, passwd, sessionTimeout);
LOG.debug("Client attempting to establish new session:" +
" session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(id),
Long.toHexString(connReq.getLastZxidSeen()),
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
}
// If session id is non-zero
else {
// Get session id from connection request
long clientSessionId = connReq.getSessionId();
LOG.debug("Client attempting to renew session:" +
" session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(clientSessionId),
Long.toHexString(connReq.getLastZxidSeen()),
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
// Close session if serverCnxnFactory is not null
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId);
}
// Close session if secureServerCnxnFactory is not null
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId);
}
// Set session id
cnxn.setSessionId(sessionId);
// Reopen session
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
}
}
The core processing flow in the above code is as follows:
-
Create objects related to reading data.
-
Read readOnly data. If readOnly is false and the current ZK service is read-only, an exception is thrown.
-
Get the zxid from the connection request (this zxid is the client's maximum zxid), get the maximum processed zxid from the data tree. If the former is greater than the latter, an exception is thrown.
-
Calculate the session timeout and assign it to cnxn (server connection object).
-
Execute ServerCnxn#disableRecv method to prevent data reception before the session is established.
-
Get the session id from the connection request. If the session id is 0, calculate the sessionId through createSession. Note that this id will not overwrite the session id in the connection request, it's only output in debug-level logs. For non-zero session ids, the process is as follows:
- Get the session id from the connection request object.
- If variables serverCnxnFactory and secureServerCnxnFactory are not null, perform session closing operations. The ServerCnxn object is closed.
-
Set the session id for the connection object.
-
Reopen the session through the reopenSession method.
The most important method in processConnectRequest is reopenSession. Let's analyze the reopenSession method, with the specific processing code as follows:
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
int sessionTimeout) throws IOException {
// Check password
if (checkPasswd(sessionId, passwd)) {
// Validate session
revalidateSession(cnxn, sessionId, sessionTimeout);
}
else {
LOG.warn("Incorrect password from " + cnxn.getRemoteSocketAddress()
+ " for session 0x" + Long.toHexString(sessionId));
// Complete session creation
finishSessionInit(cnxn, false);
}
}
The processing flow in the above code: password verification is performed through the checkPasswd method. If the password verification passes, the session is revalidated through the revalidateSession method. Otherwise, session creation is completed through the finishSessionInit method. In the revalidateSession method, finishSessionInit is still called at the end, with the difference being that the second parameter is calculated through the SessionTracker#touchSession method.
Next, let's analyze the finishSessionInit method, with the specific processing code as follows:
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
try {
// If validation passes
if (valid) {
// Register connection object
if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
serverCnxnFactory.registerConnection(cnxn);
} else if (secureServerCnxnFactory != null
&& secureServerCnxnFactory.cnxns.contains(cnxn)) {
secureServerCnxnFactory.registerConnection(cnxn);
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
try {
// Create connection response object
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
: 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
// longer valid
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");
// Write connection object to bos
rsp.serialize(bos, "connect");
// Check if it's an old client version
if (!cnxn.isOldClient) {
// Set read-only status
bos.writeBool(
this instanceof ReadOnlyZooKeeperServer, "readOnly");
}
// Close baos
baos.close();
// Convert baos to byte stream and send through connection object
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
cnxn.sendBuffer(bb);
// Validation passed
if (valid) {
LOG.debug("Established session 0x"
+ Long.toHexString(cnxn.getSessionId())
+ " with negotiated timeout " + cnxn.getSessionTimeout()
+ " for client "
+ cnxn.getRemoteSocketAddress());
// Resume data reception
cnxn.enableRecv();
} else {
LOG.info("Invalid session 0x"
+ Long.toHexString(cnxn.getSessionId())
+ " for client "
+ cnxn.getRemoteSocketAddress()
+ ", probably expired");
// Send close connection message
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
}
} catch (Exception e) {
LOG.warn("Exception while establishing session, closing", e);
// Close connection object
cnxn.close();
}
}
The core processing flow in the above code is as follows:
- Check if validation passed. If passed, register the connection object with serverCnxnFactory or secureServerCnxnFactory.
- Create a connection response object.
- Create a BinaryOutputArchive object and write the following data to it:
- tag: len, value: -1
- tag: connect, value: response object
- If the client used is not an old version, tag: readOnly, value: whether the current service is of ReadOnlyZooKeeperServer type
- The connection object sends the data, which is the data from step 3.
- If validation passed, restore the connection object's ability to receive data; otherwise, send a message to close the connection.
SessionTracker Overview
This section provides a brief overview of the SessionTracker interface, which is used for session management in the ZooKeeper project. It can also be understood as session tracking. This interface defines thirteen methods and two interfaces. Information about the methods is shown in the table below:
Method Name | Method Purpose | Method Parameters |
---|---|---|
createSession | Create session id | 1. sessionTimeout: session expiration time |
addGlobalSession | Add global session | 1. id: session id<br />2. to: session expiration time |
addSession | Add session | 1. id: session id<br />2. to: session expiration time |
touchSession | Check if session is active | 1. sessionId: session id<br />2. sessionTimeout: session timeout |
setSessionClosing | Set session to closing state | 1. sessionId: session id |
shutdown | Shutdown SessionTracker | 1. sessionId: session id |
removeSession | Remove session | 1. sessionId: session id |
isTrackingSession | Check if tracking session id | 1. sessionId: session id |
checkSession | Check session | 1. sessionId: session id<br />2. owner: session owner |
checkGlobalSession | Check global session | 1. sessionId: session id<br />2. owner: session owner |
setOwner | Set session owner | 1. id: session id<br />2. owner: session owner |
dumpSessions | Dump sessions | 1. pwriter: output writer |
getSessionExpiryMap | Get expired sessions |
The implementation class diagram of the SessionTracker interface in ZooKeeper is shown in the figure below.
Next, let's explain the internal Session interface, defined as follows:
interface Session {
long getSessionId();
int getTimeout();
boolean isClosing();
}
The Session interface defines three methods:
- The getSessionId method is used to get the sessionId.
- The getTimeout method is used to get the expiration time.
- The isClosing method is used to determine if it's closed.
In ZooKeeper, the implementation class of the Session interface is SessionImpl, which has four member variables, detailed in the table below:
Variable Name | Variable Type | Variable Description |
---|---|---|
sessionId | long | Session id |
timeout | int | Session timeout |
isClosing | boolean | Whether in closing state |
owner | Object | Owner |
Finally, let's explain the internal SessionExpirer interface, defined as follows:
interface SessionExpirer {
void expire(Session session);
long getServerId();
}
Analysis of getSessionExpiryMap Method
This section analyzes the getSessionExpiryMap method, which primarily involves converting one data structure to another. The detailed code is as follows:
synchronized public Map<Long, Set<Long>> getSessionExpiryMap() {
Map<Long, Set<SessionImpl>> expiryMap = sessionExpiryQueue.getExpiryMap();
Map<Long, Set<Long>> sessionExpiryMap = new TreeMap<Long, Set<Long>>();
for (Entry<Long, Set<SessionImpl>> e : expiryMap.entrySet()) {
Set<Long> ids = new HashSet<Long>();
sessionExpiryMap.put(e.getKey(), ids);
for (SessionImpl s : e.getValue()) {
ids.add(s.sessionId);
}
}
return sessionExpiryMap;
}
The core processing flow in the above code is as follows:
- Retrieve expiration information from the sessionExpiryQueue variable, where the key represents the expiration time and the value represents the set of expired sessions.
- Convert the expiration information from step (1) into another map structure, where the key represents the expiration time and the value represents the set of session IDs.
Analysis of touchSession Method
This section analyzes the touchSession method, which primarily aims to determine if a session is in an active (valid) state. The specific processing code is as follows:
synchronized public boolean touchSession(long sessionId, int timeout) {
// Get session by session ID
SessionImpl s = sessionsById.get(sessionId);
// Return false if session is null
if (s == null) {
logTraceTouchInvalidSession(sessionId, timeout);
return false;
}
// Return false if session is in closing state
if (s.isClosing()) {
logTraceTouchClosingSession(sessionId, timeout);
return false;
}
// Update expiration time
updateSessionExpiry(s, timeout);
return true;
}
private void updateSessionExpiry(SessionImpl s, int timeout) {
logTraceTouchSession(s.sessionId, timeout, "");
sessionExpiryQueue.update(s, timeout);
}
The processing flow in the above code is as follows:
- Retrieve the session from the sessionsById container using the session ID.
- Check if the session is null. If it is, log the information and return false.
- Check if the session is in a closing state. If it is, log the information and return false.
- Update the session expiration time using the updateSessionExpiry method and return true.
Analysis of setSessionClosing Method
This section analyzes the setSessionClosing method, which is responsible for modifying the closing flag. The specific processing code is as follows:
synchronized public void setSessionClosing(long sessionId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Session closing: 0x" + Long.toHexString(sessionId));
}
SessionImpl s = sessionsById.get(sessionId);
if (s == null) {
return;
}
s.isClosing = true;
}
The core processing flow in the above code is as follows:
- Retrieve the session from the sessionsById container using the session ID.
- If the session is null, end processing. If the session is not null, set the isClosing property to true.
Analysis of removeSession Method
This section analyzes the removeSession method, which mainly operates on three variables to remove the corresponding session-related data. The specific code is as follows:
synchronized public void removeSession(long sessionId) {
LOG.debug("Removing session 0x" + Long.toHexString(sessionId));
SessionImpl s = sessionsById.remove(sessionId);
sessionsWithTimeout.remove(sessionId);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"SessionTrackerImpl --- Removing session 0x"
+ Long.toHexString(sessionId));
}
if (s != null) {
sessionExpiryQueue.remove(s);
}
}
The core processing flow in the above code is as follows:
- Remove a session from the sessionsById container using the session ID and temporarily store it in memory.
- Remove the session from the sessionsWithTimeout container using the session ID.
- If the session temporarily stored in memory from step (1) exists, remove it from the sessionExpiryQueue variable.
Analysis of checkSession Method
This section analyzes the checkSession method, which mainly checks the following three items:
- Whether the session corresponding to the session ID exists.
- Whether the session has expired.
- Whether the session owner matches the parameter owner.
The code for checking these three items is as follows:
public synchronized void checkSession(long sessionId, Object owner)
throws KeeperException.SessionExpiredException,
KeeperException.SessionMovedException,
KeeperException.UnknownSessionException {
LOG.debug("Checking session 0x" + Long.toHexString(sessionId));
// Get session
SessionImpl session = sessionsById.get(sessionId);
// Throw exception if session is null
if (session == null) {
throw new KeeperException.UnknownSessionException();
}
// Throw exception if session is closed
if (session.isClosing()) {
throw new KeeperException.SessionExpiredException();
}
// Assign owner if session owner is null
if (session.owner == null) {
session.owner = owner;
}
// Throw exception if session owner doesn't match parameter owner
else if (session.owner != owner) {
throw new KeeperException.SessionMovedException();
}
}
Analysis of run Method
Since the SessionTrackerImpl class implements ZooKeeperCriticalThread, it is a thread object, so the analysis of the run method is particularly important. The specific processing code is as follows:
@Override
public void run() {
try {
while (running) {
// Get wait time
long waitTime = sessionExpiryQueue.getWaitTime();
if (waitTime > 0) {
Thread.sleep(waitTime);
continue;
}
// Get session from session expiry queue
for (SessionImpl s : sessionExpiryQueue.poll()) {
// Set closing state
setSessionClosing(s.sessionId);
// Perform expiration interface operation
expirer.expire(s);
}
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
The core operation in the above code is to periodically set the session to a closing state and call the expiration interface. The overall process is handled in a dead loop, with the condition for exiting the loop being the running flag set to false. To set it to false, the shutdown method needs to be called. In each loop, the following operations are performed:
- Get the wait time from the sessionExpiryQueue variable. If the wait time is greater than 0, wait (thread wait).
- Get the sessions that need to be closed from the sessionExpiryQueue variable, set the isClosing property to true, and call the expiration interface method (SessionTracker.SessionExpirer#expire).
Next, let's explain the method for getting the wait time. The specific processing code is as follows:
public long getWaitTime() {
long now = Time.currentElapsedTime();
long expirationTime = nextExpirationTime.get();
return now < expirationTime ? (expirationTime - now) : 0L;
}
The core processing flow in the above code is as follows:
- Get the current time and expiration time.
- Check if the current time is less than the expiration time. If so, return the difference between the expiration time and the current time as the wait time. If not, return 0 to immediately start executing the session closing operation.
Finally, let's analyze the expire method. In ZooKeeper, this method is implemented by the ZooKeeperServer class. The specific code is as follows:
public void expire(Session session) {
long sessionId = session.getSessionId();
LOG.info("Expiring session 0x" + Long.toHexString(sessionId) + ", timeout of "
+ session.getTimeout() + "ms exceeded");
close(sessionId);
}
The processing flow in the above code: Get the session ID from the session object and then call the close method.
Analysis of ExpiryQueue
This section analyzes the ExpiryQueue class, which is mainly used to store expiration information and is commonly used for session expiration handling. First, we need to understand the member variables of this class. The specific information is shown in the table below:
Variable Name | Variable Type | Variable Description |
---|---|---|
elemMap | ConcurrentHashMap<E, Long> | Expiry container, key: expiry object, value: expiration time |
expiryMap | ConcurrentHashMap<Long, Set<E>> | Expiry container, key: expiration time, value: set of expiry objects |
nextExpirationTime | AtomicLong | Next expiration time |
expirationInterval | int | Interval period |
After understanding the member variables, let's analyze the constructor. The specific constructor code is as follows:
public ExpiryQueue(int expirationInterval) {
this.expirationInterval = expirationInterval;
nextExpirationTime.set(roundToNextInterval(Time.currentElapsedTime()));
}
In the constructor, the variables nextExpirationTime and expirationInterval are set. The latter is directly assigned from the constructor parameter, while the former needs to be calculated using the roundToNextInterval function. The calculation code is as follows:
private long roundToNextInterval(long time) {
return (time / expirationInterval + 1) * expirationInterval;
}
The calculation logic in the above code is: (current timestamp (millisecond level) divided by expiration interval + 1) multiplied by expiration interval.
Next, let's analyze some methods in the ExpiryQueue class. First, let's analyze the update method used for adding or updating. The specific processing code is as follows:
public Long update(E elem, int timeout) {
// Get expiration time from elemMap container based on element, meaning the previous expiration time
Long prevExpiryTime = elemMap.get(elem);
// Get current time
long now = Time.currentElapsedTime();
// Calculate next expiration time
Long newExpiryTime = roundToNextInterval(now + timeout);
// Check if new expiration time is the same as previous expiration time, if so, return null
if (newExpiryTime.equals(prevExpiryTime)) {
return null;
}
// First add the elem to the new expiry time bucket in expiryMap.
// Get data corresponding to expiration time
Set<E> set = expiryMap.get(newExpiryTime);
if (set == null) {
// Create set object
set = Collections.newSetFromMap(
new ConcurrentHashMap<E, Boolean>());
// Add expiration data to expiryMap container
Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
// If historical expiration data exists, assign it to set variable
if (existingSet != null) {
set = existingSet;
}
}
// Add parameter data to set collection
set.add(elem);
// Add elem data to elemMap container and get historical expiration time
prevExpiryTime = elemMap.put(elem, newExpiryTime);
// Historical expiration time is not null
// New expiration time is different from historical expiration time
if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
// Get set of expired elements based on historical expiration time
Set<E> prevSet = expiryMap.get(prevExpiryTime);
// Remove current element if set of expired elements is not null
if (prevSet != null) {
prevSet.remove(elem);
}
}
return newExpiryTime;
}
The core processing flow in the above code is as follows:
-
Get the expiration time (historical expiration time) from the elemMap container based on the parameter element, stored in the variable prevExpiryTime.
-
Get the current time.
-
Calculate the next expiration time using the roundToNextInterval method, with calculation parameters: current time + expiration time (expiration time is passed as a parameter).
-
Check if the next expiration time is the same as the historical expiration time. If so, return null and end processing.
-
Get the set of data that needs to expire from the expiryMap container based on the expiration time, stored in the variable set.
-
If the set of data that needs to expire is empty, create a collection object. Try to put the new expiration time and the newly created collection object into the expiryMap container, and get the historical data stored in the container. If historical data exists, set it to the variable set.
-
Put the current method parameter elem into the variable set.
-
Add the method parameter elem data to the elemMap container and get the historical expiration time.
-
If the historical expiration time from step (8) is not null and this time is different from the expiration time in step (3), perform the following operations:
- Get the set of expired elements based on the expiration time from step (8). If the set of expired elements exists, remove the method parameter elem from it.
-
Return the next expiration time from step (3).
Next, let's analyze the poll function, which is used to get expired data. The specific processing code is as follows:
public Set<E> poll() {
// Get current time
long now = Time.currentElapsedTime();
// Get expiration time
long expirationTime = nextExpirationTime.get();
// If current time is less than expiration time, return empty set
if (now < expirationTime) {
return Collections.emptySet();
}
// Create container to store results
Set<E> set = null;
// Calculate new expiration time
long newExpirationTime = expirationTime + expirationInterval;
// Assign new expiration time to nextExpirationTime variable
if (nextExpirationTime.compareAndSet(
expirationTime, newExpirationTime)) {
// Remove data corresponding to expiration time from expiryMap container and save data to result container
set = expiryMap.remove(expirationTime);
}
// If result storage container is empty, create empty set
if (set == null) {
return Collections.emptySet();
}
return set;
}
The core processing flow in the above code is as follows:
- Get the current time.
- Get the expiration time from the member variable nextExpirationTime.
- If the current time is less than the expiration time, return an empty set.
- Calculate the new expiration time. Calculation rule: result from step (2) + expiration time interval.
- Assign the new expiration time to the member variable nextExpirationTime. If the assignment is successful, remove the relevant data from the expiryMap container based on the expiration time obtained in step (2), and put this data in the result container set.
- If the result container set is empty at this point, create and return an empty set; otherwise, return the result container set.
Analysis of UpgradeableSessionTracker
This section analyzes the UpgradeableSessionTracker class, which is the parent class of LearnerSessionTracker and LeaderSessionTracker. It contains two member variables, detailed in the following table:
Variable Name | Variable Type | Description |
---|---|---|
localSessionTracker | LocalSessionTracker | Local session tracker |
localSessionsWithTimeouts | ConcurrentMap<Long, Integer> | Local session expiration map, key: session id, value: expiry time |
The LocalSessionTracker class, a subclass of SessionTrackerImpl, is used in the member variables. Its implementation is relatively simple, as shown below:
public class LocalSessionTracker extends SessionTrackerImpl {
public LocalSessionTracker(SessionExpirer expirer,
ConcurrentMap<Long, Integer> sessionsWithTimeouts,
int tickTime, long id, ZooKeeperServerListener listener) {
super(expirer, sessionsWithTimeouts, tickTime, id, listener);
}
/**
* Checks if the session id is in the sessionsById container
*/
public boolean isLocalSession(long sessionId) {
return isTrackingSession(sessionId);
}
public boolean isGlobalSession(long sessionId) {
return false;
}
/**
* Always throws an exception
*/
public boolean addGlobalSession(long sessionId, int sessionTimeout) {
throw new UnsupportedOperationException();
}
}
In the above code, the addGlobalSession method directly throws an exception. Other methods are relatively simple and won't be explained in detail. Let's analyze some key methods, starting with isTrackingSession:
public boolean isTrackingSession(long sessionId) {
return isLocalSession(sessionId) || isGlobalSession(sessionId);
}
This code uses isLocalSession and isGlobalSession methods to determine if the session corresponding to the session id is being tracked. The latter is an abstract method that needs to be implemented by subclasses (LearnerSessionTracker or LeaderSessionTracker). The former is implemented as follows:
public boolean isLocalSession(long sessionId) {
return localSessionTracker != null &&
localSessionTracker.isTrackingSession(sessionId);
}
This code relies on the isTrackingSession method provided by the LocalSessionTracker class, which checks if the session id is in the SessionTrackerImpl#sessionsById container. In the UpgradeableSessionTracker class, besides the abstract isGlobalSession method, there's also a checkGlobalSession method that needs to be implemented by subclasses. If not implemented, it throws an exception by default.
Finally, let's analyze the upgradeSession method in the UpgradeableSessionTracker class. This method is used to upgrade a session from a local session to a global session:
public int upgradeSession(long sessionId) {
if (localSessionsWithTimeouts == null) {
return -1;
}
Integer timeout = localSessionsWithTimeouts.remove(sessionId);
if (timeout != null) {
LOG.info("Upgrading session 0x" + Long.toHexString(sessionId));
addGlobalSession(sessionId, timeout);
localSessionTracker.removeSession(sessionId);
return timeout;
}
return -1;
}
The process flow in this code is as follows:
- Check if the localSessionsWithTimeouts container is empty. If empty, return -1 and end processing.
- Remove data from the localSessionsWithTimeouts container based on the session id and store the timeout temporarily.
- If the stored timeout is not null, add the session id to the global sessions, remove the local session, and return the stored timeout.
- If the stored timeout is null, return -1 and end processing.
In the upgradeSession method, a return value of -1 indicates that there is no session corresponding to the parameter session id.
Summary
This chapter analyzed the SessionTracker interface for session management (session tracking) in the ZooKeeper project. We started by examining the session connection establishment process, which led to the introduction of the session management interface. We then analyzed this interface itself and its implementing classes, providing a comprehensive overview of ZooKeeper's session management system.