Redis分布式锁

一、面试题

1. Redis除了拿来做缓存,你还见过基于Redis的什么用法?

如分布式锁、位统计(例如做网站的PV,UV等)、轻量级消息队列、签到、热搜等。

2.Redis做分布式锁的时候有什么需要注意的问题?
3.你们公司自己实现的分布式锁是否用setnx命令实现?这个是最合适的吗?你如何考虑分布式锁的可重入问题?
4.如果是redis单点部署,会带来什么问题?
6.Redis集群模式下,比如主从模式,CAP方面有没有什么问题?
在这里插入图片描述

7.那你简单介绍一下Redlock吧?你简历上写redisson,你谈一谈?
8.Redis的分布式锁如何续期?看门狗知道吗?

二、Redis分布式锁学习

1. 锁的种类

  • 传统的单机版的锁,只作用于一个JVM虚拟机中,如synchronized或者Lock接口
  • 分布式环境下存在多个JVM,单机的线程锁机制不再起作用,资源类在不同的服务器之间共享了,此时就需要分布式锁

在这里插入图片描述

2. 一个靠谱的分布式锁需要具备的条件和刚需

  • 独占性:任何时候有且只有一个线程持有
  • 高可用:若在redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况,要在高并发场景下性能依旧很好
  • 防死锁:杜绝死锁,必须要有超时控制机制或撤销操作,有个兜底终止跳出方案
  • 不乱抢:一个线程不能释放其它线程的锁,只能自己加锁自己解锁
  • 重入性:同一个线程获取锁之后,它可以再次获取这个锁,而不需要重复加锁

3. 自定义Redis分布式锁案例

  • 场景

多个服务件保证同一时刻同一时间段内只能有一个请求(防止关键业务出现并发攻击)

  • 环境搭建

redis:搭建两个redis服务器(可以在本机部署在不同的端口),一个作为分布式锁的容器,一个作为数据容器
在这里插入图片描述

SpringBoot工程项目

首先导入依赖:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- swagger-ui -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- swagger2 -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.9</version>
        </dependency>

配置swagger

@Configuration
@EnableSwagger2
public class SwaggerConfiguration {

    @Bean
    public Docket docket() {
        return new Docket(DocumentationType.SWAGGER_2)
            .apiInfo(apiInfo())
            .select()
            .apis(RequestHandlerSelectors.basePackage("com.swagger.controller"))
            .paths(PathSelectors.any())
            .build();
    }
    // SwaggerUI界面配置
    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
            .title("SpringBoot接口文档")
            .description("接口文档")
            .contact(new Contact("Bear","http://www.yuque.com/bearpessimist","1942496795@qq.com"))
            .version("1.0")
            .build();
    }
}


然后配置redis


@Configuration
public class RedisConf {
    @Bean
    public RedisTemplate<String,Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory){
        RedisTemplate<String,Object> redisTemplate=new RedisTemplate<>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return  redisTemplate;
    }

}

redis中添加一条库存数据

在这里插入图片描述

然后写controler层代码

@RestController
@Api(tags = "redis分布式锁测试")
public class InventoryController {
    @Autowired
    private InventoryService inventoryService;
    
    @ApiOperation("扣减库存,一次卖一个")
    @GetMapping("/inventory/sale")
    public String sale(){
        return inventoryService.sale();
    }
}

写service层代码

@Service
public class InventoryService {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    
    private String port="8080";
    
    private Lock lock=new ReentrantLock();
    public String sale() {
        String retMessage="";
        lock.lock();
        try{
            //1. 查询库存信息
            String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
            //2. 判断库存是否足够
            Integer inventory=inventory001==null? 0:Integer.valueOf(inventory001);
            //3. 扣减库存
            if(inventory>0){
                stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
                retMessage="成功卖出一个商品,剩余库存"+inventory;
            }else{
                retMessage="商品库存不足";
            }
            
        }finally {
            lock.unlock();
        }
        return retMessage+"\t"+"服务器端口"+port;
    }
}

下面就可以启动项目测试扣减库存的效果了
在这里插入图片描述
发送请求扣减库存

在这里插入图片描述

库存扣减成功,上面代码就只有一个服务,也就是一个JVM,使用ReetranLock就保证了单机环境下多线程的线程安全问题,下面开始添加分布式锁的功能

下面将上面单机版的案例扩展到分布式环境下,首先将上面创建的代码再copy一份再创建一个module,但注意要配置一下端口(我这里配置的8081)

然后配置nginx实现动态代理和负载均衡(这里没有实现负载均衡),首先修改conf文件

http {
    include       mime.types;
    default_type  application/octet-stream;

    sendfile        on;
    keepalive_timeout  65;

    #gzip  on;


    server {
        listen       80;
        server_name  localhost;
        location / {
            proxy_pass http://mynginx;
            #root   /usr/share/nginx/html;
            #index  index.html index.htm;
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }

    }

    upstream mynginx{
        server 127.0.0.1:8080 weight=1;
        server 127.0.0.1:8081 weight=1;
    }

}

然后启动nginx

./nginx -c /usr/local/nginx/conf/nginx.conf

测试扣减库存成功

至此分布式环境就搭建完毕了

现在考虑高并发场景,我们使用jmeter来进程高并发测试
在这里插入图片描述
开始测试查看聚合报告

在这里插入图片描述

可以看到所有请求都成功了没有出现异常请求

我们现在看看redis中库存是多少

在这里插入图片描述

redis的库存是28不是我们期待的0,说明上面的系统在高并发场景下出现了问题

出现上面的根本问题就是出现了超卖问题。分析问题的根本原因是,在单机环境下,可以使用synchronized或lock来实现。但是在分布式系统中,因为竞争的线程可能不在一个节点上,所以需要一个让所有线程都能访问的锁来实现(比如zookeeper或redis)。基于此分布式锁出现了。

单机版会出现超卖问题,所以我们需要加入分布式锁

在这里插入图片描述

SETNX: 向Redis中添加一个key,只用当key不存在的时候才添加并返回1,存在则不添加返回0。并且这个命令是原子性的。使用SETNX作为分布式锁时,添加成功表示获取到锁,添加失败表示未获取到锁。至于添加的value值无所谓可以是任意值(根据业务需求),只要保证多个线程使用的是同一个key,所以多个线程添加时只会有一个线程添加成功,就只会有一个线程能够获取到锁。而释放锁锁只需要将锁删除即可。

set key value [EX seconds] [PX milliseconds] [NX|XX]
  • EX:key在多少秒后过期
  • PX:key在多少毫秒后过期
  • NX:当key不存在时,才创建key,效果等同于setnx
  • XX:当key存在时覆盖key

修改原来代码

 public String sale() {
        String retMessage = "";
        String key = "zzyyredisLock";
        String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
        Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);
        //抢不到的线程需要继续重试
        if (!aBoolean) {
            //没有抢到锁,进行递归重试
            try {
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sale();
        } else {
            try{
                //抢到了锁
                //1. 查询库存信息
                String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
                //2. 判断库存是否足够
                Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
                //3. 扣减库存
                if (inventory > 0) {
                    stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
                    retMessage = "成功卖出一个商品,剩余库存" + inventory;
                    System.out.println(retMessage + "\t" + "服务器端口" + port);
                } else {
                    retMessage = "商品库存不足";
                }
            }finally {
                stringRedisTemplate.delete(key);
            }

        }
        return retMessage + "\t" + "服务器端口" + port;
    }

最后打开jmeter测试
在这里插入图片描述

发现最后库存确实减为了0

上面功能就实现了分布式锁,但是我们分析一下上面代码的缺陷,前面代码中首先如果一个线程没有获取到锁会每隔20s递归重试来获取锁,而递归重试这种方式在高并发场景下会出现问题,因为在高并发场景下,容易出现栈溢出,所以我们需要用自旋替换递归。然后就是多线程环境下未来防止虚假唤醒需要使用while替换if。修改后的代码如下:

public String sale() {
        String retMessage = "";
        String key = "zzyyredisLock";
        String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
        //抢不到的线程需要继续重试
        while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)) {
            //没有抢到锁,进行递归重试
            try {
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            //抢到了锁
            //1. 查询库存信息
            String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
            //2. 判断库存是否足够
            Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
            //3. 扣减库存
            if (inventory > 0) {
                stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
                retMessage = "成功卖出一个商品,剩余库存" + inventory;
                System.out.println(retMessage + "\t" + "服务器端口" + port);
            } else {
                retMessage = "商品库存不足";
            }
        } finally {
            stringRedisTemplate.delete(key);
        }
        return retMessage + "\t" + "服务器端口" + port;
    }

继续测试结果还是没有问题
在这里插入图片描述
继续分析上面系统,现在是不是还有什么问题,现在一个关键问题就是key的过期时间问题,我们分析上面使用setnx来设置分布式锁的流程,每个线程获取锁就相当于在redis中设置了一个key,释放锁时就删除这个key。假如获取锁的线程宕机了,且该线程没有释放key,而且key没有设置过期时间,所以这个key永远不会释放,那么后面的线程永远都不会获得锁,那么我们的所有线程都会阻塞在这里。

