谷粒商城----通过缓存和分布式锁获取数据。

高并发下缓存失效的问题

高并发下缓存失效的问题--缓存穿透

指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义

导致缓存穿透的就是红色的这条线

解决方案

null结果缓存,并且设置一个短暂的过期时间。

高并发下缓存失效的问题--缓存雪崩

缓存雪崩是指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。(大面积Key同时失效)

解决方案

原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低就很难引发集体失效的事件。

高并发下缓存失效的问题--缓存击穿

对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。

解决方案

加锁 大量并发只让一个去查,其他人等待,查到以后释放锁其他人获取到锁,先查缓存,就会有数据,不用去db

谷粒商城中的锁

接口层

/**
 * @description: 从服务器中获取一段JSON数据
 * @param: []
 * @return: java.util.Map<java.lang.String,java.util.List<com.atguigu.gulimail.product.vo.Catelog2Vo>>
 */
@ResponseBody
@GetMapping("/index/catalog.json")
public Map<String, List<Catelog2Vo>> getCatelogJson() {
    Map<String, List<Catelog2Vo>> catelogJson = categoryService.getCatelogJson();
    return catelogJson;
}

业务层

getCatelogJson

首先去缓存中获取JSON

String catalogJSONFromCache = redisTemplate.opsForValue().get("catalogJSON");

  • 有 ,通过阿里的fast-json,把redis中的字符串解析成对象,由于该对象是个复杂类型的,所以这里使用TypeReference并用泛型,指定上类型。

  • 没有,调用getCatelogJsonFromDataWithRedisLock,使用缓存锁的情况下,从数据库中获取数据。

@Override
public Map<String, List<Catelog2Vo>> (){
​
    /*a
    * 1. 空结果缓存
    * 2. 设置随机值的过期时间
    * 3. 缓存击穿
    * */
​
    //1.加入缓存的功能
    String catalogJSONFromCache = redisTemplate.opsForValue().get("catalogJSON");
​
    if(StringUtils.isEmpty(catalogJSONFromCache)){
        //2.缓存中没有,查询数据库
        return getCatelogJsonFromDataWithRedisLock();
    }
    //转为指定的对象,使用阿里的fastJSON,如果是复杂的数据类型,需要指定一个TypeReference,这里直接使用匿名内部类,进行转换
    return JSON.parseObject(catalogJSONFromCache,new TypeReference<Map<String,List<Catelog2Vo>>>(){});
}

getCatelogJsonFromDataWithRedisLock

进入getCatelogJsonFromDataWithRedisLock方法,在这里使用的分布式缓存锁,主要应对缓存失效的问题:

1. 缓存雪崩
1. 缓存穿透
1. 缓存击穿
String token = UUID.randomUUID().toString();
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",token,300,TimeUnit.SECONDS);

这里的锁,必须是要有过期时间的

万一拿到锁的服务,去操作数据库,出现异常,或者机器突然崩掉,就会导致锁无法释放,就会出现死锁的 状况。

set lock "haha" nx

nx:只有在没有这个k-v的情况下,才可以设置成功,否则就是失败的。

问题:

  1. 万一拿到锁的服务,去操作数据库,出现异常,或者机器突然崩掉,就会导致锁无法释放,就会出现死锁的状况。

    1. 解决方法,设置个自动过期时间(一定要设置成原子操作,ex)

    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111",300,TimeUnit.SECONDS);

这里每个人的锁,也就是key,对应的value都是一个不相同的UUID。

如果业务超时,会导致多人都拿到锁,而且前面的线程会删掉后面线程的锁,导致业务异常。

解决方法,设置的Value都不尽相同,可以是UUID随机值,之后在比对值后,才进行删除(一定要设置成 原子操作)。

锁的抢占

redisTemplate.opsForValue().setIfAbsent("lock",token,300,TimeUnit.SECONDS);

这些事朝Redis中放入一个key,如果不存在这个key,设置上,并返回true,如果有这个key,则直接返回false。

可以通过这个结果来判断抢占锁的成功与否。

  • 如果锁抢占失败:

    • 先让线程睡200ms。

    • 然后再次调用这个方法:getCatelogJsonFromDataWithRedisLock,抢占资源。

  • 如果锁抢占成功:

    • 执行正在从数据库或得数据的方法:data = getDataFromDB();

    • 原子操作释放锁。

      • 为什么这里要使用脚本操作?保证原子性。为什么要原子性?

        如果不是原子操作:

 

