zookeeper source code analysis

Zookeeper客户端与服务端交互

本章将对Zookeeper客户端与服务端交互相关内容进行分析,一般情况下客户端与服务端之间的交互可以分为如下几个阶段。

  1. 客户端与服务端建立连接。
  2. 客户端发起请求。
  3. 服务端处理请求。
  4. 客户端接收响应。

本章站在客户端视角进行分析,主要对第(1)、(2)和(4)阶段进行分析。

客户端与服务端建立连接

本节将对客户端与服务端建立连接相关内容进行分析,在Zookeeper项目中提供了原生客户端,客户端类为ZooKeeper,通常情况下使用代码如下。

ZooKeeper s = new ZooKeeper("127.0.0.1:2181", 2000, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event);

        if (event.getState().equals(Event.KeeperState.SyncConnected)) {

            countDownLatch.countDown();
        }
    }
});

根据上述代码进一步在Zookeeper源码中找到完整的构造函数,具体代码如下。

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 boolean canBeReadOnly, HostProvider aHostProvider,
                 ZKClientConfig clientConfig) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    // zk客户端配置为空的情况下设置默认配置
    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    // zk客户端配置放入成员变量
    this.clientConfig = clientConfig;
    // 创建默认的观察者管理器
    watchManager = defaultWatchManager();
    // 在观察者管理器中设置默认观察者
    watchManager.defaultWatcher = watcher;
    // 创建连接字符串解析器
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    // 设置主机供应商
    hostProvider = aHostProvider;

    // 创建连接
    cnxn = createConnection(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    // 启动
    cnxn.start();
}

在上述代码中需要重点关注ConnectStringParser类和createConnection方法,先对ConnectStringParser类进行分析,该类的核心目标是对连接字符串进行解析,先看构造函数,具体代码如下。

public ConnectStringParser(String connectString) {
  // 确认/的索引位置
  int off = connectString.indexOf('/');
  // 如果斜杠索引存在就将其解析成为成员变量chrootPath的数据,不存在将成员变量chrootPath设置为null
  if (off >= 0) {
    String chrootPath = connectString.substring(off);
    if (chrootPath.length() == 1) {
      this.chrootPath = null;
    } else {
      PathUtils.validatePath(chrootPath);
      this.chrootPath = chrootPath;
    }
    connectString = connectString.substring(0, off);
  }
  else {
    this.chrootPath = null;
  }

  // 将连接字符串按照逗号拆分
  List<String> hostsList = split(connectString, ",");
  for (String host : hostsList) {
    // 设置端口为默认端口
    int port = DEFAULT_PORT;
    // 拆分host,拆分后结果:host,port
    String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
    if (hostAndPort.length != 0) {
      host = hostAndPort[0];
      if (hostAndPort.length == 2) {
        port = Integer.parseInt(hostAndPort[1]);
      }
    }
    // 拆分后长度为0
    else {
      // 按照冒号拆分
      int pidx = host.lastIndexOf(':');
      if (pidx >= 0) {
        if (pidx < host.length() - 1) {
          port = Integer.parseInt(host.substring(pidx + 1));
        }
        host = host.substring(0, pidx);
      }
    }
    serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
  }
}

在上述代码中核心目标是完成成员变量chrootPath和serverAddresses的初始化,关于成员变量chrootPath的初始化流程如下。

  1. 从参数连接地址中获取斜杠('/')的索引,如果索引存在将连接字符串解析后设置给成员变量chrootPath,如果索引不存在则将成员变量chrootPath设置为null。
  2. 将连接字符串按照逗号(',')进行拆分,将拆分后结果进行循环处理,单个元素的处理流程如下。
    1. 设置端口号位默认端口号2181。
    2. 将单个元素进行拆分,拆分内容包含host和port。
    3. 将host和port组装成InetSocketAddress对象加入到成员变量serverAddresses中。

如果ConnectStringParser类的构造函数中传入127.0.0.1:2181数据在经过处理后得到的成员变量信息如图所示。

image-20220309084843148

在得到ConnectStringParser类后就可以开始进行连接的创建,具体创建连接代码如下。

cnxn = createConnection(connectStringParser.getChrootPath(),
                        hostProvider, sessionTimeout, this, watchManager,
                        getClientCnxnSocket(), canBeReadOnly);

在上述代码中需要对变量hostProvider和方法getClientCnxnSocket进行说明,前者是HostProvider接口的实现类,在Zookeeper中采用StaticHostProvider类做具体实现,构造StaticHostProvider需要输入InetSocketAddress集合,InetSocketAddress集合在分析ConnectStringParser类时有对其进行阐述产生流程,关于HostProvider接口的具体构造流程代码如下。

private static HostProvider createDefaultHostProvider(String connectString) {
    return new StaticHostProvider(
            new ConnectStringParser(connectString).getServerAddresses());
}

从上述代码中可以发现它通过ConnectStringParser类对连接字符串进行解析,将解析结果中的serverAddresses的数据信息作为StaticHostProvider类构造函数的参数传递从而得到HostProvider接口的具体实现类。

接下来对getClientCnxnSocket方法进行分析,该方法用于获取ClientCnxnSocket类,具体处理代码如下。

