zookeeper source code analysis

ZookeeperServer类相关分析

本章将对ZookeeperServer类进行相关分析,分析内容包含成员变量、方法以及子类进行。

ZookeeperServer 分析

本节将对ZookeeperServer类进行分析,在Zookeeper中该类存在多个实现类具体类图如图所示。

image-20220519141349023

在Zookeeper项目中具体使用的是如下四个类。

  1. LeaderZooKeeperServer领导者服务。
  2. FollowerZooKeeperServer跟随者服务。
  3. ObserverZooKeeperServer观察者服务。
  4. ReadOnlyZooKeeperServer只读服务。

回到ZookeeperServer类本身的成员变量做相关介绍,了解这些成员变量对后续子类的分析有相当的帮助,成员变量信息见表。

变量名称变量类型变量说明
outstandingChangesDeque<ChangeRecord>没有完成的变化档案集合
outstandingChangesForPathHashMap<String, ChangeRecord>未完成的变换档案,key:path,value: 变化档案
hzxidAtomicLongzxid
requestsInProcessAtomicInteger请求处理中数量
serverStatsServerStats服务统计
listenerZooKeeperServerListenerZookeeper服务监听器
jmxServerBeanZooKeeperServerBeanJMX服务中用于存储zk服务对象的Bean
jmxDataTreeBeanDataTreeBeanJMX服务中用于存储数据树对象的Bean
minSessionTimeoutintsession最小过期时间
maxSessionTimeoutintsession最大过期时间
sessionTrackerSessionTrackersession跟踪器
firstProcessorRequestProcessor请求处理器
stateState服务状态
reconfigEnabledboolean是否启用重载配置
serverCnxnFactoryServerCnxnFactory服务链接工厂
secureServerCnxnFactoryServerCnxnFactory安全服务连接工厂
txnLogFactoryFileTxnSnapLog事务日志和快照日志
zkDbZKDatabasezk数据库
zkShutdownHandlerZooKeeperServerShutdownHandlerzk服务关闭处理器
createSessionTrackerServerIdint会话跟踪器ID

接下来先将方法作用先做基本概要说明,方法内容如下。

  1. 方法getSnapCount用于获取快照总量,数据来源是环境变量zookeeper.snapCount。该方法在同步请求处理器(SyncRequestProcessor)中有具体使用。
  2. 方法shouldAllowSaslFailedClientsConnect用于获取是否允许SASL认证失败的客户端连接,数据来源是环境变量zookeeper.allowSaslFailedClients。
  3. 方法shouldRequireClientSaslAuth用于获取客户端是否需要sasl验证,数据来源是环境变量zookeeper.sessionRequireClientSASLAuth。
  4. 方法loadData用于加载Zookeeper数据库。
  5. 方法takeSnapshot用于创建快照。
  6. 方法touch用于判断服务连接是否过期。
  7. 方法registerJMX用于注册JMX相关信息。
  8. 方法unregisterJMX用于取消注册JMX相关信息。
  9. 方法getNumAliveConnections用于获取活跃的连接数量,注意连接数量是serverCnxnFactory中的活跃数量和secureServerCnxnFactory中的活跃数量合计。
  10. 方法startdata用于启动数据库,启动是初始化zk数据库对象和加载数据(执行loadData方法)。
  11. 方法startup启动各项所需启动的类。
  12. 方法setupRequestProcessors用于创建请求处理器并将其启动。子类的实现会改变请求的处理流程。
  13. 方法createSessionTracker用于创建session跟踪器。
  14. 方法startSessionTracker用于启动session跟踪器。
  15. 方法canShutdown用于判断是否关闭。
  16. 方法isRunning用于判断是否运行中。
  17. 方法shutdown用于关闭Zookeeper服务。
  18. 方法createSession用于创建session id。
  19. 方法setOwner用于设置session id的所有者。
  20. 方法revalidateSession用于验证session。
  21. 方法reopenSession用于重新打开session。
  22. 方法finishSessionInit用于完成session创建。
  23. 方法closeSession用于关闭session。
  24. 方法getServerId用于获取服务id。
  25. 方法setLocalSessionFlag用于设置本地session标记。
  26. 方法submitRequest用于提交请求。
  27. 方法getGlobalOutstandingLimit用于获取最大请求数量限制,数据来源是环境变量zookeeper.globalOutstandingLimit。
  28. 方法getState用于获取服务类型,类型有单机(standalone)、跟随者(follower)、领导者(leader)、观察者(observer)和只读(read-only)。
  29. 方法processConnectRequest用于处理连接请求。
  30. 方法shouldThrottle用于判断是否需要截流
  31. 方法processPacket用于处理数据包。
  32. 方法hasCnxSASLAuthenticated用于判断服务连接中的Id是否使用SASL认证模式。
  33. 方法processSasl用于处理sasl请求。
  34. 方法processTxn用于处理事务请求。

在上述方法中有一部分已经在本章之前的章节中有过相关分析,故不做介绍,此外还有一些简单方法的存在逻辑操作并不复杂也不做介绍,抛去这些方法后需要分析的方法相对有限,接下来对processTxn方法进行分析,具体处理代码如下。

private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
                                    Record txn) {
  // 事务处理结果
  ProcessTxnResult rc;
  // 获取操作编码
  int opCode = request != null ? request.type : hdr.getType();
  // 获取session id
  long sessionId = request != null ? request.sessionId : hdr.getClientId();
  // 如果事务头信息不为空交给数据库处理事务
  if (hdr != null) {
    rc = getZKDatabase().processTxn(hdr, txn);
  } else {
    // 创建事务处理结果对象
    rc = new ProcessTxnResult();
  }
  // 如果操作类型是创建session
  if (opCode == OpCode.createSession) {
    // 如果事务头信息不为空
    // 参数txn类型是CreateSessionTxn
    if (hdr != null && txn instanceof CreateSessionTxn) {
      // 添加全局session
      CreateSessionTxn cst = (CreateSessionTxn) txn;
      sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
    }
    // 请求对象不为空
    // 本地session
    else if (request != null && request.isLocalSession()) {
      // 添加session
      request.request.rewind();
      int timeout = request.request.getInt();
      request.request.rewind();
      sessionTracker.addSession(request.sessionId, timeout);
    }
    // 警告日志
    else {
      LOG.warn("*****>>>>> Got "
               + txn.getClass() + " "
               + txn.toString());
    }
  }
  // 如果操作类型是关闭session
  else if (opCode == OpCode.closeSession) {
    // 移除session id
    sessionTracker.removeSession(sessionId);
  }
  return rc;
}

在上述代码中核心处理流程如下。

  1. 获取操作编码
  2. 获取session id
  3. 如果事务头信息不为空交给数据库处理事务,反之则创建事务处理结果对象。注意此时创建的事务处理结果对象是一个无数据设置的对象。
  4. 如果操作类型是创建session,做两种不同操作。
    1. 事务头信息不为空并且参数txn类型是CreateSessionTxn需要将数据放入到全局session集合中。
    2. 请求对象不为空并且请求中的isLocalSession为真(表示这是一个本地session)则需要将数据放入到普通的session集合中。
  5. 如果操作类型是关闭session则需要移除session id相关数据。
  6. 返回事务处理结果对象。

接下来对startup方法进行分析,具体处理代码如下。

public synchronized void startup() {

    // session跟踪器为空的情况下创建
    if (sessionTracker == null) {
        createSessionTracker();
    }
    // 启动session跟踪器
    startSessionTracker();
    // 初始化请求处理器并启动
    setupRequestProcessors();

    // 注册JMX
    registerJMX();

    // 设置状态
    setState(State.RUNNING);
    // 发布通知
    notifyAll();
}

在上述代码中处理流程如下。

  1. 判断成员变量sessionTracker是否为空,如果是则需要初始化。
  2. 启动sessionTracker。
  3. 初始化请求处理器,并将其启动。创建的处理器是PrepRequestProcessor,它的处理器责任链是PrepRequestProcessor->SyncRequestProcessor->FinalRequestProcessor。
  4. 注册JMX数据。
  5. 设置服务状态为运行中。

接下来将会对子类进行相关分析,对于Zookeeper项目中的ZookeeperServer实现类而言,最重要的方法是setupRequestProcessors,它决定了请求的处理链路,因此对于子类的分析将围绕setupRequestProcessors方法进行,着重分析请求处理器链路。

LeaderZooKeeperServer 分析

本节将对LeaderZooKeeperServer类进行分析,对于该类的分析主要是对setupRequestProcessors方法的分析,具体处理代码如下。

@Override
protected void setupRequestProcessors() {
  // 最终处理器
  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  // 待应用处理器
  RequestProcessor toBeAppliedProcessor =
    new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
  // 提交处理器
  commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                                        Long.toString(getServerId()), false,
                                        getZooKeeperServerListener());
  commitProcessor.start();
  // 提案处理器
  ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                                                                            commitProcessor);
  // 提案处理器实例化
  proposalProcessor.initialize();
  // 前置处理器
  prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
  // 前置处理器启动
  prepRequestProcessor.start();
  // 领导者处理器
  firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
  // 配置容器
  setupContainerManager();
}

在这段代码中核心是完成成员变量firstProcessor的初始化,firstProcessor的具体类型是LeaderRequestProcessor,它的处理器责任链是PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor。

FollowerZooKeeperServer 分析

本节将对FollowerZooKeeperServer类进行分析,对于该类的分析主要是对setupRequestProcessors方法的分析,具体处理代码如下。

@Override
protected void setupRequestProcessors() {
  // 最终处理器
  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  // 提交处理器
  commitProcessor = new CommitProcessor(finalProcessor,
                                        Long.toString(getServerId()), true, getZooKeeperServerListener());
  commitProcessor.start();
  // 跟随者处理器
  firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
  ((FollowerRequestProcessor) firstProcessor).start();
  // 同步请求处理器
  syncProcessor = new SyncRequestProcessor(this,new SendAckRequestProcessor((Learner)     getFollower()));
  syncProcessor.start();
}

在这段代码中核心是完成成员变量firstProcessor的初始化,firstProcessor的具体类型是FollowerRequestProcessor,它的处理器责任链是FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor,此外还创建同步请求处理器(SyncRequestProcessor)并将其启动。

ObserverZooKeeperServer 分析

本节将对ObserverZooKeeperServer类进行分析,对于该类的分析主要是对setupRequestProcessors方法的分析,具体处理代码如下。

@Override
protected void setupRequestProcessors() {
  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  commitProcessor = new CommitProcessor(finalProcessor,
                                        Long.toString(getServerId()), true,
                                        getZooKeeperServerListener());
  commitProcessor.start();
  firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
  ((ObserverRequestProcessor) firstProcessor).start();

  if (syncRequestProcessorEnabled) {
    syncProcessor = new SyncRequestProcessor(this, null);
    syncProcessor.start();
  }
}

在这段代码中核心是完成成员变量firstProcessor的初始化,firstProcessor的具体类型是ObserverRequestProcessor,它的处理器责任链是ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor,此外还还需要判断是否创建同步请求处理器,如果需要则创建同步请求处理器(SyncRequestProcessor),创建完成后需要将其启动。

ReadOnlyZooKeeperServer 分析

本节将对ReadOnlyZooKeeperServer类进行分析,对于该类的分析主要是对setupRequestProcessors方法的分析,具体处理代码如下。

@Override
protected void setupRequestProcessors() {
  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
  RequestProcessor prepProcessor = new PrepRequestProcessor(this, finalProcessor);
  ((PrepRequestProcessor) prepProcessor).start();
  firstProcessor = new ReadOnlyRequestProcessor(this, prepProcessor);
  ((ReadOnlyRequestProcessor) firstProcessor).start();
}

在这段代码中核心是完成成员变量firstProcessor的初始化,firstProcessor的具体类型是ReadOnlyRequestProcessor,它的处理器责任链是ReadOnlyRequestProcessor -> PrepRequestProcessor -> FinalRequestProcessor。

总结

本章对Zookeeper项目中的ZooKeeperServer类,本章着重对ZooKeeperServer类的成员变量进行分析,以及对setupRequestProcessors方法进行详细说明,对子类的分析也着重强调setupRequestProcessors方法的分析。