在这里插入图片描述

上图A线程挂了

所以为了解决这个问题,我们需要在设置锁的时候需要给key设置一个过期时间。

 public String sale() {
        String retMessage = "";
        String key = "zzyyredisLock";
        String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
        //抢不到的线程需要继续重试
        while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)) {
            //没有抢到锁,进行递归重试
            try {
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            //抢到了锁
            //1. 查询库存信息
            String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
            //2. 判断库存是否足够
            Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
            //3. 扣减库存
            if (inventory > 0) {
                stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
                retMessage = "成功卖出一个商品,剩余库存" + inventory;
                System.out.println(retMessage + "\t" + "服务器端口" + port);
            } else {
                retMessage = "商品库存不足";
            }
        } finally {
            stringRedisTemplate.delete(key);
        }
        return retMessage + "\t" + "服务器端口" + port;
    }
}

注意加锁和设置过期时间必须是一个原子操作

再次测试
在这里插入图片描述
我们接着分析,由于每个线程的执行时间是不确定的,如果某个线程A获得了锁设置过期时间为30s,但是如果A线程的工作时间是35s,所以A现场还没工作完锁就被释放了,此时别的线程B可以获取锁,5S后A来删除锁,但此时删除的锁是B的锁,这样就出现了误删除B的Key的问题。这个问题就是著名的key续租问题。

在这里插入图片描述
我们继续修改,即只能自己删除自己的锁,不能删除别人的锁。继续修改代码:

@Service
public class InventoryService {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private String port = "8080";


    public String sale() {
        String retMessage = "";
        String key = "zzyyredisLock";
        String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
        //抢不到的线程需要继续重试
        while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)) {
            //没有抢到锁,进行递归重试
            try {
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            //抢到了锁
            //1. 查询库存信息
            String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
            //2. 判断库存是否足够
            Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
            //3. 扣减库存
            if (inventory > 0) {
                stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
                retMessage = "成功卖出一个商品,剩余库存" + inventory;
                System.out.println(retMessage + "\t" + "服务器端口" + port);
            } else {
                retMessage = "商品库存不足";
            }
        } finally {
           if(stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue))
            {
                stringRedisTemplate.delete(key);
            }
        }
        return retMessage + "\t" + "服务器端口" + port;
    }
}

继续思考上面代码有什么问题,虽然上面我们解决了误删除key的问题,但是下面更新的代码还是有问题,即redis获取和删除的操作不是原子的,这样在高并发场景下会发生一些意想不到的问题。

  if(stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue))
            {
                stringRedisTemplate.delete(key);
            }
  }

所以如何将多条redis操作结合成一个原子操作,答案是lua脚本(介绍在后面)。首先编写lua脚本文件
(
然后添加lua配置类

@Configuration
public class RedisScriptConfig {

    @Bean
    public DefaultRedisScript<String> defaultRedisScript() {
        DefaultRedisScript<String> defaultRedisScript = new DefaultRedisScript<>();
        defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("classpath:redis.lua")));
        defaultRedisScript.setResultType(String.class);
        return defaultRedisScript;
    }
}

最后更改service方法

@Service
public class InventoryService {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private RedisScript<String> redisScript;

    private String port = "8080";


    public String sale() {
        String retMessage = "";
        String key = "zzyyredisLock";
        String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
        //抢不到的线程需要继续重试
        while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)) {
            //没有抢到锁,进行递归重试
            try {
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            //抢到了锁
            //1. 查询库存信息
            String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
            //2. 判断库存是否足够
            Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
            //3. 扣减库存
            if (inventory > 0) {
                stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
                retMessage = "成功卖出一个商品,剩余库存" + inventory;
                System.out.println(retMessage + "\t" + "服务器端口" + port);
            } else {
                retMessage = "商品库存不足";
            }
        } finally {
            stringRedisTemplate.execute(redisScript, Arrays.asList(key), uuidValue);
        }
        return retMessage + "\t" + "服务器端口" + port;
    }
}

测试成功
在这里插入图片描述
截止到目前版本,我们已经实现了while判断并自旋重试获取锁,以及setnx含自然过期时间以及lua脚本删除锁命令。那么现在代码还有什么问题,一个关键问题就是锁的重入性问题

