1. 启动类

object Application extends App{

  val _system = ActorSystem("HelloAkka")  //构建akka容器
val master:ActorRef = _system.actorOf(Props[MasterActor],name="master") //akka容器创建actor println("master.path ==>\t"+master.path) //akka://HelloAkka/user/master master ! "hi my name is spark, so happy"
master ! "hi my zsh"
master ! "xixi"
Thread.sleep(1000)
master ! new Result Thread.sleep(500)
_system.terminate
}

2. MasterActor创建map,reduce,aggregate任务的actor

class MasterActor extends Actor{
val aggregateActor:ActorRef = context.actorOf(Props[AggregateActor],name="aggregate")
val reduceActor:ActorRef = context.actorOf(Props(new ReduceActor(aggregateActor)),name="reduce")
val mapActor:ActorRef = context.actorOf(Props(new MapActor(reduceActor)),name="map") println("aggregateActor ==>\t"+aggregateActor.path) //akka://HelloAkka/user/master/aggregate (master的子actor)
println("mapActor ==>\t"+mapActor.path)
println("reduceActor ==>\t"+reduceActor.path) override def receive: Receive = { // Receive用type重命名的PartialFunction
case msg:String => mapActor ! msg
case msg:Result => aggregateActor ! msg
case _ =>
}
}

3. map任务

class MapActor(var reduceActor: ActorRef)extends Actor{
val STOP_WORDS = List("is","a")
override def receive: Receive = {
case msg:String => reduceActor ! evlExpression(msg)
case _ =>
} def evlExpression(line:String):MapData = {
val dataList = new ArrayBuffer[Word] // scala可变数组
val parser:StringTokenizer = new StringTokenizer(line)
while(parser.hasMoreTokens){
val str: String = parser.nextToken()
if(!STOP_WORDS.contains(str)){
dataList += (new Word(str,1))
}
}
new MapData(dataList)
}

4. reduce任务

class ReduceActor(var aggregateActor: ActorRef) extends Actor{
override def receive: Receive = {
case msg: MapData => aggregateActor ! reduce(msg.dataList)
case _ =>
} def reduce(dataList:ArrayBuffer[Word]) : ReduceData ={
val map = new HashMap[String,Int]
for(w:Word <- dataList){
val str: String = w.word
map += (str -> map.getOrElse(str,1))
}
new ReduceData(map)
}
}

5. aggregate任务

class AggregateActor extends Actor{

  var finalMap = new HashMap[String,Int]

  override def receive: Receive = {
case msg:ReduceData => sum(msg.raduceMap)
case msg:Result => println(finalMap)
}
def sum(map:HashMap[String,Int]){ //多个reduceactor会向aggregateactor发送整理好的map
for(tuple <- map){
val c = finalMap.getOrElse(tuple._1,0)+tuple._2
finalMap += (tuple._1 -> c)
}
}
}

6. 用到的实体类

class Word(val word:String,val count:Int)

case class Result();

class MapData(val dataList:ArrayBuffer[Word])

class ReduceData(val raduceMap:HashMap[String,Int])

最新文章

  1. SEO技巧之WordPress篇幅
  2. jQuery文件上传插件jQuery Upload File 有上传进度条
  3. LIstView 滚动 异步 加载更多 mono for android ScrollStateChanged ScrollState.Idle; Fling;TouchScroll
  4. 安装生物信息学软件-Biopython
  5. 一个自动生成插入与更新SQL语句的小类
  6. OC-字典
  7. [Swust OJ 838]--最优价值(0-1背包+数学)
  8. OpenJDK1.8.0 源码解析————HashMap的实现(二)
  9. 谷歌Web中国开发手册:1目的&amp;amp;夹
  10. 用C#操作文件/文件夹(删除,复制,移动)
  11. pat1081-1090
  12. 敏捷冲刺每日报告——Day2
  13. [bzoj4625][BeiJing2016]水晶
  14. 基于flexbox纯css框架的解析
  15. git初始化本地项目及关联github远程库
  16. 孤岛营救问题 (BFS+状压)
  17. merge函数:R语言,根据相同的列或ID合并不同的文件
  18. python基础——操作系统简介
  19. isNaN与parseInt/parseFloat
  20. Hadoop源码分析之读文件时NameNode和DataNode的处理过程

热门文章

  1. jquery中prop()方法和attr()方法的区别
  2. 转:Struts标签checkbox使用总结(默认选择设置)
  3. Apache配置站点根目录、用户目录及页面访问属性
  4. poj1417 带权并查集+0/1背包
  5. VIM进阶学习之几种模式和按键映射
  6. 论文笔记之: Bilinear CNN Models for Fine-grained Visual Recognition
  7. js获取ifram对象
  8. Viewpager图片自动轮播,网络图片加载,图片自动刷新
  9. Floating Action Button(漂浮按钮)
  10. SSRS入门相关笔记