Java多线程_Master-Worker设计模式
2024-09-01 04:51:29
Master-Worker模式是常用的并行模式之一,它的核心思想是:系统由Master进程和Worker进程两类进程协同工作,Master负责接收和分配任务,Wroker负责处理子任务。当各个Worker进程将子任务处理完成后,将结果返回给Master进程,由Master进程进行汇总,从而得到最终的结果。
Master-Worker 模式的好处,它能够将一个大任务分解成若干个小任务并行执行,从而提高系统的吞吐量。而对于系统请求者 Client 来说,任务一旦提交,Master进程会分配任务并立即返回,并不会等待系统全部处理完成后再返回,其处理过程是异步的。因此,Client 不会出现等待现象。
Master-Worker 主要角色分配如下所示:
示例:
实现一个计算立方和的应用,并计算 1-100 的平方和,即 1² + 2² + 3² + ... + 100²。
(1)我们可以先实现Master-Worker的框架:
worker:
import java.util.Map;
import java.util.Queue; public class Worker implements Runnable {
protected Queue<Object> workQueue;
protected Map<String, Object> resultMap; public void setWorkQueue(Queue<Object> workQueue) {
this.workQueue = workQueue;
} public void setResultMap(Map<String, Object> resultMap) {
this.resultMap = resultMap;
} public Object handle(Object input) {
return input;
} @Override
public void run() {
while (true) {
// 获取子任务
Object input = workQueue.poll();
if (input == null) {
break;
}
// 处理子任务
Object re = handle(input);
resultMap.put(Integer.toString(input.hashCode()), re);
}
}
}
master:
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; public class Master {
protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
// Worker进程队列
protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
// 子任务处理结果集
protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); // 构造函数
public Master(Worker worker, int countWorker) {
worker.setWorkQueue(workQueue); // 添加任务队列
worker.setResultMap(resultMap); // 添加计算结果集合
for (int i = 0; i < countWorker; i++) {
threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i))); // 循环添加任务进程
}
} // 是否所有的子任务都结束了
public boolean isComplete() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED)
return false; // 存在未完成的任务
}
return true;
} // 提交一个子任务
public void submit(Object job) {
workQueue.add(job);
} // 返回子任务结果集
public Map<String, Object> getResultMap() {
return resultMap;
} // 执行所有Worker进程,进行处理
public void execute() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
entry.getValue().start();
}
}
}
这两个类就实现了Master-Worker设计模式的框架。
(2)开始写需求,重写Worker的handle方法,里面写需求
import java.util.Map;
import java.util.Set; public class PlusWorker extends Worker { @Override
public Object handle(Object input) {
int i = (Integer) input;
return i * i;
}
}
实现求平方和的需求。
(3)测试类,声明worker个数,实现大任务拆分小任务,master整合结果:
public static void main(String[] args) {
Master master = new Master(new Worker(), 10);
for (int i = 1; i <= 100; i++) {
master.submit(i);
}
master.execute();
int result = 0;
Map<String, Object> resultMap = master.getResultMap();
while (true) {
Set<String> keys = resultMap.keySet();
String key = null;
for (String k : keys) {
key = k;
break;
}
Integer i = null;
if (key != null) {
i = (Integer) resultMap.get(key);
}
if (i != null) {
result += i;
}
if (key != null) {
resultMap.remove(key);
}
if (master.isComplete() && resultMap.size() == 0) {
break;
}
}
System.out.println(result);
}
结果:
最新文章
- [tem]高精度1
- 彻底卸载Oracle
- Matlab 进阶学习记录
- pgadmin中的备份功能消失的原因和找回方法
- java内存知识
- Power Gating的设计(模块二)
- 文本框的onchange事件,如何兼容各大浏览器
- 【转】【Android】对话框 AlertDialog -- 不错不错
- Codeforces 616E - Sum of Remainders
- struts2中 ServletActionContext与ActionContext区别
- NodeJS会是昙花一现吗?
- Python 安装虚拟环境
- WPF 简易手风琴 (ListBox+Expander)
- 第二次项目冲刺(Beta阶段)--第二天
- Nifi自定义processor
- 微信小程序 数据库指引 前端操纵数据库失败
- 使用Log4J收集日志
- OO第二单元小结
- 爬虫_电影天堂 热映电影(xpath)
- cacti监控服务器