可重入锁又称为递归锁,是指一个线程在外层获取锁的时候,再进入该方法的内层会自动获取锁(前提锁是同一个对象),不会因为之前获取过还没有释放而阻塞。redis实现锁的可重入可以参考JUC里面的AQS原理,关键是如何实现重入锁的计数问题

由于setnx本身的特性,它天生就不支持可重入,所以这里锁的数据结构就不能用SETNX了,那么redis中有没有哪种数据结构能够满足我们的需求,答案是HSET(Map<key, Map<ID,time>>)。在我们对Hset进行维护的时候每一次操作都需要保证原子性,所以我们可以将基于HSET的加锁和解锁过程封装为不同的Lua脚本。流程如下:

  1. 先判断有没有分布式锁(EXISTS key)
  2. 判断锁是不是自己的,是自己的重入,不是自己的阻塞

加锁Lua脚本如下:

if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then
	redis.call('hincrby',KEYS[1],ARGV[1],1)
	redis.call('expire',KEYS[1],ARGV[2])
	return 1
else 
	return 0
end

解锁Lua脚本如下:

if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then
    return nil
elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then
    return redis.call('del', KEYS[1])
else
    return 0
end

实现自己的分布式锁:

package com.jack.redis_test1.util;

import cn.hutool.core.util.IdUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class MyRedisLock implements Lock {

    private StringRedisTemplate stringRedisTemplate;

    private String lockName;

    private String uuidValue;

    private long expiretime;


    public MyRedisLock(StringRedisTemplate stringRedisTemplate, String lockName) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.lockName = lockName;
        this.uuidValue= IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
        this.expiretime=800L;
    }

    @Override
    public void lock() {
        tryLock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        try {
            tryLock(-1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (time == -1L) {
            String script=
                    "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
                        "redis.call('hincrby',KEYS[1],ARGV[1],1) " +
                        "redis.call('expire',KEYS[1],ARGV[2]) " +
                        "return 1 " +
                    "else " +
                        "return 0 " +
                    "end ";
            while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expiretime))) {
                //没有抢到锁,进行重试
                try {
                    TimeUnit.MILLISECONDS.sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return true;
        }
        return false;
    }

    @Override
    public void unlock() {
        String script="if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " +
                        "return nil " +
                      "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then " +
                        "return redis.call('del', KEYS[1]) " +
                      "else " +
                        "return 0 " +
                      "end";
        Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script,Long.class), Arrays.asList(lockName), uuidValue);
        if (flag == null) {
            throw  new RuntimeException("锁不存在");
        }
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

实现锁工厂

使用工厂模式,按需求生产锁

@Component
public class DistributedLockFactory {
    @Autowired
    StringRedisTemplate stringRedisTemplate;

    private String lockName;

    public Lock getDistributedLock(String lockType) {
        if (lockType == null) return null;
        if (lockType.equalsIgnoreCase("REDIS")) {
            this.lockName = "zzyyredisLock";
            return new MyRedisLock(stringRedisTemplate, lockName);
        } else if (lockType.equalsIgnoreCase("ZOOKEEPER")){
            this.lockName = "zzyZOOKEEPERLock";
            //TODO Zookeeper
        }
        return null;
    }
}

重写service:

@Service
public class InventoryService {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private String port = "8080";

    @Autowired
    private DistributedLockFactory distributedLockFactory;

    public String sale() {
        String retMessage = "";
        Lock myRedisLock=distributedLockFactory.getDistributedLock("redis");
        myRedisLock.lock();
        try {
            //抢到了锁
            //1. 查询库存信息
            String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
            //2. 判断库存是否足够
            Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
            //3. 扣减库存
            if (inventory > 0) {
                stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
                retMessage = "成功卖出一个商品,剩余库存" + inventory;
                System.out.println(retMessage + "\t" + "服务器端口" + port);
            } else {
                retMessage = "商品库存不足";
            }
        } finally {
            myRedisLock.unlock();
        }
        return retMessage + "\t" + "服务器端口" + port;
    }
}

进行测试:
在这里插入图片描述
在这里插入图片描述

测试成功

上面代码还只是实现了Hset来做redis分布式锁,现在还没有实现锁重入的要求,我们接着改造代码。

前面代码不能重入的原因是我们的uuidValue是在锁对象中实现的,一个线程第二次获取锁的时候还是会通过工厂获取,由于每次 IdUtil.simpleUUID()被调用值都不同,所以线程重入后发现uuidValue和redis数据库汇中的锁的uuidValue不同,导致不能重入,所以我们的需求是实现一个线程终身和一个uuid绑定。

package com.jack.redis_test1.service;

