zookeeper source code analysis

Zookeeper 选举实现

本章将对Zookeeper项目中关于选举算法的实现进行相关介绍。

选举入口

本节将对Zookeeper项目中关于选举算法的入口进行相关分析,在Zookeeper项目中选举行为的定义接口是Election,具体代码如下。

public interface Election {
    Vote lookForLeader() throws InterruptedException;

    void shutdown();
}

在Election接口中定义了两个方法分别是。

  1. 方法lookForLeader用于寻找Leader,方法返回值是Vote,可以将返回值Vote理解为选票。
  2. 方法shutdown用于关闭选举。

Election接口在项目中存在三个实现类:FastLeaderElection、LeaderElection和AuthFastLeaderElection,目前在Zookeeper项目中已经将后两者做了废弃标记,主要使用的选举接口实现类是FastLeaderElection。

了解Election接口和实现类的信息后下面对他们的构造进行说明,构造Election接口的方法是QuorumPeer#createElectionAlgorithm具体处理代码如下。

@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;

        switch(electionAlgorithm){
            case 0:
                le=new LeaderElection(this);
                break;
            case 1:
                le=new AuthFastLeaderElection(this);
                break;
            case 2:
                le=new AuthFastLeaderElection(this,true);
                break;
            case 3:
                QuorumCnxManager qcm=createCnxnManager();
                QuorumCnxManager oldQcm=qcmRef.getAndSet(qcm);
                if(oldQcm!=null){
                    LOG.warn(
                    "Clobbering already-set QuorumCnxManager (restarting leader election?)");
                    oldQcm.halt();
                }
                
                QuorumCnxManager.Listener listener=qcm.listener;
                if(listener!=null){
                    listener.start();
                    FastLeaderElection fle=new FastLeaderElection(this,qcm);
                    fle.start();
                    le=fle;
                }else{
                    LOG.error("Null listener when initializing cnx manager");
              }
              break;
             default:
                assert false;
        }
        return le;
}

在上述代码中会根据参数electionAlgorithm做出不同的选举算法实现类的创建,具体映射如下

  1. 参数electionAlgorithm为0时选择LeaderElection
  2. 参数electionAlgorithm为1时选择AuthFastLeaderElection
  3. 参数electionAlgorithm为2时选择AuthFastLeaderElection
  4. 参数electionAlgorithm为3时选择FastLeaderElection。

接下来需要找到createElectionAlgorithm函数的调用,具体调用代码如下。

this.electionAlg=createElectionAlgorithm(electionType);

在上述代码中会使用成员变量electionType来完成Election接口实现类的创建,这里需要明确electionType数值的推论,既然它作为成员变量那么它的赋值操作有两种可能:通过构造函数赋值或者通过set方法进行赋值。在Zookeeper项目中可以找到它的赋值操作是通过set方法进行的,具体代码位于QuorumPeerMain类,处理代码如下。

quorumPeer.setElectionType(config.getElectionAlg())

在上述代码中会从配置类(QuorumPeerConfig)中获取成员变量electionAlg来作为选举算法类型的参数,详细数据如下。

protected int electionAlg=3;

综上所示在Zookeeper项目中选举算法的默认选择实现类是FastLeaderElection。接下来需要找到具体的选举发生时间,寻找这个发生时间的本质是寻找在哪里执行了Election#lookForLeader方法,执行方法位于QuorumPeer类的run方法中,部分处理代码如下。

// 省略部分代码
switch(getPeerState()){
        case LOOKING:
        LOG.info("LOOKING");
        setCurrentVote(makeLEStrategy().lookForLeader());
        break;

通过上述代码可以发现在状态是LOOKING时会进行选举操作。

此类管理仲裁协议。此服务器可以处于三种状态: 领导者选举 - 每个服务器将选举一个领导者(最初提议自己作为领导者)。 Follower - 服务器将与领导者同步并复制任何事务。 领导者 - 服务器将处理请求并将它们转发给追随者。大多数追随者必须先记录请求才能被接受。 此类将设置一个数据报套接字,该套接字将始终以其当前领导者的视图进行响应。响应将采用以下形式:

QuorumPeer 分析

本节将对QuorumPeer类进行分析,QuorumPeer类是线程的子类,它主要负责管理选举协议,具体实现在run方法中后续将会对这部分内容进行详细说明。接下来对QuorumPeer类中的部分成员变量、方法以及内部类进行介绍。首先是内部类ServerState,它是一个枚举,它表示服务状态,存在四种状态。

  1. LOOKING,寻找Leader阶段,需要发起选举。
  2. FOLLOWING,跟随状态,表示自身为跟随者并且集群内已经有Leader。
  3. LEADING,领导状态,表示自身为领导者。
  4. OBSERVING,观察状态,集群内已有Leader。

需要注意的是QuorumPeer类中存在成员变量state,它表示服务状态,默认情况下为ServerState.LOOKING,即只要QuorumPeer启动默认会发起选举。接下来还需要了解成员变量myId,通过这个变量的命名可以联想到它可能是和Zookeeper配置文件(zoo.cfg)中的myid属性对应。在Zookeeper项目中成员变量myId的赋值是通过以下代码进行。

quorumPeer.setMyid(config.getServerId())

在上述代码中会将QuorumPeerConfig类中的成员变量serverId赋值到成员变量myId中,成员变量serverId的数据来源代码如下。

private void setupMyId()throws IOException{
  File myIdFile=new File(dataDir,"myid");
  if(!myIdFile.isFile()){
      return;
  }
  BufferedReader br=new BufferedReader(new FileReader(myIdFile));
  String myIdString;
  try{
      myIdString=br.readLine();
  }finally{
      br.close();
  }
  try{
      serverId=Long.parseLong(myIdString);
      MDC.put("myid",myIdString);
  }catch(NumberFormatException e){
      throw new IllegalArgumentException("serverid "+myIdString
      +" is not a number");
  }
}

通过上述代码可以下结论:成员变量serverId来自配置文件的myid属性,成员变量myid就是配置文件的myid属性。

至此对重要的两个成员变量state和myid就分析结束了,接下来将进入到方法分析阶段,在方法分析中主要以start方法和run方法作为入口进行分析,在分析时会对其他成员变量进行补充说明。

start 方法分析

本节将对QuorumPeer类中的start方法进行分析,具体处理代码如下。

@Override
public synchronized void start(){
 // 判断视图中是否包含myid,如果没有则抛出异常
 if(!getView().containsKey(myid)){
     throw new RuntimeException("My id "+myid+" not in the peer list");
 }
 // 加载数据
 loadDataBase();
 // 启动ServerCnxnFactory
 startServerCnxnFactory();
 try{
     // 启动AdminServer
     adminServer.start();
  }catch(AdminServerException e){
     LOG.warn("Problem starting AdminServer",e);
     System.out.println(e);
 }
 // 启动选举
 startLeaderElection();
 // 线程启动
 super.start();
}

在上述代码中处理流程如下。

  1. 判断视图中是否包含myid,如果没有则抛出异常。
  2. 通过loadDataBase方法加载数据。
  3. 启动ServerCnxnFactory,启动内容有两个成员变量cnxnFactory和secureCnxnFactory。
  4. 启动AdminServer
  5. 通过startLeaderElection方法启动选举相关内容。
  6. 启动真正的线程。

在上述处理流程 中需要对视图进行解释,视图在这里是指参与选举的所有成员,视图数据结构如下。

  1. key表示服务id(myid)。
  2. value表示参与选举的服务对象。

接下来对loadDataBase方法进行分析,具体处理代码如下。

private void loadDataBase() {
    try {
        // zk数据库加载数据
        zkDb.loadDataBase();

        // load the epochs
        // 加载选举周期相关数据

        // 从zk数据库中获取最后操作的zxid
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        // 从最后的zxid中获取周期
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            // 从currentEpoch文件中获取当前周期
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // 将最后操作zxid中获取的周期赋值到成员变量currentEpoch,并写入到currentEpoch文件
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME
                     + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                     currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        // 如果zxid中的周期大于currentEpoch文件中的周期抛出异常
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch)
                                  + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            // 从acceptedEpoch文件中读取接受的选举周期
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch (FileNotFoundException e) {
            // 将最后操作zxid中的周期赋值到成员变量acceptedEpoch,并写入到acceptedEpoch文件
            acceptedEpoch = epochOfZxid;
            LOG.info(ACCEPTED_EPOCH_FILENAME
                     + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                     acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        // 如果acceptedEpoch文件中的选举周期小于当前选举周期抛出异常
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch)
                                  + " is less than the current epoch, " + ZxidUtils.zxidToString(
                                      currentEpoch));
        }
    } catch (IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}

