淘先锋技术网

首页 1 2 3 4 5 6 7
    while (true) {
        boolean flag = this.getLock(key);
        if (flag) {
            insert();
        }
    }
}

复制代码


给这个方法添加一个`@Transaction`注解开启事务,如代码中抛出异常进行回滚,要知道数据库事务可是有超时时间限制的,并不会无条件的一直等一个耗时的数据库操作。

比如:我们解析一个大文件,再将数据存入到数据库,如果执行时间太长,就会导致事务超时自动回滚。

一旦你的`key`长时间获取不到锁,获取锁`等待的时间`远超过数据库事务`超时时间`,程序就会报异常。

一般为解决这种问题,我们就需要将数据库事务改为手动提交、回滚事务。

@Autowired
DataSourceTransactionManager dataSourceTransactionManager;

@Transaction
public void lock() {
    //手动开启事务
    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
    try {
        while (true) {
            boolean flag = this.getLock(key);
            if (flag) {
                insert();
                //手动提交事务
                dataSourceTransactionManager.commit(transactionStatus);
            }
        }
    } catch (Exception e) {
        //手动回滚事务
        dataSourceTransactionManager.rollback(transactionStatus);
    }
}

复制代码


#### 四、锁过期了,业务还没执行完

这种情况和我们上边提到的第二种比较类似,但解决思路上略有不同。

同样是`redis`分布式锁过期,而业务逻辑没执行完的场景,不过,这里换一种思路想问题,**把`redis`锁的过期时间再弄长点不就解决了吗?**

那还是有问题,我们可以在加锁的时候,手动调长`redis`锁的过期时间,可这个时间多长合适?业务逻辑的执行时间是不可控的,调的过长又会影响操作性能。

**要是`redis`锁的过期时间能够自动续期就好了。**

为了解决这个问题我们使用`redis`客户端`redisson`,`redisson`很好的解决了`redis`在分布式环境下的一些棘手问题,它的宗旨就是让使用者减少对`Redis`的关注,将更多精力用在处理业务逻辑上。

`redisson`对分布式锁做了很好封装,只需调用`API`即可。

RLock lock = redissonClient.getLock(“stockLock”);
复制代码


`redisson`在加锁成功后,会注册一个定时任务监听这个锁,每隔10秒就去查看这个锁,如果还持有锁,就对`过期时间`进行续期。默认过期时间30秒。这个机制也被叫做:“`看门狗`”,这名字。。。

**举例子**:假如加锁的时间是30秒,过10秒检查一次,一旦加锁的业务没有执行完,就会进行一次续期,把锁的过期时间再次重置成30秒。

通过分析下边`redisson`的源码实现可以发现,不管是`加锁`、`解锁`、`续约`都是客户端把一些复杂的业务逻辑,通过封装在`Lua`脚本中发送给`redis`,保证这段复杂业务逻辑执行的`原子性`。

@Slf4j
@Service
public class RedisDistributionLockPlus {

/**
 * 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象
 */
private static final long DEFAULT_LOCK_TIMEOUT = 30;

private static final long TIME_SECONDS_FIVE = 5 ;

/**
 * 每个key的过期时间 {@link LockContent}
 */
private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);

/**
 * redis执行成功的返回
 */
private static final Long EXEC_SUCCESS = 1L;

/**
 * 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间
 */
private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
        "if redis.call('exists', KEYS[1]) == 0 then " +
           "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
           "for k, v in pairs(t) do " +
             "if v == 'OK' then return tonumber(ARGV[2]) end " +
           "end " +
        "return 0 end";

/**
 * 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout
 */
private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
        "local ctime = tonumber(ARGV[2]) " +
        "local biz_timeout = tonumber(ARGV[3]) " +
        "if ctime > 0 then  " +
           "if redis.call('exists', KEYS[2]) == 1 then " +
               "local avg_time = redis.call('get', KEYS[2]) " +
               "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
               "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
               "else redis.call('del', KEYS[2]) end " +
           "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
        "end " +
        "return redis.call('del', KEYS[1]) " +
        "else return 0 end";
/**
 * 续约lua脚本
 */
private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";

private final StringRedisTemplate redisTemplate;

public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
    this.redisTemplate = redisTemplate;
    ScheduleTask task = new ScheduleTask(this, lockContentMap);
    // 启动定时任务
    ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
}

/**
 * 加锁
 * 取到锁加锁,取不到锁一直等待知道获得锁
 *
 * @param lockKey
 * @param requestId 全局唯一
 * @param expire   锁过期时间, 单位秒
 * @return
 */