import cn.hutool.core.util.IdUtil;
import com.jack.redis_test1.util.MyRedisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.locks.Lock;

@Component
public class DistributedLockFactory {
    @Autowired
    StringRedisTemplate stringRedisTemplate;

    private String lockName;
    
    private String UUID;

    public DistributedLockFactory(){
        this.UUID= IdUtil.simpleUUID();
    }
    public Lock getDistributedLock(String lockType) {
        if (lockType == null) return null;
        if (lockType.equalsIgnoreCase("REDIS")) {
            this.lockName = "zzyyredisLock";
            return new MyRedisLock(stringRedisTemplate, lockName,UUID);
        } else if (lockType.equalsIgnoreCase("ZOOKEEPER")){
            this.lockName = "zzyZOOKEEPERLock";
            //TODO Zookeeper
        }
        return null;
    }
}

现在就剩最后一个需求了,如何解决锁的续租问题

如果我们的业务时间超过了锁的TTL,锁过期了就会被redis删除,此时别的业务就可以抢占锁,等前面业务执行完,就会删除后面业务的锁,造成误删

CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。

一致性(C): 在分布式系统中的所有数据备份,在同一时刻是否同样的值,即写操作之后的读操作,必须返回该值。(分为弱一致性、强一致性和最终一致性)
可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
分区容错性(P): 以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。

Redis是AP,Zookeeper是CP,Eurka和Nacos是AP

redis异步复制造成锁的丢失,比如主节点没来的及把刚刚set进来的这条数据给从节点,master就挂了,从机上位根本没有这条数据

自动续期的lua脚本

if redis.call('HEXISTS', KEYS[1], ARGV[1]) == 1 then
	return redis.call('expire',KEYS[1],ARGV[2]);
else
	return 0
end

更改service

@Service
public class InventoryService {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private String port = "8080";

    @Autowired
    private DistributedLockFactory distributedLockFactory;

    public String sale() {
        String retMessage = "";
        Lock myRedisLock=distributedLockFactory.getDistributedLock("redis");
        myRedisLock.lock();
        try {
            //抢到了锁
            //1. 查询库存信息
            String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
            //2. 判断库存是否足够
            Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
            //3. 扣减库存
            if (inventory > 0) {
                stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
                retMessage = "成功卖出一个商品,剩余库存" + inventory;
                System.out.println(retMessage + "\t" + "服务器端口" + port);
            } else {
                retMessage = "商品库存不足";
            }
        } finally {
            myRedisLock.unlock();
        }
        return retMessage + "\t" + "服务器端口" + port;
    }
}

4. lua脚本语言

Lua是一种轻量小巧的脚本语言,用标准的C语言编写并以源代码形式开发,其设计目的是为了嵌入到应用程序中,从而为应用程序提供灵活的扩展和定制能力。其特点如下:

  • 轻量级:它用标准的c语言编写并以源代码形式开发,编译后仅仅100余k,可以很方便嵌入别的程序中
  • 可扩展:lua提供了非常易于使用和扩展的借口和机制,由宿主语言C或C++提供这些功能,Lua可以使用他们,就像本来就内置的功能一样

Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接用return返回脚本执行后的结果。

eval luascripts numkeys [key[key...]][arg[arg...]]

lua脚本的hello word入门

在这里插入图片描述

现在我们的需求是将下面三个命令封装到lua脚本中,让它们变成一次原子操作

set k1 v1
expire k1 30
get k1
eval "redis.call('set','k1','v1') redis.call('expire','k1','30') return redis.call('get','k1') " 0

后面的0表示参数的个数

在这里插入图片描述

上面代码就实现了我们的需求,假如我们现在需要动态传参该怎么办,假如我们使用lua脚本实现下面命令

eval "return redis.call('mset',KEYS[1],ARGV[1],KEYS[2],ARGV[2])"  2 k1 k2 v1 v2

lua的数组下标从1开始

然后我们再加入if-else判断,需求是先检查key是否存在,存在就删除

eval "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" 1 k1 v1

在这里插入图片描述

LUA条件判断语法

if(布尔条件) then

elseif(布尔条件) then

elseif(布尔条件) then

else 
  业务代码
end

三、Redlock底层Redisson源码分析

1. 简介

上面我们已经一步一步实现了自己的redis分布式锁,经过版本的迭代已经基本上完善了所有的功能。思考一个问题,如果我们的并发量变成海量上面的分布式锁是不是还会有上面问题?我们先来学习一下redis自带的分布式锁。

2. Redis分布式锁-Redlock红锁提出背景

