1 生产者消费者模式概述

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,

直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,

才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式

   2 实现生产者消费者模式

   产品类

package com.thread.pc.blockingqueue;

import java.util.UUID;

/**
* 产品类
*
* @author yyx 2018年12月22日
*/
public class Product {
private UUID proCode; // 产品唯一编码 public Product(UUID proCode) {
super();
this.proCode = proCode;
} public UUID getProCode() {
return proCode;
} public void setProCode(UUID proCode) {
this.proCode = proCode;
} }

   生产者

package com.thread.pc.lockcondition;

/**
* 生产者
* @author yyx 2019年1月5日
*/
public class Producer implements Runnable {
private Warehouse warehouse; public Producer(Warehouse warehouse) {
super();
this.warehouse = warehouse;
} @Override
public void run() {
for (int i = 1; i <= 10; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
warehouse.addProduct();
}
} }

   消费者

package com.thread.pc.lockcondition;

/**
* 消费者
*
* @author yyx 2019年1月5日
*/
public class Consumer implements Runnable {
private Warehouse warehouse; public Consumer(Warehouse warehouse) {
super();
this.warehouse = warehouse;
} @Override
public void run() {
for (int i = 1; i <= 10; i++) {
try {
Thread.sleep(800);
} catch (InterruptedException e) {
e.printStackTrace();
}
warehouse.removeProduct();
}
}
}

2.1 使用lock、condition和await、singalAll

   仓库类

