您当前的位置: 首页 > 

庄小焱

暂无认证

  • 2浏览

    0关注

    805博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

并发编程——BlockingQueue原理

庄小焱 发布时间:2020-10-06 14:02:04 ,浏览量:2

摘要

阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

一、线程池中的阻塞队列

任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

使用不同的队列可以实现不一样的任务存取策略。在这里,我们可以再介绍下阻塞队列的成员:

BlockingQueue 是个接口,你需要使用它的实现之一来使用BlockingQueue,Java.util.concurrent包下具有以下 BlockingQueue 接口的实现类:

  1. ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。
  2. DelayQueue:DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。
  3. LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
  4. PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。
  5. SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

四组不同的行为方式解释:

  1. 抛异常:如果试图的操作无法立即执行,抛一个异常。
  2. 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
  3. 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
  4. 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是true / false)。

无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。

可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。

二、7种常见的阻塞队列原理与应用 7.1 ArrayBlockingQueue:由数组结构组成的有界阻塞队列

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下采用非公平锁的方式实现,可以通过构造器传参控制是采用公平锁还是非公平锁实现。先看看ArrayBlockingQueue类图关系:

可以看到有3个构造器,其实最终都是会调用上图中第二个构造器进行初始化,第三个构造器在初始化之后会再进行赋值(如果传入的Collection不为空)。

ArrayBlockingQueue nonFairQueue = new ArrayBlockingQueue(10);//默认非公平锁实现
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(10,true);//true表示公平锁
package com.zhuangxiaoyan.java.base.javabase.blockingqueue;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * @Classname ArrayBlockingQueueDemo
 * @Description TODO
 * @Date 2022/5/19 21:22
 * @Created by xjl
 */
public class ArrayBlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue queue = new ArrayBlockingQueue(100);//默认非公平锁实现
        new Thread(new ConsumerThread(queue)).start();
        Thread.sleep(2000);
        new Thread(new ProcuctThread(queue)).start();
    }
}

class ProcuctThread extends Thread {
    private ArrayBlockingQueue queue;

    public ProcuctThread(ArrayBlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            try {
                queue.put(i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class ConsumerThread extends Thread {
    private ArrayBlockingQueue queue;

    public ConsumerThread(ArrayBlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                int a = (int) queue.take();
                System.out.println("消费:" + a);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


初始化队列

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity             
关注
打赏
1657692713
查看更多评论
0.0397s