Redisson-获取连接原理

归档

简介

  • Redisson 有连接池,获取连接时会从池里面去获取

测试

  • RedissonListTest 加个方法
    @Test
    public void testGet1() {
        RList<Integer> list = redisson.getList("list", IntegerCodec.INSTANCE);
        list.addAll(Arrays.asList(1, 2, 3));
        Integer i1 = list.get(0); // 打断点
        Integer i2 = list.get(1);
        Integer i3 = list.get(2);
        System.out.printf("%s %s %s", i1, i2, i3);
    }

// 在 org.redisson.command.RedisExecutor #getConnection() 方法打个断点
// 在 org.redisson.command.RedisExecutor #connectionReadOp() 方法打个断点

说明

  • org.redisson.command.RedisExecutor
    /*** 获取连接 */
    protected CompletableFuture<RedisConnection> getConnection() {
        if (readOnlyMode) {
            connectionFuture = connectionReadOp(command);
        } else {
            connectionFuture = connectionWriteOp(command);
        }
        return connectionFuture;
    }
    /*** 获取读模式的连接 */
    final CompletableFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
        entry = getEntry(true);
        ...
        return entry.connectionReadOp(command);
    }
  • org.redisson.connection.SingleEntry
/*** 继承主从连接 */
public class SingleEntry extends MasterSlaveEntry {
    /*** 单服务时,会使用写连接 */
    @Override
    public CompletableFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
        return super.connectionWriteOp(command);
    }
}
  • org.redisson.connection.MasterSlaveEntry
    /*** 从连接池获取连接 */
    public CompletableFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {
        return writeConnectionPool.get(command);
    }
  • org.redisson.connection.pool.MasterConnectionPool
    /*** 获取连接 */
    @Override
    public CompletableFuture<RedisConnection> get(RedisCommand<?> command) {
        return acquireConnection(command, entries.peek()); // 调用父类获取连接
    }
  • org.redisson.connection.pool.ConnectionPool
    /*** 获取连接 */
    protected final CompletableFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry) {
        ...
        CompletableFuture<Void> f = acquireConnection(entry, command); // 进行计数,有可用的才返回
        f.thenAccept(r -> {
            connectTo(entry, result, command); // 连接
        });
        ...
        return result;
    }
    /*** 获取连接或创建连接 */
    private void connectTo(ClientConnectionsEntry entry, CompletableFuture<T> promise, RedisCommand<?> command) {
        ...
        T conn = poll(entry, command);
        if (conn != null) {
            ...
            connectedSuccessful(entry, promise, conn); // 启动时,会创建连接池,一般到这一步就返回
            return;
        }
        createConnection(entry, promise); // 没获取到,则创建新的连接
    }
    /*** 从队列里面拿 */
    protected T poll(ClientConnectionsEntry entry, RedisCommand<?> command) {
        return (T) entry.pollConnection(command);
    }
    /*** 创建连接 */
    private void createConnection(ClientConnectionsEntry entry, CompletableFuture<T> promise) {
        CompletionStage<T> connFuture = connect(entry); // 连接
        connFuture.whenComplete((conn, e) -> {
            ...
            if (changeUsage()) {
                promise.thenApply(c -> c.incUsage()); // 增加计数
            }
            connectedSuccessful(entry, promise, conn);
        });
    }
    /*** 连接 */
    protected CompletionStage<T> connect(ClientConnectionsEntry entry) {
        return (CompletionStage<T>) entry.connect(); // 使用 entry 进行连接
    }
  • org.redisson.connection.ClientConnectionsEntry
    /*** 从队列中获取连接 */
    public RedisConnection pollConnection(RedisCommand<?> command) {
        RedisConnection c = freeConnections.poll(); // Deque 双向队列
        if (c != null) {
            c.incUsage(); // 将获取的连接增加计数
        }
        return c;
    }
    /*** 获取连接 */
    public CompletionStage<RedisConnection> connect() {
        CompletionStage<RedisConnection> future = client.connectAsync(); // 使用客户端创建连接
        return future.whenComplete((conn, e) -> {
            ...
            allConnections.add(conn); // 添加到池
        });
    }
  • org.redisson.client.RedisClient
    /*** 使用 Netty Bootstrap 进行连接 */
    public RFuture<RedisConnection> connectAsync() {
        CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
        CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
            CompletableFuture<RedisConnection> r = new CompletableFuture<>();
            ChannelFuture channelFuture = bootstrap.connect(res); // 连接指定地址,可进行多次连接,创建多个 Channel
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(final ChannelFuture future) throws Exception {
                    ...
                    if (future.isSuccess()) { // 连接成功
                        // 从 Channel 获取连接,实现为:(C) channel.attr(RedisConnection.CONNECTION).get();
                        // 连接成功时会创建 RedisConnection,RedisConnection 构造器会将自己注册到 Channel 属性中
                        RedisConnection c = RedisConnection.getFrom(future.channel());
                        c.getConnectionPromise().whenComplete((res, e) -> {
                            bootstrap.config().group().execute(new Runnable() {
                                @Override
                                public void run() {
                                    if (e == null) {
                                        if (!r.complete(c)) { // 将连接返回出去
                                            c.closeAsync();
                                        } 
                                        ...
                                    }
                                    ...
                                }
                            });
                        });
                    } 
                    ...
                }
            });
            return r;
        });
        return new CompletableFutureWrapper<>(f);
    }
    // ---------------------------------
    // -- 设置 RedisConnection 解释
    // ---------------------------------
    /*** 构造器创建 Bootstrap */
    private RedisClient(RedisClientConfig config) {
        ...
        bootstrap = createBootstrap(copy, Type.PLAIN);
        ...
    }
    /*** 创建 Bootstrap */
    private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
        Bootstrap bootstrap = new Bootstrap();
        ...
        // 添加 Channel 初始化器
        bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
        ...
        return bootstrap;
    }
  • org.redisson.client.handler.RedisChannelInitializer
    @Override
    protected void initChannel(Channel ch) throws Exception {
        initSsl(config, ch);
        
        if (type == Type.PLAIN) {
            // 添加连接处理器
            ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
        } else {
            ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
        }
        ...
    }
  • org.redisson.client.handler.RedisConnectionHandler
    /*** 创建 Redis 连接对象。在父类中被调用 */
    @Override
    RedisConnection createConnection(ChannelHandlerContext ctx) {
        return new RedisConnection(redisClient, ctx.channel(), connectionPromise);
    }
    // 父类:BaseConnectionHandler extends ChannelInboundHandlerAdapter
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (connection == null) {
            connection = createConnection(ctx); // Channel 注册时调用
        }
        super.channelRegistered(ctx);
    }
  • org.redisson.client.RedisConnection
    /*** 构造器 */
    public <C> RedisConnection(RedisClient redisClient, Channel channel, CompletableFuture<C> connectionPromise) {
        ...
        updateChannel(channel); // 更新 Channel
        ...
    }
    /*** 更新 Channel */
    public void updateChannel(Channel channel) {
        ...
        this.channel = channel;
        channel.attr(CONNECTION).set(this); // 将自己注册到 Channel 属性里面去
    }

