ZooKeeper Client-Server Interaction
This chapter analyzes the interaction between ZooKeeper clients and servers. Generally, the interaction between clients and servers can be divided into the following phases:
- Client establishes a connection with the server.
- Client initiates a request.
- Server processes the request.
- Client receives the response.
This chapter focuses on the first, second, and fourth phases, analyzed from the client's perspective.
Client-Server Connection Establishment
This section examines how clients establish connections with servers in ZooKeeper. The ZooKeeper project provides a native client, with the client class named ZooKeeper. Typically, it's used as follows:
ZooKeeper s = new ZooKeeper("127.0.0.1:2181", 2000, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event);
        if (event.getState().equals(Event.KeeperState.SyncConnected)) {
            countDownLatch.countDown();
        }
    }
});
Looking further into the ZooKeeper source code, we find the complete constructor:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly, HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
// Set default config if client config is null
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
// Store client config in member variable
this.clientConfig = clientConfig;
// Create default watch manager
watchManager = defaultWatchManager();
// Set default watcher in watch manager
watchManager.defaultWatcher = watcher;
// Create connect string parser
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
// Set host provider
hostProvider = aHostProvider;
// Create connection
cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
// Start
cnxn.start();
}
In this code, we need to focus on the ConnectStringParser class and the createConnection method. Let's first analyze the ConnectStringParser class, which aims to parse the connection string. Here's its constructor:
public ConnectStringParser(String connectString) {
// Find index of '/'
int off = connectString.indexOf('/');
// Parse chrootPath if '/' exists, otherwise set to null
if (off >= 0) {
String chrootPath = connectString.substring(off);
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
connectString = connectString.substring(0, off);
}
else {
this.chrootPath = null;
}
// Split connection string by comma
List<String> hostsList = split(connectString, ",");
for (String host : hostsList) {
// Set default port
int port = DEFAULT_PORT;
// Split host, result: host,port
String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);
if (hostAndPort.length != 0) {
host = hostAndPort[0];
if (hostAndPort.length == 2) {
port = Integer.parseInt(hostAndPort[1]);
}
}
// If split length is 0
else {
// Split by colon
int pidx = host.lastIndexOf(':');
if (pidx >= 0) {
if (pidx < host.length() - 1) {
port = Integer.parseInt(host.substring(pidx + 1));
}
host = host.substring(0, pidx);
}
}
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
The main goal of this code is to initialize the member variables chrootPath and serverAddresses. The process is as follows:
- Find the index of the first '/' in the connection string. If one exists, everything from it onward is the chroot path: a lone "/" yields null, anything longer is validated and stored in chrootPath, and the connection string is truncated to the part before the '/'. If there is no '/', set chrootPath to null.
- Split the remaining connection string by comma, then process each element as follows:
  - Set the port number to the default (2181).
  - Split the element into host and port, overriding the default port if one is given.
  - Create an unresolved InetSocketAddress object from host and port, and add it to serverAddresses.
After processing, if "127.0.0.1:2181" is passed to the ConnectStringParser constructor, chrootPath is null and serverAddresses holds a single unresolved InetSocketAddress with host 127.0.0.1 and port 2181.
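The parsing steps can be tried outside ZooKeeper with a small stand-alone sketch. This is a simplified illustration, not the library's code: the class name ConnectStringSketch is hypothetical, and the sketch skips path validation and IPv6 handling, which the real constructor delegates to PathUtils and NetUtils.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ConnectStringParser (illustration only).
class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181;

    final String chrootPath;   // null when no chroot is given
    final List<InetSocketAddress> serverAddresses = new ArrayList<>();

    ConnectStringSketch(String connectString) {
        // Everything from the first '/' onward is the chroot path.
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String path = connectString.substring(off);
            chrootPath = path.length() == 1 ? null : path;
            connectString = connectString.substring(0, off);
        } else {
            chrootPath = null;
        }
        // The remainder is a comma-separated list of host[:port] entries.
        for (String host : connectString.split(",")) {
            host = host.trim();
            if (host.isEmpty()) {
                continue;
            }
            int port = DEFAULT_PORT;
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0) {
                if (pidx < host.length() - 1) {
                    port = Integer.parseInt(host.substring(pidx + 1));
                }
                host = host.substring(0, pidx);
            }
            // Unresolved: no DNS lookup happens at parse time.
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }
}
```

For example, parsing "127.0.0.1:2181,zk2/app" with this sketch gives the chroot path "/app" and two addresses, the second of which falls back to the default port.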
Once we have the ConnectStringParser, we can create the connection:
cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
The hostProvider variable is an implementation of the HostProvider interface; by default ZooKeeper uses the StaticHostProvider class. Constructing a StaticHostProvider requires a collection of InetSocketAddress objects, which is exactly what ConnectStringParser produces. Here's how the default HostProvider is created:
private static HostProvider createDefaultHostProvider(String connectString) {
    return new StaticHostProvider(
            new ConnectStringParser(connectString).getServerAddresses());
}
This code parses the connection string using ConnectStringParser, then uses the serverAddresses data as a parameter for the StaticHostProvider constructor to get the concrete implementation of the HostProvider interface.
Next, let's analyze the getClientCnxnSocket method, which is used to get the ClientCnxnSocket class:
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
// Get zookeeper.clientCnxnSocket data from ZK client config
// Client name
String clientCnxnSocketName = getClientConfig().getProperty(
ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
// If client name is null, use ClientCnxnSocketNIO class name
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
// Find constructor with ZKClientConfig parameter through reflection
Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
.getDeclaredConstructor(ZKClientConfig.class);
// Instantiate using the constructor
ClientCnxnSocket clientCxnSocket =
(ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
// Return
return clientCxnSocket;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
The core process in this code is as follows:
- Get the zookeeper.clientCnxnSocket value from the ZK client config and use it as the class name of the socket implementation. If it is not set, default to the ClientCnxnSocketNIO class name.
- Use reflection to load the class and find its constructor that takes a ZKClientConfig parameter.
- Instantiate the class with that constructor and return the instance. Any failure is rethrown as an IOException.
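The same "configurable class name with a default, instantiated by reflection" pattern can be sketched with the standard library alone. Everything below is hypothetical and only mirrors the shape of getClientCnxnSocket: SocketSketch and NioSocketSketch stand in for ClientCnxnSocket and ClientCnxnSocketNIO, java.util.Properties stands in for ZKClientConfig, and the property key sketch.socketClass is invented for the example.

```java
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical stand-in for ClientCnxnSocket.
abstract class SocketSketch {
    final Properties config;

    SocketSketch(Properties config) {
        this.config = config;
    }
}

// Hypothetical stand-in for the default (NIO) implementation.
class NioSocketSketch extends SocketSketch {
    public NioSocketSketch(Properties config) {
        super(config);
    }
}

class SocketFactorySketch {
    // Invented key for this sketch; not a real ZooKeeper setting.
    static final String SOCKET_CLASS_KEY = "sketch.socketClass";

    static SocketSketch create(Properties config) throws IOException {
        // Step 1: read the class name, falling back to the default implementation.
        String name = config.getProperty(SOCKET_CLASS_KEY);
        if (name == null) {
            name = NioSocketSketch.class.getName();
        }
        try {
            // Step 2: find the constructor that accepts the config object.
            Constructor<?> ctor =
                    Class.forName(name).getDeclaredConstructor(Properties.class);
            // Step 3: instantiate and return.
            return (SocketSketch) ctor.newInstance(config);
        } catch (Exception e) {
            // Any reflection failure is reported as an IOException.
            throw new IOException("Couldn't instantiate " + name, e);
        }
    }
}
```

The design keeps the caller independent of the concrete transport: swapping implementations only requires changing one configuration value.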
With this, we have all the necessary information to construct the ClientCnxn class. Now we just need to call the ClientCnxn class constructor to initialize it.
Client Request Initiation
This section analyzes how clients initiate requests, using the deletion of a data node as an example. Here's the code for deleting a node:
public void delete(final String path, int version)
throws InterruptedException, KeeperException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
final String serverPath;
if (clientPath.equals("/")) {
serverPath = clientPath;
} else {
serverPath = prependChroot(clientPath);
}
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.delete);
DeleteRequest request = new DeleteRequest();
request.setPath(serverPath);
request.setVersion(version);
ReplyHeader r = cnxn.submitRequest(h, request, null, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
}
The core code for initiating the request is cnxn.submitRequest(h, request, null, null). The cnxn.submitRequest method sends the request, and its core processing code is as follows:
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration)
throws InterruptedException {
// Create ReplyHeader object
ReplyHeader r = new ReplyHeader();
// Create Packet object
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration, watchDeregistration);
synchronized (packet) {
// If request timeout is greater than 0
if (requestTimeout > 0) {
// Initiate request and wait for requestTimeout
waitForPacketFinish(r, packet);
}
else {
// Wait until request processing is finished
while (!packet.finished) {
packet.wait();
}
}
}
// If ReplyHeader object's err value is -122
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
// Clean up data
sendThread.cleanAndNotifyState();
}
return r;
}
This code first creates a ReplyHeader object, which can be understood as an object for receiving responses. Then it creates a Packet object, which is used for data transmission. The method responsible for this is queuePacket:
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
This code has three parts. The first part assembles the Packet object's data. The second part performs different operations based on the client state. The third part calls packetAdded to tell the client socket that a packet has been queued. The focus is on the second part, which follows this process:
- If the client state is not alive or the client is closing, the packet is failed immediately (core processing is in the conLossPacket method):
  - Set the error information according to the current state.
  - Send the exception event.
- Otherwise, add the packet to the outgoingQueue collection; if the request is a closeSession request, first mark the client as closing.
Once the packet is in outgoingQueue, it is handed over to the send thread (sendThread) for processing; the entry point is SendThread#run. After sending the request, the client must read the server's response, which is also handled by the send thread, specifically in the SendThread#readResponse method.
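The interplay between queuePacket and submitRequest is a classic hand-off: the caller enqueues a packet and blocks on the packet's monitor until another thread marks it finished. The following is a minimal sketch of that idea under stated assumptions: PacketSketch and RequestQueueSketch are hypothetical names, the alive flag stands in for the client state check, and the error constant is chosen only for illustration.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical, stripped-down packet: only the completion flag and an error code.
class PacketSketch {
    boolean finished;   // guarded by the packet's own monitor
    int err;            // 0 means success in this sketch
}

class RequestQueueSketch {
    static final int CONNECTION_LOSS = -4;   // illustrative error code

    final LinkedBlockingDeque<PacketSketch> outgoingQueue = new LinkedBlockingDeque<>();
    volatile boolean alive = true;            // stands in for state.isAlive() && !closing

    // Mirrors the second part of queuePacket: fail fast or enqueue.
    PacketSketch queue() {
        PacketSketch p = new PacketSketch();
        if (!alive) {
            p.err = CONNECTION_LOSS;
            finish(p);
        } else {
            outgoingQueue.add(p);
        }
        return p;
    }

    // Called by the worker thread once a reply (or a failure) is known.
    static void finish(PacketSketch p) {
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    }

    // Mirrors the waiting in submitRequest: timeoutMs <= 0 waits indefinitely.
    // Returns false if the timeout elapsed before the packet finished.
    static boolean await(PacketSketch p, long timeoutMs) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        synchronized (p) {
            while (!p.finished) {
                if (timeoutMs <= 0) {
                    p.wait();
                } else {
                    long leftMs = (deadline - System.nanoTime()) / 1_000_000L;
                    if (leftMs <= 0) {
                        return false;
                    }
                    p.wait(leftMs);
                }
            }
        }
        return true;
    }
}
```

The loop around wait() matters: it guards against spurious wakeups, which is also why the original code re-checks packet.finished.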
ClientCnxnSocket Analysis
This section analyzes the ClientCnxnSocket class, which is used for socket interaction. In ZooKeeper, there are two implementation classes:
- ClientCnxnSocketNIO, based on Java NIO.
- ClientCnxnSocketNetty, based on Netty.
Let's start by understanding the ClientCnxnSocket class, beginning with its member variables:
Variable Name | Variable Type | Variable Description |
---|---|---|
lenBuffer | ByteBuffer | Buffer for reading message length |
sentCount | AtomicLong | Send count |
recvCount | AtomicLong | Receive count |
initialized | boolean | Whether initialized |
incomingBuffer | ByteBuffer | Buffer for reading incoming data (initially lenBuffer) |
lastHeard | long | Last data receive time |
lastSend | long | Last data send time |
now | long | Current time |
sendThread | ClientCnxn.SendThread | Send thread |
outgoingQueue | LinkedBlockingDeque<Packet> | Queue of packets to be sent |
clientConfig | ZKClientConfig | ZK client configuration |
sessionId | long | Session ID |
packetLen | int | Packet length |
Now, let's introduce some methods in the ClientCnxnSocket class:
- readConnectResult: Reads connection result.
- isConnected: Checks if connected.
- connect: Establishes connection.
- getRemoteSocketAddress: Gets remote socket address.
- getLocalSocketAddress: Gets local socket address.
- cleanup: Cleans up resources.
- packetAdded: Notifies the socket that a packet has been added to outgoingQueue.
- onClosing: Sets connState to CLOSED and notifies ClientCnxnSocket.
- saslCompleted: Executed after SASL connection.
- connectionPrimed: Executed after completing PrimeConnection.
- doTransport: For data transmission.
- testableCloseSocket: Closes socket.
- close: Closes client.
- sendPacket: Sends packet.
- initProperties: Initializes configuration (specifically packetLen).
Let's analyze the readConnectResult method:
Analysis of ClientCnxnSocketNIO
This section will analyze the ClientCnxnSocketNIO class, which is developed based on Java NIO and primarily operates around the Selector class and SelectionKey class. Let's first analyze the connect method used for establishing connections. The specific code is as follows:
@Override
void connect(InetSocketAddress addr) throws IOException {
    // Open a non-connected socket channel
    SocketChannel sock = createSock();
    try {
        // Register with the selector and start connecting
        registerAndConnect(sock, addr);
    } catch (UnresolvedAddressException | UnsupportedAddressTypeException | SecurityException | IOException e) {
        LOG.error("Unable to open socket to {}", addr);
        sock.close();
        throw e;
    }
    // Reset the read state for the new connection
    initialized = false;
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}
The core process in the above code is as follows:
- Open a socket channel.
- Register the channel with the selector and start connecting to the socket address.
- Mark the connection as uninitialized, clear lenBuffer, and point incomingBuffer at lenBuffer so that the next read starts with the message length.
Of these three steps, we mainly focus on the second. The specific code is as follows:
void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
        throws IOException {
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    boolean immediateConnect = sock.connect(addr);
    if (immediateConnect) {
        sendThread.primeConnection();
    }
}
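The register-then-connect pattern can be tried outside ZooKeeper with plain java.nio. The following is a minimal sketch, not ZooKeeper's code: NioConnectSketch and its timeoutMs parameter are hypothetical, and where ZooKeeper calls sendThread.primeConnection() the sketch simply returns true.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class NioConnectSketch {
    // Returns true once the channel is connected, false if timeoutMs elapsed first.
    static boolean connect(InetSocketAddress addr, long timeoutMs) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel sock = SocketChannel.open()) {
            // A channel must be non-blocking before it can be registered.
            sock.configureBlocking(false);
            SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);
            if (sock.connect(addr)) {
                return true;   // connected immediately, no selector round trip needed
            }
            // Otherwise wait until the selector reports the key as connectable.
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                selector.select(Math.max(1, deadline - System.currentTimeMillis()));
                if (selector.selectedKeys().remove(key) && key.isConnectable()) {
                    return sock.finishConnect();
                }
            }
            return false;
        }
    }
}
```

In non-blocking mode connect usually returns false and the connection completes later, which is why the original code only primes the connection directly when immediateConnect is true.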
EventThread Analysis
This section analyzes the EventThread class, the ClientCnxn inner class that dispatches client events asynchronously on its own thread. We focus on its run method, with the code as follows:
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
    try {
        isRunning = true;
        while (true) {
            // Get the event that needs to be processed
            Object event = waitingEvents.take();
            // Check whether the event is the death event
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                // Process the event
                processEvent(event);
            }
            // After the death event, exit once the queue has been drained
            if (wasKilled)
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }
    LOG.info("EventThread shut down for session: 0x{}",
            Long.toHexString(getSessionId()));
}
In the code above, events are processed through a loop. The processing flow is as follows:
- Retrieve an event from the member variable waitingEvents.
- If the event is the eventOfDeath sentinel (a plain new Object(), compared by reference), mark the member variable wasKilled as true; otherwise, pass the event to the processEvent method for processing.
- If the member variable wasKilled is true and there are no event data in the member variable waitingEvents, mark the member variable isRunning as false and end the loop.
In the entire event thread processing, the final processing method is processEvent. In this method, events are primarily handled using the following two methods:
- Process events through the Watcher#process method.
- Process events through the processResult method in AsyncCallback implementation classes.
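The loop above is an instance of the sentinel (or "poison pill") shutdown pattern on a blocking queue. A minimal stand-alone sketch follows; EventLoopSketch is a hypothetical class, and appending to the processed list stands in for processEvent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class EventLoopSketch extends Thread {
    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    // Sentinel object; it is recognized by identity, never by equals().
    private final Object eventOfDeath = new Object();
    final List<Object> processed = Collections.synchronizedList(new ArrayList<>());

    void queueEvent(Object event) {
        waitingEvents.add(event);
    }

    void queueEventOfDeath() {
        waitingEvents.add(eventOfDeath);
    }

    @Override
    public void run() {
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = waitingEvents.take();   // blocks until an event arrives
                if (event == eventOfDeath) {
                    wasKilled = true;
                } else {
                    processed.add(event);              // stands in for processEvent
                }
                // After the sentinel, keep draining until the queue is empty.
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Note that events queued after the sentinel but before the queue drains are still processed, which matches the behavior of the original loop.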
SendThread Analysis
This section analyzes the SendThread class, which extends the ZooKeeperThread class, itself essentially a Thread. We focus on the run method, where the core processing can be divided into four parts:
- Handling when the Socket connection is not established.
- ZooKeeper state is connected and in Sasl mode.
- ZooKeeper state is connected and not in Sasl mode.
- ZooKeeper state is in read-only connection.
Let's first analyze the first part of the operation. The handling code is as follows:
if (!clientCnxnSocket.isConnected()) {
// Exit the loop if in closing state
if (closing) {
break;
}
// Handle socket address
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
// Get socket address from host provider
serverAddress = hostProvider.next(1000);
}
// Start establishing connection
startConnect(serverAddress);
// Update data
clientCnxnSocket.updateLastSendAndHeard();
}
In this code, if the member variable closing is true, the loop exits and processing ends; closing indicates whether the client is shutting down. Otherwise, the socket address is chosen in one of two ways: if the member variable rwServerAddress is not null, it is used as the socket address and then reset to null; otherwise the address is obtained from the member variable hostProvider. Once the address is chosen, the startConnect method establishes the connection, after which the last send and last heard timestamps are updated. The connection establishment code is as follows:
private void startConnect(InetSocketAddress addr) throws IOException {
saslLoginFailed = false;
// Not the first connection
if (!isFirstConnect) {
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
// Set state to connecting
state = States.CONNECTING;
// Assemble address
String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));
// If sasl client is enabled
if (clientConfig.isSaslClientEnabled()) {
try {
// If sasl is not null, first close the sasl client and then reconstruct it
if (zooKeeperSaslClient != null) {
zooKeeperSaslClient.shutdown();
}
zooKeeperSaslClient = new ZooKeeperSaslClient(
SaslServerPrincipal.getServerPrincipal(addr, clientConfig),
clientConfig);
} catch (LoginException e) {
LOG.warn("SASL configuration failed: " + e
+ " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);
// Establish connection
clientCnxnSocket.connect(addr);
}
The main execution flow in the above code is as follows:
- Check if it's the first connection. If not, sleep for a random 0 to 999 milliseconds so that successive connection attempts are spaced out.
- Set the ZK state to CONNECTING.
- Convert the socket address to a host:port string and put it in MDC and the thread name.
- Check if the SASL client is enabled. If so, shut down the existing SASL client (if there is one) and create a new one. If creation fails with a LoginException, log a warning, queue an AuthFailed event, set saslLoginFailed to true, and continue connecting without SASL.
- Log the connection attempt and establish the connection through clientCnxnSocket.
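Two small details of startConnect are easy to check in isolation: the random delay before a reconnect and the thread renaming via a regular expression. The sketch below uses a hypothetical ReconnectSketch class; the example thread name "main-SendThread()" is an assumption used only for illustration.

```java
import java.util.Random;
import java.util.regex.Matcher;

class ReconnectSketch {
    // Random delay applied before every attempt except the first one.
    static int delayMillis(boolean isFirstConnect, Random r) {
        return isFirstConnect ? 0 : r.nextInt(1000);   // 0..999 ms
    }

    // Replaces whatever sits between the parentheses of the thread name
    // with the host:port of the server being contacted.
    static String rename(String threadName, String host, int port) {
        String hostPort = host + ":" + port;
        // quoteReplacement keeps '$' and '\' in hostPort from being
        // interpreted as group references by replaceAll.
        return threadName.replaceAll("\\(.*\\)",
                "(" + Matcher.quoteReplacement(hostPort) + ")");
    }
}
```

For instance, rename("main-SendThread()", "127.0.0.1", 2181) yields "main-SendThread(127.0.0.1:2181)", so log lines and thread dumps show which server the thread is currently talking to.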
This is the detailed process of handling when the Socket connection is not established. Next, we'll analyze the content of part (2), with the core code as follows:
// ZK state is connected
if (state.isConnected()) {
// Sasl client exists
if (zooKeeperSaslClient != null) {
// Whether to send authentication event
boolean sendAuthEvent = false;
// If sasl client state is in initial state, initialize it
if (zooKeeperSaslClient.getSaslState()
== ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error(
"SASL authentication with Zookeeper Quorum member failed: "
+ e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
// Get ZK state from sasl client
KeeperState authState = zooKeeperSaslClient.getKeeperState();
// ZK state is not null
if (authState != null) {
// Check if ZK state is authentication failed
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
// ZK state is sasl authenticated
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
// Send authentication event
if (sendAuthEvent) {
// Send event
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState, null));
// If ZK state is authentication failed, send death event
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
// Calculate time difference
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
// Throw exception if time difference is less than or equal to 0
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
The core processing flow in the above code is as follows:
- Check if the SASL client exists. If it exists and its state is INITIAL, initialize it. If initialization throws a SaslException, set the member variable state to States.AUTH_FAILED and set the variable sendAuthEvent to true.
- Get the KeeperState (authState) from the SASL client. If it is KeeperState.AuthFailed, authentication failed: set the member variable state to States.AUTH_FAILED and set the variable sendAuthEvent to true. If it is KeeperState.SaslAuthenticated, set the variable sendAuthEvent to true.
- If the variable sendAuthEvent is true, queue an event carrying authState. Note that if the member variable state is States.AUTH_FAILED, a death event is queued as well.
- Calculate the remaining time (the variable to). There are two calculation rules:
  - Read timeout - (current time - last data receive time). This rule is used when the ZK state is connected.
  - Connection timeout - (current time - last data receive time). This rule is used when the ZK state is not connected. Both rules measure idle time with getIdleRecv.
- If the remaining time is less than or equal to 0, log a warning and throw a SessionTimeoutException.
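The timeout rule reduces to one subtraction and one check. The following sketch isolates it; TimeoutBudgetSketch is a hypothetical helper, the parameters mirror the variables in the code above, and IllegalStateException stands in for SessionTimeoutException.

```java
class TimeoutBudgetSketch {
    // Returns the remaining wait budget in milliseconds,
    // or throws when the budget is exhausted.
    static int remaining(boolean connected, int readTimeout,
                         int connectTimeout, int idleRecv) {
        // Connected sessions are measured against the read timeout,
        // everything else against the connect timeout.
        int to = (connected ? readTimeout : connectTimeout) - idleRecv;
        if (to <= 0) {
            throw new IllegalStateException(
                    "Client session timed out, have not heard from server in "
                            + idleRecv + "ms");
        }
        return to;
    }
}
```

With a read timeout of 20000 ms and 5000 ms since the last received data, a connected client has 15000 ms left; a client that is still connecting with a 10000 ms connect timeout has only 5000 ms left.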
This concludes the analysis of the ZooKeeper state being connected and in Sasl mode. Next, we'll analyze the content of part (3), with the core code as follows:
if (state.isConnected()) {
// Calculate the time for the next ping
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
if (timeToNextPing <= 0
|| clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
// Send ping
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
// Override to variable
to = timeToNextPing;
}
}
}
The core processing flow in the above code is as follows:
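- Calculate the time until the next ping: half the read timeout minus the time since the last send, minus a further 1000 milliseconds if more than 1000 milliseconds have passed since the last send.
- If the time until the next ping is less than or equal to 0, or the difference between the current time and the last send time is greater than 10000 milliseconds (MAX_SEND_PING_INTERVAL), send a ping packet and update the last send time.
- Otherwise, if the time until the next ping is smaller than the variable to, lower to to that value so the loop wakes up in time for the ping.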
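The ping schedule is plain integer arithmetic, so it can be checked with a few numbers. PingTimingSketch below is a hypothetical helper that copies the formula from the code above; all values are in milliseconds.

```java
class PingTimingSketch {
    static final int MAX_SEND_PING_INTERVAL = 10_000;   // the 10000 ms upper bound

    // Half the read timeout, minus the idle send time, minus an extra
    // 1000 ms of slack once the connection has been idle for over a second.
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }
}
```

With a read timeout of 20000 ms, an idle send time of 500 ms leaves 9500 ms until the next ping, while 1500 ms leaves 7500 ms because the extra 1000 ms is subtracted. With a read timeout of 40000 ms, an idle send time of 10001 ms triggers a ping through the upper bound even though the computed time is still positive.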
The core operation of part (3) is to complete the sending of the ping packet. Next, we'll analyze the operation of part (4), with the core code as follows:
if (state == States.CONNECTEDREADONLY) {
// Get current time
long now = Time.currentElapsedTime();
// Time difference between current time and last ping to server
int idlePingRwServer = (int) (now - lastPingRwServer);
// Time difference is greater than ping timeout
if (idlePingRwServer >= pingRwTimeout) {
// Recalculate variables
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2 * pingRwTimeout, maxPingRwTimeout);
// Send ping
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
The core operation flow in the above code is as follows:
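- Get the current time.
- Calculate the time difference between the current time and the last ping to the server, storing it in the variable idlePingRwServer.
- If idlePingRwServer is greater than or equal to the ping timeout, record the current time as the last ping time, reset idlePingRwServer to 0, double pingRwTimeout (capped at maxPingRwTimeout), and call pingRwServer to send the ping.
- Lower the variable to to at most pingRwTimeout - idlePingRwServer.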
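The doubling of pingRwTimeout is a capped exponential backoff. The sketch below reproduces the bookkeeping in isolation; RwPingBackoffSketch is hypothetical, the minimum and maximum bounds are assumed values for illustration, and incrementing a counter stands in for pingRwServer().

```java
class RwPingBackoffSketch {
    // Bounds assumed for this sketch.
    static final int MIN_PING_RW_TIMEOUT = 100;
    static final int MAX_PING_RW_TIMEOUT = 60_000;

    int pingRwTimeout = MIN_PING_RW_TIMEOUT;
    long lastPingRwServer;
    int pings;   // counts how often a ping would have been sent

    // Returns how long the caller may wait before the next ping is due.
    int tick(long now) {
        int idle = (int) (now - lastPingRwServer);
        if (idle >= pingRwTimeout) {
            lastPingRwServer = now;
            idle = 0;
            // Double the interval, but never beyond the cap.
            pingRwTimeout = Math.min(2 * pingRwTimeout, MAX_PING_RW_TIMEOUT);
            pings++;   // stands in for pingRwServer()
        }
        return pingRwTimeout - idle;
    }
}
```

Each ping doubles the wait before the next one, so a client stuck on a read-only server checks frequently at first and then backs off until the interval reaches the cap.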
Through analysis, we can see that the core of part (4) is sending a ping packet. Finally, after completing the logic processing of parts (1) to (4), the actual data transmission will be carried out, with the core code as follows:
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
The processing flow of the above code has been introduced in the analysis of the ClientCnxnSocket class, so we won't repeat it here. After the request is sent, we need to shift our focus from sending to parsing the response. The specific method for parsing the response is org.apache.zookeeper.ClientCnxn.SendThread#readResponse, with the code as follows: