分布式-锁-初见

介绍几种常见的分布式锁写法

多线程中为了防止多个线程同时执行同一段代码,我们可以用 synchronized 关键字或 JUC 里面的 ReentrantLock 类来控制,

但是目前几乎任何一个系统都是部署多台机器的,单机部署的应用很少,synchronized 和 ReentrantLock 发挥不出任何作用,

此时就需要一把全局的锁,来代替 JAVA 中的 synchronized 和 ReentrantLock。

分布式锁的实现方式流行的主要有三种

  1. 分别是基于缓存 Redis 的实现方式
  2. 基于 zk 临时顺序节点的实现
  3. 基于数据库行锁的实现。

官网

目录 · redisson/redisson Wiki · GitHub

Jedis

使用 Redis 做分布式锁的思路是:

在 redis 中设置一个值表示加了锁,然后释放锁的时候就把这个 key 删除。

    /**
     * 尝试获取分布式锁
     *
     * @param jedis      Redis客户端
     * @param lockKey    锁
     * @param requestId  请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
        // set支持多个参数 NX(not exist) XX(exist) EX(seconds) PX(million seconds)
        String result = jedis.set(lockKey, requestId, "NX", "EX", expireTime);
        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }


    /**
     * 释放分布式锁
     *
     * @param jedis     Redis客户端
     * @param lockKey   锁
     * @param requestId 请求标识,当前工作线程线程的名称
     * @return 是否释放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }

问题

  • SET key value ,而没有使用 SETNX+EXPIRE 的命令,原因是 SETNX+EXPIRE 是两条命令无法保证原子性,而 SET 是原子操作。

  • 那这里为什么要设置超时时间呢?

    原因是当一个客户端获得了锁在执行任务的过程中挂掉了,来不及显式地释放锁,这块资源将会永远被锁住,

    这将会导致死锁,所以必须设置一个超时时间

  • A 加的锁 B 不能去 del 掉,谁加的锁就谁去解,我们一般把 value 设为当前线程的 Id,

    Thread.currentThread().getId(),然后在删的时候判断下是不是当前线程。

  • 验证和释放锁是两个独立操作,不是原子性,

    使用 Lua 脚本,即 if redis.call(‘get’, KEYS[1]) == ARGV[1] then returnredis.call(‘del’, KEYS[1]) else return 0 end,它能给我们保证原子性。

当 redis.call() 在执行命令的过程中发生错误时,脚本会停止执行,并返回一个脚本错误,

错误的输出信息会说明错误造成的原因:

Redisson

Redisson 是 Java 的 Redis 客户端之一,提供了一些 API 方便操作 Redis。

Redisson 跟 Jedis 定位不同,它不是一个单纯的 Redis 客户端,

而是基于 Redis 实现的分布式的服务,

锁只是它的冰山一角,并且它对主从,哨兵,集群等模式都支持。

public class LockTest{
	private static Redis sonClient redissonClient;
    
	static {
		Config config = new Config();
		config. useSingleServer().setAddress("redis://127.0.0.1:6379");
		redissonClient = Redisson.create ( config);
    }

    public static void main(String[] args) throws InterruptedException {
		RLock rLock = redissonClient . getLock(”zwt" );
		//最多等待100秒、 上锁10s以后自动解锁
		if (rLock.tryLock(100, 10, TimeUnit. SECONDS)) {
			System . out . println("获取锁成功" );
		}
		//Thread. sleep(20000);
		rLock. unlock( );
		redissonClient . shutdown();
	}
}


这里获取锁有很多种的方式,有公平锁有读写锁,我们使用的是 redissonClient.getLock, 这是一个可重入锁。

在加锁的时候,写入了一个 HASH 类型的值,key 是锁名称 zwt,field 是线程的名称,而 value 是 1(即表示锁的重入次数)。

点进 tryLock() 方法的 tryAcquire() 方法,再到->tryAcquireAsync() 再到->tryLockInnerAsync(),

终于见到庐山真面目了,原来它最终也是通过 Lua 脚本来实现的。

<T> RFuture<T> tryLockInnerAsync (long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
	internalLockLeaseTime = unit. toMillis( leaseTime);

    return commandExecutor . evalWriteAsync getName(), LongCodec . INSTANCE, command,
				"if (redis.call( 'exists', KEYS[1]) == 0) then" +
					"redis.call('hset', KEYS[1], ARGV[2], 1);" + 
					"redis. call('pexpire', KEYS[1], ARGV[1]);" +
					"return nil;" +
				"end; " + 
				"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
					"redis. call( 'hincrby', KEYS[1], ARGV[2], 1);" +
					"redis. call( 'pexpire', KEYS[1], ARGV[1]);" +
					"return nil;" +
				"end;" +
				"return redis.call('pttl', KEYS[1]);",
				Collections.<~>singLetonL ist(getName()), internalLockLeaseTime, getLockName (threadId));
}

Lua脚本拉出来分析一下:

// KEYS[1] 锁名称 updateAccount
// ARGV[1] key 过期时间 10000ms
// ARGV[2] 线程名称
// 锁名称不存在
if (redis.call('exists', KEYS[1]) == 0) then
// 创建一个 hash,key=锁名称,field=线程名,value=1
redis.call('hset', KEYS[1], ARGV[2], 1);
// 设置 hash 的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 锁名称存在,判断是否当前线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
// 如果是,value+1,代表重入次数+1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
// 重新获得锁,需要重新设置 Key 的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 锁存在,但是不是当前线程持有,返回过期时间(毫秒)
return redis.call('pttl', KEYS[1]);

unlock() 中的 unlockInnerAsync() 释放锁,同样也是通过 Lua 脚本实现。

// KEYS[1] 锁的名称 updateAccount
// KEYS[2] 频道名称 redisson_lock__channel:{updateAccount}
// ARGV[1] 释放锁的消息 0
// ARGV[2] 锁释放时间 10000
// ARGV[3] 线程名称
// 锁不存在(过期或者已经释放了)
if (redis.call('exists', KEYS[1]) == 0) then
// 发布锁已经释放的消息
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
// 锁存在,但是不是当前线程加的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;

// 锁存在,是当前线程加的锁
// 重入次数-1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
// -1 后大于 0,说明这个线程持有这把锁还有其他的任务需要执行
if (counter > 0) then
// 重新设置锁的过期时间
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
// -1 之后等于 0,现在可以删除锁了
redis.call('del', KEYS[1]);
// 删除之后发布释放锁的消息
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;

// 其他情况返回 nil
return nil;

问题

  • 业务没执行完,锁到期了怎么办,这个由watchdog来保障。
  • 集群模式下,如果对多个master加锁,导致重复加锁怎么办,Redission会自动选择同一个 master。
  • 业务没执行完,Redis master挂掉了怎么办,没关系,Redis slave还有这个数据。

RedLock

名字由来

RedLock 的中文是直译过来的,就叫红锁。

红锁并非是一个工具,而是 Redis 官方提出的一种分布式锁的算法

我们知道如果采用单机部署模式,会存在单点问题,只要 redis 故障了,加锁就不行了。

如果采用 master-slave 模式,加锁的时候只对一个节点加锁,

即便通过 sentinel 做了高可用,但是如果 master 节点故障了,发生主从切换,

此时就会有可能出现锁丢失的问题。

基于以上的考虑,其实 redis 的作者 Antirez 也考虑到这个问题,他提出了一个 RedLock 的算法。

算法实现

通过以下步骤获取一把锁:

1.获取当前时间戳,单位是毫秒

2.轮流尝试在每个 master 节点上创建锁,过期时间设置较短,一般就几十毫秒

3.尝试在大多数节点上建立一个锁,比如5个节点就要求是3个节点(n / 2 +1)

4.客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了

5.要是锁建立失败了,那么就依次删除这个锁

6.只要别人建立了一把分布式锁,你就得不断轮询去尝试获取锁

RLock lock1 = redissonInstance1. getLock("lock1");
RLock lock2 = redissonInstance2. getLock("lock2");
RLock 1ock3 = redissonInstance3. getLock("lock3");
RedissonRedLock lock = new RedissonRedlock(lock1, lock2, lock3);
//同时加锁。lock1 lock2 lock3
//红锁在大部分节点上加锁成功就算成功。
lock.lock();

//…………

lock.unlock();

Zookeeper写法(Curator)

获取锁

Client1 得到了锁,Client2 监听了 Lock1,Client3 监听了 Lock2。这恰恰形成了一个等待队列

释放锁

1.任务完成,客户端显示释放

当任务完成时,Client1 会显示调用删除节点 Lock1 的指令。

2.任务执行过程中,客户端崩溃

获得锁的 Client1 在任务执行过程中,如果 Duang 的一声崩溃,则会断开与 Zookeeper 服务端的链接。

根据临时节点的特性,相关联的节点 Lock1 会随之自动删除。

Client2 一直监听着 Lock1 的存在状态,当 Lock1 节点被删除,Client2 会立刻收到通知。

这时候 Client2 会再次查询 ParentLock 下面的所有节点,确认自己创建的节点 Lock2 是不是目前最小的节点。

如果是最小,则 Client2 顺理成章获得了锁。

Curator

在 Apache 的开源框架 Apache Curator 中,包含了对 Zookeeper 分布式锁的实现。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.3.0</version>
</dependency>

Curator的几种锁的实现

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
public class ZkDistributedLock implements DistributedLock {

    private final CuratorF ramework client;

    public ZkDistributedLock ( CuratorFramework client) { this.client = client; }

    @override
    public void acquire(String key) throws Exception {
		InterProcessMutex lock = new InterProcessMutex(client, key);
	lock.acquire();
	}

    @Override
    public boolean acquire(String key, long maxwait, TimeUnit waitunit) throws Exception{
		InterProcessMutex lock = new InterProcessMutex(client, key);
		return lock.acquire (maxWait, waitUnit);
	}

    @Override
	public void release(String key) throws Exception {
		InterProcessMutex lock = new InterProcessMutex(client, key);
		lock. release();
	}
}

总结

zookeeper 天生设计定位就是分布式协调,强一致性,锁很健壮。

如果获取不到锁,只需要添加一个监听器就可以了,不用一直轮询,性能消耗较小。

缺点: 在高请求高并发下,系统疯狂的加锁释放锁,最后 zk 承受不住这么大的压力可能会存在宕机的风险。

zk 锁性能比 redis 低的原因:zk 中的角色分为 leader,flower,

每次写请求只能请求 leader,leader 会把写请求广播到所有 flower,

如果 flower 都成功才会提交给 leader,在加锁的时候是一个写请求,

当写请求很多时,zk 会有很大的压力,最后导致服务器响应很慢。

redis 锁实现简单,理解逻辑简单,性能好,可以支撑高并发的获取、释放锁操作。

缺点: Redis 容易单点故障,集群部署,并不是强一致性的,锁的不够健壮;

​ key 的过期时间设置多少不明确,只能根据实际情况调整;

​ 需要自己不断去尝试获取锁,比较消耗性能。

最后不管 redis 还是 zookeeper,它们都应满足分布式锁的特性:

  • 具备可重入特性(已经获得锁的线程在执行的过程中不需要再次获得锁)
  • 异常或者超时自动删除,避免死锁
  • 互斥性,只有一个客户端能够持有锁
  • 分布式环境下高性能、高可用、容错机制

各有千秋,具体业务场景具体使用。