zookeeper source code analysis

Zookeeper Watcher 分析

本章将对Zookeeper项目中Watcher相关内容进行分析。

Watcher 概述

Zookeeper中Watcher的核心目标是为了解决发布-订阅功能,在一个最小发布-订阅实现中需要解决一对多的订阅关系维护,当订阅者订阅的内容发生变化时通知所有订阅者,在Zookeeper中通过客户端向服务端注册Watcher来实现上述逻辑。

sequenceDiagram
    zk客户端 ->> zk服务 : 注册Watcher
    zk客户端 ->> WatchManager : Watcher加入管理器
    zk服务->>zk客户端: 通知节点变化
    zk客户端->>WatchManager: 获取Watcher执行业务
    
   

Zookeeper中的Watcher具备如下特性。

  1. 一次性:在Zookeeper中所有的Watcher都遵循使用后销毁这个策略。
  2. 回调串行执行:在Zookeeper中Watcher的回调都是串行同步执行的,因此单个Watcher的执行时间不易过长,如果过长会影响整个Watcher的回调。
  3. 轻量化:在Zookeeper中机械能一个消息的通知数据传输量包含三部分,通知状态、事件类型和节点路径。

除了上述特性以外还需要关注到Zookeeper中对于Watcher的定义,主要分为如下几项。

  1. 接口Watcher,用于处理观察事件。
  2. 类WatchedEvent,表示观察事件,是接口Watcher的处理参数。
  3. 枚举WatcherType,表示观察类型,细分三项:Children表示观察子节点、Data表示观察数据和Any表示都观察。
  4. 枚举KeeperState,表示Zookeeper状态类型,详细内容见表
  5. 枚举EventType,表示事件类型,纤细内容见表。
  6. 接口ClientWatchManager,表示客户端观察者管理器。

KeeperState细节

枚举名称枚举含义
Unknown未知状态。
Disconnected连接断开状态。
NoSyncConnected未使用状态。
SyncConnected以连接状态。
AuthFailed身份验证失败。
ConnectedReadOnly已连接状态,只能够读取。
SaslAuthenticatedSASL认证通过。
Expired会话过期。
Closed连接关闭。

EventType细节

枚举名称枚举含义
None
NodeCreated创建节点事件
NodeDeleted删除节点事件
NodeDataChanged节点数据变化事件
NodeChildrenChanged节点中的子节点变化事件
DataWatchRemoved数据观察者移除事件
ChildWatchRemoved子节点观察者移除事件

上述都为理论知识阶段,接下来将编写一些代码来使用Watcher,首先介绍在Zookeeper客户端创建时的Watcher具体代码如下。

private static ZooKeeper getZooKeeper() throws IOException, InterruptedException {
  ZooKeeper s = new ZooKeeper("127.0.0.1:2181", 2000, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
      System.out.println(event);
    }
  });

  return s;
}

上述代码会在客户端连接成功后输出event的数据信息,详细启动信息如下。

WatchedEvent state:SyncConnected type:None path:null

除了在Zookeeper客户端创建时设置Watcher以外还可以在Zookeeper#getData方法、Zookeeper#getChildren和Zookeeper#exists方法中设置Watcher。

Watcher 注册分析

本节将对Zookeeper项目中Watcher注册器相关内容进行分析,进行注册分析最重要的是找到注册入口,一般的可以通过Zookeeper#getData、Zookeeper#getChildren和Zookeeper#exists方法进行注册,首先查看getData方法,具体代码如下。

public byte[] getData(final String path, Watcher watcher, Stat stat)
  throws KeeperException, InterruptedException {
  final String clientPath = path;
  PathUtils.validatePath(clientPath);


  WatchRegistration wcb = null;
  if (watcher != null) {
    wcb = new DataWatchRegistration(watcher, clientPath);
  }

  final String serverPath = prependChroot(clientPath);
  RequestHeader h = new RequestHeader();
  h.setType(ZooDefs.OpCode.getData);
  GetDataRequest request = new GetDataRequest();
  request.setPath(serverPath);
  request.setWatch(watcher != null);
  GetDataResponse response = new GetDataResponse();
  ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
  if (r.getErr() != 0) {
    throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                                 clientPath);
  }
  if (stat != null) {
    DataTree.copyStat(response.getStat(), stat);
  }
  return response.getData();
}

其次查看getChildren代码,具体代码如下

