Analysis of Zookeeper Watcher Mechanism
This chapter analyzes the Watcher-related content in the Zookeeper project.
Watcher Overview
The core objective of Zookeeper's Watcher is to solve the publish-subscribe functionality. In a minimal publish-subscribe implementation, it's necessary to maintain a one-to-many subscription relationship. When the subscribed content changes, all subscribers need to be notified. In Zookeeper, this logic is implemented by clients registering Watchers with the server.
Zookeeper's Watcher has the following characteristics:
- One-time use: In Zookeeper, all Watchers follow the use-and-destroy strategy.
- Serial callback execution: In Zookeeper, Watcher callbacks are executed serially and synchronously. Therefore, the execution time of a single Watcher should not be too long, as it would affect the entire Watcher callback process.
- Lightweight: In Zookeeper, the notification data transmission volume for a message includes three parts: notification status, event type, and node path.
In addition to the above features, it's important to note Zookeeper's definitions for Watchers, which mainly include:
- Watcher interface, used to handle observation events.
- WatchedEvent class, representing the observed event, which is the processing parameter of the Watcher interface.
- WatcherType enum, representing the observation type, subdivided into three items: Children for observing child nodes, Data for observing data, and Any for observing both.
- KeeperState enum, representing Zookeeper status types, details in the table below.
- EventType enum, representing event types, details in the table below.
- ClientWatchManager interface, representing the client observer manager.
KeeperState details:
Enum Name | Meaning |
---|---|
Unknown | Unknown state. |
Disconnected | Connection disconnected state. |
NoSyncConnected | Unused state. |
SyncConnected | Connected state. |
AuthFailed | Authentication failed. |
ConnectedReadOnly | Connected state, read-only. |
SaslAuthenticated | SASL authentication passed. |
Expired | Session expired. |
Closed | Connection closed. |
EventType details:
Enum Name | Meaning |
---|---|
None | None |
NodeCreated | Node creation event |
NodeDeleted | Node deletion event |
NodeDataChanged | Node data change event |
NodeChildrenChanged | Child node change event |
DataWatchRemoved | Data watcher removal event |
ChildWatchRemoved | Child node watcher removal event |
The above is theoretical knowledge. Next, we'll write some code to use Watchers. First, let's introduce the Watcher used when creating a Zookeeper client:
private static ZooKeeper getZooKeeper() throws IOException, InterruptedException {
ZooKeeper s = new ZooKeeper("127.0.0.1:2181", 2000, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event);
}
});
return s;
}
This code will output the event data information after the client connection is successful. The detailed startup information is as follows:
WatchedEvent state:SyncConnected type:None path:null
In addition to setting Watchers when creating a Zookeeper client, you can also set Watchers in the Zookeeper#getData, Zookeeper#getChildren, and Zookeeper#exists methods.
Analysis of Watcher Registration
This section analyzes the Watcher registration-related content in the Zookeeper project. The most important aspect of registration analysis is finding the registration entry point. Generally, registration can be done through the Zookeeper#getData, Zookeeper#getChildren, and Zookeeper#exists methods. Let's first look at the getData method:
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
Next, let's look at the getChildren code:
public List<String> getChildren(final String path, Watcher watcher)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ChildWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getChildren);
GetChildrenRequest request = new GetChildrenRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetChildrenResponse response = new GetChildrenResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getChildren();
}
Finally, let's look at the exists method:
public Stat exists(final String path, Watcher watcher)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
SetDataResponse response = new SetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
return null;
}
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
By reading these three methods, we can see that they all use the WatchRegistration class and the cnxn.submitRequest(h, request, response, wcb) method in the process. The former is used for parameter construction, while the latter is a client method for interacting with the server. In these three methods, we can also see that different methods use different parameter classes. The relationships are as follows:
- In the getData method, DataWatchRegistration class is used as the specific implementation of the WatchRegistration class.
- In the getChildren method, ChildWatchRegistration class is used as the specific implementation of the WatchRegistration class.
- In the exists method, ExistsWatchRegistration class is used as the specific implementation of the WatchRegistration class.
Before analyzing the three implementations of the WatchRegistration class, we need to analyze the class itself. The complete definition code for the WatchRegistration class is as follows:
public abstract class WatchRegistration {
private Watcher watcher;
private String clientPath;
public WatchRegistration(Watcher watcher, String clientPath) {
this.watcher = watcher;
this.clientPath = clientPath;
}
abstract protected Map<String, Set<Watcher>> getWatches(int rc);
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized (watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
protected boolean shouldAddWatch(int rc) {
return rc == 0;
}
}
In the code above, we first need to focus on the member variables. There are two member variables, and their meanings are as follows:
- The member variable 'watcher' represents the observer implementation class.
- The member variable 'clientPath' represents the observed node path.
Besides the member variables, there are three methods that need to be understood. Their specific functions are as follows:
- The 'getWatches' method is used to obtain the Watcher collection.
- The 'register' method is used to register a Watcher.
- The 'shouldAddWatch' method is used to determine whether a Watcher needs to be registered.
After understanding the member variables and method functions, let's explain the three implementation classes. First, let's analyze the DataWatchRegistration class. The specific processing code is as follows:
class DataWatchRegistration extends WatchRegistration {
public DataWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.dataWatches;
}
}
By reading the above code, we can see that it only overrides the getWatches method. In the getWatches method, it directly returns the dataWatches variable from the observer manager. Next, let's analyze the ChildWatchRegistration class. The specific processing code is as follows:
class ChildWatchRegistration extends WatchRegistration {
public ChildWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.childWatches;
}
}
By reading the above code, we can see that it only overrides the getWatches method. In the getWatches method, it directly returns the childWatches variable from the observer manager. Finally, let's analyze the ExistsWatchRegistration class. The specific processing code is as follows:
class ExistsWatchRegistration extends WatchRegistration {
public ExistsWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);
}
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
}
@Override
protected boolean shouldAddWatch(int rc) {
return rc == 0 || rc == KeeperException.Code.NONODE.intValue();
}
}
In the above code, we can see that the getWatches method returns the dataWatches variable from the observer container when the rc variable is 0, otherwise it returns the existWatches variable. In addition to the getWatches method, the shouldAddWatch method is also overridden. In the shouldAddWatch method, if the rc value is 0 or -101, it indicates that registration can be performed.
This concludes the analysis of the parameters for Watcher registration. Next, we need to focus on the specific processing of request interaction. By reading the getData, exists, and getChildren methods, we can clearly see that the entry point for the specific interaction code is the cnxn.submitRequest method. The specific code is as follows:
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
// Submit request
return submitRequest(h, request, response, watchRegistration, null);
}
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration)
throws InterruptedException {
// Create ReplyHeader object
ReplyHeader r = new ReplyHeader();
// Create Packet object
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration, watchDeregistration);
synchronized (packet) {
// If request timeout is greater than 0
if (requestTimeout > 0) {
// Wait for requestTimeout time
waitForPacketFinish(r, packet);
}
else {
// Wait indefinitely until the request is processed
while (!packet.finished) {
packet.wait();
}
}
}
// If the err value in ReplyHeader object is -122
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}
The core processing flow in the above code is as follows:
- Create a ReplyHeader object.
- Create a Packet object through the queuePacket method. After creation in the method, the data will be added to the outgoingQueue container. It will be used in the org.apache.zookeeper.ClientCnxn.SendThread#run method. (Related analysis can be found in the ZooKeeper server request processing flow / ZooKeeper client-server request interaction)
- If the timeout is greater than 0, wait within the timeout period; otherwise, wait indefinitely until processing is successful.
In the request processing flow, the finishPacket method is involved at the end, which contains Watcher-related operations. The relevant code is as follows:
// Watcher registration
p.watchRegistration.register(err);
// Watcher deregistration
materializedWatchers = p.watchDeregistration.unregister(err);
Let's analyze the Watcher registration method. The specific processing code is as follows:
public void register(int rc) {
// If rc is 0, proceed with the operation
if (shouldAddWatch(rc)) {
// Get the observer container based on rc
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized (watches) {
// Get the observer set from the observer container list based on the operation path
Set<Watcher> watchers = watches.get(clientPath);
// If the observer set is empty
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
// Add to the observer set
watchers.add(watcher);
}
}
}
The core processing goal in the above code is to complete the addition operation of the three data containers (dataWatches, existWatches, and childWatches) in the ZKWatchManager class. The specific processing flow is as follows:
- Check if the parameter rc is 0. If not, end the processing.
- Get the observer container through the getWatches method. At this point, the obtained container is essentially one of the three data containers in the ZKWatchManager class.
- Get the corresponding observer set from the observer container based on the operation path. If the observer set is empty, create a set container and put the data into the observer container.
- Add the observer to the observer set.
Finally, let's analyze the unregister method for canceling Watcher registration. The specific processing code is as follows:
public Map<EventType, Set<Watcher>> unregister(int rc)
throws KeeperException {
return zkManager.removeWatcher(clientPath, watcher, watcherType, local,
rc);
}
In the above code, the ZKWatchManager#removeWatcher method is used to cancel the Watcher registration.
Analysis of ZKWatchManager
This section analyzes the ZKWatchManager class, which implements the ClientWatchManager interface in the Zookeeper project. First, let's examine the class constructor:
ZKWatchManager(boolean disableAutoWatchReset) {
this.disableAutoWatchReset = disableAutoWatchReset;
}
The code above involves the disableAutoWatchReset variable, which indicates whether to automatically reset Watchers. This variable is directly related to the zookeeper.disableAutoWatchReset configuration of the Zookeeper client. The relationship can be seen in the org.apache.zookeeper.ZooKeeper#defaultWatchManager method, as shown below:
protected ZKWatchManager defaultWatchManager() {
return new ZKWatchManager(
getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
}
In the ZKWatchManager class, it's important to focus on three storage containers:
- dataWatches: container for storing data watchers.
- existWatches: container for storing existence watchers.
- childWatches: container for storing child node watchers.
The data for these three containers comes from the WatchRegistration#register method. The ZKWatchManager class contains the following five methods:
- addTo: adds a group of Watchers to another group of Watchers.
- removeWatcher: removes a Watcher.
- containsWatcher: checks if a Watcher exists for a specific path.
- removeWatches: removes multiple Watchers.
- materialize: searches for Watchers.
Analysis of the containsWatcher Method
This section analyzes the containsWatcher method. Here's the implementation:
void containsWatcher(String path, Watcher watcher,
WatcherType watcherType) throws NoWatcherException {
// Existence flag
boolean containsWatcher = false;
// Different logic based on watcher type
switch (watcherType) {
case Children: {
synchronized (childWatches) {
containsWatcher = contains(path, watcher, childWatches);
}
break;
}
case Data: {
synchronized (dataWatches) {
containsWatcher = contains(path, watcher, dataWatches);
}
synchronized (existWatches) {
boolean contains_temp = contains(path, watcher,
existWatches);
containsWatcher |= contains_temp;
}
break;
}
case Any: {
synchronized (childWatches) {
containsWatcher = contains(path, watcher, childWatches);
}
synchronized (dataWatches) {
boolean contains_temp = contains(path, watcher, dataWatches);
containsWatcher |= contains_temp;
}
synchronized (existWatches) {
boolean contains_temp = contains(path, watcher,
existWatches);
containsWatcher |= contains_temp;
}
}
}
if (!containsWatcher) {
throw new KeeperException.NoWatcherException(path);
}
}
The core process in the above code is as follows:
- Create an existence flag, initially set to false.
- Perform different existence checks based on the watcher type. All checks use the contains method, with the difference being the container being checked.
- If the existence flag is false, throw an exception.
The contains method is crucial in this process. Let's analyze it:
private boolean contains(String path, Watcher watcherObj,
Map<String, Set<Watcher>> pathVsWatchers) {
boolean watcherExists = true;
if (pathVsWatchers == null || pathVsWatchers.size() == 0) {
watcherExists = false;
} else {
Set<Watcher> watchers = pathVsWatchers.get(path);
if (watchers == null) {
watcherExists = false;
} else if (watcherObj == null) {
watcherExists = watchers.size() > 0;
} else {
watcherExists = watchers.contains(watcherObj);
}
}
return watcherExists;
}
The core process in the contains method is as follows:
- Create a watcher existence flag, initially set to true.
- If pathVsWatchers (which represents one of the three data containers when used) is null or empty, set the existence flag to false. Otherwise:
- Get the watcher set for the given path from pathVsWatchers.
- If the watcher set is null, set the existence flag to false.
- If watcherObj (the watcher being checked) is null, set the existence flag based on whether the watcher set is empty.
- Check if the watcher set contains watcherObj and set the existence flag accordingly.
- Return the watcher existence flag.
Analysis of removeWatcher and removeWatches Methods
This section analyzes the removeWatcher and removeWatches methods. First, let's look at the removeWatcher method:
public Map<EventType, Set<Watcher>> removeWatcher(String clientPath,
Watcher watcher,
WatcherType watcherType,
boolean local,
int rc)
throws KeeperException {
// Check if the specified watcher exists for the path
containsWatcher(clientPath, watcher, watcherType);
// Removal result
Map<EventType, Set<Watcher>> removedWatchers = new HashMap<EventType, Set<Watcher>>();
// Child node watchers
HashSet<Watcher> childWatchersToRem = new HashSet<Watcher>();
// Add child node watchers to removal result
removedWatchers
.put(EventType.ChildWatchRemoved, childWatchersToRem);
// Data watchers
HashSet<Watcher> dataWatchersToRem = new HashSet<Watcher>();
// Add data watchers to removal result
removedWatchers.put(EventType.DataWatchRemoved, dataWatchersToRem);
// Removal success flag
boolean removedWatcher = false;
switch (watcherType) {
case Children: {
synchronized (childWatches) {
removedWatcher = removeWatches(childWatches, watcher,
clientPath, local, rc, childWatchersToRem);
}
break;
}
case Data: {
synchronized (dataWatches) {
removedWatcher = removeWatches(dataWatches, watcher,
clientPath, local, rc, dataWatchersToRem);
}
synchronized (existWatches) {
boolean removedDataWatcher = removeWatches(existWatches,
watcher, clientPath, local, rc, dataWatchersToRem);
removedWatcher |= removedDataWatcher;
}
break;
}
case Any: {
synchronized (childWatches) {
removedWatcher = removeWatches(childWatches, watcher,
clientPath, local, rc, childWatchersToRem);
}
synchronized (dataWatches) {
boolean removedDataWatcher = removeWatches(dataWatches,
watcher, clientPath, local, rc, dataWatchersToRem);
removedWatcher |= removedDataWatcher;
}
synchronized (existWatches) {
boolean removedDataWatcher = removeWatches(existWatches,
watcher, clientPath, local, rc, dataWatchersToRem);
removedWatcher |= removedDataWatcher;
}
}
}
// Throw exception if removal flag is false
if (!removedWatcher) {
throw new KeeperException.NoWatcherException(clientPath);
}
return removedWatchers;
}
The core process in the removeWatcher method is as follows:
- Use containsWatcher to check if the specified watcher exists for the path. If not, throw an exception.
- Create a container for removal results.
- Create containers for child node watchers and data watchers, and add them to the removal results container.
- Create a removal success flag, initially set to false.
- Perform different removal operations based on the watcher type.
- If the removal flag is false, throw an exception.
- Return the removal results container.
In step 5 of the above process, the removeWatches method is used for removal operations. Here's the implementation of removeWatches:
protected boolean removeWatches(Map<String, Set<Watcher>> pathVsWatcher,
Watcher watcher, String path, boolean local, int rc,
Set<Watcher> removedWatchers) throws KeeperException {
// Throw exception if not local and rc is not OK
if (!local && rc != Code.OK.intValue()) {
throw KeeperException
.create(KeeperException.Code.get(rc), path);
}
// Flag to indicate successful removal
boolean success = false;
// 1. rc code is OK
// 2. It's local and rc code is not OK
if (rc == Code.OK.intValue() || (local && rc != Code.OK.intValue())) {
// Watcher is null
if (watcher == null) {
// Remove the set of watchers for the path
Set<Watcher> pathWatchers = pathVsWatcher.remove(path);
// If the set of watchers is not empty
if (pathWatchers != null) {
// Add removed watchers to the result container
removedWatchers.addAll(pathWatchers);
// Set success flag to true
success = true;
}
}
else {
// Get the set of watchers for the path
Set<Watcher> watchers = pathVsWatcher.get(path);
// If the set of watchers is not empty
if (watchers != null) {
// If the specific watcher is successfully removed from the set
if (watchers.remove(watcher)) {
// Add the removed watcher to the result container
removedWatchers.add(watcher);
// cleanup <path vs watchlist>
// If the watcher set is empty or has no elements, remove it from pathVsWatcher
if (watchers.size() <= 0) {
pathVsWatcher.remove(path);
}
// Set success flag to true
success = true;
}
}
}
}
return success;
}
The core process in the above code revolves around the watcher parameter. When the watcher parameter is null, the following operations are performed:
- Remove the data for the specified path from pathVsWatcher (the source of removal) and store the removed set of watchers temporarily.
- If the removed set of watchers is not empty, add it to the removal result storage container (removedWatchers parameter).
- Set the removal success flag to true.
When the watcher parameter is not null, the following operations are performed:
- Retrieve the corresponding set of watchers for the specified path from pathVsWatcher.
- If the set of watchers is not empty, attempt to remove the specified watcher (watcher parameter). If removal fails, no action is taken. If removal succeeds, perform the following:
- Add the removed specific watcher (watcher parameter) to the removal result storage container (removedWatchers parameter).
- If the number of elements in the set of watchers for the specified path is less than or equal to 0 after removal, remove it from pathVsWatcher.
- Set the removal success flag to true.
Analysis of the materialize Method
This section analyzes the materialize method. The specific processing code is as follows:
@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath) {
// Result container for watchers
Set<Watcher> result = new HashSet<Watcher>();
// Handle different events
switch (type) {
case None:
// Add default watcher
result.add(defaultWatcher);
// Check if cleanup is needed
boolean clear = disableAutoWatchReset
&& state != Watcher.Event.KeeperState.SyncConnected;
synchronized (dataWatches) {
// Add watchers from dataWatches to the result set
for (Set<Watcher> ws : dataWatches.values()) {
result.addAll(ws);
}
// Clear dataWatches if cleanup is needed
if (clear) {
dataWatches.clear();
}
}
synchronized (existWatches) {
// Add watchers from existWatches to the result set
for (Set<Watcher> ws : existWatches.values()) {
result.addAll(ws);
}
// Clear existWatches if cleanup is needed
if (clear) {
existWatches.clear();
}
}
synchronized (childWatches) {
// Add watchers from childWatches to the result set
for (Set<Watcher> ws : childWatches.values()) {
result.addAll(ws);
}
// Clear childWatches if cleanup is needed
if (clear) {
childWatches.clear();
}
}
return result;
// Data change
case NodeDataChanged:
// Node creation
case NodeCreated:
synchronized (dataWatches) {
// Remove watchers for the operation path from dataWatches and add them to the result set
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
// Remove watchers for the operation path from existWatches and add them to the result set
addTo(existWatches.remove(clientPath), result);
}
break;
// Child node change
case NodeChildrenChanged:
synchronized (childWatches) {
// Remove watchers for the operation path from childWatches and add them to the result set
addTo(childWatches.remove(clientPath), result);
}
break;
// Node deletion
case NodeDeleted:
synchronized (dataWatches) {
// Remove watchers for the operation path from dataWatches and add them to the result set
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
// Remove watchers for the operation path from existWatches and add them to the result set
Set<Watcher> list = existWatches.remove(clientPath);
if (list != null) {
addTo(list, result);
LOG.warn(
"We are triggering an exists watch for delete! Shouldn't happen!");
}
}
synchronized (childWatches) {
// Remove watchers for the operation path from childWatches and add them to the result set
addTo(childWatches.remove(clientPath), result);
}
break;
// Throw exception for other operations
default:
String msg = "Unhandled watch event type " + type
+ " with state " + state + " on path " + clientPath;
LOG.error(msg);
throw new RuntimeException(msg);
}
return result;
}
The code above handles different events differently. The following table explains the handling of different events:
Event Type | Data Source |
---|---|
None | 1. Default watcher (defaultWatcher)<br />2. dataWatches<br />3. existWatches<br />4. childWatches |
NodeDataChanged and NodeCreated | 1. Data corresponding to the operation path from dataWatches<br />2. Data corresponding to the operation path from existWatches |
NodeChildrenChanged | 1. Data corresponding to the operation path from childWatches |
NodeDeleted | 1. Data corresponding to the operation path from dataWatches<br />2. Data corresponding to the operation path from existWatches<br />3. Data corresponding to the operation path from childWatches |
WatchManager Analysis
This section analyzes the WatchManager class, which primarily serves the DataTree class and is used for observer management. The class has two member variables, detailed in the following table:
Variable Name | Variable Type | Variable Description |
---|---|---|
watchTable | HashMap<String, HashSet<Watcher>> | Mapping of paths to watchers |
watch2Paths | HashMap<Watcher, HashSet<String>> | Mapping of watchers to paths |
The WatchManager class contains nine methods, detailed as follows:
- The size method is used to get the number of watchers in watchTable.
- The addWatch method is used to add a watcher.
- The removeWatcher method is used to remove a watcher.
- The triggerWatch method is used to trigger watchers.
- The dumpWatches method is used to output watcher information.
- The containsWatcher method is used to determine if a watcher exists.
- The getWatches method is used to get watcher reports.
- The getWatchesByPath method is used to get watcher path reports.
- The getWatchesSummary method is used to get watcher summaries.
Among these nine methods, most involve adding, modifying, deleting, and retrieving operations on member variables and are relatively low in complexity. We will focus on analyzing the triggerWatch method, which handles triggering watchers. The detailed processing code is as follows:
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
// Create event
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
// Remove data corresponding to the path from watchTable, temporarily store the removed set of watchers
watchers = watchTable.remove(path);
// If the removed set of watchers is empty, return null and end processing
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
// Iterate through the removed set of watchers
for (Watcher w : watchers) {
// Delete the corresponding path in watch2Paths
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
// Iterate through the removed set of watchers
for (Watcher w : watchers) {
// Skip processing if the current watcher exists in the parameter set of watchers, otherwise execute
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}
The core process in the above code is as follows:
- Create an event object to be used when watchers are executed later.
- Remove the data corresponding to the path from the watchTable container and temporarily store the removed set of watchers. If the temporarily stored set of watchers is empty or contains no elements, return null and end processing.
- Iterate through the temporarily stored set of watchers, get the corresponding set of paths from watch2Paths, and if the set of paths is not empty, remove the current operation path from it.
- Iterate through the temporarily stored set of watchers. If the current watcher being processed is in the supress parameter set, skip the current operation; otherwise, the watcher needs to execute the operation.
- Return the temporarily stored set of watchers.
Summary
This chapter analyzes the Watcher-related content in the ZooKeeper project. It begins with a basic introduction to observers (Watchers), then analyzes the registration process and deregistration process of observers, which leads to the ZKWatchManager class and its related analysis. Finally, this chapter analyzes the WatchManager class, which mainly serves ZooKeeper's database system, and examines the observer triggering operation.