sparkSQL中的example学习(3)
2024-09-08 13:18:09
UserDefinedTypedAggregation.scala(用户可自定义类型)
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
object UserDefinedTypedAggregation {
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
//A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
//Commine two values to produce a new value. For performance, the function may modify `buffer`
//and return it instead of constructiong a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
//Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
//Transform the ouput of the reduction
def finish(reducetion: Average): Double = reducetion.sum.toDouble / reducetion.count
//Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
//Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
// $example off: type_custom_aggregation$
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("Spark SQL user-defined Datasets aggregation example")
.master("local")
.getOrCreate()
import spark.implicits._
val ds = spark.read.json("/Users/hadoop/app/spark/examples/src/main/resources/employees.json").as[Employee]
ds.show()
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
spark.stop()
}
}

最新文章
- transactionManager 以及datasource type解析
- dragsort html拖拽排序
- 第一次接触OOM
- 数据结构与算法分析 - 最大公约数(gcd &; extended_gcd)
- sublime text 3 设置
- 《MFC游戏开发》笔记十 游戏中的碰撞检测进阶:地图类型&;障碍物判定
- 转载robots.txt的学习
- IronPython 源码剖析系列(2):IronPython 引擎的运作流程
- AngularJS系列之总结
- ReentrantLock实现原理
- 禅道SQA
- 如何搭建apache服务?
- mysqldump 导出
- 寻找真正的入口(OEP)--广义ESP定律
- BZOJ 5477: 星际穿越
- python selenium+phantomJS自动化测试环境
- Modbus库开发笔记之三:Modbus TCP Server开发
- Vue Admin - 基于 Vue &; Bulma 后台管理面板
- socket.io的用户认证
- css设置input不显示光标
热门文章
- Troubleshooting ORA-30036 - Unable To Extend Undo Tablespace (Doc ID 460481.1)
- C# WF 第12节 Timer控件
- socket.error: [Errno 10048]
- [C5W1] Sequence Models - Recurrent Neural Networks
- 2019年最新50道java基础部分面试题
- 洛谷 P5594 【XR-4】模拟赛
- [Algorithm] 1290. Convert Binary Number in a Linked List to Integer
- go 创建切片
- vs code 中配置git go
- 深入理解Linux内核 学习笔记(8)