- 初步认识AbstractQueuedSynchonizer
- 什么是AQS
- AQS的接口和示例
- 官网例子
- AQS实现分析
- 核心思想
- sync queue 同步队列
- 同步队列的数据结构
- 独占式同步状态获取与释放
- 源码部分
- 共享式同步状态获取与释放
- 独占式与共享式的主要区别
- condition queue 等待队列
- 等待与通知模式示例
- 同步队列与条件队列的关系
源码采用JDK8
什么是AQSAQS是用来构建锁或者其他同步组件的基础框架,它使用了一个int的变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
它的主要使用方式是继承,子类通过继承AQS并实现他的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行修改,AQS为此提供了三个方法(getState()
、setState(int newState)
、compareAndSetState(int expect, int update)
)来操进行作操作,它们可以保证修改是安全的。使用自定义同步组件的静态内部类来实现子类,同步器自身没有实现任何同步接口,它仅仅定义了一些关于同步状态获取和释放的方法来使用。AQS既支持抢占式的获取同步状态,也支持共享式的获取同步状态。
AQS的设计基于模板方法模式的,这就需要我们继承AQS并重写指定的方法,随后AQS组合在自定义的组件实现中,并调用同步器提供的模板方法,而这些模板方法会调用我们重写的方法。子类需要重写的三个方法:
- getState() :获取当前同步状态
- setState(int newState):设置当前同步状态
- compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性
AQS可以被重写的方法
方法名称描述protected boolean tryAcquire(int arg)独占式获取同步状态,实现的时候需要查询当前状态并判断是否符合预期,再使用CAS设置同步状态protected boolean tryRelease(int arg)独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态protected int tryAcquireShared(int arg)共享式获取同步状态,返回大于等于0的值,表示获取成功,否知失败protected boolean tryReleaseShared(int arg)共享式释放同步状态protected boolean isHeldExclusively()当前AQS是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占AQS提供的模板方法基本分为3类
- 独占式获取与释放同步状态
- 共享式获取与释放同步状态
- 查询同步队列中的等待线程情况
在JDK1.8源码里面有提供了一个独占锁的实现例子Mutex自定义同步组件,它在同一时刻只能有一个线程能获取到锁。在Mutex中定义一个静态内部类Sync。
Sync继承了AQS,并使用getState()
、setState(int newState)
、compareAndSetState(int expect, int update)
重写了isHeldExclusively()
、tryAcquire(int acquires)
、tryRelease(int releases)
方法。
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock, java.io.Serializable {
// 继承AQS的子类,静态内部类形式
private static class Sync extends AbstractQueuedSynchronizer {
// 是否是独占的
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 当同步状态是0时,获取锁
@Override
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁将 同步状态改为0
@Override
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 提供一个ConditionObject
Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// sync完成所有艰苦的工作。 我们是需要将操作代理到sync上。
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
编写测试MutexTest类,会使用AQS的模板方法进行操作。通过lock()
进行获取锁(内部调用了Sync的tryAcquire),unlock()
(Sync的tryRelease)解锁。
public class MutexTest {
public static void main(String[] args) {
Mutex lock = new Mutex();
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i {
lock.lock();
try {
System.out.println(Thread.currentThread() + " get lock");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread() + " release lock");
lock.unlock();
}
});
}
executorService.shutdown();
}
}
通过结果,可以看出一次只有一个线程获取到锁。
Thread[pool-1-thread-1,5,main] get lock. 17:21:23
Thread[pool-1-thread-1,5,main] release lock. 17:21:24
Thread[pool-1-thread-2,5,main] get lock. 17:21:24
Thread[pool-1-thread-2,5,main] release lock. 17:21:25
Thread[pool-1-thread-3,5,main] get lock. 17:21:25
Thread[pool-1-thread-3,5,main] release lock. 17:21:26
Thread[pool-1-thread-4,5,main] get lock. 17:21:26
Thread[pool-1-thread-4,5,main] release lock. 17:21:27
Thread[pool-1-thread-5,5,main] get lock. 17:21:27
Thread[pool-1-thread-5,5,main] release lock. 17:21:28
AQS实现分析
核心思想
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制在AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。队列有同步队列(sync queue)和条件队列(condition queue)。
CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列,即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。
我的理解就是没有用类似QUEUE那样的队列的实例,而是通过NODE中存放前后结点PreNode和NextNode形成一种双向链表似的关系
sync queue 同步队列AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS操作对该同步状态进行原子操作实现对其值的修改。CAS操作主要借助sun.misc.Unsafed类来实现。
// 代表同步状态的变量
private volatile int state;
同步队列中的结点用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继结点,结点的属性类型与名称, 源码如下
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态的值
// CANCELLED,值为1,表示当前的线程被取消(由于等待超时或者中断)
// SIGNAL,值为-1,表示当前结点的后继结点包含的线程需要唤醒(处于等待状态),也就是unpark。当前结点的线程释放同步状态或者取消了,将唤醒后继结点的线程
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中。
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行。共享式同步状态获取的时候将会无条件传播下去
// 值为0,初始值。表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 结点状态
volatile int waitStatus;
/**
* 前驱结点
*/
volatile Node prev;
/**
* 后继结点
*/
volatile Node next;
/**
* 结点所对应的线程
*/
volatile Thread thread;
/**
* 下一个等待者 后继结点
*/
Node nextWaiter;
/**
* 节点是否在共享模式下等待 当Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 获取前驱结点,若前驱结点为空,抛出异常
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//无参构造方法
Node() { // 用于建立初始头部或共享标记
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
同步队列的数据结构
在AQS中有两个结点类型的引用,head是指向头结点(状态值不会是1),tail是指向尾结点。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
独占式同步状态获取与释放
以上述官网独占式获取同步状态为例,在使用sync.acquire(1);
获取同步状态失败的时候,会执行addWaiter(Node.EXCLUSIVE), arg)
,先通过compareAndSetTail(pred, node)
尝试快速填充队尾,如果填充失败或者当没有尾结点时,去调用enq(final Node node)
进行队列初始化,通过compareAndSetHead(Node update)
和compareAndSetTail(Node expect, Node update)
来设置AQS的head结点和tail结点。在完成addWaiter之后,继续执行acquireQueued(final Node node, int arg)
,如果当前结点的前驱结点是首结点,再次尝试获同步状态,若成功,将当前结点更新head结点,若失败,进行线程park,等待前驱结点释放锁唤醒当前线程,若期间当前线程中断,也会被唤醒。若发生异常情况, 会通过cancelAcquire(Node node)
取消继续获取(资源)。
锁获取的主要流程图如下:
独占锁的释放主要通过调用release(int arg)
来释放锁。
addWaiter(Node mode)
private Node addWaiter(Node mode) {
// 为当前线程构建一个Node,独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 当队列尾结点不为空,快速填充队尾
if (pred != null) {
node.prev = pred;
// 比较pred是否为尾结点,是则将尾结点设置为node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾结点为空(即还没有被初始化过),或者是compareAndSetTail操作失败,则入队列
enq(node);
return node;
}
enq(final Node node)
private Node enq(final Node node) {
// 无限循环 势必将结点加入队列中
for (;;) {
// 获取AQS当前尾结点
Node t = tail;
// 如果尾结点是null,则进行初始化,新建一个空Node同时作为head结点和tail结点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾结点不为空,即已经被初始化过
// 将当前尾结点作为node结点的前置结点
node.prev = t;
// 比较结点t是否为尾结点,若是则将尾结点设置为node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued(final Node node, int arg)
// 以独占不间断模式获取已在队列中的线程。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断标志
boolean interrupted = false;
// 无限循环
for (;;) {
// 获取前置结点
final Node p = node.predecessor();
// 如果前置结点是head且当前线程成功获取到同步状态,将自身结点变为head结点,返回中断标记
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 当获取资源失败,更新结点状态并阻塞线程,返回其中断标识
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 取消加入队列失败的节点的资源获取
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node pred, Node node)
// 当获取(资源)失败后,检查并且更新结点状态--只有当该节点的前驱结点的状态为SIGNAL时,才可以对该结点所封装的线程进行park操作。否则,将不能进行park操作。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前置结点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前置节点已经设置了使后置结点阻塞等待的信号,因此它可以安全地park。
*/
return true;
if (ws > 0) {
/*
* 前置结点已经取消了等待该锁,从前置结点向前遍历,找到未取消的节点,设置为当前节点的前置结点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus必须为0或PROPAGATE。我们需要信号,不是立即park。调用者将需要重试,以确保在park前。它不能获得同步状态。
* 尝试将前驱结点的信号变为SIGNAL
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
// 将其线程阻塞--线程被唤醒后或中断后会在此后继续执行
LockSupport.park(this);
// 返回当前线程是否已被中断,并对中断标识位进行复位
return Thread.interrupted();
}
cancelAcquire(Node node)
// 取消继续获取(资源)
private void cancelAcquire(Node node) {
// 忽略结点已经不存在的情况
if (node == null)
return;
// 清空node结点的thread
node.thread = null;
// Skip cancelled predecessors
// 保存node的前驱结点,如果前驱节点已经是取消的状态,则一直向前遍历,取不是取消状态的结点作为当前结点的前驱结点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 获取前驱结点的下一个节点(此时应该就是当前结点)
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 将当前结点的状态变为取消
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
// 如果当前结点是尾结点,且将前驱节点成功设置为尾结点,则将前驱节点的下一个节点变为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
// 当前结点不为尾结点,或者将前驱结点设置为尾结点失败了
int ws;
// 当前驱结点既不是head,它的thread又不为空时,如果前驱节点的状态是SIGNAL或可以讲前驱结点的状态变为SIGNAL,那么可以去获取当前结点的后置结点,如果后置结点不为空,且状态不是取消的话,可以将前驱结点的后置结点直接变为当前结点的后置结点。这样就从队列中去掉了当前结点。
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?