Zookeeper 状态同步
本章将对Zookeeper项目中状态同步相关内容进行分析,数据同步发生的时机是在选举结束以后。在Zookeeper中关于数据涉及Leader、Follower和Observer三个类,本章的分析核心也将围绕这三个类进行展开。
Leader
本节将对Leader类进行相关分析,首先从它的构造函数作为切入点,构造函数的使用在QuorumPeer#makeLeader方法中有提及,具体处理代码如下。
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.zkDb));
}
通过上述方法进入到Leader构造函数,具体构造函数代码如下。
Leader(QuorumPeer self, LeaderZooKeeperServer zk) throws IOException {
this.self = self;
this.proposalStats = new BufferStats();
try {
if (self.shouldUsePortUnification() || self.isSslQuorum()) {
boolean allowInsecureConnection = self.shouldUsePortUnification();
if (self.getQuorumListenOnAllIPs()) {
ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection,
self.getQuorumAddress().getPort());
} else {
ss = new UnifiedServerSocket(self.getX509Util(), allowInsecureConnection);
}
} else {
if (self.getQuorumListenOnAllIPs()) {
ss = new ServerSocket(self.getQuorumAddress().getPort());
} else {
ss = new ServerSocket();
}
}
ss.setReuseAddress(true);
if (!self.getQuorumListenOnAllIPs()) {
ss.bind(self.getQuorumAddress());
}
} catch (BindException e) {
if (self.getQuorumListenOnAllIPs()) {
LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
} else {
LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
}
throw e;
}
this.zk = zk;
this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
}
在上述代码中核心是完成如下几个成员变量的初始化。
- 成员变量self,其核心是参选节点QuorumPeer。
- 成员变量proposalStats,它是用于进行数据统计的变量。
- 成员变量ss,它代表了socket连接。
- 成员变量zk,它表示zk服务,这里特指LeaderZooKeeperServer。
- 成员变量learnerSnapshotThrottler,用于限制从Leader往Follower或者Observer发送快照的数量。
接下来对核心方法lead进行分析,具体处理代码如下。
void lead() throws IOException, InterruptedException {
// 获取当前时间
self.end_fle = Time.currentElapsedTime();
// 计算选举消耗的时间:当前时间-选举开始时间
long electionTimeTaken = self.end_fle - self.start_fle;
// 设置选举消耗的时间
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
QuorumPeer.FLE_TIME_UNIT);
// 时间计数器归零
self.start_fle = 0;
self.end_fle = 0;
// 注册JMX
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
self.tick.set(0);
// 执行数据加载操作
zk.loadData();
// 创建状态信息,主要存储参选周期和最后处理的zxid
leaderStateSummary =
new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// 创建LearnerCnxAcceptor类用于等待Follower发送同步请求
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
// 计算最新选举周期
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
// 将任选周期转换为zxid并将其设置到zk中
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
synchronized (this) {
lastProposed = zk.getZxid();
}
// 创建NEWLEADER数据包
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);
if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
LOG.info("NEWLEADER proposal has Zxid of "
+ Long.toHexString(newLeaderProposal.packet.getZxid()));
}
QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
QuorumVerifier curQV = self.getQuorumVerifier();
if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
try {
LOG.debug(String.format(
"set lastSeenQuorumVerifier to currentQuorumVerifier (%s)",
curQV.toString()));
QuorumVerifier newQV = self.configFromString(curQV.toString());
newQV.setVersion(zk.getZxid());
self.setLastSeenQuorumVerifier(newQV, true);
} catch (Exception e) {
throw new IOException(e);
}
}
newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier()
.getVersion()) {
newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
// 等待 epoch 的ACK数据包
waitForEpochAck(self.getId(), leaderStateSummary);
// 设置当前选举周期
self.setCurrentEpoch(epoch);
try {
// 等待NEWLEADER的ACK数据包
waitForNewLeaderAck(self.getId(), zk.getZxid());
} catch (InterruptedException e) {
shutdown("Waiting for a quorum of followers, only synced with sids: [ "
+ newLeaderProposal.ackSetsToString() + " ]");
HashSet<Long> followerSet = new HashSet<Long>();
for (LearnerHandler f : getLearners()) {
if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
followerSet.add(f.getSid());
}
}
boolean initTicksShouldBeIncreased = true;
for (Proposal.QuorumVerifierAcksetPair qvAckset : newLeaderProposal.qvAcksetPairs) {
if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
initTicksShouldBeIncreased = false;
break;
}
}
if (initTicksShouldBeIncreased) {
LOG.warn("Enough followers present. " +
"Perhaps the initTicks need to be increased.");
}
return;
}
// 启动zk服务,注意不是启动zk进程
startZkServer();
String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
if (initialZxid != null) {
long zxid = Long.parseLong(initialZxid);
zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
}
if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
self.setZooKeeperServer(zk);
}
self.adminServer.setZooKeeperServer(zk);
boolean tickSkip = true;
String shutdownMessage = null;
while (true) {
synchronized (this) {
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.tickTime / 2;
while (cur < end) {
wait(end - cur);
cur = Time.currentElapsedTime();
}
if (!tickSkip) {
self.tick.incrementAndGet();
}
// 创建SyncedLearnerTracker对象,该对象用于确认是否有大多数同意
SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
syncedAckSet.addQuorumVerifier(self
.getLastSeenQuorumVerifier());
}
syncedAckSet.addAck(self.getId());
for (LearnerHandler f : getLearners()) {
if (f.synced()) {
syncedAckSet.addAck(f.getSid());
}
}
if (!this.isRunning()) {
shutdownMessage = "Unexpected internal error";
break;
}
// 没有大多数认可,结束处理
if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
shutdownMessage =
"Not sufficient followers synced, only synced with sids: [ "
+ syncedAckSet.ackSetsToString() + " ]";
break;
}
tickSkip = !tickSkip;
}
// 获取LearnerHandler然后发送ping
for (LearnerHandler f : getLearners()) {
f.ping();
}
}
if (shutdownMessage != null) {
shutdown(shutdownMessage);
}
} finally {
zk.unregisterJMX(this);
}
}
在上述代码中核心处理流程如下。
- 获取当前时间,计算选举所消耗的时间并将这个数据赋值到QuorumPeer对象中,赋值完成后需要将时间计数器(开始时间start_fle和结束时间end_fle)清零。
- 调用ZooKeeperServer#loadData方法加载数据。在这个过程中会完成ZK数据库的加载以及快照的生产。
- 创建StateSummary对象将其赋值到成员变量leaderStateSummary中,在leaderStateSummary变量中记录了当前选举周期和最后处理的zxid。
- 创建LearnerCnxAcceptor类,该类是一个线程类,创建目的是为了等待Follower发送状态同步信息。
- 通过getEpochToPropose方法计算新的选举周期,注意该数据会赋值到成员变量epoch中。
- 将第(5)步中获得的选举中转换为zxid放入到zk对象中。
- 创建NEWLEADER类型的数据包,将其赋值到成员变量newLeaderProposal中。
- 对比QuorumPeer中的lastSeenQuorumVerifier和quorumVerifier版本号是否相同,如果相同则需要新建QuorumVerifier对象并且修正QuorumPeer中的lastSeenQuorumVerifier数据。
- 向提案(newLeaderProposal)中添加quorumVerifier对象,或者在lastSeenQuorumVerifier版本号大于quorumVerifier版本号的时候将lastSeenQuorumVerifier也加入到提案中。
- 执行waitForEpochAck方法等待大多数的选举参与者确认接收新的选举周期(变量epoch)。当大多数参选者认可选举周期后需要将选举周期设置到QuorumPeer对象中。
- 执行waitForNewLeaderAck方法等待大多数的选举参与者确认收到NEWLEADER数据包。
- 调用startZkServer方法将ZookeeperServer类启动。
- 执行死循环,在该死循环中主要与Follower进行心跳保持(向Follower发送PING数据包)。
接下来对第(4)步中所使用的LearnerCnxAcceptor类进行分析,该类是一个线程类,因此主要分析run方法,具体代码如下。
@Override
public void run() {
try {
while (!stop) {
Socket s = null;
boolean error = false;
try {
// 接收socket连接
s = ss.accept();
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
// 创建LearnerHandler对象并启动
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
} catch (SocketException e) {
error = true;
if (stop) {
LOG.info("exception while shutting down acceptor: "
+ e);
stop = true;
}
else {
throw e;
}
} catch (SaslException e) {
LOG.error("Exception while connecting to quorum learner", e);
error = true;
} catch (Exception e) {
error = true;
throw e;
} finally {
// Don't leak sockets on errors
if (error && s != null && !s.isClosed()) {
try {
s.close();
} catch (IOException e) {
LOG.warn("Error closing socket", e);
}
}
}
}
} catch (Exception e) {
LOG.warn("Exception while accepting follower", e.getMessage());
handleException(this.getName(), e);
}
}
在上述代码中核心处理流程。
- 接受socket连接。
- 将socket连接中的输入流提取,配合参数创建LearnerHandler类。
- 启动LearnerHandler类。
接下来将对getDesignatedLeader方法进行分析,该方法用于确认领导者对应的服务id(myid),具体使用场景有两个。
- 在startZkServer方法执行期间如果开启了重载配置则需要通过该方法确认领导者id然后重新加载配置。
- 在tryToCommit方法中对于请求类型是reconfig的情况下需要确认领导者。
下面是getDesignatedLeader方法的具体处理代码。
private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
// 获取最后一个QuorumVerifierAcksetPair对象
Proposal.QuorumVerifierAcksetPair newQVAcksetPair =
reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size() - 1);
// 判断自己是否在newQVAcksetPair中,如果在我自己就是leader
if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getId()) &&
newQVAcksetPair.getQuorumVerifier().getVotingMembers()
.get(self.getId()).addr.equals(self.getQuorumAddress())) {
return self.getId();
}
// 候选人集合
HashSet<Long> candidates = new HashSet<Long>(newQVAcksetPair.getAckset());
// 从候选人集合中移除自己
candidates.remove(self.getId());
// 当前需要处理的候选人
long curCandidate = candidates.iterator().next();
long curZxid = zxid + 1;
Proposal p = outstandingProposals.get(curZxid);
// 循环处理,处理目标是获取最多投票数量的候选人
while (p != null && !candidates.isEmpty()) {
for (Proposal.QuorumVerifierAcksetPair qvAckset : p.qvAcksetPairs) {
candidates.retainAll(qvAckset.getAckset());
if (candidates.isEmpty()) {
return curCandidate;
}
curCandidate = candidates.iterator().next();
if (candidates.size() == 1) {
return curCandidate;
}
}
curZxid++;
p = outstandingProposals.get(curZxid);
}
return curCandidate;
}
在上述代码中核心处理流程如下。
- 从参数提案中获取最后一个QuorumVerifierAcksetPair对象。
- 判断自身服务id是否在QuorumVerifierAcksetPair对象中,如果在则说明自己就是leader。
- 从QuorumVerifierAcksetPair对象中获取候选人集合(收到ack的服务id)。
- 由于在第(2)步中并未成为leader因此需要在候选人集合中移除。
- 通过循环确认领导者id,循环中确认领导者的条件有两种。
- 提案中的a ck服务id和变量candidates进行交集运算交集为空时可以确认领导者。
- 提案中的ack服务id和变量candidates进行交集运算交集数量为1时可以确认领导者。
LearnerHandler 分析
本节将对LearnerHandler类进行分析,对于这个类可以将其理解为学习处理程序或者是数据同步器,该类是一个线程类,因此主要分析run方法,run方法是一个相对内容较多的方法对于它的分析可以按照五步分进行。
第一部分:确认基础数据操作,基础数据包含如下内容。
- 变量tickOfNextAckDeadline,该变量表示下一个ack收到的截止时间。
- 变量ia,该变量表示输入档案。
- 变量bufferedOutput,该变量用于承载需要写出的字节流。
- 变量oa,该变量表示输出档案
- 变量sid,该变量表示服务id(myid)。
- 变量version,该变量表示协议版本。
- 变量learnerType,该变量表示学习者类型,类型分为两种,一种是具备投票能力的学习者(PARTICIPANT),另一种是不具备投票能力的学习者(OBSERVER)。
第一部分相关操作代码如下。
// 第一部分确认基础数据
// 向leader对象中添加学习处理程序
leader.addLearnerHandler(this);
// 计算截止时间
tickOfNextAckDeadline = leader.self.tick.get()
+ leader.self.initLimit + leader.self.syncLimit;
// 将bufferedInput对象做出类型转换
ia = BinaryInputArchive.getArchive(bufferedInput);
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
// 创建数据包
QuorumPacket qp = new QuorumPacket();
// 从变量ia中将数据包相关信息读取,反序列化到qp变量
ia.readRecord(qp, "packet");
// 如果数据包类型不是FOLLOWERINFO也不是OBSERVERINFO结束处理
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
LOG.error("First packet " + qp.toString()
+ " is not FOLLOWERINFO or OBSERVERINFO!");
return;
}
// 读取数据,数据对象是LearnerInfo,数据是在org.apache.zookeeper.server.quorum.Learner.registerWithLeader中创建的
byte learnerInfoData[] = qp.getData();
// 从learnerInfoData中读取sid、version和configVersion
if (learnerInfoData != null) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
this.sid = bbsid.getLong();
}
if (learnerInfoData.length >= 12) {
this.version = bbsid.getInt(); // 协议版本号
}
if (learnerInfoData.length >= 20) {
long configVersion = bbsid.getLong();
if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
throw new IOException(
"Follower is ahead of the leader (has a later activated configuration)");
}
}
} else {
// sid等于Follower类型的节点数量减一
this.sid = leader.followerCounter.getAndDecrement();
}
// 日志记录
if (leader.self.getView().containsKey(this.sid)) {
LOG.info("Follower sid: " + this.sid + " : info : "
+ leader.self.getView().get(this.sid).toString());
} else {
LOG.info("Follower sid: " + this.sid + " not in the current config "
+ Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
}
// 如果数据包类型是OBSERVERINFO,修改成员变量learnerType
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
}
在上述代码需要注意qp.getData代码所得到的数据类型是LearnerInfo,以及这条数据的产生来源是Learner#registerWithLeader方法。
第二部分:不同协议版本号之间的操作,具体处理代码如下。
// 第二部分不同协议版本号之间的操作
// 从数据包中根据zxid获取最后接受的epoch
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
// 从数据包中获取zxid
long zxid = qp.getZxid();
// 构建新的选举周期
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
// 构建新的zxid
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
if (this.getVersion() < 0x10000) {
// 从zxid
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
// 等待epoch数据包的ack数量过半,ACK数据包从org.apache.zookeeper.server.quorum.Learner#registerWithLeader方法中来
leader.waitForEpochAck(this.getSid(), ss);
}
// 主流版本都将执行这个代码
else {
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
// 构建 LEADERINFO 数据包并将其发送给其他节点
QuorumPacket newEpochPacket =
new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString()
+ " is not ACKEPOCH");
return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();
在上述代码中核心处理流程如下。
- 从数据包中获取zxid,然后将这个zxid转换为选举周期,该选举周期称之为最后接受的选举周期。
- 从数据包中获取zxid。
- 根据当前服务id(myId)和最后接受的选举周期计算新的选举周期。
- 将第(3)步中的选举周期转换为zxid。
- 根据版做出不同操作,如果版本小于0x10000则需要等待构建StateSummary对象然后开始等待epoch数据包被大多数接收,反之则需要构建LEADERINFO数据包将其发送出去并等待半数ACK
第三部分:是否需要进行全量同步操作,具体操作代码如下。
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
// 需要全量同步的操作
if (needSnap) {
boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
LearnerSnapshot snapshot =
leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
try {
long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
// 发送全量同步数据包
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
bufferedOutput.flush();
LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
+ "send zxid of db as 0x{}, {} concurrent snapshots, "
+ "snapshot was {} from throttle",
Long.toHexString(peerLastZxid),
Long.toHexString(leaderLastZxid),
Long.toHexString(zxidToSend),
snapshot.getConcurrentSnapshotNumber(),
snapshot.isEssential() ? "exempt" : "not exempt");
// Dump data to peer
// 将zk数据库的数据序列化到oa变量写出
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
bufferedOutput.flush();
} finally {
snapshot.close();
}
}
在上述代码首先会通过syncFollower方法判断是否需要进行全量同步,如果不需要则进行后续操作,如果需要则从zk数据库中获取最后处理的zxid,并将这个zxid组装为SNAP类型的数据表放入到变量oa中,同时还需要将zk序列化到oa变量,此时oa就具备整个zk数据库。
在第三部分操作中最关键的方法是syncFollower,它用于判断是否需要进行全量同步,具体处理代码如下。
public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
long currentZxid = peerLastZxid;
boolean needSnap = true;
// 是否开启事务同步日志
boolean txnLogSyncEnabled = db.isTxnLogSyncEnabled();
ReentrantReadWriteLock lock = db.getLogLock();
ReadLock rl = lock.readLock();
try {
rl.lock();
// 最大事务id
long maxCommittedLog = db.getmaxCommittedLog();
// 最小事务id
long minCommittedLog = db.getminCommittedLog();
// 选举周期 << 32
long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
+ " minCommittedLog=0x{} lastProcessedZxid=0x{}"
+ " peerLastZxid=0x{}", getSid(),
Long.toHexString(maxCommittedLog),
Long.toHexString(minCommittedLog),
Long.toHexString(lastProcessedZxid),
Long.toHexString(peerLastZxid));
// 数据库提交日志为空
if (db.getCommittedLog().isEmpty()) {
minCommittedLog = lastProcessedZxid;
maxCommittedLog = lastProcessedZxid;
}
if (forceSnapSync) {
LOG.warn("Forcing snapshot sync - should not see this in production");
}
// 相等表示Follower没有差异量发送空的DIFF数据
else if (lastProcessedZxid == peerLastZxid) {
LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +
" for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, peerLastZxid);
needOpPacket = false;
// 不需要全量同步
needSnap = false;
}
// Follower 数据比Leader新发送TRUNC
else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
Long.toHexString(maxCommittedLog) +
" for peer sid:" + getSid());
queueOpPacket(Leader.TRUNC, maxCommittedLog);
currentZxid = maxCommittedLog;
needOpPacket = false;
needSnap = false;
}
// Follower 数据在Leader数据范围内
else if ((maxCommittedLog >= peerLastZxid)
&& (minCommittedLog <= peerLastZxid)) {
LOG.info("Using committedLog for peer sid: " + getSid());
Iterator<Proposal> itr = db.getCommittedLog().iterator();
currentZxid = queueCommittedProposals(itr, peerLastZxid,
null, maxCommittedLog);
needSnap = false;
}
//
else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
// 计算事务日志的大小限制
long sizeLimit = db.calculateTxnLogSizeLimit();
// 读取一定量的数据
Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
peerLastZxid, sizeLimit);
//
if (txnLogItr.hasNext()) {
LOG.info("Use txnlog and committedLog for peer sid: " + getSid());
// 将数据放入queuedPackets队列
currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
minCommittedLog, maxCommittedLog);
LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
null, maxCommittedLog);
needSnap = false;
}
// 关闭 txnLogItr 资源
if (txnLogItr instanceof TxnLogProposalIterator) {
TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
txnProposalItr.close();
}
} else {
LOG.warn("Unhandled scenario for peer sid: " + getSid());
}
LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
" for peer sid: " + getSid());
// 通知领导者同步完成
leaderLastZxid = leader.startForwarding(this, currentZxid);
} finally {
rl.unlock();
}
if (needOpPacket && !needSnap) {
LOG.error("Unhandled scenario for peer sid: " + getSid() +
" fall back to use snapshot");
needSnap = true;
}
return needSnap;
}
在上述方法中首先需要对参数peerLastZxid进行说明,该参数可以理解为在领导者产生是的zxid,数据来源是StateSummary对象,数据包是ackEpochPacket。方法syncFollower处理流程如下。
- 从zk数据库中读取最大事务id,最小事务id以及最后处理的zxid(最后处理的zxid在选举后计算规则是选举周期<<32)。
- 如果lastProcessedZxid和peerLastZxid相等则表示没有差异数据,因此不需要进行全量同步。
- 如果peerLastZxid大于maxCommittedLog则表示Follower数据比Leader新因此需要截断数据而不是全量同步。
- 如果maxCommittedLog大于等于peerLastZxid并且minCommittedLog小于等于peerLastZxid表示Follower数据在Leader数据范围内,此时需要将提案数据从zk数据库中获取然后放入发送队列进行发送。此时也不需要进行全量同步,只需要部分同步即可。
- 如果peerLastZxid小于minCommittedLog此时需要从zk数据库中从peerLastZxid开始向后读取sizeLimit数量的数据,如果读取的数据为空则表示需要进行全量同步,反之则不需要。
- 通知领导者同步完成。
- 返回是否需要全量同步的结果。
第四部分:NEWLEADER数据包和UPTODATE数据包传输操作,具体处理代码如下。
// 创建NEWLEADER数据包,根据不同版本按照不同方式写出
if (getVersion() < 0x10000) {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
}
else {
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,newLeaderZxid,leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
// 启动数据包发送线程
startSendingPackets();
// 创建一个空的数据包
qp = new QuorumPacket();
// 从变量ia中将数据包相关信息读取,反序列化到qp变量
ia.readRecord(qp, "packet");
// 数据包类型不是ACK结束处理
if (qp.getType() != Leader.ACK) {
LOG.error("Next packet was supposed to be an ACK,"
+ " but received packet: {}", packetToString(qp));
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Received NEWLEADER-ACK message from " + sid);
}
leader.waitForNewLeaderAck(getSid(), qp.getZxid());
syncLimitCheck.start();
sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
synchronized (leader.zk) {
while (!leader.zk.isRunning() && !this.isInterrupted()) {
leader.zk.wait(20);
}
}
LOG.debug("Sending UPTODATE message to " + sid);
// 添加UPTODATE数据包
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
在上述代码中核心操作流程如下。
- 根据版本是否小于0x10000,如果是则创建NEWLEADER数据包,将其放入到oa变量中;反之则创建NEWLEADER数据包,将其放入到queuedPackets队列中。两个NEWLEADER数据包的差异是数据信息。
- 通过startSendingPackets方法启动发送线程,发送线程的核心是调用sendPackets方法,从queuedPackets队列中获取数据包将其放入变量oa中。
- 创建一个空的数据包,从变量ia中将数据包信息读取到这个空的数据包中,如果数据库表类型不是ACK则结束处理。
- 通过waitForNewLeaderAck方法等待半数参选人员发送ACK。
- 创建UPTODATE数据包放入到queuedPackets队列中等待发送线程发送。
第五部分:根据不同类型的数据包进行不同操作,具体操作代码如下。
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
}
tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;
ByteBuffer bb;
long sessionId;
int cxid;
int type;
// 不同的数据包类型做出不同的行为操作
switch (qp.getType()) {
// 类型是ACK
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ACK from Observer " + this.sid);
}
}
syncLimitCheck.updateAck(qp.getZxid());
// 处理ack数据包
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
break;
// 类型是PING
case Leader.PING:
ByteArrayInputStream bis = new ByteArrayInputStream(qp
.getData());
DataInputStream dis = new DataInputStream(bis);
while (dis.available() > 0) {
long sess = dis.readLong();
int to = dis.readInt();
leader.zk.touch(sess, to);
}
break;
// 类型是REVALIDATE
case Leader.REVALIDATE:
bis = new ByteArrayInputStream(qp.getData());
dis = new DataInputStream(bis);
long id = dis.readLong();
int to = dis.readInt();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeLong(id);
boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
if (valid) {
try {
// 设置所有者,标记leader是谁
leader.zk.setOwner(id, this);
} catch (SessionExpiredException e) {
LOG.error("Somehow session " + Long.toHexString(id) +
" expired right after being renewed! (impossible)", e);
}
}
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(id)
+ " is valid: " + valid);
}
dos.writeBoolean(valid);
qp.setData(bos.toByteArray());
// 加入到queuedPackets集合
queuedPackets.add(qp);
break;
// 类型是REQUEST
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if (type == OpCode.sync) {
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb,
qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
// 提交请求
leader.zk.submitLearnerRequest(si);
break;
default:
LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
break;
}
}
在上述代码中前置操作是从变量ia中读取数据包,在读取完成数据包后根据不同的数据包类型做出不同操作,具体操作如下。
- 如果是ACK类型,需要更新syncLimitCheck中zxid和事件相关数据,并且通过leader对象的processAck方法处理ACK信息。处理的本质是创建一个数据包将数据包发送给socket对应的服务(Follower节点)。
- 如果类型是PING,需要将数据包中的数据信息转换为数据输入流对象(DataInputStream),然后将数据输入流对象中sessionid和超时时间交给会话跟踪器(SessionTracker)进行处理。处理本质是更新session对应的超时时间。
- 如果类型是REVALIDATE,从数据包中提取sessionid和超时时间通过会话跟踪器确认是否有效,如果有效则将会话id和当前学习处理器(LearnerHandler)绑定,并发送数据包此时数据包中会携带sessionId。
- 如果类型是REQUEST,从数据包中获取sessionid、cxid、type和bb数据将其转换为请求对象(Request)然后通过submitLearnerRequest方法提交请求,提交请求后将会进入请求处理责任链。
Follower 分析
本节将对Follower类进行相关分析,首先从它的构造函数作为切入点,构造函数的使用在QuorumPeer#makeFollower方法中有提及,具体处理代码如下。
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.zkDb));
}
通过上述方法进入到Follower构造函数,具体构造函数代码如下。
Follower(QuorumPeer self, FollowerZooKeeperServer zk) {
this.self = self;
this.zk = zk;
this.fzk = zk;
}
在上述代码中初始化成员变量
- 成员变量self,其核心是参选节点QuorumPeer。
- 成员变量zk,它表示zk服务,在这里特指FollowerZooKeeperServer。
- 成员变量fzk,它表示zk服务,在这里特指FollowerZooKeeperServer。
接下来对核心方法followLeader进行分析,具体处理代码如下。
void followLeader() throws InterruptedException {
// 获取当前时间
self.end_fle = Time.currentElapsedTime();
// 计算选举耗时
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
// 寻找leader服务
QuorumServer leaderServer = findLeader();
try {
// 与leader服务建立连接
connectToLeader(leaderServer.addr, leaderServer.hostname);
// 将当前节点注册到leader,得到新选举周期对应的zxid
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
// 如果当前节点发生了状态改变抛出异常
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
}
// 将zxid转换为选举周期
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
// 如果选举周期小于当前节点认可的周期抛出异常
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(
self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
// 从leader同步
syncWithLeader(newEpochZxid);
// 创建数据包
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
// 读取数据包
readPacket(qp);
// 处理数据包
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when following the leader", e);
try {
sock.close();
} catch (IOException e1) {
e1.printStackTrace();
}
pendingRevalidations.clear();
}
} finally {
zk.unregisterJMX((Learner) this);
}
}
在上述代码中核心处理流程如下。
- 通过findLeader方法寻找leader服务。
- 通过connectToLeader方法与leader服务建立连接。
- 通过registerWithLeader方法将当前节点注册到leader服务,方法registerWithLeader会返回一个zxid,在这个zxid中会包含最新的选举周期。
- 判断zxid中的选举周期是否小于当前节点认可的选举周期,如果小于则需要抛出异常。
- 通过syncWithLeader方法与leader进行同步。
- 进入死循环阶段,死循环内重复执行如下操作。
- 读取数据包。
- 处理数据包。
接下来需要对这六个核心步骤中所涉及的方法进行分析,首先从findLeader方法按开始,它用于寻找leader服务,具体处理代码如下。
protected QuorumServer findLeader() {
QuorumServer leaderServer = null;
// Find the leader by id
// 获取选票
Vote current = self.getCurrentVote();
// 循环参选节点
for (QuorumServer s : self.getView().values()) {
// 如果参选节点的id和选票中的id对应,那么他就是leader
if (s.id == current.getId()) {
s.recreateSocketAddresses();
leaderServer = s;
break;
}
}
if (leaderServer == null) {
LOG.warn("Couldn't find the leader with id = "
+ current.getId());
}
return leaderServer;
}
在上述代码中核心处理流程如下。
- 从前节点获取选票(该选票中包含了leader节点对应的id)。
- 循环对比参选节点的id是否和第(1)步中选票包含的leaderid对应,如果是这个节点就是leader节点。
方法connectToLeader中主要涉及的是socket操作不做相关分析,接下来对registerWithLeader方法进行分析,具体处理代码如下。
protected long registerWithLeader(int pktType) throws IOException {
// 获取最后操作的zxid
long lastLoggedZxid = self.getLastLoggedZxid();
// 创建数据包
QuorumPacket qp = new QuorumPacket();
// 设置数据包类型
qp.setType(pktType);
// 设置zxid
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
// 创建LearnerInfo对象
LearnerInfo li =
new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
// 写出数据包,FOLLOWERINFO或者OBSERVERINFO
writePacket(qp, true);
// 读取数据包
readPacket(qp);
// 从数据包中获取新的选举周期
final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
// 如果数据包类型是LEADERINFO
if (qp.getType() == Leader.LEADERINFO) {
// 读取leader中的协议版本
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
byte epochBytes[] = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
// 如果leader中的周期大于当前节点认可的周期
if (newEpoch > self.getAcceptedEpoch()) {
wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
// 设置当前节点认可的周期为leader中的中期
self.setAcceptedEpoch(newEpoch);
}
// 如果相同
else if (newEpoch == self.getAcceptedEpoch()) {
wrappedEpochBytes.putInt(-1);
}
// 如果大于抛出异常
else {
throw new IOException(
"Leaders epoch, " + newEpoch + " is less than accepted epoch, "
+ self.getAcceptedEpoch());
}
// 发送ACKEPOCH数据包
QuorumPacket ackNewEpoch =
new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
// 写出数据包
writePacket(ackNewEpoch, true);
// 将leader中的周期计算为zxid
return ZxidUtils.makeZxid(newEpoch, 0);
}
// 其他类型的数据包
else {
// leader中的周期大于当前节点认可的周期覆盖当前节点的任选周期
if (newEpoch > self.getAcceptedEpoch()) {
self.setAcceptedEpoch(newEpoch);
}
// 如果数据包类型不是NEWLEADER抛出异常
if (qp.getType() != Leader.NEWLEADER) {
LOG.error("First packet should have been NEWLEADER");
throw new IOException("First packet should have been NEWLEADER");
}
// 返回数据包中的zxid
return qp.getZxid();
}
}
在上述代码中核心处理流程如下。
- 从当前节点获取最后操作的zxid。
- 创建数据包。
- 为数据包设置数据包类型,在Follower中设置的数据包类型是FOLLOWERINFO,在Observer中设置的数据包类型是OBSERVERINFO。
- 为数据包设置zxid,zxid数据是由当前节点认可的任选周期转换而来。
- 创建LearnerInfo对象,将其数据内容放入到数据保证然后通过writePacket方法写出。
- 通过readPacket方法读取数据包,此时读取到的数据包类型可能是LeaderInfo,如果是LEADERINFO类型那么该数据包中含有新的任选周期。
- 从数据包中读取新的选举周期(这个数据包是第(6)步中得到的数据包)。
- 如果数据包类型是LEADERINFO,则进行如下操作。
- 如果新的选举周期大于当前节点认可的周期,将当前节点中认可的周期设置为新的选举周期。
- 如果新的选举周期等于当前节点认可的周期,不做操作。
- 如果新的选举周期小于当前节点认可的周期,抛出异常。
- 如果数据包类型不是LEADERINFO,则进行如下操作。
- 新的周期大于当前节点认可的周期覆盖当前节点的任选周期。、
- 如果数据包类型不是NEWLEADER抛出异常。
- 返回数据包中的zxid。
接下来对syncWithLeader方法进行分析,该方法用于从Leader中同步数据,具体处理代码如下。
protected void syncWithLeader(long newLeaderZxid) throws Exception {
// 创建ACK数据包
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
// 创建数据包
QuorumPacket qp = new QuorumPacket();
// 从参数zxid中获取新的选举周期
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
QuorumVerifier newLeaderQV = null;
// 是否全量同步标志
boolean snapshotNeeded = true;
// 读取数据包
readPacket(qp);
// 已经提交的数据包,存储的是zxid
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
// 未提交的数据包
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
// 数据包中表示差异化同步
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
snapshotNeeded = false;
}
// 数据包中表示全量同步
else if (qp.getType() == Leader.SNAP) {
LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
// 从输入流将档案反序列化到zk数据库
zk.getZKDatabase().deserializeSnapshot(leaderIs);
// 重载配置未启用的情况下需要初始化配置节点
if (!self.isReconfigEnabled()) {
LOG.debug(
"Reset config node content from local config after deserialization of snapshot.");
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
}
// 读取signature信息
String signature = leaderIs.readString("signature");
// 如果signature信息不是BenWasHere抛出异常
if (!signature.equals("BenWasHere")) {
LOG.error("Missing signature. Got " + signature);
throw new IOException("Missing signature");
}
// 设置最后处理的zxid
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
}
// 数据包中表示回滚同步
else if (qp.getType() == Leader.TRUNC) {
LOG.warn("Truncating log to get in sync with the leader 0x"
+ Long.toHexString(qp.getZxid()));
// 截断日志
boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
if (!truncated) {
// not able to truncate the log
LOG.error("Not able to truncate the log "
+ Long.toHexString(qp.getZxid()));
System.exit(13);
}
// 设置最后处理的zxid
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
}
// 其他情况进行退出
else {
LOG.error("Got unexpected packet from leader: {}, exiting ... ",
LearnerHandler.packetToString(qp));
System.exit(13);
}
// 初始化配置节点
zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
// 创建会话跟踪器
zk.createSessionTracker();
long lastQueued = 0;
// 是否是zab1.0之前的标记
boolean isPreZAB1_0 = true;
// 是否写入事务日志
boolean writeToTxnLog = !snapshotNeeded;
outerLoop:
// 循环
while (self.isRunning()) {
// 读取数据包
readPacket(qp);
// 根据数据包做不同操作
switch (qp.getType()) {
case Leader.PROPOSAL:
// 创建飞行数据包
// 将数据包转换为PacketInFlight对象加入到packetsNotCommitted集合
PacketInFlight pif = new PacketInFlight();
pif.hdr = new TxnHeader();
pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
if (pif.hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(pif.hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = pif.hdr.getZxid();
if (pif.hdr.getType() == OpCode.reconfig) {
SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
QuorumVerifier qv =
self.configFromString(new String(setDataTxn.getData()));
self.setLastSeenQuorumVerifier(qv, true);
}
packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
// 从packetsNotCommitted中获取第一个数据
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid()
&& qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(
new String(((SetDataTxn) pif.rec).getData()));
//重载配置,如果重载成功抛出异常
boolean majorChange = self.processReconfig(qv,
ByteBuffer.wrap(qp.getData()).getLong(),
qp.getZxid(), true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
// 如果不需要写入事务日志
if (!writeToTxnLog) {
// 飞行数据包中的zxid和数据包中的zxid不相同记录日志,反之则需要进行事务处理,并将飞行数据包从packetsNotCommitted集合中移除
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is "
+ pif.hdr.getZxid());
}
else {
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
}
}
else {
// 将数据包中的zxid添加到packetsCommitted集合
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.INFORM:
case Leader.INFORMANDACTIVATE:
// 创建飞行数据包
PacketInFlight packet = new PacketInFlight();
// 赋值头信息
packet.hdr = new TxnHeader();
// 数据包类型是INFORMANDACTIVATE
if (qp.getType() == Leader.INFORMANDACTIVATE) {
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
byte[] remainingdata = new byte[buffer.remaining()];
buffer.get(remainingdata);
packet.rec = SerializeUtils.deserializeTxn(remainingdata, packet.hdr);
QuorumVerifier qv = self.configFromString(
new String(((SetDataTxn) packet.rec).getData()));
boolean majorChange =
self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
//重载配置,如果重载成功抛出异常
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
else {
packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
if (packet.hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(packet.hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = packet.hdr.getZxid();
}
if (!writeToTxnLog) {
zk.processTxn(packet.hdr, packet.rec);
}
else {
packetsNotCommitted.add(packet);
packetsCommitted.add(qp.getZxid());
}
break;
case Leader.UPTODATE:
LOG.info("Learner received UPTODATE message");
if (newLeaderQV != null) {
//重载配置,如果重载成功抛出异常
boolean majorChange =
self.processReconfig(newLeaderQV, null, null, true);
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
}
// 如果是zab1.0之前的协议
if (isPreZAB1_0) {
// 创建快照
zk.takeSnapshot();
// 设置当前选举周期为新的选举周期
self.setCurrentEpoch(newEpoch);
}
self.setZooKeeperServer(zk);
self.adminServer.setZooKeeperServer(zk);
// 回到循环开始
break outerLoop;
case Leader.NEWLEADER:
LOG.info("Learner received NEWLEADER message");
// 数据包中的数据不为空并且数据长度大于0 重新设置QuorumVerifier对象
if (qp.getData() != null && qp.getData().length > 1) {
try {
QuorumVerifier qv = self.configFromString(new String(qp.getData()));
self.setLastSeenQuorumVerifier(qv, true);
newLeaderQV = qv;
} catch (Exception e) {
e.printStackTrace();
}
}
// 如果需要全量同步
if (snapshotNeeded) {
// 创建快照
zk.takeSnapshot();
}
// 设置当前选举周期为新的选举周期
self.setCurrentEpoch(newEpoch);
// 设置是否需要写入事务日志为真
writeToTxnLog = true;
// 设置是否为ZAB1.0之前的版本为假
isPreZAB1_0 = false;
// 写出ACK数据包
writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
break;
}
}
}
// 在ACK数据包中设置zxid
ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
// 写出ACK数据包
writePacket(ack, true);
// 设置超时时间
sock.setSoTimeout(self.tickTime * self.syncLimit);
// zk服务启动
zk.startup();
self.updateElectionVote(newEpoch);
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
// 未提交的数据包进行处理
for (PacketInFlight p : packetsNotCommitted) {
fzk.logRequest(p.hdr, p.rec);
}
// 对已提交的数据包集合进行提交操作
for (Long zxid : packetsCommitted) {
fzk.commit(zxid);
}
}
else if (zk instanceof ObserverZooKeeperServer) {
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
// 对未提交的数据包进行处理
for (PacketInFlight p : packetsNotCommitted) {
// 获取已提交数据包中的第一个zxid
Long zxid = packetsCommitted.peekFirst();
// 未处理数据包中的zxid不等于第一个zxid则跳过处理
if (p.hdr.getZxid() != zxid) {
LOG.warn("Committing " + Long.toHexString(zxid)
+ ", but next proposal is "
+ Long.toHexString(p.hdr.getZxid()));
continue;
}
// 从packetsCommitted中移除第一个数据
packetsCommitted.remove();
// 创建请求对象
Request request = new Request(null, p.hdr.getClientId(),
p.hdr.getCxid(), p.hdr.getType(), null, null);
request.setTxn(p.rec);
request.setHdr(p.hdr);
// 对请求对象进行处理
ozk.commitRequest(request);
}
}
else {
throw new UnsupportedOperationException("Unknown server type");
}
}
在上述代码中核心处理流程如下。
- 创建ACK数据包,该数据包需要在一定情况下发送。
- 创建数据包,该数据包用于接收数据。
- 从参数zxid中获取新的任选周期,记做变量newEpoch。
- 创建是否全量同步标志,记做变量snapshotNeeded,默认值为真,表示需要全量同步。
- 读取数据包。
- 创建已经提交的数据包集合,存储变量是packetsCommitted,其中存放的是zxid。
- 创建未提交的数据包,存储变量是packetsNotCommitted,其中存储的是飞行数据包。
- 根据第(5)步中读取的数据包类型做出不同操作。
- 如果数据包类型是DIFF,则将变量snapshotNeeded设置为false。
- 如果数据包类型是SNAP,将会进行如下操作。
- 将输入流中的数据反序列化到当前节点的zk数据库中。
- 判断当前节点的reconfigEnabled属性是否为假,如果是则需要进行zk数据库中配置节点相关信息的重载。
- 从输入流中获取signature数据,如果该数据不是BenWasHere将抛出异常。
- 设置zk数据库中最后操作的zxid为数据包中的zxid。
- 如果数据包类型是TRUNC,则需要进行如下操作。
- 将zk数据库进行截断操作,截断参数是数据包中的zxid。
- 如果截断失败则需要退出程序,退出码是13。
- 如果截断成功则设置zk数据库中最后操作的zxid为数据包中的zxid。
- 其他情况进行退出,退出码是13。
- 初始化zk数据库中的配置节点。
- 创建会话跟踪器。
- 创建是否是ZAB1.0之前的版本标记,默认值为true。
- 创建是否写入事务日志标记,初始值为取反是否需要全量同步标记。
- 在循环中处理数据包,处理细节是根据不同的数据包类型进行分类处理。
- 如果数据包类型是PROPOSAL,需要创建飞行数据包,飞行数据包中的数据从数据包中获取,创建完成后需要将飞行数据包放入到packetsNotCommitted集合中
- 如果数据包类型是COMMIT或者COMMITANDACTIVATE,从packetsNotCommitted集合获取第一个飞行数据包,判断这个飞行数据包中的zxid和数据包中的zxid是否相同,如果是并且数据包类型是COMMITANDACTIVATE则需要进重载配置,如果重载成功则抛出异常。判断是否需要写入事务日志,如果不需要,则将飞行数据包中的zxid和数据包中的zxid进行比较,如果不相同记录日志,反之则需要进行事务处理,并将飞行数据包从packetsNotCommitted集合中移除;如果需要则需要将数据包中的zxid放入到packetsCommitted集合中。
- 如果数据包类型是INFORM或者INFORMANDACTIVATE,需要创建飞行数据包,确认是否需要写入事务日志,如果不需要则交给zk服务进行事务处理,反之则需要将数据放入到packetsNotCommitted和packetsCommitted中。
- 如果数据包类型是UPTODATE,判断是否是ZAB1.0之前的协议,如果是则需要创建快照并且将新的选举周期(newEpoch)设置到当前节点。
- 如果数据包类型是NEWLEAD,判断数据包中的数据是否存在,如果存在则需要重新设置当前节点的lastSeenQuorumVerifier属性。判断是否需要进行全量同步,如果需要则需要创建快照。执行完成上述操作后需要将新的任选周期设置到当前节点,设置是否需要写入事务日志标记为需要,设置是否是ZAB1.0之前的版本标记为假,写出ACK数据包。
- 设置第(1)步中ACK数据包的zxid,在这个zxid中包含新的选举周期(newEpoch)。
- 写出ACK数据包。
- zk服务启动。
- 更新当前节点的选举周期。
- 根据不同的zk类型做出不同操作。
- 如果zk类型是FollowerZooKeeperServer,循环packetsNotCommitted集合进行数据处理,数据处理的本质是请求处理,第一个请求处理器是SyncRequestProcessor。循环packetsCommitted集合进行zxid的提交操作,提交操作的本质是请求处理,第一个请求处理器是CommitProcessor。
- 如果zk类型是ObserverZooKeeperServer,循环packetsNotCommitted集合,判断单个飞行数据包的zxid和packetsCommitted集合中第一个zxid是否相同,如果不相同则跳过处理,反之则需要将packetsCommitted集合中的第一个数据先进行移除,然后创建请求对象交个ObserverZooKeeperServer进行数据处理,数据处理的本质是请求处理,第一个请求处理器是CommitProcessor。
最后对processPacket方法进行分析,该方法用于进行数据包处理,具体处理代码如下。
protected void processPacket(QuorumPacket qp) throws Exception {
switch (qp.getType()) {
case Leader.PING:
// 发送ping数据包
ping(qp);
break;
case Leader.PROPOSAL:
// 创建事务头和事务对象
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(hdr.getZxid())
+ " expected 0x"
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
if (hdr.getType() == OpCode.reconfig) {
SetDataTxn setDataTxn = (SetDataTxn) txn;
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
self.setLastSeenQuorumVerifier(qv, true);
}
// 处理请求
fzk.logRequest(hdr, txn);
break;
case Leader.COMMIT:
// 提交zxid
fzk.commit(qp.getZxid());
break;
case Leader.COMMITANDACTIVATE:
// 获取请求
Request request = fzk.pendingTxns.element();
SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
long suggestedLeaderId = buffer.getLong();
// 更新配置节点
boolean majorChange =
self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
// 提交zxid
fzk.commit(qp.getZxid());
// 如果更新成功抛出异常
if (majorChange) {
throw new Exception("changes proposed in reconfig");
}
break;
case Leader.UPTODATE:
// 不做操作
LOG.error("Received an UPTODATE message after Follower started");
break;
case Leader.REVALIDATE:
// 重新验证
revalidate(qp);
break;
case Leader.SYNC:
// 执行同步
fzk.sync();
break;
default:
LOG.warn("Unknown packet type: {}", LearnerHandler.packetToString(qp));
break;
}
}
在上述代码中核心处理流程如下。
- 如果数据类型是PING,通过ping方法发送数据包。
- 如果数据类型是PROPOSAL,从数据包中提取数据转换为事务头和事务对象交给FollowerZooKeeperServer进行请求处理。
- 如果数据类型是COMMIT或者COMMITANDACTIVATE,则将数据包中的zxid提取出来交给FollowerZooKeeperServer进行处理。
- 如果数据类型是UPTODATE,则不做操作。
- 如果数据类型是REVALIDATE,则通过revalidate这个方法进行重新验证。
- 如果数据类型是SYNC,则通过sync方法进行同步。
Observer 分析
本节将对Observer类进行相关分析,首先从它的构造函数作为切入点,构造函数的使用在QuorumPeer#makeObserver方法中有提及,具体处理代码如下。
protected Observer makeObserver(FileTxnSnapLog logFactory) throws IOException {
return new Observer(this, new ObserverZooKeeperServer(logFactory, this, this.zkDb));
}
通过上述方法进入到Observer构造函数,具体构造函数代码如下。
Observer(QuorumPeer self, ObserverZooKeeperServer observerZooKeeperServer) {
this.self = self;
this.zk = observerZooKeeperServer;
}
在上述代码中初始化成员变量
- 成员变量self,其核心是参选节点QuorumPeer。
- 成员变量zk,它表示zk服务,在这里特指ObserverZooKeeperServer。
接下来对核心方法observeLeader进行分析,具体处理代码如下。
void observeLeader() throws Exception {
zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
try {
// 寻找leader服务
QuorumServer leaderServer = findLeader();
LOG.info("Observing " + leaderServer.addr);
try {
// 与leader服务建立连接
connectToLeader(leaderServer.addr, leaderServer.hostname);
// 将当前节点注册到leader,得到新选举周期对应的zxid
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
// 如果当前节点发生了状态改变抛出异常
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
}
// 从leader同步
syncWithLeader(newLeaderZxid);
// 创建数据包
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
// 读取数据包
readPacket(qp);
// 处理数据包
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when observing the leader", e);
try {
sock.close();
} catch (IOException e1) {
e1.printStackTrace();
}
pendingRevalidations.clear();
}
} finally {
zk.unregisterJMX(this);
}
}
在上述代码中可以发现此时的操作与Follower中的followLeader方法操作类似,相关方法在分析followLeader时已经有提及因此做单个方法的分析,方法observeLeader处理流程如下。
- 通过findLeader方法寻找leader服务。
- 通过connectToLeader方法与leader服务建立连接。
- 通过registerWithLeader方法将当前节点注册到leader服务,方法registerWithLeader会返回一个zxid,在这个zxid中会包含最新的选举周期。
- 判断zxid中的选举周期是否小于当前节点认可的选举周期,如果小于则需要抛出异常。
- 通过syncWithLeader方法与leader进行同步。
- 进入死循环阶段,死循环内重复执行如下操作。
- 读取数据包。
- 处理数据包。
总结
本章对Zookeeper项目中状态同步相关内容进行分析,着重对Leader中的lead方法、Follower中的followLeader以及Observer中的observeLeader方法进行详细分析。通过本章的分析可以发现Follower和Observer的处理操作相似,因此在接下来的总结内容中只以Follower和Leader的方式进行同步流程说明。
Leader执行lead方法创建LearnerCnxAcceptor,在LearnerCnxAcceptor中会等待socket连接中的数据(这个连接用与接收Follower发起状态同步申请),一旦等到数据后将创建LearnerHandler类进行处理,此时的处理会与Follower有相关联动操作具体如图所示。
sequenceDiagram
participant l as leader
participant f as follower
f ->> f: 寻找leader寻找成功后建立连接
f->>l:通过registerWithLeader发送注册信息
note right of f: 此时发送的数据包类是FOLLOWERINFO
l->>l:接收到Follower数据信息后计算新的选举周期
l->> f:将新的选举周期信息发给Follower
note right of f: 此时发送额数据包类型是LEADERINFO
f->>l:接收数据处理完成后发送ack数据包
note right of f: 此时发送额数据包类型是ACKEPOCH
在上图中leader接收到ack数据包后会确认是否需要进行全量数据同步,即调用syncFollower方法与Follower进行数据同步相关操作,Follower与Observer相关的同步逻辑在org.apache.zookeeper.server.quorum.Learner#syncWithLeader方法中。