zookeeper源码(11)临时节点

本文介绍zookeeper的会话管理和临时节点。

会话管理

leader创建会话

本部分重点介绍leader节点创建会话的流程。

ConnectRequest

在客户端建立连接之后,客户端会发送一个ConnectRequest包来初始化会话:

public class ConnectRequest implements Record {
   
  private int protocolVersion; // 0
  private long lastZxidSeen; // 初始化为0
  private int timeOut; // 默认30000
  private long sessionId; // 初始化为0
  private byte[] passwd; // byte[16]
  private boolean readOnly; // false
}

ZooKeeperServer使用processConnectRequest方法处理ConnectRequest包:

int sessionTimeout = request.getTimeOut(); // timeout
byte[] passwd = request.getPasswd(); // password
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
   
    sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
   
    sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
// We don't want to receive any packets until we are sure that the session is setup
cnxn.disableRecv();
if (sessionId == 0) {
   
    // 新建session
    long id = createSession(cnxn, passwd, sessionTimeout);
} else {
   
    // 验证
    validateSession(cnxn, sessionId); // do nothing
    // 关闭session关联的旧的客户端连接
    if (serverCnxnFactory != null) {
   
        serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
    }
    if (secureServerCnxnFactory != null) {
   
        secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
    }
    cnxn.setSessionId(sessionId);
    // 重新打开session
    // 当客户端连接的服务器无法提供服务,重新连接其他服务器时会reopenSession
    reopenSession(cnxn, sessionId, passwd, sessionTimeout);
}

新建session

long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
   
    long sessionId = sessionTracker.createSession(timeout); // 创建session
    Random r = new Random(sessionId ^ superSecret); // 生成随机密码
    r.nextBytes(passwd);
    // 封装事务提交给业务层
    CreateSessionTxn txn = new CreateSessionTxn(timeout);
    cnxn.setSessionId(sessionId);
    Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
    submitRequest(si);
    return sessionId;
}

leader、follower创建会话的方式不一样,此处看一下leader创建会话方式,后续集群会话时再介绍follower创建会话。

leader节点使用LeaderSessionTracker实现类追踪会话,创建会话方法如下:

public long createSession(int sessionTimeout) {
   
    if (localSessionsEnabled) {
    // false
        return localSessionTracker.createSession(sessionTimeout);
    }
    return globalSessionTracker.createSession(sessionTimeout);
}
// globalSessionTracker.createSession使用SessionTrackerImpl类方法
public long createSession(int sessionTimeout) {
   
    // 递增sessionId
    long sessionId = nextSessionId.getAndIncrement();
    // 下一小节详细介绍会话追踪
    trackSession(sessionId, sessionTimeout);
    return sessionId;
}

reopenSession

public void reopenSession(
        ServerCnxn cnxn, long sessionId, byte[] passwd, int sessionTimeout) throws IOException {
   
    if (checkPasswd(sessionId, passwd)) {
   
        // touchSession: 验证session存在、刷新expire、finishSessionInit响应
        // 在follower节点会给leader发REVALIDATE请求验证
        revalidateSession(cnxn, sessionId, sessionTimeout);
    } else {
   
        finishSessionInit(cnxn, false); // 响应ConnectResponse(失败)
    }
}

PrepRequestProcessor

leader的createSession请求首先进入PrepRequestProcessor处理器:

CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
request.setTxn(createSessionTxn);
// 创建session
// 由于客户端可能连接follower节点,此处是为了在leader上创建集群会话
zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
zks.setOwner(request.sessionId, request.getOwner());

ProposalRequestProcessor

leader发proposal,follower写txnlog文件响应ack,之后leader会commit事务,最后使用FinalRequestProcessor处理器应用事务。

FinalRequestProcessor

private ProcessTxnResult applyRequest(Request request) {
   
    ProcessTxnResult rc = zks.processTxn(request); // 应用事务

    if (request.type =&

相关推荐

  1. zookeeper(11)临时节点

    2024-03-29 18:12:07       40 阅读
  2. zookeeper(10)node增删改查及监听

    2024-03-29 18:12:07       52 阅读
  3. Zookeeper分析ZooKeeperServer

    2024-03-29 18:12:07       48 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-29 18:12:07       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-29 18:12:07       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-29 18:12:07       87 阅读
  4. Python语言-面向对象

    2024-03-29 18:12:07       96 阅读

热门阅读

  1. 【C++】动态内存分配 与 对象的动态建立与释放

    2024-03-29 18:12:07       48 阅读
  2. LeetCode 21

    2024-03-29 18:12:07       43 阅读
  3. Leetcoder Day43| 单调栈1

    2024-03-29 18:12:07       36 阅读
  4. TCP建立连接时,为什么每次的SYN都不一样

    2024-03-29 18:12:07       43 阅读
  5. Qwen1.5模型微调部署全过程

    2024-03-29 18:12:07       36 阅读
  6. 【无标题】

    2024-03-29 18:12:07       33 阅读
  7. AI大模型学习:AI大模型在特定领域的应用

    2024-03-29 18:12:07       41 阅读
  8. HDU水题刷题记录

    2024-03-29 18:12:07       49 阅读
  9. C#WPF将变量或自定义数据类绑定到控件实例

    2024-03-29 18:12:07       39 阅读
  10. vue做一个锁屏禁止页面前进后退

    2024-03-29 18:12:07       45 阅读