Redission分布式锁加锁原理源码解析
- 2020 年 1 月 13 日
- 筆記
追踪一下redission加锁的实现源码,并详细介绍核心加锁代码lua脚本的执行原理和过程。
1.获取锁
这里是我们自己实现,调用redission的方法,获取锁,然后加锁。lock.lock(expireTime, timeUnit)
是关键,我们追进去。
/** * 获取锁,如果没有主动调用unlock解锁,expireTime后会自动释放 * @param lockKey * @param expireTime 如果没有调用unlock解锁,expireTime 后自动释放 * @param timeUnit 时间单位 * @return */ public RLock lock(String lockKey,Integer expireTime,TimeUnit timeUnit){ RLock lock = redisson.getLock(lockKey); lock.lock(expireTime, timeUnit); logger.info("【Redisson lock】success to acquire lock for [ "+lockKey+" ],expire time:"+expireTime+timeUnit); return lock; }
进入 lock.lock(expireTime, timeUnit);
/** * Acquires the lock. * * <p>If the lock is not available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until the * lock has been acquired. * * If the lock is acquired, it is held until <code>unlock</code> is invoked, * or until leaseTime milliseconds have passed * since the lock was granted - whichever comes first. * * @param leaseTime the maximum time to hold the lock after granting it, * before automatically releasing it if it hasn't already been released by invoking <code>unlock</code>. * If leaseTime is -1, hold the lock until explicitly unlocked. * @param unit the time unit of the {@code leaseTime} argument * */ void lock(long leaseTime, TimeUnit unit);
进入实现方法
@Override public void lock(long leaseTime, TimeUnit unit) { try { lockInterruptibly(leaseTime, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }
进入 lockInterruptibly(leaseTime, unit);
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); }
进入Long ttl = tryAcquire(leaseTime, unit, threadId);
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); }
进入
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
进入核心tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG)
这里是加锁的核心方法:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { //时间转化为毫秒值 internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
这里最终的是执行了一段具有原子性的lua脚本,由CommandAsynExecutor执行; 这个锁最终持久化到redis时,使用的是hash类型的key field value; 这里注意下最后一行几个参数的对应关系: getName()), internalLockLeaseTime, getLockName(threadId) 分别是key[1],ARGV[1],ARGV[2];
- getName()是逻辑锁名称,例如:我们发起锁的一方传递的锁名称 productId1672822;
- internalLockLeaseTime是毫秒单位的锁过期时间;
- getLockName则是锁对应的线程级别的名称,因为支持相同线程可重入,不同线程不可重入,所以这里的锁的生成方式是:UUID+":"threadId
Lua脚本中的执行分为以下三步:
- 1:exists检查redis中是否存在锁名称;如果不存在,则获取成功;同时把逻辑锁名称KEYS[1],线程级别的锁名称[ARGV[2],value=1,设置到redis。并设置逻辑锁名称的过期时间ARGV[2],返回;
- 2:如果检查到存在KEYS[1],[ARGV[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过期时间
- 3:key不存,直接返回key的剩余过期时间(-2)