生产者消费者问题是Java并发中的常见问题之一,在实现时,一般可以考虑使用juc包下的BlockingQueue接口,至于具体使用哪个类,则就需要根据具体的使用场景具体分析了。本文主要实现一个生产者消费者的原型,以及实现一个生产者消费者的典型使用场景。

第一个问题:实现一个生产者消费者的原型。

 import java.util.concurrent.*;

 class Consumer implements Runnable {
BlockingQueue q = null; public Consumer(BlockingQueue q) {
this.q = q;
} @Override
public void run() {
while(true) {
try {
q.take();
System.out.println("Consumer has taken a product.");
}catch(InterruptedException e) { }
}
}
} class Producer implements Runnable {
BlockingQueue q = null; public Producer(BlockingQueue q) {
this.q = q;
} @Override
public void run() {
while(true) {
try { // note that if there is any chance that block, usually we need a InterruptedException
q.put(new Object());
System.out.println("Producer has puted a product.");
}catch(InterruptedException e) { }
}
} } public class JC_ProducerConsumerPrototype {
static int queueCapacity = 1024;
//static BlockingQueue<Object> q = new ArrayBlockingQueue<Object>(queueCapacity); // Can also compile
static BlockingQueue q = new ArrayBlockingQueue(queueCapacity); // ABQ must has a capacity
public static void main(String[] args) {
Thread t1 = new Thread(new Producer(q));
Thread t2 = new Thread(new Consumer(q));
t1.start();
t2.start();
} }

第二个问题,现在假设生产者是在读取磁盘上的多个log文件,对于每一个文件,依次读取文件中的每一行,也就是一条log记录;消费者需要读取并分析这些记录,假设消费者是计算密集型的。如何在生产者消费者原型的基础上实现这些功能?

这个场景在server端开发中是经常碰到的,因为在Server端,不可避免地会产生大量的日志文件。

 import java.util.concurrent.*;
import java.io.*;
import java.nio.*;
import java.nio.file.*;
import java.util.*;
import java.nio.charset.*; class Producer implements Runnable {
BlockingQueue q = null;
String fileName = null;
CountDownLatch latch = null; public Producer(BlockingQueue q,String fileName,CountDownLatch latch) {
this.q = q;
this.fileName = fileName;
this.latch = latch;
} @Override
public void run() {
Path path = Paths.get(".",fileName);
try{
List<String> lines = Files.readAllLines(path,StandardCharsets.UTF_8);
for(int i=lines.size();i>0;i--){
try{
q.put(lines.get(i));
}catch(InterruptedException e) { }
}
}catch(IOException e){ }
latch.countDown();
}
} class Consumer implements Runnable {
BlockingQueue<String> q = null;
Boolean done = false; public Consumer(BlockingQueue q,Boolean done){
this.q = q;
this.done = done;
} @Override
public void run(){
while(!done||q.size()!=0){
try{
q.take();
}catch(InterruptedException e){ }
}
}
} public class JC_ProducerConsumerHandlingLog{
public static int fileCount = 1024;
public static String[] fileNames = new String[fileCount];
public static int cpuCount = 8;
public static CountDownLatch latch = new CountDownLatch(fileCount);
public static volatile boolean done = false;
public static BlockingQueue<String> q = new LinkedBlockingQueue<String>(fileCount);//one thread for one file public static void main(String[] args){
for(int i=0;i<fileCount;i++){
Thread t = new Thread(new Producer(q,fileNames[i],latch));
t.start();
}
for(int i=0;i<cpuCount;i++){//for computing tasks, we don't need too many threads.
Thread t = new Thread(new Consumer(q,done));
t.start();
}
try{
latch.await();
done = true;
}catch(InterruptedException e){ } }
}

需要稍微注意一下线程数的选择,对于计算密集型的任务,我认为线程数达到cpu的核数比较合理(在不考虑超线程的情况下,也就是说一个核只有一个线程)。有不同意见欢迎跟我交流!

最新文章

  1. vue DatePicker vue2.0的日期插件
  2. DIY操作系统(引文)
  3. Oracle中有个tkprof来格式化oracle的trace文件
  4. Java获取当前内存及硬盘使用情况
  5. iOS7适配问题
  6. Git 推送分支
  7. ca 证书、签名
  8. Python之编程基础(编程语言分类)
  9. SVN学习之windows下svn的安装
  10. 金蝶K/3 同步用核算项目配置
  11. RTMP 摄像头推流至七牛云直播
  12. ABAP接口之Http发送json报文
  13. javaScript笔记详解(1)
  14. golang使用chrome headless获取网页内容
  15. T-SQL:SQL语句处理顺序的坑(四)
  16. python中逻辑运算符“+”的特殊之处
  17. element的form表单中如何一行显示多el-form-item标签
  18. vbox 的 ova 提取vmdk 与 vdi 以及扩容
  19. Unity读取Android SDcard文件
  20. hdu 4864 任务分配贪心

热门文章

  1. C/C++ 动态存储分配
  2. Azure 上为Liunx VM 挂载File类型的存储。
  3. Hive Word count
  4. jmap,jhat分析内存
  5. java基础疑难点总结之成员变量的继承,方法重载与重写的区别,多态与动态绑定
  6. 怎么学习计算电磁学【QUORA】
  7. 我的opencv之旅:ios人脸识别
  8. Trie树 &amp; 01Trie
  9. ZOJ 3233 Lucky Number --容斥原理
  10. JavaWeb学习----http协议