淘先锋技术网

首页 1 2 3 4 5 6 7

何为令牌桶限流

之前写过 限流工具类的文章

Redis的用武之处–由短信限流引出的分布式锁和漏斗限流(上)

Redis的用武之处–由短信限流引出的分布式锁和漏斗限流(上)

但是后面了解发现自己当初写的确实不伦不类,对漏斗限流的理解有误,再加上代码里面为了保证原子性,用了很多重量级锁,对性能也有影响,所以看了这篇文章后,决定重新写限流工具。

简析限流算法

根据自己的实际业务,我选择令牌桶作为我的限流算法,该限流算法简单说就是一个桶里匀速会一直放令牌,当有任务要执行时,它必须先从令牌桶里取一个令牌,如果桶里有令牌,则可以执行该任务; 反之则不能执行。如果桶里令牌已经满了,就不会再放令牌进去。

算法就那么简单,主要实现上我们考虑到集群环境 以前用了分布式的锁,但是这样效率不高,有没有其它的手段保证每个步骤的原子性? 这就是接下来说的redis的lua脚本

redis Lua脚本

redis lua脚本可以帮我们把多个操作合成一个事务执行,具体优势如下(摘抄网络):

  • 减少网络开销。可以将多个请求通过脚本的形式一次发送,减少网络时延
  • 原子操作。redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。因此在编写脚本的过程中无需担心会出现竞态条件,无需使用事务。
  • 复用。客户端发送的脚步会永久存在redis中,这样,其他客户端可以复用这一脚本而不需要使用代码完成相同的逻辑。

这样我们在分布式环境可能出现的问题也解决了,因为有了lua脚本,我们的主要逻辑都写在脚本里,java代码很少。

jedis.scriptLoad方法 执行的就是lua脚本的逻辑

package cn.xxywithpq.limiter.token.bucket;


@Component
@Slf4j
public class TokenBucketLimiter {

    private static String tokenBucketLua;

    static {
        try {
            InputStream resourceAsStream = TokenBucketLimiter.class.getResourceAsStream("/tokenBucket.lua");
            byte[] bytes = new byte[resourceAsStream.available()];
            resourceAsStream.read(bytes);
            tokenBucketLua = new String(bytes);
        } catch (IOException e) {
            log.error("read tokenBucket.lua error {}", e);
        }
    }

    @Autowired
    private JedisPool jedisPool;
    @Autowired
    private CustomsProperties customsProperties;

    /**
     * @param businessId 限流唯一id
     * @param actionKey  限流类型
     * @param capacity   令牌桶容量
     * @param duration   令牌桶全部放满所需时间,用于计算流速(秒)
     * @return 是否允许操作
     */
    public boolean isActionAllowed(String businessId, String actionKey, long capacity, float duration) {
        try (Jedis jedis = jedisPool.getResource()) {
            // 基础信息校验
            if (StringUtils.isEmpty(businessId) || StringUtils.isEmpty(actionKey) || capacity <= 0 || duration <= 0 || capacity / duration * 0.1F <= 0) {
                return false;
            }
            String key = String.format("%s:tokenBucket:%s:%s", customsProperties.getNamespace(), actionKey, businessId);
            Object result = jedis.evalsha(jedis.scriptLoad(tokenBucketLua),
                    Arrays.asList("key", "num", "rate", "duration"),
                    Arrays.asList(key, String.valueOf(capacity), String.format("%.2f", capacity / duration), String.format("%.0f", duration)));
            return "1".equals(result.toString()) ? true : false;
        } catch (Exception e) {
            log.error("FunnelRateLimiter isActionAllowed error {}", e);
            return false;
        }
    }

}
// 先看看redis 有没有生成当前actionKey的令牌桶,实际是用了redis的hash结构存储数据
redis.replicate_commands()
local nowStr = redis.call('TIME')[1]
local bulk = redis.call('EXISTS', ARGV[1])
if (bulk == 0) then
// 没有的话初始化令牌桶,并且设置过期时间
    redis.call('HMSET', ARGV[1], 'lastTimeStamp', nowStr, 'num', ARGV[2])
    redis.call('EXPIRE', ARGV[1], math.floor(ARGV[4]) + 10)
end
// 根据 生成令牌的速度 和 当前时间戳,计算出当下 令牌桶应该生成多少个令牌
local table = redis.call('HGETALL', ARGV[1])
local plusNum = (nowStr - table[2]) * ARGV[3]
if (plusNum ~= 0) then
    redis.call('HMSET', ARGV[1], 'lastTimeStamp', nowStr, 'num', plusNum + table[4])
    redis.call('EXPIRE', ARGV[1], math.floor(ARGV[4]) + 10)
end
// 如果令牌桶有多令牌,就令牌个数 - 1
local newNum = redis.call('HGET', ARGV[1], 'num')
if (math.floor(newNum) > 0) then
    redis.call('HSET', ARGV[1], 'num', newNum - 1)
    return 1 else return 0
end

lua 的语法跟java相似,所以稍微看看应该就懂怎么去写,上面很多逻辑因为有了redis lua脚本的帮助,能让我们轻松的实现一个令牌桶限流算法。

相应的我写了一个测试用例,测试用例 “TokenBucketTest:test” 会作为一个业务key,表示999秒只能通过10个业务,我用多线程跑了下没有什么问题。

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SimplifyLockSpringBootStarterApplication.class)
public class TokenBucketLimiterTest {

    @Autowired
    TokenBucketLimiter TokenBucketLimiter;

    /**
     * 带分布式锁
     */
    @Test
    public void main() throws InterruptedException {
        int num = 500;
        CountDownLatch countDownLatch = new CountDownLatch(num);

        int[] count = new int[]{0};
        for (int i = 0; i < num; i++) {
//            Thread.sleep(1000);
            new Thread(() -> {
                try {
                    if (TokenBucketLimiter.isActionAllowed("test", "TokenBucketTest", 10, 999)) {
                        log.info("我被获准进来啦 {}", Thread.currentThread().getId());
                        int i1 = count[0];
                        Thread.sleep(500);
                    }

                } catch (Exception e) {
                    log.error("FunnelTest e {}", e);
                } finally {
                    countDownLatch.countDown();
                }
            }).start();
        }

        countDownLatch.await();
    }


}

具体的代码我已经上传到 github simplify-lock-spring-boot-starter