private ClientCnxnSocket getClientCnxnSocket() throws IOException {
  // 从zk客户端配置中获取zookeeper.clientCnxnSocket数据
  // 客户端名称
  String clientCnxnSocketName = getClientConfig().getProperty(
    ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
  // 如果客户端名称为空将使用ClientCnxnSocketNIO的类名
  if (clientCnxnSocketName == null) {
    clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
  }

  try {
    // 通过类名反射寻找参数为ZKClientConfig的构造函数
    Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
      .getDeclaredConstructor(ZKClientConfig.class);
    // 调用构造函数完成实例化
    ClientCnxnSocket clientCxnSocket =
      (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
    // 返回
    return clientCxnSocket;
  } catch (Exception e) {
    IOException ioe = new IOException("Couldn't instantiate "
                                      + clientCnxnSocketName);
    ioe.initCause(e);
    throw ioe;
  }
}

在上述代码中核心处理流程如下。

  1. 从zk客户端配置中获取zookeeper.clientCnxnSocket数据,将该数据作为客户端的名称(类名)。如果客户端名称不存在将采用ClientCnxnSocketNIO类名作为默认值。
  2. 通过类名反射构造类对象,同时在类对象中寻找参数为ZKClientConfig的构造函数。
  3. 通过第(2)步中的构造函数完成实例化返回。

至此已经得到构造ClientCnxn类的所有数据信息,接下来只需要调用ClientCnxn类的构造函数将其初始化即可。

客户端发起请求

本节将对客户端发起请求做相关分析,客户端发起请求以删除数据节点为例进行分析,删除节点代码如下。

public void delete(final String path, int version)
  throws InterruptedException, KeeperException {
  final String clientPath = path;
  PathUtils.validatePath(clientPath);

  final String serverPath;

  if (clientPath.equals("/")) {
    serverPath = clientPath;
  } else {
    serverPath = prependChroot(clientPath);
  }

  RequestHeader h = new RequestHeader();
  h.setType(ZooDefs.OpCode.delete);
  DeleteRequest request = new DeleteRequest();
  request.setPath(serverPath);
  request.setVersion(version);
  ReplyHeader r = cnxn.submitRequest(h, request, null, null);
  if (r.getErr() != 0) {
    throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                                 clientPath);
  }
}

在上述代码中发起请求的核心代码是cnxn.submitRequest(h, request, null, null),通过cnxn.submitRequest方法可以将请求发送,核心处理代码如下。

public ReplyHeader submitRequest(RequestHeader h, Record request,
                                 Record response, WatchRegistration watchRegistration,
                                 WatchDeregistration watchDeregistration)
  throws InterruptedException {
  // 创建ReplyHeader对象
  ReplyHeader r = new ReplyHeader();
  // 创建Packet对象
  Packet packet = queuePacket(h, r, request, response, null, null, null,
                              null, watchRegistration, watchDeregistration);
  synchronized (packet) {
    // 请求超时时间大于0
    if (requestTimeout > 0) {
      // 发起请求并且等requestTimeout时间
      waitForPacketFinish(r, packet);
    }
    else {
      // 一直等待直到请求处理结束
      while (!packet.finished) {
        packet.wait();
      }
    }
  }
  // 如果ReplyHeader对象中的err数值是-122
  if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
    // 清理数据
    sendThread.cleanAndNotifyState();
  }
  return r;
}

在上述代码中首先会创建ReplyHeader对象,该对象可以理解为用于接收响应的对象。其次会创建Packet对象,该对象是用于传输数据的对象(数据包),负责处理这个行为的方法是queuePacket,代码如下。

public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
                          Record response, AsyncCallback cb, String clientPath,
                          String serverPath, Object ctx, WatchRegistration watchRegistration,
                          WatchDeregistration watchDeregistration) {
    Packet packet = null;
    packet = new Packet(h, r, request, response, watchRegistration);
    packet.cb = cb;
    packet.ctx = ctx;
    packet.clientPath = clientPath;
    packet.serverPath = serverPath;
    packet.watchDeregistration = watchDeregistration;
    synchronized (state) {
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet);
        }
    }
    sendThread.getClientCnxnSocket().packetAdded();
    return packet;
}

在这段代码中分为三部分处理。第一部分是负责组装Packet对象的数据信息,第二部分是根据状态做出不同的操作,第三部分是客户端相关操作。重点放在第二部分,流程如下。

  1. 判断客户端状态是否是存活的或者已经关闭,如果满足则会进行如下操作,核心处理流程在conLossPacket方法中。
    1. 根据不同的状态设置不同的异常信息。
    2. 发送异常事件。
  2. 在不满足上一个条件的情况下将数据加入到outgoingQueue集合中。

通过上述操作可以将数据加入到outgoingQueue集合中,在加入到outgoingQueue集合中以后会交给发送线程(sendThread)进行处理,重点关注SendThread#run。在发送完成请求后会需要读取服务端的响应信息,这部分逻辑代码也是交给发送线程进行处理的,具体处理方法是SendThread#readResponse。

ClientCnxnSocket 分析

本节将对ClientCnxnSocket类进行分析,该类是用于进行socket交互的类,在Zookeeper中存在两个实现类分别是。

  1. 基于JavaNIO实现的ClientCnxnSocketNIO。
  2. 基于Netty实现的ClientCnxnSocketNetty。

接下来将对ClientCnxnSocket类以及子类进行分析,首先需要对ClientCnxnSocket类做了解,先从成员变量开始,详细内容见表。

变量名称变量类型变量说明
lenBufferByteBuffer用于读取消息的缓存长度
sentCountAtomicLong发送数量
recvCountAtomicLong接收数量
initializedboolean是否初始化
incomingBufferByteBuffer读取消息的长度
lastHeardlong上一次数据接收时间
lastSendlong上一次数据发送时间
nowlong当前时间
sendThreadClientCnxn.SendThread发送线程
outgoingQueueLinkedBlockingDeque<Packet>需要发送的队列
clientConfigZKClientConfigzk客户端配置
sessionIdlong会话id
packetLenint数据包长度

了解成员变量后下面对ClientCnxnSocket类中的部分方法做介绍,具体内容如下。

  1. 方法readConnectResult用于读取连接结果。
  2. 方法isConnected用于判断是否已连接。
  3. 方法connect用于建立连接。
  4. 方法getRemoteSocketAddress用于获取远端套接字地址。
  5. 方法getLocalSocketAddress用于获取本地套接字地址。
  6. 方法cleanup用于清理资源。
  7. 方法packetAdded用于将packet放入到outgoingQueue中。
  8. 方法onClosing用于将connState状态设置为CLOSED并且通知ClientCnxnSocket。
  9. 方法saslCompleted在sasl连接后执行。
  10. 方法connectionPrimed在完成PrimeConnection后执行。
  11. 方法doTransport用于数据传输。
  12. 方法testableCloseSocket用于关闭套接字。
  13. 方法close用于关闭客户端。
  14. 方法sendPacket用于发送包。
  15. 方法initProperties用于初始化配置,具体配置是packetLen。

接下来对上述方法中的readConnectResult方法进行分析,具体处理代码如下。

void readConnectResult() throws IOException {

  if (LOG.isTraceEnabled()) {
    StringBuilder buf = new StringBuilder("0x[");
    for (byte b : incomingBuffer.array()) {
      buf.append(Integer.toHexString(b) + ",");
    }
    buf.append("]");
    LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
              + buf.toString());
  }
  ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
  BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
  ConnectResponse conRsp = new ConnectResponse();
  conRsp.deserialize(bbia, "connect");

  boolean isRO = false;
  try {
    isRO = bbia.readBool("readOnly");
  } catch (IOException e) {
    LOG.warn("Connected to an old server; r-o mode will be unavailable");
  }

  this.sessionId = conRsp.getSessionId();
  sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                         conRsp.getPasswd(), isRO);
}

在上述代码中首先会完成日志的输出,输出内容是incomingBuffer变量中的十六机制数据,其次会从incomingBuffer变量中将数据转换为ConnectResponse类和isRO变量,前者表示连接响应结果后者表示是否只读,最后会通过sendThread变量组装数据进行onConnected方法调度。

ClientCnxnSocketNIO 分析

