UDAF简介

UDAF(User Defined Aggregate Function)即用户定义的聚合函数,聚合函数和普通函数的区别是什么呢,普通函数是接受一行输入产生一个输出,聚合函数是接受一组(一般是多行)输入然后产生一个输出,即将一组的值想办法聚合一下。

UDAF的误区

我们可能下意识的认为UDAF是需要和group by一起使用的,实际上UDAF可以跟group by一起使用,也可以不跟group by一起使用,这个其实比较好理解,联想到mysql中的max、min等函数,可以:

select max(foo) from foobar group by bar;

表示根据bar字段分组,然后求每个分组的最大值,这时候的分组有很多个,使用这个函数对每个分组进行处理,也可以:

select max(foo) from foobar group by bar;

这种情况可以将整张表看做是一个分组,然后在这个分组(实际上就是一整张表)中求最大值。所以聚合函数实际上是对分组做处理,而不关心分组中记录的具体数量。

UDAF使用

UDAF 的使用方法有这两种

  • 继承UserDefinedAggregateFunction
  • 继承Aggregator

下面介绍两种UDAF的实现

方法一:继承UserDefinedAggregateFunction

使用UserDefinedAggregateFunction的套路:

1. 自定义类继承UserDefinedAggregateFunction,对每个阶段方法做实现

2. 在spark中注册UDAF,为其绑定一个名字

3. 然后就可以在sql语句中使用上面绑定的名字调用

下面写一个计算平均值的UDAF例子,首先定义一个类继承UserDefinedAggregateFunction:

package cc11001100.spark.sql.udaf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._ object AverageUserDefinedAggregateFunction extends UserDefinedAggregateFunction { // 聚合函数的输入数据结构
override def inputSchema: StructType = StructType(StructField("input", LongType) :: Nil) // 缓存区数据结构
override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) // 聚合函数返回值数据结构
override def dataType: DataType = DoubleType // 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
override def deterministic: Boolean = true // 初始化缓冲区
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer() = 0L
buffer() = 0L
} // 给聚合函数传入一条新数据进行处理
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (input.isNullAt()) return
buffer() = buffer.getLong() + input.getLong()
buffer() = buffer.getLong() +
} // 合并聚合函数缓冲区
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1() = buffer1.getLong() + buffer2.getLong()
buffer1() = buffer1.getLong() + buffer2.getLong()
} // 计算最终结果
override def evaluate(buffer: Row): Any = buffer.getLong().toDouble / buffer.getLong() }

然后注册并使用它:

package cc11001100.spark.sql.udaf

import org.apache.spark.sql.SparkSession

object SparkSqlUDAFDemo_001 {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("SparkStudy").getOrCreate()
spark.read.json("data/user").createOrReplaceTempView("v_user")
spark.udf.register("u_avg", AverageUserDefinedAggregateFunction)
// 将整张表看做是一个分组对求所有人的平均年龄
spark.sql("select count(1) as count, u_avg(age) as avg_age from v_user").show()
// 按照性别分组求平均年龄
spark.sql("select sex, count(1) as count, u_avg(age) as avg_age from v_user group by sex").show() } }

结果

//使用到的数据集
{"id": , "name": "foo", "sex": "man", "age": }
{"id": , "name": "bar", "sex": "man", "age": }
{"id": , "name": "baz", "sex": "man", "age": }
{"id": , "name": "foo1", "sex": "woman", "age": }
{"id": , "name": "bar2", "sex": "woman", "age": }
{"id": , "name": "baz3", "sex": "woman", "age": } //运行结果
+-----+--------+
| count|avg_age|
+-----+--------+
| 6 | 19.6666|
+-----+--------+
+-----+--------+---------+
| sex | count | avg_age |
+-----+--------+---------+
| man| 19.6666|20.666666| |woman| 19.6666|20.666666| +-----+--------+---------+

方法二:继承Aggregator

还有另一种方式就是继承Aggregator这个类,优点是可以带类型:

package cc11001100.spark.sql.udaf

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders} /**
* 计算平均值
*
*/
object AverageAggregator extends Aggregator[User, Average, Double] { // 初始化buffer
override def zero: Average = Average(0L, 0L) // 处理一条新的记录
override def reduce(b: Average, a: User): Average = {
b.sum += a.age
b.count += 1L
b
} // 合并聚合buffer
override def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
} // 减少中间数据传输
override def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count override def bufferEncoder: Encoder[Average] = Encoders.product // 最终输出结果的类型
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble } /**
* 计算平均值过程中使用的Buffer
*
* @param sum
* @param count
*/
case class Average(var sum: Long, var count: Long) {
} case class User(id: Long, name: String, sex: String, age: Long) {
}

调用:

package cc11001100.spark.sql.udaf

import org.apache.spark.sql.SparkSession

object AverageAggregatorDemo_001 {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("SparkStudy").getOrCreate()
import spark.implicits._
val user = spark.read.json("data/user").as[User]
user.select(AverageAggregator.toColumn.name("avg")).show() } }
//运行结果
+--------+
| avg |
+--------+
| 19.6666|
+--------+

 

最新文章

  1. c# System.Data.OracleClient需要Oracle客户端软件8.1.7或更高版本
  2. 安装Ubuntu14.04版本的操作系统
  3. 在cmd下输入/g无效
  4. 使用South时候由于两个相同id的文件引起的问题
  5. 爆牙齿的 Web 标准面试题 【转藏】
  6. matlab取整
  7. INSTALL_PARSE_FAILED_MANIFEST_MALFORMED 错误
  8. bash shell学习-shell script基础 (笔记)
  9. How to install vim on linux
  10. linux安装postgresql简洁版
  11. 数据分析---SQL(删除数据或表)
  12. win7激活成功 但每次开机后又显示此windows副本不是正版的解决办法
  13. Jmeter ----Bean shell使用
  14. jsp页面间对象传递方法
  15. Bugfree——CentOS6.8搭建测试环境
  16. Ubuntu系统无法获得锁/var/lib/dpkg/lock - open (11: 资源暂时不可用)的解决方案
  17. java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
  18. HTML的常用总结
  19. jdk自带的ThreadLocal和netty扩展的FastThreadLocal比较总结
  20. poj 1523 SPF 无向图求割点

热门文章

  1. KMP算法 (字符串的匹配)
  2. JAVA常用集合解析
  3. Java-Class-@I:java.annotation.Resource
  4. redis集群创建时报错:Sorry, can't connect to node
  5. leetcode.字符串.14最长公共前缀-Java
  6. EE5111_A0206839W
  7. 并发编程 --进、线程池、协程、IO模型
  8. 驾驶心率和呼吸,疲劳检测系统,通过安全带,坐垫等内置sensor
  9. 爬虫那些事儿--Http返回码
  10. nodejs的npm修改源