public List<String> getChildren(final String path, Watcher watcher)
  throws KeeperException, InterruptedException {
  final String clientPath = path;
  PathUtils.validatePath(clientPath);

  WatchRegistration wcb = null;
  if (watcher != null) {
    wcb = new ChildWatchRegistration(watcher, clientPath);
  }

  final String serverPath = prependChroot(clientPath);
  RequestHeader h = new RequestHeader();
  h.setType(ZooDefs.OpCode.getChildren);
  GetChildrenRequest request = new GetChildrenRequest();
  request.setPath(serverPath);
  request.setWatch(watcher != null);
  GetChildrenResponse response = new GetChildrenResponse();
  ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
  if (r.getErr() != 0) {
    throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                                 clientPath);
  }
  return response.getChildren();
}

最后查看exists方法,具体代码如下。

public Stat exists(final String path, Watcher watcher)
  throws KeeperException, InterruptedException {
  final String clientPath = path;
  PathUtils.validatePath(clientPath);

  WatchRegistration wcb = null;
  if (watcher != null) {
    wcb = new ExistsWatchRegistration(watcher, clientPath);
  }

  final String serverPath = prependChroot(clientPath);

  RequestHeader h = new RequestHeader();
  h.setType(ZooDefs.OpCode.exists);
  ExistsRequest request = new ExistsRequest();
  request.setPath(serverPath);
  request.setWatch(watcher != null);
  SetDataResponse response = new SetDataResponse();
  ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
  if (r.getErr() != 0) {
    if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
      return null;
    }
    throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                                 clientPath);
  }

  return response.getStat().getCzxid() == -1 ? null : response.getStat();
}

通过阅读上述三个方法可以发现在处理过程中都会使用WatchRegistration类和cnxn.submitRequest(h, request, response, wcb)方法,前者是用于进行参数构造的类,后者是用于与服务端交互的客户端方法。在上述三个方法中也可以发现不同的方法存在不同的参数类,对应关系如下。

  1. 在getData方法中使用DataWatchRegistration类作为WatchRegistration类的具体实现。
  2. 在getChildren方法中使用ChildWatchRegistration类作为WatchRegistration类的具体实现。
  3. 在exists方法中使用ExistsWatchRegistration类作为WatchRegistration类的具体实现。

在分析WatchRegistration类的三种实现上首先需要对其本身进行分析,WatchRegistration类完整定义代码如下。

public abstract class WatchRegistration {
  private Watcher watcher;
  private String clientPath;

  public WatchRegistration(Watcher watcher, String clientPath) {
    this.watcher = watcher;
    this.clientPath = clientPath;
  }

  abstract protected Map<String, Set<Watcher>> getWatches(int rc);

  public void register(int rc) {
    if (shouldAddWatch(rc)) {
      Map<String, Set<Watcher>> watches = getWatches(rc);
      synchronized (watches) {
        Set<Watcher> watchers = watches.get(clientPath);
        if (watchers == null) {
          watchers = new HashSet<Watcher>();
          watches.put(clientPath, watchers);
        }
        watchers.add(watcher);
      }
    }
  }

  protected boolean shouldAddWatch(int rc) {
    return rc == 0;
  }
}

在上述代码中首先需要关注成员变量,总共有两个成员变量其含义如下。

  1. 成员变量watcher表示观察者实现类。
  2. 成员变量clientPath表示观察的节点路径。

出了成员变量以外还有三个方法需要了解,具体作用如下。

  1. 方法getWatches用于获取Watcher集合。
  2. 方法register用于进行注册Watcher。
  3. 方法shouldAddWatch用于判断Watcher是否需要进行注册。

了解成员变量和方法作用后下面对三个实现类进行说明,首先对DataWatchRegistration类进行分析,具体处理代码如下。

class DataWatchRegistration extends WatchRegistration {
  public DataWatchRegistration(Watcher watcher, String clientPath) {
    super(watcher, clientPath);
  }

  @Override
  protected Map<String, Set<Watcher>> getWatches(int rc) {
    return watchManager.dataWatches;
  }
}

通过阅读上述代码可以发现它只重写了getWatches方法,在getWatches方法中直接将观察者管理器中的dataWatches变量返回。其次对ChildWatchRegistration类进行分析,具体处理代码如下。

class ChildWatchRegistration extends WatchRegistration {
  public ChildWatchRegistration(Watcher watcher, String clientPath) {
    super(watcher, clientPath);
  }