Redlock是redis分布式锁的一个实现,他实现了我们认为比普通单例方法更安全的DLM(分布式锁管理器)。红锁(RedLock)是一种分布式锁算法,由 Redis 的作者 Salvatore Sanfilippo(也称为 Antirez)设计,用于在分布式系统中实现可靠的锁机制。它的设计解决了单一 Redis 实例作为分布式锁可能出现的单点故障问题。

之前我们手写的redis锁没解决的关键问题是,假如我们的redis服务器宕机了,我们的redis锁就会失效(这就是Redis的单点故障问题)。现在考虑如果我们给redis服务器添加一个副本,那么master宕机了我们的副本能不能解决单点故障问题?答案是不能的,前面说到redis是AP,它采用的同步方式是异步同步方式,在master将数据同步到副本时,redis就可能宕机了,此时副本中就没有锁的信息,此时就可能出现一锁被多用的风险。

在这里插入图片描述

3. RedLock算法设计理念

Redis之父为了解决上面的问题,就创建了Redlock算法,该算法基于多个实例的分布式锁。锁的变量由多个实例维护,即使有实例发生了故障,锁变量仍然存在,客户端还是能完成锁的操作。

在算法的分布式版本当中,我们假设有N个redis主节点。这些节点是完全独立的,所以我们不使用复制或任何其它隐式协调系统,我们已经描述了如何在单个实例中安全地获取和释放锁。我们想当然地认为算法会使用在单个实例中获取和释放锁,在我们的示例中,我们设置N为5,这是一个合理的值,亿你层我们需要在不同的计算机或虚拟机上运行5个Redis master,以确保它们以几乎独立的方式发生故障。为了获取锁,客户端执行下面操作:
在这里插入图片描述

实际就是一个不行来多个,但多个redis之间的关系不是主从关系,它们是独立的

在这里插入图片描述

在这里插入图片描述

4. Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅 提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用 者能够将精力更集中地放在处理业务逻辑上。

在这里插入图片描述

简而言之,Redisson是java的redis客户端之一,提供了一些api方便操作redis

5. 案例改造

基于redisson对上面我们手写的redis案例进行改进:

  • 为了满足redlock的原理,准备三台redis服务器

在这里插入图片描述

  • 导入redisson的依赖
<dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.19.1</version>
        </dependency>
  • 添加redisson的配置

@Configuration
public class RedisConf {
    @Bean
    public RedisTemplate<String,Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory){
        RedisTemplate<String,Object> redisTemplate=new RedisTemplate<>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return  redisTemplate;
    }
    
    @Bean
    public Redisson redisson(){
         Config config=new Config();
         config.useSingleServer().setAddress("redis://192.168.31.41:6379").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }
}
  • 改进service
@Service
public class InventoryService {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private String port = "8081";

    @Autowired
    private Redisson redisson;

    public String sale() {
        String retMessage = "";

        RLock redissonLcok = redisson.getLock("redislock");
        redissonLcok.lock();
        try {
            //抢到了锁
            //1. 查询库存信息
            String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
            //2. 判断库存是否足够
            Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
            //3. 扣减库存
            if (inventory > 0) {
                stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
                retMessage = "成功卖出一个商品,剩余库存" + inventory;
                System.out.println(retMessage + "\t" + "服务器端口" + port);
            } else {
                retMessage = "商品库存不足";
            }
        } finally {
            redissonLcok.unlock();
        }
        return retMessage + "\t" + "服务器端口" + port;
    }
}

到此项目改进完成,下面开始测试。

在这里插入图片描述
在这里插入图片描述

上面单redis案例就通过了

但是如果在高并发场景下,可能存在下面的bug

在这里插入图片描述
解决上面问题只需要在解锁时加下面代码就行了。

finally {
            if(redissonLcok.isLocked() && redissonLcok.isHeldByCurrentThread()) {
                redissonLcok.unlock();
            }
        }

前面介绍到实现一个锁逃不掉要实现加锁、可重入、续命和解锁这四个步骤,我们看看redisson解决这些问题的思想。

  • redis分布式锁过期了,但是业务逻辑还没有执行完怎么办?

我的解决方法是,额外起一个线程,定期检查线程是否还持有锁,如果有则延迟过期时间,redisson里面就实现了这个方案,使用“看门狗”定期检查(每1/3的锁时间检查1次),如果线程还持有锁,则刷新过期时间。

在这里插入图片描述

下面分析源码:

    redissonLcok.lock();

上面是redisson的加锁方法,点进去它其实就是JUC的lock,所以redisson也实现了JUC的lock规范

