您当前的位置: 首页 > 

并发编程实践之公平有界阻塞队列实现

阿里云云栖号 发布时间:2021-11-15 14:47:45 ,浏览量:3

简介: JUC 工具包是 JAVA 并发编程的利器。本文讲述在没有 JUC 工具包帮助下,借助原生的 JAVA 同步原语, 如何实现一个公平有界的阻塞队列。希望你也能在文后体会到并发编程的复杂之处,以及 JUC 工具包的强。

image.png

作者 | 李新然 来源 | 阿里技术公众号

一 背景

JUC 工具包是 JAVA 并发编程的利器。

本文讲述在没有 JUC 工具包帮助下,借助原生的 JAVA 同步原语, 如何实现一个公平有界的阻塞队列。

希望你也能在文后体会到并发编程的复杂之处,以及 JUC 工具包的强大。

二 方法

本文使用到的基本工具:

  1. 同步监听器 synchronized ,方法基本和代码块级别;
  2. Object 基础类的 wait, notify, notifyAll;

基于以上基础工具,实现公平有界的阻塞队列,此处:

  1. 将公平的定义限定为 FIFO ,也就是先阻塞等待的请求,先解除等待;
  2. 并不保证解除等待后执行 Action 的先后顺序;
  3. 确保队列的大小始终不超过设定的容量;但阻塞等待的请求数不做限制;
三 实现

1 基础版本

首先,考虑在非并发场景下,借助 ADT 实现一个基础版本

interface Queue {

    boolean offer(Object obj);

    Object poll();

}
class FairnessBoundedBlockingQueue implements Queue {
    // 当前大小
    protected int size;

    // 容量
    protected final int capacity;

    // 头指针,empty: head.next == tail == null
    protected Node head;

    // 尾指针
    protected Node tail;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.head = new Node(null);
        this.tail = head;
        this.size = 0;
    }

    // 如果队列已满,通过返回值标识
    public boolean offer(Object obj) {
        if (size < capacity) {
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            ++size;
            return true;
        }
        return false;
    }

    // 如果队列为空,head.next == null;返回空元素
    public Object poll() {
        if (head.next != null) {
            Object result = head.next.value;
            head.next.value = null;
            head = head.next; // 丢弃头结点
            --size;
            return result;
        }
        return null;
    }

    class Node {
        Object value;
        Node next;
        Node(Object obj) {
            this.value = obj;
            next = null;
        }
    }
}

以上

  1. 定义支持队列的两个基础接口, poll 和 offer;
  2. 队列的实现,采用经典实现;
  3. 考虑在队列空的情况下, poll 返回为空,非阻塞;
  4. 队列在满的情况下, offer 返回 false ,入队不成功,无异常;

需要注意的一点:在出队时,本文通过迁移头结点的方式实现,避免修改尾结点。 在下文实现并发版本时,会看到此处的用意。

2 并发版本

如果在并发场景下,上述的实现面临一些问题,同时未实现给定的一些需求。

通过添加 synchronized ,保证并发条件下的线程安全问题。

注意此处做同步的原因是为了保证类的不变式。

并发问题

在并发场景下,基础版本的实现面临的问题包括:原子性,可见性和指令重排的问题。

参考 JMM 的相关描述。

并发问题,最简单的解决方法是:通过 synchronized 加锁,一次性解决问题。

// 省略接口定义
class BoundedBlockingQueue implements Queue {
    // 当前大小
    protected int size;

    // 容量
    protected final int capacity;

    // 头指针,empty: head.next == tail == null
    protected Node head;

    // 尾指针
    protected Node tail;

    public BoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.head = new Node(null);
        this.tail = head;
        this.size = 0;
    }

    // 如果队列已满,通过返回值标识
    public synchronized boolean offer(Object obj) {
        if (size < capacity) {
            Node node = new Node(obj);
            tail.next = node;
            tail = node;
            ++size;
            return true;
        }
        return false;
    }

    // 如果队列为空,head.next == null;返回空元素
    public synchronized Object poll() {
        if (head.next != null) {
            Object result = head.next.value;
            head.next.value = null;
            head = head.next; // 丢弃头结点
            --size;
            return result;
        }
        return null;
    }
    // 省略 Node 的定义
}

以上,简单粗暴的加 synchronized 可以解决问题,但会引入新的问题:系统活性问题(此问题下文会解决)。

