早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不够优雅,于是在JDK1.5中新增了 AQS,提供了这种通用的同步器机制。
是什么抽象的队列同步器
AbstractOwnableSynchronizer AbstractQueuedLongSynchronizer AbstractQueuedSynchronizer
通常地:AbstractQueuedSynchronizer简称为AQS
是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量state表示持有锁的状态
CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO
特点:AQS全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
getState - 获取 state 状态 setState - 设置 state 状态
compareAndSetState - cas 机制设置 state 状态
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet 子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
AQS为什么是JUC内容中最重要的基石和AQS有关的
ReentrantLock
CountDownLatch
ReentrantReadWriteLock
Semaphore
锁,面向锁的使用者
定义了程序员和锁交互的使用层API,隐藏了实现细节,你调用即可。
同步器,面向锁的实现者
Java并发大神DougLee,提出统一规范并简化了锁的实现, 屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。
AQS能干嘛加锁会导致阻塞,有阻塞就需要排队,实现排队必然需要队列(AQS管理队列)
抢到资源的线程直接使用处理业务,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。
既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。
有阻塞就需要排队,实现排队必然需要队列
AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的 FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成 一个Node节点来实现锁的分配,通过CAS完成对State值的修改。
1) state 设计state 使用 volatile 配合 cas 保证其修改时的原子性 state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想
2) 阻塞恢复设计早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume那么 suspend 将感知不到 解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没问题 park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细 park 线程还可以通过 interrupt 打断
3) 队列设计–CLH队列的变体实现–双向队列使用了 FIFO 先入先出队列,并不支持优先级队列 设计时借鉴了 CLH 队列,它是一种单向无锁队列
AQS内部体系架构AQS中有一个Node静态内部类 用来保存等待获取锁的线程
head 、tail表示同步器头、尾节点
state 同步状态
AQS同步队列的基本结构CLH:Craig、Landin and Hagersten 队列,是个单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO)
AQS的int变量StateAQS的同步状态 State成员变量 持有锁的状态
/**
* The synchronization state.
*/
private volatile int state;
可以看做银行办理业务的受理窗口状态
0就是没人,空闲状态可以办理
大于等于1,就是有人占用窗口,需要排队等着
AQS的CLH队列CLH队列(三个大牛的名字组成),为一个双向队列
可以看做银行候客区的等待顾客 排队等待
CLH 好处: 无锁,使用自旋
快速,无阻塞
AQS 在一些方面改进了 CLH
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 队列中还没有元素 tail 为 null
if (t == null) { // Must initialize
// 将 head 从 null -> dummy 哨兵(傀儡)节点
if (compareAndSetHead(new Node()))
// 将 node 的 prev 设置为原来的 tail
tail = head;
} else {
node.prev = t;
// 将 tail 从原来的 tail 设置为 node
if (compareAndSetTail(t, node)) {
// 原来 tail 的 next 设置为 nod
t.next = node;
return t;
}
}
}
}
内部类Node(Node类在AQS类内部)
Node类的讲解
内部结构
属性说明
Node的等待状态waitState成员变量
队列中每个排队的个体就是一个 Node
waitState可以看做等候区其它顾客(其它线程)的等待状态
AQS实现一个不可重入锁自定义同步器
package com.dongguo.aqs;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
/**
* @author Dongguo
* @date 2021/9/13 0013-14:26
* @description: 同步器类 独占锁
*/
public class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
//初始state值为0
if (compareAndSetState(0, 1)) {
//加锁成功,并且设置锁的持有者Owner为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 是否持有独占锁
*
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
自定义锁 有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁
package com.dongguo.aqs;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author Dongguo
* @date 2021/9/13 0013-14:27
* @description: 自定义锁(不可重入锁)
*/
public class MyLock implements Lock {
//独占锁
private MySync sync = new MySync();
/**
* 加锁,失败进入等待
*/
@Override
public void lock() {
sync.acquire(1);
}
/**
* 加锁,可打断
* @throws InterruptedException
*/
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 尝试一次加锁
* @return
*/
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
/**
* 尝试加锁带超时时间
* @param time
* @param unit
* @return
* @throws InterruptedException
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
/**
* 解锁
*/
@Override
public void unlock() {
sync.release(1);
}
/**
* 创建条件变量
* @return
*/
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
测试
package com.dongguo.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-14:44
* @description:
*/
@Slf4j(topic = "d.Client")
public class Client {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
try {
//t1睡眠1s
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t2").start();
}
}
运行结果
14:48:04 [t1] d.Client - locking...
14:48:05 [t1] d.Client - unlocking...
14:48:05 [t2] d.Client - locking...
14:48:05 [t2] d.Client - unlocking...
测试不可重入
package com.dongguo.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-14:44
* @description:
*/
@Slf4j(topic = "d.Client")
public class Client {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
log.debug("locking...");
lock.lock();
log.debug("locking...");
try {
try {
//t1睡眠1s
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
log.debug("unlocking...");
lock.unlock();
log.debug("unlocking...");
lock.unlock();
}
}, "t1").start();
}
}
只会打印一次 locking 进入阻塞,不可重入