本文部分摘自《Java 并发编程的艺术》

模式概述

在线程的世界里,生产者就是生产数据的线程,消费者就是消费数据的数据。生产者和消费者彼此之间不直接通信,而是通过阻塞队列进行通信,所以生产者生产完数据后不用等待消费者处理,而是直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列取,阻塞队列相当于一个缓冲区,平衡了生产者和消费者的处理能力

模式实战

假设现有需求:把各部门的邮件收集起来,统一处理归纳。可以使用生产者 - 消费者模式,启动一个线程把所有邮件抽取到队列中,消费者启动多个线程处理邮件。Java 代码如下:

public class QuickCheckEmailExtractor {

    private final ThreadPoolExecutor threadsPool;

    private final BlockingQueue<EmailDTO> emailQueue;

    private final EmailService emailService;

    public QuickCheckEmailExtractor() {
emailQueue = new LinkedBlockingQueue<>();
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 101,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
emailService = new EmailService();
} public void extract() {
// 抽取所有邮件到队列里
new ExtractEmailTask().start();
// 处理队列里的邮件
check();
} private void check() {
try {
while (true) {
// 两秒内取不到就退出
EmailDTO email = emailQueue.poll(2, TimeUnit.SECONDS);
if (email == null) {
break;
}
threadsPool.submit(new CheckEmailTask());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
} protected void extractEmail() {
List<EmailDTO> allEmails = emailService.queryAllEmails();
if (allEmails == null) {
return;
}
for (EmailDTO emailDTO : allEmails) {
emailQueue.offer(emailDTO);
}
} protected void checkEmail(EmailDTO email) {
System.out.println("邮件" + email.getId() + "已处理");
} public class ExtractEmailTask extends Thread { @Override
public void run() {
extractEmail();
} } public class CheckEmailTask extends Thread { private EmailDTO email; @Override
public void run() {
checkEmail(email);
} public CheckEmailTask() {
super();
} public CheckEmailTask(EmailDTO email) {
super();
this.email = email;
}
}
}

多生产者和多消费者场景

在多核时代,多线程并发处理速度比单线程处理速度更快,所以可以使用多个线程来生产数据,多个线程来消费数据。更复杂的情况是,消费者消费完的数据,可能还要交给其他消费者继续处理,如图所示:

我们在一个长连接服务器中使用这种模式,生产者 1 负责将所有客户端发送的消息存放在阻塞队列 1 里,消费者 1 从队列里读消息,然后通过消息 ID 进行散列得到 N 个队列中的一个,然后根据编号将消息存放在不同的队列里,每个阻塞队列会分配一个线程来阻塞队列里的数据。如果消费者 2 无法消费消息,就将消息再抛回阻塞队列 1 中,交给其他消费者处理

public class MsgQueueManager {

    /**
* 消息总队列
*/
private final BlockingQueue<Message> messageQueue; /**
* 消息子队列集合
*/
private final List<BlockingQueue<Message>> subMsgQueues; private MsgQueueManager() {
messageQueue = new LinkedBlockingQueue<>();
subMsgQueues = new ArrayList<>();
} public static MsgQueueManager getInstance() {
return new MsgQueueManager();
} public void put(Message msg) {
try {
messageQueue.put(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} public Message take() {
try {
return messageQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
} /**
* 消费者线程获取子队列
*/
public BlockingQueue<Message> addSubMsgQueue() {
BlockingQueue<Message> subMsgQueue = new LinkedBlockingQueue<>();
subMsgQueues.add(subMsgQueue);
return subMsgQueue;
} /**
* 消息分发线程,负责把消息从大队列塞到小队列里
*/
class DispatchMessageTask implements Runnable { /**
* 控制消息分发开始与结束
*/
private boolean flag = true; public void setFlag(boolean flag) {
this.flag = flag;
} @Override
public void run() {
BlockingQueue<Message> subQueue;
while (flag) {
// 如果没有数据,则阻塞在这里
Message msg = take();
// 如果为空,表示没有Session连接,需要等待Session连接上来
while ((subQueue = getSubQueue()) == null) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 把消息放到小队列里
try {
subQueue.put(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} /**
* 均衡获取一个子队列
*/
public BlockingQueue<Message> getSubQueue() {
List<BlockingQueue<Message>> subMsgQueues = getInstance().subMsgQueues;
if (subMsgQueues.isEmpty()) {
return null;
}
int index = (int) (System.nanoTime() % subMsgQueues.size());
return subMsgQueues.get(index);
}
}
}

最新文章

  1. PetaPoco 批量插入数据
  2. bzoj1124[POI2008]枪战maf
  3. java新手笔记24 Math/String对象
  4. 【CF521C】【排列组合】Pluses everywhere
  5. Mongodb使用
  6. liststack——链表栈(procedure)
  7. angular.js学习手册(二)
  8. 一个Jquery特效(转)
  9. Bootstrap环境及屏幕适配-(一)
  10. 《RabbitMQ Tutorial》译文 第 6 章 远程过程调用(RPC)
  11. servlet的web-xml配置详解
  12. Android的Context Manager(服务管理器)源码剖析-android学习之旅(99)
  13. 带着新人学springboot的应用08(springboot+jpa的整合)
  14. 微信公众号开发 [05] 微信支付功能开发(网页JSAPI调用)
  15. VS2008中 VB 报错 检索 COM 类工厂中 CLSID 为 {28E68F9A-8D75-11D1-8DC3-3C302A000000} 的组件失败,原因是出现以下错误: 80040154 没有注册类 (异常来自 HRESULT:0x80040154 (REGDB_E_CLASSNOTREG))。
  16. CSS 重置默认样式
  17. C#7.0&amp;6.0新特性 — 完整版
  18. ArcGIS Runtime SDK for iOS开发地图图层-图形图层
  19. SQL Server如何将Id相同的字段合并,并且以逗号隔开
  20. PHP直接将文件流转换为字符串

热门文章

  1. The Weekly Web Dev Challenge: String Calculator
  2. Twitter 分享
  3. copyright@xgqfrms
  4. Flutte 什么是Widget,RenderObjects和Elements
  5. perl 打印目录结构
  6. Spring 中的 MetaData 接口
  7. 彻底解决Asp.netCore WebApi 3.1 跨域时的预检查204 options重复请求的问题
  8. Vue脚手架中默认的margin怎么清除
  9. Elasticsearch 7.x配置用户名密码访问 开启x-pack验证
  10. Markdown(2)基本语法