您当前的位置: 首页 > 

Dongguo丶

暂无认证

  • 2浏览

    0关注

    472博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

AbstractQueuedSynchronizer

Dongguo丶 发布时间:2021-09-26 15:47:21 ,浏览量:2

早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不够优雅,于是在JDK1.5中新增了 AQS,提供了这种通用的同步器机制。

是什么

抽象的队列同步器

image-20210909104111219

AbstractOwnableSynchronizer AbstractQueuedLongSynchronizer AbstractQueuedSynchronizer

通常地:AbstractQueuedSynchronizer简称为AQS

是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量state表示持有锁的状态

image-20210909104217602

CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO

特点:

AQS全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁

getState - 获取 state 状态 setState - 设置 state 状态

compareAndSetState - cas 机制设置 state 状态

独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet 子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

AQS为什么是JUC内容中最重要的基石

和AQS有关的

image-20210909104535290

image-20210913142242745

ReentrantLock

image-20210909104654695

CountDownLatch

image-20210909104706704

ReentrantReadWriteLock

image-20210909104720405

Semaphore

image-20210909104734314

进一步理解锁和同步器的关系

锁,面向锁的使用者

定义了程序员和锁交互的使用层API,隐藏了实现细节,你调用即可。

同步器,面向锁的实现者

Java并发大神DougLee,提出统一规范并简化了锁的实现, 屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。

AQS能干嘛

加锁会导致阻塞,有阻塞就需要排队,实现排队必然需要队列(AQS管理队列)

抢到资源的线程直接使用处理业务,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。

既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果。

image-20210909105138428

AQS= state+CLH队列(FIFO)

image-20210909105224074

有阻塞就需要排队,实现排队必然需要队列

AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的 FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成 一个Node节点来实现锁的分配,通过CAS完成对State值的修改。

1) state 设计

state 使用 volatile 配合 cas 保证其修改时的原子性 state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想

2) 阻塞恢复设计

早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume那么 suspend 将感知不到 解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没问题 park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细 park 线程还可以通过 interrupt 打断

3) 队列设计–CLH队列的变体实现–双向队列

使用了 FIFO 先入先出队列,并不支持优先级队列 设计时借鉴了 CLH 队列,它是一种单向无锁队列

AQS内部体系架构

image-20210909105253943

image-20210909105307289

AQS中有一个Node静态内部类 用来保存等待获取锁的线程

head 、tail表示同步器头、尾节点

state 同步状态

AQS同步队列的基本结构

image-20210909105358421

CLH:Craig、Landin and Hagersten 队列,是个单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO)

AQS的int变量State

AQS的同步状态 State成员变量 持有锁的状态

/**
 * The synchronization state.
 */
private volatile int state;

可以看做银行办理业务的受理窗口状态

0就是没人,空闲状态可以办理

大于等于1,就是有人占用窗口,需要排队等着

AQS的CLH队列

CLH队列(三个大牛的名字组成),为一个双向队列

image-20210909110838697

可以看做银行候客区的等待顾客 排队等待

CLH 好处: 无锁,使用自旋

快速,无阻塞

AQS 在一些方面改进了 CLH

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 队列中还没有元素 tail 为 null
        if (t == null) { // Must initialize
            // 将 head 从 null -> dummy 哨兵(傀儡)节点
            if (compareAndSetHead(new Node()))
                // 将 node 的 prev 设置为原来的 tail
                tail = head;
        } else {
            node.prev = t;
            // 将 tail 从原来的 tail 设置为 node
            if (compareAndSetTail(t, node)) {
                // 原来 tail 的 next 设置为 nod
                t.next = node;
                return t;
            }
        }
    }
}
内部类Node(Node类在AQS类内部) Node类的讲解

内部结构

image-20210909111142124

属性说明

image-20210909111158061

Node的int变量waitState

Node的等待状态waitState成员变量

image-20210909111030109

队列中每个排队的个体就是一个 Node

waitState可以看做等候区其它顾客(其它线程)的等待状态

AQS实现一个不可重入锁

自定义同步器

package com.dongguo.aqs;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

/**
 * @author Dongguo
 * @date 2021/9/13 0013-14:26
 * @description: 同步器类 独占锁
 */
public class MySync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int arg) {
        //初始state值为0
        if (compareAndSetState(0, 1)) {
            //加锁成功,并且设置锁的持有者Owner为当前线程
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {

        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    /**
     * 是否持有独占锁
     *
     * @return
     */
    @Override
    protected boolean isHeldExclusively() {
        return getState() == 1;
    }

    public Condition newCondition() {
        return new ConditionObject();
    }
}

自定义锁 有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁

package com.dongguo.aqs;


import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @author Dongguo
 * @date 2021/9/13 0013-14:27
 * @description: 自定义锁(不可重入锁)
 */
public class MyLock implements Lock {

    //独占锁
    private MySync sync = new MySync();

    /**
     * 加锁,失败进入等待
     */
    @Override
    public void lock() {
        sync.acquire(1);
    }

    /**
     * 加锁,可打断
     * @throws InterruptedException
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /**
     * 尝试一次加锁
     * @return
     */
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    /**
     * 尝试加锁带超时时间
     * @param time
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    /**
     * 解锁
     */
    @Override
    public void unlock() {
        sync.release(1);
    }

    /**
     * 创建条件变量
     * @return
     */
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

测试

package com.dongguo.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/9/13 0013-14:44
 * @description:
 */
@Slf4j(topic = "d.Client")
public class Client {
    public static void main(String[] args) {
        MyLock lock = new MyLock();
        new Thread(() -> {
            lock.lock();
            try {
                log.debug("locking...");
                try {
                    //t1睡眠1s
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                log.debug("unlocking...");
                lock.unlock();
            }
        }, "t1").start();
        new Thread(() -> {
            lock.lock();
            try {
                log.debug("locking...");
            } finally {
                log.debug("unlocking...");
                lock.unlock();
            }
        }, "t2").start();
    }
}
运行结果
14:48:04 [t1] d.Client - locking...
14:48:05 [t1] d.Client - unlocking...
14:48:05 [t2] d.Client - locking...
14:48:05 [t2] d.Client - unlocking...

测试不可重入

package com.dongguo.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/9/13 0013-14:44
 * @description:
 */
@Slf4j(topic = "d.Client")
public class Client {
    public static void main(String[] args) {
        MyLock lock = new MyLock();
        new Thread(() -> {
            lock.lock();
            log.debug("locking...");
            lock.lock();
            log.debug("locking...");
            try {
                try {
                    //t1睡眠1s
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                log.debug("unlocking...");
                lock.unlock();
                log.debug("unlocking...");
                lock.unlock();
            }
        }, "t1").start();
    }
}

image-20210913145221737

只会打印一次 locking 进入阻塞,不可重入

关注
打赏
1638062488
查看更多评论
立即登录/注册

微信扫码登录

0.0389s