您当前的位置: 首页 >  Java

星夜孤帆

暂无认证

  • 2浏览

    0关注

    626博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Java阻塞队列

星夜孤帆 发布时间:2020-08-21 10:15:22 ,浏览量:2

一、什么是阻塞队列

阻塞队列,顾名思义,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:

  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
  • 当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
  1. 试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。
  2. 试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程从列中移除一个或者多个元素或者完全清空队列后使队列重新变得空闲起来并后续新增。
二、为什么用?有什么好处?

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。

为什么需要BlockingQueue?

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。

在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

三、架构介绍和种类分析

  1. ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
  2. LinkedBlockingQueue:由链表结构组成的有界(但大小默认值为Integer.MAX_VALUE)阻塞队列。
  3. SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列。
  4. PriorityBlockingQueue:支持优先级排序的无解阻塞队列。
  5. DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
  6. LinkedTransferQueue:由链表结构组成的无界阻塞队列。
  7. LikedBlockingDeque:由链表结构组成的双向阻塞队列。
四、BlockingQueue的核心方法

package com.jak.demo.Thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * ArrayBlockingQueue:是一个基于数组结构的有界队列,此队列按FIFO(先进先出)原则对元素进行排序。
 * LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO(先进先出)排序元素,吞吐量通常要高于ArrayBlockingQueue.
 * SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高。
 * 1.队列,排队,先进先出
 * 2.阻塞队列
 *   2.1 阻塞队列有没有好的一面
 *   2.2 不得不阻塞,你如何管理
 **/
public class BlockingQueueDemo {
    public static void main(String[] args) {
        List list = new ArrayList();
        BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));

        System.out.println(blockingQueue.element());
        
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());

    }
}
4.1.add抛异常

4.2.remove抛异常

4.3 offer/peek/poll
package com.jak.demo.Thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * ArrayBlockingQueue:是一个基于数组结构的有界队列,此队列按FIFO(先进先出)原则对元素进行排序。
 * LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO(先进先出)排序元素,吞吐量通常要高于ArrayBlockingQueue.
 * SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高。
 * 1.队列,排队,先进先出
 * 2.阻塞队列
 *   2.1 阻塞队列有没有好的一面
 *   2.2 不得不阻塞,你如何管理
 **/
public class BlockingQueueDemo {
    public static void main(String[] args) {
        List list = new ArrayList();
        BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("d"));

        //队首元素
        System.out.println(blockingQueue.peek());

        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());

    }
}

4.4 put/take/offer
package com.jak.demo.Thread;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * ArrayBlockingQueue:是一个基于数组结构的有界队列,此队列按FIFO(先进先出)原则对元素进行排序。
 * LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO(先进先出)排序元素,吞吐量通常要高于ArrayBlockingQueue.
 * SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高。
 * 1.队列,排队,先进先出
 * 2.阻塞队列
 *   2.1 阻塞队列有没有好的一面
 *   2.2 不得不阻塞,你如何管理
 **/
public class BlockingQueueDemo {
    public static void main(String[] args) throws Exception{
        List list = new ArrayList();
        BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
        blockingQueue.put("a");
        blockingQueue.put("b");
        blockingQueue.put("c");
        System.out.println("=============================");
        blockingQueue.put("d");


        blockingQueue.take();
        blockingQueue.take();
        blockingQueue.take();
        blockingQueue.take();

    }
}

4.5 SynchronousQueue

SynchronousQueue没有容量,与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

package com.jak.demo.Thread;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * ArrayBlockingQueue:是一个基于数组结构的有界队列,此队列按FIFO(先进先出)原则对元素进行排序。
 * LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO(先进先出)排序元素,吞吐量通常要高于ArrayBlockingQueue.
 * SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高。
 * 1.队列,排队,先进先出
 * 2.阻塞队列
 *   2.1 阻塞队列有没有好的一面
 *   2.2 不得不阻塞,你如何管理
 **/
public class BlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue blockingQueue = new SynchronousQueue();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getName() +" put 1");
                    blockingQueue.put("1");
                    System.out.println(Thread.currentThread().getName() +" put 2");
                    blockingQueue.put("2");
                    System.out.println(Thread.currentThread().getName() +" put 3");
                    blockingQueue.put("3");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "AAA").start();
        new Thread(() -> {
            try {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+" " + blockingQueue.take());
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+" " + blockingQueue.take());
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+" " + blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "BBB").start();
    }
}

五、用在哪里 5.1 生产者消费者模式 5.1.1 传统版
//资源类
class ShareData {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void increment() throws InterruptedException {
        lock.lock();
        try {
            //多线程的判断必须用while
            // 1 判断
            while (number != 0) {
                //等待,不能生产
                condition.await();
            }
            // 2 干活
            number++;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            // 3 通知唤醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("all")
    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            // 1 判断
            while (number == 0) {
                //等待,不能生产
                condition.await();
            }
            // 2 干活
            number--;
            System.out.println(Thread.currentThread().getName() + "\t" + number);
            // 3 通知唤醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * 题目:一个初始值为零的变量,两个线程对其交替操作,一个加1一个减1,来5轮
 * 1. 线程    操作(方法)  资源类
 * 2. 判断    干活        通知
 * 3. 防止虚假唤醒机制
 */
public class ProdConsumer_TraditonDemo {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();
        new Thread(() -> {
            for (int i=1; i {
            for (int i=1; i {
            System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");
            try {
                myResource.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Prod").start();
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
            try {
                myResource.myConsumer();
                System.out.println();
                System.out.println();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "Consumer").start();
        //暂停一会儿线程
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println("5秒钟时间到,大老板main线程叫停,活动结束");
        myResource.stop();
    }
}

 

为什么需要BlockingQueue

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了

在concurrent包发布以前,在多线程环境下,我们每个程序员都必须去自己控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

视频教程,源码

关注
打赏
1636984416
查看更多评论
立即登录/注册

微信扫码登录

0.0438s