  @Override
  protected Map<String, Set<Watcher>> getWatches(int rc) {
    return watchManager.childWatches;
  }
}

通过阅读上述代码可以发现它只重写了getWatches方法,在getWatches方法中直接将观察者管理器中的childWatches变量返回。最后对ExistsWatchRegistration类进行分析,具体处理代码如下。

class ExistsWatchRegistration extends WatchRegistration {
  public ExistsWatchRegistration(Watcher watcher, String clientPath) {
    super(watcher, clientPath);
  }

  @Override
  protected Map<String, Set<Watcher>> getWatches(int rc) {
    return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
  }

  @Override
  protected boolean shouldAddWatch(int rc) {
    return rc == 0 || rc == KeeperException.Code.NONODE.intValue();
  }
}

在上述代码中可以发现getWatches方法在rc变量为0时返回观察者容器中的dataWatches变量,反之则返回观察者容器中的existWatches变量。除了getWatches方法以外还对shouldAddWatch方法进行了重写,在shouldAddWatch方法中如果rc数值是0或者-101就表示可以进行注册操作。

至此对于Watcher注册的参数分析就告一段落,接下来需要放在请求交互的具体处理上,通过对getData、exists和getChildren方法的阅读可以明确具体交互代码的入口是cnxn.submitRequest方法,具体代码如下。

public ReplyHeader submitRequest(RequestHeader h, Record request,
                                 Record response, WatchRegistration watchRegistration)
  throws InterruptedException {
  // 提交请求
  return submitRequest(h, request, response, watchRegistration, null);
}

public ReplyHeader submitRequest(RequestHeader h, Record request,
                                 Record response, WatchRegistration watchRegistration,
                                 WatchDeregistration watchDeregistration)
  throws InterruptedException {
  // 创建ReplyHeader对象
  ReplyHeader r = new ReplyHeader();
  // 创建Packet对象
  Packet packet = queuePacket(h, r, request, response, null, null, null,
                              null, watchRegistration, watchDeregistration);
  synchronized (packet) {
    // 请求超时时间大于0
    if (requestTimeout > 0) {
      // 等requestTimeout时间
      waitForPacketFinish(r, packet);
    }
    else {
      // 一直等待直到请求处理结束
      while (!packet.finished) {
        packet.wait();
      }
    }
  }
  // 如果ReplyHeader对象中的err数值是-122
  if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
    sendThread.cleanAndNotifyState();
  }
  return r;
}

在上述代码中核心处理流程如下。

  1. 创建ReplyHeader对象。
  2. 通过queuePacket方法创建Packet对象,在方法内部创建完成后会将数据加入到容器outgoingQueue中。让入后会在org.apache.zookeeper.ClientCnxn.SendThread#run方法中产生使用。(相关分析可以查看zk服务端请求处理流程\zk客户端与服务端请求交互)
  3. 如果超时时间大于0则进行超时时间内的等待,反之则需要无限等待直到处理成功。

在请求处理流程中最后会涉及到finishPacket方法,该方法中存在Wathcer的相关操作,相关代码如下。

// watcher注册
p.watchRegistration.register(err);
// watcher取消注册
materializedWatchers = p.watchDeregistration.unregister(err);

下面对watcher注册方法进行分析,具体处理代码如下。

public void register(int rc) {
  // 如果rc为0则进行操作
  if (shouldAddWatch(rc)) {
    // 根据rc获取观察者容器
    Map<String, Set<Watcher>> watches = getWatches(rc);
    synchronized (watches) {
      // 从观察者容器列表中根据操作路径获取观察者集合
      Set<Watcher> watchers = watches.get(clientPath);
      // 如果观察者集合为空
      if (watchers == null) {
        watchers = new HashSet<Watcher>();
        watches.put(clientPath, watchers);
      }
      // 添加观察者集合
      watchers.add(watcher);
    }
  }
}

在上述代码中核心处理目标是完成ZKWatchManager类中的三个数据容器(dataWatches、existWatches和childWatches)的添加操作,具体处理流程如下。

  1. 判断参数rc是否为0,如果不是则结束处理。
  2. 通过getWatches方法获取观察者容器,此时获取的容器本质上是ZKWatchManager类中的三个数据容器中的某一个。
  3. 在观察者容器中根据操作路径获取对应的观察者集合,如果观察者集合为空则需要创建一个集合容器将数据放入到观察者容器中。
  4. 向观察者集合中添加观察者。

