Redis限流方案

限流简介

限流算法在分布式领域是一个经常被提起的话题,当系统的处理能力有限时,如何阻止计划外的请求继续对系统施压,是一个需要重视的问题。

除了控制流量,限流还有一个应用目的是用于控制用户行为,避免垃圾请求,比如在UGC社区,用户的发帖、回复、点赞等行为都要严格受控,一般要严格限定某行为在规定时间内允许的次数,超过了次数那就是非法行为。对于非法行为,业务必须规定适当的惩处策略。

简单限流

首先我们来看一个常见的简单限流策略。系统要限定用户某个行为在指定的时间里只能发生N次,如何使用Redis的数据结构来实现这个限流功能。

首先,这个接口定义如下

#指定用户use_id的某个行为action_key在特定的时间内period只允许发生一定的次数
int max_count;
boolean is_action_allowed(user_id,action_key,period,max_count){
 return true;
}
# 调用这个接口,一分钟内只允许最多回复5个贴子
can_reply = is_action_allowed("user1","reply",60,5);
if(can_reply) do();
else return ;

解决方案
这个限流需求中存在一个滑动时间窗口,在zset数据结构中提供了zremrangebyscore key-name min max指令用于移除有序集合中分值介于min和max之间的元素集合。因此可以使用行为发生的时间戳作为zset集合中元素的score,时间戳越大,相应score越高。 而且我们只需要保留这个时间窗口,窗口之外的数据都可以砍掉,从而节省内存。

示意图如下
在这里插入图片描述
代码如下:

package example;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

public class SimpleRateLimiter {
    private Jedis jedis;
    public SimpleRateLimiter(Jedis jedis) {
        this.jedis = jedis;
    }
    public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
        String key = String.format("hist:%s:%s", userId, actionKey);
        long nowTs = System.currentTimeMillis();
        Pipeline pipe = jedis.pipelined();
        pipe.multi();
        pipe.zadd(key, nowTs, "" + nowTs);
        pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
        Response<Long> count = pipe.zcard(key);
        pipe.expire(key, period + 1);
        pipe.exec();
        pipe.close();
        return count.get() <= maxCount;
    }
    public static void main(String[] args) {
        Jedis jedis = new Jedis();
        SimpleRateLimiter limiter = new SimpleRateLimiter(jedis);
        for (int i = 0; i < 20; i++) {
            System.out.println(limiter.isActionAllowed("laoqian", "reply", 60, 5));
        }
    }
}

代码整体思路:每一个行为到来时,都维护一次时间窗口,将时间窗口外的记录全部清理掉,只保留窗口内的记录。zset集合中只有score值非常重要,value值没有特别意义。只需要保证他的唯一性即可。

因为这几个连续的Redis操作都是针对同一个key的,使用pipeline可以显著提升Redis存取效率。但是这种方案也有缺点,因为他要记录时间窗口内所有的行为记录。如果这个量很大,比如限定60s内操作不得超过100w次这样的参数,他是不适合做这样的限流的,因为会消耗大量的存储空间。

高级限流算法——漏洞限流

漏洞限流是最常用的限流方法之一,这个算法的灵感来源于漏斗的结构。漏斗的容量是有限的,如果将漏嘴堵住,然后一直往里面灌水,他就会变满,直至再也装不进去。如果将漏嘴放开,水就会往下流,流走一部分之后,就可以继续往里面灌水。如果漏嘴流水的速率大于灌水的速率。那么漏洞永远都不会装满。如果漏嘴流水速率小雨灌水速率,那么一旦漏斗满了,灌水就需要暂停并等待漏斗腾空。

所以,漏斗的剩余空间就代表着当前行为可以持续进行的数量,漏嘴的流水速率代表着系统允许该行为的最大频率。

单机漏斗算法:

package example;

import java.util.HashMap;
import java.util.Map;

public class FunnelRateLimiter {
    static class Funnel {
        //漏斗容量
        int capacity;
        //漏斗速率
        float leakingRate;
        //漏斗剩余容量
        int leftQuota;
        //滑动窗口的开始时间
        long leakingTs;

