所谓的Aggregate模式,其实就是聚合模式,跟masterWorker模式有点类似,但其出发点不同。masterWorker模式是指master向worker发送命令,worker完成某种业务逻辑。而聚合模式则刚好相反,由各个worker完成某种业务逻辑后,把结果汇总发给某个actor,这个actor不一定是masterActor。

class AggregateMasterActor extends Actor{
override def receive: Receive = {
case cmd: AggregateCommand.Aggregate =>
// 将此次汇总结果汇报给from,为了简化,此处用self替代
val from = self
val backendActor = context.actorOf(Props(new AggregateMasterBackendActor(from,cmd.parallel)),s"AggregateMasterBackendActor-${cmd.at}")
backendActor ! cmd
case AggregateBackendEvent.WorkDone(sum) =>
val from = sender()
println(s"AggregateMasterActor [${self.path.name}] 收到 ${from.path.name} 汇总结果 $sum")
}
}
class AggregateMasterBackendActor(replyTo:ActorRef,parallel:Int) extends Actor{
var counter = 0
var sum = 0L
override def receive: Receive = {
case AggregateCommand.Aggregate(_) =>
println(s"AggregateMasterBackendActor [${self.path.name}] 开始工作,parallel $parallel,工作结果汇总给 ${replyTo.path.name}")
1 to parallel foreach { i =>
val worker = context.actorOf(Props(new AggregateWorker(self)),s"AggregateWorker-$i")
worker ! AggregateBackendCommand.Aggregate(i,parallel)
}
case AggregateWorkerEvent.WorkDone(result) =>
counter += 1
sum += result
if(counter == parallel){
replyTo ! AggregateBackendEvent.WorkDone(sum)
context.stop(self)
println(s"AggregateMasterBackendActor [${self.path.name}] 工作结束退出")
}
}
}
class AggregateWorker(replyTo:ActorRef) extends Actor{
def calcResult(index:Int,parallel:Int):Long = index * parallel
override def receive: Receive = {
case AggregateBackendCommand.Aggregate(index,parallel) =>
println(s"AggregateWorker [${self.path.name}] 开始工作 index=$index,工作汇总给 ${replyTo.path.name}")
val result = calcResult(index,parallel)
replyTo ! AggregateWorkerEvent.WorkDone(result)
println(s"AggregateWorker [${self.path.name}] 工作结束退出")
context.stop(self)
}
} object AggregatePattern {
def main(args: Array[String]): Unit = {
val system = ActorSystem("AggregatePattern",ConfigFactory.load())
val aggregateMasterActor = system.actorOf(Props(new AggregateMasterActor),"AggregateMasterActor")
aggregateMasterActor ! AggregateCommand.Aggregate(3)
}
}

输出:

AggregateMasterBackendActor [AggregateMasterBackendActor-1531383454073] 开始工作,parallel 3,工作结果汇总给 AggregateMasterActor
AggregateWorker [AggregateWorker-1] 开始工作 index=1,工作汇总给 AggregateMasterBackendActor-1531383454073
AggregateWorker [AggregateWorker-2] 开始工作 index=2,工作汇总给 AggregateMasterBackendActor-1531383454073
AggregateWorker [AggregateWorker-3] 开始工作 index=3,工作汇总给 AggregateMasterBackendActor-1531383454073
AggregateWorker [AggregateWorker-1] 工作结束退出
AggregateWorker [AggregateWorker-3] 工作结束退出
AggregateWorker [AggregateWorker-2] 工作结束退出
AggregateMasterBackendActor [AggregateMasterBackendActor-1531383454073] 工作结束退出
AggregateMasterActor [AggregateMasterActor] 收到 AggregateMasterBackendActor-1531383454073 汇总结果 18

  从代码来看该设计模式也比较简单,就是由Master创建以临时的子actor,此处命名为MasterBackend,将汇报对象的actorRef以构造函数的形式传递给MasterBackend,此处为了简单用self替代;MasterBackend根据并行参数,创建对应个数的workerActor,并把本身的actorRef以构造函数的形式传递给workerActor,workerActor执行具体的业务逻辑,并将汇总结果,发送给replyTo(也就是MasterBackend);MasterBackend收到workerActor的汇总结果,根据并行参数,判断所有子actor是否执行结束,若执行结束,此次计算完成,将汇总后的结果,发送给replyTo(也就是MasterActor)。

  上面这种设计模式有一个明显的好处,就是Master可以迅速创建大量的聚合工作而不阻塞,因为它收到命令后,只是简单的创建MasterBackend,工作交给它去执行,这个过程非常快。如果某个聚合工作比较慢,并不会影响其他任务。

  之所以说这个设计模式非常重要,是因为在spark/storm等大多分布式框架中都有它的影子。他们都选择将功能进行拆解,专门的节点或actor分别负责任务的接收、创建、执行、汇总这些工作,工作之间互不影响。如果能够深刻的理解这种设计模式,你将会设计出一个架构分层合理、互相解耦的高质量应用系统。

最新文章

  1. UML建模
  2. PHP——字符串统一转码为GBK,自动判断是否UTF8并转码
  3. 项目已被os x使用 不能打开-黑苹果之路
  4. DNS服务器:主要介绍DNS的服务原理以及安装及其主从配置
  5. 记一下ajax里get与post的异同
  6. Console.WriteLine()与MessageBox.Show()的区别
  7. 设置tomcat启动超时,不会自动停止
  8. 树莓派 wheezy安装与远程登录配置
  9. 高性能JSON库---FastJson(阿里巴巴)
  10. [nginx] 网上最全面nginx教程(近100篇文章整理)
  11. GBDT与LR融合提升广告点击率预估模型
  12. Spring-MVC运行原理
  13. 深入理解JVM(六)类文件结构
  14. 9.Django组件-cookie和session
  15. vim命令详解
  16. pycharm 配置使用
  17. CentOS7.3防火墙firewalld简单配置
  18. [Elasticsearch] 向已存在的索引中加入自己定义filter/analyzer
  19. VS2008与MATLAB R2007a混合编程配置过程
  20. Selenium+C#自动化脚本开发学习

热门文章

  1. WIndows 系统下的常用命令 和 检测方法
  2. ubuntu14.04 fcitx安装
  3. 长久不用的mysql报错ERROR 2002 (HY000): Can't connect to local MySQL server through socket '/tmp/mysql.sock' (2)
  4. 搭建Nginx服务
  5. mysql执行show processlist unauthenticated user 解决方法
  6. 438D - The Child and Sequence
  7. poj 3074
  8. Ubuntu 16.04禁用来宾账号(Guest User)
  9. MyBatis 3判断不为null
  10. linux下nginx+svn