相关推荐

  1. Redisson-获取连接原理

    2024-06-17 16:56:04       8 阅读
  2. Redisson

    2024-06-17 16:56:04       43 阅读
  3. Redisson

    2024-06-17 16:56:04       19 阅读
  4. Redisson

    2024-06-17 16:56:04       9 阅读
  5. Redisson分布式锁的实现原理(小白话)

    2024-06-17 16:56:04       41 阅读
  6. Redisson 分布式限流器 RRateLimiter 的使用及原理

    2024-06-17 16:56:04       30 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-17 16:56:04       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-17 16:56:04       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-17 16:56:04       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-17 16:56:04       20 阅读

热门阅读

  1. 从零开始做ROS机器人

    2024-06-17 16:56:04       5 阅读
  2. MySQL触发器基本结构

    2024-06-17 16:56:04       7 阅读
  3. js如何添加新元素到数组中

    2024-06-17 16:56:04       7 阅读
  4. HTML中的文本标签:微观排版的艺术

    2024-06-17 16:56:04       7 阅读
  5. python项目发布Docker Harbor

    2024-06-17 16:56:04       8 阅读
  6. 军用FPGA软件 Verilog语言的编码准测之时钟

    2024-06-17 16:56:04       9 阅读
  7. golang学习笔记——结构体嵌套接口

    2024-06-17 16:56:04       9 阅读