并发编程之 AQS(AbstractQueuedSynchronizer)框架
什么是 AQS?
AQS 的全称为(AbstractQueuedSynchronizer),这个类在 java.util.concurrent.locks 包下面。AQS 是一个用来构建锁和同步器的框架,比如 ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等皆是基于 AQS 的。
AQS 是怎么实现的?
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,将暂时获取不到锁的线程加入到队列中。
AQS 的实现有两种模式,独占模式(独占锁)和共享模式(共享锁)。独占模式是只有一个线程能够访问资源。而共享模式可以允许多个线程访问资源(例如:读写锁)。每次只要原子更新 state 的值从 0 变为 1 成功了就获取了锁,可重入是通过不断把 state 原子更新加 1 实现的。
- 锁状态变量 state: AQS 用一个变量(volatile int state) 属性来表示锁的状态,子类通过 CAS 原子操作去维护这个状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。
- AQS 队列: AQS 中维护了一个队列,获取锁失败(非 tryLock())的线程都将进入这个队列中排队,等待锁释放后唤醒下一个排队的线程(互斥锁模式下)。
- Condition 队列: AQS 中还有另一个非常重要的内部类 ConditionObject,它实现了 Condition 接口,主要用于实现条件锁。如 ReentrantLock 中的 Condition 和 signal() 方法。
- 模板设计模式: AQS 这个抽象类里面定义了一系列的模板方法,如果我们自己要写一个同步器时,只需要子类实现特定的几个方法,就可以完成一个同步器,示例如下。
自定义实现 AQS 框架(自己写一个类似于 ReentrantLock 的锁)
继承 AQS 实现其主要方法
java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
public class CustomSync extends AbstractQueuedSynchronizer {
@Override
public boolean tryAcquire(long acquires) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(long arg) {
if(getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
实现 Lock 接口实现加锁解锁
java
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;
@Slf4j(topic = "enjoy")
public class LockTest10 implements Lock {
CustomSync customSync = new CustomSync();
@Override
public void lock() {
customSync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
customSync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return customSync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return customSync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
customSync.release(1);
}
@Override
public Condition newCondition() {
return customSync.newCondition();
}
public static void main(String[] args) throws InterruptedException {
LockTest10 l = new LockTest10();
new Thread(()->{
l.lock();
log.debug("xxx");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
l.unlock();
},"t1").start();
TimeUnit.SECONDS.sleep(1);
l.lock();
log.debug("main");
l.unlock();
}
}