本节将对ClientCnxnSocketNIO类进行分析,该类是基于Java NIO原生开发的主要围绕Selector类和SelectionKey类进行操作。接下来对建立连接的connect方法进行分析,该方法用于建立连接,具体处理代码如下。

@Override
void connect(InetSocketAddress addr) throws IOException {
  SocketChannel sock = createSock();
  try {
    registerAndConnect(sock, addr);
  } catch (UnresolvedAddressException | UnsupportedAddressTypeException | SecurityException | IOException e) {
    LOG.error("Unable to open socket to {}" + addr);
    sock.close();
    throw e;
  }
  initialized = false;

  lenBuffer.clear();
  incomingBuffer = lenBuffer;
}

在上述代码中核心处理流程如下。

  1. 打开socket通道。
  2. 注册套接字地址并建立连接。
  3. 清理incomingBuffer变量和lenBuffer变量。

在上述三个处理流程中主要关注第(2)步,具体代码如下。

void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
  throws IOException {
  sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
  boolean immediateConnect = sock.connect(addr);
  if (immediateConnect) {
    sendThread.primeConnection();
  }
}

在上述代码中首先将成员变量selector注册到socket通道中,此时会返回SelectionKey对象将其赋值给成员变量sockKey。完成注册操作后会进行建立连接操作,如果建立连接结果值为真则会进行SendThread#primeConnection方法调度。接下来对sendPacket方法进行分析,该方法用于发送Packet,具体处理代码如下。

@Override
void sendPacket(Packet p) throws IOException {
  SocketChannel sock = (SocketChannel) sockKey.channel();
  if (sock == null) {
    throw new IOException("Socket is null!");
  }
  p.createBB();
  ByteBuffer pbb = p.bb;
  sock.write(pbb);
}

在上述代码中处理流程如下。

  1. 从成员变量sockKey中获取socket通道,如果socket通道为空将爬出异常。
  2. 通过Packet创建ByteBuffer,将创建结果通过socket通道写出。

在ClientCnxnSocketNIO类中还有其他关于socket相关的处理方法本节不对其做完全分析,本节主要对建立连接和发送Packet进行分析,接下来对doTransport方法进行分析,具体处理代码如下。

@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
  throws IOException, InterruptedException {
  // 设置等待时间
  selector.select(waitTimeOut);
  // SelectionKey集合
  Set<SelectionKey> selected;
  // 初始化SelectionKey集合
  synchronized (this) {
    selected = selector.selectedKeys();
  }
  // 计算成员变量now
  updateNow();
  for (SelectionKey k : selected) {
    // 从SelectionKey中获取socket通道
    SocketChannel sc = ((SocketChannel) k.channel());
    if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
      // 完成连接
      if (sc.finishConnect()) {
        // 更新lastSend和lastHeard
        updateLastSendAndHeard();
        // 更新localSocketAddress和remoteSocketAddress
        updateSocketAddresses();
        sendThread.primeConnection();
      }
    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
      doIO(pendingQueue, cnxn);
    }
  }
  if (sendThread.getZkState().isConnected()) {
    if (findSendablePacket(outgoingQueue,
                           sendThread.tunnelAuthInProgress()) != null) {
      enableWrite();
    }
  }
  selected.clear();
}