在上述代码中主要完成zk数据库的加载以及成员变量currentEpoch和成员变量acceptedEpoch的初始化。成员变量currentEpoch表示当前节点正在经历的选举周期,即当前选举周期。成员变量acceptedEpoch表示当前节点接受的选举周期,即上一个Leader选举通过时的选举周期。注意:成员变量currentEpoch和成员变量acceptedEpoch都是需要进行持久化,它们对应的持久化文件名分别是currentEpoch和acceptedEpoch。了解这些内容后下面对处理流程进行说明。

  1. zk数据库加载数据,这里主要操作的是FileTxnSnapLog类,将数据加载到数据树(DataTree)中。
  2. 从zk数据库中读取最后处理完成的zxid,然后将其转换为周期,转换过程:zxid>>32;

从currentEpoch文件中获取当前周期(赋值到成员变量currentEpoch),如果失败则需要将第(2)步中转换的周期赋值给当前周期(成员变量currentEpoch)并写入到currentEpoch文件中。如果成功则需要判断第(2)步中的周期是否大于成员变量currentEpoch,如果是则需要抛出异常。 4. 从acceptedEpoch文件中获取接受的周期(赋值到成员变量acceptedEpoch),如果失败则需要将第(2)步中转换的周期赋值给当前周期(成员变量acceptedEpoch)并写入到acceptedEpoch文件中。如果成功则需要判断接受的周期是否小于于成员变量currentEpoch,如果是则需要抛出异常。

接下来对startLeaderElection方法进行说明,具体处理代码如下。

