何为令牌桶限流
之前写过 限流工具类的文章
Redis的用武之处–由短信限流引出的分布式锁和漏斗限流(上)
Redis的用武之处–由短信限流引出的分布式锁和漏斗限流(上)
但是后面了解发现自己当初写的确实不伦不类,对漏斗限流的理解有误,再加上代码里面为了保证原子性,用了很多重量级锁,对性能也有影响,所以看了这篇文章后,决定重新写限流工具。
根据自己的实际业务,我选择令牌桶作为我的限流算法,该限流算法简单说就是一个桶里匀速会一直放令牌,当有任务要执行时,它必须先从令牌桶里取一个令牌,如果桶里有令牌,则可以执行该任务; 反之则不能执行。如果桶里令牌已经满了,就不会再放令牌进去。
算法就那么简单,主要实现上我们考虑到集群环境 以前用了分布式的锁,但是这样效率不高,有没有其它的手段保证每个步骤的原子性? 这就是接下来说的redis的lua脚本
redis Lua脚本
redis lua脚本可以帮我们把多个操作合成一个事务执行,具体优势如下(摘抄网络):
- 减少网络开销。可以将多个请求通过脚本的形式一次发送,减少网络时延
- 原子操作。redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。因此在编写脚本的过程中无需担心会出现竞态条件,无需使用事务。
- 复用。客户端发送的脚步会永久存在redis中,这样,其他客户端可以复用这一脚本而不需要使用代码完成相同的逻辑。
这样我们在分布式环境可能出现的问题也解决了,因为有了lua脚本,我们的主要逻辑都写在脚本里,java代码很少。
jedis.scriptLoad方法 执行的就是lua脚本的逻辑
package cn.xxywithpq.limiter.token.bucket;
@Component
@Slf4j
public class TokenBucketLimiter {
private static String tokenBucketLua;
static {
try {
InputStream resourceAsStream = TokenBucketLimiter.class.getResourceAsStream("/tokenBucket.lua");
byte[] bytes = new byte[resourceAsStream.available()];
resourceAsStream.read(bytes);
tokenBucketLua = new String(bytes);
} catch (IOException e) {
log.error("read tokenBucket.lua error {}", e);
}
}
@Autowired
private JedisPool jedisPool;
@Autowired
private CustomsProperties customsProperties;
/**
* @param businessId 限流唯一id
* @param actionKey 限流类型
* @param capacity 令牌桶容量
* @param duration 令牌桶全部放满所需时间,用于计算流速(秒)
* @return 是否允许操作
*/
public boolean isActionAllowed(String businessId, String actionKey, long capacity, float duration) {
try (Jedis jedis = jedisPool.getResource()) {
// 基础信息校验
if (StringUtils.isEmpty(businessId) || StringUtils.isEmpty(actionKey) || capacity <= 0 || duration <= 0 || capacity / duration * 0.1F <= 0) {
return false;
}
String key = String.format("%s:tokenBucket:%s:%s", customsProperties.getNamespace(), actionKey, businessId);
Object result = jedis.evalsha(jedis.scriptLoad(tokenBucketLua),
Arrays.asList("key", "num", "rate", "duration"),
Arrays.asList(key, String.valueOf(capacity), String.format("%.2f", capacity / duration), String.format("%.0f", duration)));
return "1".equals(result.toString()) ? true : false;
} catch (Exception e) {
log.error("FunnelRateLimiter isActionAllowed error {}", e);
return false;
}
}
}
// 先看看redis 有没有生成当前actionKey的令牌桶,实际是用了redis的hash结构存储数据
redis.replicate_commands()
local nowStr = redis.call('TIME')[1]
local bulk = redis.call('EXISTS', ARGV[1])
if (bulk == 0) then
// 没有的话初始化令牌桶,并且设置过期时间
redis.call('HMSET', ARGV[1], 'lastTimeStamp', nowStr, 'num', ARGV[2])
redis.call('EXPIRE', ARGV[1], math.floor(ARGV[4]) + 10)
end
// 根据 生成令牌的速度 和 当前时间戳,计算出当下 令牌桶应该生成多少个令牌
local table = redis.call('HGETALL', ARGV[1])
local plusNum = (nowStr - table[2]) * ARGV[3]
if (plusNum ~= 0) then
redis.call('HMSET', ARGV[1], 'lastTimeStamp', nowStr, 'num', plusNum + table[4])
redis.call('EXPIRE', ARGV[1], math.floor(ARGV[4]) + 10)
end
// 如果令牌桶有多令牌,就令牌个数 - 1
local newNum = redis.call('HGET', ARGV[1], 'num')
if (math.floor(newNum) > 0) then
redis.call('HSET', ARGV[1], 'num', newNum - 1)
return 1 else return 0
end
lua 的语法跟java相似,所以稍微看看应该就懂怎么去写,上面很多逻辑因为有了redis lua脚本的帮助,能让我们轻松的实现一个令牌桶限流算法。
相应的我写了一个测试用例,测试用例 “TokenBucketTest:test” 会作为一个业务key,表示999秒只能通过10个业务,我用多线程跑了下没有什么问题。
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SimplifyLockSpringBootStarterApplication.class)
public class TokenBucketLimiterTest {
@Autowired
TokenBucketLimiter TokenBucketLimiter;
/**
* 带分布式锁
*/
@Test
public void main() throws InterruptedException {
int num = 500;
CountDownLatch countDownLatch = new CountDownLatch(num);
int[] count = new int[]{0};
for (int i = 0; i < num; i++) {
// Thread.sleep(1000);
new Thread(() -> {
try {
if (TokenBucketLimiter.isActionAllowed("test", "TokenBucketTest", 10, 999)) {
log.info("我被获准进来啦 {}", Thread.currentThread().getId());
int i1 = count[0];
Thread.sleep(500);
}
} catch (Exception e) {
log.error("FunnelTest e {}", e);
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
}
}
具体的代码我已经上传到 github simplify-lock-spring-boot-starter