为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这种生产消费能力不均衡的问题,所以便有了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。

生产者消费者模式实战

利用BlockingQueue

package com;

import java.util.Random;
import java.util.concurrent.BlockingQueue; public class Comsumer implements Runnable {
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME = 1000;
public Comsumer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("start Consumer Id:"+Thread.currentThread().getId());
Random r = new Random();
Boolean isrunning = true;
try {
while(isrunning){
PCData data = queue.take();
if(data != null){
System.out.println("Comsumer data:"+data);
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}

  

package com;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable{
private volatile boolean isRunning = true;
//内存缓冲区
private BlockingQueue<PCData> queue;
//总数 AtomicInteger
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEPTIME = 1000; public Producer(BlockingQueue<PCData> queue){
this.queue = queue;
} public void run(){
PCData data = null;
Random r = new Random();
System.out.println("start producting id:"+ Thread.currentThread().getId());
while(isRunning){
try {
while(isRunning){
Thread.sleep(r.nextInt(SLEEPTIME));
data = new PCData(count.incrementAndGet());
if(!queue.offer(data,2,TimeUnit.SECONDS)){
System.out.println("加入队列失败");
}else{
System.out.println("Producer data:"+data);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
} public void stop(){
isRunning = false;
}
}

  

package com;

public class PCData {
private final int intData;
public PCData(int d){
intData = d;
} public PCData(String d){
intData = Integer.valueOf(d);
} public int getData(){
return intData;
} @Override
public String toString(){
return ""+intData;
}
}

  

package com;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; public class Main {
public static void main(String[] args) throws InterruptedException{
BlockingQueue<PCData> queue = new LinkedBlockingQueue<>(10);
Producer p1 = new Producer(queue);
Producer p2 = new Producer(queue);
Producer p3 = new Producer(queue);
Comsumer c1 = new Comsumer(queue);
Comsumer c2 = new Comsumer(queue);
Comsumer c3 = new Comsumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(p1);
service.execute(p2);
service.execute(p3);
service.execute(c1);
service.execute(c2);
service.execute(c3);
Thread.sleep(10*1000);
p1.stop();
p2.stop();
p3.stop();
Thread.sleep(3000);
service.shutdown();
}
}

  利用notifyAll和wait

package com;

import java.util.List;

public class Consumer implements Runnable{
private List<PCData> queue;
public Consumer(List<PCData> queue){
this.queue = queue;
} @Override
public void run() {
while(true){
PCData data = null;
try {
synchronized (queue) {
if(queue.size() == 0){
System.out.println(Thread.currentThread().getId()+"队列为空,无法消费");
queue.notifyAll();
queue.wait();
}else{
data = queue.remove(0);
System.out.println(Thread.currentThread().getId()+"消费:"+data);
}
}
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

  

package com;

import java.util.List;
import java.util.Random; public class Producer implements Runnable {
private List<PCData> queue;
private int length; public Producer(List<PCData> queue,int length){
this.queue = queue;
this.length = length;
} @Override
public void run() {
while(true){
Random r = new Random();
PCData data = new PCData(r.nextInt(100));
try {
synchronized (queue) {
if(queue.size() >= length){
System.out.println(Thread.currentThread().getId()+"队列满了,无法加入 ");
queue.notifyAll();
queue.wait();
}else{
queue.add(data);
System.out.println(Thread.currentThread().getId()+"生产了:"+data);
}
}
Thread.sleep(1000);
} catch (InterruptedException e){
e.printStackTrace();
}
} } }

  

package com;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class Main {
public static void main(String[] args){
List<PCData> queue = new ArrayList<>();
int length =10;
Producer p1 = new Producer(queue, length);
Producer p2 = new Producer(queue, length);
Producer p3 = new Producer(queue, length);
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
ExecutorService service = Executors.newCachedThreadPool();
service.execute(p1);
service.execute(p2);
service.execute(p3);
service.execute(c1);
service.execute(c2);
service.execute(c3);
}
}

  

最新文章

  1. [bigdata] 使用Redis队列来实现与机器无关的Job提交与执行 (python实现)
  2. Linux中设定umask的作用
  3. [转]使用Jenkins搭建持续集成(CI)环境
  4. (Xaml) Type &#39;DeviceA&#39; is not defined.
  5. su su- sudo的区别
  6. AngularJS之手动加载模块app和controller
  7. NDK开发总结
  8. 介绍一种css水平垂直居中的方法(非常好用!)
  9. hdu1874 最短路模板题
  10. win7共享wifi
  11. jQuery之手风琴图片
  12. C#中如何只保留小数点后面两位?
  13. bash Shell条件测试
  14. Java的基本程序设计结构【2】
  15. Android包管理机制(一) PackageInstaller的初始化
  16. 【C++ Primer 第13章】1. 拷贝控制、赋值和销毁
  17. centos mysql密码忘记了如何修改
  18. H5添加禁止缩放功能
  19. Java基础-处理json字符串解析案例
  20. CSS一个元素同时使用多个类选择器(class selector)

热门文章

  1. 洛谷 P1426 小鱼会有危险吗
  2. Android(java)学习笔记129:对ListView等列表组件中数据进行增、删、改操作
  3. Java执行系统命令工具类(JDK自带功能)
  4. Forbidden You don&#39;t have permission to access /phpStudyTest/application/index/controller/Index.php on this server.
  5. 使用lua实现Spine动画的预加载
  6. IOS7的变化
  7. PostgreSQL学习(1)-- 安装pageinspect extension
  8. 【Python学习之五】高级特性2(切片、迭代、列表生成器、生成器、迭代器)
  9. ipmitool的使用
  10. 用Python手把手教你搭建一个web框架-flask微框架!