package com.thread.pc.lockcondition;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; /**
* 仓库类
*
* @author yyx 2019年1月5日
*/
public class Warehouse {
private final int MAX_SIZE = 5;
private List<Product> listProduct;
private Lock lock;
private Condition conditionProducer;
private Condition conditionConsumer; public Warehouse(List<Product> listProduct, Lock lock, Condition conditionProducer, Condition conditionConsumer) {
super();
this.listProduct = listProduct;
this.lock = lock;
this.conditionProducer = conditionProducer;
this.conditionConsumer = conditionConsumer;
} public void addProduct() {
lock.lock();
try {
String currentName = Thread.currentThread().getName();
while (listProduct.size() >= MAX_SIZE) { // 为了避免虚假唤醒,应该总是使用在循环中
try {
System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
conditionProducer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = new Product(UUID.randomUUID());
System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.add(product);
conditionConsumer.signalAll();
} finally {
lock.unlock();
}
} public void removeProduct() {
lock.lock();
try {
String currentName = Thread.currentThread().getName();
while (listProduct.size() <= 0) {
try {
System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
conditionConsumer.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = listProduct.get(0);
System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.remove(0);
conditionProducer.signalAll();
} finally {
lock.unlock();
}
}
}

   测试类

package com.thread.pc.lockcondition;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 测试类
* @author 2018年12月22日
*/
public class TestModel {
public static void main(String[] args) {
List<Product> listProduct=new ArrayList<Product>();
Lock lock = new ReentrantLock();
Condition conditionProducer = lock.newCondition();
Condition conditionConsumer = lock.newCondition(); Warehouse warehouse = new Warehouse(listProduct,lock, conditionProducer,conditionConsumer);
Producer producer = new Producer(warehouse);
Consumer consumer = new Consumer(warehouse); new Thread(producer,"生产者A").start();
new Thread(producer,"生产者B").start();
new Thread(consumer,"消费者C").start();
new Thread(consumer,"消费者D").start();
}
}

2.2 使用synchronized修饰代码块

   仓库类

package com.thread.pc.synchronizedcodeblock;

import java.util.List;
import java.util.UUID; /**
* 仓库类
*
* @author yyx 2019年1月5日
*/
public class Warehouse {
private final int MAX_SIZE = 10; // 最大数量
private List<Product> listProduct; public Warehouse(List<Product> listProduct) {
super();
this.listProduct = listProduct;
} /**
* 生产产品
*/
public void addProduct() {
synchronized (listProduct) {
String currentName = Thread.currentThread().getName();
while (listProduct.size() >= MAX_SIZE) { // 为了避免虚假唤醒,应该总是使用在循环中
try {
System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
listProduct.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = new Product(UUID.randomUUID());
System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.add(product);
listProduct.notifyAll();
}
} /**
* 消费产品
*/
public void removeProduct() {
synchronized (listProduct) {
String currentName = Thread.currentThread().getName();
while (listProduct.size() <= 0) {
try {
System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
listProduct.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = listProduct.get(0);
System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.remove(0);
listProduct.notifyAll();
}
}
}

   测试类

package com.thread.pc.synchronizedcodeblock;

import java.util.ArrayList;
import java.util.List; /**
* 测试类
* @author yyx
* 2019年1月5日
*/
public class TestModel {
public static void main(String[] args) {
List<Product> listProduct = new ArrayList<Product>(); Warehouse warehouse = new Warehouse(listProduct);
Producer producer = new Producer(warehouse);
Consumer consumer = new Consumer(warehouse); new Thread(producer,"生产者A").start();
new Thread(producer,"生产者B").start();
new Thread(consumer,"消费者C").start();
new Thread(consumer,"消费者D").start();
}
}

2.3 使用synchronized修饰方法

   仓库类

package com.thread.pc.synchronizedmethod;

import java.util.List;
import java.util.UUID; /**
* 仓库类
*
* @author yyx 2019年1月5日
*/
public class Warehouse {
private final int MAX_SIZE = 10;
private List<Product> listProduct; public Warehouse(List<Product> listProduct) {
super();
this.listProduct = listProduct;
} /**
* 生产产品
*/
public synchronized void addProduct() {
String currentName = Thread.currentThread().getName();
while (listProduct.size() >= MAX_SIZE) {
try {
System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = new Product(UUID.randomUUID());
System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.add(product);
notifyAll();
} /**
* 消费产品
*/
public synchronized void removeProduct() {
String currentName = Thread.currentThread().getName();
while (listProduct.size() <= 0) {
try {
System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = listProduct.get(0);
System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
listProduct.remove(0);
notifyAll();
}
}

   测试类

package com.thread.pc.synchronizedmethod;

import java.util.ArrayList;
import java.util.List; /**
* 测试类
*
* @author yyx 2019年1月5日
*/
public class TestModel {
public static void main(String[] args) {
List<Product> listProduct = new ArrayList<Product>(); Warehouse warehouse = new Warehouse(listProduct);
Producer producer = new Producer(warehouse);
Consumer consumer = new Consumer(warehouse); new Thread(producer, "生产者A").start();
new Thread(producer, "生产者B").start();
new Thread(consumer, "消费者C").start();
new Thread(consumer, "消费者D").start();
}
}

2.4 使用BlockingQueue

   仓库类

package com.thread.pc.blockingqueue;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;
/**
* 仓库
* @author yyx 2018年12月22日
*/
public class Warehouse {
private final int MAX_SIZE = 10;
private BlockingQueue<Product> blockingQueue; public Warehouse(BlockingQueue<Product> blockingQueue) {
super();
this.blockingQueue = blockingQueue;
} public void addProduct() {
String currentName = Thread.currentThread().getName();
if (blockingQueue.size() >= MAX_SIZE) {
System.out.println("产品列表已满,不再生产!" + currentName + "进入等待");
} else {
Product product = new Product(UUID.randomUUID());
System.out.println(currentName + "生产了一个产品,它的编号是:" + product.getProCode().toString());
try {
blockingQueue.put(product);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} public void removeProduct() {
String currentName = Thread.currentThread().getName();
if (blockingQueue.size() <= 0) {
System.out.println("产品列表不足,不再消费!" + currentName + "进入等待");
} else {
try {
Product product = blockingQueue.take();
System.out.println(currentName + "消费了一个产品,它的编号是:" + product.getProCode().toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

   测试类

package com.thread.pc.blockingqueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 测试类
* @author yyx 2018年12月22日
*/
public class TestModel {
public static void main(String[] args) {
BlockingQueue<Product> blockingQueue = new LinkedBlockingQueue<>(10); Warehouse warehouse = new Warehouse(blockingQueue);
Producer producer = new Producer(warehouse);
Consumer consumer = new Consumer(warehouse); new Thread(producer).start();
new Thread(producer).start();
new Thread(consumer).start();
}
}

最新文章

  1. linux命令大全
  2. netstat相关
  3. python中在同一个位置输出数据
  4. 圆角卖萌式登录表单和width的百分比值
  5. Chance – 功能强大的 JavaScript 随机数生成类库
  6. jQuery鼠标悬停内容动画切换效果
  7. 苹果应用 Windows 申请 普通证书 和Push 证书 Hbuilder 个推(2)
  8. 系统隐式 Intent
  9. linux命令合集
  10. WIN7开无线
  11. Arrays.fill方法的陷阱
  12. virt
  13. 51NOD 1376 最长递增子序列的数量 [CDQ分治]
  14. ASP.NET Core 2.2 十九. Action参数的映射与模型绑定
  15. 中文dumps显示
  16. SpringBoot2.0 redis生成组建和读写配置文件
  17. 小学四则运算APP 第二个冲刺 第一天
  18. python练习题-day5
  19. bzoj千题计划196:bzoj4826: [Hnoi2017]影魔
  20. spark脚本日志输出级别设置

热门文章

  1. winform窗体启动过程
  2. selenium+xpath在不同层级的写法
  3. oracle创建表空间 授权
  4. sqlserver配置允许快照隔离
  5. 报错解决——Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX AVX2
  6. Windows 10正式版的历史版本
  7. rem、em 、font-size随着屏幕大小的改变而改变
  8. Windows本机搭建Redis
  9. 【LeetCode每天一题】Fibonacci Number(斐波那契数列)
  10. mybatis 调用 oracle 存储过程 select into 无记录时NO_DATA_FOUND异常处理分析