一、阻塞队列概述
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),⼀旦条件满⾜,被挂起的线程⼜ 会⾃动被唤醒。
阻塞队列 是⼀个队列,在数据结构中起的作⽤如下图:
- 当队列是空的,从队列中获取(Take)元素的操作将会被阻塞
- 当队列是满的,从队列中添加(Put)元素的操作将会被阻塞
- 试图中空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插⼊新的元素
- 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除⼀个或多个元素 或者完全清空,使队列变得空闲起来后并后续新增
好处:阻塞队列不⽤⼿动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作。
体系:Collection→Queue→BlockingQueue→七个阻塞队列实现类。
粗体标记的三个⽤得⽐较多,许多消息中间件底层就是⽤它们实现的。
需要注意的是 LinkedBlockingQueue 虽然是有界的,但有个巨坑,其默认⼤⼩ 是 Integer.MAX_VALUE ,⾼达21亿,⼀般情况下内存早爆了(在线程池的 ThreadPoolExecutor 有 体现)。
API:- 抛出异常是指当队列满时,再次插⼊会抛出异常;
- 返回布尔是指当队列满时,再次插⼊会返回false;
- 阻塞是指当队列满时,再次插⼊会被阻塞,直 到队列取出⼀个元素,才能插⼊。
- 超时是指当⼀个时限过后,才会插⼊或者取出。
API使⽤⻅ BlockingQueueDemo。
package thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 队列长度为3
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
addAndRemove(blockingQueue);
// offerAndPoll(blockingQueue);
// putAndTake(blockingQueue);
// outOfTime(blockingQueue);
}
private static void outOfTime(BlockingQueue blockingQueue) throws InterruptedException {
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
}
private static void putAndTake(BlockingQueue blockingQueue) throws InterruptedException {
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
}
private static void offerAndPoll(BlockingQueue blockingQueue)
{
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("e"));
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
private static void addAndRemove(BlockingQueue blockingQueue)
{
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.add("e"));
System.out.println(blockingQueue.element());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
}
}
二、阻塞队列的应⽤——⽣产者消费者 1 传统模式 1.1 传统模式使⽤ Synchronized 来进⾏操作
package thread;
class Aircondition {
// 0或者1
private int number = 0;
//⽼版写法
// ++1
public synchronized void increment() throws Exception{
//1.判断
if (number != 0){
this.wait();
}
//2.⼲活
number++;
System.out.println(Thread.currentThread().getName()+"\t"+number);
//3通知
this.notifyAll();
}
// --1
public synchronized void decrement() throws Exception{
//1.判断
if (number == 0){
this.wait();
}
//2.⼲活
number--;
System.out.println(Thread.currentThread().getName()+"\t"+number);
//3通知
this.notifyAll();
}
}
/** *
* 题⽬:现在两个线程,可以操作 初始值为零的 ⼀个变量,
* * 实现⼀个线程对该变量加1,⼀个线程对该变量-1,
* * 实现交替,来10轮,变量初始值为0.
* 1.⾼内聚低耦合前提下,线程操作资源类
* 2.判断/⼲活/通知
* 3.防⽌虚假唤醒(判断只能⽤while,不能⽤if)
*
* 知识⼩总结:多线程编程套路+while判断+新版写法
*
* */
public class ProdConsumerDemo {
public static void main(String[] args) {
Aircondition aircondition = new Aircondition();
// thread A
new Thread(()->{
for (int i = 1; i {
for (int i = 1; i {
for (int i = 1; i {
for (int i = 1; i {
for (int i = 1; i {
for (int i = 1; i {
for (int i = 1; i {
for (int i = 1; i {
for (int i = 1; i {
for (int i = 1; i {
System.out.println(Thread.currentThread().getName() + "\t⽣产线 程启动");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}, "prod-2").start();
// thread Cons
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t消费线 程启动");
try {
myResource.myCons();
} catch (Exception e) {
e.printStackTrace();
}
}, "cons").start();
// thread Cons-2
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t消费线 程启动");
try {
myResource.myCons();
} catch (Exception e) {
e.printStackTrace();
}
}, "cons-2").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("5秒钟后,叫停");
myResource.stop();
}
}
class MyResource {
//默认开启,进⾏⽣产+消费,false为停止
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
// 阻塞队列
private BlockingQueue blockingQueue = null;
// 初始化
public MyResource(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
// 生产
public void myProd() throws Exception {
String data = null;
boolean retValue;
while (FLAG) {
data = atomicInteger.incrementAndGet() + ""; //++i
// 往队列中加
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t" + "插⼊队列" + data + "成功");
}
else {
System.out.println(Thread.currentThread().getName() + "\t" + "插⼊队列" + data + "失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "\t⽼板叫停 了,FLAG已更新为false,停⽌⽣产");
}
// 消费
public void myCons() throws Exception {
String res;
while (FLAG) {
// 从队列中取
res = blockingQueue.poll(2L, TimeUnit.SECONDS);
if (null == res || "".equals(res)) {
// FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 超过2秒钟没有消费,退出消费");
return;
}
System.out.println(Thread.currentThread().getName() + "\t\t消费 队列" + res + "成功");
}
}
// 停止
public void stop() {
this.FLAG = false;
}
}