Spark SQL中UDF和UDAF
2024-10-19 04:27:58
转载自:https://blog.csdn.net/u012297062/article/details/52227909
UDF: User Defined Function,用户自定义的函数,函数的输入是一条具体的数据记录,实现上讲就是普通的Scala函数;
UDAF:User Defined Aggregation Function,用户自定义的聚合函数,函数本身作用于数据集合,能够在聚合操作的基础上进行自定义操作;
实质上讲,例如说UDF会被Spark SQL中的Catalyst封装成为Expression,最终会通过eval方法来计算输入的数据Row(此处的Row和DataFrame中的Row没有任何关系)
不说太多直接上代码
1、创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息
val conf = new SparkConf() //创建SparkConf对象
conf.setAppName("SparkSQLUDFUDAF") //设置应用程序的名称,在程序运行的监控界面可以看到名称
//conf.setMaster("spark://DaShuJu-040:7077") //此时,程序在Spark集群
conf.setMaster("local[4]")
2、创建SparkContext对象和SQLContext对象
//创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc) //构建SQL上下文
3、模拟实际使用的数据
val bigData = Array("Spark", "Spark", "Hadoop", "Spark", "Hadoop", "Spark", "Spark", "Hadoop", "Spark", "Hadoop")
4、基于提供的数据创建DataFrame
val bigDataRDD = sc.parallelize(bigData)
val bigDataRDDRow = bigDataRDD.map(item => Row(item))
val structType = StructType(Array(StructField("word", StringType, true)))
val bigDataDF = sqlContext.createDataFrame(bigDataRDDRow,structType)
5、注册成为临时表
bigDataDF.registerTempTable("bigDataTable")
6、通过SQLContext注册UDF,在Scala 2.10.x版本UDF函数最多可以接受22个输入参数
sqlContext.udf.register("computeLength", (input: String) => input.length)
//直接在SQL语句中使用UDF,就像使用SQL自动的内部函数一样
sqlContext.sql("select word, computeLength(word) as length from bigDataTable").show
7、通过SQLContext注册UDAF
sqlContext.udf.register("wordCount", new MyUDAF)
sqlContext.sql("select word,wordCount(word) as count,computeLength(word) as length" +
" from bigDataTable group by word").show()
8、按照模板实现UDAF
class MyUDAF extends UserDefinedAggregateFunction {
// 该方法指定具体输入数据的类型
override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))
//在进行聚合操作的时候所要处理的数据的结果的类型
override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))
//指定UDAF函数计算后返回的结果类型
override def dataType: DataType = IntegerType
// 确保一致性 一般用true
override def deterministic: Boolean = true
//在Aggregate之前每组数据的初始化结果
override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer() =}
// 在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算
// 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer() = buffer.getAs[Int]() +
}
//最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1() = buffer1.getAs[Int]() + buffer2.getAs[Int]()
}
//返回UDAF最后的计算结果
override def evaluate(buffer: Row): Any = buffer.getAs[Int]()
}
最新文章
- JAVA模板方法设计模式(从现实生活角度理解代码原理)
- SpringMVC+Mybatis+Spring整合
- android studio gradle结构项目引入本地代码
- 父容器根据子容器高度自适应:设置父容器 height:100%;overflow:hidden;
- JavaScripts 基础详细笔记整理
- 课本[Teb]软件设计
- POJ1384完全背包问题
- JS escape()、encodeURI()和encodeURIComponent()的区别
- dedecms自定义表单提交成功如何返回当前页面
- HTML5 拼图游戏
- 第四课 Grid Control实验 GC OMS安装(第二台机器部署)
- (MonoGame从入门到放弃-2) 初识MonoGame
- PA教材提纲 TAW10-1
- Jersey RESTful WebService框架学习(一)
- 【bzoj4401】块的计数 结论题
- linux-open-source-development-tools【重点】
- Linus 谈软件开发管理经验
- 生成自签名CA+SSL证书
- Eclipse Tomcat部署项目没有加载新加的静态资源文件
- 如何找回未保存过的 Excel 文件?
热门文章
- &;quot;crsctl check crs&;quot; command hangs at EVMD check
- go json解析
- 构造 - SGU 109 Magic of David Copperfield II
- [mysql] Navicat for mysql_导入导出表结构
- json剥离
- Softmax vs. Softmax-Loss VS cross-entropy损失函数 Numerical Stability(转载)
- Linux中安装配置hadoop集群
- 去死吧!USB转串口!!!
- jdk 配置时时区设置
- 概览C++之const