在这里插入图片描述

RLock就是Redisson的顶层Lock

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {}

public class RedissonLock extends RedissonBaseLock {

public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
}

RedissonLock就是Redisson的锁逻辑的实现地方

 this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
public long getLockWatchdogTimeout() {
        return this.lockWatchdogTimeout;
 }
this.lockWatchdogTimeout = 30000L;

上面代码中这句代码就是内部锁的释放时间,可以看见是30s

这里我们就得出了第一个结论:redisson给我们新建的锁key默认过期时间是30s

  • Redisson底层加锁流程

我们进入lock方法

  private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //首先获得线程的ID
        long threadId = Thread.currentThread().getId();
        //尝试去加锁
        Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
        if (ttl != null) {
            CompletableFuture<RedissonLockEntry> future = this.subscribe(threadId);
            this.pubSub.timeout(future);
            RedissonLockEntry entry;
            if (interruptibly) {
                entry = (RedissonLockEntry)this.commandExecutor.getInterrupted(future);
            } else {
                entry = (RedissonLockEntry)this.commandExecutor.get(future);
            }

            try {
                while(true) {
                    ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
                    if (ttl == null) {
                        return;
                    }

                    if (ttl >= 0L) {
                        try {
                            entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException var14) {
                            if (interruptibly) {
                                throw var14;
                            }

                            entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            } finally {
                this.unsubscribe(entry, threadId);
            }
        }
    }

上面方法首先获得了线程ID,然后调用this.tryAcquire来尝试获取锁,我们进入该方法

  private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }

在这里插入图片描述

上面就是Redisson加锁的核心关键代码

我们接下来进入tryLockInnerAsync看看内部是如何加锁的:

 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
    }

在这里插入图片描述

可以发现redisson底层也是使用lua脚本加锁的,和我们自定义的redis分布式锁的原理一样。下面我们看看redisson是如何完成续租的。

protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);

            try {
                this.renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                    this.cancelExpirationRenewal(threadId);
                }

            }
        }

    }

private void renewExpiration() {
        ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
        if (ee != null) {
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
                public void run(Timeout timeout) throws Exception {
                    ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
                    if (ent != null) {
                        Long threadId = ent.getFirstThreadId();
                        if (threadId != null) {
                            CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
                            future.whenComplete((res, e) -> {
                                if (e != null) {
                                    RedissonBaseLock.log.error("Can't update lock {} expiration", RedissonBaseLock.this.getRawName(), e);
                                    RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
                                } else {
                                    if (res) {
                                        RedissonBaseLock.this.renewExpiration();
                                    } else {
                                        RedissonBaseLock.this.cancelExpirationRenewal((Long)null);
                                    }

                                }
                            });
                        }
                    }
                }
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            ee.setTimeout(task);
        }
    }

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
        return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }

在这里插入图片描述

上面加锁和续命的源码已经看完了,下面分析解锁的源码,其实也很简单。
在这里插入图片描述

  • 继续改造案例

上面我们改造的案例只使用了一个redis服务器,现在还没有解决单点故障问题,下面改为多机案例。首先回顾一下redlock算法

在这里插入图片描述
目前Redlock这个对象已经在多机环境中被弃用了,取而代之的是Multilock对象。

在这里插入图片描述
首先在当前目录下创建一个新的module,改配置文件

server:
  port: 8083
spring:
  application:
    name: redlock
  swagger2:
    enabled: true
  redis:
    database: 0
    password:
    timeout: 3000
    mode: single
    pool:
      conn-timeout: 3000
      so-timeout: 3000
      size: 10
    single:
      address1: 192.168.31.41:6279
      address2: 192.168.31.41:6280
      address3: 192.168.31.41:6281

写配置类:

@Data
public class RedisSingleProperties {
    private String address1;
    private String address2;
    private String address3;
}
@ConfigurationProperties(prefix = "spring.redis",ignoreUnknownFields = false)
@Data
public class RedisProperties {
    private int timeout=3000;
    private String password;
    private String mode;
    //池配置
    private RedisPoolProperties pool;
    //单机信息配置
    private RedisSingleProperties single;
}
@Data
public class RedisPoolProperties {
    private int maxIdle;

    private int minIdle;

    private int maxActive;

    private int maxWait;

    private int connTimeout=10000;

    private int soTimeout;

