前言
上篇文章10分钟从源码级别搞懂AQS(AbstractQueuedSynchronizer)说到JUC并发包中的同步组件大多使用AQS来实现
本篇文章通过AQS自己来实现一个同步组件,并从源码级别聊聊JUC并发包中的常用同步组件
本篇文章需要的前置知识就是AQS,如果不了解AQS的同学可以看上一篇文章哈~
阅读本篇文章大概需要13分钟
自定义同步组件
为了更容易理解其他同步组件,我们先来使用AQS自己来实现一个常用的可重入锁
AQS模板方法流程是固定的,我们主要只需要来实现它的尝试获取同步状态和尝试释放同步状态方法即可
首先我们先规定要实现的可重入锁是独占式的
规定同步状态一开始为0,当有线程获取锁成功同步状态就为1,当这个线程重入时就累加同步状态
规定释放同步状态时每次扣减1个同步状态,只有当同步状态扣减到0时,才是真正的释放独占锁
我们使用一个内部类Sync 来继承AQS 并重写tryAcquire
尝试获取同步状态、tryRelease
尝试释放同步状态、isHeldExclusively
判断当前线程是否持有同步状态(等待、通知时会用到该方法)
static class Sync extends AbstractQueuedSynchronizer {
/**
* 判断当前线程是否持有同步状态
*
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
}
在获取同步状态中
- 先判断是否有同步状态(即同步状态是否为0),如果有同步状态就用CAS去获取(0->1),成功就设置当前线程为获取同步状态的线程
- 如果没有同步状态(即同步状态不为0) ,就查看获取同步状态的线程是否为当前线程,如果是当前线程则说明此次是重入,累加重入次数
- 其他情况说明未获取到同步状态,返回false 后续走AQS流程(构建节点加入AQS)
/**
* 尝试获取同步状态
*
* @param arg 获取同步状态的数量
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
//1.获取同步状态
int state = getState();
//2.如果有同步状态则CAS替换 0->1
if (state == 0) {
if (compareAndSetState(state, 1)) {
//替换成功 说明获取到同步状态 设置当前获取同步状态线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
} else if (getExclusiveOwnerThread() == Thread.currentThread()) {
//3.没有同步状态 查看获取同步资源的线程是否为当前线程 可重入 累加重入次数
setState(state + arg);
return true;
}
//其他情况就是没获取到同步状态
return false;
}
在释放同步状态中
只有当同步状态要改成0时才是真正释放,否则情况情况下就是重入扣减次数
/**
* 尝试释放同步状态
*
* @param arg 释放同步状态的数量
* @return
*/
@Override
protected boolean tryRelease(int arg) {
//目标状态
int targetState = getState() - arg;
//真正释放锁
if (targetState == 0) {
setExclusiveOwnerThread(null);
setState(targetState);
return true;
}
//其他情况 扣减状态
setState(targetState);
return false;
}
使用内部类实现AQS的方法后,我们在自定义同步组件类中去实现Lock接口,并用内部类实现AQS的方法去实现Lock接口的方法
将要获取、释放的同步状态都设置成1,对应响应中断、超时的方法就用AQS中对应的方法即可
public class MySynchronizedComponent implements Lock {
public MySynchronizedComponent() {
sync = new Sync();
}
private Sync sync;
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.new ConditionObject();
}
}
实际上我们只需要去实现尝试获取、释放同步状态方法就能够完成自己的同步组件,这就是使用AQS带来的好处
代码案例可以去git仓库获取,放在本文最后
ReentrantLock
ReentrantLock是并发包中提供的可重入锁,它除了能够实现synchronized的功能外还可以响应中断、超时、实现公平锁等,其底层也是通过AQS来实现的
ReentrantLock的功能与synchronized类似,可重入的独占锁,用于保证并发场景下同步操作
使用时需要显示加锁、解锁,常用格式如下:
reentrantLock.lock();
try{
//....
}finally {
reentrantLock.unlock();
}
finally中最先去解锁,并且加锁要放在try块的最外层,并保证加锁和try块之间不会抛出异常
加锁不放在try中是因为加锁实现未知可能抛出不受检查unchecked的异常,当加锁抛出异常时,后续finally块解锁也会抛出非法监视器的异常从而导致覆盖
加锁和try块之间如果抛出异常,那么就无法执行解锁了
ReentrantLock除了提供基本的同步功能,还提供响应中断、超时的API,同学们可以私下去查看
熟悉ReentrantLock实现的同学,可能看上面自定义同步组件的代码很熟悉,其实就是参考ReentrantLock非公平锁写的
ReentrantLock中使用内部类Sync来继承AQS,同时内部类NonfairSync和FairSync来继承Sync去实现非公平、