synchronized public void startLeaderElection() {
    try {
        // 如果服务状态是LOOKING初始化选票
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // 选举算法类型为0
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(getQuorumAddress().getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    // 构造选举接口实现类
    this.electionAlg = createElectionAlgorithm(electionType);
}

在上述代码中核心处理流程如下。

  1. 如果服务状态是LOOKING初始化选票,选票初始化参数:myid、最后处理完成的zxid和当前周期。
  2. 如果选举算法类型为0则初始化成员变量udpSocket和responder。
  3. 通过createElectionAlgorithm方法构造选举接口实现类。

在startLeaderElection方法中重点是初始化选票以及选举接口实现类的初始化,选举接口实现类的初始化在前文已经做过相关介绍,下面对选票进行分析,负责承担选票概念的类是Vote,对于选票的理解着重在成员变量上详细内容见表。

变量名称变量类型变量说明
versionid版本
idlong服务器id。
zxidlong最后处理的zxid(事务编号)
electionEpochlong逻辑时钟,历经的周期编号。
peerEpochlong参选时的周期编号。
stateServerState服务器状态

run方法分析

本节将对QuorumPeer类中的run方法进行分析,在run方法中可以分为如下三个核心操作流程。

  1. 更新线程名称。
  2. MBean相关信息注册。
  3. 核心死循环,根据不同服务状态做不同操作。

由于第(1)部分操作是关于字符串相关操作实现内容相对简单因此不做相关说明,第(2)部分操作处理代码如下。

try {
    // 初始化成员变量jmxQuorumBean,并注册到MBean中,主要用于存储自生为jmx提供相关数据支持
    jmxQuorumBean = new QuorumBean(this);
    MBeanRegistry.getInstance().register(jmxQuorumBean, null);
    // 循环所有参与选举的服务,根据服务id是否与当前id相同创建LocalPeerBean或者RemotePeerBean并将其注册到MBean中。
    for (QuorumServer s : getView().values()) {
        ZKMBeanInfo p;
        // 如果当前id和参选服务的id相同
        if (getId() == s.id) {
            p = jmxLocalPeerBean = new LocalPeerBean(this);
            try {
                MBeanRegistry.getInstance().register(p, jmxQuorumBean);
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
                jmxLocalPeerBean = null;
            }
        }
        // 不相同
        else {
            RemotePeerBean rBean = new RemotePeerBean(this, s);
            try {
                MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                jmxRemotePeerBean.put(s.id, rBean);
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
            }
        }
    }
} catch (Exception e) {
    LOG.warn("Failed to register with JMX", e);
    jmxQuorumBean = null;
}

在上述代码中处理流程如下。

  1. 将自身作为参数创建QuorumBean,将其注册到MBean中。
  2. 循环所有参与选举的服务,根据服务id是否与当前id相同创建,相同则创建LocalPeerBean不相同则RemotePeerBean,创建完成后需要将其注册到MBean中。

接下来对最重要的第(3)步操作进行说明,第(3)步操作会根据不同的服务状态进行处理,因此接下来的说明也将按照不同服务状态进行。首先当服务状态是LOOKING会进行如下代码操作。

case LOOKING:
// 选举触发
LOG.info("LOOKING");

// 获取readonlymode.enabled属性,
if (Boolean.getBoolean("readonlymode.enabled")) {
    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

    // Create read-only server but don't start it immediately
    // 创建只读zk服务
    final ReadOnlyZooKeeperServer roZk =
        new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

    // Instead of starting roZk immediately, wait some grace
    // period before we decide we're partitioned.
    //
    // Thread is used here because otherwise it would require
    // changes in each of election strategy classes which is
    // unnecessary code coupling.
    // 创建等待线程
    Thread roZkMgr = new Thread() {
        public void run() {
            try {
                // lower-bound grace period to 2 secs
                sleep(Math.max(2000, tickTime));
                if (ServerState.LOOKING.equals(getPeerState())) {
                    roZk.startup();
                }
            } catch (InterruptedException e) {
                LOG.info(
                    "Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
            } catch (Exception e) {
                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
            }
        }
    };
    try {
        // 启动等待线程
        roZkMgr.start();
        // 成员变量reconfigFlag设置为false
        reconfigFlagClear();
        // 如果shuttingDownLE为真重新初始化选举实现类
        if (shuttingDownLE) {
            shuttingDownLE = false;
            startLeaderElection();
        }
        // 设置选票
        setCurrentVote(makeLEStrategy().lookForLeader());
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        setPeerState(ServerState.LOOKING);
    } finally {
        // If the thread is in the the grace period, interrupt
        // to come out of waiting.
        roZkMgr.interrupt();
        roZk.shutdown();
    }
}
else {
    try {

        reconfigFlagClear();
        if (shuttingDownLE) {
            shuttingDownLE = false;
            // 启动选举
            startLeaderElection();
        }
        // 设置选票
        setCurrentVote(makeLEStrategy().lookForLeader());
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        setPeerState(ServerState.LOOKING);
    }
}
break;

在上述代码中会根据readonlymode.enabled属性做出两种操作,这两种操作的共同处理流程都是确认选票,关于确认选票的处理流程如下。

  1. 判断成员变量shuttingDownLE是否为真,如果为真需要将其设置为假,并且执行startLeaderElection方法。执行startLeaderElection方法的目标是完成第(2)步中所需的选举接口实现类。
  2. 通过makeLEStrategy方法和lookForLeader方法完成选票。其中makeLEStrategy方法适用于获取选举接口实现类,而lookForLeader方法是用于确认最终的选票。

接下来对服务状态是OBSERVING、FOLLOWING和LEADING的代码进行说明,具体处理代码如下。

case OBSERVING:
try {
    LOG.info("OBSERVING");
    // 设置观察者
    setObserver(makeObserver(logFactory));
    // 观察者开始对领导者进行观察操作
    observer.observeLeader();
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
} finally {
    observer.shutdown();
    setObserver(null);
    updateServerState();
}
break;
case FOLLOWING:
try {
    LOG.info("FOLLOWING");
    // 设置跟随者
    setFollower(makeFollower(logFactory));
    // 跟随者执行跟随操作与领导者通讯
    follower.followLeader();
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
} finally {
    follower.shutdown();
    setFollower(null);
    // 更新服务状态
    updateServerState();
}
break;
case LEADING:
LOG.info("LEADING");
try {
    // 设置领导者
    setLeader(makeLeader(logFactory));
    // 领导者开始进行领导操作
    leader.lead();
    setLeader(null);
} catch (Exception e) {
    LOG.warn("Unexpected exception", e);
} finally {
    if (leader != null) {
        leader.shutdown("Forcing shutdown");
        setLeader(null);
    }
    // 更新服务状态
    updateServerState();
}
break;

在上述代码中主要涉及如下操作中。

根据不同类型设置成员变量leader、follower和observer,然后调用相关方法。如果是leader则调用lead方法,如果是follower则调用followLeader,如果是observer则调用observeLeader方法。 2. 在调用lead、followLeader或者observeLeader方法时如果产生异常则需要进行异常记录。 3. 执行成员变量leader、follower或者observer的关闭方法,将成员变量设置为null,并通过updateServerState方法更新服务状态。

接下来对updateServerState方法进行分析,具体处理代码如下。

private synchronized void updateServerState() {
    if (!reconfigFlag) {
        setPeerState(ServerState.LOOKING);
        LOG.warn("PeerState set to LOOKING");
        return;
    }

    if (getId() == getCurrentVote().getId()) {
        setPeerState(ServerState.LEADING);
        LOG.debug("PeerState set to LEADING");
    } else if (getLearnerType() == LearnerType.PARTICIPANT) {
        setPeerState(ServerState.FOLLOWING);
        LOG.debug("PeerState set to FOLLOWING");
    } else if (getLearnerType() == LearnerType.OBSERVER) {
        setPeerState(ServerState.OBSERVING);
        LOG.debug("PeerState set to OBSERVER");
    } else { // currently shouldn't happen since there are only 2 learner types
        setPeerState(ServerState.LOOKING);
        LOG.debug("Shouldn't be here");
    }
    reconfigFlag = false;
}

在上述代码中对服务状态进行了相关设置条件的控制,具体如下。

  1. 如果成员变量reconfigFlag为假,则设置服务状态为LOOKING。
  2. 当前服务id和选票中的id相同,则设置服务状态为LEADING。
  3. 当前服务中的learnerType属性为PARTICIPANT,则设置服务状态为FOLLOWING。
  4. 当前服务中的learnerType属性为OBSERVER,则设置服务状态为OBSERVING。
  5. 不满足2~4的情况设置服务状态为LOOKING。

FastLeaderElection 分析

本节将对FastLeaderElection类进行分析,对于FastLeaderElection类的分析着重是lookForLeader方法,在lookForLeader方法中需要对两个投票箱进行说明。

  1. 投票箱recvset,该投票箱将收集服务状态是LOOKING、FOLLOWING和LEADING的选票。
  2. 投票箱outofelection,该投票箱将收集服务状态是FOLLOWING和LEADING的选票。这个选票箱一般用于已经存在Leader节点的时候使用。

除了投票箱以外还需要对一些常用方法进行说明,首先是updateProposal方法,该方法用于更新选票,具体处理代码如下。

synchronized void updateProposal(long leader,long zxid,long epoch){
  proposedLeader=leader;
  proposedZxid=zxid;
  proposedEpoch=epoch;
}

在这段代码中设计三个参数和三成员变量他们的含义如下。

  1. leader:领导者编号
  2. zxid: 事务id
  3. epoch: 任选周期
  4. proposedLeader: 选票中的领导者编号
  5. proposedZxid: 选票中的事务id
  6. proposedEpoch: 选票中的任选周期

接下来对sendNotifications方法进行分析,该方法用于广播选票(通知),具体处理代码如下。

private void sendNotifications() {
  // 将选票信息放入sendqueue,WorkerSender会从中读取数据发送
  for (long sid : self.getCurrentAndNextConfigVoters()) {
    QuorumVerifier qv = self.getQuorumVerifier();
    // 构建发送消息
    ToSend notmsg = new ToSend(ToSend.mType.notification,
                               proposedLeader,
                               proposedZxid,
                               logicalclock.get(),
                               QuorumPeer.ServerState.LOOKING,
                               sid,
                               proposedEpoch, qv.toString().getBytes());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
                Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(
                  logicalclock.get()) +
                " (n.round), " + sid + " (recipient), " + self.getId() +
                " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
    }
    // 加入到发送队列
    sendqueue.offer(notmsg);
  }
}

在上述代码中会获取参与选举的服务id,然后循环该服务id集合构建消息对象然后放入到sendqueue集合中。再放入sendqueue集合后会有WorkerSender类将数据进行发送操作。

接下来对totalOrderPredicate方法进行分析,该方法用于确认是否需要改票,具体处理代码如下。

protected boolean totalOrderPredicate(long newId,
                                      long newZxid,
                                      long newEpoch,
                                      long curId,
                                      long curZxid,
                                      long curEpoch) {
  if (self.getQuorumVerifier().getWeight(newId) == 0) {
    return false;
  }
  // Epoch > Zxid > myid
  return ((newEpoch > curEpoch) ||
          ((newEpoch == curEpoch) &&
           ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

在上述代码中首先会从参与选举的服务集合中根据新的领导者id获取对应的权重,如果权重为0则返回false,表示不需要改票,不过在实际研发或者使用环境中权重设置为0的情况只有极少数。在totalOrderPredicate方法中更重要的判断逻辑是Epoch、Zxid和myid的对比,如果满足以下情况的某一种则返回true,表示需要改票。

  1. 新的周期大于当前周期。
  2. 新周期与当前周期相同为前提,新的zxid大于当前zxid。
  3. 新周期与当前周期相同为前提,新的 zxid等于当前zxid,新的服务id大于当前服务id。

接下来对termPredicate方法进行分析,该方法用于判断投票是否已经结束,具体处理代码如下。

protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
  // 创建SyncedLearnerTracker类
  SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
  // 将QuorumVerifier接口实现类放入到SyncedLearnerTracker类中
  voteSet.addQuorumVerifier(self.getQuorumVerifier());
  // 确认是否需要添加lastSeenQuorumVerifier
  if (self.getLastSeenQuorumVerifier() != null
      && self.getLastSeenQuorumVerifier().getVersion() > self
      .getQuorumVerifier().getVersion()) {
    voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
  }
  // 循环选票集合,判断是否和当前参数选票相同,如果相同则添加
  for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
    if (vote.equals(entry.getValue())) {
      voteSet.addAck(entry.getKey());
    }
  }

  // 确认是否大多数认可
  return voteSet.hasAllQuorums();
}

在上述代码中,核心处理流程如下。

  1. 创建SyncedLearnerTracker类,并向其中添加成员变量quorumVerifier,如果成员变量lastSeenQuorumVerifier不为空并且lastSeenQuorumVerifier的version大于quorumVerifier的version则也需要将lastSeenQuorumVerifier加入到SyncedLearnerTracker类中。
  2. 循环选票集合,判断当前选票是否和参数选票相同,如果是则调用addAck方法将服务id加入到SyncedLearnerTracker类中。
  3. 通过hasAllQuorums方法判断是否是大多数认可,将这个数据返回。

最后对checkLeader方法进行分析,该方法用于检查参数leader是否是领导者,具体处理代码如下。

protected boolean checkLeader(
  Map<Long, Vote> votes,
  long leader,
  long electionEpoch) {

  boolean predicate = true;

  // 参数leader(选票中的领导者)不等于本地id
  if (leader != self.getId()) {
    // 如果选票集合中不存在leader,将标记设置为false
    if (votes.get(leader) == null) {
      predicate = false;
    }
    // 如果选票集合中leader对应的服务状态不是LEADING,将标记设置为false
    else if (votes.get(leader).getState() != ServerState.LEADING) {
      predicate = false;
    }
  }
  // 如果本地选举周期不等于参数选举周期,将标记设置为false
  else if (logicalclock.get() != electionEpoch) {
    predicate = false;
  }

  return predicate;
}

在上述代码中对于参数leader不是领导者的判断有如下可能。

  1. 前提:参数leader不等于本地id。
    1. 选票集合中根据参数leader查询对应的选票,选票不存在。
    2. 选票集合中根据参数leader查询对应选票中的服务状态,服务状态不是LEADING。
  2. 本地选举周期和参数选举周期不相同。

有了前面这些方法的分析接下来对lookForLeader方法的分析就相对轻松,具体处理代码如下。

public Vote lookForLeader() throws InterruptedException {
  try {
    self.jmxLeaderElectionBean = new LeaderElectionBean();
    MBeanRegistry.getInstance().register(
      self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
  } catch (Exception e) {
    LOG.warn("Failed to register with JMX", e);
    self.jmxLeaderElectionBean = null;
  }
  if (self.start_fle == 0) {
    self.start_fle = Time.currentElapsedTime();
  }
  try {
    // 投票箱
    HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

    // 逻辑外的选票箱,常用语集群环境已经存在的情况下使用
    HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

    // 选举最大等待时间,默认 200 ms
    int notTimeout = finalizeWait;

    synchronized (this) {
      // 选举周期+1
      logicalclock.incrementAndGet();
      // 更新选票(提案)信息
      updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }

    LOG.info("New election. My id =  " + self.getId() +
             ", proposed zxid=0x" + Long.toHexString(proposedZxid));
    // 广播选票
    sendNotifications();

  
    // 循环操作确认Leader
    while ((self.getPeerState() == ServerState.LOOKING) &&
           (!stop)) {
      // 从待处理消息队列中获取一个通知对象进行处理
      Notification n = recvqueue.poll(notTimeout,
                                      TimeUnit.MILLISECONDS);

      if (n == null) {
        // 通讯管理器检查是还有未发送的消息,如果有则进行发送
        if (manager.haveDelivered()) {
          // 广播选票
          sendNotifications();
        }
        else {
          // 连接所有参选服务
          manager.connectAll();
        }

        // 计算新的超时时间
        int tmpTimeOut = notTimeout * 2;
        notTimeout = (tmpTimeOut < maxNotificationInterval ?
                      tmpTimeOut : maxNotificationInterval);
        LOG.info("Notification time out: " + notTimeout);
      }
      // 1. 如果通知对象的sid在选票服务集合
      // 2. 如果通知对象选择的领导者在选票服务集合
      else if (validVoter(n.sid) && validVoter(n.leader)) {
        // 对通知对象中的不同服务状态进行处理
        switch (n.state) {
          case LOOKING:
            // If notification > current, replace and send messages out
            // 如果通知中的选举周期大于本地选举周期
            if (n.electionEpoch > logicalclock.get()) {
              // 本地选举周期设置为通知中的选举周期
              logicalclock.set(n.electionEpoch);
              // 清空本地投票箱
              recvset.clear();
              // 确认是否需要更新选票,
              if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                      getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                // 更新选票
                updateProposal(n.leader, n.zxid, n.peerEpoch);
              }
              // 通知中领导者的权重为0,
              else {
                // 更新选票
                updateProposal(getInitId(),
                               getInitLastLoggedZxid(),
                               getPeerEpoch());
              }
              // 广播选票
              sendNotifications();
            }
            // 如果通知中的选举周期小于本地选举周期跳出处理
            else if (n.electionEpoch < logicalclock.get()) {
              if (LOG.isDebugEnabled()) {
                LOG.debug(
                  "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                  + Long.toHexString(n.electionEpoch)
                  + ", logicalclock=0x" + Long.toHexString(
                    logicalclock.get()));
              }
              break;
            }
            // 周期相等的情况下确认是否需要更新选票
            else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                         proposedLeader, proposedZxid, proposedEpoch)) {
              // 更新选票
              updateProposal(n.leader, n.zxid, n.peerEpoch);
              // 广播选票
              sendNotifications();
            }

            if (LOG.isDebugEnabled()) {
              LOG.debug("Adding vote: from=" + n.sid +
                        ", proposed leader=" + n.leader +
                        ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                        ", proposed election epoch=0x" + Long.toHexString(
                          n.electionEpoch));
            }
            // 本地投票箱收集选票
            recvset.put(n.sid,
                        new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

            // 归票
            // 判断投票是否结束
            if (termPredicate(recvset,
                              new Vote(proposedLeader, proposedZxid,
                                       logicalclock.get(), proposedEpoch))) {

              // Verify if there is any change in the proposed leader
              // 确认是否存在新的通知,如果存在则进一步判断是否需要更新选票仓库
              while ((n = recvqueue.poll(finalizeWait,
                                         TimeUnit.MILLISECONDS)) != null) {
                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)) {

                  recvqueue.put(n);
                  // 重新选举
                  break;
                }
              }
              // 没有后续的通知
              if (n == null) {
                // 如果选票中的领导者id和本地id相同,设置为当前节点为领导者
                self.setPeerState((proposedLeader == self.getId()) ?
                                  ServerState.LEADING : learningState());
                // 创建返回值选票,Leader选票
                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, logicalclock.get(),
                                        proposedEpoch);
                // 记录Leader选票并清空recvqueue集合
                leaveInstance(endVote);
                return endVote;
              }
            }
            break;
          case OBSERVING:
            LOG.debug("Notification from observer: " + n.sid);
            break;
          case FOLLOWING:
          case LEADING:
            // 如果通知中的选举周期等于本地选举周期
            if (n.electionEpoch == logicalclock.get()) {
              //
              recvset.put(n.sid,
                          new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
              // 判断投票是否结束
              if (termPredicate(recvset, new Vote(n.version, n.leader,
                                                  n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                  && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                // 设置服务状态
                self.setPeerState((n.leader == self.getId()) ?
                                  ServerState.LEADING : learningState());
                // 创建Leader选票
                Vote endVote = new Vote(n.leader,
                                        n.zxid, n.electionEpoch, n.peerEpoch);
                // 记录Leader选票并清空recvqueue集合
                leaveInstance(endVote);
                return endVote;
              }
            }
            // 如果是FOLLOWING再加入集群前需要确认已经认可的Leader
            // 如果是LEADING需要让所有节点都知道我是Leader
            outofelection.put(n.sid, new Vote(n.version, n.leader,
                                              n.zxid, n.electionEpoch, n.peerEpoch, n.state));
            //  判断投票是否结束,并且检查是否存在Leader
            if (termPredicate(outofelection, new Vote(n.version, n.leader,
                                                      n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                && checkLeader(outofelection, n.leader, n.electionEpoch)) {
              synchronized (this) {
                // 设置新的选举周期
                logicalclock.set(n.electionEpoch);
                // 设置服务状态
                self.setPeerState((n.leader == self.getId()) ?
                                  ServerState.LEADING : learningState());
              }
              // 创建Leader选票
              Vote endVote = new Vote(n.leader, n.zxid,
                                      n.electionEpoch, n.peerEpoch);
              // 记录Leader选票并清空recvqueue集合
              leaveInstance(endVote);
              return endVote;
            }
            break;
          default:
            LOG.warn("Notification state unrecoginized: " + n.state
                     + " (n.state), " + n.sid + " (n.sid)");
            break;
        }
      }
      // 其他情况
      else {
        // 如果通知对象选择的领导者不在选票服务集合
        if (!validVoter(n.leader)) {
          LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}",
                   n.leader, n.sid);
        }
        // 如果通知对象的sid不在选票服务集合
        if (!validVoter(n.sid)) {
          LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}",
                   n.leader, n.sid);
        }
      }
    }
    return null;
  } finally {
    try {
      // 取消MBean注册
      if (self.jmxLeaderElectionBean != null) {
        MBeanRegistry.getInstance().unregister(
          self.jmxLeaderElectionBean);
      }
    } catch (Exception e) {
      LOG.warn("Failed to unregister with JMX", e);
    }
    self.jmxLeaderElectionBean = null;
    LOG.debug("Number of connection processing threads: {}",
              manager.getConnectionThreadCount());
  }
}

