Master-Worker模式是一种使用多线程进行数据处理的结构,多个worker进程协作处理用户请求,master进程负责维护worker进程,并整合最终处理结果

主要参与者

  • Worker:用于实际处理一个任务
  • Master:用于任务的分配和最终结果的合成
  • Main:启动系统,调度开启Master

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque; /**
* Master进程负责接收和分配任务
*/
public class Master {
//子任务队列 ConcurrentLinkedDeque:双向链表结构的无界并发队列
protected Queue<Object> workQueue = new ConcurrentLinkedDeque<>();
//worker进程队列
protected Map<String,Thread> threadMap = new HashMap<>();
//子任务处理结果集
protected Map<String,Object> resultMap = new ConcurrentHashMap<>();
//是否所以的子任务都结束了
public boolean isComplete(){
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
//停止状态
if (entry.getValue().getState()!= Thread.State.TERMINATED){
return false;
}
}
return true;
} // Worker进程,Worker进程数量
public Master(Worker worker, int countWorker) {
worker.setWorkQueue(workQueue);
worker.setResultMap(resultMap);
for (int i = 0; i < countWorker; i++) {
threadMap.put(Integer.toString(i),new Thread(worker,Integer.toString(i)));
}
}
//提交一个任务
public void submit(Object job){
workQueue.add(job);
} //返回子任务结果集
public Map<String, Object> getResultMap() {
return resultMap;
}
//运行所以Worker进程,进行处理
public void execute(){
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
entry.getValue().start();
}
}
}

import java.util.Map;
import java.util.Queue; /**
* Worker进程负责处理子任务
*/
public class Worker implements Runnable {
//子任务队列
protected Queue<Object> workQueue;
//子任务处理结果集
protected Map<String,Object> resultMap;
public void setWorkQueue(Queue<Object> workQueue) {
this.workQueue = workQueue;
}
public void setResultMap(Map<String, Object> resultMap) {
this.resultMap = resultMap;
}
//子任务处理的逻辑,在子类中实现具体逻辑
public Object handle(Object input){
return input;
}
@Override
public void run() {
while (true){
//获取子任务
Object input = workQueue.poll();
if (input==null) break;
//处理子任务
Object re = handle(input);
//将处理结果写入结果集
resultMap.put(Integer.toString(input.hashCode()),re);
}
}
}

/**
* 任务 求 i^2
*/
public class PlusWorker extends Worker {
public Object handle(Object input){
Integer i = (Integer) input;
return i*i;
}
}

import java.util.Map;
import java.util.Set; /**
* 求 1^2 + 2^2 + 3^2 + 4^2 + 5^2
* 1 + 4 + 9 + 16 + 25 = 55
*/
public class Main {
public static void main(String[] args){
Master m = new Master(new PlusWorker(),5);
for (int i = 1; i <= 5; i++) {
m.submit(i);
}
m.execute();
int re = 0;
Map<String, Object> resultMap = m.getResultMap();
// 任务结果相加
while (resultMap.size()>0||!m.isComplete()){
Set<String> keys = resultMap.keySet();
String key = null;
for (String k : keys) { //每次只取一次
key = k;
break;
}
Integer i = null;
if (key!=null){
i = (Integer) resultMap.get(key);
}
if (i!=null){
re+=i; //任务结果相加
}
if (key!=null){
resultMap.remove(key); //移除已经被计算的结果项
}
}
System.out.println(re); //55
}
}

最新文章

  1. Java技术体系图
  2. Good-Bye
  3. (淘宝无限适配)手机端rem布局详解(转载非原创)
  4. Android --ListView分页
  5. Why did Jimmy Wales invest in Quora? Is he afraid that it will take over Wikipedia?
  6. web.xml 详解
  7. brew命令
  8. Setup FTP Server On CentOS, RHEL, Scientific Linux 6.5/6.4/6.3
  9. javascript基础笔记学习
  10. 用MVC导入导出
  11. AWWWB.COM网站克隆器
  12. StoreType.java 存储方式
  13. Tornado框架配置使用Jinja2模板引擎
  14. 20165237 2017-2018-2 《Java程序设计》第十周考试补做及编程题
  15. IDEA使用笔记(四)——工具栏的显示隐藏切换
  16. 调用的执行器&ldquo;executor://mstestadapter/v2&rdquo;时发生异常: 无法找到程序集&ldquo;log4net, Version=1.2.15.0, Culture=neutral, PublicKeyToken=669e0ddf0bb1aa2a&rdquo;
  17. Android tesseract-orc之扫描身份证号码
  18. 跨域nginx,CORS
  19. php 冒泡法 排序
  20. 通用Json的处理办法

热门文章

  1. webRTC脱坑笔记(四)— windows下Nginx对Node服务的反向代理
  2. 刷题or源码链接
  3. sentinel集群docker-compose.yml配置
  4. 前端必用正则(js)
  5. PHP curl_file_create函数
  6. JavaScript中的编码解码
  7. 富文本编辑器——百度UEditor插件安装教程
  8. php面试专题---4、流程控制考点
  9. display:inline-block在IE6/Ie7和IE8中的区别
  10. Nginx允许跨域访问的配置问题