public boolean lock(String lockKey, String requestId, long expire) {
    log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId);
    for (; ; ) {
        // 判断是否已经有线程持有锁,减少redis的压力
        LockContent lockContentOld = lockContentMap.get(lockKey);
        boolean unLocked = null == lockContentOld;
        // 如果没有被锁,就获取锁
        if (unLocked) {
            long startTime = System.currentTimeMillis();
            // 计算超时时间
            long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
            String lockKeyRenew = lockKey + "_renew";

            RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
            List<String> keys = new ArrayList<>();
            keys.add(lockKey);
            keys.add(lockKeyRenew);
            Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
            if (null != lockExpire && lockExpire > 0) {
                // 将锁放入map
                LockContent lockContent = new LockContent();
                lockContent.setStartTime(startTime);
                lockContent.setLockExpire(lockExpire);
                lockContent.setExpireTime(startTime + lockExpire * 1000);
                lockContent.setRequestId(requestId);
                lockContent.setThread(Thread.currentThread());
                lockContent.setBizExpire(bizExpire);
                lockContent.setLockCount(1);
                lockContentMap.put(lockKey, lockContent);
                log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId);
                return true;
            }
        }
        // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁
        if (Thread.currentThread() == lockContentOld.getThread()
                  && requestId.equals(lockContentOld.getRequestId())){
            // 计数 +1
            lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
            return true;
        }

        // 如果被锁或获取锁失败,则等待100毫秒
        try {
            TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
            // 这里用lombok 有问题
            log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e);
            return false;
        }
    }
}

/**
 * 解锁
 *
 * @param lockKey
 * @param lockValue
 */
public boolean unlock(String lockKey, String lockValue) {
    String lockKeyRenew = lockKey + "_renew";
    LockContent lockContent = lockContentMap.get(lockKey);

    long consumeTime;
    if (null == lockContent) {
        consumeTime = 0L;
    } else if (lockValue.equals(lockContent.getRequestId())) {
        int lockCount = lockContent.getLockCount();
        // 每次释放锁, 计数 -1,减到0时删除redis上的key
        if (--lockCount > 0) {
            lockContent.setLockCount(lockCount);
            return false;
        }
        consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
    } else {
        log.info("释放锁失败,不是自己的锁。");
        return false;
    }

    // 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁
    lockContentMap.remove(lockKey);

    RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
    List<String> keys = new ArrayList<>();
    keys.add(lockKey);
    keys.add(lockKeyRenew);

    Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
            Long.toString(lockContent.getBizExpire()));
    return EXEC_SUCCESS.equals(result);

}

/**
 * 续约
 *
 * @param lockKey
 * @param lockContent
 * @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决))
 */
public boolean renew(String lockKey, LockContent lockContent) {

    // 检测执行业务线程的状态
    Thread.State state = lockContent.getThread().getState();
    if (Thread.State.TERMINATED == state) {
        log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent);
        return false;
    }

    String requestId = lockContent.getRequestId();
    long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;

    RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
    List<String> keys = new ArrayList<>();
    keys.add(lockKey);

    Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
    log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
    return EXEC_SUCCESS.equals(result);
}

static class ScheduleExecutor {

    public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
        long delay = unit.toMillis(initialDelay);
        long period_ = unit.toMillis(period);
        // 定时执行
        new Timer("Lock-Renew-Task").schedule(task, delay, period_);
    }
}

static class ScheduleTask extends TimerTask {

    private final RedisDistributionLockPlus redisDistributionLock;
    private final Map<String, LockContent> lockContentMap;

    public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
        this.redisDistributionLock = redisDistributionLock;
        this.lockContentMap = lockContentMap;
    }

    @Override
    public void run() {
        if (lockContentMap.isEmpty()) {
            return;
        }
        Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
        for (Map.Entry<String, LockContent> entry : entries) {
            String lockKey = entry.getKey();
            LockContent lockContent = entry.getValue();
            long expireTime = lockContent.getExpireTime();
            // 减少线程池中任务数量
            if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
                //线程池异步续约
                ThreadPool.submit(() -> {
                    boolean renew = redisDistributionLock.renew(lockKey, lockContent);
                    if (renew) {
                        long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
                        lockContent.setExpireTime(expireTimeNew);
                    } else {
                        // 续约失败,说明已经执行完 OR redis 出现问题
                        lockContentMap.remove(lockKey);
                    }