文章目录
一、面试题
1. Redis除了拿来做缓存,你还见过基于Redis的什么用法?
如分布式锁、位统计(例如做网站的PV,UV等)、轻量级消息队列、签到、热搜等。
2.Redis做分布式锁的时候有什么需要注意的问题?
3.你们公司自己实现的分布式锁是否用setnx命令实现?这个是最合适的吗?你如何考虑分布式锁的可重入问题?
4.如果是redis单点部署,会带来什么问题?
6.Redis集群模式下,比如主从模式,CAP方面有没有什么问题?
7.那你简单介绍一下Redlock吧?你简历上写redisson,你谈一谈?
8.Redis的分布式锁如何续期?看门狗知道吗?
二、Redis分布式锁学习
1. 锁的种类
- 传统的单机版的锁,只作用于一个JVM虚拟机中,如
synchronized
或者Lock
接口 - 分布式环境下存在多个JVM,单机的线程锁机制不再起作用,资源类在不同的服务器之间共享了,此时就需要分布式锁
2. 一个靠谱的分布式锁需要具备的条件和刚需
- 独占性:任何时候有且只有一个线程持有
- 高可用:若在redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况,要在高并发场景下性能依旧很好
- 防死锁:杜绝死锁,必须要有超时控制机制或撤销操作,有个兜底终止跳出方案
- 不乱抢:一个线程不能释放其它线程的锁,只能自己加锁自己解锁
- 重入性:同一个线程获取锁之后,它可以再次获取这个锁,而不需要重复加锁
3. 自定义Redis分布式锁案例
- 场景
多个服务件保证同一时刻同一时间段内只能有一个请求(防止关键业务出现并发攻击)
- 环境搭建
redis:搭建两个redis服务器(可以在本机部署在不同的端口),一个作为分布式锁的容器,一个作为数据容器
SpringBoot工程项目:
首先导入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- swagger-ui -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!-- swagger2 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.9</version>
</dependency>
配置swagger
@Configuration
@EnableSwagger2
public class SwaggerConfiguration {
@Bean
public Docket docket() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.select()
.apis(RequestHandlerSelectors.basePackage("com.swagger.controller"))
.paths(PathSelectors.any())
.build();
}
// SwaggerUI界面配置
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
.title("SpringBoot接口文档")
.description("接口文档")
.contact(new Contact("Bear","http://www.yuque.com/bearpessimist","1942496795@qq.com"))
.version("1.0")
.build();
}
}
然后配置redis
@Configuration
public class RedisConf {
@Bean
public RedisTemplate<String,Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory){
RedisTemplate<String,Object> redisTemplate=new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
redis中添加一条库存数据
然后写controler层代码
@RestController
@Api(tags = "redis分布式锁测试")
public class InventoryController {
@Autowired
private InventoryService inventoryService;
@ApiOperation("扣减库存,一次卖一个")
@GetMapping("/inventory/sale")
public String sale(){
return inventoryService.sale();
}
}
写service层代码
@Service
public class InventoryService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String port="8080";
private Lock lock=new ReentrantLock();
public String sale() {
String retMessage="";
lock.lock();
try{
//1. 查询库存信息
String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
//2. 判断库存是否足够
Integer inventory=inventory001==null? 0:Integer.valueOf(inventory001);
//3. 扣减库存
if(inventory>0){
stringRedisTemplate.opsForValue().set("inventory001",String.valueOf(--inventory));
retMessage="成功卖出一个商品,剩余库存"+inventory;
}else{
retMessage="商品库存不足";
}
}finally {
lock.unlock();
}
return retMessage+"\t"+"服务器端口"+port;
}
}
下面就可以启动项目测试扣减库存的效果了
发送请求扣减库存
库存扣减成功,上面代码就只有一个服务,也就是一个JVM,使用ReetranLock就保证了单机环境下多线程的线程安全问题,下面开始添加分布式锁的功能
下面将上面单机版的案例扩展到分布式环境下,首先将上面创建的代码再copy一份再创建一个module,但注意要配置一下端口(我这里配置的8081)
然后配置nginx实现动态代理和负载均衡(这里没有实现负载均衡),首先修改conf文件
http {
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
#gzip on;
server {
listen 80;
server_name localhost;
location / {
proxy_pass http://mynginx;
#root /usr/share/nginx/html;
#index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}
upstream mynginx{
server 127.0.0.1:8080 weight=1;
server 127.0.0.1:8081 weight=1;
}
}
然后启动nginx
./nginx -c /usr/local/nginx/conf/nginx.conf
测试扣减库存成功
至此分布式环境就搭建完毕了
现在考虑高并发场景,我们使用jmeter来进程高并发测试
开始测试查看聚合报告
可以看到所有请求都成功了没有出现异常请求
我们现在看看redis中库存是多少
redis的库存是28不是我们期待的0,说明上面的系统在高并发场景下出现了问题
出现上面的根本问题就是出现了超卖问题。分析问题的根本原因是,在单机环境下,可以使用synchronized或lock来实现。但是在分布式系统中,因为竞争的线程可能不在一个节点上,所以需要一个让所有线程都能访问的锁来实现(比如zookeeper或redis)。基于此分布式锁出现了。
单机版会出现超卖问题,所以我们需要加入分布式锁
SETNX
: 向Redis中添加一个key,只用当key不存在的时候才添加并返回1,存在则不添加返回0。并且这个命令是原子性的。使用SETNX作为分布式锁时,添加成功表示获取到锁,添加失败表示未获取到锁。至于添加的value值无所谓可以是任意值(根据业务需求),只要保证多个线程使用的是同一个key,所以多个线程添加时只会有一个线程添加成功,就只会有一个线程能够获取到锁。而释放锁锁只需要将锁删除即可。
set key value [EX seconds] [PX milliseconds] [NX|XX]
- EX:key在多少秒后过期
- PX:key在多少毫秒后过期
- NX:当key不存在时,才创建key,效果等同于setnx
- XX:当key存在时覆盖key
修改原来代码
public String sale() {
String retMessage = "";
String key = "zzyyredisLock";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue);
//抢不到的线程需要继续重试
if (!aBoolean) {
//没有抢到锁,进行递归重试
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
sale();
} else {
try{
//抢到了锁
//1. 查询库存信息
String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
//2. 判断库存是否足够
Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
//3. 扣减库存
if (inventory > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
retMessage = "成功卖出一个商品,剩余库存" + inventory;
System.out.println(retMessage + "\t" + "服务器端口" + port);
} else {
retMessage = "商品库存不足";
}
}finally {
stringRedisTemplate.delete(key);
}
}
return retMessage + "\t" + "服务器端口" + port;
}
最后打开jmeter测试
发现最后库存确实减为了0
上面功能就实现了分布式锁,但是我们分析一下上面代码的缺陷,前面代码中首先如果一个线程没有获取到锁会每隔20s递归重试来获取锁,而递归重试这种方式在高并发场景下会出现问题,因为在高并发场景下,容易出现栈溢出,所以我们需要用自旋替换递归。然后就是多线程环境下未来防止虚假唤醒需要使用while替换if。修改后的代码如下:
public String sale() {
String retMessage = "";
String key = "zzyyredisLock";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
//抢不到的线程需要继续重试
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue)) {
//没有抢到锁,进行递归重试
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//抢到了锁
//1. 查询库存信息
String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
//2. 判断库存是否足够
Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
//3. 扣减库存
if (inventory > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
retMessage = "成功卖出一个商品,剩余库存" + inventory;
System.out.println(retMessage + "\t" + "服务器端口" + port);
} else {
retMessage = "商品库存不足";
}
} finally {
stringRedisTemplate.delete(key);
}
return retMessage + "\t" + "服务器端口" + port;
}
继续测试结果还是没有问题
继续分析上面系统,现在是不是还有什么问题,现在一个关键问题就是key的过期时间问题,我们分析上面使用setnx来设置分布式锁的流程,每个线程获取锁就相当于在redis中设置了一个key,释放锁时就删除这个key。假如获取锁的线程宕机了,且该线程没有释放key,而且key没有设置过期时间,所以这个key永远不会释放,那么后面的线程永远都不会获得锁,那么我们的所有线程都会阻塞在这里。
上图A线程挂了
所以为了解决这个问题,我们需要在设置锁的时候需要给key设置一个过期时间。
public String sale() {
String retMessage = "";
String key = "zzyyredisLock";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
//抢不到的线程需要继续重试
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)) {
//没有抢到锁,进行递归重试
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//抢到了锁
//1. 查询库存信息
String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
//2. 判断库存是否足够
Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
//3. 扣减库存
if (inventory > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
retMessage = "成功卖出一个商品,剩余库存" + inventory;
System.out.println(retMessage + "\t" + "服务器端口" + port);
} else {
retMessage = "商品库存不足";
}
} finally {
stringRedisTemplate.delete(key);
}
return retMessage + "\t" + "服务器端口" + port;
}
}
注意加锁和设置过期时间必须是一个原子操作
再次测试
我们接着分析,由于每个线程的执行时间是不确定的,如果某个线程A获得了锁设置过期时间为30s,但是如果A线程的工作时间是35s,所以A现场还没工作完锁就被释放了,此时别的线程B可以获取锁,5S后A来删除锁,但此时删除的锁是B的锁,这样就出现了误删除B的Key的问题。这个问题就是著名的key续租问题。
我们继续修改,即只能自己删除自己的锁,不能删除别人的锁。继续修改代码:
@Service
public class InventoryService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String port = "8080";
public String sale() {
String retMessage = "";
String key = "zzyyredisLock";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
//抢不到的线程需要继续重试
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)) {
//没有抢到锁,进行递归重试
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//抢到了锁
//1. 查询库存信息
String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
//2. 判断库存是否足够
Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
//3. 扣减库存
if (inventory > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
retMessage = "成功卖出一个商品,剩余库存" + inventory;
System.out.println(retMessage + "\t" + "服务器端口" + port);
} else {
retMessage = "商品库存不足";
}
} finally {
if(stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue))
{
stringRedisTemplate.delete(key);
}
}
return retMessage + "\t" + "服务器端口" + port;
}
}
继续思考上面代码有什么问题,虽然上面我们解决了误删除key的问题,但是下面更新的代码还是有问题,即redis获取和删除的操作不是原子的,这样在高并发场景下会发生一些意想不到的问题。
if(stringRedisTemplate.opsForValue().get(key).equalsIgnoreCase(uuidValue))
{
stringRedisTemplate.delete(key);
}
}
所以如何将多条redis操作结合成一个原子操作,答案是lua脚本(介绍在后面)。首先编写lua脚本文件
然后添加lua配置类
@Configuration
public class RedisScriptConfig {
@Bean
public DefaultRedisScript<String> defaultRedisScript() {
DefaultRedisScript<String> defaultRedisScript = new DefaultRedisScript<>();
defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("classpath:redis.lua")));
defaultRedisScript.setResultType(String.class);
return defaultRedisScript;
}
}
最后更改service方法
@Service
public class InventoryService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedisScript<String> redisScript;
private String port = "8080";
public String sale() {
String retMessage = "";
String key = "zzyyredisLock";
String uuidValue = IdUtil.simpleUUID() + ":" + Thread.currentThread().getId();
//抢不到的线程需要继续重试
while (!stringRedisTemplate.opsForValue().setIfAbsent(key, uuidValue,30L,TimeUnit.SECONDS)) {
//没有抢到锁,进行递归重试
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
//抢到了锁
//1. 查询库存信息
String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
//2. 判断库存是否足够
Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
//3. 扣减库存
if (inventory > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
retMessage = "成功卖出一个商品,剩余库存" + inventory;
System.out.println(retMessage + "\t" + "服务器端口" + port);
} else {
retMessage = "商品库存不足";
}
} finally {
stringRedisTemplate.execute(redisScript, Arrays.asList(key), uuidValue);
}
return retMessage + "\t" + "服务器端口" + port;
}
}
测试成功
截止到目前版本,我们已经实现了while判断并自旋重试获取锁,以及setnx含自然过期时间以及lua脚本删除锁命令。那么现在代码还有什么问题,一个关键问题就是锁的重入性问题
。
可重入锁又称为递归锁,是指一个线程在外层获取锁的时候,再进入该方法的内层会自动获取锁(前提锁是同一个对象),不会因为之前获取过还没有释放而阻塞。redis实现锁的可重入可以参考JUC里面的AQS原理,关键是如何实现
重入锁的计数问题
。
由于setnx本身的特性,它天生就不支持可重入,所以这里锁的数据结构就不能用SETNX了,那么redis中有没有哪种数据结构能够满足我们的需求,答案是HSET
。(Map<key, Map<ID,time>>)
。在我们对Hset进行维护的时候每一次操作都需要保证原子性,所以我们可以将基于HSET的加锁和解锁过程封装为不同的Lua脚本。流程如下:
- 先判断有没有分布式锁(EXISTS key)
- 判断锁是不是自己的,是自己的重入,不是自己的阻塞
加锁Lua脚本如下:
if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then
redis.call('hincrby',KEYS[1],ARGV[1],1)
redis.call('expire',KEYS[1],ARGV[2])
return 1
else
return 0
end
解锁Lua脚本如下:
if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then
return nil
elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then
return redis.call('del', KEYS[1])
else
return 0
end
实现自己的分布式锁:
package com.jack.redis_test1.util;
import cn.hutool.core.util.IdUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class MyRedisLock implements Lock {
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuidValue;
private long expiretime;
public MyRedisLock(StringRedisTemplate stringRedisTemplate, String lockName) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuidValue= IdUtil.simpleUUID()+":"+Thread.currentThread().getId();
this.expiretime=800L;
}
@Override
public void lock() {
tryLock();
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
try {
tryLock(-1L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time == -1L) {
String script=
"if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then " +
"redis.call('hincrby',KEYS[1],ARGV[1],1) " +
"redis.call('expire',KEYS[1],ARGV[2]) " +
"return 1 " +
"else " +
"return 0 " +
"end ";
while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuidValue, String.valueOf(expiretime))) {
//没有抢到锁,进行重试
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return true;
}
return false;
}
@Override
public void unlock() {
String script="if redis.call('hexists', KEYS[1], ARGV[1]) == 0 then " +
"return nil " +
"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 then " +
"return redis.call('del', KEYS[1]) " +
"else " +
"return 0 " +
"end";
Long flag = stringRedisTemplate.execute(new DefaultRedisScript<>(script,Long.class), Arrays.asList(lockName), uuidValue);
if (flag == null) {
throw new RuntimeException("锁不存在");
}
}
@Override
public Condition newCondition() {
return null;
}
}
实现锁工厂
使用工厂模式,按需求生产锁
@Component
public class DistributedLockFactory {
@Autowired
StringRedisTemplate stringRedisTemplate;
private String lockName;
public Lock getDistributedLock(String lockType) {
if (lockType == null) return null;
if (lockType.equalsIgnoreCase("REDIS")) {
this.lockName = "zzyyredisLock";
return new MyRedisLock(stringRedisTemplate, lockName);
} else if (lockType.equalsIgnoreCase("ZOOKEEPER")){
this.lockName = "zzyZOOKEEPERLock";
//TODO Zookeeper
}
return null;
}
}
重写service:
@Service
public class InventoryService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String port = "8080";
@Autowired
private DistributedLockFactory distributedLockFactory;
public String sale() {
String retMessage = "";
Lock myRedisLock=distributedLockFactory.getDistributedLock("redis");
myRedisLock.lock();
try {
//抢到了锁
//1. 查询库存信息
String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
//2. 判断库存是否足够
Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
//3. 扣减库存
if (inventory > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
retMessage = "成功卖出一个商品,剩余库存" + inventory;
System.out.println(retMessage + "\t" + "服务器端口" + port);
} else {
retMessage = "商品库存不足";
}
} finally {
myRedisLock.unlock();
}
return retMessage + "\t" + "服务器端口" + port;
}
}
进行测试:
测试成功
上面代码还只是实现了Hset来做redis分布式锁,现在还没有实现锁重入的要求,我们接着改造代码。
前面代码不能重入的原因是我们的uuidValue是在锁对象中实现的,一个线程第二次获取锁的时候还是会通过工厂获取,由于每次 IdUtil.simpleUUID()被调用值都不同,所以线程重入后发现uuidValue和redis数据库汇中的锁的uuidValue不同,导致不能重入,所以我们的需求是实现一个线程终身和一个uuid绑定。
package com.jack.redis_test1.service;
import cn.hutool.core.util.IdUtil;
import com.jack.redis_test1.util.MyRedisLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.concurrent.locks.Lock;
@Component
public class DistributedLockFactory {
@Autowired
StringRedisTemplate stringRedisTemplate;
private String lockName;
private String UUID;
public DistributedLockFactory(){
this.UUID= IdUtil.simpleUUID();
}
public Lock getDistributedLock(String lockType) {
if (lockType == null) return null;
if (lockType.equalsIgnoreCase("REDIS")) {
this.lockName = "zzyyredisLock";
return new MyRedisLock(stringRedisTemplate, lockName,UUID);
} else if (lockType.equalsIgnoreCase("ZOOKEEPER")){
this.lockName = "zzyZOOKEEPERLock";
//TODO Zookeeper
}
return null;
}
}
现在就剩最后一个需求了,如何解决锁的续租问题。
如果我们的业务时间超过了锁的TTL,锁过期了就会被redis删除,此时别的业务就可以抢占锁,等前面业务执行完,就会删除后面业务的锁,造成误删
CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。
一致性(C): 在分布式系统中的所有数据备份,在同一时刻是否同样的值,即写操作之后的读操作,必须返回该值。(分为弱一致性、强一致性和最终一致性)
可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
分区容错性(P): 以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。
Redis是AP,Zookeeper是CP,Eurka和Nacos是AP
redis异步复制造成锁的丢失,比如主节点没来的及把刚刚set进来的这条数据给从节点,master就挂了,从机上位根本没有这条数据
自动续期的lua脚本
if redis.call('HEXISTS', KEYS[1], ARGV[1]) == 1 then
return redis.call('expire',KEYS[1],ARGV[2]);
else
return 0
end
更改service
@Service
public class InventoryService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String port = "8080";
@Autowired
private DistributedLockFactory distributedLockFactory;
public String sale() {
String retMessage = "";
Lock myRedisLock=distributedLockFactory.getDistributedLock("redis");
myRedisLock.lock();
try {
//抢到了锁
//1. 查询库存信息
String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
//2. 判断库存是否足够
Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
//3. 扣减库存
if (inventory > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
retMessage = "成功卖出一个商品,剩余库存" + inventory;
System.out.println(retMessage + "\t" + "服务器端口" + port);
} else {
retMessage = "商品库存不足";
}
} finally {
myRedisLock.unlock();
}
return retMessage + "\t" + "服务器端口" + port;
}
}
4. lua脚本语言
Lua是一种轻量小巧的脚本语言,用标准的C语言编写并以源代码形式开发,其设计目的是为了嵌入到应用程序中,从而为应用程序提供灵活的扩展和定制能力。其特点如下:
- 轻量级:它用标准的c语言编写并以源代码形式开发,编译后仅仅100余k,可以很方便嵌入别的程序中
- 可扩展:lua提供了非常易于使用和扩展的借口和机制,由宿主语言C或C++提供这些功能,Lua可以使用他们,就像本来就内置的功能一样
Redis调用Lua脚本通过eval命令保证代码执行的原子性,直接用return返回脚本执行后的结果。
eval luascripts numkeys [key[key...]][arg[arg...]]
lua脚本的hello word入门
现在我们的需求是将下面三个命令封装到lua脚本中,让它们变成一次原子操作
set k1 v1
expire k1 30
get k1
eval "redis.call('set','k1','v1') redis.call('expire','k1','30') return redis.call('get','k1') " 0
后面的0表示参数的个数
上面代码就实现了我们的需求,假如我们现在需要动态传参该怎么办,假如我们使用lua脚本实现下面命令
eval "return redis.call('mset',KEYS[1],ARGV[1],KEYS[2],ARGV[2])" 2 k1 k2 v1 v2
lua的数组下标从1开始
然后我们再加入if-else判断,需求是先检查key是否存在,存在就删除
eval "if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end" 1 k1 v1
LUA条件判断语法
if(布尔条件) then
elseif(布尔条件) then
elseif(布尔条件) then
else
业务代码
end
三、Redlock底层Redisson源码分析
1. 简介
上面我们已经一步一步实现了自己的redis分布式锁,经过版本的迭代已经基本上完善了所有的功能。思考一个问题,如果我们的并发量变成海量上面的分布式锁是不是还会有上面问题?我们先来学习一下redis自带的分布式锁。
2. Redis分布式锁-Redlock红锁提出背景
Redlock是redis分布式锁的一个实现,他实现了我们认为比普通单例方法更安全的DLM(分布式锁管理器)。红锁(RedLock)是一种分布式锁算法,由 Redis 的作者 Salvatore Sanfilippo(也称为 Antirez)设计,用于在分布式系统中实现可靠的锁机制。它的设计解决了单一 Redis 实例作为分布式锁可能出现的单点故障问题。
之前我们手写的redis锁没解决的关键问题是,假如我们的redis服务器宕机了,我们的redis锁就会失效(这就是Redis的单点故障问题)。现在考虑如果我们给redis服务器添加一个副本,那么master宕机了我们的副本能不能解决单点故障问题?答案是不能的,前面说到redis是AP,它采用的同步方式是异步同步方式,在master将数据同步到副本时,redis就可能宕机了,此时副本中就没有锁的信息,此时就可能出现一锁被多用的风险。
3. RedLock算法设计理念
Redis之父为了解决上面的问题,就创建了Redlock算法,该算法基于多个实例的分布式锁。锁的变量由多个实例维护,即使有实例发生了故障,锁变量仍然存在,客户端还是能完成锁的操作。
在算法的分布式版本当中,我们假设有N个redis主节点。这些节点是完全独立的,所以我们不使用复制或任何其它隐式协调系统,我们已经描述了如何在单个实例中安全地获取和释放锁。我们想当然地认为算法会使用在单个实例中获取和释放锁,在我们的示例中,我们设置N为5,这是一个合理的值,亿你层我们需要在不同的计算机或虚拟机上运行5个Redis master,以确保它们以几乎独立的方式发生故障。为了获取锁,客户端执行下面操作:
实际就是一个不行来多个,但多个redis之间的关系不是主从关系,它们是独立的
4. Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅 提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用 者能够将精力更集中地放在处理业务逻辑上。
简而言之,Redisson是java的redis客户端之一,提供了一些api方便操作redis
5. 案例改造
基于redisson对上面我们手写的redis案例进行改进:
- 为了满足redlock的原理,准备三台redis服务器
- 导入redisson的依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.19.1</version>
</dependency>
- 添加redisson的配置
@Configuration
public class RedisConf {
@Bean
public RedisTemplate<String,Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory){
RedisTemplate<String,Object> redisTemplate=new RedisTemplate<>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.setHashValueSerializer(new StringRedisSerializer());
redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public Redisson redisson(){
Config config=new Config();
config.useSingleServer().setAddress("redis://192.168.31.41:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
- 改进service
@Service
public class InventoryService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String port = "8081";
@Autowired
private Redisson redisson;
public String sale() {
String retMessage = "";
RLock redissonLcok = redisson.getLock("redislock");
redissonLcok.lock();
try {
//抢到了锁
//1. 查询库存信息
String inventory001 = stringRedisTemplate.opsForValue().get("inventory001");
//2. 判断库存是否足够
Integer inventory = inventory001 == null ? 0 : Integer.valueOf(inventory001);
//3. 扣减库存
if (inventory > 0) {
stringRedisTemplate.opsForValue().set("inventory001", String.valueOf(--inventory));
retMessage = "成功卖出一个商品,剩余库存" + inventory;
System.out.println(retMessage + "\t" + "服务器端口" + port);
} else {
retMessage = "商品库存不足";
}
} finally {
redissonLcok.unlock();
}
return retMessage + "\t" + "服务器端口" + port;
}
}
到此项目改进完成,下面开始测试。
上面单redis案例就通过了
但是如果在高并发场景下,可能存在下面的bug
解决上面问题只需要在解锁时加下面代码就行了。
finally {
if(redissonLcok.isLocked() && redissonLcok.isHeldByCurrentThread()) {
redissonLcok.unlock();
}
}
前面介绍到实现一个锁逃不掉要实现加锁、可重入、续命和解锁这四个步骤,我们看看redisson解决这些问题的思想。
- redis分布式锁过期了,但是业务逻辑还没有执行完怎么办?
我的解决方法是,额外起一个线程,定期检查线程是否还持有锁,如果有则延迟过期时间,redisson里面就实现了这个方案,使用“看门狗”定期检查(每1/3的锁时间检查1次),如果线程还持有锁,则刷新过期时间。
下面分析源码:
redissonLcok.lock();
上面是redisson的加锁方法,点进去它其实就是JUC的lock,所以redisson也实现了JUC的lock规范
RLock就是Redisson的顶层Lock
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {}
public class RedissonLock extends RedissonBaseLock {
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
}
RedissonLock就是Redisson的锁逻辑的实现地方
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
public long getLockWatchdogTimeout() {
return this.lockWatchdogTimeout;
}
this.lockWatchdogTimeout = 30000L;
上面代码中这句代码就是内部锁的释放时间,可以看见是30s
这里我们就得出了第一个结论:redisson给我们新建的锁key默认过期时间是30s
- Redisson底层加锁流程
我们进入lock方法
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
//首先获得线程的ID
long threadId = Thread.currentThread().getId();
//尝试去加锁
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
CompletableFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {
entry = (RedissonLockEntry)this.commandExecutor.getInterrupted(future);
} else {
entry = (RedissonLockEntry)this.commandExecutor.get(future);
}
try {
while(true) {
ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
try {
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var14) {
if (interruptibly) {
throw var14;
}
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
entry.getLatch().acquire();
} else {
entry.getLatch().acquireUninterruptibly();
}
}
} finally {
this.unsubscribe(entry, threadId);
}
}
}
上面方法首先获得了线程ID,然后调用this.tryAcquire来尝试获取锁,我们进入该方法
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return (Long)this.get(this.tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
上面就是Redisson加锁的核心关键代码
我们接下来进入tryLockInnerAsync
看看内部是如何加锁的:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command, "if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
}
可以发现redisson底层也是使用lua脚本加锁的,和我们自定义的redis分布式锁的原理一样。下面我们看看redisson是如何完成续租的。
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
try {
this.renewExpiration();
} finally {
if (Thread.currentThread().isInterrupted()) {
this.cancelExpirationRenewal(threadId);
}
}
}
}
private void renewExpiration() {
ExpirationEntry ee = (ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = (ExpirationEntry)RedissonBaseLock.EXPIRATION_RENEWAL_MAP.get(RedissonBaseLock.this.getEntryName());
if (ent != null) {
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
CompletionStage<Boolean> future = RedissonBaseLock.this.renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
if (e != null) {
RedissonBaseLock.log.error("Can't update lock {} expiration", RedissonBaseLock.this.getRawName(), e);
RedissonBaseLock.EXPIRATION_RENEWAL_MAP.remove(RedissonBaseLock.this.getEntryName());
} else {
if (res) {
RedissonBaseLock.this.renewExpiration();
} else {
RedissonBaseLock.this.cancelExpirationRenewal((Long)null);
}
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId));
}
上面加锁和续命的源码已经看完了,下面分析解锁的源码,其实也很简单。
- 继续改造案例
上面我们改造的案例只使用了一个redis服务器,现在还没有解决单点故障问题,下面改为多机案例。首先回顾一下redlock算法
目前Redlock这个对象已经在多机环境中被弃用了,取而代之的是Multilock对象。
首先在当前目录下创建一个新的module,改配置文件
server:
port: 8083
spring:
application:
name: redlock
swagger2:
enabled: true
redis:
database: 0
password:
timeout: 3000
mode: single
pool:
conn-timeout: 3000
so-timeout: 3000
size: 10
single:
address1: 192.168.31.41:6279
address2: 192.168.31.41:6280
address3: 192.168.31.41:6281
写配置类:
@Data
public class RedisSingleProperties {
private String address1;
private String address2;
private String address3;
}
@ConfigurationProperties(prefix = "spring.redis",ignoreUnknownFields = false)
@Data
public class RedisProperties {
private int timeout=3000;
private String password;
private String mode;
//池配置
private RedisPoolProperties pool;
//单机信息配置
private RedisSingleProperties single;
}
@Data
public class RedisPoolProperties {
private int maxIdle;
private int minIdle;
private int maxActive;
private int maxWait;
private int connTimeout=10000;
private int soTimeout;
//池大小
private int size;
}
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class CacheConfiguration {
@Autowired
RedisProperties redisProperties;
@Bean("redisClient1")
RedissonClient redisClient1(){
Config config=new Config();
String node=redisProperties.getSingle().getAddress1();
node=node.startsWith("redis://")? node: "redis://"+node;
SingleServerConfig saverConfig=config.useSingleServer().setAddress(node)
.setTimeout(redisProperties.getPool().getConnTimeout())
.setConnectionPoolSize(redisProperties.getPool().getSize())
.setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
if(StringUtils.isNotBlank(redisProperties.getPassword())){
saverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
@Bean("redisClient2")
RedissonClient redisClient2(){
Config config=new Config();
String node=redisProperties.getSingle().getAddress2();
node=node.startsWith("redis://")? node: "redis://"+node;
SingleServerConfig saverConfig=config.useSingleServer().setAddress(node)
.setTimeout(redisProperties.getPool().getConnTimeout())
.setConnectionPoolSize(redisProperties.getPool().getSize())
.setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
if(StringUtils.isNotBlank(redisProperties.getPassword())){
saverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
@Bean("redisClient3")
RedissonClient redisClient3(){
Config config=new Config();
String node=redisProperties.getSingle().getAddress3();
node=node.startsWith("redis://")? node: "redis://"+node;
SingleServerConfig saverConfig=config.useSingleServer().setAddress(node)
.setTimeout(redisProperties.getPool().getConnTimeout())
.setConnectionPoolSize(redisProperties.getPool().getSize())
.setConnectionMinimumIdleSize(redisProperties.getPool().getMinIdle());
if(StringUtils.isNotBlank(redisProperties.getPassword())){
saverConfig.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
}
写controller
@RestController
@Api(tags = "redis分布式锁测试")
@Slf4j
public class InventoryController {
public static final String CACHE_KEY_REDLOCK = "JACKCHAI_REDLOCK";
@Resource
RedissonClient redissonClient1;
@Resource
RedissonClient redissonClient2;
@Resource
RedissonClient redissonClient3;
@ApiOperation("扣减库存,一次卖一个")
@GetMapping("/inventory/sale")
public String sale() {
String taskThreadID=Thread.currentThread().getName();
RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
RedissonMultiLock redissonMultiLock = new RedissonMultiLock(lock1, lock2, lock3);
redissonMultiLock.lock();
try{
log.info("come in biz multilock:{}",taskThreadID);
try{Thread.sleep(30000);}catch (InterruptedException e) {e.printStackTrace();};
log.info("task is over multiLock:{}",taskThreadID);
} catch (Exception e){
e.printStackTrace();
log.error("multilock exception:{}",e.getCause()+"\t"+e.getMessage());
} finally{
redissonMultiLock.unlock();
log.info("释放分布式锁成功:{}",CACHE_KEY_REDLOCK);
}
return "multilock task is over"+taskThreadID;
}
}