前言
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。Redisson有一样功能是可重入的分布式锁。本文来讨论一下这个功能的特点以及源码分析。
前置知识
在讲Redisson,咱们先来聊聊分布式锁的特点以及Redis的发布/订阅机制,磨刀不误砍柴工。
分布式锁的思考
首先思考下,如果我们自己去实现一个分布式锁,这个锁需要具备哪些功能?
- 互斥(这是一个锁最基本的功能)
- 锁失效机制(也就是可以设置锁定时长,防止死锁)
- 高性能、高可用
- 阻塞、非阻塞
- 可重入、公平锁
- 。。。
可见,实现一个分布式锁,需要考虑的东西有很多。那么,如果用Redis来实现分布式锁呢?如果只需要具备上面说的1、2点功能,要怎么写?(ps:我就不写了,自己想去)
Redis订阅/发布机制
Redisson中用到了Redis的订阅/发布机制,下面简单介绍下。
简单来说就是如果client2 、 client5 和 client1 订阅了 channel1,当有消息发布到 channel1 的时候,client2 、 client5 和 client1 都会收到这个消息。
图片来自 菜鸟教程-Redis发布订阅
Redisson
源码版本:3.17.7
下面以Redisson官方的可重入同步锁例子为入口,解读下源码。
RLock lock = redisson.getLock("anyLock");
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
加锁
我用时序图来表示加锁和订阅的过程。时序图中括号后面的c1、c2代表client1,client2
当线程2获取了锁但还没释放锁时,如果线程1去获取锁,会阻塞等待,直到线程2解锁,通过Redis的发布订阅机制唤醒线程1,再次去获取锁。
加锁方法是 lock.tryLock(100, 10, TimeUnit.SECONDS),对应着就是RedissonLock#tryLock
/**
* 获取锁
* @param waitTime 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
* @param leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
* @param unit 时间单位
* @return 获取锁成功返回true,失败返回false
*/
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();// 当前时间
long threadId = Thread.currentThread().getId();// 当前线程id
// 尝试加锁,加锁成功返回null,失败返回锁的剩余超时时间
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 获取锁成功
if (ttl == null) {
return true;
}
// time小于0代表此时已经超过获取锁的等待时间,直接返回false
time -= System.currentTimeMillis() - current;
if (time <= 0) {
// 没看懂这个方法,里面里面空运行,有知道的大神还请不吝赐教
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.whenComplete((res, ex) -> {
// 出现异常,取消订阅
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
// 判断是否超时(超过了waitTime)
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
while (true) {
// 再次获取锁,成功则返回
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 阻塞等待信号量唤醒或者超时,接收到订阅时唤醒
// 使用的是Semaphore#tryAcquire()
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 因为是同步操作,所以无论加锁成