Master-Worker模式是常用的并行模式之一,它的核心思想是,系统有两个进程协作工作:Master进程,负责接收和分配任务;Worker进程,负责处理子任务。当Worker进程将子任务处理完成后,结果返回给Master进程,由Master进程做归纳汇总,最后得到最终的结果。
一、什么是Master-Worker模式:
该模式的结构图:

Worker:用于实际处理一个任务;

Master:任务的分配和最终结果的合成;

Main:启动程序,调度开启Master。

注意点:

Master必须存在一个队列来存储客户端发送过来的任务,必须有一个队列,我们选择无阻塞无界队列

Worker实际处理任务的操作者,所以应该是线程,应该实现runnable接口

master应该有一个容器装所有的worker对象,不涉及到高并发,使用hashmap ,value就是worker对象

worker需要拥有master进程中的无阻塞无界队列的引用,用来获得任务进行处理

master是对worker处理的结果做归纳总结,所以需要有一个队列要保存worker处理的结果,要实现并发安全的ConcurrentHashMap

worker需要拥有ConcurrentHashMap的引用,把处理的结果存储在ConcurrentHashMap中,需要线程安全的

Master-Worker模式是一种将串行任务并行化的方案。
更重要的是提供了一种变化的思想。

我们来看下面程序的代码:

package com.bjsxt.height.design015;

public class Task {

    private int id;
private int price ;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public int getPrice() {
return price;
}
public void setPrice(int price) {
this.price = price;
} }
package com.bjsxt.height.design015;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; public class Worker implements Runnable { private ConcurrentLinkedQueue<Task> workQueue;
private ConcurrentHashMap<String, Object> resultMap; public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
this.workQueue = workQueue;
} public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
} @Override
public void run() {
while(true){
Task input = this.workQueue.poll();
if(input == null) break;
Object output = handle(input);
this.resultMap.put(Integer.toString(input.getId()), output);
}
} private Object handle(Task input) {
Object output = null;
try {
//处理任务的耗时。。 比如说进行操作数据库。。。
Thread.sleep(500);
output = input.getPrice();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
} }
package com.bjsxt.height.design015;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; public class Master { //1 有一个盛放任务的容器
private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); //2 需要有一个盛放worker的集合
private HashMap<String, Thread> workers = new HashMap<String, Thread>(); //3 需要有一个盛放每一个worker执行任务的结果集合
private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); //4 构造方法
public Master(Worker worker , int workerCount){
worker.setWorkQueue(this.workQueue);
worker.setResultMap(this.resultMap);
/**
* 初始化woker对象的个数,管理worker对象,开启100个worker对象,装到hashmap中
* */
for(int i = 0; i < workerCount; i ++){
this.workers.put(Integer.toString(i), new Thread(worker));
} } //5 需要一个提交任务的方法
public void submit(Task task){
/**
* 添加到任务队列中
* */
this.workQueue.add(task);
} //6 需要有一个执行的方法,启动所有的worker方法去执行任务
public void execute(){
for(Map.Entry<String, Thread> me : workers.entrySet()){
me.getValue().start();
}
} //7 判断是否运行结束的方法
public boolean isComplete() {
for(Map.Entry<String, Thread> me : workers.entrySet()){
if(me.getValue().getState() != Thread.State.TERMINATED){
return false;
}
}
return true;
} //8 计算结果方法
public int getResult() {
int priceResult = 0;
for(Map.Entry<String, Object> me : resultMap.entrySet()){
priceResult += (Integer)me.getValue();
}
return priceResult;
} }
package com.bjsxt.height.design015;

import java.util.Random;

public class Main {

    public static void main(String[] args) {

        Master master = new Master(new Worker(), 20);

        Random r = new Random();
for(int i = 1; i <= 100; i++){
Task t = new Task();
t.setId(i);
t.setPrice(r.nextInt(1000));
master.submit(t);
}
master.execute();
long start = System.currentTimeMillis(); while(true){
if(master.isComplete()){
long end = System.currentTimeMillis() - start;
int priceResult = master.getResult();
System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
break;
}
} }
}

程序运行的结果是:

最终结果:54386, 执行时间:2500

将每个worker计算得到的price结果进行相加

相当的经典,这种设计模式就是将串行化的思想转换成并行化进行处理

最新文章

  1. java 锁3
  2. Json数据报错
  3. TCP/IP协议栈与数据包封装+TCP与UDP区别
  4. 各种sensor名称统计
  5. Java基础知识强化78:正则表达式之获取功能(案例)
  6. Python爬虫实战(1):爬取Drupal论坛帖子列表
  7. 【转】delphi 保存到txt文件
  8. Jmeter之接口测试
  9. Java与C/C++的区别
  10. zabbix 安装配置介绍
  11. 在servlet中使用spring注解
  12. 一 .isinstance(obj,cls)和issubclass(sub,super)
  13. 有关Math数学运算的js函数
  14. opencv中 int main(int argc,char* argv[])详解
  15. Golang 端口复用测试
  16. Android蓝牙联机Demo解析
  17. [BZOJ4876][ZJOI2017]线段树
  18. 软件开发工具——Make
  19. OVN实战---《OVN and Containers》翻译
  20. Mysql常用命令行大全(二)

热门文章

  1. git status 命令详解
  2. DDD之1微服务设计为什么选择DDD
  3. Parrot os KDE还是MATE版本
  4. Java实现 蓝桥杯VIP 算法训练 整除问题
  5. Java实现 蓝桥杯VIP 算法训练 薪水计算
  6. Java实现 LeetCode 5355 T 秒后青蛙的位置
  7. Java实现 LeetCode 230 2的幂
  8. java实现滑动解锁
  9. Linux 自动挂载与fstab文件修复
  10. Java基础(八)