在上述代码中核心处理流程如下。

  1. 向成员变量selector设置等待时间。
  2. 从成员变量selector中获取SelectionKey集合。
  3. 通过updateNow方法将成员变量now进行修改。
  4. 循环SelectionKey集合,对单个元素操作流程如下。
    1. 满足(k.readyOps() & SelectionKey.OP_CONNECT) != 0条件和sc.finishConnect()条件,则更新lastSend、lastHeard、localSocketAddress和remoteSocketAddress,条件含义为连接操作。
    2. 满足(k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0条件,则进行真正的数据传输操作,具体处理方法是doIO,条件含义为读或写操作。
  5. 判断成员变量sendThread中的zk状态是否是已连接,如果已连接则会进行Packet的查询,如果查询结果存在将设置成员变量sockKey的数据,设置内容的含义是开启写入。

在对成员变量sockKey的操作提供了三个方法分别是。

  1. 方法enableWrite用于启用写入操作。
  2. 方法disableWrite用于关闭写入操作。
  3. 方法enableRead用于启用读取操作。

接下来将对doIO方法进行分析,该方法的作用是用于传输数据,首先需要明确该方法的两个参数。

  1. 参数pendingQueue表示需要发送的包。
  2. 参数cnxn表示套接字客户端。

对于doIO方法的分析需要分为两部分进行,这两部分是读和写,下面先对读部分代码进行如下,具体处理代码如下。

if (sockKey.isReadable()) {
  // 从socket中读取incomingBuffer量的数据
  int rc = sock.read(incomingBuffer);
  // 读取结果小于0表示从服务端读取失败
  if (rc < 0) {
    throw new EndOfStreamException(
      "Unable to read additional data from server sessionid 0x"
      + Long.toHexString(sessionId)
      + ", likely server has closed socket");
  }
  // 指定区间内没有内容的情况下
  if (!incomingBuffer.hasRemaining()) {
    incomingBuffer.flip();
    // incomingBuffer和lenBuffer相同
    if (incomingBuffer == lenBuffer) {
      // 接收数量累加1
      recvCount.getAndIncrement();
      // 读取长度信息
      readLength();
    }
    // 未初始化的情况
    else if (!initialized) {
      // 读取连接结果
      readConnectResult();
      enableRead();
      if (findSendablePacket(outgoingQueue,
                             sendThread.tunnelAuthInProgress()) != null) {
        enableWrite();
      }
      lenBuffer.clear();
      incomingBuffer = lenBuffer;
      updateLastHeard();
      initialized = true;
    }
    // 其他情况
    else {
      // 交给sendThread读取响应
      sendThread.readResponse(incomingBuffer);
      lenBuffer.clear();
      incomingBuffer = lenBuffer;
      updateLastHeard();
    }
  }
}

在上述代码中主要处理流程如下。

  1. 从socket通道中读取incomingBuffer量的数据,如果读取结果小于0表示从服务端读取失败抛出异常。
  2. 如果成员变量incomingBuffer和成员变量lenBuffer相同,则进行两个操作。
    1. 接收数量累加1。
    2. 从成员变量incomingBuffer中读取长度信息,如果小于0或者大于等于最大长度抛出异常,如果没有异常则会重置成员变量incomingBuffer。
  3. 如果成员变量initialized为false(未初始化的情况)进行如下操作。
    1. 通过readConnectResult方法对连接结果进行处理。
    2. 通过enableRead方法将成员变量sockKey设置为启用读取。
    3. 通过findSendablePacket方法确认是否存在需要发送的Packet,如果存在通过enableWrite方法将成员变量sockKey设置为启用写入。
    4. 清理成员变量lenBuffer和成员变量incomingBuffer、更新lastHeard数据和设置成员变量initialized为true,表示初始化完成。
  4. 处理其他情况,流程如下。
    1. 通过SendThread#readResponse读取响应信息。
    2. 清理成员变量lenBuffer和成员变量incomingBuffer并且更新lastHeard数据。

最后对写入相关操作进行分析,具体处理代码如下。

if (sockKey.isWritable()) {
  // 寻找需要发送的Packet
  Packet p = findSendablePacket(outgoingQueue,
                                sendThread.tunnelAuthInProgress());

  // Packet不为空的情况
  if (p != null) {
    // 更新lastSend
    updateLastSend();
    // packet中的bb变量为空将会创建bb变量
    if (p.bb == null) {
      if ((p.requestHeader != null) &&
          (p.requestHeader.getType() != OpCode.ping) &&
          (p.requestHeader.getType() != OpCode.auth)) {
        p.requestHeader.setXid(cnxn.getXid());
      }
      p.createBB();
    }
    // 写出bb
    sock.write(p.bb);
    // 指定区间内没有内容的情况下
    if (!p.bb.hasRemaining()) {
      // 发送数量累加1
      sentCount.getAndIncrement();
      // 发送队列中移除p
      outgoingQueue.removeFirstOccurrence(p);

      // 如果请求类型不是ping和auth将packet写入参数pendingQueue中
      if (p.requestHeader != null
          && p.requestHeader.getType() != OpCode.ping
          && p.requestHeader.getType() != OpCode.auth) {
        synchronized (pendingQueue) {
          pendingQueue.add(p);
        }
      }
    }
  }
  // 如果需要写出的包集合为空将关闭写入
  if (outgoingQueue.isEmpty()) {
    disableWrite();
  }
  // 如果未初始化并且包不为空并且bb在指定范围内没有数据
  else if (!initialized && p != null && !p.bb.hasRemaining()) {
    disableWrite();
  }
  // 启用写入操作
  else {
    enableWrite();
  }
}

在上述代码中核心处理流程如下。

  1. 通过findSendablePacket方法寻找需要发送的数据包。
  2. 如果数据包不为空则进行如下操作。
    1. 更新成员变量lastSend。
    2. 如果数据包中的bb变量为空,并且数据包中的请求头不为空,类型不是ping和auth,将为数据包中的请求头设置xid,xid数据来源从cnxn中获取。通过数据包的createBB方法初始化bb变量。
    3. 将数据包中的bb变量通过sock写出。
    4. 如果数据包中的bb变量在指定区间内没有内容的情况下,首先将发送数量累加1,在成员变量outgoingQueue中移除数据包。此外如果数据包的头信息不为空并且类型不是ping和auth将会把发送的数据包放入到方法参数pendingQueue中。
  3. 如果成员变量outgoingQueue为空将关闭写入操作。
  4. 如果未初始化并且数据包不为空并且数据包的bb变量在指定范围内没有数据将关闭写入操作。
  5. 其他情况启用写入操作。

在整个处理流程中需要注意,outgoingQueue集合中的一个数据会被转移到pendingQueue集合中,pendingQueue集合中的数据会在后续解析响应中用到。

ClientCnxnSocketNetty 分析

本节将对ClientCnxnSocketNetty类进行分析,该类是基于Netty开发的通讯类,本节分析主要围绕建立连接和发送数据进行分析,首先对建立连接的connect方法进行分析,具体处理代码如下。

@Override
void connect(InetSocketAddress addr) throws IOException {
  // 同步器初始化
  firstConnect = new CountDownLatch(1);

  // 创建Netty引导程序
  Bootstrap bootstrap = new Bootstrap()
    .group(eventLoopGroup)
    .channel(NettyUtils.nioOrEpollSocketChannel())
    .option(ChannelOption.SO_LINGER, -1)
    .option(ChannelOption.TCP_NODELAY, true)
    .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
  // 配置引导程序
  bootstrap = configureBootstrapAllocator(bootstrap);
  // 验证引导程序
  bootstrap.validate();

  // 上锁
  connectLock.lock();
  try {
    // 建立连接
    connectFuture = bootstrap.connect(addr);
    // 在连接结果中增加监听器
    connectFuture.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture channelFuture) throws Exception {
        // 初始化连接成功标记为false
        boolean connected = false;
        connectLock.lock();
        try {
          // 建立连接失败
          if (!channelFuture.isSuccess()) {
            LOG.info("future isn't success, cause:", channelFuture.cause());
            return;
          }
          // connectFuture为空关闭通道
          else if (connectFuture == null) {
            LOG.info("connect attempt cancelled");
            channelFuture.channel().close();
            return;
          }
          // 打开通道赋值给成员变量channel
          channel = channelFuture.channel();
          // 设置是否断开为未断开
          disconnected.set(false);
          // 设置是否初始化为未初始化
          initialized = false;
          // 清理lenBuffer变量和incomingBuffer变量
          lenBuffer.clear();
          incomingBuffer = lenBuffer;

          sendThread.primeConnection();
          // 更新now、lastSend和lastHeard
          updateNow();
          updateLastSendAndHeard();

          // 是否处于身份验证中
          if (sendThread.tunnelAuthInProgress()) {
            waitSasl.drainPermits();
            needSasl.set(true);
            sendPrimePacket();
          } else {
            needSasl.set(false);
          }
          connected = true;
        } finally {
          connectFuture = null;
          connectLock.unlock();
          if (connected) {
            LOG.info("channel is connected: {}", channelFuture.channel());
          }
          wakeupCnxn();
          firstConnect.countDown();
        }
      }
    });
  } finally {
    connectLock.unlock();
  }
}

在上述代码中主要使用Netty相关类进行建立连接的操作,核心操作流程如下。

  1. 创建Netty引导程序,配置引导程序,验证引导程序。
  2. 通过Netty引导程序提供的connect方法建立连接,并为连接建立结果设置监听器。

在上述两个操作过程中最重要的是监听器,在这个监听器中处理流程如下。

  1. 初始化连接成功标记为false。
  2. 如果建立连接失败将结束处理。
  3. 如果成员变量connectFuture为空则会关闭通道。
  4. 通过监听器参数打开通道并将其赋值给成员变量channel。
  5. 设置是否断开为未断开。
  6. 设置是否初始化为未初始化。
  7. 清理lenBuffer变量和incomingBuffer变量。
  8. 发送线程执行primeConnection方法。
  9. 更新成员变量now、lastSend和lastHeard。
  10. 如果发送线程处于身份验证中将设置needSasl变量为真,并发送一个Prime数据包。如果未处于身份验证中将设置needSasl变量为假。
  11. 设置连接成功标记为true。

接下来将 对最重要的数据传输方法进行分析,实现方法是doTransport,具体代码如下。