最后对取消Watcher注册的unregister方法进行分析,具体处理代码如下。

public Map<EventType, Set<Watcher>> unregister(int rc)
        throws KeeperException {
    return zkManager.removeWatcher(clientPath, watcher, watcherType, local,
            rc);
}

在上述代码中会通过ZKWatchManager#removeWatcher方法进行取消Watcher注册操作。

ZKWatchManager 分析

本节将对Zookeeper项目中ClientWatchManager接口的实现类ZKWatchManager进行分析,首先查看该类的构造函数,具体代码如下。

ZKWatchManager(boolean disableAutoWatchReset) {
  this.disableAutoWatchReset = disableAutoWatchReset;
}

在上述代码中设计到disableAutoWatchReset变量,该变量表示是否自动进行Watcher的重置操作,该变量与Zookeeper客户端的zookeeper.disableAutoWatchReset配置有直接关系,关系体现可以在org.apache.zookeeper.ZooKeeper#defaultWatchManager方法中看到,具体代码如下。

protected ZKWatchManager defaultWatchManager() {
  return new ZKWatchManager(
    getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
}

在ZKWatchManager类中需要重点关注三个存储容器。

  1. 用于存放观察数据观察容器dataWatches。
  2. 用于存放观察是否存在的观察者容器existWatches。
  3. 用于存放观察子节点的观察者容器childWatches。

上述三个数据容器的数据来源是WatchRegistration#register方法,在ZKWatchManager类中存在如下五个方法。

  1. 方法addTo用于将一组Watcher加入到另一组Watcher中。
  2. 方法removeWatcher用于移除Watcher。
  3. 方法containsWatcher用于判断某路径对应的Watcher是否存在。
  4. 方法removeWatches用于移除多个Watcher
  5. 方法materialize用于搜索Watcher。

containsWatcher 方法分析

本节将对containsWatcher方法进行分析,具体处理代码如下。

void containsWatcher(String path, Watcher watcher,
                      WatcherType watcherType) throws NoWatcherException {
    // 是否存在标识
    boolean containsWatcher = false;
    // 根据不同类型进行不同判断逻辑
    switch (watcherType) {
        case Children: {
            synchronized (childWatches) {
                containsWatcher = contains(path, watcher, childWatches);
            }
            break;
        }
        case Data: {
            synchronized (dataWatches) {
                containsWatcher = contains(path, watcher, dataWatches);
            }

            synchronized (existWatches) {
                boolean contains_temp = contains(path, watcher,
                        existWatches);
                containsWatcher |= contains_temp;
            }
            break;
        }
        case Any: {
            synchronized (childWatches) {
                containsWatcher = contains(path, watcher, childWatches);
            }

            synchronized (dataWatches) {
                boolean contains_temp = contains(path, watcher, dataWatches);
                containsWatcher |= contains_temp;
            }
            synchronized (existWatches) {
                boolean contains_temp = contains(path, watcher,
                        existWatches);
                containsWatcher |= contains_temp;
            }
        }
    }
    if (!containsWatcher) {
        throw new KeeperException.NoWatcherException(path);
    }
}

在上述代码中核心处理流程如下。

  1. 创建是否存在标识,默认值为false,标识不存在。
  2. 根据不同观察者类型做出不同的判断是否存在操作。在不同判断处理细节中都会使用到contains方法,它们之间的差异是判断所在的容器。
  3. 如果是否存在标识为false,则需要抛出异常。 通过上述代码的查看可以知道contains方法十分重要,下面对contains方法进行分析,具体处理代码如下。
private boolean contains(String path, Watcher watcherObj,
                          Map<String, Set<Watcher>> pathVsWatchers) {
    boolean watcherExists = true;
    if (pathVsWatchers == null || pathVsWatchers.size() == 0) {
        watcherExists = false;
    } else {
        Set<Watcher> watchers = pathVsWatchers.get(path);
        if (watchers == null) {
            watcherExists = false;
        } else if (watcherObj == null) {
            watcherExists = watchers.size() > 0;
        } else {
            watcherExists = watchers.contains(watcherObj);
        }
    }
    return watcherExists;
}

在上述代码中核心处理流程如下。

  1. 创建是否存在watcher标记,初始值为true。
  2. 如果参数pathVsWatchers(该参数具体使用时为三个数据容器)为空或者参数pathVsWatchers中的数量为0,则将是否存在watcher标记设置为false。反之则需要进行如下操作。
    1. 从参数pathVsWatchers中根据操作路径获取观察者集合。
    2. 如果观察者集合为空则将是否存在watcher标记设置为false。
    3. 如果参数watcherObj(该参数含义是待测观察者)为空,则将是否存在watcher标记根据观察者集合数量进行设置,数量大于0则设置为true。
    4. 判断观察者集合中是否存在参数watcherObj,将其结果设置给是否存在watcher标记。
  3. 返回是否存在watcher标记。

removeWatcher 方法和 removeWatches 方法分析

本节将对removeWatcher方法和removeWatches方法进行分析,首先对removeWatcher方法进行分析,具体处理代码如下。

public Map<EventType, Set<Watcher>> removeWatcher(String clientPath,
                                                  Watcher watcher,
                                                  WatcherType watcherType,
                                                  boolean local,
                                                  int rc)
        throws KeeperException {

    // 判断操作路径是否存在指定的watcher
    containsWatcher(clientPath, watcher, watcherType);

    // 移除结果
    Map<EventType, Set<Watcher>> removedWatchers = new HashMap<EventType, Set<Watcher>>();
    // 子节点观察者
    HashSet<Watcher> childWatchersToRem = new HashSet<Watcher>();
    // 向移除结果中添加子节点观察者
    removedWatchers
            .put(EventType.ChildWatchRemoved, childWatchersToRem);
    // 数据观察者
    HashSet<Watcher> dataWatchersToRem = new HashSet<Watcher>();
    // 向移除结果中添加数据观察者
    removedWatchers.put(EventType.DataWatchRemoved, dataWatchersToRem);
    // 是否移除成功标记
    boolean removedWatcher = false;
    switch (watcherType) {
        case Children: {
            synchronized (childWatches) {
                removedWatcher = removeWatches(childWatches, watcher,
                        clientPath, local, rc, childWatchersToRem);
            }
            break;
        }
        case Data: {
            synchronized (dataWatches) {
                removedWatcher = removeWatches(dataWatches, watcher,
                        clientPath, local, rc, dataWatchersToRem);
            }

            synchronized (existWatches) {
                boolean removedDataWatcher = removeWatches(existWatches,
                        watcher, clientPath, local, rc, dataWatchersToRem);
                removedWatcher |= removedDataWatcher;
            }
            break;
        }
        case Any: {
            synchronized (childWatches) {
                removedWatcher = removeWatches(childWatches, watcher,
                        clientPath, local, rc, childWatchersToRem);
            }

            synchronized (dataWatches) {
                boolean removedDataWatcher = removeWatches(dataWatches,
                        watcher, clientPath, local, rc, dataWatchersToRem);
                removedWatcher |= removedDataWatcher;
            }
            synchronized (existWatches) {
                boolean removedDataWatcher = removeWatches(existWatches,
                        watcher, clientPath, local, rc, dataWatchersToRem);
                removedWatcher |= removedDataWatcher;
            }
        }
    }
    // 移除标记为假抛出异常
    if (!removedWatcher) {
        throw new KeeperException.NoWatcherException(clientPath);
    }
    return removedWatchers;
}


在上述代码中核心处理流程如下。

  1. 通过containsWatcher方法判断操作路径是否存在指定的watcher,如果不存在将抛出异常。
  2. 创建移除数据结果容器。
  3. 创建子节点观察者容器和数据观察者容器,将这两个容器加入到移除数据结果容器中。
  4. 创建是否移除成功标记,初始值为false。
  5. 根据不同观察者类型做出不同的移除操作。
  6. 如果移除标记是false则需要抛出异常。
  7. 返回移除数据结果容器。

在上述处理流程中第(5)步中会使用到removeWatches方法进行移除操作,具体处理代码如下

protected boolean removeWatches(Map<String, Set<Watcher>> pathVsWatcher,
                                Watcher watcher, String path, boolean local, int rc,
                                Set<Watcher> removedWatchers) throws KeeperException {
    // 非本地,rc码非OK抛出异常
    if (!local && rc != Code.OK.intValue()) {
        throw KeeperException
                .create(KeeperException.Code.get(rc), path);
    }
    // 是否移除成功标记
    boolean success = false;
    // 1. rc码为OK
    // 2。 是本地的并且rc码非OK
    if (rc == Code.OK.intValue() || (local && rc != Code.OK.intValue())) {
        // 观察者为空
        if (watcher == null) {
            // 删除路径对应的观察者集合
            Set<Watcher> pathWatchers = pathVsWatcher.remove(path);
            // 观察者集合不为空
            if (pathWatchers != null) {
                // 移除容器中添加移除数据
                removedWatchers.addAll(pathWatchers);
                // 移除成功标记设置为true
                success = true;
            }
        }
        else {
            // 获取删除路径对应的观察者集合
            Set<Watcher> watchers = pathVsWatcher.get(path);
            // 观察者集合不为空
            if (watchers != null) {
                // 如果在观察集合中移除指定的观察者成功
                if (watchers.remove(watcher)) {
                    // 移除容器中添加移除数据
                    removedWatchers.add(watcher);
                    // cleanup <path vs watchlist>
                    // 如果观察者容器小于等于0则需要在参数pathVsWatcher中移除
                    if (watchers.size() <= 0) {
                        pathVsWatcher.remove(path);
                    }
                    // 移除成功标记设置为true
                    success = true;
                }
            }
        }
    }
    return success;
}

在上述代码中核心处理流程围绕参数watcher做出不同操作。当参数watcher为空则进行如下操作。

  1. 在参数pathVsWatcher(移除的源头数据)中移除参数path(需要操作的路径)的数据,将移除的观察者集合暂存。
  2. 如果移除观察者集合不为空则需要将数据加入到移除结果存储容器(参数removedWatchers)中。
  3. 设置移除成功标记为true。

当参数watcher不为空则进行如下操作。

  1. 在参数pathVsWatcher中根据参数path获取对应的观察者集合。
  2. 如果观察者集合不为空则尝试进行移除指定的观察者(参数watcher),如果移除失败则不做处理,移除成功则进行如下操作。
    1. 向移除结果存储容器(参数removedWatchers)中添加移除的指定观察者(参数watcher)。
    2. 如果移除后参数path对应的观察者集合中的元素数量小于等于0则需要在参数pathVsWatcher中移除。
    3. 设置移除成功标记为true。

materialize 方法分析

本节将对materialize方法进行分析,具体处理代码如下

@Override
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                Watcher.Event.EventType type,
                                String clientPath) {
    // 观察者结果容器
    Set<Watcher> result = new HashSet<Watcher>();

    // 不同事件处理
    switch (type) {
        case None:
            // 添加默认观察者
            result.add(defaultWatcher);
            // 是否需要清理
            boolean clear = disableAutoWatchReset
                    && state != Watcher.Event.KeeperState.SyncConnected;
            synchronized (dataWatches) {
                // 将dataWatches中的观察者放入到结果集合中
                for (Set<Watcher> ws : dataWatches.values()) {
                    result.addAll(ws);
                }
                // 如果需要清理则将dataWatches数据清理
                if (clear) {
                    dataWatches.clear();
                }
            }

            synchronized (existWatches) {
                // 将existWatches中的观察者放入到结果集合中
                for (Set<Watcher> ws : existWatches.values()) {
                    result.addAll(ws);
                }
                // 如果需要清理则将existWatches数据清理
                if (clear) {
                    existWatches.clear();
                }
            }

            synchronized (childWatches) {
                // 将childWatches中的观察者放入到结果集合中
                for (Set<Watcher> ws : childWatches.values()) {
                    result.addAll(ws);
                }
                // 如果需要清理则将childWatches数据清理
                if (clear) {
                    childWatches.clear();
                }
            }

            return result;
        // 数据变化
        case NodeDataChanged:
            // 节点创建
        case NodeCreated:

            synchronized (dataWatches) {
                // 在dataWatches中移除操作路径对应的观察者并将其加入到结果集合中
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) {
                // 在existWatches中移除操作路径对应的观察者并将其加入到结果集合中
                addTo(existWatches.remove(clientPath), result);
            }
            break;
        // 子节点变化
        case NodeChildrenChanged:
            synchronized (childWatches) {
                // 在childWatches中移除操作路径对应的观察者并将其加入到结果集合中
                addTo(childWatches.remove(clientPath), result);
            }
            break;
            // 节点删除
        case NodeDeleted:

            synchronized (dataWatches) {
                // 在dataWatches中移除操作路径对应的观察者并将其加入到结果集合中
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) {
                // 在existWatches中移除操作路径对应的观察者并将其加入到结果集合中
                Set<Watcher> list = existWatches.remove(clientPath);
                if (list != null) {
                    addTo(list, result);
                    LOG.warn(
                            "We are triggering an exists watch for delete! Shouldn't happen!");
                }
            }
            synchronized (childWatches) {
                // 在childWatches中移除操作路径对应的观察者并将其加入到结果集合中
                addTo(childWatches.remove(clientPath), result);
            }
            break;
            // 其他操作抛出异常
        default:
            String msg = "Unhandled watch event type " + type
                    + " with state " + state + " on path " + clientPath;
            LOG.error(msg);
            throw new RuntimeException(msg);
    }

    return result;
}

