淘先锋技术网

首页 1 2 3 4 5 6 7
	业务里需要请求第三方的接口,而第三方接口设置了一定时间的限流,这导致我们如果请求过快,会失败好多数据。
	最初是使用RateLimiter来进行单机限流,这种方法非常不友好,如果第三方修改了限流或新增了集群节点 都需要对代码进行修改,且在任务少的时候没有利用上最大的速率,造成性能浪费。
	后来决定优化成分布式限流,只需要配置好总体限流速率,无论应对新增集群节点、任务少时的性能浪费都能很好的处理。
	使用redis作为限流的中间媒介,使用lua脚本来保证原子性,实现令牌桶算法;
redis.replicate_commands();
-- 需要限流的key
local key = KEYS[1]
-- 记录速率时间的key
local key_time = key..':tokenTime'



-- 当前令牌桶速率
local update_time = tonumber(ARGV[1])
-- 当前令牌桶的容量
local token_size = tonumber(ARGV[2])



-- 获取当前时间(这里的curr_time_arr 中第一个是 秒数,第二个是 秒数后微秒数),(注意:redis数组下标是从1开始的)
 local curr_time_arr = redis.call('TIME')
-- 当前时间
local nowTime = tonumber((curr_time_arr[1]*1000)+(curr_time_arr[2]/1000))


-- 从redis中获取上次更新token时间
local curr_key_time = tonumber(redis.call('get',key_time) or 0)

-- 获取当前key对应令牌桶中的令牌数
local token_count = tonumber(redis.call('get',KEYS[1]) or -1)



-- 令牌桶数量小于0 说明令牌桶没有初始化
if token_count < 0 then
	redis.call('set',key_time,nowTime)
	redis.call('set',key,token_size -1)
	return token_size -1
else 
	if nowTime - curr_key_time > update_time then -- 当前时间-上次更新token的时间 超过规定毫秒数时,重置令牌
		redis.call('set',key,token_size -1)
		redis.call('set',key_time,nowTime)
		return token_size-1
	else 
		if token_count>0 then      
			redis.call('set',key,token_count - 1)
			return token_count-1   -- 返回剩余令牌数量
		else 
			return -1
		end
	end
end

我本身对lua脚本不太熟悉,借鉴了这位大佬的脚本进行了修改

https://blog.csdn.net/bortherLiang/article/details/109189176

/**
 * 商盾限流注解
 * 功能:相同的key会限制其调用次数,速率在切面类中修改
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ShangDunLimiter {

    //限流唯一标示
    String key() default "";

}

/**
 * 全局限流切面类
 * @author hejiale
 */

@Aspect
@Slf4j
@Component
public class ShangDunLimiterAspect {

    //限流时间(单位为ms) 默认未10
    public static final int TIME = 1000;

    //单位时间内限制的访问次数
    public static final int COUNT = 15;

    @Autowired
    private RedisService redisService;


    @Before("@annotation(com.chinadaas.platform.batch.performer.aspect.ShangDunLimiter)")
    public void Limiter(JoinPoint joinPoint) {
        try {
            MethodSignature signature = (MethodSignature) joinPoint.getSignature();
            //获取注解
            ShangDunLimiter annoLimiter = signature.getMethod().getAnnotation(ShangDunLimiter.class);
            if (Objects.isNull(annoLimiter)) {
                return;
            }

            String key = annoLimiter.key();
            if (StringUtils.isBlank(key)) {
                return;
            }
            key= "limiter:"+key;
            boolean limitToken = redisService.getLimitToken(key, TIME, COUNT);

            // 循环尝试获取
            for (int i = 0; i < 5 && !limitToken; i++) {
                // 阻塞线程
                Thread.sleep(500);
                limitToken = redisService.getLimitToken(key, TIME, COUNT);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            log.error("获取限流令牌时发生异常:{}",e.getMessage());
        }
        return;
    }
}

限流阻塞线程这块我一直觉得直接sleep不太友好,各位有更好的方法,欢迎评论!