        public Funnel(int capacity, float leakingRate) {
            this.capacity = capacity;
            this.leakingRate = leakingRate;
            this.leftQuota = capacity;
            this.leakingTs = System.currentTimeMillis();
        }
        void makeSpace() {
            //获取当前时间
            long nowTs = System.currentTimeMillis();
            //当前时间-滑动窗口的开始时间= 滑动窗口的时长
            long deltaTs = nowTs - leakingTs;
            //滑动窗口的时长*漏水速率 = 滑动窗口内腾出的容量
            int deltaQuota = (int) (deltaTs * leakingRate);
            if (deltaQuota < 0) { 
                // 间隔时间太长,整数数字过大溢出
                this.leftQuota = capacity;
                this.leakingTs = nowTs;
                return;
            }
            if (deltaQuota < 1) { // 腾出空间太小,最小单位是 1
                return;
            }
            //剩余容量 = 当前剩余容量+腾出的容量
            this.leftQuota += deltaQuota;
            //重置窗口开始时间
            this.leakingTs = nowTs;
            if (this.leftQuota > this.capacity) {
                this.leftQuota = this.capacity;
            }
        }

        boolean watering(int quota) {
            makeSpace();
            //剩余容量>所需容量
            if (this.leftQuota >= quota) {
                this.leftQuota -= quota;
                return true;
            }
            return false;
        }
    }
    private Map<String, Funnel> funnels = new HashMap<>();
    public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
        String key = String.format("%s:%s", userId, actionKey);
        Funnel funnel = funnels.get(key);
        if (funnel == null) {
            funnel = new Funnel(capacity, leakingRate);
            funnels.put(key, funnel);
        }
        return funnel.watering(1); // 需要 1quota 
    }
}

Funnel对象的make_space方法是漏斗算法的核心,其次在每次灌水前都会被调用以触发漏水,给漏斗腾出空间。能腾出多少空间取决于过去了多久以及流水的速率。Funnel对象占据的空间大小不再和行为的频率成正比,她的空间占用是一个常量。

分布式限流 Redis-Cell

Redis-Cell语法
在这里插入图片描述

127.0.0.1:6379> cl.throttle mytest 99 5 100 2
1) (integer) 0                        #0 表示成功, 1表示失败
2) (integer) 100                      # 令牌桶的容量
3) (integer) 98                       # 当前令牌桶的令牌数
4) (integer) -1                       # 成功时该值为-1,失败时表还需要等待多少秒可以有足够的令牌
5) (integer) 41                       # 预计多少秒后令牌桶会满

多次从令牌桶中取出数据

127.0.0.1:6379> cl.throttle mytest 99 5 100 40
1) (integer) 0
2) (integer) 100
3) (integer) 54
4) (integer) -1
5) (integer) 911
127.0.0.1:6379> cl.throttle mytest 99 5 100 40
1) (integer) 0
2) (integer) 100
3) (integer) 14
4) (integer) -1
5) (integer) 1708
127.0.0.1:6379> cl.throttle mytest 99 5 100 40
1) (integer) 1                      #失败,拒绝取出
2) (integer) 100
3) (integer) 14
4) (integer) 505                  # 取出失败,令牌桶还有14个令牌,还需505秒才能够取出
5) (integer) 1705

代码示例

package example;

import io.rebloom.client.Client;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.List;

public class RedisCellExample {
    private Jedis jedis;
    public RedisCellExample() {
        jedis = new Jedis("127.0.0.1", 6379);
    }
    public Boolean rush() {
        //对接口进行限流操作
        //检查令牌桶,返回的第一值是否为 0: 0-流量够,1-限流中
        String script = "return redis.call('cl.throttle',KEYS[1],ARGV[1],ARGV[2],ARGV[3],ARGV[4])";
        List<String> keys = new ArrayList<>();
        keys.add("redbag");
        String maxBurst = "99";  //漏洞容量
        String countPerPeriod = "10";
        String period = "100";
        String quantity = "10";
        List<String> values = new ArrayList<>();
        values.add(maxBurst);
        values.add(countPerPeriod);
        values.add(period);
        values.add(quantity);
        List<Integer> list = (List) jedis.eval(script, keys, values);
        if (!list.isEmpty() && list.get(0) == 0) {
            return true;
        } else {
            return false;
        }
    }
}

相关推荐

  1. Redis实现

    2024-06-10 17:14:04       58 阅读
  2. Redis 做接口

    2024-06-10 17:14:04       56 阅读
  3. 服务实现方案

    2024-06-10 17:14:04       54 阅读
  4. Springboot Redis Lua 分布式

    2024-06-10 17:14:04       71 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-10 17:14:04       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-10 17:14:04       106 阅读
  3. 在Django里面运行非项目文件

    2024-06-10 17:14:04       87 阅读
  4. Python语言-面向对象

    2024-06-10 17:14:04       96 阅读

热门阅读

  1. Redis的基本数据类型

    2024-06-10 17:14:04       26 阅读
  2. Python中的贪婪算法详解与应用

    2024-06-10 17:14:04       35 阅读
  3. Leetcode 3181. Maximum Total Reward Using Operations II

    2024-06-10 17:14:04       27 阅读