public Map<String, List<Catelog2Vo>> getCatelogJsonFromDataWithRedisLock() {
​
        //抢占锁 ,并且设置过期时间(30s之后删除)
        //每一个线程的锁都是不一样的。
        String token = UUID.randomUUID().toString();
        Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",token,300,TimeUnit.SECONDS);
        //返回的结果
        if(lock){
            System.err.println("获取分布式锁成功");
            Map<String, List<Catelog2Vo>> data = null;
            try{
               //加锁成功,执行业务
               data = getDataFromDB();
           }finally {
               String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
               Integer result = redisTemplate.execute(new DefaultRedisScript<Integer>(script, Integer.class),
                       Collections.singletonList("lock"),
                       token
               );
           }
            //业务执行成功,解锁
//            redisTemplate.delete("lock");
​
            //删锁的就需要进行值比对
//            String lockValue = redisTemplate.opsForValue().get("lock");
//            if(lockValue.equals(token)){
//                //删除自己的锁
//                redisTemplate.delete("lock");
//            }
            //返回结果
            return data;
        }else{
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
​
            }
            System.err.println("获取分布式锁不成功...等待重试");
            //加锁失败,重试
            return getCatelogJsonFromDataWithRedisLock();
        }
    }

getDataFromDB

String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");

这里为什么要进行第二次的查看缓存数据?

getCatelogJson方法中,就进行了一次缓存的读取,为什么这里还需要一次呢?可以想象一个场景:当缓存中没有数据时,用大量并发线程都访问这个方法获取JSON数据,第一次的查缓存肯定是null,所以所有的线程都进入了getCatelogJsonFromDataWithRedisLock方法,排队获得锁,第一个获得锁的线程,读取数据,把内容放到缓存之后,就是剩下的线程获得锁,进入getDataFromDB这个方法,如果不再次查缓存的话,剩下的 线程就还是从数据库中获取数据,因为第一层缓存查询,没有过滤掉这些线程(用词可能不合适),这下就变成排队查数据库了,还是出现了缓存失效的问题。

private Map<String, List<Catelog2Vo>> getDataFromDB() {
    //拿到锁之后,第一步就应该是去缓存中看一下,是否已经有了内容,如果有了,就不用去操作数据库了。
    //如果不进行一次,这个锁就失去意义了,就变成排队查数据库了
    String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
    if(!StringUtils.isEmpty(catalogJSON)){
        //缓存不为空,就返回数据
        return JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {
        });
    }
    System.err.println("====================================Info-Message================================");
    List<CategoryEntity> selectList = baseMapper.selectList(null);
    //1.查出所有1级分类
    List<CategoryEntity> level1 = getParent_cid(selectList, 0L);
    //2.封装数据
    Map<String, List<Catelog2Vo>> parent_cid = level1.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
                //1.查出1级分类中所有2级分类
                List<CategoryEntity> categoryEntities = getParent_cid(selectList, v.getCatId());
                //2.封装上面的结果
                List<Catelog2Vo> catelog2Vos = null;
                if (categoryEntities != null) {
                    catelog2Vos = categoryEntities.stream().map(l2 -> {
                        Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName());
                        //查询当前2级分类的3级分类
                        List<CategoryEntity> level3 = getParent_cid(selectList, l2.getCatId());
                        if (level3 != null) {
                            List<Catelog2Vo.Catelog3Vo> collect = level3.stream().map(l3 -> {
                                //封装指定格式
                                Catelog2Vo.Catelog3Vo catelog3Vo = new Catelog2Vo.Catelog3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName());
                                return catelog3Vo;
                            }).collect(Collectors.toList());
                            catelog2Vo.setCatalog3List(collect);
                        }
                        return catelog2Vo;
                    }).collect(Collectors.toList());
                }
                return catelog2Vos;
            }
    ));

整个流程

相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-09 18:52:07       66 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-09 18:52:07       70 阅读
  3. 在Django里面运行非项目文件

    2024-07-09 18:52:07       57 阅读
  4. Python语言-面向对象

    2024-07-09 18:52:07       68 阅读

热门阅读

  1. 使用Spring Boot和Couchbase实现NoSQL数据库

    2024-07-09 18:52:07       30 阅读
  2. R语言学习笔记3-基本类型篇

    2024-07-09 18:52:07       26 阅读
  3. pytorch通过 tensorboardX 调用 Tensorboard 进行可视化

    2024-07-09 18:52:07       25 阅读
  4. PHP框架详解 - symfony框架

    2024-07-09 18:52:07       29 阅读
  5. PyTorch简介

    2024-07-09 18:52:07       32 阅读
  6. Apache AGE vs Neo4j

    2024-07-09 18:52:07       27 阅读
  7. 数据库基础

    2024-07-09 18:52:07       27 阅读
  8. centos7系统如何使用GPT分区

    2024-07-09 18:52:07       30 阅读