在上述代码中核心

  1. 核心流程如下。
  2. 创建recvset投票箱和outofelection投票箱。
  3. 选举周期加1(成员变量logicalclock累加1)。
  4. 更新选票,选票信息如下。
    1. leader:如果当前服务id(myid)在选票集合的服务id中就选择当前服务id,反之则选择Long的最小值。
    2. zxid:如果服务属性中的learnerType属性是PARTICIPANT,则返回最后完成处理的zxid,反之则返回Long的最小值。
    3. epoch:如果服务属性中的learnerType属性是PARTICIPANT,则返回当前选举周期(变量currentEpoch),反之则返回Long的最小值。
  5. 通过sendNotifications方法将选票广播。
  6. 通过死循环获取最终选票,死循环跳出条件,服务类型非LOOKING。
  7. 取消MBean注册。

在第(5)步中首先从recvqueue集合中获取一个需要处理的通知,在这里定义了通知类Notification,需要注意的是Notification中包含选票信息,在获取到需要处理的通知后会根据通知对象做出不同的三种操作。

第一种操作执行条件是通知对象为空,具体处理细节如下。

  1. 判断通讯管理器(QuorumCnxManager)是否还有未发送的消息,如果有则需要广播选票,反之则通过通选管理器连接参选服务。
  2. 重新计算超时时间,计算规则是默认超时时间(默认值200毫秒)的2倍。

