本文介绍zookeeper的会话管理和临时节点。
会话管理
leader创建会话
本部分重点介绍leader节点创建会话的流程。
ConnectRequest
在客户端建立连接之后,客户端会发送一个ConnectRequest包来初始化会话:
public class ConnectRequest implements Record {
private int protocolVersion; // 0
private long lastZxidSeen; // 初始化为0
private int timeOut; // 默认30000
private long sessionId; // 初始化为0
private byte[] passwd; // byte[16]
private boolean readOnly; // false
}
ZooKeeperServer使用processConnectRequest方法处理ConnectRequest包:
int sessionTimeout = request.getTimeOut(); // timeout
byte[] passwd = request.getPasswd(); // password
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
// We don't want to receive any packets until we are sure that the session is setup
cnxn.disableRecv();
if (sessionId == 0) {
// 新建session
long id = createSession(cnxn, passwd, sessionTimeout);
} else {
// 验证
validateSession(cnxn, sessionId); // do nothing
// 关闭session关联的旧的客户端连接
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
cnxn.setSessionId(sessionId);
// 重新打开session
// 当客户端连接的服务器无法提供服务,重新连接其他服务器时会reopenSession
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
}
新建session
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
long sessionId = sessionTracker.createSession(timeout); // 创建session
Random r = new Random(sessionId ^ superSecret); // 生成随机密码
r.nextBytes(passwd);
// 封装事务提交给业务层
CreateSessionTxn txn = new CreateSessionTxn(timeout);
cnxn.setSessionId(sessionId);
Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
submitRequest(si);
return sessionId;
}
leader、follower创建会话的方式不一样,此处看一下leader创建会话方式,后续集群会话时再介绍follower创建会话。
leader节点使用LeaderSessionTracker实现类追踪会话,创建会话方法如下:
public long createSession(int sessionTimeout) {
if (localSessionsEnabled) {
// false
return localSessionTracker.createSession(sessionTimeout);
}
return globalSessionTracker.createSession(sessionTimeout);
}
// globalSessionTracker.createSession使用SessionTrackerImpl类方法
public long createSession(int sessionTimeout) {
// 递增sessionId
long sessionId = nextSessionId.getAndIncrement();
// 下一小节详细介绍会话追踪
trackSession(sessionId, sessionTimeout);
return sessionId;
}
reopenSession
public void reopenSession(
ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
if (checkPasswd(sessionId, passwd)) {
// touchSession: 验证session存在、刷新expire、finishSessionInit响应
// 在follower节点会给leader发REVALIDATE请求验证
revalidateSession(cnxn, sessionId, sessionTimeout);
} else {
finishSessionInit(cnxn, false); // 响应ConnectResponse(失败)
}
}
PrepRequestProcessor
leader的createSession请求首先进入PrepRequestProcessor处理器:
CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
request.setTxn(createSessionTxn);
// 创建session
// 由于客户端可能连接follower节点,此处是为了在leader上创建集群会话
zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
zks.setOwner(request.sessionId, request.getOwner());
ProposalRequestProcessor
leader发proposal,follower写txnlog文件响应ack,之后leader会commit事务,最后使用FinalRequestProcessor处理器应用事务。
FinalRequestProcessor
private ProcessTxnResult applyRequest(Request request) {
ProcessTxnResult rc = zks.processTxn(request); // 应用事务
if (request.type =&