Custom Accumulators in Spark
2024-10-19 05:12:13
1. Custom accumulators
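A custom accumulator extends AccumulatorParam and implements two methods. addInPlace merges two partial values. zero returns the value that each task's copy of the accumulator starts from, which should be the identity element of addInPlace (0 for a sum, the empty set for a union). This is the Spark 1.x API: AccumulatorParam and sc.accumulator were deprecated in Spark 2.0 and removed in Spark 3.0 in favor of AccumulatorV2.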
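How zero and addInPlace interact is easy to get wrong, so here is a plain-Scala model of how a Spark 1.x accumulator combines results. It runs without Spark. It assumes the behavior as I understand it: the driver keeps initialValue, each task starts from zero(initialValue) and adds its own elements, and the driver folds every task's result into its value with addInPlace. Param, runJob and the partition data are hypothetical names for illustration, not Spark APIs.

```scala
// Hypothetical stand-in for Spark 1.x's AccumulatorParam (not the real trait).
trait Param[T] {
  def addInPlace(r1: T, r2: T): T
  def zero(initialValue: T): T
}

// Correct: zero returns the identity element of addition.
object LongParam extends Param[Long] {
  def addInPlace(r1: Long, r2: Long): Long = r1 + r2
  def zero(initialValue: Long): Long = 0L
}

// Buggy variant for comparison: every task starts from initialValue.
object BuggyLongParam extends Param[Long] {
  def addInPlace(r1: Long, r2: Long): Long = r1 + r2
  def zero(initialValue: Long): Long = initialValue
}

// Simplified model of one job: each partition is handled by a task that starts
// from zero(initialValue); the driver then merges every task result into its value.
def runJob[T](param: Param[T], initialValue: T, partitions: Seq[Seq[T]]): T = {
  val taskResults = partitions.map { part =>
    part.foldLeft(param.zero(initialValue))(param.addInPlace)
  }
  taskResults.foldLeft(initialValue)(param.addInPlace)
}

val partitions = Seq(Seq(1L, 2L), Seq(3L, 4L, 5L))
println(runJob(LongParam, 10L, partitions))      // 25: 10 + (1+2) + (3+4+5)
println(runJob(BuggyLongParam, 10L, partitions)) // 45: initialValue is also added once per task
```

The second call shows why zero should ignore initialValue and return the identity element. Otherwise initialValue is counted once per task on top of the driver's own copy.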
Example 1: an accumulator for Long values
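import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

object LongAccumulatorParam extends AccumulatorParam[Long] {
  // Merges two partial sums; runs on executors and on the driver, so the output appears in both logs
  override def addInPlace(r1: Long, r2: Long): Long = {
    println(s"$r1\t$r2")
    r1 + r2
  }
  // Starting value for each task's copy: the identity for addition, not initialValue
  override def zero(initialValue: Long): Long = {
    println(initialValue)
    0L
  }
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("testLongAccumulator"))
    // Pass the custom param explicitly; otherwise Spark's built-in implicit Long param is picked up
    val acc = sc.accumulator(0L, "LongAccumulator")(LongAccumulatorParam)
    sc.parallelize(Array(1L, 2L, 3L, 4L, 5L)).foreach(acc.add)
    // The value is read on the driver after the action has finished
    println(acc.value)
    sc.stop()
  }
}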
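Example 2: a Set[String] accumulator, which can be used to record error logs (for example, the set of hosts that produced errors)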
import org.apache.spark.{Accumulator, AccumulatorParam, SparkConf, SparkContext}

object StringSetAccumulatorParam extends AccumulatorParam[Set[String]] {
  // Merging two partial results is a set union
  override def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2
  // Each task's copy starts from the empty set
  override def zero(initialValue: Set[String]): Set[String] = Set()
}

object ErrorLogHostSet extends Serializable {
  // Lazily created, shared accumulator (double-checked locking)
  @volatile private var instanceErr: Accumulator[Set[String]] = null

  def getInstance(sc: SparkContext): Accumulator[Set[String]] = {
    if (null == instanceErr) {
      synchronized {
        if (null == instanceErr) {
          instanceErr = sc.accumulator(Set[String]())(StringSetAccumulatorParam)
        }
      }
    }
    instanceErr
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("testSetStringAccumulator"))
    val dataRdd = sc.parallelize(Array("a2", "c4", "6v", "67s", "3d", "45s", "2c6", "35d", "7c8d9", "34dc5"))
    val errorHostSet = getInstance(sc)
    // Updated inside filter (a transformation), so a retried task may apply its update again
    dataRdd.filter(ele => {
      val res = ele.contains("d")
      if (res) errorHostSet += Set(ele)
      res
    }).foreach(println) // println runs on the executors
    // Read on the driver after the action has run
    errorHostSet.value.foreach(println)
    sc.stop()
  }
}
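Example 2 updates the accumulator inside filter, which is a transformation. Spark guarantees that each task's accumulator update is applied only once for updates made inside actions; inside transformations an update may be applied again if a task or stage is re-executed. A set-union accumulator copes with this better than a counter, because merging the same set twice changes nothing. The sketch below is plain Scala with no Spark dependency; mergeAll and the task results are hypothetical and simply simulate one task's update being merged twice.

```scala
// Folds a sequence of task updates into a starting value with the given merge function.
def mergeAll[T](zero: T, updates: Seq[T])(add: (T, T) => T): T =
  updates.foldLeft(zero)(add)

val taskA = Set("3d", "35d")
val taskB = Set("7c8d9", "34dc5")
val withRetry = Seq(taskA, taskB, taskB) // taskB's result is merged a second time (simulated retry)

val hosts = mergeAll(Set.empty[String], withRetry)(_ ++ _)
val count = mergeAll(0L, withRetry.map(_.size.toLong))(_ + _)

println(hosts.size) // 4: union is idempotent, so the duplicate merge adds nothing
println(count)      // 6: a counter over-counts the retried task
```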
2. Using accumulableCollection
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable

object AccumulableCollectionTest {
  case class Employee(id: String, name: String, dept: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AccumulableCollectionTest").setMaster("local[4]"))
    // A HashMap keyed by employee id; each += adds one (id -> employee) entry
    val empAccu = sc.accumulableCollection(mutable.HashMap[String, Employee]())
    val employees = List(
      Employee("10001", "Tom", "Eng"),
      Employee("10002", "Roger", "Sales"),
      Employee("10003", "Rafael", "Sales"),
      Employee("10004", "David", "Sales"),
      Employee("10005", "Moore", "Sales"),
      Employee("10006", "Dawn", "Sales"),
      Employee("10007", "Stud", "Marketing"),
      Employee("10008", "Brown", "QA")
    )
    println("employee count " + employees.size)
    sc.parallelize(employees).foreach(e => {
      empAccu += e.id -> e
    })
    // Read on the driver after the foreach action has completed
    println("empAccumulator size " + empAccu.value.size)
    empAccu.value.foreach(entry =>
      println("emp id = " + entry._1 + " name = " + entry._2.name))
    sc.stop()
  }
}
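accumulableCollection accepts any growable collection: each += inside a task adds one element to that task's copy, and the driver then merges the copies (as far as I can tell, Spark 1.x does this with ++=). With a HashMap the entries are keyed, so an id that arrives twice keeps a single entry, and which value survives depends on the merge order, which Spark does not guarantee. The plain-Scala sketch below models that merge without Spark; taskResult, the partition split and the duplicate id are hypothetical.

```scala
import scala.collection.mutable

case class Employee(id: String, name: String, dept: String)

// Each task fills its own empty map with +=, as the foreach in the example does.
def taskResult(part: Seq[Employee]): mutable.HashMap[String, Employee] = {
  val local = mutable.HashMap[String, Employee]()
  part.foreach(e => local += e.id -> e)
  local
}

val partitions = Seq(
  Seq(Employee("10001", "Tom", "Eng"), Employee("10002", "Roger", "Sales")),
  Seq(Employee("10003", "Rafael", "Sales"), Employee("10002", "Roger", "QA")) // hypothetical duplicate id
)

// Driver side (modelled): merge the task results one after another with ++=.
val merged = mutable.HashMap[String, Employee]()
partitions.map(taskResult).foreach(r => merged ++= r)

println(merged.size)          // 3: "10002" appears twice but keeps one entry
println(merged("10002").dept) // QA: the later merge overwrote the earlier value
```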