Spark SQL data deduplication

To remove duplicate rows from a DataFrame in Spark SQL, you can use the dropDuplicates() method. It has four overloads.


  • First: def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)


/**
 * Returns a new Dataset that contains only the unique rows from this Dataset.
 * This is an alias for `distinct`.
 * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
 * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
 * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
 * the state. In addition, too late data older than watermark will be dropped to avoid any
 * possibility of duplicates.
 * @group typedrel
 * @since 2.0.0
 */
def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
  • Second: def dropDuplicates(colNames: Seq[String])


/**
 * (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only
 * the subset of columns.
 * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
 * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
 * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
 * the state. In addition, too late data older than watermark will be dropped to avoid any
 * possibility of duplicates.
 * @group typedrel
 * @since 2.0.0
 */
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
  val resolver = sparkSession.sessionState.analyzer.resolver
  val allColumns = queryExecution.analyzed.output
  val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
    // It is possibly there are more than one columns with the same name,
    // so we call filter instead of find.
    val cols = allColumns.filter(col => resolver(col.name, colName))
    if (cols.isEmpty) {
      throw new AnalysisException(
        s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
    }
    cols
  }
  Deduplicate(groupCols, planWithBarrier)
}
  • Third: def dropDuplicates(colNames: Array[String])


/**
 * Returns a new Dataset with duplicate rows removed, considering only
 * the subset of columns.
 * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
 * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
 * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
 * the state. In addition, too late data older than watermark will be dropped to avoid any
 * possibility of duplicates.
 * @group typedrel
 * @since 2.0.0
 */
def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
  • Fourth: def dropDuplicates(col1: String, cols: String*)


/**
 * Returns a new [[Dataset]] with duplicate rows removed, considering only
 * the subset of columns.
 * For a static batch [[Dataset]], it just drops duplicate rows. For a streaming [[Dataset]], it
 * will keep all data across triggers as intermediate state to drop duplicates rows. You can use
 * [[withWatermark]] to limit how late the duplicate data can be and system will accordingly limit
 * the state. In addition, too late data older than watermark will be dropped to avoid any
 * possibility of duplicates.
 * @group typedrel
 * @since 2.0.0
 */
def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
  val colNames: Seq[String] = col1 +: cols
  dropDuplicates(colNames)
}
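
To make the difference between the four overloads concrete, here is a minimal sketch that calls each one on the same small DataFrame. The local master setting, the app name, the object name, and the sample columns (id, name, city) are assumptions made up for illustration; the sketch also assumes Spark SQL is on the classpath.

```scala
import org.apache.spark.sql.SparkSession

object DropDuplicatesOverloads {
  // Returns the row counts produced by the four overloads, in order.
  def rowCounts(spark: SparkSession): Seq[Long] = {
    import spark.implicits._

    // Hypothetical sample data: (id, name, city).
    val df = Seq(
      (1, "alice", "beijing"),
      (1, "alice", "beijing"),  // exact duplicate of the first row
      (1, "alice", "shanghai"), // same id and name, different city
      (2, "bob", "beijing")
    ).toDF("id", "name", "city")

    Seq(
      df.dropDuplicates().count(),                    // all columns compared
      df.dropDuplicates(Seq("id", "name")).count(),   // Seq[String] overload
      df.dropDuplicates(Array("id", "name")).count(), // Array[String] overload
      df.dropDuplicates("id", "name").count()         // varargs overload
    )
  }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("drop-duplicates-overloads")
      .getOrCreate()
    try {
      // Expected counts: 3, 2, 2, 2
      println(rowCounts(spark).mkString(", "))
    } finally {
      spark.stop()
    }
  }
}
```

With no arguments, only the exact duplicate is dropped, so three rows remain. With id and name as the subset, the two remaining rows for id 1 collapse into one, and which of them survives is not something the caller controls.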





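When deduplicating with dropDuplicates(), I found that duplicate rows sometimes still show up, in the following situations:


  1. The data is held on multiple executors.


  2. The data is stored in different partitions.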
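
One way to check whether duplicates really remain after dropDuplicates() is to group the result by the same columns and look for groups with more than one row. The following is a minimal sketch of that check, not a fix. The helper name remainingDuplicates, the sample columns (id, name), and the local session settings are assumptions for illustration; the input is repartitioned so that equal rows start out in different partitions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RemainingDuplicatesCheck {
  // Hypothetical helper: returns the key combinations that still occur
  // more than once in df, together with their counts.
  def remainingDuplicates(df: DataFrame, keyCols: Seq[String]): DataFrame =
    df.groupBy(keyCols.head, keyCols.tail: _*)
      .count()
      .filter(col("count") > 1)

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("remaining-duplicates-check")
      .getOrCreate()
    import spark.implicits._

    try {
      // Hypothetical sample data, spread over several partitions.
      val df = Seq(
        (1, "a"), (1, "a"), (2, "b"),
        (1, "a"), (2, "b"), (3, "c")
      ).toDF("id", "name").repartition(4)

      val deduped = df.dropDuplicates("id", "name")

      // Any row shown here is a key that survived deduplication more than once.
      remainingDuplicates(deduped, Seq("id", "name")).show()
    } finally {
      spark.stop()
    }
  }
}
```

If the check comes back empty on real data, the rows that look duplicated differ in at least one of the compared columns, so comparing those columns value by value is a reasonable next step.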




