我们知道,单个线程计算是串行的,只有等上一个任务结束之后,才能执行下一个任务,所以执行效率是比较低的。

那么,如果用多线程执行任务,就可以在单位时间内执行更多的任务,而Master-Worker就是多线程并行计算的一种实现方式。

它的思想是,启动两个进程协同工作:Master和Worker进程。

Master负责任务的接收和分配,Worker负责具体的子任务执行。每个Worker执行完任务之后把结果返回给Master,最后由Master汇总结果。(其实也是一种分而治之的思想,和forkjoin计算框架有相似之处,参看:并行任务计算框架forkjoin

Master-Worker工作示意图如下:

下面用Master-Worker实现计算1-100的平方和,思路如下:

  1. 定义一个Task类用于存储每个任务的数据。
  2. Master生产固定个数的Worker,把所有worker存放在workers变量(map)中,Master需要存储所有任务的队列workqueue(ConcurrentLinkedQueue)和所有子任务返回的结果集resultMap(ConcurrentHashMap)。
  3. 每个Worker执行自己的子任务,然后把结果存放在resultMap中。
  4. Master汇总resultMap中的数据,然后返回给Client客户端。
  5. 为了扩展Worker的功能,用一个MyWorker继承Worker重写任务处理的具体方法。

Task类:

package com.thread.masterworker;
public class Task {
private int id;
private String name;
private int num; public int getId() {
return id;
} public void setId(int id) {
this.id = id;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public int getNum() {
return num;
} public void setNum(int num) {
this.num = num;
}
}

Master实现:

package com.thread.masterworker;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; public class Master {
//所有任务的队列
private ConcurrentLinkedQueue<Task> workerQueue = new ConcurrentLinkedQueue<Task>(); //所有worker
private HashMap<String,Thread> workers = new HashMap<String,Thread>(); //共享变量,worker返回的结果
private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String,Object>(); //构造方法,初始化所有worker
public Master(Worker worker,int workerCount){
worker.setWorkerQueue(this.workerQueue);
worker.setResultMap(this.resultMap); for (int i = 0; i < workerCount; i++) {
Thread t = new Thread(worker);
this.workers.put("worker-"+i,t);
}
} //任务的提交
public void submit(Task task){
this.workerQueue.add(task);
} //执行任务
public int execute(){
for (Map.Entry<String, Thread> entry : workers.entrySet()) {
entry.getValue().start();
} //一直循环,直到结果返回
while (true){
if(isComplete()){
return getResult();
}
} } //判断是否所有线程都已经执行完毕
public boolean isComplete(){
for (Map.Entry<String, Thread> entry : workers.entrySet()) {
//只要有任意一个线程没有结束,就返回false
if(entry.getValue().getState() != Thread.State.TERMINATED){
return false;
}
}
return true;
} //处理结果集返回最终结果
public int getResult(){
int res = 0;
for (Map.Entry<String,Object> entry : resultMap.entrySet()) {
res += (Integer) entry.getValue();
}
return res;
} }

父类Worker:

package com.thread.masterworker;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; public class Worker implements Runnable { private ConcurrentLinkedQueue<Task> workerQueue; private ConcurrentHashMap<String,Object> resultMap; public void setWorkerQueue(ConcurrentLinkedQueue<Task> workerQueue) {
this.workerQueue = workerQueue;
} public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
} @Override
public void run() {
while(true){
//从任务队列中取出一个任务
Task task = workerQueue.poll();
if(task == null) break;
//处理具体的任务
Object res = doTask(task);
//把每次处理的结果放到结果集里面,此处直接把num值作为结果
resultMap.put(String.valueOf(task.getId()),res);
} } public Object doTask(Task task) {
return null;
}
}

子类MyWorker继承父类Worker,重写doTask方法实现具体的逻辑:

package com.thread.masterworker;

public class MyWorker extends Worker {
@Override
public Object doTask(Task task) {
//暂停0.5秒,模拟任务处理
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//计算数字的平方
int num = task.getNum();
return num * num;
}
}

客户端Client:

package com.thread.masterworker;

import java.util.Random;

public class Client {
public static void main(String[] args) { Master master = new Master(new MyWorker(), 10); //提交n个任务到任务队列里
for (int i = 0; i < 100; i++) {
Task task = new Task();
task.setId(i);
task.setName("任务"+i);
task.setNum(i+1);
master.submit(task);
} //执行任务
long start = System.currentTimeMillis();
int res = master.execute();
long time = System.currentTimeMillis() - start;
System.out.println("结果:"+res+",耗时:"+time);
}
}

以上,我们用10个线程去执行子任务,最终由Master做计算求和(1-100的平方和)。每个线程暂停500ms,计算数字的平方值。

总共100个任务,分10个线程并行计算,相当于每个线程均分10个任务,一个任务的时间大概为500ms,故10个任务为5000ms,再加上计算平方值的时间,故稍大于5000ms。结果如下,

结果:338350,耗时:5084

最新文章

  1. json是个啥东东
  2. iOS开发 适配iOS10以及Xcode8[转]
  3. 006 复杂的数据类型&amp;函数(方法)
  4. Postman-CI集成Jenkins
  5. 第一零二天上课 PHP TP框架 引入文件路径问题和调用验证码的方式
  6. POJ2226 Muddy Fields(二分图最小点覆盖集)
  7. Asp.Net MVC路由调试好帮手RouteDebugger
  8. 无需Get更多技能,快速打造一个可持久化的任务调度
  9. ACM==迷茫
  10. 入侵检测中需要监控的注册表路径研究(Windows Registry Security Check)
  11. Python Async/Await入门指南
  12. React中props和state相同点和不同点
  13. js-JavaScript常见的创建对象的几种方式
  14. Start-Sleep 帮助信息
  15. java环境和Tomcat环境
  16. 关于 NPOI 导出的 Excel 出现“部分内容有问题” 的解决方法
  17. Python|一文简单看懂 深度&amp;广度 优先算法
  18. [Javascript] Automate the process of flattening deeply nested arrays using ES2019&#39;s flat method
  19. RESTful接口签名认证实现机制
  20. STM32L0 复位和时钟控制 Reset and clock control (RCC)

热门文章

  1. 【JavaWeb学习】过滤器Filter
  2. vue的param和query两种传参方式及URL的显示
  3. 带限制的广搜 codeforces
  4. 用PHP写下HELLO WORLD!
  5. 公文流转系统v0.1
  6. 神秘常量0x077CB531,德布莱英序列的恩赐
  7. 创建dynamics CRM client-side (六) - form &amp; field notification
  8. python实例:自动爬取豆瓣读书短评,分析短评内容
  9. RTMP、HTTP、HLS协议比较
  10. docker swoft