归档
简介
- Redisson 内部维护连接池:获取连接时先尝试从池中取出已有连接,取不到时才新建连接。
测试
/**
 * Demo used by these notes: stores three integers in the Redis list "list",
 * reads them back by index and prints them on one line.
 */
@Test
public void testGet1() {
    RList<Integer> numbers = redisson.getList("list", IntegerCodec.INSTANCE);
    numbers.addAll(Arrays.asList(1, 2, 3));
    // The three reads are evaluated left to right before printf runs,
    // i.e. in the same order as fetching them into separate locals.
    System.out.printf("%s %s %s", numbers.get(0), numbers.get(1), numbers.get(2));
}
说明
org.redisson.command.RedisExecutor
// Chooses the connection future by mode: a read-operation connection when
// readOnlyMode is set, otherwise a write-operation connection. The chosen
// future is stored in the connectionFuture field and returned.
protected CompletableFuture<RedisConnection> getConnection() {
if (readOnlyMode) {
connectionFuture = connectionReadOp(command);
} else {
connectionFuture = connectionWriteOp(command);
}
return connectionFuture;
}
// Resolves the target entry via getEntry(true), stores it in the entry field,
// and delegates the read-connection request to that entry.
// NOTE: part of the original body is elided ("...") in this excerpt.
final CompletableFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
entry = getEntry(true);
...
return entry.connectionReadOp(command);
}
org.redisson.connection.SingleEntry
// Entry variant that extends MasterSlaveEntry. Its read-connection request is
// forwarded to the parent's connectionWriteOp, so in this class reads are
// served through the same path as writes.
public class SingleEntry extends MasterSlaveEntry {
@Override
public CompletableFuture<RedisConnection> connectionReadOp(RedisCommand<?> command) {
// Reads reuse the write path of the superclass.
return super.connectionWriteOp(command);
}
}
org.redisson.connection.MasterSlaveEntry
// Hands the request to writeConnectionPool and returns the future it produces.
public CompletableFuture<RedisConnection> connectionWriteOp(RedisCommand<?> command) {
return writeConnectionPool.get(command);
}
org.redisson.connection.pool.MasterConnectionPool
// Acquires a connection for the entry returned by entries.peek().
// NOTE(review): the type of `entries` is not shown here; presumably a queue
// whose head is the master entry - confirm against the full class.
@Override
public CompletableFuture<RedisConnection> get(RedisCommand<?> command) {
return acquireConnection(command, entries.peek());
}
org.redisson.connection.pool.ConnectionPool
// Calls the (entry, command) overload of acquireConnection and, once that
// future completes normally, calls connectTo(entry, result, command).
// Returns `result`, which is declared in the elided ("...") part of the body;
// connectTo receives it as its `promise` argument.
protected final CompletableFuture<T> acquireConnection(RedisCommand<?> command, ClientConnectionsEntry entry) {
...
CompletableFuture<Void> f = acquireConnection(entry, command);
f.thenAccept(r -> {
connectTo(entry, result, command);
});
...
return result;
}
// Obtains a connection for the given entry and reports it through `promise`.
// First tries poll(entry, command); when that returns a connection,
// connectedSuccessful is called with it and the method returns. When poll
// returns null, a new connection is created via createConnection.
// NOTE: parts of the original body are elided ("...") in this excerpt.
private void connectTo(ClientConnectionsEntry entry, CompletableFuture<T> promise, RedisCommand<?> command) {
...
T conn = poll(entry, command);
if (conn != null) {
...
connectedSuccessful(entry, promise, conn);
return;
}
// poll returned no connection: fall back to creating a new one.
createConnection(entry, promise);
}
// Delegates to entry.pollConnection(command) and casts the result to T.
// The result may be null (see the null check in connectTo).
protected T poll(ClientConnectionsEntry entry, RedisCommand<?> command) {
return (T) entry.pollConnection(command);
}
// Starts a new connection through connect(entry). In the completion callback,
// after the elided ("...") part, it registers an incUsage() call on the
// promise's value when changeUsage() is true, then calls
// connectedSuccessful(entry, promise, conn).
// NOTE(review): the callback also receives the error `e`; its handling is
// presumably in the elided part - confirm against the full source.
private void createConnection(ClientConnectionsEntry entry, CompletableFuture<T> promise) {
CompletionStage<T> connFuture = connect(entry);
connFuture.whenComplete((conn, e) -> {
...
if (changeUsage()) {
promise.thenApply(c -> c.incUsage());
}
connectedSuccessful(entry, promise, conn);
});
}
// Delegates to entry.connect() and casts the returned stage to CompletionStage<T>.
protected CompletionStage<T> connect(ClientConnectionsEntry entry) {
return (CompletionStage<T>) entry.connect();
}
org.redisson.connection.ClientConnectionsEntry
// Takes a connection from freeConnections via poll(). When one is returned,
// its usage is incremented with incUsage() before it is handed out.
// Returns the polled connection, or null when poll() yielded none.
// The `command` parameter is not used in the visible body.
public RedisConnection pollConnection(RedisCommand<?> command) {
RedisConnection c = freeConnections.poll();
if (c != null) {
c.incUsage();
}
return c;
}
// Asks the client to connect asynchronously. In the completion callback, after
// the elided ("...") part, the connection is added to allConnections.
// NOTE(review): the callback also receives the error `e`; failure handling is
// presumably in the elided part - confirm against the full source.
public CompletionStage<RedisConnection> connect() {
CompletionStage<RedisConnection> future = client.connectAsync();
return future.whenComplete((conn, e) -> {
...
allConnections.add(conn);
});
}
org.redisson.client.RedisClient
// Opens a connection asynchronously and returns a future for it.
// Visible steps:
//   1. resolveAddr() yields the address to connect to.
//   2. bootstrap.connect(address) starts the channel connect; a listener is
//      attached to the resulting ChannelFuture.
//   3. When the channel connect succeeded, the RedisConnection bound to the
//      channel is fetched and its connection promise is awaited.
//   4. When that promise completes without error, the result future `r` is
//      completed with the connection on the bootstrap's event loop group.
// NOTE: several parts of the original body are elided ("...") in this excerpt;
// failure branches are presumably among them.
public RFuture<RedisConnection> connectAsync() {
CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
// `r` is the future returned from this stage; it is completed in run() below.
CompletableFuture<RedisConnection> r = new CompletableFuture<>();
ChannelFuture channelFuture = bootstrap.connect(res);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future) throws Exception {
...
if (future.isSuccess()) {
// Look up the RedisConnection associated with the connected channel.
RedisConnection c = RedisConnection.getFrom(future.channel());
c.getConnectionPromise().whenComplete((res, e) -> {
// Run the completion on the bootstrap's event loop group.
bootstrap.config().group().execute(new Runnable() {
@Override
public void run() {
if (e == null) {
// complete() returns false when `r` was already completed;
// in that case the connection is closed.
if (!r.complete(c)) {
c.closeAsync();
}
...
}
...
}
});
});
}
...
}
});
return r;
});
return new CompletableFutureWrapper<>(f);
}
// Private constructor. In the visible part it builds the `bootstrap` field via
// createBootstrap(copy, Type.PLAIN); `copy` is defined in the elided ("...")
// part of the body.
private RedisClient(RedisClientConfig config) {
...
bootstrap = createBootstrap(copy, Type.PLAIN);
...
}
// Creates a Bootstrap and installs a RedisChannelInitializer (constructed with
// the bootstrap, config, this client, `channels` and the given type) as its
// handler, then returns the bootstrap.
// NOTE: other configuration steps are elided ("...") in this excerpt.
private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
Bootstrap bootstrap = new Bootstrap();
...
bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
...
return bootstrap;
}
org.redisson.client.handler.RedisChannelInitializer
// Sets up the pipeline of a new channel: calls initSsl(config, ch), then
// appends a RedisConnectionHandler when type is Type.PLAIN, otherwise a
// RedisPubSubConnectionHandler.
// NOTE: further pipeline setup is elided ("...") in this excerpt.
@Override
protected void initChannel(Channel ch) throws Exception {
initSsl(config, ch);
if (type == Type.PLAIN) {
ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
} else {
ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
}
...
}
org.redisson.client.handler.RedisConnectionHandler
// Builds a RedisConnection from the client, the context's channel and this
// handler's connectionPromise.
@Override
RedisConnection createConnection(ChannelHandlerContext ctx) {
return new RedisConnection(redisClient, ctx.channel(), connectionPromise);
}
// On channel registration, creates the connection object if none exists yet,
// then passes the event on to the superclass.
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (connection == null) {
connection = createConnection(ctx);
}
super.channelRegistered(ctx);
}
org.redisson.client.RedisConnection
// Constructor. In the visible part it binds the given channel by calling
// updateChannel(channel).
// NOTE: the remaining initialisation is elided ("...") in this excerpt.
public <C> RedisConnection(RedisClient redisClient, Channel channel, CompletableFuture<C> connectionPromise) {
...
updateChannel(channel);
...
}
// Stores the channel in this connection and sets this connection as the
// channel's CONNECTION attribute.
// NOTE(review): RedisConnection.getFrom(channel) presumably reads this
// attribute back - confirm against the full source.
// NOTE: the beginning of the original body is elided ("...") in this excerpt.
public void updateChannel(Channel channel) {
...
this.channel = channel;
channel.attr(CONNECTION).set(this);
}