@Override
void doTransport(int waitTimeOut,
                 List<Packet> pendingQueue,
                 ClientCnxn cnxn)
  throws IOException, InterruptedException {
  try {
    // 在一定时间内计数器未清零结束处理
    if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
      return;
    }
    Packet head = null;
    if (needSasl.get()) {
      // 信号量获取失败结束处理
      if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
        return;
      }
    } else {
      // 从outgoingQueue中获取数据包
      head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
    }
    // 如果发送线程中的zk状态为不存活
    if (!sendThread.getZkState().isAlive()) {
      // 将数据包写入到outgoingQueue第一位
      addBack(head);
      return;
    }
    // 如果连接断开
    if (disconnected.get()) {
      // 将数据包写入到outgoingQueue第一位
      addBack(head);
      throw new EndOfStreamException("channel for sessionid 0x"
                                     + Long.toHexString(sessionId)
                                     + " is lost");
    }
    // 数据包不为空的情况下写出
    if (head != null) {
      doWrite(pendingQueue, head, cnxn);
    }
  } finally {
    updateNow();
  }
}

在上述代码中核心处理流程如下。

  1. 如果成员变量firstConnect在waitTimeOut时间内未清零计数器将结束处理。
  2. 判断是否需要SASL,如果需要则从SASL信号量中尝试获取锁,如果获取失败则技术处理。如果不需要SASL将从outgoingQueue变量中获取数据包。
  3. 如果发送线程中的zk状态信息是不存活将数据包写入到outgoingQueue第一位,放入后结束处理,写入条件如下。
    1. 数据包不为空。
    2. 数据包数据信息不为null。
  4. 判断是否断开连接,如果断开连接将数据包写入到outgoingQueue第一位,放入后抛出异常结束处理。
  5. 数据包不为空的情况下通过doWrite方法写出数据包。

在doTransport方法中实际发送数据包的方法是doWrite ,具体处理代码如下。

private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
  updateNow();
  // 是否全部发送
  boolean anyPacketsSent = false;
  while (true) {
    // 数据包和空数据包不相同
    if (p != WakeupPacket.getInstance()) {
      // 数据包头信息不为空
      // 数据包头信息中的类型不为ping和auth
      if ((p.requestHeader != null) &&
          (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
          (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
        // 设置头信息的xid
        p.requestHeader.setXid(cnxn.getXid());
        synchronized (pendingQueue) {
          pendingQueue.add(p);
        }
      }
      // 发送包
      sendPktOnly(p);
      anyPacketsSent = true;
    }
    // 如果outgoingQueue为空跳出循环
    if (outgoingQueue.isEmpty()) {
      break;
    }
    // 从outgoingQueue中移除一个数据包用于后续发送
    p = outgoingQueue.remove();
  }
  // 如果全部发送则进行flush操作
  if (anyPacketsSent) {
    channel.flush();
  }
}

在上述代码中核心操作在while循环内,单个循环处理流程如下。

  1. 如果数据包中的各项数据都不为null则进行如下操作。
    1. 如果数据包头信息不为空并且头信息中的类型不是ping和auth,将设置头信息中的xid,并将数据包放入到pendingQueue集合中。
    2. 发送数据包。
  2. 如果outgoingQueue为空跳出循环。
  3. 从outgoingQueue中移除一个数据包用于后续发送。

通过对doWrite方法的分析可以知道核心发送数据包的方法是sendPkt,完整处理代码如下。

private ChannelFuture sendPkt(Packet p, boolean doFlush) {
  // 创建数据包中的bb变量
  p.createBB();
  updateLastSend();
  // 包装bb变量
  final ByteBuf writeBuffer = Unpooled.wrappedBuffer(p.bb);
  // 写出包装后的字节
  final ChannelFuture result = doFlush
    ? channel.writeAndFlush(writeBuffer)
    : channel.write(writeBuffer);
  // 添加监听器
  result.addListener(onSendPktDoneListener);
  // 返回写出结果
  return result;
}

在上述代码中核心处理流程如下。

  1. 创建数据包中的bb变量。
  2. 包装数据包中的bb变量。
  3. 通过通道写出包装后的bb变量。
  4. 在第(3)步中的写出结果中添加监听器,监听器是成员变量onSendPktDoneListener,核心处理是在发送成功的情况下为发送计数器累加1。
  5. 返回第(3)步中的写出结果。

ClientCnxn 分析

本节将对ClientCnxn类进行分析,在ClientCnxn中存在多个内部类和方法,先对内部类进行简单介绍。

  1. 类AuthData用于存储权限验证相关数据。
  2. 类Packet用于存储客户端和服务端之间的通讯数据。
  3. 类WatcherSetEventPair用于存储观察事件(WatchedEvent)和观察器(Watcher)集合。
  4. 类EndOfStreamException表示流结束异常。
  5. 类SessionTimeoutException表示会话超时异常。
  6. 类SessionExpiredException表示会话过期异常。
  7. 类RWServerFoundException表示服务发现异常。
  8. 类LocalCallback用于存储本地回调实现。
  9. 类EventThread表示事件线程。
  10. 类SendThread表示发送线程。

Packet 分析

本节将对Packet 类进行分析,首先对Packet类中的成员变量进行介绍,详细内容见表 。

变量名称变量类型变量说明
readOnlyboolean是否只读
requestHeaderRequestHeader请求头
replyHeaderReplyHeader响应头
requestRecord请求
responseRecord响应
bbByteBuffer待发送的字节
clientPathString客户端地址
serverPathString服务端地址
finishedboolean是否结束
cbAsyncCallback回调
ctxObject上下文信息对象
watchRegistrationWatchRegistrationWatch注册参数
watchDeregistrationWatchDeregistrationWatch取消注册参数

了 解Packet类中的成员变量后下面对核心方法createBB进行分析,具体代码如下。

public void createBB() {
  try {
    // 创建字节输出流
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // 创建字节输出档案
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
    // 写入len数据值为-1
    boa.writeInt(-1, "len"); // We'll fill this in later
    // 将请求头写入到输出档案
    if (requestHeader != null) {
      requestHeader.serialize(boa, "header");
    }
    // 如果请求是连接请求
    if (request instanceof ConnectRequest) {
      request.serialize(boa, "connect");
      boa.writeBool(readOnly, "readOnly");
    }
    // 请求不为空的情况下
    else if (request != null) {
      request.serialize(boa, "request");
    }
    // 关闭字节输出流
    baos.close();
    // 字节输出流转换为ByteBuffer
    this.bb = ByteBuffer.wrap(baos.toByteArray());
    this.bb.putInt(this.bb.capacity() - 4);
    this.bb.rewind();
  } catch (IOException e) {
    LOG.warn("Ignoring unexpected exception", e);
  }
}

在上述代码中核心目标是完成成员变量bb的构造,具体处理流程如下。

  1. 创建字节输出流。
  2. 创建字节输出档案。
  3. 向输出档案写入len数据值为-1。
  4. 如果请求头不为空将其写入到输出档案中。
  5. 如果请求类型是连接请求将请求写入到输出档案中并且将是否只读标记写入到输出档案。
  6. 如果请求不为空将其写入到输出档案中。
  7. 字节输出流转换为ByteBuffer赋值到成员变量bb,核心数据来源在输出档案中。

EventThread 分析

本节将对EventThread类进行分析,该类继承自ZooKeeperThread类核心是Thread,在该类中存在四个成员变量详细内容见表。

变量名称变量类型变量说明
waitingEventsLinkedBlockingQueue<Object>等待处理的事件集合
sessionStateKeeperStateZookeeper状态
wasKilledboolean是否停止
isRunningboolean是否运行中

在EventThread类中需要关注两个方法。

  1. 方法queueEvent用于向成员变量waitingEvents中添加数据。
  2. 方法processEvent用于处理waitingEvents中的数据。

下面对queueEvent方法进行分析,具体处理代码如下。

private void queueEvent(WatchedEvent event,
                        Set<Watcher> materializedWatchers) {
  // 如果事件类型是None并且ZK状态和事件中的ZK状态相同
  if (event.getType() == EventType.None
      && sessionState == event.getState()) {
    return;
  }
  // 从事件中获取zk状态赋值到成员变量sessionState
  sessionState = event.getState();
  final Set<Watcher> watchers;
  // 如果参数观察器集合为空需要搜索
  if (materializedWatchers == null) {
    // materialize the watchers based on the event
    watchers = watcher.materialize(event.getState(),
                                   event.getType(), event.getPath());
  }
  // 参数观察器集合不为空加入到watchers集合中
  else {
    watchers = new HashSet<Watcher>();
    watchers.addAll(materializedWatchers);
  }
  // 构造WatcherSetEventPair对象,向waitingEvents中添加WatcherSetEventPair对象
  WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
  waitingEvents.add(pair);
}

在上述代码中核心处理流程如下。

  1. 如果事件类型是None并且ZK状态和事件中的ZK状态相同则结束处理。
  2. 从时间中获取zk状态并将其赋值到成员变量sessionState。
  3. 判断方法参数materializedWatchers是否为空,如果为空需要通过ClientWatchManager#materialize方法搜索,将搜索结果放入到变量watchers中,反之则将方法参数materializedWatchers都加入到变量watchers中。
  4. 构造WatcherSetEventPair对象,向waitingEvents变量中添加WatcherSetEventPair对象。

注意在waitingEvents变量的定义中采用的元素标记是Object,说明它的存储数据是多样的,实际能够放入的数据类型有:WatcherSetEventPair、LocalCallback和Packet三种。

接下来将对waitingEvents变量中的事件处理流程进行分析,核心入口是EventThread#run方法,具体代码如下。

@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
  try {
    isRunning = true;
    while (true) {
      // 获取需要处理的事件
      Object event = waitingEvents.take();
      // 判断事件不是死亡事件
      if (event == eventOfDeath) {
        wasKilled = true;
      } else {
        // 处理事件
        processEvent(event);
      }
      if (wasKilled)
        synchronized (waitingEvents) {
        if (waitingEvents.isEmpty()) {
          isRunning = false;
          break;
        }
      }
    }
  } catch (InterruptedException e) {
    LOG.error("Event thread exiting due to interruption", e);
  }

  LOG.info("EventThread shut down for session: 0x{}",
           Long.toHexString(getSessionId()));
}

