UserDefinedTypedAggregation.scala (user-defined typed aggregation)


import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

object UserDefinedTypedAggregation {

  // Input row type: one record per employee.
  case class Employee(name: String, salary: Long)

  // Intermediate buffer type: running sum and row count.
  case class Average(var sum: Long, var count: Long)

  object MyAverage extends Aggregator[Employee, Average, Double] {

    // A zero value for this aggregation. Should satisfy the property that any b + zero = b
    def zero: Average = Average(0L, 0L)

    // Combine two values to produce a new value. For performance, the function may modify `buffer`
    // and return it instead of constructing a new object
    def reduce(buffer: Average, employee: Employee): Average = {
      buffer.sum += employee.salary
      buffer.count += 1
      buffer
    }

    // Merge two intermediate values
    def merge(b1: Average, b2: Average): Average = {
      b1.sum += b2.sum
      b1.count += b2.count
      b1
    }

    // Transform the output of the reduction
    def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

    // Specifies the Encoder for the intermediate value type
    def bufferEncoder: Encoder[Average] = Encoders.product

    // Specifies the Encoder for the final output value type
    def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Spark SQL user-defined Datasets aggregation example")
      .master("local")
      .getOrCreate()

    import spark.implicits._

    // Read the JSON file as a typed Dataset[Employee]
    val ds = spark.read.json("/Users/hadoop/app/spark/examples/src/main/resources/employees.json").as[Employee]
    ds.show()

    // Convert the aggregator to a TypedColumn and give it a name
    val averageSalary = MyAverage.toColumn.name("average_salary")
    val result = ds.select(averageSalary)
    result.show()

    spark.stop()
  }
}
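
To make the roles of zero, reduce, merge, and finish easier to see, here is a minimal plain-Scala sketch with no Spark dependency. It only imitates the way partial results could be combined: each partition is folded from zero with reduce, the per-partition buffers are combined with merge, and finish produces the final value. The object name AggregatorContractSketch, the helper averageOf, and the sample salaries are assumptions made for illustration; this is not how Spark schedules the work internally, and it uses immutable buffers instead of the mutable ones in the example above.

```scala
// Plain-Scala sketch of the Aggregator contract; no Spark required.
object AggregatorContractSketch {
  case class Employee(name: String, salary: Long)
  case class Average(sum: Long, count: Long)

  // Identity element: merging or reducing from zero changes nothing.
  val zero: Average = Average(0L, 0L)

  // Fold one input row into a buffer.
  def reduce(buffer: Average, employee: Employee): Average =
    Average(buffer.sum + employee.salary, buffer.count + 1)

  // Combine two partial buffers, e.g. from different partitions.
  def merge(b1: Average, b2: Average): Average =
    Average(b1.sum + b2.sum, b1.count + b2.count)

  // Turn the final buffer into the output value.
  def finish(reduction: Average): Double =
    reduction.sum.toDouble / reduction.count

  // Hypothetical helper: imitate partition-wise evaluation.
  def averageOf(partitions: Seq[Seq[Employee]]): Double = {
    val partials = partitions.map(_.foldLeft(zero)(reduce))
    finish(partials.foldLeft(zero)(merge))
  }

  def main(args: Array[String]): Unit = {
    // Sample data chosen for illustration, split into two "partitions".
    val partitions = Seq(
      Seq(Employee("Michael", 3000L), Employee("Andy", 4500L)),
      Seq(Employee("Justin", 3500L), Employee("Berta", 4000L))
    )
    println(averageOf(partitions)) // prints 3750.0
  }
}
```

Because merge is associative and zero is its identity, the result does not depend on how the rows are split across partitions, which is the property the aggregator relies on.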
