一、生产者消费者模式示例
-
示例需求
一个初始值为0的变量 两个线程交替操作 一个加1 一个减1 每个线程遍历5轮
-
示例代码
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; class Resource { private volatile boolean flag = true;//标志位,默认开启 进行生产消费的交互 private AtomicInteger atomicInteger = new AtomicInteger();//默认值是0 //声明阻塞队列BlockingQueue private BlockingQueue blockingQueue = null; //构造方法的方式注入 public Resource(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; System.out.println(blockingQueue.getClass().getName()); } //生产者 public void myProd() throws Exception { String data = null; boolean returnValue; while (flag) {//当flag=true,开始生产 //atomicInteger默认值0,进行+1操作,并转成字符串 data = atomicInteger.incrementAndGet() + ""; //每2秒钟把data的值添加到队列中 returnValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS); if (returnValue) { System.out.println(Thread.currentThread().getName() + "\t 插入队列数据" + data + "成功"); } else { System.out.println(Thread.currentThread().getName() + "\t 插入队列数据" + data + "失败"); } //睡眠一秒 TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName() + "\t flag=" + flag+"\t ,生产操作停止"); } //消费者 public void myConsumer() throws Exception { String result = null; while (flag) {//当flag=true,开始消费 //每2秒钟从队列中取值 result = blockingQueue.poll(2L, TimeUnit.SECONDS); //队列中值为空 if(null==result||"".equalsIgnoreCase(result)){ flag=false; System.out.println(Thread.currentThread().getName()+"\t"+"超过2m没有取到 消费退出"); System.out.println(); System.out.println(); return; } System.out.println(Thread.currentThread().getName() + "消费队列" + result + "成功"); } } public void stop() throws Exception{ flag=false; } } /** * @description: 生产者消费者模式示例代码(阻塞队列版) * @author: xz */ public class ProdConsumerBlockQueueDemo { public static void main(String[] args) throws Exception { //创建Resource对象,并传入由数组结构组成的有界阻塞队列,初始大小为10 Resource resource = new Resource(new ArrayBlockingQueue(10)); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t生产线程启动"); try { resource.myProd(); System.out.println(); System.out.println(); } catch (Exception e) { e.printStackTrace(); } },"Prod 线程").start(); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"\t消费线程启动"); try { resource.myConsumer(); System.out.println(); System.out.println(); } catch (Exception e) { e.printStackTrace(); } },"consumer 线程").start(); //睡眠5秒钟 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(); System.out.println(); System.out.println(); System.out.println("5秒钟时间到,停止活动"); resource.stop(); } }
-
输出结果如下图: