ZookeeperServer类相关分析
本章将对ZookeeperServer类进行相关分析,分析内容包含成员变量、方法以及子类进行。
ZookeeperServer 分析
本节将对ZookeeperServer类进行分析,在Zookeeper中该类存在多个实现类具体类图如图所示。
在Zookeeper项目中具体使用的是如下四个类。
- LeaderZooKeeperServer领导者服务。
- FollowerZooKeeperServer跟随者服务。
- ObserverZooKeeperServer观察者服务。
- ReadOnlyZooKeeperServer只读服务。
回到ZookeeperServer类本身的成员变量做相关介绍,了解这些成员变量对后续子类的分析有相当的帮助,成员变量信息见表。
变量名称 | 变量类型 | 变量说明 |
---|---|---|
outstandingChanges | Deque<ChangeRecord> | 没有完成的变化档案集合 |
outstandingChangesForPath | HashMap<String, ChangeRecord> | 未完成的变换档案,key:path,value: 变化档案 |
hzxid | AtomicLong | zxid |
requestsInProcess | AtomicInteger | 请求处理中数量 |
serverStats | ServerStats | 服务统计 |
listener | ZooKeeperServerListener | Zookeeper服务监听器 |
jmxServerBean | ZooKeeperServerBean | JMX服务中用于存储zk服务对象的Bean |
jmxDataTreeBean | DataTreeBean | JMX服务中用于存储数据树对象的Bean |
minSessionTimeout | int | session最小过期时间 |
maxSessionTimeout | int | session最大过期时间 |
sessionTracker | SessionTracker | session跟踪器 |
firstProcessor | RequestProcessor | 请求处理器 |
state | State | 服务状态 |
reconfigEnabled | boolean | 是否启用重载配置 |
serverCnxnFactory | ServerCnxnFactory | 服务链接工厂 |
secureServerCnxnFactory | ServerCnxnFactory | 安全服务连接工厂 |
txnLogFactory | FileTxnSnapLog | 事务日志和快照日志 |
zkDb | ZKDatabase | zk数据库 |
zkShutdownHandler | ZooKeeperServerShutdownHandler | zk服务关闭处理器 |
createSessionTrackerServerId | int | 会话跟踪器ID |
接下来先将方法作用先做基本概要说明,方法内容如下。
- 方法getSnapCount用于获取快照总量,数据来源是环境变量zookeeper.snapCount。该方法在同步请求处理器(SyncRequestProcessor)中有具体使用。
- 方法shouldAllowSaslFailedClientsConnect用于获取是否允许SASL认证失败的客户端连接,数据来源是环境变量zookeeper.allowSaslFailedClients。
- 方法shouldRequireClientSaslAuth用于获取客户端是否需要sasl验证,数据来源是环境变量zookeeper.sessionRequireClientSASLAuth。
- 方法loadData用于加载Zookeeper数据库。
- 方法takeSnapshot用于创建快照。
- 方法touch用于判断服务连接是否过期。
- 方法registerJMX用于注册JMX相关信息。
- 方法unregisterJMX用于取消注册JMX相关信息。
- 方法getNumAliveConnections用于获取活跃的连接数量,注意连接数量是serverCnxnFactory中的活跃数量和secureServerCnxnFactory中的活跃数量合计。
- 方法startdata用于启动数据库,启动是初始化zk数据库对象和加载数据(执行loadData方法)。
- 方法startup启动各项所需启动的类。
- 方法setupRequestProcessors用于创建请求处理器并将其启动。子类的实现会改变请求的处理流程。
- 方法createSessionTracker用于创建session跟踪器。
- 方法startSessionTracker用于启动session跟踪器。
- 方法canShutdown用于判断是否关闭。
- 方法isRunning用于判断是否运行中。
- 方法shutdown用于关闭Zookeeper服务。
- 方法createSession用于创建session id。
- 方法setOwner用于设置session id的所有者。
- 方法revalidateSession用于验证session。
- 方法reopenSession用于重新打开session。
- 方法finishSessionInit用于完成session创建。
- 方法closeSession用于关闭session。
- 方法getServerId用于获取服务id。
- 方法setLocalSessionFlag用于设置本地session标记。
- 方法submitRequest用于提交请求。
- 方法getGlobalOutstandingLimit用于获取最大请求数量限制,数据来源是环境变量zookeeper.globalOutstandingLimit。
- 方法getState用于获取服务类型,类型有单机(standalone)、跟随者(follower)、领导者(leader)、观察者(observer)和只读(read-only)。
- 方法processConnectRequest用于处理连接请求。
- 方法shouldThrottle用于判断是否需要截流
- 方法processPacket用于处理数据包。
- 方法hasCnxSASLAuthenticated用于判断服务连接中的Id是否使用SASL认证模式。
- 方法processSasl用于处理sasl请求。
- 方法processTxn用于处理事务请求。
在上述方法中有一部分已经在本章之前的章节中有过相关分析,故不做介绍,此外还有一些简单方法的存在逻辑操作并不复杂也不做介绍,抛去这些方法后需要分析的方法相对有限,接下来对processTxn方法进行分析,具体处理代码如下。
private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
Record txn) {
// 事务处理结果
ProcessTxnResult rc;
// 获取操作编码
int opCode = request != null ? request.type : hdr.getType();
// 获取session id
long sessionId = request != null ? request.sessionId : hdr.getClientId();
// 如果事务头信息不为空交给数据库处理事务
if (hdr != null) {
rc = getZKDatabase().processTxn(hdr, txn);
} else {
// 创建事务处理结果对象
rc = new ProcessTxnResult();
}
// 如果操作类型是创建session
if (opCode == OpCode.createSession) {
// 如果事务头信息不为空
// 参数txn类型是CreateSessionTxn
if (hdr != null && txn instanceof CreateSessionTxn) {
// 添加全局session
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
}
// 请求对象不为空
// 本地session
else if (request != null && request.isLocalSession()) {
// 添加session
request.request.rewind();
int timeout = request.request.getInt();
request.request.rewind();
sessionTracker.addSession(request.sessionId, timeout);
}
// 警告日志
else {
LOG.warn("*****>>>>> Got "
+ txn.getClass() + " "
+ txn.toString());
}
}
// 如果操作类型是关闭session
else if (opCode == OpCode.closeSession) {
// 移除session id
sessionTracker.removeSession(sessionId);
}
return rc;
}
在上述代码中核心处理流程如下。
- 获取操作编码
- 获取session id
- 如果事务头信息不为空交给数据库处理事务,反之则创建事务处理结果对象。注意此时创建的事务处理结果对象是一个无数据设置的对象。
- 如果操作类型是创建session,做两种不同操作。
- 事务头信息不为空并且参数txn类型是CreateSessionTxn需要将数据放入到全局session集合中。
- 请求对象不为空并且请求中的isLocalSession为真(表示这是一个本地session)则需要将数据放入到普通的session集合中。
- 如果操作类型是关闭session则需要移除session id相关数据。
- 返回事务处理结果对象。
接下来对startup方法进行分析,具体处理代码如下。
public synchronized void startup() {
// session跟踪器为空的情况下创建
if (sessionTracker == null) {
createSessionTracker();
}
// 启动session跟踪器
startSessionTracker();
// 初始化请求处理器并启动
setupRequestProcessors();
// 注册JMX
registerJMX();
// 设置状态
setState(State.RUNNING);
// 发布通知
notifyAll();
}
在上述代码中处理流程如下。
- 判断成员变量sessionTracker是否为空,如果是则需要初始化。
- 启动sessionTracker。
- 初始化请求处理器,并将其启动。创建的处理器是PrepRequestProcessor,它的处理器责任链是PrepRequestProcessor->SyncRequestProcessor->FinalRequestProcessor。
- 注册JMX数据。
- 设置服务状态为运行中。
接下来将会对子类进行相关分析,对于Zookeeper项目中的ZookeeperServer实现类而言,最重要的方法是setupRequestProcessors,它决定了请求的处理链路,因此对于子类的分析将围绕setupRequestProcessors方法进行,着重分析请求处理器链路。
LeaderZooKeeperServer 分析
本节将对LeaderZooKeeperServer类进行分析,对于该类的分析主要是对setupRequestProcessors方法的分析,具体处理代码如下。
@Override
protected void setupRequestProcessors() {
// 最终处理器
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
// 待应用处理器
RequestProcessor toBeAppliedProcessor =
new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
// 提交处理器
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false,
getZooKeeperServerListener());
commitProcessor.start();
// 提案处理器
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
// 提案处理器实例化
proposalProcessor.initialize();
// 前置处理器
prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
// 前置处理器启动
prepRequestProcessor.start();
// 领导者处理器
firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
// 配置容器
setupContainerManager();
}
在这段代码中核心是完成成员变量firstProcessor的初始化,firstProcessor的具体类型是LeaderRequestProcessor,它的处理器责任链是PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor。
FollowerZooKeeperServer 分析
本节将对FollowerZooKeeperServer类进行分析,对于该类的分析主要是对setupRequestProcessors方法的分析,具体处理代码如下。
@Override
protected void setupRequestProcessors() {
// 最终处理器
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
// 提交处理器
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
// 跟随者处理器
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
// 同步请求处理器
syncProcessor = new SyncRequestProcessor(this,new SendAckRequestProcessor((Learner) getFollower()));
syncProcessor.start();
}
在这段代码中核心是完成成员变量firstProcessor的初始化,firstProcessor的具体类型是FollowerRequestProcessor,它的处理器责任链是FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor,此外还创建同步请求处理器(SyncRequestProcessor)并将其启动。
ObserverZooKeeperServer 分析
本节将对ObserverZooKeeperServer类进行分析,对于该类的分析主要是对setupRequestProcessors方法的分析,具体处理代码如下。
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true,
getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();
if (syncRequestProcessorEnabled) {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
}
在这段代码中核心是完成成员变量firstProcessor的初始化,firstProcessor的具体类型是ObserverRequestProcessor,它的处理器责任链是ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor,此外还还需要判断是否创建同步请求处理器,如果需要则创建同步请求处理器(SyncRequestProcessor),创建完成后需要将其启动。
ReadOnlyZooKeeperServer 分析
本节将对ReadOnlyZooKeeperServer类进行分析,对于该类的分析主要是对setupRequestProcessors方法的分析,具体处理代码如下。
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor prepProcessor = new PrepRequestProcessor(this, finalProcessor);
((PrepRequestProcessor) prepProcessor).start();
firstProcessor = new ReadOnlyRequestProcessor(this, prepProcessor);
((ReadOnlyRequestProcessor) firstProcessor).start();
}
在这段代码中核心是完成成员变量firstProcessor的初始化,firstProcessor的具体类型是ReadOnlyRequestProcessor,它的处理器责任链是ReadOnlyRequestProcessor -> PrepRequestProcessor -> FinalRequestProcessor。
总结
本章对Zookeeper项目中的ZooKeeperServer类,本章着重对ZooKeeperServer类的成员变量进行分析,以及对setupRequestProcessors方法进行详细说明,对子类的分析也着重强调setupRequestProcessors方法的分析。