在上述代码中会 根据不同的事件进行不同处理,下面将对不同事件的处理进行说明,详细内容见表。

事件类型数据来源
None1. 默认观察者(defaultWatcher)<br />2. dataWatches<br />3. existWatches<br />4. childWatches
NodeDataChanged和NodeCreated1. 从dataWatches中操作路径对应的数据<br />2. 从existWatches中操作路径对应的数据
NodeChildrenChanged1. 从childWatches中操作路径对应的数据
NodeDeleted1. 从dataWatches中操作路径对应的数据<br />2. 从existWatches中操作路径对应的数据<br />3. 从childWatches中操作路径对应的数据

WatchManager 分析

本节将对WatchManager类进行分析,该类主要服务于DataTree类,在其中用作观察者管理,在该类中有两个成员变量,详细信息见表。

变量名称变量类型变量说明
watchTableHashMap<String, HashSet<Watcher>>路径和watcher的映射
watch2PathsHashMap<Watcher, HashSet<String>>watcher和路径的映射

在WatchManager类中存在如下九个方法,详细信息如下。

  1. 方法size用于获取watchTable中的观察者数量。
  2. 方法addWatch用于添加观察者。
  3. 方法removeWatcher用于移除观察者。
  4. 方法triggerWatch用于触发观察者。
  5. 方法dumpWatches用于输出观察者信息。
  6. 方法containsWatcher用于判断是否存在观察者。
  7. 方法getWatches用于获取观察者报告
  8. 方法getWatchesByPath用于获取观察者路径报告。
  9. 方法getWatchesSummary用于获取观察者摘要。

