Spark SQL dropDuplicates
spark sql 数据去重
在对spark sql 中的dataframe数据表去除重复数据的时候可以使用dropDuplicates()
方法
dropDuplicates()有4个重载方法
- 第一个
def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
这个方法,不需要传入任何的参数,默认根据所有列进行去重,然后按数据行的顺序保留每行数据出现的第一条。
/**
* Returns a new Dataset that contains only the unique rows from this Dataset.
* This is an alias for `distinct`.
*
* For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
* will keep all data across triggers as intermediate state to drop duplicates rows. You can use
* [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
* the state. In addition, too late data older than watermark will be dropped to avoid any
* possibility of duplicates.
*
* @group typedrel
* @since 2.0.0
*/
def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
- 第二个
def dropDuplicates(colNames: Seq[String])
传入的参数是一个序列。你可以在序列中指定你要根据哪些列的重复元素对数据表进行去重,然后也是返回每一行数据出现的第一条
/**
* (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only
* the subset of columns.
*
* For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
* will keep all data across triggers as intermediate state to drop duplicates rows. You can use
* [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
* the state. In addition, too late data older than watermark will be dropped to avoid any
* possibility of duplicates.
*
* @group typedrel
* @since 2.0.0
*/
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
val resolver = sparkSession.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
// It is possibly there are more than one columns with the same name,
// so we call filter instead of find.
val cols = allColumns.filter(col => resolver(col.name, colName))
if (cols.isEmpty) {
throw new AnalysisException(
s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
}
cols
}
Deduplicate(groupCols, planWithBarrier)
}
- 第三个
def dropDuplicates(colNames: Array[String])
传入的参数是一个数组,然后方法会把数组转换为序列然后再调用第二个方法。
/**
* Returns a new Dataset with duplicate rows removed, considering only
* the subset of columns.
*
* For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
* will keep all data across triggers as intermediate state to drop duplicates rows. You can use
* [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
* the state. In addition, too late data older than watermark will be dropped to avoid any
* possibility of duplicates.
*
* @group typedrel
* @since 2.0.0
*/
def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
- 第四个
def dropDuplicates(col1: String, cols: String*)
传入的参数为字符串,在方法体内会把你传入的字符串组合成一个序列再调用第二个方法。
/**
* Returns a new [[Dataset]] with duplicate rows removed, considering only
* the subset of columns.
*
* For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
* will keep all data across triggers as intermediate state to drop duplicates rows. You can use
* [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
* the state. In addition, too late data older than watermark will be dropped to avoid any
* possibility of duplicates.
*
* @group typedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
val colNames: Seq[String] = col1 +: cols
dropDuplicates(colNames)
}
第三和第四个本质上还是调用了第二个方法,所以我们在使用的时候如果需要根据指定的列进行数据去重,可以直接传入一个Seq。
第一个方法默认根据所有列去重,实际上也是调用了第二个方法,然后传入参数this.columns
,即所有的列组成的Seq。
所以各位想深究dropDuplicate()
去重的核心代码,只需要研究第二个去重方法即可。等我有时间我也会把去重的核心源码讲解继续补充。
dropDuplicates()的坑!
在使用dropDuplicates() 在去重的时候,我发现有时候还是会出现重复数据的情况。
我分析了一下还出现重复数据的原因:
- 数据存在多个excuter中
因为spark是分布式计算的,数据在计算的时候会分布在不同的excutor上,使用dropDuplicate去重的时候,可能只是一个excutor内的数据进行了去重,别的excutor上可能还会有重复的数据。
- 数据是存放在不同分区的,
因为spark是分布式计算的,数据在计算的时候会分散在不同的分区中,使用dropDuplicate去重的时候,不同的区分可能还会存在相同的数据。
我试了只启动一个excutor多分区的情况下进行计算,没有出现重复的数据,然后多个excutor将数据先合并到一个分区在去重还是有重复的数据。所以觉得可能是第一种猜测的情况比较大,但是如果只使用一个excutor就失去了分布式计算的意义和优势,所以还是得想想其它办法。
各位有什么好的解决办法也可以在评论区交流!
最新文章
- winform中listview imagelist问题
- 队列queue的C实现
- android 属性
- 03server平台delphi程序不支持直接调用webservice
- Entity Framework4.0 (六) EF4的 增加、删除、更改
- 监听SWT文本框只能输入数字
- FZU 2122 又见LKity(KMP+返回所有匹配位置)
- 重新回顾JSP
- vim如何显示行号
- mongoDB之集合操作
- node.js安装使用express框架
- [HDFS Manual] CH4 HDFS High Availability Using the Quorum Journal Manager
- CentOS配置多公网
- Jenkins 安装启动提示“iJob for jenkins.service failed because the control process exited with error code. See ";systemctl status jenkins.service"; and ";journalctl -xe"; for details.”
- [Done]SnowFlake 分布式环境下基于ZK构WorkId
- PhpStorm 克隆下来的项目改动,版本控制不起效果
- 实现动态的XML文件读写操作(依然带干货)
- linux 下文件重命名/移动/复制命令(转)
- IP代理软件
- python步长为负时的情况
热门文章
- 一图看懂华为云DevCloud如何应对敏捷开发的测试挑战
- Prometheus的伴侣:Grafana在centos下的搭建
- async和await的使用总结 ~ 竟然一直用错了c#中的async和await的使用。。
- MySQL中EXPLAIN命令详细解析
- Spring Cloud Data Flow用Shell来操作,方便建立CICD
- 039_go语言中的排序
- Android Studio--家庭记账本(三)
- C语言学习笔记之数据类型转换
- 连通图算法详解之① :Tarjan 和 Kosaraju 算法
- 又一个小而美的Java Web框架: Solon!