业务里需要请求第三方的接口,而第三方接口设置了一定时间的限流,这导致我们如果请求过快,会失败好多数据。
最初是使用RateLimiter来进行单机限流,这种方法非常不友好,如果第三方修改了限流或新增了集群节点 都需要对代码进行修改,且在任务少的时候没有利用上最大的速率,造成性能浪费。
后来决定优化成分布式限流,只需要配置好总体限流速率,无论应对新增集群节点、任务少时的性能浪费都能很好的处理。
使用redis作为限流的中间媒介,使用lua脚本来保证原子性,实现令牌桶算法;
redis.replicate_commands();
-- 需要限流的key
local key = KEYS[1]
-- 记录速率时间的key
local key_time = key..':tokenTime'
-- 当前令牌桶速率
local update_time = tonumber(ARGV[1])
-- 当前令牌桶的容量
local token_size = tonumber(ARGV[2])
-- 获取当前时间(这里的curr_time_arr 中第一个是 秒数,第二个是 秒数后微秒数),(注意:redis数组下标是从1开始的)
local curr_time_arr = redis.call('TIME')
-- 当前时间
local nowTime = tonumber((curr_time_arr[1]*1000)+(curr_time_arr[2]/1000))
-- 从redis中获取上次更新token时间
local curr_key_time = tonumber(redis.call('get',key_time) or 0)
-- 获取当前key对应令牌桶中的令牌数
local token_count = tonumber(redis.call('get',KEYS[1]) or -1)
-- 令牌桶数量小于0 说明令牌桶没有初始化
if token_count < 0 then
redis.call('set',key_time,nowTime)
redis.call('set',key,token_size -1)
return token_size -1
else
if nowTime - curr_key_time > update_time then -- 当前时间-上次更新token的时间 超过规定毫秒数时,重置令牌
redis.call('set',key,token_size -1)
redis.call('set',key_time,nowTime)
return token_size-1
else
if token_count>0 then
redis.call('set',key,token_count - 1)
return token_count-1 -- 返回剩余令牌数量
else
return -1
end
end
end
我本身对lua脚本不太熟悉,借鉴了这位大佬的脚本进行了修改
https://blog.csdn.net/bortherLiang/article/details/109189176
/**
* 商盾限流注解
* 功能:相同的key会限制其调用次数,速率在切面类中修改
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ShangDunLimiter {
//限流唯一标示
String key() default "";
}
/**
* 全局限流切面类
* @author hejiale
*/
@Aspect
@Slf4j
@Component
public class ShangDunLimiterAspect {
//限流时间(单位为ms) 默认未10
public static final int TIME = 1000;
//单位时间内限制的访问次数
public static final int COUNT = 15;
@Autowired
private RedisService redisService;
@Before("@annotation(com.chinadaas.platform.batch.performer.aspect.ShangDunLimiter)")
public void Limiter(JoinPoint joinPoint) {
try {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
//获取注解
ShangDunLimiter annoLimiter = signature.getMethod().getAnnotation(ShangDunLimiter.class);
if (Objects.isNull(annoLimiter)) {
return;
}
String key = annoLimiter.key();
if (StringUtils.isBlank(key)) {
return;
}
key= "limiter:"+key;
boolean limitToken = redisService.getLimitToken(key, TIME, COUNT);
// 循环尝试获取
for (int i = 0; i < 5 && !limitToken; i++) {
// 阻塞线程
Thread.sleep(500);
limitToken = redisService.getLimitToken(key, TIME, COUNT);
}
} catch (InterruptedException e) {
e.printStackTrace();
log.error("获取限流令牌时发生异常:{}",e.getMessage());
}
return;
}
}
限流阻塞线程这块我一直觉得直接sleep不太友好,各位有更好的方法,欢迎评论!