您当前的位置: 首页 >  Java
  • 0浏览

    0关注

    1477博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

java并发编程(8)-- 线程 阻塞队列 生产者消费者 lock synchronized

软件工程小施同学 发布时间:2021-02-08 12:14:26 ,浏览量:0

一、阻塞队列概述

 

1. 概念:

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),⼀旦条件满⾜,被挂起的线程⼜ 会⾃动被唤醒。

阻塞队列 是⼀个队列,在数据结构中起的作⽤如下图:

  • 当队列是空的,从队列中获取(Take)元素的操作将会被阻塞
  • 当队列是满的,从队列中添加(Put)元素的操作将会被阻塞
  • 试图中空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插⼊新的元素
  • 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除⼀个或多个元素 或者完全清空,使队列变得空闲起来后并后续新增

好处:阻塞队列不⽤⼿动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作。

体系:Collection→Queue→BlockingQueue→七个阻塞队列实现类。

 

 

粗体标记的三个⽤得⽐较多,许多消息中间件底层就是⽤它们实现的。

需要注意的是 LinkedBlockingQueue 虽然是有界的,但有个巨坑,其默认⼤⼩ 是 Integer.MAX_VALUE ,⾼达21亿,⼀般情况下内存早爆了(在线程池的 ThreadPoolExecutor 有 体现)。

API:
  • 抛出异常是指当队列满时,再次插⼊会抛出异常;
  • 返回布尔是指当队列满时,再次插⼊会返回false;
  • 阻塞是指当队列满时,再次插⼊会被阻塞,直 到队列取出⼀个元素,才能插⼊。
  • 超时是指当⼀个时限过后,才会插⼊或者取出。

API使⽤⻅ BlockingQueueDemo。

package thread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {

        // 队列长度为3
        BlockingQueue  blockingQueue = new ArrayBlockingQueue(3);

        addAndRemove(blockingQueue);
//        offerAndPoll(blockingQueue);
//        putAndTake(blockingQueue);
//        outOfTime(blockingQueue);

    }

    private static void outOfTime(BlockingQueue blockingQueue) throws InterruptedException {

        System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));
        System.out.println(blockingQueue.offer("a",2L, TimeUnit.SECONDS));

    }

    private static void putAndTake(BlockingQueue blockingQueue) throws InterruptedException {

        blockingQueue.put("a");
        blockingQueue.put("b");
        blockingQueue.put("c");
        blockingQueue.put("d");
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());

    }

    private static void offerAndPoll(BlockingQueue blockingQueue)
    {

        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        System.out.println(blockingQueue.offer("e"));
        System.out.println(blockingQueue.peek());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());

    }

    private static void addAndRemove(BlockingQueue blockingQueue)
    {

        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        System.out.println(blockingQueue.add("e"));

        System.out.println(blockingQueue.element());

        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());

    }

}

 

 

 

二、阻塞队列的应⽤——⽣产者消费者 1 传统模式 1.1 传统模式使⽤ Synchronized 来进⾏操作
package thread;

class Aircondition {

    // 0或者1
    private int number = 0;

    //⽼版写法
    // ++1
    public synchronized void increment() throws Exception{

        //1.判断
        if (number != 0){
            this.wait();
        }

        //2.⼲活
        number++;
        System.out.println(Thread.currentThread().getName()+"\t"+number);

        //3通知
        this.notifyAll();

    }

    // --1
    public synchronized void decrement() throws Exception{

        //1.判断
        if (number == 0){
            this.wait();
        }

        //2.⼲活
        number--;
        System.out.println(Thread.currentThread().getName()+"\t"+number);

        //3通知
        this.notifyAll();
    }

}

/** *
 * 题⽬:现在两个线程,可以操作 初始值为零的 ⼀个变量,
 * * 实现⼀个线程对该变量加1,⼀个线程对该变量-1,
 * * 实现交替,来10轮,变量初始值为0.
 *  1.⾼内聚低耦合前提下,线程操作资源类
 *  2.判断/⼲活/通知
 *  3.防⽌虚假唤醒(判断只能⽤while,不能⽤if)
 *
 * 知识⼩总结:多线程编程套路+while判断+新版写法
 * 
 * */

public class ProdConsumerDemo {

    public static void main(String[] args) {

        Aircondition aircondition = new Aircondition();

        // thread A
        new Thread(()->{

            for (int i = 1; i {

            for (int i = 1; i  {

            for (int i = 1; i  {

            for (int i = 1; i  {

            for (int i = 1; i  {
            for (int i = 1; i  {

            for (int i = 1; i  {

            for (int i = 1; i  {
            for (int i = 1; i  {

            for (int i = 1; i  {
            System.out.println(Thread.currentThread().getName() + "\t⽣产线 程启动");
            try {
                myResource.myProd();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }, "prod-2").start();


        // thread Cons
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t消费线 程启动");
            try {
                myResource.myCons();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "cons").start();


        // thread Cons-2
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t消费线 程启动");
            try {
                myResource.myCons();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }, "cons-2").start();


        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (Exception e) {

            e.printStackTrace();
        }

        System.out.println("5秒钟后,叫停");
        myResource.stop();

    }

}

class MyResource {

    //默认开启,进⾏⽣产+消费,false为停止
    private volatile boolean FLAG = true;

    private AtomicInteger atomicInteger = new AtomicInteger();

    // 阻塞队列
    private BlockingQueue blockingQueue = null;

    // 初始化
    public MyResource(BlockingQueue blockingQueue) {

        this.blockingQueue = blockingQueue;
        System.out.println(blockingQueue.getClass().getName());

    }

    // 生产
    public void myProd() throws Exception {

        String data = null;
        boolean retValue;

        while (FLAG) {

            data = atomicInteger.incrementAndGet() + ""; //++i

            // 往队列中加
            retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);

            if (retValue) {
                System.out.println(Thread.currentThread().getName() + "\t" + "插⼊队列" + data + "成功");
            }
            else {
                System.out.println(Thread.currentThread().getName() + "\t" + "插⼊队列" + data + "失败");
            }
            TimeUnit.SECONDS.sleep(1);

        }

        System.out.println(Thread.currentThread().getName() + "\t⽼板叫停 了,FLAG已更新为false,停⽌⽣产");

    }

    // 消费
    public void myCons() throws Exception {

        String res;
        while (FLAG) {

            // 从队列中取
            res = blockingQueue.poll(2L, TimeUnit.SECONDS);

            if (null == res || "".equals(res)) {
                // FLAG = false;
                System.out.println(Thread.currentThread().getName() + "\t 超过2秒钟没有消费,退出消费");
                return;
            }

            System.out.println(Thread.currentThread().getName() + "\t\t消费 队列" + res + "成功");

        }

    }

    // 停止
    public void stop() {
        this.FLAG = false;
    }

}

 

 

 

 

 

关注
打赏
1665320866
查看更多评论
立即登录/注册

微信扫码登录

0.0450s