在上述代码中会通过循环来完成事件的处理,处理流程如下。

  1. 从成员变量waitingEvents中获取事件。
  2. 如果事件是空对象(new Object())需要标记成员变量wasKilled为真,反之则将事件交给processEvent方法进行处理。
  3. 如果成员变量wasKilled为真并且成员变量waitingEvents中没有事件数据将标记成员变量isRunning为假,并结束循环。

在整个事件线程处理中最终的处理方法是processEvent,在该方法中核心是依靠如下两种方法对事件进行处理。

  1. 通过Watcher#process方法处理事件。
  2. 通过AsyncCallback实现类中的processResult方法处理事件。

SendThread 分析

本节将对EventThread类进行分析,该类继承自ZooKeeperThread类核心是Thread,重点关注run方法,该方法中的核心处理可以分为如下四部分进行,这四部分分别是。

  1. Socket连接未建立的处理。
  2. Zookeeper状态处于已连接并且是Sasl模式。
  3. Zookeeper状态处于已连接非Sasl模式。
  4. Zookeeper状态处于只读连接。

接下来先对第一部分操作进行分析,处理代码如下。

if (!clientCnxnSocket.isConnected()) {
  // 关闭状态跳出循环
  if (closing) {
    break;
  }
  // 处理套接字地址
  if (rwServerAddress != null) {
    serverAddress = rwServerAddress;
    rwServerAddress = null;
  } else {
    // 主机供应商中获取套接字地址
    serverAddress = hostProvider.next(1000);
  }
  // 开始建立连接
  startConnect(serverAddress);
  // 更新数据
  clientCnxnSocket.updateLastSendAndHeard();
}

在这段代码中如果成员变量closing为真则跳出循环结束处理,成员变量closing表示是否处于关闭状态。在没有跳出循环后会处理套接字地址,套接字地址计算有两种模式:第一种成员变量rwServerAddress不为空的情况下套接字地址直接取值rwServerAddress变量、第二种从成员变量hostProvider中获取套接字地址。确认套接字地址后将使用startConnect方法连接连接,建立连接代码如下。

private void startConnect(InetSocketAddress addr) throws IOException {
  saslLoginFailed = false;
  // 不是第一次连接
  if (!isFirstConnect) {
    try {

      Thread.sleep(r.nextInt(1000));
    } catch (InterruptedException e) {
      LOG.warn("Unexpected exception", e);
    }
  }
  // 状态设置为连接建立中
  state = States.CONNECTING;

  // 组装地址
  String hostPort = addr.getHostString() + ":" + addr.getPort();
  MDC.put("myid", hostPort);
  setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
  // 如果启用sasl客户端
  if (clientConfig.isSaslClientEnabled()) {
    try {
      // sasl不为空先关闭sasl客户端并且重新构造sasl客户端
      if (zooKeeperSaslClient != null) {
        zooKeeperSaslClient.shutdown();
      }
      zooKeeperSaslClient = new ZooKeeperSaslClient(
        SaslServerPrincipal.getServerPrincipal(addr, clientConfig),
        clientConfig);
    } catch (LoginException e) {
      LOG.warn("SASL configuration failed: " + e
               + " Will continue connection to Zookeeper server without "
               + "SASL authentication, if Zookeeper server allows it.");
      eventThread.queueEvent(new WatchedEvent(
        Watcher.Event.EventType.None,
        Watcher.Event.KeeperState.AuthFailed, null));
      saslLoginFailed = true;
    }
  }
  logStartConnect(addr);
  // 建立连接
  clientCnxnSocket.connect(addr);
}