第二种操作执行条件通知对象的sid在选票服务集合并且通知对象选择的领导者在选票服务集合中会根据不同的服务状态进行处理。

  1. 如果服务状态是LOOKING将进行如下操作。
    1. 如果通知中的选举周期大于本地选举周期,需要将本地选举周期设置为通知中的选举周期,然后将投票箱清空,最后需要通过totalOrderPredicate方法判断是否需要更新选票,如果需要则将通知中的数据更新到选票上反之则采用默认选票数据,更新完成后将新的选票广播。
    2. 如果通知中的选举周期小于本地选举周期跳出处理。
    3. 如果通知中的选举周期等于本地选举周期,将通知中的数据更新到选票上然后广播选票。
    4. 将通知中的服务id和通知中的数据构造为投票箱recvset的数据。
    5. 通过termPredicate方法判断当前选票是否收到大多数认可,如果结束,则需要判断recvqueue队列中是否还有数据,如果有则需要从recvqueue队列中获取一个通知,将该通知与当前选票通过totalOrderPredicate方法判断是否需要改票,如果是则需要将刚刚获取的通知放入到recvqueue队列并返回。此时返回会发生重新选举操作。
    6. 如果通知对象为空,判断当前选票中的领导者id是否和本地id(服务id、myid)相同,如果是将设置服务状态为LEADING,反之则通过learningState方法判断是FOLLOWING还是OBSERVING。设置完成服务状态后会构造返回值选票(Leader选票),并清空recvqueue集合将返回值选票返回。
  2. 如果服务状态是OBSERVING将结束循环。
  3. 如果服务状态是FOLLOWING或者LEADING将进行如下操作。
    1. 如果通知对象中的选举周期等于本地选举周期,将构造选票对象放入到投票箱recvset中。放入完成后需要通过termPredicate方法判断投票是否结束,如果结束则需要进一步通过checkLeader方法对Leader进行检查,全部检查通过,将设置服务状态,如果通知中的领导者id和本地id相同则设置为LEADING,反之则通过learningState方法判断是FOLLOWING还是OBSERVING。设置完成服务状态后会构造返回值选票(Leader选票),并清空recvqueue集合将返回值选票返回。
    2. 如果通知对象中的选举周期不等于本地选举周期,将构造选票对象放入到投票箱outofelection中。放入完成后需要通过termPredicate方法判断投票是否结束,如果结束则需要进一步通过checkLeader方法对Leader进行检查,全部检查通过,则需要将本地的选举周期设置为通知中的选举周期(electionEpoch),设置完成后将完成服务状态设置,如果通知中的领导者id和本地id相同则设置为LEADING,反之则通过learningState方法判断是FOLLOWING还是OBSERVING。设置完成服务状态后会构造返回值选票(Leader选票),并清空recvqueue集合将返回值选票返回。

第三种操作执行条件是不满足上述两种情况,具体处理细节如下。

  1. 如果通知对象选择的领导者不在选票服务集合,记录警告日志。
  2. 如果通知对象的sid不在选票服务集合,记录警告日志。

WorkerSender 分析

本节将对WorkerSender类进行分析,WorkerSender类用作于发送消息,由于WorkerSender类实现了ZooKeeperThread类因此它是一个线程对象,核心分析方法就是run,具体处理代码如下。

public void run() {
  while (!stop) {
    try {
      ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
      if (m == null) {
        continue;
      }
      process(m);
    } catch (InterruptedException e) {
      break;
    }
  }
  LOG.info("WorkerSender is down");
}

在上述代码中核心是死循环,在循环内会从sendqueue集合中获取一个数据(待发送的数据,数据类型是ToSend),然后判断该数据是否为空,如果为空则跳过这个处理进入下一个循环,反之则通过process方法对消息进行发送操作。方法process具体处理代码如下。

void process(ToSend m) {
  ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                      m.leader,
                                      m.zxid,
                                      m.electionEpoch,
                                      m.peerEpoch,
                                      m.configData);

  manager.toSend(m.sid, requestBuffer);

}

在上述代码中首先会将消息对象ToSend转换为字节对象然后通过QuorumCnxManager类中的toSend方法将消息发送。

WorkerReceiver 分析

本节将对WorkerReceiver类进行分析,WorkerReceiver类用作于处理消息,由于WorkerSender类实现了ZooKeeperThread类因此它是一个线程对象,核心分析方法就是run,具体处理代码如下。

