验证Lettuce在单连接上进行多路复用


redis multiplexing机制类似http2,在存在并发请求时能减少大量通讯延迟,但不支持blocking相关的操作,如BLPOP

配置 RedisTemplate

RedisTemplate自动根据操作类型,选择是在单连接上进行多路复用,还是申请新的连接/等待空闲连接

@Configuration
public class RedisTemplateConfig {
   
   @Bean
   public LettuceConnectionFactory lettuceConnectionFactory(@Value("${redis.cluster}") String address,
           @Value("${redis.password}") String password) {
   
       // 配置连接池管理
       var poolConfig = new GenericObjectPoolConfig<StatefulRedisClusterConnection<String, String>>();
       poolConfig.setMaxTotal(20);
       poolConfig.setMaxIdle(20);
       poolConfig.setMinIdle(2);
       poolConfig.setTestWhileIdle(true);
       poolConfig.setMinEvictableIdleDuration(Duration.ofMillis(60000));
       poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(30000));
       poolConfig.setNumTestsPerEvictionRun(-1);
       // 配置客户端行为
       var clientConfig = LettucePoolingClientConfiguration.builder()
               .clientOptions(ClusterClientOptions.builder()
                       .autoReconnect(true)
                       .pingBeforeActivateConnection(true)
                       .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(3)).build())
                       .timeoutOptions(TimeoutOptions.builder().fixedTimeout(Duration.ofSeconds(3)).build())
                       .topologyRefreshOptions(ClusterTopologyRefreshOptions.builder()
                               .enableAdaptiveRefreshTrigger(RefreshTrigger.MOVED_REDIRECT,
                                       RefreshTrigger.PERSISTENT_RECONNECTS)
                               .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(30))
                               .build())
                       .build())
               .poolConfig(poolConfig)
               .build();
       // 配置集群连接信息
       var redisConfig = new RedisClusterConfiguration();
       redisConfig.setMaxRedirects(5);
       redisConfig.setPassword(password);
       String[] serverArray = address.split(",|,|;|;");// 获取服务器数组
       Set<RedisNode> nodes = new HashSet<>();
       for (String ipPort : serverArray) {
   
           nodes.add(RedisNode.fromString(ipPort));
       }
       redisConfig.setClusterNodes(nodes);
       return new LettuceConnectionFactory(redisConfig, clientConfig);
   }

   @Bean
   public StringRedisTemplate redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
   
       return new StringRedisTemplate(lettuceConnectionFactory);
   }
}

验证blocking操作在独立连接上进行

实现blocking LPOP操作的方法在opsForList()里;
并发100压测期间查看客户端本地的tcp连接,可以看到和每个redis节点都建立了大量连接;
证明RedisTemplate没有选择让不同并发线程共用同一个StatefulConnection

    @Test
    public void blpop() throws Exception {
   
        long start = System.currentTimeMillis();
        AtomicLong err = new AtomicLong();
        int maxThreads = 100;
        Semaphore semaphore = new Semaphore(maxThreads);
        for (int i = 0; i < maxThreads; i++) {
   
            final int threadnum = i + 1;
            semaphore.acquire(1);
            new Thread(new Runnable() {
   
                @Override
                public void run() {
   
                    try {
   
                        template.opsForList().leftPop("test" + threadnum, 2, TimeUnit.SECONDS);
                    } catch (Exception ex) {
   
                        log.error("leftPop", ex);
                        err.addAndGet(1L);
                    } finally {
   
                        semaphore.release(1);
                    }
                }
            }).start();
        }
        semaphore.acquire(maxThreads);
        long end = System.currentTimeMillis();
        log.info("耗时{}ms, 错误{}", end - start, err.get());
    }

验证单tcp连接的多路复用

发起100个线程,每个线程连续进行1000次读写操作;
执行期间查看客户端本地的tcp连接,可以看到只建立了一个和redis节点的连接;
每个线程的指令并发地在同一个tcp连接上发出和响应,一次请求+响应实际相当于http2的一路stream

@Slf4j
@EnableAutoConfiguration(exclude = {
    RedisAutoConfiguration.class })
@SpringBootTest(classes = {
    RedisTemplateConfig.class })
public class RedisTemplateTest {
   
    @Autowired
    StringRedisTemplate template;

    @Test
    public void getSet() throws Exception {
   
        long start = System.currentTimeMillis();
        int maxThreads = 100;
        long maxMessagess = 1000;
        AtomicLong err = new AtomicLong();
        AtomicLong num = new AtomicLong();
        Semaphore semaphore = new Semaphore(maxThreads);
        for (int i = 0; i < maxThreads; i++) {
   
            final int threadnum = i + 1;
            semaphore.acquire(1);
            new Thread(new Runnable() {
   
                @Override
                public void run() {
   
                    int j = 0;
                    try {
   
                        for (; j < maxMessagess; j++) {
   
                            String key = "thread" + threadnum + "test" + j;
                            String value = "test" + j;
                            template.opsForValue().set(key, value, 1, TimeUnit.SECONDS);
                            assertEquals(value, template.opsForValue().get(key));
                        }
                    } finally {
   
                        num.addAndGet(j);
                        semaphore.release(1);
                    }
                }
            }).start();
        }
        semaphore.acquire(maxThreads);
        long end = System.currentTimeMillis();
        double rate = 1000d * num.get() / (end - start);
        log.info("每秒发送并读取消息{}; 耗时{}ms, 累计发送{}", rate, end - start, num.get());
    }

RedisTemplate屏蔽了哪些并发命令可以共用连接的决策难点,所以不要自行使用Lettuce客户端获取连接或从连接池申请连接。

相关推荐

  1. 验证Lettuce连接进行复用

    2024-01-12 20:22:05       57 阅读
  2. Spring Boot中使用Validation进行验证

    2024-01-12 20:22:05       22 阅读
  3. HTTP复用

    2024-01-12 20:22:05       25 阅读
  4. IO复用

    2024-01-12 20:22:05       11 阅读
  5. io复用

    2024-01-12 20:22:05       11 阅读
  6. IO的复用

    2024-01-12 20:22:05       44 阅读
  7. Redis——IO复用

    2024-01-12 20:22:05       41 阅读
  8. IO复用--epoll

    2024-01-12 20:22:05       23 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-12 20:22:05       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-12 20:22:05       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-12 20:22:05       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-12 20:22:05       18 阅读

热门阅读

  1. 10000访问量纪念日

    2024-01-12 20:22:05       38 阅读
  2. 【计算机二级考试C语言】C程序结构

    2024-01-12 20:22:05       36 阅读
  3. 算法训练营Day36

    2024-01-12 20:22:05       32 阅读
  4. 【力扣每日一题】力扣2707字符串中的额外字符

    2024-01-12 20:22:05       34 阅读
  5. 自定义Flink SourceFunction定时读取数据库

    2024-01-12 20:22:05       33 阅读
  6. 学习使用php、js脚本关闭当前页面窗口的方法

    2024-01-12 20:22:05       34 阅读
  7. Wine源码中添加新的DLL模块

    2024-01-12 20:22:05       38 阅读
  8. SpringBoot 配置文件

    2024-01-12 20:22:05       25 阅读
  9. Springboot 中接口服务重试机制

    2024-01-12 20:22:05       24 阅读