一.概述

  在许多数据中,都存在类别的数据,在一些功能中需要根据类别分别获取前几或后几的数据,用于数据可视化或异常数据预警。在这种情况下,实现分组TopN就显得非常重要了,因此,使用了Spark聚合函数和排序算法实现了分布式TopN计算功能。

  

二.代码实现

 package scala

 import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession} /**
* 计算分组topN
* Created by Administrator on 2019/11/20.
*/
object GroupTopN {
Logger.getLogger("org").setLevel(Level.WARN) // 设置日志级别
def main(args: Array[String]) {
//创建测试数据
val test_data = Array("CJ20191120,201911", "CJ20191120,201910", "CJ20191105,201910", "CJ20191105,201909", "CJ20191111,201910")
val spark = SparkSession.builder().appName("GroupTopN").master("local[2]").getOrCreate()
val sc = spark.sparkContext
val test_data_rdd = sc.parallelize(test_data).map(row => {
val Array(scene, cycle) = row.split(",")
Row(scene, cycle)
})
// 设置数据模式
val structType = StructType(Array(
StructField("scene", StringType, true),
StructField("cycle", StringType, true)
))
// 转换为df
val test_data_df = spark.createDataFrame(test_data_rdd, structType)
test_data_df.createOrReplaceTempView("test_data_df")
// 拼接周期
val scene_ws = spark.sql("select scene,concat_ws(',',collect_set(cycle)) as cycles from test_data_df group by scene")
scene_ws.count()
scene_ws.show()
scene_ws.createOrReplaceTempView("scene_ws")
/**
* 定义参数确定N的大小,暂定为1
*/
val sum = 1
// 创建广播变量,把N的大小广播出去
val broadcast = sc.broadcast(sum)
/**
* 定义Udf实现获取组内的前N个数据
*/
spark.udf.register("getTopN", (cycles : String) => {
val sum = broadcast.value
var mid = ""
if(cycles.contains(",")){ // 多值
val cycle = cycles.split(",").sorted.reverse // 降序排序
val min = Math.min(cycle.length, sum)
for(i <- 0 until min){
if(mid.equals("")){
mid = cycle(i)
}else{
mid += "," + cycle(i)
}
}
}else{ // 单值
mid = cycles
}
mid
}) val result = spark.sql("select scene,getTopN(cycles) cycles from scene_ws")
result.show()
spark.stop()
}
}

三.结果

  

  

四.备注

  当N大于1时,多个数据会拼接在一起,若想每个一行,可是使用使用列转行功能,参考我的博客:https://www.cnblogs.com/yszd/p/11266552.html

最新文章

  1. 51nod1118(水题)
  2. Selenium2学习-040-JavaScript弹出框(alert、confirm、prompt)操作演示实例
  3. SQL对字符串数组的处理
  4. {转自MC}NVIDIA DirectX 11演示DEMO详解
  5. C# 线程--第二线程方法
  6. lightoj 1020 (博弈)
  7. 了解node.js
  8. [置顶] Codeforces Round #190 (Div. 2)(完全)
  9. js架构设计模式——前端MVVM框架设计及实现(一)
  10. MVC(3DOnLine)开发过程的一些难点以及知识点
  11. python 爬去拉钩测试招聘信息
  12. 配置SESSION超时与请求超时
  13. Good Bye 2017 D. New Year and Arbitrary Arrangement
  14. 谷歌搜索技巧(转)https://www.runningcheese.com/google
  15. Uploadify提示-Failed,上传不了文件,跟踪onUploadError事件,errorMsg:2156 SecurityError Error #2156 null
  16. Java jdbc链接 mySQL 写的crud
  17. maven指定项目的构建、打包和tomcat插件的pom.xml配置
  18. mongodb复制集部署文档
  19. 51nod 1292 字符串中的最大值V2(后缀自动机)
  20. Perl6 Bailador框架(4):路径匹配

热门文章

  1. 读架构漫谈&amp;我眼中的架构师
  2. 【java】isEmpty VS isBlank 的区别
  3. java实体 和 xml相互转换
  4. Linux性能优化实战学习笔记:第五十八讲
  5. [LeetCode] 209. Minimum Size Subarray Sum 最短子数组之和
  6. (转)yum只下载不安装软件包
  7. C# 方法递归
  8. SpringCloud Ribbon 负载均衡 通过服务器名无法连接的神坑一个
  9. 阿里云RDS数据库备份同步到自建库方法(SHELL脚本)
  10. Laravel框架下路由的使用(源码解析)