public void run() {

  // 响应
  Message response;
  while (!stop) {
    try {
      // 通过manager获取一个需要处理的消息
      response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
      // 待处理的消息为空进入下一个循环
      if (response == null) {
        continue;
      }

      // 获取消息字节长度
      final int capacity = response.buffer.capacity();

      // 消息字节长度小于28进入下一个循环
      if (capacity < 28) {
        LOG.error("Got a short response from server {}: {}", response.sid,
                  capacity);
        continue;
      }
      // 消息字节长度是否是28
      boolean backCompatibility28 = (capacity == 28);

      // 消息字节长度是否是40
      boolean backCompatibility40 = (capacity == 40);

      // 将响应中的字节清空
      response.buffer.clear();

      // 创建通知对象
      Notification n = new Notification();

      // 读取state
      int rstate = response.buffer.getInt();
      // 读取leader
      long rleader = response.buffer.getLong();
      // 读取zxid
      long rzxid = response.buffer.getLong();
      // 读取electionEpoch
      long relectionEpoch = response.buffer.getLong();
      //
      long rpeerepoch;

      // 版本号
      int version = 0x0;
      QuorumVerifier rqv = null;

      try {

        // 如果消息字节长度不是28
        if (!backCompatibility28) {
          // 读取peerEpoch
          rpeerepoch = response.buffer.getLong();
          // 如果消息长度不是40
          if (!backCompatibility40) {
            version = response.buffer.getInt();
          } else {
            LOG.info("Backward compatibility mode (36 bits), server id: {}",
                     response.sid);
          }
        } else {
          LOG.info("Backward compatibility mode (28 bits), server id: {}",
                   response.sid);
          // 从zxid转换为peerEpoch
          rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
        }


        // 如果版本大于0x1
        if (version > 0x1) {
          // 读取配置长度
          int configLength = response.buffer.getInt();
          // 配置长度小于0
          // 配置长度大于消息字节长度
          if (configLength < 0 || configLength > capacity) {
            // 抛出异常
            throw new IOException(String.format(
              "Invalid configLength in notification message! sid=%d, capacity=%d, version=%d, configLength=%d",
              response.sid, capacity, version, configLength));
          }

          // 将配置读取
          byte b[] = new byte[configLength];

          response.buffer.get(b);

          synchronized (self) {
            try {
              // 将配置转换为QuorumVerifier类
              rqv = self.configFromString(new String(b));
              // 获取当前节点的QuorumVerifier对象
              QuorumVerifier curQV = self.getQuorumVerifier();
              // 新建的QuorumVerifier版本号大于当前所持有的QuorumVerifier的版本号
              if (rqv.getVersion() > curQV.getVersion()) {
                LOG.info("{} Received version: {} my version: {}",
                         self.getId(),
                         Long.toHexString(rqv.getVersion()),
                         Long.toHexString(
                           self.getQuorumVerifier().getVersion()));
                // 如果当前节点服务状态是LOOKING
                if (self.getPeerState() == ServerState.LOOKING) {
                  LOG.debug("Invoking processReconfig(), state: {}",
                            self.getServerState());
                  // 重新处理配置
                  self.processReconfig(rqv, null, null, false);
                  // 如果新建的QuorumVerifier对象和当前持有的QuorumVerifier类不相同需要关闭选举接口
                  if (!rqv.equals(curQV)) {
                    LOG.info("restarting leader election");
                    self.shuttingDownLE = true;
                    self.getElectionAlg().shutdown();

                    break;
                  }
                } else {
                  LOG.debug("Skip processReconfig(), state: {}",
                            self.getServerState());
                }
              }
            } catch (IOException | ConfigException e) {
              LOG.error(
                "Something went wrong while processing config received from {}. "
                +
                "Continue to process the notification message without handling the configuration.",
                response.sid);
            }
          }
        } else {
          LOG.info(
            "Backward compatibility mode (before reconfig), server id: {}",
            response.sid);
        }
      } catch (BufferUnderflowException | IOException e) {
        LOG.warn(
          "Skipping the processing of a partial / malformed response message sent by sid={} (message length: {})",
          response.sid, capacity, e);
        continue;
      }

   		// 确认sid是否在参选服务器中
      if (!validVoter(response.sid)) {
        // 获取选票
        Vote current = self.getCurrentVote();
        // 获取QuorumVerifier
        QuorumVerifier qv = self.getQuorumVerifier();
        // 创建发送对象
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                   current.getId(),
                                   current.getZxid(),
                                   logicalclock.get(),
                                   self.getPeerState(),
                                   response.sid,
                                   current.getPeerEpoch(),
                                   qv.toString().getBytes());
        // 加入到待发送消息集合
        sendqueue.offer(notmsg);
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Receive new notification message. My id = "
                    + self.getId());
        }
        // 将待处理消息中的服务状态从数字转换为枚举
        QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
        switch (rstate) {
          case 0:
            ackstate = QuorumPeer.ServerState.LOOKING;
            break;
          case 1:
            ackstate = QuorumPeer.ServerState.FOLLOWING;
            break;
          case 2:
            ackstate = QuorumPeer.ServerState.LEADING;
            break;
          case 3:
            ackstate = QuorumPeer.ServerState.OBSERVING;
            break;
          default:
            continue;
        }

        // 通知对象数据填写
        n.leader = rleader;
        n.zxid = rzxid;
        n.electionEpoch = relectionEpoch;
        n.state = ackstate;
        n.sid = response.sid;
        n.peerEpoch = rpeerepoch;
        n.version = version;
        n.qv = rqv;

        // 输出通知
        if (LOG.isInfoEnabled()) {
          printNotification(n);
        }

        // 如果当前节点服务状态是LOOKING
        if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
          // 通知放入到recvqueue集合
          recvqueue.offer(n);

          // 如果待处理消息中的服务状态是LOOKING,并且通知中的选举周期小于本地选举周期
          if ((ackstate == QuorumPeer.ServerState.LOOKING)
              && (n.electionEpoch < logicalclock.get())) {
            // 将成员变量proposedLeader、proposedZxid和proposedEpoch构造为选票
            Vote v = getVote();
            // 选票转换为ToSend对象加入到sendqueue集合
            QuorumVerifier qv = self.getQuorumVerifier();
            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                       v.getId(),
                                       v.getZxid(),
                                       logicalclock.get(),
                                       self.getPeerState(),
                                       response.sid,
                                       v.getPeerEpoch(),
                                       qv.toString().getBytes());
            sendqueue.offer(notmsg);
          }
        }
        else {

          // 获取选票
          Vote current = self.getCurrentVote();
          // 如果待处理消息中的服务状态是LOOKING
          if (ackstate == QuorumPeer.ServerState.LOOKING) {
            if (LOG.isDebugEnabled()) {
              LOG.debug(
                "Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
                self.getId(),
                response.sid,
                Long.toHexString(current.getZxid()),
                current.getId(),
                Long.toHexString(
                  self.getQuorumVerifier().getVersion()));
            }

            // 选票转换为ToSend对象加入到sendqueue集合
            QuorumVerifier qv = self.getQuorumVerifier();
            ToSend notmsg = new ToSend(
              ToSend.mType.notification,
              current.getId(),
              current.getZxid(),
              current.getElectionEpoch(),
              self.getPeerState(),
              response.sid,
              current.getPeerEpoch(),
              qv.toString().getBytes());
            sendqueue.offer(notmsg);
          }
        }
      }
    } catch (InterruptedException e) {
      LOG.warn("Interrupted Exception while waiting for new message" +
               e.toString());
    }
  }
  LOG.info("WorkerReceiver is down");
}

