使用BlockingQueue的生产者消费者模式
BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。使用场景。
首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:
通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享。强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程。
BlockingQueue的核心方法:
放入数据:
offer(anObject) 如果BlockingQueue可以容纳,返回为true,否则返回false.
offer(E o,long timeout,TimeUnit unit),设置等待时间,如果指定时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject)把anObject加到BlockingQueue中,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。
获取数据:
poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
取不到时返回null;
poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
测试代码:
package BlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class BlockingQueueTest {
public static void main(String args[]) throws InterruptedException{
BlockingQueue<String> queue = new ArrayBlockingQueue(10); Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer); Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop(); Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}
生产者:
package BlockingQueue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable{ private volatile boolean isRunning = true;
private BlockingQueue<String> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; public Producer(BlockingQueue queue){
this.queue = queue;
} public void run(){
String data = null;
Random r = new Random();
System.out.println("启动生产者线程");
try{
while(isRunning){
System.out.println("正在生产数据.....");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data = "data:" + count.incrementAndGet();
System.out.println("将数据:" + data + "放入队列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("放入数据失败:" + data);
}
}
}catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
finally{
System.out.println("退出生产者线程!");
}
} public void stop(){
isRunning = false;
} }
消费者:
package BlockingQueue; import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; public class Consumer implements Runnable{
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; public Consumer(BlockingQueue<String> queue){
this.queue = queue;
} public void run(){
System.out.println("启动消费者线程:");
Random r = new Random();
boolean isRunning = true;
try{
while(isRunning){
System.out.println("正从队列获取数据...");
String data = queue.poll(2,TimeUnit.SECONDS);
if(null != data){
System.out.println("拿到数据:" + data);
System.out.println("正在消费数据:" + data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
}else{
isRunning = false;
}
}
}catch(InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}finally{
System.out.println("退出消费者线程!");
}
}
}
参考:http://wsmajunfeng.iteye.com/blog/1629354
最新文章
- bzoj3551 Peaks加强版
- Java应用程序访问网络资源--HttpClient
- Android 中的 Service 全面总结
- 【GoLang】GoLang 错误处理 -- 异常处理思路示例
- Google Chrome 扩展程序开发
- Linux流量监控工具 - iftop (最全面的iftop教程)
- Spark学习笔记--概念知识
- JQUERY选择和操作DOM元素(利用正则表达式的方法匹配字符串中的一部分)
- 7 Ways to earn money on programming(转)
- python类与对象基本语法
- 第二次项目冲刺(Beta阶段)--第七天
- 如何开发webpack loader
- S3C6410板子移植 Android2.2
- .net core 2.x - ids4 - identity - two factory 登录认证
- 使用PowerDesigner 15对现有数据库进行生成图表结构
- 2018牛客网暑假ACM多校训练赛(第十场)H Rikka with Ants 类欧几里德算法
- linux常用命令:pwd 命令
- MongoDB -的连接和使用
- 【转】Lucene工作原理——反向索引
- 中小型研发团队架构实践七:集中式日志ELK
热门文章
- hdu3416 最短路+最大流
- kuangbin_UnionFind B (POJ 1611)
- C++@sublime GDB调试
- linux中的find命令——查找文件名
- linux oracle profile配置
- JSP 相关试题(五)
- 【转】Deprecated: Function ereg_replace() is deprecated的解决方法
- ubuntu- eclipse、CDT安装
- 转载: scikit-learn学习之回归分析
- CSharp使用log4net记录日志