redis multiplexing机制类似http2,在存在并发请求时能减少大量通讯延迟,但不支持blocking相关的操作,如BLPOP
配置 RedisTemplate
RedisTemplate自动根据操作类型,选择是在单连接上进行多路复用,还是申请新的连接/等待空闲连接
@Configuration
public class RedisTemplateConfig {
@Bean
public LettuceConnectionFactory lettuceConnectionFactory(@Value("${redis.cluster}") String address,
@Value("${redis.password}") String password) {
// 配置连接池管理
var poolConfig = new GenericObjectPoolConfig<StatefulRedisClusterConnection<String, String>>();
poolConfig.setMaxTotal(20);
poolConfig.setMaxIdle(20);
poolConfig.setMinIdle(2);
poolConfig.setTestWhileIdle(true);
poolConfig.setMinEvictableIdleDuration(Duration.ofMillis(60000));
poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(30000));
poolConfig.setNumTestsPerEvictionRun(-1);
// 配置客户端行为
var clientConfig = LettucePoolingClientConfiguration.builder()
.clientOptions(ClusterClientOptions.builder()
.autoReconnect(true)
.pingBeforeActivateConnection(true)
.socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(3)).build())
.timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(3)).build())
.topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
.enableAdaptiveRefreshTrigger(RefreshTrigger.MOVED_REDIRECT,
RefreshTrigger.PERSISTENT_RECONNECTS)
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(30))
.build())
.build())
.poolConfig(poolConfig)
.build();
// 配置集群连接信息
var redisConfig = new RedisClusterConfiguration();
redisConfig.setMaxRedirects(5);
redisConfig.setPassword(password);
String[] serverArray = address.split(",|,|;|;");// 获取服务器数组
Set<RedisNode> nodes = new HashSet<>();
for (String ipPort : serverArray) {
nodes.add(RedisNode.fromString(ipPort));
}
redisConfig.setClusterNodes(nodes);
return new LettuceConnectionFactory(redisConfig, clientConfig);
}
@Bean
public StringRedisTemplate redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
return new StringRedisTemplate(lettuceConnectionFactory);
}
}
验证blocking操作在独立连接上进行
实现blocking LPOP操作的方法在opsForList()里;
并发100压测期间查看客户端本地的tcp连接,可以看到和每个redis节点都建立了大量连接;
证明RedisTemplate没有选择让不同并发线程共用同一个StatefulConnection
@Test
public void blpop() throws Exception {
long start = System.currentTimeMillis();
AtomicLong err = new AtomicLong();
int maxThreads = 100;
Semaphore semaphore = new Semaphore(maxThreads);
for (int i = 0; i < maxThreads; i++) {
final int threadnum = i + 1;
semaphore.acquire(1);
new Thread(new Runnable() {
@Override
public void run() {
try {
template.opsForList().leftPop("test" + threadnum, 2, TimeUnit.SECONDS);
} catch (Exception ex) {
log.error("leftPop", ex);
err.addAndGet(1L);
} finally {
semaphore.release(1);
}
}
}).start();
}
semaphore.acquire(maxThreads);
long end = System.currentTimeMillis();
log.info("耗时{}ms, 错误{}", end - start, err.get());
}
验证单tcp连接的多路复用
发起100个线程,每个线程连续进行1000次读写操作;
执行期间查看客户端本地的tcp连接,可以看到只建立了一个和redis节点的连接;
每个线程的指令并发地在同一个tcp连接上发出和响应,一次请求+响应实际相当于http2的一路stream
@Slf4j
@EnableAutoConfiguration(exclude = {
RedisAutoConfiguration.class })
@SpringBootTest(classes = {
RedisTemplateConfig.class })
public class RedisTemplateTest {
@Autowired
StringRedisTemplate template;
@Test
public void getSet() throws Exception {
long start = System.currentTimeMillis();
int maxThreads = 100;
long maxMessagess = 1000;
AtomicLong err = new AtomicLong();
AtomicLong num = new AtomicLong();
Semaphore semaphore = new Semaphore(maxThreads);
for (int i = 0; i < maxThreads; i++) {
final int threadnum = i + 1;
semaphore.acquire(1);
new Thread(new Runnable() {
@Override
public void run() {
int j = 0;
try {
for (; j < maxMessagess; j++) {
String key = "thread" + threadnum + "test" + j;
String value = "test" + j;
template.opsForValue().set(key, value, 1, TimeUnit.SECONDS);
assertEquals(value, template.opsForValue().get(key));
}
} finally {
num.addAndGet(j);
semaphore.release(1);
}
}
}).start();
}
semaphore.acquire(maxThreads);
long end = System.currentTimeMillis();
double rate = 1000d * num.get() / (end - start);
log.info("每秒发送并读取消息{}; 耗时{}ms, 累计发送{}", rate, end - start, num.get());
}
RedisTemplate屏蔽了哪些并发命令可以共用连接的决策难点,所以不要自行使用Lettuce客户端获取连接或从连接池申请连接。