在上述代码中本质是一个死循环对消息进行处理,消息来源是QuorumCnxManager#recvQueue,单层循环处理流程如下。

  1. 从QuorumCnxManager#recvQueue中获取一个需要处理的消息对象。
  2. 如果消息对象为空,则进行下一轮循环。
  3. 如果消息字节长度小于28,则进行下一轮循环。
  4. 从消息对象中提取rstate、rleader、rzxid、electionEpoch、peerEpoch和version,注意version属性是在3.4.6版本加入,默认值为0x0。
  5. 如果version数据值大于0x1需要进行如下操作。
    1. 从消息对象中获取配置长度,如果配置长度大于0或者配置长度大于消息字节长度则抛出异常。
    2. 从消息对象中将配置读取并转换为QuorumVerifier类。
    3. 对比转换的QuorumVerifier和当前持有的QuorumVerifier版本号,如果前者小则不需要操作,反之则需要进行如下操作。 1. 2. 当前节点的服务状态是LOOKING需要通过processReconfig方法重新处理配置。 3. 对比对比转换的QuorumVerifier和当前持有的QuorumVerifier是否相同,如果不相同则需要将选举接口关闭(注意这里的关闭会在QuorumPeer#run方法中进行重启)
  6. 确认响应对象中的sid是否在参选服务器集合中,如果不在则将当前选票转换为ToSend对象放入到sendqueue集合中,反之则需要进行如下操作。
    1. 将第(4)步中提取的数据设置到通知对象(Notification)中。
    2. 当前节点服务状态是LOOKING,则进行如下操作。
      1. 将通知放入到recvqueue集合中。
      2. 如果待处理消息中的服务状态是LOOKING,并且通知中的选举周期小于本地选举周期,则需要将成员变量proposedLeader、proposedZxid和proposedEpoch构造为选票,并将选票转换为ToSend对象并且放入到sendqueue集合中。
    3. 当前节点服务状态不是LOOKING,则进行如下操作。
      1. 获取本地选票。
      2. 如果待处理消息中的服务状态是LOOKING则需要将本地选票转换为ToSend对象并且放入到sendqueue集合中。

QuorumCnxManager 分析

通过对FastLeaderElection类、WorkerSender类和WorkerReceiver类的分析可以发现它们都或多或少的使用了QuorumCnxManager,该类的核心价值是用于管理通讯,可以在WorkerSender#process方法中看通过manager.toSend方法用于发送消息,可以在WorkerReceiver#run方法中看到通过manager.pollRecvQueue方法用于获取消息,以及在FastLeaderElection#lookForLeader方法中看到通过manager.connectAll进行连接。接下来的将会对QuorumCnxManager类进行分析。开始方法分析之前需要先了解成员变量,具体信息见表。

变量名称变量类型变量说明
recvQueueArrayBlockingQueue<Message>接收到的消息
listenerListener监听器
selfQuorumPeer参选节点,这里特指节点本身
mySidlong服务id,配置文件zoo.cfg中的myid属性
socketTimeoutintsocket超时时间
viewMap<Long, QuorumPeer.QuorumServer>用于存储服务id和服务对象之间的关系
listenOnAllIPsboolean是否对所有IP进行监听
senderWorkerMapConcurrentHashMap<Long, SendWorker>服务id和发送线程的映射
queueSendMapConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>服务id和待发送队列的映射
lastMessageSentConcurrentHashMap<Long, ByteBuffer>服务id和最后需要消息的映射
inprogressConnectionsSet<Long>正在进行连接的服务id集合
recvQLockObject处理recvQueue对象时所需要的锁
recvQueueArrayBlockingQueue<Message>接收到的消息
tcpKeepAliveboolean是否保持 TCP 连接,数据从环境变量中读取zookeeper.tcpKeepAlive属性。
shutdownboolean是否处于关闭状态

在QuorumCnxManager类中相关操作主要分为两类,第一类是Socket操作,第二类是消息处理操作,本章接下来将对方法操作进行相关分析。

toSend 方法分析

本节将对toSend方法进行分析,之前在WorkerSender#process方法中有看到相关调度,它的调度表示有一个消息需要发送,具体处理代码如下。

public void toSend(Long sid, ByteBuffer b) {
  if (this.mySid == sid) {
    b.position(0);
    addToRecvQueue(new Message(b.duplicate(), sid));
  } else {
    ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(
      SEND_CAPACITY);
    ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
    if (oldq != null) {
      addToSendQueue(oldq, b);
    } else {
      addToSendQueue(bq, b);
    }
    connectOne(sid);

  }
}

在上述代码中核心处理流程:判断参数sid和成员变量mySid是否相同,相同表示这个消息发送给本节点因此不需要建立网络通讯,直接调用addToRecvQueue方法将信息存储,反之则需要从queueSendMap集合中根据sid获取对应的消息列表然后将其加入并通过connectOne建立连接。

接下来对addToRecvQueue方法进行分析,该方法主要用于将消息放入到接收消息集合(成员变量recvQueue)中,具体处理代码如下。

public void addToRecvQueue(Message msg) {
  synchronized (recvQLock) {
    if (recvQueue.remainingCapacity() == 0) {
      try {
        recvQueue.remove();
      } catch (NoSuchElementException ne) {
        LOG.debug("Trying to remove from an empty " +
                  "recvQueue. Ignoring exception " + ne);
      }
    }
    try {
      recvQueue.add(msg);
    } catch (IllegalStateException ie) {
      LOG.error("Unable to insert element in the recvQueue " + ie);
    }
  }
}

在上述代码中首先会执行remainingCapacity方法判断剩余容量是否为0,如果是则需要执行remove方法。其次会通过add方法将消息加入到集合中。

接下来将对connectOne方法进行分析,由于connectOne方法存在多个实现因此延续toSend中的方法作为入口,具体处理代码如下。

synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr) {
    if (senderWorkerMap.get(sid) != null) {
        LOG.debug("There is a connection already for server " + sid);
        return true;
    }
    return initiateConnectionAsync(electionAddr, sid);
}

在上述代码中首先会判断senderWorkerMap容器中sid对应的SendWorker数据是否存在,如果不存在则返回true表示连接成功,反之则会调用initiateConnectionAsync方法进行连接初始化,具体处理代码如下。

public boolean initiateConnectionAsync(final InetSocketAddress electionAddr, final Long sid) {
  // 将sid加入到inprogressConnections集合,如果添 加失败则表示已经连接
  if (!inprogressConnections.add(sid)) {
    LOG.debug(
      "Connection request to server id: {} is already in progress, so skipping this request",
      sid);
    return true;
  }
  try {
    // 创建连接线程进行连接
    connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr, sid));
    connectionThreadCnt.incrementAndGet();
  } catch (Throwable e) {
    // 出现异常将sid从inprogressConnections集合中移除
    inprogressConnections.remove(sid);
    LOG.error("Exception while submitting quorum connection request", e);
    return false;
  }
  return true;
}

在上述代码中核心处理流程如下。

  1. 将sid加入到inprogressConnections集合,如果添加失败则表示已经连接。
  2. 通过创建QuorumConnectionReqThread类然后交给线程执行器进行连接,如果在连接建立过程中出现异常将需要把sid从inprogressConnections集合中移除。

在上述处理流程中提到inprogressConnections集合,该集合用于存储正在进行初始化连接的服务id,在QuorumConnectionReqThread类中关于线程执行逻辑的代码如下。

@Override
public void run() {
  try {
    initiateConnection(electionAddr, sid);
  } finally {
    inprogressConnections.remove(sid);
  }
}

在上述代码中可以看到首先进行initiateConnection方法操作进行连接初始化,然后将sid从inprogressConnections集合中移除。在initiateConnection方法中核心会完成如下内容。

  1. socket连接初始化。
  2. 对象SendWorke r初始化,初始化后需要加入到容器senderWorkerMap。
  3. 对象RecvWorker初始化。
  4. 启动SendWorker和RecvWorker

通过toSend方法可以发现数据会加载到recvQueue集合中,最后需要了解recvQueue数据的获取方式,在QuorumCnxManager类中通过pollRecvQueue方法即可获取,具体处理代码如下。

public Message pollRecvQueue(long timeout, TimeUnit unit)
  throws InterruptedException {
  return recvQueue.poll(timeout, unit);
}

SendWorker 分析

本节将对SendWorker类进行分析,该类继承自ZooKeeperThread类,它是一个线程对象,因此主要对run方法进行分析,具体处理代码如下。

@Override
public void run() {
  threadCnt.incrementAndGet();
  try {
    // 从需要发送的消息集合中获取sid对应的消息集合
    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
    // 消息集合不为空
    if (bq == null || isSendQueueEmpty(bq)) {
      // 从lastMessageSent取出消息,在不为空的情况下进行发送
      ByteBuffer b = lastMessageSent.get(sid);
      if (b != null) {
        LOG.debug("Attempting to send lastMessage to sid=" + sid);
        send(b);
      }
    }
  } catch (IOException e) {
    LOG.error("Failed to send last message. Shutting down thread.", e);
    // 结束处理
    this.finish();
  }
  LOG.debug("SendWorker thread started towards {}. myId: {}", sid,
            QuorumCnxManager.this.mySid);
  try {
    // 循环
    while (running && !shutdown && sock != null) {

      ByteBuffer b = null;
      try {
        // 从queueSendMap容器中获取消息集合
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
          .get(sid);
        if (bq != null) {
          // 从消息集合中获取一个消息
          b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
        }
        else {
          LOG.error("No queue of incoming messages for " +
                    "server " + sid);
          break;
        }

        // 消息不为空
        if (b != null) {
          // 放入到lastMessageSent集合
          lastMessageSent.put(sid, b);
          // 发送
          send(b);
        }
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting for message on queue",
                 e);
      }
    }
  } catch (Exception e) {
    LOG.warn("Exception when using channel: for id " + sid
             + " my id = " + QuorumCnxManager.this.mySid
             + " error = " + e);
  }
  // 结束处理
  this.finish();
  LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
}

在上述代码中核心处理流程如下。

  1. 从需要发送的消息集合(queueSendMap)中获取sid对应的消息集合。如果消息集合为空将从lastMessageSent容器中获取sid对应的消息,在消息不为空的情况下将消息通过send方法发送。
  2. 通过循环处理消息,单层循环逻辑如下。
    1. 从queueSendMap容器中获取消息集合,如果消息集合不为空将从中获取一个消息。
    2. 如果消息对象不为空需要将其放入到lastMessageSent集合并将其通过send方法发送。
  3. 执行finish方法结束处理。

