Spark实现分组TopN
2024-09-08 00:23:00
一.概述
在许多数据中,都存在类别的数据,在一些功能中需要根据类别分别获取前几或后几的数据,用于数据可视化或异常数据预警。在这种情况下,实现分组TopN就显得非常重要了,因此,使用了Spark聚合函数和排序算法实现了分布式TopN计算功能。
二.代码实现
package scala import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession} /**
* 计算分组topN
* Created by Administrator on 2019/11/20.
*/
object GroupTopN {
Logger.getLogger("org").setLevel(Level.WARN) // 设置日志级别
def main(args: Array[String]) {
//创建测试数据
val test_data = Array("CJ20191120,201911", "CJ20191120,201910", "CJ20191105,201910", "CJ20191105,201909", "CJ20191111,201910")
val spark = SparkSession.builder().appName("GroupTopN").master("local[2]").getOrCreate()
val sc = spark.sparkContext
val test_data_rdd = sc.parallelize(test_data).map(row => {
val Array(scene, cycle) = row.split(",")
Row(scene, cycle)
})
// 设置数据模式
val structType = StructType(Array(
StructField("scene", StringType, true),
StructField("cycle", StringType, true)
))
// 转换为df
val test_data_df = spark.createDataFrame(test_data_rdd, structType)
test_data_df.createOrReplaceTempView("test_data_df")
// 拼接周期
val scene_ws = spark.sql("select scene,concat_ws(',',collect_set(cycle)) as cycles from test_data_df group by scene")
scene_ws.count()
scene_ws.show()
scene_ws.createOrReplaceTempView("scene_ws")
/**
* 定义参数确定N的大小,暂定为1
*/
val sum = 1
// 创建广播变量,把N的大小广播出去
val broadcast = sc.broadcast(sum)
/**
* 定义Udf实现获取组内的前N个数据
*/
spark.udf.register("getTopN", (cycles : String) => {
val sum = broadcast.value
var mid = ""
if(cycles.contains(",")){ // 多值
val cycle = cycles.split(",").sorted.reverse // 降序排序
val min = Math.min(cycle.length, sum)
for(i <- 0 until min){
if(mid.equals("")){
mid = cycle(i)
}else{
mid += "," + cycle(i)
}
}
}else{ // 单值
mid = cycles
}
mid
}) val result = spark.sql("select scene,getTopN(cycles) cycles from scene_ws")
result.show()
spark.stop()
}
}
三.结果
四.备注
当N大于1时,多个数据会拼接在一起,若想每个一行,可是使用使用列转行功能,参考我的博客:https://www.cnblogs.com/yszd/p/11266552.html
最新文章
- 51nod1118(水题)
- Selenium2学习-040-JavaScript弹出框(alert、confirm、prompt)操作演示实例
- SQL对字符串数组的处理
- {转自MC}NVIDIA DirectX 11演示DEMO详解
- C# 线程--第二线程方法
- lightoj 1020 (博弈)
- 了解node.js
- [置顶] Codeforces Round #190 (Div. 2)(完全)
- js架构设计模式——前端MVVM框架设计及实现(一)
- MVC(3DOnLine)开发过程的一些难点以及知识点
- python 爬去拉钩测试招聘信息
- 配置SESSION超时与请求超时
- Good Bye 2017 D. New Year and Arbitrary Arrangement
- 谷歌搜索技巧(转)https://www.runningcheese.com/google
- Uploadify提示-Failed,上传不了文件,跟踪onUploadError事件,errorMsg:2156 SecurityError Error #2156 null
- Java jdbc链接 mySQL 写的crud
- maven指定项目的构建、打包和tomcat插件的pom.xml配置
- mongodb复制集部署文档
- 51nod 1292 字符串中的最大值V2(后缀自动机)
- Perl6 Bailador框架(4):路径匹配