性能优化&问题修复
布隆过滤器比分布式锁性能高多少倍?
验证:先注释掉sentinel(限流的),然后创建分布式锁的方法(代替布隆过滤器)。
因为分布式锁是串行的,而布隆过滤器可以做到并行。通过我在本地进行两种方式的压测,大概评估布隆过滤器是分布式锁的 6 倍性能。理论上说,当并发越高,这个性能差距就越明显。其次,通过分布式锁查询的是 MySQL 数据库,这里还要算上数据库的性能和缓存的差距。
而且,因为我们访问短链接跳转原始链接接口处理缓存穿透场景,需要使用布隆过滤器完成。所以在这里直接使用是刚好的。
1. 面试题:为什么分布式锁要锁住创建短链接这一整个过程,查询数据库有没有这个短链接的的时候一起去查询不是可以的吗?
高并发场景下:因为创建短链接后还需要去查询数据库里有没有这个短链接,没有的话才可以加入数据库,这个过程不是原子的,中间如果多个线程因为冲突创建了一样的短链接,都判断数据库中没有这个fullshorturl,其中一个插入成功之后,别的线程再插入,就会报唯一索引异常,而需要重新走生成短链接,查询数据库这个流程,这样在大量并发情况下,冲突可能比较多,很多线程都要重复重新生成短链、查询数据库有没有的步骤,消耗性能,不如锁住这个流程,让生成的短链是唯一的,反而能保证短链尽快生成成功,不用反复生成并判断。
并发不高,通过数据库直接抗压力就好的场景:一旦并发创建短链接的请求过高,那么数据库的压力就会很大。 量化下这个数据库的访问指标,绝大部分的本地安装 MySQL 数据库 TPS(每秒事务) 都不会很高,因为磁盘、CPU 这些决定了一些性能(大家可以自己买按量计费的服务测试)。我测试过腾讯云的 4C8G(忘了 8 还是 16 了)云数据库,如果我没记错的话,TPS 应该能到 5000 左右。也就是说,如果靠数据库抗,每秒能承受这些压力。这还只是算创建接口,如果有用户查询统计等场景查询数据库,那么又得分配出去一些资源,整体来说,性能会进一步下降。 针对数据库这个点,想要支撑更高的压力,只能说分库处理,通过多个库水平拆分,假设分为 4 个,那么并发能力就是 5000 * 4。但是这个架构方案整体来说就比较复杂了。 所以说,我们应该和面试官去分析场景以及利弊。然后,针对于分布式锁这个问题,要说的就是在并发场景下,我们为了防止大量查询和新增冲突以及保护数据库资源被频繁访问。试想,这种简单场景缓存就能把压力扛住,为什么要本末倒置通过数据库层面解决问题,对吧。
2. 使用布隆过滤器方案和使用分布式锁方案进行比较是否公平。在分布式锁实现的功能是数据库互斥访问(并没有防止缓存击穿,因为创建流程没有缓存),那么改成用布隆过滤器只是过滤掉已经存在于数据库中的“不合法”创建请求,如果此时有不同用户创建了大量不重复的短链接,这时候插入数据库中,相当于布隆没有起到任何保护作用,所有请求都会打到数据库上。用一个对数据库有保护的方案和对无数据只有在极少数情况下(短链接生成重复)才有保护的方案进行对比是否公平呢,即使快了6倍。
如果说没有布隆过滤器,查询短链接是否存在(可能存在 Hash 冲突还会多次查询)以及插入等操作都会直接操作数据库。这种情况,对数据库一般会有较大压力,分布式锁可以有效缓解,通过限制并行保护数据库。 如果我们用了布隆过滤器,那么至少可以减少一至多次的查询数据库操作。但是依然会有大量新增请求会打到数据库,那么基于这种行为咱们可以使用 Sentinel 进行限流。 而且通过提升性能的角度上来看,查询数据库是否存在和查询缓存是否存在还是有性能差距的,而且咱们后面跳转方法缓存穿透也会用到,所以一步到位。 那能不能直接使用限流来保护数据库,不使用分布式锁?只要能避免数据库压力过大,分布式锁和 Sentinel 都是方案。简历示例上完全可以换一句话,比如:通过布隆过滤器完成判断短链接是否已存在,性能远胜查询数据库短链接表方案。
注册用户异常信息返回错误
一开始抛异常的原因是,未获取锁,然后执行unlock就会发现当前不是自己的锁,所以就会抛出全局异常。
修改后(先判断是否获取了锁,一般来说,获取了锁,后面的注册都没有问题)
重构读写锁&延迟队列功能
tomcat最大线程数是200,如果是201个用户,那么第201个用户就需要在阻塞队列中等1s,所以实际处理时间是2s(等1s和自己处理1s)。
如果写锁被获取,300ms,本身读锁处理只需要5ms,因为写锁被获取后,读锁就被阻塞了,如果这时候有多个获取读锁的请求,就会出现oom。对于该种情况可以使用延迟队列来解决,如果发现写锁已经获取了,读锁就不等,直接把写锁放进延迟队列中,让他后面再写入数据库。从而就不会有300ms的阻塞。
后面使用消息队列,消息队列它的消费组逻辑不会再有这种海量并发的消费逻辑,因为消息队列的消费逻辑是匀速的。
消息队列中,消费完一个才会消费下一个。所以就不会再有oom的风险,从而既可以不再需要延迟队列。
简单总结一下 1. 加入消息队列的原因: 因为tomcat最多支持200个线程并发,所以当海量请求到来时,需要进行削峰,否则会有OOM的风险,具体做法就是将保存短链接访问记录的操作异步化,当前线程只负责帮用户跳转链接并发送短链接访问记录到消息队列中。
2. 加入延迟队列以及删除延迟队列的原因: 当用户修改短链接的gid时,为了保证数据的一致性,需要加读写锁,即在短链接gid修改期间加写锁,短链接访问记录保存的时候加读锁。 现在假设一种情况,当一个线程a从消息队列中拉取短链接访问记录准备保存到数据库中时,如果此时该短链接的gid正在被修改,因为修改短链接的线程拿到了写锁,所以此时线程a可能会因为拿不到读锁被阻塞,使用延迟队列是正是为了防止线程a被阻塞。 但是这里并没有必要使用延迟队列,为什么?因为这里不存在占用过多线程从而导致OOM的问题,消费者组只会安排固定的几个线程去拉取消息,如果碰到上面那种拿不到锁的情况,阻塞等待就好。
3. 为什么修改短链接也不用tryLock和延迟队列了? 原因和上面一样,抢读锁的就是消费者组中的那几个固定线程,readLock.lock() 获取读锁的时候,其实会检查锁同步队列的队头是否是一个写线程在等待,如果是的话,那么读线程会直接进入锁同步队列等待,因此这里写线程其实是可以避免被读线程不断插队的。 但前提是你获取读锁的时候用的是 readLock.lock(),如果你用的是 readLock.tryLock(),那么读线程将不会检查锁同步队列的队头,而是直接加读锁,导致写线程被不断插队。这也就意味着修改短链接的线程总会有机会拿到写锁,只需要阻塞等待其他读锁释放即可。
对 RReadWriteLock 做了一个测试,就是虽然 RReadWriteLock 不支持公平锁,但是每次读线程 readLock.lock() 获取读锁的时候,其实会检查锁同步队列的队头是否是一个写线程在等待,如果是的话,那么读线程会直接进入锁同步队列等待,因此这里写线程其实是可以避免被读线程不断插队的。 但前提是你获取读锁的时候用的是 readLock.lock(),如果你用的是 readLock.tryLock(),那么读线程将不会检查锁同步队列的队头,而是直接加读锁,导致写线程被不断插队。 这个和 JDK 的 ReentranReadWriteLock 是同一个道理,区别在于 RReadWriteLock 只实现了非公平锁,但是在这个锁并发度不高的场景下已经足够避免写线程长时间饥饿了。
lock()
方法:- 是一个阻塞式的方法。当线程调用
lock()
方法时,如果锁已经被其他线程持有,那么当前线程就会进入等待状态,直到锁被释放并且当前线程获得锁为止。在等待过程中,线程会一直处于阻塞状态,不做其他事情。 - 类似于synchronized关键字的行为,但Lock接口提供了更灵活的锁操作,如尝试非阻塞地获取锁、可中断地获取锁以及超时获取锁等。
- 是一个阻塞式的方法。当线程调用
tryLock()
方法:- 是一个非阻塞式的方法。当线程调用
tryLock()
方法时,如果锁已经被其他线程持有,那么这个方法会立即返回false
,不会让线程进入等待状态。如果锁没有被其他线程持有,那么当前线程就会立即获得锁并返回true
。 - 这种方法允许线程在无法立即获取锁时,能够继续执行其他任务,从而避免了长时间的等待和可能的死锁问题。
- 是一个非阻塞式的方法。当线程调用
lock()
方法:
没有返回值。线程调用此方法后,要么成功获得锁并继续执行后续代码,要么进入等待状态直到获得锁。
tryLock()
方法:
返回一个布尔值。如果调用线程成功获得了锁,则返回true
;如果锁已被其他线程持有,则返回false
。
修复幂等&Redis-Stream消息队列线上消费停止问题
少一个throw;redisson版本问题。
Redis-Stream消息队列重构为RocketMQ
海量访问短链接,直接访问数据库,会导致数据库负载变高,甚至数据库宕机。为此,需要引入消息队列削峰。
从零到一学习RocketMQ | 拿个offer - 开源&项目实战
引入 pom.xml 组件库
<rocketmq-spring-boot-starter.version>2.2.3</rocketmq-spring-boot-starter.version>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter.version}</version>
</dependency>
消息队列配置
复制到 shortlink-project
和 shortlink-aggregation
配置文件。
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: short-link_project-service_stats-save_pg
topic: short-link_project-service_topic
send-message-timeout: 2000
retry-times-when-send-failed: 1
retry-times-when-send-async-failed: 1
consumer:
group: short-link_project-service_stats-save_cg
SpringBoot3 适配低版本 RocketMQ 组件库
创建 META-INF 以及 下属文件夹 spring,在该目录下创建 org.springframework.boot.autoconfigure.AutoConfiguration.imports
文件,内容如下:
# RocketMQ 2.2.3 version does not adapt to SpringBoot3
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
消息发送者
package com.nageoffer.shortlink.project.mq.producer;
import com.alibaba.fastjson2.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
/**
* 短链接监控状态保存消息队列生产者
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ShortLinkStatsSaveProducer {
private final RocketMQTemplate rocketMQTemplate;
@Value("${rocketmq.producer.topic}")
private String statsSaveTopic;
/**
* 发送延迟消费短链接统计
*/
public void send(Map<String, String> producerMap) {
String keys = UUID.randomUUID().toString();
producerMap.put("keys", keys);
Message<Map<String, String>> build = MessageBuilder
.withPayload(producerMap)
.setHeader(MessageConst.PROPERTY_KEYS, keys)
.build();
SendResult sendResult;
try {
sendResult = rocketMQTemplate.syncSend(statsSaveTopic, build, 2000L);
log.info("[消息访问统计监控] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);
} catch (Throwable ex) {
log.error("[消息访问统计监控] 消息发送失败,消息体:{}", JSON.toJSONString(producerMap), ex);
// 自定义行为...
}
}
}
消息消费者
package com.nageoffer.shortlink.project.mq.consumer;
/**
* 短链接监控状态保存消息队列消费者
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = "${rocketmq.producer.topic}",
consumerGroup = "${rocketmq.consumer.group}"
)
public class ShortLinkStatsSaveConsumer implements RocketMQListener<Map<String, String>> {
private final ShortLinkMapper shortLinkMapper;
private final ShortLinkGotoMapper shortLinkGotoMapper;
private final RedissonClient redissonClient;
private final LinkAccessStatsMapper linkAccessStatsMapper;
private final LinkLocaleStatsMapper linkLocaleStatsMapper;
private final LinkOsStatsMapper linkOsStatsMapper;
private final LinkBrowserStatsMapper linkBrowserStatsMapper;
private final LinkAccessLogsMapper linkAccessLogsMapper;
private final LinkDeviceStatsMapper linkDeviceStatsMapper;
private final LinkNetworkStatsMapper linkNetworkStatsMapper;
private final LinkStatsTodayMapper linkStatsTodayMapper;
private final MessageQueueIdempotentHandler messageQueueIdempotentHandler;
@Value("${short-link.stats.locale.amap-key}")
private String statsLocaleAmapKey;
@Override
public void onMessage(Map<String, String> producerMap) {
String keys = producerMap.get("keys");
if (!messageQueueIdempotentHandler.isMessageProcessed(keys)) {
// 判断当前的这个消息流程是否执行完成
if (messageQueueIdempotentHandler.isAccomplish(keys)) {
return;
}
throw new ServiceException("消息未完成流程,需要消息队列重试");
}
try {
String fullShortUrl = producerMap.get("fullShortUrl");
if (StrUtil.isNotBlank(fullShortUrl)) {
String gid = producerMap.get("gid");
ShortLinkStatsRecordDTO statsRecord = JSON.parseObject(producerMap.get("statsRecord"), ShortLinkStatsRecordDTO.class);
actualSaveShortLinkStats(fullShortUrl, gid, statsRecord);
}
} catch (Throwable ex) {
log.error("记录短链接监控消费异常", ex);
throw ex;
}
messageQueueIdempotentHandler.setAccomplish(keys);
}
public void actualSaveShortLinkStats(String fullShortUrl, String gid, ShortLinkStatsRecordDTO statsRecord) {
fullShortUrl = Optional.ofNullable(fullShortUrl).orElse(statsRecord.getFullShortUrl());
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock(String.format(LOCK_GID_UPDATE_KEY, fullShortUrl));
RLock rLock = readWriteLock.readLock();
rLock.lock();
try {
if (StrUtil.isBlank(gid)) {
LambdaQueryWrapper<ShortLinkGotoDO> queryWrapper = Wrappers.lambdaQuery(ShortLinkGotoDO.class)
.eq(ShortLinkGotoDO::getFullShortUrl, fullShortUrl);
ShortLinkGotoDO shortLinkGotoDO = shortLinkGotoMapper.selectOne(queryWrapper);
gid = shortLinkGotoDO.getGid();
}
int hour = DateUtil.hour(new Date(), true);
Week week = DateUtil.dayOfWeekEnum(new Date());
int weekValue = week.getIso8601Value();
LinkAccessStatsDO linkAccessStatsDO = LinkAccessStatsDO.builder()
.pv(1)
.uv(statsRecord.getUvFirstFlag() ? 1 : 0)
.uip(statsRecord.getUipFirstFlag() ? 1 : 0)
.hour(hour)
.weekday(weekValue)
.fullShortUrl(fullShortUrl)
.gid(gid)
.date(new Date())
.build();
linkAccessStatsMapper.shortLinkStats(linkAccessStatsDO);
Map<String, Object> localeParamMap = new HashMap<>();
localeParamMap.put("key", statsLocaleAmapKey);
localeParamMap.put("ip", statsRecord.getRemoteAddr());
String localeResultStr = HttpUtil.get(AMAP_REMOTE_URL, localeParamMap);
JSONObject localeResultObj = JSON.parseObject(localeResultStr);
String infoCode = localeResultObj.getString("infocode");
String actualProvince = "未知";
String actualCity = "未知";
if (StrUtil.isNotBlank(infoCode) && StrUtil.equals(infoCode, "10000")) {
String province = localeResultObj.getString("province");
boolean unknownFlag = StrUtil.equals(province, "[]");
LinkLocaleStatsDO linkLocaleStatsDO = LinkLocaleStatsDO.builder()
.province(actualProvince = unknownFlag ? actualProvince : province)
.city(actualCity = unknownFlag ? actualCity : localeResultObj.getString("city"))
.adcode(unknownFlag ? "未知" : localeResultObj.getString("adcode"))
.cnt(1)
.fullShortUrl(fullShortUrl)
.country("中国")
.gid(gid)
.date(new Date())
.build();
linkLocaleStatsMapper.shortLinkLocaleState(linkLocaleStatsDO);
}
LinkOsStatsDO linkOsStatsDO = LinkOsStatsDO.builder()
.os(statsRecord.getOs())
.cnt(1)
.gid(gid)
.fullShortUrl(fullShortUrl)
.date(new Date())
.build();
linkOsStatsMapper.shortLinkOsState(linkOsStatsDO);
LinkBrowserStatsDO linkBrowserStatsDO = LinkBrowserStatsDO.builder()
.browser(statsRecord.getBrowser())
.cnt(1)
.gid(gid)
.fullShortUrl(fullShortUrl)
.date(new Date())
.build();
linkBrowserStatsMapper.shortLinkBrowserState(linkBrowserStatsDO);
LinkDeviceStatsDO linkDeviceStatsDO = LinkDeviceStatsDO.builder()
.device(statsRecord.getDevice())
.cnt(1)
.gid(gid)
.fullShortUrl(fullShortUrl)
.date(new Date())
.build();
linkDeviceStatsMapper.shortLinkDeviceState(linkDeviceStatsDO);
LinkNetworkStatsDO linkNetworkStatsDO = LinkNetworkStatsDO.builder()
.network(statsRecord.getNetwork())
.cnt(1)
.gid(gid)
.fullShortUrl(fullShortUrl)
.date(new Date())
.build();
linkNetworkStatsMapper.shortLinkNetworkState(linkNetworkStatsDO);
LinkAccessLogsDO linkAccessLogsDO = LinkAccessLogsDO.builder()
.user(statsRecord.getUv())
.ip(statsRecord.getRemoteAddr())
.browser(statsRecord.getBrowser())
.os(statsRecord.getOs())
.network(statsRecord.getNetwork())
.device(statsRecord.getDevice())
.locale(StrUtil.join("-", "中国", actualProvince, actualCity))
.gid(gid)
.fullShortUrl(fullShortUrl)
.build();
linkAccessLogsMapper.insert(linkAccessLogsDO);
shortLinkMapper.incrementStats(gid, fullShortUrl, 1, statsRecord.getUvFirstFlag() ? 1 : 0, statsRecord.getUipFirstFlag() ? 1 : 0);
LinkStatsTodayDO linkStatsTodayDO = LinkStatsTodayDO.builder()
.todayPv(1)
.todayUv(statsRecord.getUvFirstFlag() ? 1 : 0)
.todayUip(statsRecord.getUipFirstFlag() ? 1 : 0)
.gid(gid)
.fullShortUrl(fullShortUrl)
.date(new Date())
.build();
linkStatsTodayMapper.shortLinkTodayState(linkStatsTodayDO);
} catch (Throwable ex) {
log.error("短链接访问量统计异常", ex);
} finally {
rLock.unlock();
}
}
}
删除 RedisStreamConfiguration
重构短链接分组监控以及其他优化
-监控表删除分组标识
监控表中存储了 gid 字段,本意是更好的支持分组查询功能,但是有个问题,那就是短链接的访问量非常大的时候,如果迁移分组,就会涉及到大量的数据变更。
考虑到这种场景,我们换了一种实现思路,gid 不再存储监控表,而是通过短链接表和监控表内联的形势解决 gid 分组查询。
这样的话,短链接修改就不再需要变更监控表大量 gid,非常好的节省了性能。上面代码中的一大堆修改监控表的代码也就可以删除了。
- 当天监控查询取消分表
因为我们把监控表的分组标识信息已经删掉,如果说还要保留 t_link_stats_today
的分表,那么两个表之前的语句将不再合适,为此,我们取消了当天监控的分表。
那如何解决当天监控表数据过大问题?基于该背景,我们可以使用冷热数据分离存储。
在 t_link_stats_today
表中仅保留需要查询的记录,我们假设保存一个月,然后超过一个月的记录,通过定时任务迁移到 t_link_stats_today_back
历史备份表中。t_link_stats_today
表中的数据保存的就是热数据,而备份表则是不再查询的冷数据。
- 监控历史数据分页失效
后管忘记传递分页参数所导致,已修复。之前 ShortLinkStatsController
的方法中是没有传递 current 和 size 的。
/**
* 访问单个短链接指定时间内访问记录监控数据
*/
@GetMapping("/api/short-link/admin/v1/stats/access-record")
public Result<Page<ShortLinkStatsAccessRecordRespDTO>> shortLinkStatsAccessRecord(ShortLinkStatsAccessRecordReqDTO requestParam) {
return shortLinkActualRemoteService.shortLinkStatsAccessRecord(
requestParam.getFullShortUrl(),
requestParam.getGid(),
requestParam.getStartDate(),
requestParam.getEndDate(),
requestParam.getEnableStatus(),
requestParam.getCurrent(),
requestParam.getSize()
);
}
- 修复原始链接变更缓存未更新场景
// 短链接如何保障缓存和数据库一致性?详情查看:https://nageoffer.com/shortlink/question
if (!Objects.equals(hasShortLinkDO.getValidDateType(), requestParam.getValidDateType())
|| !Objects.equals(hasShortLinkDO.getValidDate(), requestParam.getValidDate())
|| !Objects.equals(hasShortLinkDO.getOriginUrl(), requestParam.getOriginUrl())) {
stringRedisTemplate.delete(String.format(GOTO_SHORT_LINK_KEY, requestParam.getFullShortUrl()));
if (hasShortLinkDO.getValidDate() != null && hasShortLinkDO.getValidDate().before(new Date())) {
if (Objects.equals(requestParam.getValidDateType(), VailDateTypeEnum.PERMANENT.getType()) || requestParam.getValidDate().after(new Date())) {
stringRedisTemplate.delete(String.format(GOTO_IS_NULL_SHORT_LINK_KEY, requestParam.getFullShortUrl()));
}
}
}
监控保存Gid错误&短链唯一
队列消费时读取最新Gid
之前的的短链接监控信息保存的消费逻辑里,如果发送者传递了 Gid,那么消费端就不需要查询,直接使用。但这会带来一个问题,那就是记录短链接的分组不对。
假设我们有短链接 /123,分组 Gid 为 abc,然后执行了以下逻辑:
- 某线程在变更短链接的分组,从 abc 变更到 bcd,获取到了写锁,执行流程中。
- 用户访问短链接时恰巧缓存失效,然后去数据库读取记录,然后查出短链接 Gid 为 abc,并投递消息队列。
- 消息队列消费线程执行时,因为写锁正在被占用,那么被阻塞。
- 线程修改短链接分组完毕,释放写锁,分组标识 Gid 成功从 abc 变更到 bcd。
- 消息队列消费拿到读锁,将短链接 /123 和 Gid 为 abc 进行操作,然后数据错乱。
解决:
将客户端所有传递 Gid 的入参去掉,通过短链接实时查询 Gid 即可。
(如果在消息队列消费中每次都查询一次数据库,对数据库压力大不大?因为我们是在消息队列消费中查询,线程是固定的,没有压力。)
不同分组下创建时能保障短链唯一么?
假设一个我们有两个分组:abc、bcd。分别创建出了短链接是 /123456,这个时候能保障数据唯一么?
很多同学可能会说能保证一致,因为我们在数据库层面做了唯一索引兜底。但是,这句话存在纰漏,因为我们 link 表是按照 gid hash 方式分表的,如果说这两个分组不在一张表,唯一索引还能保障短链接唯一么?答案是不能。
解决:
为了保障短链接访问时跳转,加了一张通过 t_link_goto
短链接分片的路由表,只需要在这张表里加上唯一索引即可解决。
CREATE TABLE `t_link_goto` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`gid` varchar(32) DEFAULT 'default' COMMENT '分组标识',
`full_short_url` varchar(128) DEFAULT NULL COMMENT '完整短链接',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_full_short_url` (`full_short_url`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
优化大量空缓存查询数据库&修复后管限流逻辑
防止大量空缓存查询数据库
在短链接跳转解决缓存穿透那里。如果大量请求访问不存在的值,先访问缓存发现不存在,访问布隆过滤器存在,然后访问一遍空值也不存在。那么依次拿锁查数据库。这里拿到锁不应该先访问一下空值吗,不然即使空值更新,大量请求仍然会依次拿锁过一遍数据库。马哥的代码只有拿到锁先访问一下缓存值。
解决:
// ......
RLock lock = redissonClient.getLock(String.format(LOCK_GOTO_SHORT_LINK_KEY, fullShortUrl));
lock.lock();
try {
originalLink = stringRedisTemplate.opsForValue().get(String.format(GOTO_SHORT_LINK_KEY, fullShortUrl));
if (StrUtil.isNotBlank(originalLink)) {
shortLinkStats(buildLinkStatsRecordAndSetUser(fullShortUrl, request, response));
((HttpServletResponse) response).sendRedirect(originalLink);
return;
}
// 新增加逻辑
gotoIsNullShortLink = stringRedisTemplate.opsForValue().get(String.format(GOTO_IS_NULL_SHORT_LINK_KEY, fullShortUrl));
if (StrUtil.isNotBlank(gotoIsNullShortLink)) {
((HttpServletResponse) response).sendRedirect("/page/notfound");
return;
}
// ......
}
可能较真的同学有个疑问,为什么获取到锁后先查缓存是否存在,而不是先查空缓存是否存在?
这涉及到一次无用网络 IO 的问题,如果把缓存放上面,那就证明我们假设系统中大量的访问都是正常的。反之,则是考虑系统会被大量攻击。
修复后管限流逻辑无限刷新问题
原逻辑:
-- 设置用户访问频率限制的参数
local username = KEYS[1]
local timeWindow = tonumber(ARGV[1]) -- 时间窗口,单位:秒
-- 构造 Redis 中存储用户访问次数的键名
local accessKey = "short-link:user-flow-risk-control:" .. username
-- 原子递增访问次数,并获取递增后的值
local currentAccessCount = redis.call("INCR", accessKey)
-- 设置键的过期时间
redis.call("EXPIRE", accessKey, timeWindow)
-- 返回当前访问次数
return currentAccessCount
代码修复后:
-- 设置用户访问频率限制的参数
local username = KEYS[1]
local timeWindow = tonumber(ARGV[1]) -- 时间窗口,单位:秒
-- 构造 Redis 中存储用户访问次数的键名
local accessKey = "short-link:user-flow-risk-control:" .. username
-- 原子递增访问次数,并获取递增后的值
local currentAccessCount = redis.call("INCR", accessKey)
-- 设置键的过期时间
if currentAccessCount == 1 then
redis.call("EXPIRE", accessKey, timeWindow)
end
-- 返回当前访问次数
return currentAccessCount
修复用户访问短链接监控数据横向越权问题
什么是用户横向越权?
用户横向越权是指在系统中,用户 A 能够访问或操作用户 B 的数据或功能,而这种访问或操作是未经授权的。这种安全漏洞通常出现在访问控制不严格或错误配置的系统中。
如何复现用户横向越权?
咱们短链接监控有四个获取监控数据方法,假设系统有两个用户:用户 A 和用户 B。用户 A 创建短链接 /aaaaaa
,用户 B 创建短链接 /bbbbbb
,只要用户任意登录,就可以访问任意创建的短链接监控数据。
如何解决越权?
在访问短链接监控数据时,需要确认短链接属于用户所有。
但是现在有个问题,Gid 是用户下唯一,咱们短链接表里仅存储了短链接分组 Gid,相当于两个用户下可能有一样的 Gid。这样的话,我们就不能通过 t_link
表判断短链接归属用户所有。
解决方案如下:
- 在短链接
t_link
表中添加用户名 username 字段,通过两个字段一起判断。 - 🌟 设置分组标识 Gid 全局唯一。
1. 修改 t_group 唯一索引
需要通过数据库唯一索引兜底,之前通过 t_group
表无法兜底,因为按照 username
分表的话,可能两个用户创建了一样的 gid,结果存储到两个表里,就无法进行兜底了。
为此,我们创建 t_group_unique
表单独创建个 gid 唯一索引进行兜底。
CREATE TABLE `t_group_unique` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`gid` varchar(32) DEFAULT NULL COMMENT '分组标识',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_unique_gid` (`gid`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
2. 设置分组标识 Gid 全局唯一
@Override
public void saveGroup(String username, String groupName) {
RLock lock = redissonClient.getLock(String.format(LOCK_GROUP_CREATE_KEY, username));
lock.lock();
try {
LambdaQueryWrapper<GroupDO> queryWrapper = Wrappers.lambdaQuery(GroupDO.class)
.eq(GroupDO::getUsername, username)
.eq(GroupDO::getDelFlag, 0);
List<GroupDO> groupDOList = baseMapper.selectList(queryWrapper);
if (CollUtil.isNotEmpty(groupDOList) && groupDOList.size() == groupMaxNum) {
throw new ClientException(String.format("已超出最大分组数:%d", groupMaxNum));
}
int retryCount = 0;
int maxRetries = 10;
String gid = null;
// 如果布隆过滤器满了,可能会一直重复,所以这里加一个限制条件
while (retryCount < maxRetries) {
gid = saveGroupUniqueReturnGid();
if (StrUtil.isNotEmpty(gid)) {
GroupDO groupDO = GroupDO.builder()
.gid(gid)
.sortOrder(0)
.username(username)
.name(groupName)
.build();
baseMapper.insert(groupDO);
gidRegisterCachePenetrationBloomFilter.add(gid);
break;
}
retryCount++;
}
if (StrUtil.isEmpty(gid)) {
throw new ServiceException("生成分组标识频繁");
}
} finally {
lock.unlock();
}
}
// 视频中有个逻辑错误,请以当前文档为准
private String saveGroupUniqueReturnGid() {
String gid = RandomGenerator.generateRandom();
if (gidRegisterCachePenetrationBloomFilter.contains(gid)) {
return null;
}
GroupUniqueDO groupUniqueDO = GroupUniqueDO.builder()
.gid(gid)
.build();
try {
// 线程 A 和 B 同时生成了相同的 Gid,会被数据库的唯一索引校验触发异常
// 流程不能被这个异常阻断,需要获取异常重试
groupUniqueMapper.insert(groupUniqueDO);
} catch (DuplicateKeyException e) {
return null;
}
return gid;
}
用户注册默认创建分组调用 saveGroup
方法,之前新增布隆过滤器在上,saveGroup
在下。这里调整了下顺序。为什么要这么做?
@Transactional(rollbackFor = Exception.class)
@Override
public void register(UserRegisterReqDTO requestParam) {
if (!hasUsername(requestParam.getUsername())) {
throw new ClientException(USER_NAME_EXIST);
}
RLock lock = redissonClient.getLock(LOCK_USER_REGISTER_KEY + requestParam.getUsername());
if (!lock.tryLock()) {
throw new ClientException(USER_NAME_EXIST);
}
try {
int inserted = baseMapper.insert(BeanUtil.toBean(requestParam, UserDO.class));
if (inserted < 1) {
throw new ClientException(USER_SAVE_ERROR);
}
groupService.saveGroup(requestParam.getUsername(), "默认分组");
userRegisterCachePenetrationBloomFilter.add(requestParam.getUsername());
} catch (DuplicateKeyException ex) {
throw new ClientException(USER_EXIST);
} finally {
lock.unlock();
}
}
3. 检查用户查询权限
关于用户查询权限校验有两个说法,一个是 admin、project 都校验,一个是在 project 项目校验,关于这两个聊下优缺点:
- admin、project 都校验:如果用户访问越权,直接在第一层拦截住,就避免了 project 无用调用。缺点是如果用户没有越权,会多一次 admin 检查性能。
- project 校验:如果用户访问越权,会多一层无效调用。优点是用户没有越权,少一次 admin 检查性能。
综合考虑,最终选择了 project 检验,因为我们做系统面临用户正常访问和不正常访问两种情况和性能挂钩时,尽量选择相信用户。如果是检验 Redis 的话,我就倾向于 admin、project 都校验。
四个监控方法里在业务方法执行前检查分组标识是否属于用户。
public void checkGroupBelongToUser(String gid) throws ServiceException {
String username = Optional.ofNullable(UserContext.getUsername())
.orElseThrow(() -> new ServiceException("用户未登录"));
LambdaQueryWrapper<GroupDO> queryWrapper = Wrappers.lambdaQuery(GroupDO.class)
.eq(GroupDO::getGid, gid)
.eq(GroupDO::getUsername, username);
List<GroupDO> groupDOList = linkGroupMapper.selectList(queryWrapper);
if (CollUtil.isEmpty(groupDOList)) {
throw new ServiceException("用户信息与分组标识不匹配");
}
}
这里有个小问题,UserContext
是在后管中才有的,我们通过后端系统调用 Project,是获取不到用户登录信息,如何获取当前用户?
- 通过参数形式传递:如果用户直接调用 Project 监控接口,可以伪造用户名。
- OpenFeign 透传用户登录信息。
创建 OpenFeign 请求透传信息 Bean,并获取当前登录用户信息,传递到调用 Project 项目的网络请求中。
import com.nageoffer.shortlink.admin.common.biz.user.UserContext;
import feign.RequestInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* openFeign 微服务调用传递用户信息配置
*/
@Configuration
public class OpenFeignConfiguration {
@Bean
public RequestInterceptor requestInterceptor() {
return template -> {
template.header("username", UserContext.getUsername());
template.header("userId", UserContext.getUserId());
template.header("realName", UserContext.getRealName());
};
}
}
远程调用 OpenFeign 接口指定配置类
@FeignClient(
value = "short-link-project",
url = "${aggregation.remote-url:}",
configuration = OpenFeignConfiguration.class
)
public interface ShortLinkActualRemoteService {
}
通过这种方案,我们就可以在 Project 项目中创建拦截器设置用户上下文。
后管项目中用的是过滤器,之前有同学说可以尝试下拦截器,这里使用下。
import cn.hutool.core.util.StrUtil;
import jakarta.annotation.Nullable;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
/**
* 用户信息传输拦截器
*/
@Component
public class UserTransmitFilter implements HandlerInterceptor {
@Override
public boolean preHandle(@Nullable HttpServletRequest request, @Nullable HttpServletResponse response, @Nullable Object handler) throws Exception {
String username = request.getHeader("username");
if (StrUtil.isNotBlank(username)) {
String userId = request.getHeader("userId");
String realName = request.getHeader("realName");
UserInfoDTO userInfoDTO = new UserInfoDTO(userId, username, realName);
UserContext.setUser(userInfoDTO);
}
return true;
}
@Override
public void afterCompletion(@Nullable HttpServletRequest request, @Nullable HttpServletResponse response, @Nullable Object handler, Exception exception) throws Exception {
UserContext.removeUser();
}
}
因为如果新增布隆过滤器在上面,新增成功后,假设 saveGroup
方法失败,是没办法回滚布隆过滤器操作的,所以我们把两者顺序颠倒。如果数据库事务失败,就不操作布隆过滤器了。
之前是通过用户名 username 和分组标识 Gid 一起判断是否重复,现在修改为 Gid 单独控制。
总结: 缓存穿透解决方案 解决方案:使用布隆过滤器,但是做了改进:布隆过滤器 + 缓存无效数据 注意:这里的缓存无效数据和传统的缓存空值不一样,因为它的key的前缀和正常数据的key的前缀是区分开的。相当于单独对那些不存在的数据做了缓存 思路: 首先查缓存,如果缓存存在,直接返回 如果缓存不存在,查询布隆过滤器: 如果布隆过滤器里不存在,说明一定不存在,把这个key放入无效数据缓存里(设置较短的过期时间),然后返回 如果布隆过滤器里存在,说明可能存在,于是查询无效数据缓存 如果无效数据缓存里有这个key,说明这个值一定不存在,对这个无效数据缓存进行续期,然后返回 如果无效数据缓存里没有这个key,说明这个key可能存在,于是去查询数据库 查询数据库: 首先获取分布式锁 获取到分布式锁之后,重新判定一下缓存是否存在,如果存在则使用缓存,直接返回。如果不存在还需要再次查询一次空值缓存,如果空值缓存存在则直接返回,否则进行下一步。 否则查询数据库,并更新缓存和无效数据缓存 暂时无法在飞书文档外展示此内容 问题 为什么不对无效数据缓存也使用布隆过滤器?这样不是可以节省内存空间吗? 因为布隆过滤器不能删除元素。如果现在这个key不存在,以后又存在了,无法从布隆过滤器里删除 为什么要对无效数据缓存设置过期时间? 首先,无效数据缓存只在布隆过滤器误判的情况下才会插入数据,所以无效数据缓存的数据量不会很大。 为了防止攻击者故意伪造会导致误判的且不存在的数据而导致服务器宕机,给它设置了过期时间 为什么获取到分布式锁之后,再次检查缓存发现不存在,还需要查询无效数据缓存?为什么不用查询布隆过滤器了? 第一个拿到锁的线程查询数据库之后必定会更新缓存和无效数据缓存中的一个。后续线程只要判断这两个缓存就行了 在这里,布隆过滤器不会更新,不能提供有效信息
优化监控跨时间异常以及消息幂等方法命名
短链接监控场景跨时间异常
在此基础上又加了一些优化,那就是访问时间由生产者传递,消费者里不再 new Date()
可以更好记录用户行为。
方法命名
Redis-Stream消息队列问题答疑
为什么用了线程池依然是单线程消费?
测试下,在 Redis Stream 消费者逻辑添加一行当前线程名打印。
@Override
public void onMessage(MapRecord<String, String, String> message) {
System.out.println("当前线程:" + Thread.currentThread().getName());
}
实际上是一个监听(比如 ShortLinkStatsSaveConsumer
)是一个线程池中的线程运行,如果有个多个监听类,就会启用多个线程。
线程池中的线程运行的是 StreamPollTask
类,而这个类中运行的任务实际是个死循环,会不断去 Redis Stream 获取任务,这也就是为什么只有一个线程的原因。
关系图如下:
为什么消费流程执行异常后就不再消费?
因为创建监听类时 cancelOnError
默认为 true,意味着出现异常则停止消费。监听类修改为以下配置即可遇到异常也能正常执行。
/**
* Redis Stream 消息队列配置
*/
@Configuration
@RequiredArgsConstructor
public class RedisStreamConfiguration {
private final RedisConnectionFactory redisConnectionFactory;
private final ShortLinkStatsSaveConsumer shortLinkStatsSaveConsumer;
@Bean
public ExecutorService asyncStreamConsumer() {
AtomicInteger index = new AtomicInteger();
return new ThreadPoolExecutor(1,
1,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
runnable -> {
Thread thread = new Thread(runnable);
thread.setName("stream_consumer_short-link_stats_" + index.incrementAndGet());
thread.setDaemon(true);
return thread;
},
new ThreadPoolExecutor.DiscardOldestPolicy()
);
}
@Bean
public Subscription shortLinkStatsSaveConsumerSubscription(ExecutorService asyncStreamConsumer) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次最多获取多少条消息
.batchSize(10)
// 执行从 Stream 拉取到消息的任务流程
.executor(asyncStreamConsumer)
// 如果没有拉取到消息,需要阻塞的时间。不能大于 ${spring.data.redis.timeout},否则会超时
.pollTimeout(Duration.ofSeconds(3))
.build();
StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest =
StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(SHORT_LINK_STATS_STREAM_TOPIC_KEY, ReadOffset.lastConsumed()))
.cancelOnError(throwable -> false)
.consumer(Consumer.from(SHORT_LINK_STATS_STREAM_GROUP_KEY, "stats-consumer"))
.autoAcknowledge(true)
.build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);
Subscription subscription = listenerContainer.register(streamReadRequest, shortLinkStatsSaveConsumer);
listenerContainer.start();
return subscription;
}
}