在上述代码中主要执行流程如下。

  1. 判断是否是第一次连接,如果不是则会等待0-1000毫秒的时间。这个等待时间是两次连接之间的间隔时间。
  2. 初始化zk状态为连接建立中。
  3. 将套接字地址转换为字符串将其放入MDC和线程名称中。
  4. 判断是否启用sasl客户端,如果启用并且成员变量sasl客户端存在将先关闭成员变量的sasl客户端再创建一个新的Sasl客户端。
  5. 记录日志,通过clientCnxnSocket建立连接。

这便是Socket连接未建立的处理流程细节,接下来将对第(2)部分内容进行分析,核心代码如下。

// zk状态是已连接
if (state.isConnected()) {
  // sasl客户端存在
  if (zooKeeperSaslClient != null) {
    // 是否发送身份验证事件
    boolean sendAuthEvent = false;
    // 如果sasl客户端状态处于最初状态将进行初始化
    if (zooKeeperSaslClient.getSaslState()
        == ZooKeeperSaslClient.SaslState.INITIAL) {
      try {
        zooKeeperSaslClient.initialize(ClientCnxn.this);
      } catch (SaslException e) {
        LOG.error(
          "SASL authentication with Zookeeper Quorum member failed: "
          + e);
        state = States.AUTH_FAILED;
        sendAuthEvent = true;
      }
    }
    // 从sasl客户端中获取zk状态
    KeeperState authState = zooKeeperSaslClient.getKeeperState();
    // zk状态不为空
    if (authState != null) {
      // zk状态为是否验证失败
      if (authState == KeeperState.AuthFailed) {
        // An authentication error occurred during authentication with the Zookeeper Server.
        state = States.AUTH_FAILED;
        sendAuthEvent = true;
      } else {
        // zk状态为sasl验证通过
        if (authState == KeeperState.SaslAuthenticated) {
          sendAuthEvent = true;
        }
      }
    }

    // 发送身份验证事件
    if (sendAuthEvent) {
      // 发送事件
      eventThread.queueEvent(new WatchedEvent(
        Watcher.Event.EventType.None,
        authState, null));
      // 如果zk状态是身份验证失败发送死亡事件
      if (state == States.AUTH_FAILED) {
        eventThread.queueEventOfDeath();
      }
    }
  }
  // 计算时间差
  to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
  to = connectTimeout - clientCnxnSocket.getIdleRecv();
}

// 时间差小于0抛出异常
if (to <= 0) {
  String warnInfo;
  warnInfo = "Client session timed out, have not heard from server in "
    + clientCnxnSocket.getIdleRecv()
    + "ms"
    + " for sessionid 0x"
    + Long.toHexString(sessionId);
  LOG.warn(warnInfo);
  throw new SessionTimeoutException(warnInfo);
}

在上述代码中核心处理流程如下。

  1. 判断sasl客户端是否存在,如果存在sasl客户端状态处于最初始状态将进行初始化。
  2. 从sasl客户端中获取zk状态,如果zk状态是AUTH_FAILED表示验证失败,同时将成员变量state设置为AuthFailed以及比例sendAuthEvent为true,如果zk状态为sasl验证通过则将变量sendAuthEvent设置为true。
  3. 如果变量sendAuthEvent为真则发送事件,注意在zk状态是AUTH_FAILED将会发送一个死亡事件。
  4. 计算时间差,计算规则有两种。
    1. 读取超时时间-(当前时间-上一次数据接收时间)。该计算规则的使用场景是zk状态是已连接。
    2. 连接超时时间-(当前时间-上一次数据发送时间)。该计算规则的使用场景是zk状态非已连接。
  5. 如果时间差小于等于0将抛出会话超时异常。

至此Zookeeper状态处于已连接并且是Sasl模式分析结束,接下来将对第(3)部分内容进行分析,核心代码如下。

if (state.isConnected()) {

  // 计算下一次ping的时间
  int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
    ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
  if (timeToNextPing <= 0
      || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
    // 发送ping
    sendPing();
    clientCnxnSocket.updateLastSend();
  } else {
    if (timeToNextPing < to) {
      // 覆盖to变量
      to = timeToNextPing;
    }
  }
}

在上述代码中核心处理流程如下。

  1. 计算下一次ping的时间。
  2. 如果下一次ping的时间小于等于0或者当前时间和上一次发送时间的差值大于10000毫秒会发送ping数据包。

第(3)部分的核心操作是完成ping数据包的发送,接下来对第(4)部分操作进行分析,核心代码如下。

if (state == States.CONNECTEDREADONLY) {
  // 获取当前时间
  long now = Time.currentElapsedTime();
  // 当前时间和最后ping服务器时间差
  int idlePingRwServer = (int) (now - lastPingRwServer);
  // 时间差大于ping超时时间
  if (idlePingRwServer >= pingRwTimeout) {
    // 重新计算变量
    lastPingRwServer = now;
    idlePingRwServer = 0;
    pingRwTimeout =
      Math.min(2 * pingRwTimeout, maxPingRwTimeout);
    // 发送ping
    pingRwServer();
  }
  to = Math.min(to, pingRwTimeout - idlePingRwServer);
}

在上述代码中核心操作流程如下。

  1. 获取当前时间。
  2. 计算当前时间和最后ping服务器的时间差,使用变量idlePingRwServer存储。
  3. 如果idlePingRwServer大于等于ping超时时间,则需要重新计算相关变量并且发送ping数据包。

通过分析可以发现在第(4)部分中核心是发送ping数据包。最后在完成第(1)~(4)部分的逻辑处理后会正真的进行数据传输,核心代码如下。

clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);

关于上述代码的处理流程在ClientCnxnSocket类的分析中已经介绍,本节不做重复描述。在请求发送完成后我们需要将关注点从发送转移到对响应的解析上,具体解析响应的处理方法是org.apache.zookeeper.ClientCnxn.SendThread#readResponse,代码如下。

