感谢我的同事 李震给我讲解UDAF

网上找到的大部分都只有代码,但是缺少讲解,官网的的API有讲解,但是看不太明白。我还是自己记录一下吧,或许对其他人有帮助。

接下来以一个求几何平均数的例子来说明如何实现一个自己的UDAF

首先需要导入这些包:

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

需要继承实现这个抽象类
class GeometricMean extends UserDefinedAggregateFunction {
// This is the input fields for your aggregate function.
就是需要输入的列的类型,可以有多个列,多个列的写法如下:
/*
StructType(StructField("slot",IntegerType) :: StructField("score",IntegerType)::Nil)
*/
override def inputSchema: org.apache.spark.sql.types.StructType =
StructType(StructField("value", DoubleType) :: Nil) 存储聚合结果的中间buffer
// This is the internal fields you keep for computing your aggregate.
override def bufferSchema: StructType = StructType(
StructField("count", LongType) ::
StructField("product", DoubleType) :: Nil
) // This is the output type of your aggregatation function.
返回结果的类型,比如这个集合平均数就是返回一个double值
override def dataType: DataType = DoubleType

是每次运行结果都过一样,但是我也不太明白啊
override def deterministic: Boolean = true 初始化存储聚合结果的buffer
// This is the initial value for your buffer schema.
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 1.0
}

每次更新怎么更新,比如新来了一行,如何加入更新聚合的结果
// This is how to update your buffer schema given an input.
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Long](0) + 1
buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
}

spark会把数据划分成多个块,每个块都会进行处理,然后把每个块的结果进行合并处理
// This is how to merge two objects with the bufferSchema type.
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
}

返回结果
// This is where you output the final value, given the final value of your bufferSchema.
override def evaluate(buffer: Row): Any = {
math.pow(buffer.getDouble(1), 1.toDouble / buffer.getLong(0))
}
}

使用方法:

先注册

sqlContext.udf.register("gm", new GeometricMean)

使用自定义的UDAF
%sql
-- Use a group_by statement and call the UDAF.
select group_id, gm(id) from simple group by group_id
 
 
 

参考资料:

https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

最新文章

  1. IO例子
  2. Linux下快速迁移海量文件的操作记录
  3. BZOJ4551——[Tjoi2016&Heoi2016]树
  4. WP7、WP8 格式化时间为距当前多少时间
  5. IOS 中openGL使用教程1(openGL ES 入门篇 | 搭建openGL环境)
  6. PHP连接SQLServer
  7. 对访问修饰关键字public, protected, internal and private的说明
  8. SAS 5/iR Adapter 驱动下载
  9. php核心知识要点
  10. WCF基于Cookie回传的系列(概述)
  11. Array of Objects
  12. 从运营商小广告到HTTPS
  13. SCP测试服务器的上行/下行带宽
  14. 阿里云centos 安装和配置 DokuWiki
  15. 【quickhybrid】API规划
  16. 关于Python3.6中Twisted模块安装的问题
  17. Tomcat系列(2)——Tomcat文件目录7个
  18. vim中制表符tabstop用法
  19. lnk快捷方式变记事本打开还原,桌面图标变lnk还原方法
  20. Maven 配置tomcat插件

热门文章

  1. ssh访问跳过RSA key"yes/no"验证
  2. 修改docker时区
  3. Oracle重做日志REDO
  4. Word Formation
  5. navicat 中执行sql脚本 喊中文错误
  6. The OpenCV Coding Style Guide
  7. HTML布局四剑客-Flex,Grid,Table,Float
  8. 总结学习! xml与java对象转换 --- JDK自带的JAXB(Java Architecture for XML Binding)
  9. ubuntu14 编译安装(升级)g++
  10. DRF(5) - 频率组件、url注册器、响应器、分页器