1. Custom accumulators

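A custom accumulator is defined by extending AccumulatorParam and implementing its addInPlace and zero methods. addInPlace merges two partial values, and zero returns the identity value that each partial result starts from.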
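
The contract of the two methods can be sketched without Spark. The following is a minimal illustration, not Spark's implementation: MergeParam, StringSetMerge, and MergeSketch are hypothetical names introduced here, and the "partitions" are plain Scala sequences. It assumes each partition folds its elements starting from zero and that the partial results are then merged into the initial value.

```scala
// Hypothetical stand-in for Spark's AccumulatorParam; not part of Spark.
trait MergeParam[T] {
  def zero(initialValue: T): T
  def addInPlace(r1: T, r2: T): T
}

// Set union with the empty set as identity, mirroring the Set[String] case.
object StringSetMerge extends MergeParam[Set[String]] {
  def zero(initialValue: Set[String]): Set[String] = Set.empty
  def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2
}

object MergeSketch {
  // Each simulated partition starts from zero and folds its own elements;
  // the partial results are then merged into the initial value.
  def run[T](param: MergeParam[T], initial: T, partitions: Seq[Seq[T]]): T = {
    val partials = partitions.map(_.foldLeft(param.zero(initial))(param.addInPlace))
    partials.foldLeft(initial)(param.addInPlace)
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq(Set("3d"), Set("35d")),
      Seq(Set("7c8d9"), Set("34dc5"))
    )
    println(run(StringSetMerge, Set.empty[String], partitions))
  }
}
```

Because partial results may be merged in any order, addInPlace should be associative and commutative, and zero should be its identity; set union and addition both satisfy this.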

Example 1: an accumulator for Long values

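import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

object LongAccumulatorParam extends AccumulatorParam[Long] {
  // Merge two partial sums; the println only traces the calls.
  override def addInPlace(r1: Long, r2: Long): Long = {
    println(s"$r1\t$r2")
    r1 + r2
  }

  // Identity value that each partial result starts from.
  override def zero(initialValue: Long): Long = {
    println(initialValue)
    0L
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("testLongAccumulator"))
    // Pass the param explicitly; otherwise Spark's built-in implicit Long param is picked up.
    val acc = sc.accumulator(0L, "LongAccumulator")(LongAccumulatorParam)
    sc.parallelize(Array(1L, 2L, 3L, 4L, 5L)).foreach(acc.add)
    println(acc.value)
    sc.stop()
  }
}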

Example 2: a Set[String] accumulator, which can be used to collect error-log entries

import org.apache.spark.{Accumulator, AccumulatorParam, SparkConf, SparkContext}

object StringSetAccumulatorParam extends AccumulatorParam[Set[String]] {
  // Merge two partial results by set union.
  override def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2

  // The identity for union is the empty set.
  override def zero(initialValue: Set[String]): Set[String] = Set()
}

object ErrorLogHostSet extends Serializable {
  @volatile private var instanceErr: Accumulator[Set[String]] = null

  // Lazily create one shared accumulator (double-checked locking).
  def getInstance(sc: SparkContext): Accumulator[Set[String]] = {
    if (null == instanceErr) {
      synchronized {
        if (null == instanceErr) {
          instanceErr = sc.accumulator(Set[String]())(StringSetAccumulatorParam)
        }
      }
    }
    instanceErr
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("testSetStringAccumulator"))
    val dataRdd = sc.parallelize(Array("a2", "c4", "6v", "67s", "3d", "45s", "2c6", "35d", "7c8d9", "34dc5"))
    val errorHostSet = getInstance(sc)
    // Record every element containing "d" while filtering.
    dataRdd.filter(ele => {
      val res = ele.contains("d")
      if (res) errorHostSet += Set(ele)
      res
    }).foreach(println)
    errorHostSet.value.foreach(println)
    sc.stop()
  }
}

2. Using accumulableCollection

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object AccumulableCollectionTest {

  case class Employee(id: String, name: String, dept: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AccumulableCollectionTest").setMaster("local[4]"))
    // An accumulator backed by a mutable map keyed by employee id.
    val empAccu = sc.accumulableCollection(mutable.HashMap[String, Employee]())
    val employees = List(
      Employee("10001", "Tom", "Eng"),
      Employee("10002", "Roger", "Sales"),
      Employee("10003", "Rafael", "Sales"),
      Employee("10004", "David", "Sales"),
      Employee("10005", "Moore", "Sales"),
      Employee("10006", "Dawn", "Sales"),
      Employee("10007", "Stud", "Marketing"),
      Employee("10008", "Brown", "QA")
    )
    println("employee count " + employees.size)

    // Each task adds (id -> employee) entries to the accumulator.
    sc.parallelize(employees).foreach(e => {
      empAccu += e.id -> e
    })

    println("empAccumulator size " + empAccu.value.size)
    empAccu.value.foreach(entry =>
      println("emp id = " + entry._1 + " name = " + entry._2.name))
    sc.stop()
  }
}
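
What accumulableCollection does with a growable collection can be approximated in plain Scala. This is a sketch under assumptions, not Spark's code: GrowableMergeSketch and collect are hypothetical names, and it assumes each task adds elements to its own empty collection with += and that the per-task collections are then combined with ++=.

```scala
import scala.collection.mutable

object GrowableMergeSketch {
  // Hypothetical helper: simulate per-partition accumulation into local maps,
  // followed by merging all local maps into one result.
  def collect[K, V](partitions: Seq[Seq[(K, V)]]): mutable.HashMap[K, V] = {
    val partials = partitions.map { part =>
      val local = mutable.HashMap.empty[K, V] // each task starts from an empty map
      part.foreach(local += _)                // add one entry at a time
      local
    }
    val merged = mutable.HashMap.empty[K, V]
    partials.foreach(merged ++= _)            // combine the per-task maps
    merged
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq("10001" -> "Tom", "10002" -> "Roger"),
      Seq("10003" -> "Rafael")
    )
    val result = collect(partitions)
    println("merged size " + result.size)
  }
}
```

With a map as the backing collection, entries that share a key overwrite one another during the merge, so keys such as the employee id should be unique if every record must be kept.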
