BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。使用场景。

首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示:

通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出;在生产者消费者模式中,通过队列的方式可以很方便的实现两者之间的数据共享。强大的BlockingQueue使我们不用关心什么时候需要阻塞线程,什么时候需要唤醒线程。

BlockingQueue的核心方法:

放入数据:

  offer(anObject) 如果BlockingQueue可以容纳,返回为true,否则返回false.

  offer(E o,long timeout,TimeUnit unit),设置等待时间,如果指定时间内,还不能往队列中加入BlockingQueue,则返回失败。

  put(anObject)把anObject加到BlockingQueue中,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续。

获取数据:
  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
    取不到时返回null;
  poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
    队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
    BlockingQueue有新的数据被加入; 
  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 
    通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

测试代码:

package BlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; public class BlockingQueueTest {
public static void main(String args[]) throws InterruptedException{
BlockingQueue<String> queue = new ArrayBlockingQueue(10); Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
Consumer consumer = new Consumer(queue); ExecutorService service = Executors.newCachedThreadPool(); service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer); Thread.sleep(10 * 1000);
producer1.stop();
producer2.stop();
producer3.stop(); Thread.sleep(2000);
// 退出Executor
service.shutdown();
}
}

生产者:

package BlockingQueue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable{ private volatile boolean isRunning = true;
private BlockingQueue<String> queue;
private static AtomicInteger count = new AtomicInteger();
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; public Producer(BlockingQueue queue){
this.queue = queue;
} public void run(){
String data = null;
Random r = new Random();
System.out.println("启动生产者线程");
try{
while(isRunning){
System.out.println("正在生产数据.....");
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP)); data = "data:" + count.incrementAndGet();
System.out.println("将数据:" + data + "放入队列...");
if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
System.out.println("放入数据失败:" + data);
}
}
}catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
finally{
System.out.println("退出生产者线程!");
}
} public void stop(){
isRunning = false;
} }

消费者:

package BlockingQueue;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; public class Consumer implements Runnable{
private BlockingQueue<String> queue;
private static final int DEFAULT_RANGE_FOR_SLEEP = 1000; public Consumer(BlockingQueue<String> queue){
this.queue = queue;
} public void run(){
System.out.println("启动消费者线程:");
Random r = new Random();
boolean isRunning = true;
try{
while(isRunning){
System.out.println("正从队列获取数据...");
String data = queue.poll(2,TimeUnit.SECONDS);
if(null != data){
System.out.println("拿到数据:" + data);
System.out.println("正在消费数据:" + data);
Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));
}else{
isRunning = false;
}
}
}catch(InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}finally{
System.out.println("退出消费者线程!");
}
}
}

参考:http://wsmajunfeng.iteye.com/blog/1629354

最新文章

  1. bzoj3551 Peaks加强版
  2. Java应用程序访问网络资源--HttpClient
  3. Android 中的 Service 全面总结
  4. 【GoLang】GoLang 错误处理 -- 异常处理思路示例
  5. Google Chrome 扩展程序开发
  6. Linux流量监控工具 - iftop (最全面的iftop教程)
  7. Spark学习笔记--概念知识
  8. JQUERY选择和操作DOM元素(利用正则表达式的方法匹配字符串中的一部分)
  9. 7 Ways to earn money on programming(转)
  10. python类与对象基本语法
  11. 第二次项目冲刺(Beta阶段)--第七天
  12. 如何开发webpack loader
  13. S3C6410板子移植 Android2.2
  14. .net core 2.x - ids4 - identity - two factory 登录认证
  15. 使用PowerDesigner 15对现有数据库进行生成图表结构
  16. 2018牛客网暑假ACM多校训练赛(第十场)H Rikka with Ants 类欧几里德算法
  17. linux常用命令:pwd 命令
  18. MongoDB -的连接和使用
  19. 【转】Lucene工作原理——反向索引
  20. 中小型研发团队架构实践七:集中式日志ELK

热门文章

  1. hdu3416 最短路+最大流
  2. kuangbin_UnionFind B (POJ 1611)
  3. C++@sublime GDB调试
  4. linux中的find命令——查找文件名
  5. linux oracle profile配置
  6. JSP 相关试题(五)
  7. 【转】Deprecated: Function ereg_replace() is deprecated的解决方法
  8. ubuntu- eclipse、CDT安装
  9. 转载: scikit-learn学习之回归分析
  10. CSharp使用log4net记录日志