在上述九个方法中大部分都是对成员变量的新增、修改、删除和获取操作相对复杂度不高,着重对触发观察者处理的triggerWatc方法进行分析,详细处理代码如下。

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
  // 创建事件
  WatchedEvent e = new WatchedEvent(type,
          KeeperState.SyncConnected, path);
  HashSet<Watcher> watchers;
  synchronized (this) {
      // 从watchTable容器中移除路径对应的数据,暂存移除的观察者集合
      watchers = watchTable.remove(path);
      // 移除观察者集合为空返回null结束处理
      if (watchers == null || watchers.isEmpty()) {
          if (LOG.isTraceEnabled()) {
              ZooTrace.logTraceMessage(LOG,
                      ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                      "No watchers for " + path);
          }
          return null;
      }
      // 循环移除观察者集合
      for (Watcher w : watchers) {
          // 在watch2Paths容器中删除对应的路径
          HashSet<String> paths = watch2Paths.get(w);
          if (paths != null) {
              paths.remove(path);
          }
      }
  }
  // 循环移除的观察者集合
  for (Watcher w : watchers) {
      // 如果当前观察者在参数观察者中存在则跳过处理,反之则需要执行
      if (supress != null && supress.contains(w)) {
          continue;
      }
      w.process(e);
  }
  return watchers;
}

在上述代码中核心处理流程如下。

  1. 创建事件对象,在后续观察者执行时需要使用。
  2. 从watchTable容器中移除路径对应的数据,暂存移除的观察者集合。如果此时暂存的观察者集合为空或者其中没有元素则返回null,结束处理。
  3. 循环暂存的观察者集合,从watch2Paths容器中获取对应的路径集合,如果路径集合不为空则需要将当前函数所操作的路径从中移除。
  4. 循环暂存的观察者集合,如果当前循环处理的观察者集合在参数supress容器中则需要跳过当前操作,反之观察者需要执行操作。
  5. 返回暂存的观察者集合。

总结

本章对Zookeeper项目中Watcher相关内容进行分析,首先在本章对观察者(Watcher)做出基本介绍,然后对观察者的注册流程以及取消注册流程进行分析,从而引出ZKWatchManager类,并对该类进行相关分析,最后在本章对WatchManager类进行分析,该类主要服务于Zookeeper的数据库系统,并对观察者触发操作进行相关分析。