同时,简单加 synchronized 同步是无法实现阻塞等待;即

  1. 如果队列为空,那么出队的动作还是会立即返回,返回为空;
  2. 如果队列已满,那么入队动作还是会立即返回,返回操作不成功;

实现阻塞等待,需要借助 JAVA 中的 PV 原语:wait, notify, notifyAll 。

参考:JDK 中对 wait, notify, notifyAll 的相关描述。

卫式方法

阻塞等待,可以通过简单的卫式方法来实现,此问题本质上可以抽象为:

  1. 任何一个方法都需要在满足一定条件下才可以执行;
  2. 执行方法前需要首先校验不变式,然后执行变更;
  3. 在执行完成后,校验是否满足后验不变式;
WHEN(condition) Object action(Object arg) {
    checkPreCondition();
    doAction(arg);
    checkPostCondition();
}

此种抽象 Ada 在语言层面上实现。在 JAVA 中,借助 wait, notify, notifyAll 可以翻译为:

// 当前线程
synchronized Object action(Object arg) {
    while(!condition) {
        wait();
    }
    // 前置条件,不变式
    checkPreCondition();
    doAction();
    // 后置条件,不变式
    checkPostCondition();
}

// 其他线程
synchronized Object notifyAction(Object arg) {
    notifyAll();
}

需要注意:

  1. 通常会采用 notifyAll 发送通知,而非 notify ;因为如果当前线程收到 notify 通知后被中断,那么系统将一直等待下去。
  2. 如果使用了 notifyAll 那么卫式语句必须放在 while 循环中;因为线程唤醒后,执行条件已经不满足,虽然当前线程持有互斥锁。
  3. 卫式条件的所有变量,有任何变更都需要发送 notifyAll 不然面临系统活性问题

据此,不难实现简单的阻塞版本的有界队列,如下

interface Queue {

    boolean offer(Object obj) throws InterruptedException;

    Object poll() throws InterruptedException;

}
class FairnessBoundedBlockingQueue implements Queue {
    // 当前大小
    protected int size;

    // 容量
    protected final int capacity;

    // 头指针,empty: head.next == tail == null
    protected Node head;

    // 尾指针
    protected Node tail;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.head = new Node(null);
        this.tail = head;
        this.size = 0;
    }

    // 如果队列已满,通过返回值标识
    public synchronized boolean offer(Object obj) throws InterruptedException {
        while (size < capacity) {
            wait();
        }
        Node node = new Node(obj);
        tail.next = node;
        tail = node;
        ++size;
        notifyAll(); // 可以出队
        return true;
    }

    // 如果队列为空,阻塞等待
    public synchronized Object poll() throws InterruptedException {
        while (head.next == null) {
            wait();
        }
        Object result = head.next.value;
        head.next.value = null;
        head = head.next; // 丢弃头结点
        --size;
        notifyAll(); // 可以入队
        return result;
    }
    // 省略 Node 的定义
}

以上,实现了阻塞等待,但也引入了更大的性能问题

  1. 入队和出队动作阻塞等待同一把锁,恶性竞争;
  2. 当队列变更时,所有阻塞线程被唤醒,大量的线程上下文切换,竞争同步锁,最终可能只有一个线程能执行;

需要注意的点:

  1. 阻塞等待 wait 会抛出中断异常。关于异常的问题下文会处理;
  2. 接口需要支持抛出中断异常;
  3. 队里变更需要 notifyAll 避免线程中断或异常,丢失消息;

3 锁拆分优化

以上第一个问题,可以通过锁拆分来解决,即:定义两把锁,读锁和写锁;读写分离。

// 省略接口定义
class FairnessBoundedBlockingQueue implements Queue {
    // 容量
    protected final int capacity;

    // 头指针,empty: head.next == tail == null
    protected Node head;

    // 尾指针
    protected Node tail;

    // guard: canPollCount, head
    protected final Object pollLock = new Object();
    protected int canPollCount;

    // guard: canOfferCount, tail
    protected final Object offerLock = new Object();
    protected int canOfferCount;

    public FairnessBoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        this.canPollCount = 0;
        this.canOfferCount = capacity;
        this.head = new Node(null);
        this.tail = head;
    }

    // 如果队列已满,通过返回值标识
    public boolean offer(Object obj) throws InterruptedException {
        synchronized(offerLock) {
            while(canOfferCount             
关注
打赏
1688896170
查看更多评论
0.2872s