void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(
            incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();

    // 反序列化 header 数据
    replyHdr.deserialize(bbia, "header");
    // 如果xid等于-2将结束响应读取操作,
    if (replyHdr.getXid() == -2) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got ping response for sessionid: 0x"
                    + Long.toHexString(sessionId)
                    + " after "
                    + ((System.nanoTime() - lastPingSentNs) / 1000000)
                    + "ms");
        }
        return;
    }
    // 如果xid等于-4
    if (replyHdr.getXid() == -4) {
        if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
            state = States.AUTH_FAILED;
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                    Watcher.Event.KeeperState.AuthFailed, null));
            eventThread.queueEventOfDeath();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got auth sessionid:0x"
                    + Long.toHexString(sessionId));
        }
        return;
    }
    // 如果 xid 等于 -1
    if (replyHdr.getXid() == -1) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x"
                    + Long.toHexString(sessionId));
        }
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");

        // 进行一次地址转换
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if (serverPath.compareTo(chrootPath) == 0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path "
                        + chrootPath);
            }
        }

        // 发送监听事件
        WatchedEvent we = new WatchedEvent(event);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x"
                    + Long.toHexString(sessionId));
        }

        eventThread.queueEvent(we);
        return;
    }

    // sasl 安全认证处理
    if (tunnelAuthInProgress()) {
        GetSASLRequest request = new GetSASLRequest();
        request.deserialize(bbia, "token");
        zooKeeperSaslClient.respondToServer(request.getToken(),
                ClientCnxn.this);
        return;
    }

    // 拿出一个需要处理的包
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got "
                    + replyHdr.getXid());
        }
        packet = pendingQueue.remove();
    }

    try {
        // xid不相同抛出异常
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(
                    KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid "
                    + replyHdr.getXid() + " with err " +
                    +replyHdr.getErr() +
                    " expected Xid "
                    + packet.requestHeader.getXid()
                    + " for a packet with details: "
                    + packet);
        }

        // 补充replyHeader相关数据
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        // zid不为空的情况下重置本地最大zxid
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        // 反序列化response数据信息
        if (packet.response != null && replyHdr.getErr() == 0) {
            packet.response.deserialize(bbia, "response");
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x"
                    + Long.toHexString(sessionId) + ", packet:: " + packet);
        }
    } finally {
        // 组装Packet
        finishPacket(packet);
    }
}

在开始对readResponse方法的处理流程进行介绍之前需要知道一些特殊的xid,总共有如下三个:

  1. XID为-2时表示这使用于ping的数据包。
  2. XID为-4时表示这使用于安全相关的数据包。
  3. XID为-1时表示客户端接收到通知。

了解特殊的xid以后开始对处理流程进行介绍,具体流程如下:

  1. 组装用于解析响应信息的相关对象。
  2. 如果xid为-2,在日志级别是debug的情况下记录ping所消耗的时间,并结束处理。
  3. 如果xid为-4,会判断授权是否通过,在未通过的情况下会发送监听事件,以及死亡事件,最终结束处理。
  4. 如果xid为-1,会从bbia中提取响应信息写入到监听事件中,在成员变量chrootPath不为空的情况下做地址转化,并设置到监听事件中,数据设置万抽发布该事件,并结束处理。
  5. 判断是否是 sasl 安全认证相关处理,如果是则会解析token并通过sasl客户端发送给服务端,发送方法调用后结束处理。
  6. 从成员变量pendingQueue中获取第一个数据包。
  7. 如果数据包中replyHeader携带的xid和replyHdr中携带的xid不相同需要抛出异常。
  8. 为数据包中的replyHeader对象设置xid、err和zxid。
  9. 如果变量replyHdr中的zxid大于零需要将成员变量lastZxid进行覆盖,用来表示最后一次的zxid操作记录号。
  10. 反序列化response信息到数据包中。

最后对 finishPacket 方法进行分析,具体处理代码如下。

protected void finishPacket(Packet p) {

    // 获取err码
    int err = p.replyHeader.getErr();
    // 确认watcher注册参数是否存在
    if (p.watchRegistration != null) {
        // 注册watcher
        p.watchRegistration.register(err);
    }
    // 确认watcher取消注册参数是否存在
    if (p.watchDeregistration != null) {
        Map<EventType, Set<Watcher>> materializedWatchers = null;
        try {
            // 取消注册
            materializedWatchers = p.watchDeregistration.unregister(err);
            // 循环取消注册的对应数据
            for (Entry<EventType, Set<Watcher>> entry : materializedWatchers
                    .entrySet()) {
                Set<Watcher> watchers = entry.getValue();
                if (watchers.size() > 0) {
                    queueEvent(p.watchDeregistration.getClientPath(), err,
                            watchers, entry.getKey());
                    p.replyHeader.setErr(Code.OK.intValue());
                }
            }
        } catch (KeeperException.NoWatcherException nwe) {
            p.replyHeader.setErr(nwe.code().intValue());
        } catch (KeeperException ke) {
            p.replyHeader.setErr(ke.code().intValue());
        }
    }

    // 回调为空
    if (p.cb == null) {
        synchronized (p) {
            // 处理完成标记设置
            p.finished = true;
            // 通知
            p.notifyAll();
        }
    } else {
        // 处理完成标记设置
        p.finished = true;
        // 发送数据包
        eventThread.queuePacket(p);
    }
}

在上述代码中核心处理流程如下。

  1. 从replyHeader获取中获取err码。
  2. 确认watcher注册参数是否存在,如果存在则需要进行注册操作。
  3. 确认watcher取消注册参数是否存在,如果存在则需要进行如下操作。
    1. 进行取消注册操作,取消操作后会得到事件与Watcher集合的映射。
    2. 循环事件与Watcher集合的映射,如果Watcher集合数量大于0则会通过事件线程(EventThread)将事件发送。
  4. 如果数据包中的回调不存在则将处理完成标记设置为true并发送通知,反之也需要将处理完成标记设置为true,此外还需要通过事件线程发送数据包。

关于上述处理流程中的事件线程发送处理代码如下。

void queueEvent(String clientPath, int err,
                Set<Watcher> materializedWatchers, EventType eventType) {
    KeeperState sessionState = KeeperState.SyncConnected;
    if (KeeperException.Code.SESSIONEXPIRED.intValue() == err
            || KeeperException.Code.CONNECTIONLOSS.intValue() == err) {
        sessionState = Event.KeeperState.Disconnected;
    }
    WatchedEvent event = new WatchedEvent(eventType, sessionState,
            clientPath);
    eventThread.queueEvent(event, materializedWatchers);
}

总结

本章对Zookeeper项目中的客户端与服务端交互相关内容进行分析。站在客户端视角有如下几个关键对象。

  1. 负责进行通讯的ClientCnxnSocket对象,在Zookeeper中默认会选择ClientCnxnSocketNIO作为具体实现。
  2. 负责承载数据包的Packet对象。
  3. 负责进行数据传输的发送线程(SendThread)。

客户端与服务端的交互流程可以简单概括为通过ClientCnxnSocket对象创建socket连接,然后组装数据包信息,最终通过发送线程将数据发送。