文章目录
什么是分布式锁
分布式锁,即在分布式系统或集群模式中使用的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。
分布式锁的几种实现方案
- 基于数据库实现(已淘汰)
- 基于Zookeeper
- 基于Redis setnx实现
- Redis框架 Redisson、RedisLock
基于Redis setnx实现分布式锁
Redis 中的 setnx 命令简单介绍
在 Redis 中,SETNX 是一个用于设置键-值对的命令,仅在键不存在时才设置该键。SETNX 是 “Set if Not Exists”(如果不存在则设置)的缩写。
命令语法如下:
setnx key value
- 其中 key 是要设置的键名,value 是要设置的值。
- 如果键 key 不存在,则将键 key 的值设置为 value,并返回 1 表示设置成功。如果键 key 已经存在,则不进行任何操作,返回 0 表示设置失败。
实现代码
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class RedisServiceImpl implements RedisService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String redisLockKey = "myLock";
/**
* 重试时间
*/
private long timeout = 5000;
/**
* 缓存redis锁
*/
private static Map<Thread, RedisLockInfo> lockCacheMap = new ConcurrentHashMap<>();
/**
* 获取锁
* @return
*/
@Override
public boolean tryLock() {
Thread currentThread = Thread.currentThread();
RedisLockInfo info = lockCacheMap.get(currentThread);
if (info != null && info.isStatus()) {
log.info("当前线程已经获取到锁");
return true;
}
String uuid = UUID.randomUUID().toString();
long lockExpire = 3000L;
long startTime = System.currentTimeMillis();
while (true) {
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, uuid, lockExpire, TimeUnit.SECONDS);
if (lock) {
log.info("锁获取成功");
RedisLockInfo lockInfo = new RedisLockInfo(currentThread, lockExpire,uuid);
lockCacheMap.put(currentThread, lockInfo);
new Thread(new MyLifeExtensionThread(lockInfo, 10000)).start();
return true;
}
long endTime = System.currentTimeMillis();
if (endTime - startTime > timeout) {
log.info("锁获取失败");
return false;
}
try {
//防止CPU飙高
Thread.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 释放锁
* @return
*/
public boolean releaseLock() {
RedisLockInfo info = lockCacheMap.get(Thread.currentThread());
if(info == null){
return false;
}
String lockId = stringRedisTemplate.opsForValue().get(redisLockKey);
if(StringUtils.isBlank(lockId)){
log.info("key已过期");
return false;
}
//比较获取锁时生成的锁id(uuid)是否相等判断是自己创建的锁,是否可以删除key
if(!lockId.equals(info.getLockId())){
log.info("当前线程不能删除该key");
return false;
}
return stringRedisTemplate.delete(redisLockKey);
// 以上还有缺陷,查询锁和删除锁的逻辑不是原子性的,所以要将查询锁和删除锁这两步作为原子指令操作
/*
//script:Redis专属脚本
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
stringRedisTemplate.execute(
new DefaultRedisScript<Long>(script, Long.class),
Arrays.asList("lock"), info.getLockId());*/
}
/**
* 模拟看门狗机制
*/
class MyLifeExtensionThread implements Runnable {
private RedisLockInfo info;
/**
* 每十秒续命3次,每次重新刷新超时时间
*/
private long sleep;
public MyLifeExtensionThread(RedisLockInfo redisLockInfo, long sleep) {
this.info = redisLockInfo;
this.sleep = sleep;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread thread = info.getCurrentThread();
//是否需要续命
if(info!=null){
if(info.isStatus() && thread.isInterrupted()){
log.info("当前线程获取到锁之后,已经执行完毕...不需要再次续命");
}
}
info.getLifeExtensionCount().incrementAndGet();
if (info.getLifeExtensionCount().get() > 3) {
log.info("重复续命已超过3次");
// 回滚当前事务
// 停止线程
thread.interrupt();
// 主动释放该锁
stringRedisTemplate.delete(redisLockKey);
return;
}
stringRedisTemplate.expire(redisLockKey, info.getLockExpire(), TimeUnit.SECONDS);
}
}
}
}
注意事项
- 在使用setnx命令上锁时需要添加过期时间,避免出现业务代码出现异常或者服务器宕机,没有执行删除锁的逻辑,就造成了死锁。
- 注意获取锁的时候,也需要设置锁的过期时间,这是一个原子操作,要么都成功执行,要么都不执行。避免如果在这两步之间发生了异常,则锁的过期时间根本就没有设置成功,锁永远不能过期。
- 设置锁的过期时间时,还需要设置唯一编号。避免在主动删除锁的时候,误删其他线程设置的锁。
Redisson实现分布式锁
Redisson框架基本介绍
- Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。
- 使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。
- 同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。
Spring Boot整合 Redisson
Maven依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.1</version>
</dependency>
配置类
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* 配置RedissonConfig配置
*/
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
//设置看门狗时间 续命线程定时间隔时间
// config.setLockWatchdogTimeout(60000);
//设置单机版本redis
config.useSingleServer().setAddress("redis://" + host + ":" + port);
//设置集群的方式
// config.useClusterServers().addNodeAddress("redis://" + host + ":" + port);
//添加主从配置
// config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
return Redisson.create(config);
}
}
public void redissonTest(Long id) {
RLock lock = null;
try {
lock = redissonClient.getLock("id=" + id);
lock.lock();
// 执行锁的业务逻辑
} catch (Exception e) {
} finally {
if (lock != null) {
lock.unlock();
log.info(">>释放锁<<");
}
}
}
lock()源码解析
//RedissonLock
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
// 返回空表示拿到锁,返回不为空表示key什么时候过期
if (ttl == null) {
return;
}
...
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//通过lua脚本写入key
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
//写入key成功开始做续命,默认是无限续命
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//== 0表示key不存在
"if (redis.call('exists', KEYS[1]) == 0) then " +
//如果key不存在使用lua代码创建一个hashset
//call(创建一个hashset,getName(),getLockName(threadId),重入锁为1)
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
//在设置key的过期时间
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果是相同的线程调用lock方法 需要将重入次数+1
//如果当前key已存在[相同的线程又调用了一次lock()]
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//重入次数+1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//另外的jvm获取锁的时候会失败,需要知道该key在什么时候过期。
"return redis.call('pttl', KEYS[1]);",
//getName() 表示key KEYS[1]
//internalLockLeaseTime 参数1/ARGV[1]
//getLockName(threadId) 参数2/ARGV[2] 返回一个uuid:线程id的字符串
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
unlock()源码解析
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//判断当前线程获取锁是否存在,如果不存在返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//将该value(重入锁次数)-1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//如果该value值>0 还原过期key的时间(续命)
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
//如果该value值<=0就会将该key删除,释放锁。
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}