    //池大小
    private  int size;
}
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class CacheConfiguration {
    @Autowired
    RedisProperties redisProperties;

    @Bean("redisClient1")
    RedissonClient redisClient1(){
        Config config=new Config();
        String node=redisProperties.getSingle().getAddress1();
        node=node.startsWith("redis://")? node: "redis://"+node;
        SingleServerConfig saverConfig=config.useSingleServer().setAddress(node)
                .setTimeout(redisProperties.getPool().getConnTimeout())
                .setConnectionPoolSize(redisProperties.getPool().getSize())
                .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
        if(StringUtils.isNotBlank(redisProperties.getPassword())){
            saverConfig.setPassword(redisProperties.getPassword());
        }
        return Redisson.create(config);
    }

    @Bean("redisClient2")
    RedissonClient redisClient2(){
        Config config=new Config();
        String node=redisProperties.getSingle().getAddress2();
        node=node.startsWith("redis://")? node: "redis://"+node;
        SingleServerConfig saverConfig=config.useSingleServer().setAddress(node)
                .setTimeout(redisProperties.getPool().getConnTimeout())
                .setConnectionPoolSize(redisProperties.getPool().getSize())
                .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
        if(StringUtils.isNotBlank(redisProperties.getPassword())){
            saverConfig.setPassword(redisProperties.getPassword());
        }
        return Redisson.create(config);
    }

    @Bean("redisClient3")
    RedissonClient redisClient3(){
        Config config=new Config();
        String node=redisProperties.getSingle().getAddress3();
        node=node.startsWith("redis://")? node: "redis://"+node;
        SingleServerConfig saverConfig=config.useSingleServer().setAddress(node)
                .setTimeout(redisProperties.getPool().getConnTimeout())
                .setConnectionPoolSize(redisProperties.getPool().getSize())
                .setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
        if(StringUtils.isNotBlank(redisProperties.getPassword())){
            saverConfig.setPassword(redisProperties.getPassword());
        }
        return Redisson.create(config);
    }
}

写controller

@RestController
@Api(tags = "redis分布式锁测试")
@Slf4j
public class InventoryController {

    public static final String CACHE_KEY_REDLOCK = "JACKCHAI_REDLOCK";
    @Resource
    RedissonClient redissonClient1;

    @Resource
    RedissonClient redissonClient2;

    @Resource
    RedissonClient redissonClient3;

    @ApiOperation("扣减库存,一次卖一个")
    @GetMapping("/inventory/sale")
    public String sale() {
        String taskThreadID=Thread.currentThread().getName();
        RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
        RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
        RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
        RedissonMultiLock redissonMultiLock = new RedissonMultiLock(lock1, lock2, lock3);

        redissonMultiLock.lock();
        try{
            log.info("come in biz multilock:{}",taskThreadID);
            try{Thread.sleep(30000);}catch (InterruptedException e) {e.printStackTrace();};
            log.info("task is over multiLock:{}",taskThreadID);
        } catch (Exception e){
            e.printStackTrace();
            log.error("multilock exception:{}",e.getCause()+"\t"+e.getMessage());
        } finally{
            redissonMultiLock.unlock();
            log.info("释放分布式锁成功:{}",CACHE_KEY_REDLOCK);
        }
        return "multilock task is over"+taskThreadID;
    }
}

相关推荐

  1. redis笔记】分布式

    2024-03-10 08:30:05       60 阅读
  2. Redis - 分布式、Redisson

    2024-03-10 08:30:05       54 阅读
  3. redis——分布式

    2024-03-10 08:30:05       60 阅读
  4. Redis分布式

    2024-03-10 08:30:05       68 阅读
  5. Redis分布式

    2024-03-10 08:30:05       55 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-10 08:30:05       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-10 08:30:05       101 阅读
  3. 在Django里面运行非项目文件

    2024-03-10 08:30:05       82 阅读
  4. Python语言-面向对象

    2024-03-10 08:30:05       91 阅读

热门阅读

  1. Lua 脚本语言基础语法及应用

    2024-03-10 08:30:05       35 阅读
  2. HTML:用对 preload、prefetch提升网页加载速度

    2024-03-10 08:30:05       42 阅读
  3. 计算机等级考试:信息安全技术 知识点一

    2024-03-10 08:30:05       44 阅读
  4. Go语言:一门简洁高效的编程语言

    2024-03-10 08:30:05       39 阅读
  5. jupyter notebook

    2024-03-10 08:30:05       40 阅读
  6. Golang中处理map和list的初始化问题

    2024-03-10 08:30:05       39 阅读
  7. SSRF漏洞原理及其修复方式和加固方式

    2024-03-10 08:30:05       42 阅读