RecvWorker 分析

本节将对RecvWorker类进行分析,该类继承自ZooKeeperThread类,它是一个线程对象,因此主要对run方法进行分析,具体处理代码如下。

@Override
public void run() {
  threadCnt.incrementAndGet();
  try {
    LOG.debug("RecvWorker thread towards {} started. myId: {}", sid,
              QuorumCnxManager.this.mySid);
    while (running && !shutdown && sock != null) {
      // 从数据输入流中获取数据长度
      int length = din.readInt();
      // 数据长度小于0 或者超过最大限额
      if (length <= 0 || length > PACKETMAXSIZE) {
        throw new IOException(
          "Received packet with invalid packet: "
          + length);
      }
      // 将数据输入流转换为消息对象
      byte[] msgArray = new byte[length];
      din.readFully(msgArray, 0, length);
      ByteBuffer message = ByteBuffer.wrap(msgArray);
      // 将消息加入到recvQueue集合
      addToRecvQueue(new Message(message.duplicate(), sid));
    }
  } catch (Exception e) {
    LOG.warn("Connection broken for id " + sid + ", my id = "
             + QuorumCnxManager.this.mySid + ", error = ", e);
  } finally {
    LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid,
             QuorumCnxManager.this.mySid);
    sw.finish();
    closeSocket(sock);
  }
}

在上述代码中核心处理流程如下。

  1. 从数据输入流(DataInputStream)中获取数据长度,如果数据长度小于0或者超过最大限额将抛出异常。
  2. 将数据输入流转换为消息对象将其放入到recvQueue集合。

Listener 分析

本节将对RecvWorker类进行分析,该类继承自ZooKeeperThread类,它是一个线程对象,因此主要对run方法中的核心部分进行分析,在run方法中主要处理如下三部分内容。

  1. 完成与某个服务的通讯绑定。
  2. 异步或者非异步的模式处理消息。
  3. 关闭标志为真的情况下进行关闭操作。

对于上述三部分操作主要对第二部分进行说明,执行入口代码如下。

if (quorumSaslAuthEnabled) {
  // 异步
  receiveConnectionAsync(client);
} else {
  // 非异步
  receiveConnection(client);
}

在异步处理中会使用QuorumConnectionReceiverThread类来执行相关操作,在QuorumConnectionReceiverThread类中核心处理流程还是receiveConnection方法,方法receiveConnection处理代码如下。

public void receiveConnection(final Socket sock) {
  DataInputStream din = null;
  try {
    // 将socket中的数据转换为数据输入流
    din = new DataInputStream(
      new BufferedInputStream(sock.getInputStream()));

    LOG.debug("Sync handling of connection request received from: {}",
              sock.getRemoteSocketAddress());
    // 处理连接
    handleConnection(sock, din);
  } catch (IOException e) {
    LOG.error("Exception handling connection, addr: {}, closing server connection",
              sock.getRemoteSocketAddress());
    LOG.debug("Exception details: ", e);
    closeSocket(sock);
  }
}

在上述方法中首先会将socket中的输入流转换为数据输入流对象,然后交给方法handleConnection进行处理,方法handleConnection处理代码如下。

private void handleConnection(Socket sock, DataInputStream din)
  throws IOException {

  Long sid = null, protocolVersion = null;
  InetSocketAddress electionAddr = null;

  try {
    // 从数据输入流中读取协议版本
    protocolVersion = din.readLong();
    // 协议版本大于零将协议版本号设置到sid变量中
    if (protocolVersion >= 0) { 
      sid = protocolVersion;
    } else {
      try {
        // 将数据输入流转换为InitialMessage对象
        InitialMessage init = InitialMessage.parse(protocolVersion, din);
        // sid
        sid = init.sid;
        // 选举地址
        electionAddr = init.electionAddr;
      } catch (InitialMessage.InitialMessageException ex) {
        LOG.error("Initial message parsing error!", ex);
        closeSocket(sock);
        return;
      }
    }

    if (sid == QuorumPeer.OBSERVER_ID) {
      sid = observerCounter.getAndDecrement();
      LOG.info("Setting arbitrary identifier to observer: " + sid);
    }
  } catch (IOException e) {
    LOG.warn("Exception reading or writing challenge: {}", e);
    closeSocket(sock);
    return;
  }

  authServer.authenticate(sock, din);
  // 数据输入流中的sid小于本地sid
  if (sid < self.getId()) {
    SendWorker sw = senderWorkerMap.get(sid);
    if (sw != null) {
      sw.finish();
    }
    LOG.debug("Create new connection to server: {}", sid);
    closeSocket(sock);

    if (electionAddr != null) {
      connectOne(sid, electionAddr);
    } else {
      connectOne(sid);
    }
  }
  // 数据输入流中的sid等于本地sid
  else if (sid == self.getId()) {
    LOG.warn("We got a connection request from a server with our own ID. "
             + "This should be either a configuration error, or a bug.");
  }
  // 数据输入流中的sid大于本地sid
  else {
    SendWorker sw = new SendWorker(sock, sid);
    RecvWorker rw = new RecvWorker(sock, din, sid, sw);
    sw.setRecv(rw);

    // 从senderWorkerMap容器中获取SendWorker然后将其结束
    SendWorker vsw = senderWorkerMap.get(sid);

    if (vsw != null) {
      vsw.finish();
    }

    // 设置新的sendWork
    senderWorkerMap.put(sid, sw);

    queueSendMap.putIfAbsent(sid,new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

    // 发送和接收数据处理
    sw.start();
    rw.start();
  }
}

在上述代码中核心处理流程如下。

  1. 从数据输入流中获取sid、protocolVersion和electionAddr数据。
  2. 如果获取的sid小于当前节点的sid,则需要进行如下操作。
    1. 从senderWorkerMap容器中获取sid对应的SendWorker对象,如果SendWorker对象不为空将需要关闭SendWorker对象中的资源。
    2. 关闭socket。
    3. 如果electionAddr数据不为空将通过connectOne方法与之建立连接,反之则直接通过sid参数进行连接。
  3. 如果获取的sid等于当前节点的sid不做任何操作。
  4. 如果获取的sid大于当前节点的sid,则需要进行如下操作。
    1. 创建SendWorker对象和RecvWorker对象。
    2. 从senderWorkerMap容器中获取sid对应的SendWorker对象,如果SendWorker对象不为空将需要关闭SendWorker对象中的资源。
    3. 将新建的SendWorker加入到senderWorkerMap容器。
    4. 创建新的sid对应的消息集合放入到queueSendMap容器。
    5. 启动SendWorker对象和RecvWorker对象。

总结

本章对Zookeeper项目中关于选举的实现过程进行分析,主要围绕两个方面,第一方面是选举算法的入口,第二方面是选举算法的实现,在选举算法实现中涉及到多个类进行完成,着重对QuorumPeer类、FastLeaderElection类和QuorumCnxManager类进行相关分析。 通过本章分析可以得到选举的完整执行链路如图所示

sequenceDiagram
participant a as 选举算法
participant b as sendqueue
participant re as recvqueue
participant d as WorkerSender
participant wr as WorkerReceiver



participant f as recvQueue
participant g as queueSendMap
participant h as SendWorker
participant j as QuorumPeer 
participant r as RecvWorker




a ->>b:选举算法生成选票放入待发送队列
b->>d: WorkerSender从sendqueue中获取数据发送
d->>f: 如果sendqueue中取出的数据sid和当前节点的sid相同
d->>g:数据放入queueSendMap容器
g->> h :获取数据
h->>j:发送其他节点

j->>r:接收其他节点发送的数据
r->>f:接收数据放入到接收队列
f->>wr:处理选票
wr->>re:将处理的选票转换为通知放入recvqueue队列
re->>a:重新开始选举