本文参考

参考《Spark快速大数据分析》动物书中的第四章"键值对操作",本篇是对RDD转化操作和行动操作API归纳的最后一篇

RDD转化操作API归纳:https://www.cnblogs.com/kuluo/p/12545374.html

RDD行动操作API归纳:https://www.cnblogs.com/kuluo/p/12550938.html

pair RDD转化操作API归纳:https://www.cnblogs.com/kuluo/p/12558563.html

环境

idea + spark 2.4.5 + scala 2.11.12

RDD均通过SparkContext的parallelize()函数创建

countByKey()函数

目的:

对每个键对应的元素分别计数

代码:

/*
* (a,3) (b,5) (c,4) (d,2)
*/
val
testList1 = List("a a a b b b", "b b c c c", "c d d")
/*
* (a,5) (b,4)
*/
val
testList2 = List("a a a a a b b", "b b")

val testRdd1 = sc.parallelize(testList1)
val testRdd2 = sc.parallelize(testList2)

val map = testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  .union(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
  .countByKey()

for ((x, y) <- map) {

  println(s"($x, $y)")
}

输出:

(d, 1)

(a, 2)

(b, 2)

(c, 1)

注意:

This method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which returns an RDD[T, Long] instead of a map.

countByKey()函数会将结果全部加载到驱动器进程中,不适合结果集较大时使用

我们在源码中可以看到它调用了collect()函数

def countByKey(): Map[K, Long] = self.withScope { self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap }

因此在处理大数据量时,应当使用.mapValues(_ => 1L).reduceByKey(_ + _)两个函数返回一个RDD

collectAsMap()函数

目的:

collect()函数针对pair RDD的实现,将结果以映射表的形式返回

代码:

/*
* (a,3) (b,5) (c,4) (d,2)
*/
val
testList1 = List("a a a b b b", "b b c c c", "c d d")
/*
* (a,5) (b,4)
*/
val
testList2 = List("a a a a a b b", "b b")

val testRdd1 = sc.parallelize(testList1)
val testRdd2 = sc.parallelize(testList2)

val map = testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  .union(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
  .collectAsMap()

for ((x, y) <- map) {

  println(s"($x, $y)")
}

输出:

(b, 4)

(d, 2)

(a, 5)

(c, 4)

注意:

this doesn't return a multimap (so if you have multiple values to the same key, only one value per key is preserved in the map returned)

也正如本例所示,pair RDD中有重复的键时,collectByKey函数只会保留一个

因为内部调用了collect()函数,不适合结果集较大时使用

lookup()函数

目的:

返回给定键对应的所有值

代码:

/*
* (a,3) (b,5) (c,4) (d,2)
*/
val
testList1 = List("a a a b b b", "b b c c c", "c d d")
/*
* (a,5) (b,4)
*/
val
testList2 = List("a a a a a b b", "b b")

val testRdd1 = sc.parallelize(testList1)
val testRdd2 = sc.parallelize(testList2)

println(testRdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  .union(testRdd2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _))
  .lookup("a"))

输出:

ArrayBuffer(3, 5)

最新文章

  1. flex自适应小例子
  2. Struts2框架之-注解开发
  3. Android之数据库的创建
  4. 顺序执行到来的消息 actor
  5. Python 判断一个字符串是否在列表中任何一个字符串中出现过
  6. win2008修改最大远程桌面连接数
  7. LintCode: isSubTree
  8. [转载]MVC3缓存:使用页面缓存
  9. C++ Primer 学习笔记_2_高速入口(继续)
  10. json-server假数据
  11. Mac 系统安装教程
  12. 吴恩达机器学习笔记37-学习曲线(Learning Curves)
  13. Omi框架学习之旅 - 插件机制之omi-finger 及原理说明
  14. Spark记录-Scala类与对象小例子
  15. mysql备份的三种方式详解
  16. gtest运行小析
  17. Oracle12c 之后的路线图
  18. 解题:HEOI 2016 求和
  19. Adapter.notifyDataSetChanged()源码分析以及与ListView.setAdapter的区别
  20. js forEach for区别

热门文章

  1. 案例七:shell实现开机自动播放挂载本地yum仓库程序
  2. 【基础知识】CPU 指令执行的五个阶段,cpu就是用来执行指令的
  3. 【C#程序集】程序集
  4. 服务器CPU很高-怎么办(Windbg使用坑点)
  5. selenium+python自动化101-使用execute_script() 方法获取 JavaScript 返回值
  6. ASP.NET Core 6框架揭秘实例演示[15]:针对控制台的日志输出
  7. Python:2维(平面/数组/矩阵)缺省值插值
  8. Qt:QJsonDocument以及与QJsonArray、QJsonObject、QJsonValue的关联
  9. 如何建立自己的代理IP池,减少爬虫被封的几率
  10. MariaDB 与Mysql版本对应关系